diff --git a/LICENSE.txt b/LICENSE.txt
index 9c19f3cd96589b1335cb0e6180f34cfcd6cad5d6..43acdd5358578824b4178cb640c57f4eaac714cc 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -1,14 +1,21 @@
-Copyright (C) 2016 Nexedi SA and Contributors - Klaus Wölfel
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
+MIT License

-     http://www.apache.org/licenses/LICENSE-2.0
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:

- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/README.md b/README.md
index 68528da2c1df67bb46fbc4864b5915e0e73e1847..dcaf2fc1bd514bf125e43179bcbe59ec974b3cea 100644
--- a/README.md
+++ b/README.md
@@ -1,57 +1,151 @@
-# Filename file input plugin for Embulk
+# Filename input plugin for Embulk

-Embulk filename file input plugin similar to local file input which overloads FileInputStream read methods to provide the filename in the first bytes of the stream
+Embulk input plugin that loads the files of several directories (each directory is handled as one task) and emits one record per chunk with a `payload` and a `tag` column. It is intended to be used together with the embulk-output-wendelin plugin.

 ## Overview

-* **Plugin type**: file input
-* **Resume supported**: yes
+* **Plugin type**: input
+* **Resume supported**: no
 * **Cleanup supported**: yes
+* **Guess supported**: no

 ## Configuration

-- **option1**: path_prefix (string, required)
+- **multi_dir**: list of path prefixes, one per directory whose files should be loaded (ArrayList, required)
+- **multi_tag**: list of tags, paired by position with the entries of `multi_dir` (ArrayList, default: `[]`)
+- **load_order**: order in which the files of each directory are uploaded (String, default: `ALPHABETICAL`)
+- **chunk_size**: maximum number of bytes per record; larger files are split into several records (int, default: `10485760` (10 MB))
-## Example
+
+Attention: the `load_order` option accepts the following values:
+- ALPHABETICAL (the default)
+- ASCEND_MODIFIED
+- DESCEND_MODIFIED
+- ASCEND_CREATION (of little use on Unix systems, which do not record a file creation time)
+- DESCEND_CREATION (of little use on Unix systems, for the same reason)
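+
+The two timestamp-based orders rely on the file attributes exposed by `java.nio` (the plugin compares the
+`lastModifiedTime()` / `creationTime()` of each file). The snippet below is a standalone sketch, not part of
+the plugin; it only shows how to check what your filesystem actually reports, for instance to verify whether
+the `*_CREATION` orders are usable on your machine:
+
+```java
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.attribute.BasicFileAttributes;
+
+public class ShowFileTimes {
+    public static void main(String[] args) throws IOException {
+        // Any file path works; "../sample/sample_01.txt" is only an example.
+        Path p = Paths.get(args.length > 0 ? args[0] : "../sample/sample_01.txt");
+        BasicFileAttributes attrs = Files.readAttributes(p, BasicFileAttributes.class);
+        // On many Unix filesystems creationTime() is not a real birth time and may simply
+        // mirror another timestamp, which is why the *_CREATION orders are of little use there.
+        System.out.println("lastModifiedTime = " + attrs.lastModifiedTime());
+        System.out.println("creationTime     = " + attrs.creationTime());
+    }
+}
+```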
-seed.yml:
+## Example

 ```yaml
 exec:
   min_output_tasks: 1
 in:
-  type: filename
-  path_prefix: /path/to/my/files
-  parser:
-    type: none-bin
-out:
-  type: wendelin
-  tag: my_tag
-  streamtool_uri: https://my_instance.host.vifib.net:/erp5/portal_ingestion_policies/my_ingestion_policy
-  user: my_user
-  password: my_password
+  type: filename
+  multi_dir: ["../sample/sample_","../example/example_"]
+  multi_tag: ["tag1","tag2"]
+  load_order: ASCEND_MODIFIED
+  chunk_size: 1000
 ```
+Attention:
+
+**exec:**
+  **min_output_tasks: 1**
+
-## Install
+This setting is mandatory!
+Without it, Embulk optimizes the number of tasks according to the number of CPU cores of the machine, which means it would redistribute the tasks and cause errors.
+If `multi_dir` contains more than one directory, each directory is treated as one task and Embulk distributes those tasks over several threads.
+Each task runs sequentially, so the files inside one directory are uploaded in order.
+For example, the upload order may be:
+- example1.txt
+- sample1.txt
+- sample2.txt
+- example2.txt
+- sample3.txt
+
+If you want to upload the directories one by one, you also need to set
+**max_threads: 1**
+
+then you will get:
+- example1.txt
+- example2.txt
+- sample1.txt
+- sample2.txt
+- sample3.txt
+
+
+## Build
+Java 1.8 is required.
 ```
-$ embulk gem install embulk-input-filename embulk-parser-none-bin embulk-output-wendelin
+$ ./gradlew gem # -t to watch change of files and rebuild continuously
 ```
-## Run
+## Usage
+
+If you are new to Embulk, here are some tips that can help you use this plugin quickly.
+
+First of all, you need a Java 8 environment on a Linux system.
+
+You also need an ERP5 instance. [If you do not have one, follow this tutorial to set it up](https://nexedi.erp5.net/web_page_module/7056/WebPage_view?ignore_layout:int=1&selection_index=0&portal_status_message=Status%20changed.&selection_name=web_page_module_view_web_page_list_selection&editable_mode:int=1)
+
+Then you need Embulk on your PC. There is currently a bug that prevents this plugin from loading with the newest Embulk, so Embulk 0.8.27 is recommended instead
+of the latest version.
+
+To install Embulk:
 ```
-$ embulk run seed.yml -c diff.yml
+curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.bintray.com/embulk/maven/embulk-0.8.27.jar"
+chmod +x ~/.embulk/bin/embulk
+echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
+source ~/.bashrc
 ```
-## Build
+
+After installing Embulk, build this filename input plugin on your PC:
 ```
-$ ./gradlew package
+git clone https://lab.nexedi.com/caiyu/embulk-input-filename/tree/multiThread
+cd embulk-input-filename
+./gradlew package
 ```
-## Build Package
+If you want to test the plugin, just run `./gradlew test`.
+In fact, this input plugin should be used together with the wendelin output plugin, so build it on your PC too:
+```
+git clone https://lab.nexedi.com/caiyu/embulk-output-wendelin/tree/java-output
+cd embulk-output-wendelin
+./gradlew package
+```
+Now you can use Embulk with these two plugins to upload the data.
+
+In your working directory, create a yml file. Say we create config.yml and fill in the configuration:
+```yaml +exec: + min_output_tasks: 1 +in: + type: filename + multi_dir: ["../sample/sample_","../example/example_"] + multi_tag: ["tag1","tag2"] + load_order: ASCEND_MODIFIED + chunk_size: 1000 +out: + type: wendelin + tag: "weather-cc" + streamtool_uri: https://softinstxxxxx.host.vifib.net/erp5/portal_ingestion_policies/weather-cc + user: zope + password: yourpassword ``` -$ ./gradlew gem # -t to watch change of files and rebuild continuously +Prepare the sample data and example data to upload. +``` +mkdir ../sample +vim ../sample/sample_01.txt +vim ../sample/sample_02.txt +vim ../sample/sample_03.txt +mkdir ../example +vim ../example/example_01.txt +vim ../example/example_02.txt ``` +Then run the embulk +``` +embulk run -L path/to/embulk-input-filename -L path/to/embulk-output-wendelin config.yml +``` + + + + + + + + + diff --git a/build.gradle b/build.gradle index ac35ea659ff66cf7bf7e29b0dbe66d803aeef59b..63648167166bbf894fbf260f6fb0a9e3fa5669c7 100644 --- a/build.gradle +++ b/build.gradle @@ -15,18 +15,26 @@ configurations { version = "0.1.0" -sourceCompatibility = 1.7 -targetCompatibility = 1.7 +sourceCompatibility = 1.8 +targetCompatibility = 1.8 dependencies { - compile "org.embulk:embulk-core:0.8.13" - provided "org.embulk:embulk-core:0.8.13" - compile "org.embulk:embulk-standards:0.8.13" - provided "org.embulk:embulk-standards:0.8.13" + compile "org.embulk:embulk-core:0.8.29" + provided "org.embulk:embulk-core:0.8.29" + compile "org.embulk:embulk-standards:0.8.29" + provided "org.embulk:embulk-standards:0.8.29" // compile "YOUR_JAR_DEPENDENCY_GROUP:YOUR_JAR_DEPENDENCY_MODULE:YOUR_JAR_DEPENDENCY_VERSION" testCompile "junit:junit:4.+" + testCompile "org.embulk:embulk-core:0.8.29:tests" + testCompile 'org.embulk:embulk-test:0.8.29' } +test { + dependsOn cleanTest + testLogging.showStandardStreams = true +} + + task classpath(type: Copy, dependsOn: ["jar"]) { doFirst { file("classpath").deleteDir() } from (configurations.runtime - configurations.provided + files(jar.archivePath)) @@ -62,9 +70,11 @@ task gemPush(type: JRubyExec, dependsOn: ["gem"]) { script "pkg/${project.name}-${project.version}.gem" } -task "package"(dependsOn: ["gemspec", "classpath"]) << { - println "> Build succeeded." - println "> You can run embulk with '-L ${file(".").absolutePath}' argument." +task "package"(dependsOn: ["gemspec", "classpath"]) { + doLast { + println "> Build succeeded." + println "> You can run embulk with '-L ${file(".").absolutePath}' argument." + } } task gemspec { @@ -75,12 +85,12 @@ task gemspec { Gem::Specification.new do |spec| spec.name = "${project.name}" spec.version = "${project.version}" - spec.authors = ["Klaus W\xC3\xB6lfel"] - spec.summary = %[Filename file input plugin for Embulk] - spec.description = %[Reads files stored on Filename.] - spec.email = ["klaus@nexedi.com"] + spec.authors = ["yu"] + spec.summary = %[Filename input plugin for Embulk] + spec.description = %[Loads records from Filename.] 
+ spec.email = ["icaiyu0618@gmail.com"] spec.licenses = ["MIT"] - # TODO set this: spec.homepage = "https://github.com/klaus/embulk-input-filename" + # TODO set this: spec.homepage = "https://github.com/icaiyu0618/embulk-input-filename" spec.files = `git ls-files`.split("\n") + Dir["classpath/*.jar"] spec.test_files = spec.files.grep(%r"^(test|spec)/") diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 13372aef5e24af05341d49695ee84e5f9b594659..deedc7fa5e6310eac3148a7dd0b1f069b07364cb 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 989348bf30f763f7cb9653098f85305298f3cacb..722184f1a4c6b778d002fa44e811633fc107439a 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Wed Jan 13 12:41:02 JST 2016 +#Sun Jan 08 00:35:58 PST 2017 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-2.10-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-3.2.1-bin.zip diff --git a/gradlew b/gradlew index 9d82f78915133e1c35a6ea51252590fb38efac2f..9aa616c273d8c0be67cb25eaac11e07209f5c034 100755 --- a/gradlew +++ b/gradlew @@ -6,12 +6,30 @@ ## ############################################################################## -# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS="" +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null APP_NAME="Gradle" APP_BASE_NAME=`basename "$0"` +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS="" + # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD="maximum" @@ -30,6 +48,7 @@ die ( ) { cygwin=false msys=false darwin=false +nonstop=false case "`uname`" in CYGWIN* ) cygwin=true @@ -40,26 +59,11 @@ case "`uname`" in MINGW* ) msys=true ;; + NONSTOP* ) + nonstop=true + ;; esac -# Attempt to set APP_HOME -# Resolve links: $0 may be a link -PRG="$0" -# Need this for relative symlinks. -while [ -h "$PRG" ] ; do - ls=`ls -ld "$PRG"` - link=`expr "$ls" : '.*-> \(.*\)$'` - if expr "$link" : '/.*' > /dev/null; then - PRG="$link" - else - PRG=`dirname "$PRG"`"/$link" - fi -done -SAVED="`pwd`" -cd "`dirname \"$PRG\"`/" >/dev/null -APP_HOME="`pwd -P`" -cd "$SAVED" >/dev/null - CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar # Determine the Java command to use to start the JVM. @@ -85,7 +89,7 @@ location of your Java installation." fi # Increase the maximum file descriptors if we can. -if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then MAX_FD_LIMIT=`ulimit -H -n` if [ $? 
-eq 0 ] ; then if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then @@ -157,4 +161,9 @@ function splitJvmOpts() { eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME" +# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong +if [[ "$(uname)" == "Darwin" ]] && [[ "$HOME" == "$PWD" ]]; then + cd "$(dirname "$0")" +fi + exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@" diff --git a/gradlew.bat b/gradlew.bat index aec99730b4e8fcd90b57a0e8e01544fea7c31a89..e95643d6a2ca62258464e83c72f5156dc941c609 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -8,14 +8,14 @@ @rem Set local scope for the variables with windows NT shell if "%OS%"=="Windows_NT" setlocal -@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -set DEFAULT_JVM_OPTS= - set DIRNAME=%~dp0 if "%DIRNAME%" == "" set DIRNAME=. set APP_BASE_NAME=%~n0 set APP_HOME=%DIRNAME% +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS= + @rem Find java.exe if defined JAVA_HOME goto findJavaFromJavaHome @@ -46,10 +46,9 @@ echo location of your Java installation. goto fail :init -@rem Get command-line arguments, handling Windowz variants +@rem Get command-line arguments, handling Windows variants if not "%OS%" == "Windows_NT" goto win9xME_args -if "%@eval[2+2]" == "4" goto 4NT_args :win9xME_args @rem Slurp the command line arguments. @@ -60,11 +59,6 @@ set _SKIP=2 if "x%~1" == "x" goto execute set CMD_LINE_ARGS=%* -goto execute - -:4NT_args -@rem Get arguments from the 4NT Shell from JP Software -set CMD_LINE_ARGS=%$ :execute @rem Setup the command line diff --git a/lib/embulk/input/filename.rb b/lib/embulk/input/filename.rb index fc97892b20b7d8dd188e849d98a62da7a3e56d91..dd9ccc1615f227895ca39799f89021f800486422 100644 --- a/lib/embulk/input/filename.rb +++ b/lib/embulk/input/filename.rb @@ -1,3 +1,3 @@ Embulk::JavaPlugin.register_input( - "filename", "org.embulk.input.filename.FilenameFileInputPlugin", + "filename", "org.embulk.input.filename.FilenameInputPlugin", File.expand_path('../../../../classpath', __FILE__)) diff --git a/src/main/java/org/embulk/input/filename/FilenameFileInputPlugin.java b/src/main/java/org/embulk/input/filename/FilenameFileInputPlugin.java deleted file mode 100644 index 07336e686dd6e125e5006ac03b04e64e5e392c33..0000000000000000000000000000000000000000 --- a/src/main/java/org/embulk/input/filename/FilenameFileInputPlugin.java +++ /dev/null @@ -1,262 +0,0 @@ -package org.embulk.input.filename; - -import java.util.List; -import java.util.ArrayList; -import java.util.Collections; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.InputStream; -import java.io.IOException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.Files; -import java.nio.file.SimpleFileVisitor; -import java.nio.file.FileVisitResult; -import java.nio.file.attribute.BasicFileAttributes; -import com.google.common.collect.ImmutableList; -import com.google.common.base.Optional; -import org.slf4j.Logger; -import org.embulk.config.Config; -import org.embulk.config.ConfigDefault; -import org.embulk.config.ConfigInject; -import org.embulk.config.ConfigSource; -import org.embulk.config.ConfigDiff; -import org.embulk.config.TaskReport; -import org.embulk.config.Task; 
-import org.embulk.config.TaskSource; -import org.embulk.spi.Exec; -import org.embulk.spi.FileInputPlugin; -import org.embulk.spi.BufferAllocator; -import org.embulk.spi.TransactionalFileInput; -import org.embulk.spi.util.InputStreamTransactionalFileInput; -import org.embulk.standards.LocalFileInputPlugin; - - - -class FilenameFileInputStream extends FileInputStream { - static int MAX_NAME_LENGTH = 255; - int n; - byte[] name; - - FilenameFileInputStream(File file) throws FileNotFoundException { - super(file); - n = 0; - name = file.getName().getBytes(); - } - - FilenameFileInputStream(String path) throws FileNotFoundException { - super(path); - n = 0; - name = path.getBytes(); - } - - @Override - public int read() throws IOException { - if (n < name.length) { - byte b = name[n]; - n++; - return b; - } else if (n < MAX_NAME_LENGTH) { - n++; - return 0; - } else { - return super.read(); - } - } - - @Override - public int read(byte[] b) throws IOException { - return read(b, 0, b.length); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - if (n < MAX_NAME_LENGTH) { - int i = 0; - int c; - for (; i < len; i++) { - c = read(); - if (c == -1) { - if ( i == 0 ) { - return -1; - } - break; - } - b[off + i] = (byte)c; - } - return i; - } else { - return super.read(b, off, len); - } - } -} - -public class FilenameFileInputPlugin implements FileInputPlugin -{ - - public interface PluginTask extends Task - { - @Config("path_prefix") - String getPathPrefix(); - - @Config("last_path") - @ConfigDefault("null") - Optional getLastPath(); - - @Config("file_size") - @ConfigDefault("null") - Optional getFileSize(); - - @Config("follow_symlinks") - @ConfigDefault("false") - boolean getFollowSymlinks(); - - List getFiles(); - void setFiles(List files); - - @ConfigInject - BufferAllocator getBufferAllocator(); - } - - private final Logger log = Exec.getLogger(getClass()); - - private final static Path CURRENT_DIR = Paths.get(".").normalize(); - - @Override - public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control) - { - PluginTask task = config.loadConfig(PluginTask.class); - - // list files recursively - List files = listFiles(task); - log.info("Loading files {}", files); - task.setFiles(files); - - // number of processors is same with number of files - int taskCount = task.getFiles().size(); - return resume(task.dump(), taskCount, control); - } - - @Override - public ConfigDiff resume(TaskSource taskSource, - int taskCount, - FileInputPlugin.Control control) - { - PluginTask task = taskSource.loadTask(PluginTask.class); - - control.run(taskSource, taskCount); - - // build next config - ConfigDiff configDiff = Exec.newConfigDiff(); - - // last_path - if (task.getFiles().isEmpty()) { - // keep the last value - if (task.getLastPath().isPresent()) { - configDiff.set("last_path", task.getLastPath().get()); - } - } else { - List files = new ArrayList(task.getFiles()); - Collections.sort(files); - configDiff.set("last_path", files.get(files.size() - 1)); - } - - return configDiff; - } - - @Override - public void cleanup(TaskSource taskSource, - int taskCount, - List successTaskReports) - { } - - public List listFiles(PluginTask task) - { - Path pathPrefix = Paths.get(task.getPathPrefix()).normalize(); - final Path directory; - final String fileNamePrefix; - if (Files.isDirectory(pathPrefix)) { - directory = pathPrefix; - fileNamePrefix = ""; - } else { - fileNamePrefix = pathPrefix.getFileName().toString(); - Path d = pathPrefix.getParent(); - directory = 
(d == null ? CURRENT_DIR : d); - } - - final ImmutableList.Builder builder = ImmutableList.builder(); - final String lastPath = task.getLastPath().orNull(); - final Integer fileSize = task.getFileSize().orNull(); - try { - log.info("Listing local files at directory '{}' filtering filename by prefix '{}'", directory.equals(CURRENT_DIR) ? "." : directory.toString(), fileNamePrefix); - Files.walkFileTree(directory, new SimpleFileVisitor() { - @Override - public FileVisitResult preVisitDirectory(Path path, BasicFileAttributes attrs) - { - if (path.equals(directory)) { - return FileVisitResult.CONTINUE; - } else if (lastPath != null && path.toString().compareTo(lastPath.substring(0, path.toString().length())) < 0) { - return FileVisitResult.SKIP_SUBTREE; - } else if (path.getFileName().toString().startsWith(".")) { - return FileVisitResult.SKIP_SUBTREE; - } else { - if (path.getFileName().toString().startsWith(fileNamePrefix)) { - return FileVisitResult.CONTINUE; - } else { - return FileVisitResult.SKIP_SUBTREE; - } - } - } - - - @Override - public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) - { - if (lastPath != null && path.toString().compareTo(lastPath) <= 0) { - return FileVisitResult.CONTINUE; - } else if (path.getFileName().toString().startsWith(".")) { - return FileVisitResult.CONTINUE; - } else { - if (path.getFileName().toString().startsWith(fileNamePrefix)) { - if (fileSize == null || path.toFile().length() == fileSize) { - builder.add(path.toString()); - } - } - return FileVisitResult.CONTINUE; - } - } - }); - } catch (IOException ex) { - throw new RuntimeException(String.format("Failed get a list of local files at '%s'", directory), ex); - } - return builder.build(); - } - - @Override - public TransactionalFileInput open(TaskSource taskSource, int taskIndex) - { - final PluginTask task = taskSource.loadTask(PluginTask.class); - final String path = task.getFiles().get(taskIndex); - - return new InputStreamTransactionalFileInput( - task.getBufferAllocator(), - new InputStreamTransactionalFileInput.Opener() { - public InputStream open() throws IOException - { - return new FilenameFileInputStream(path); - } - }) - { - @Override - public void abort() - { } - - @Override - public TaskReport commit() - { - return Exec.newTaskReport(); - } - }; - } -} diff --git a/src/main/java/org/embulk/input/filename/FilenameInputPlugin.java b/src/main/java/org/embulk/input/filename/FilenameInputPlugin.java new file mode 100644 index 0000000000000000000000000000000000000000..5be2f656ad390e3a4c7cd6826e2a212f941e7fa6 --- /dev/null +++ b/src/main/java/org/embulk/input/filename/FilenameInputPlugin.java @@ -0,0 +1,421 @@ +/* +Author: CAI Yu +Email: icaiyu0618@gmail.com + +This plugin is aimed for upload the files in multi directories for the Embulk. +To use the plugin read the ReadMe.md carefully. + +This plugin will parse the directories in the multi_dir parameter in your config.yml +The embulk will load the files in each directory. And each directory will be treated as a task. +The embulk will run those tasks in multi threads. + +This plugin should be used with the WendlinPlugin. 
+ +*/ + +package org.embulk.input.filename; +import java.util.stream.Collectors; + +import java.util.List; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.nio.file.attribute.BasicFileAttributeView; +import java.nio.file.attribute.BasicFileAttributes; +import java.nio.file.attribute.FileTime; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.FileVisitResult; +import java.io.File; +import java.io.IOException; +import java.io.FileInputStream; +import java.io.ByteArrayOutputStream; + +//import com.google.common.base.Optional; +import org.slf4j.Logger; + + +import org.embulk.config.Config; +import org.embulk.config.ConfigDefault; +import org.embulk.config.ConfigDiff; +import org.embulk.config.ConfigSource; +import org.embulk.config.Task; +import org.embulk.config.TaskReport; +import org.embulk.config.TaskSource; +import org.embulk.config.ConfigInject; +import org.embulk.spi.PageBuilder; +import org.embulk.spi.Exec; +import org.embulk.spi.InputPlugin; +import org.embulk.spi.PageOutput; +import org.embulk.spi.Schema; +import org.embulk.spi.SchemaConfig; +import org.embulk.spi.BufferAllocator; +import org.embulk.spi.ColumnConfig; +import org.embulk.spi.SchemaConfig; + +import static org.embulk.spi.type.Types.STRING; + + +public class FilenameInputPlugin + implements InputPlugin +{ + public interface PluginTask + extends Task + { + @Config("multi_dir") + @ConfigDefault("[]") + ArrayList getMultiDir(); + + @Config("multi_tag") + @ConfigDefault("[]") + ArrayList getMultiTag(); + + @Config("lastPaths") + @ConfigDefault("[]") + ArrayList getLastPaths(); + + @Config("chunk_size") + @ConfigDefault("10485760") + int getChunkSize(); + + @Config("load_order") + @ConfigDefault("\"\"") + String getLoadOrder(); + + + // Not implements yet this Configuration will decide whether upload the symlinks files. + @Config("follow_symlinks") + @ConfigDefault("false") + boolean getFollowSymlinks(); + + + ArrayList> getFiles(); + void setFiles(ArrayList> allFiles); + + @ConfigInject + BufferAllocator getBufferAllocator(); + + + } + + // initialize a log. Very useful tool to display the important information. + private final Logger log = Exec.getLogger(getClass()); + + // Get the CURRENT directory of the embulk runing. + private final static Path CURRENT_DIR = Paths.get(".").normalize(); + + // This varibale record the tags for each directory + private static ArrayList tagList; + + + // This varibale decide the chunkSize of the upload data. + private static int chunkSize; + + + // You need to learn the workflow of the embulk before understand how this transaction work. + @Override + public ConfigDiff transaction(ConfigSource config, + InputPlugin.Control control) + { + // load the task from the TaskSource. from this varibale, we could read the configuration in the config.yml + PluginTask task = config.loadConfig(PluginTask.class); + + // Get the data chunk size + chunkSize = task.getChunkSize(); + + // Read the directories list from the task. + ArrayList dirList = task.getMultiDir(); + + // Read the LastPath list from the task + ArrayList lastPaths = task.getLastPaths(); + + // We create a big Array to contains all directories, and each directories will contain many files. + ArrayList> allFiles = new ArrayList>(); + + // Read the tags list from the task. 
+ tagList = task.getMultiTag(); + + // If the dirList have no directory, we throw an RuntimeException. + if ( dirList.size() != 0 ){ + log.info ("The list of the directories: " + dirList ); + + // If the Number of tags is less than the directories, we say that the default tag for the directory is "" + while (tagList.size() < dirList.size()){ + tagList.add(""); + } + + // if the number of lastPaths is less than the directory, we say the default lastPaths for the directory is "" + while (lastPaths.size() < dirList.size()){ + lastPaths.add(""); + } + } else { + throw new RuntimeException("The multi_dir should contain at least 1 directory."); + } + + + // Now to time to read all files from each directory. + + String order = task.getLoadOrder(); + for (int i =0; i < dirList.size();i++ ){ + String dir = dirList.get(i); + String lastPath = lastPaths.get(i); + + // We have to sort the files before we set them to the tasks. + // Here we get the parameter about the order + + + // This method return the files in a directory,which is already sorted in the this method + ArrayList files = listFiles(task,Paths.get(dir).normalize(),lastPath,order); + + log.info("The files is " + files); + + // add the files to the big Array. + allFiles.add(files); + + } + + // each directory will be treated as a task. the taskCount is the size of the bigArray. + int taskCount = allFiles.size(); + log.info("taskCount of the input plugin is: " + taskCount); + // We set the allFiles to the tasks.files. + task.setFiles(allFiles); + + // Here we add we columns for the Schema. + ArrayList columns = new ArrayList(); + + // Here we add two columns to the columns. + // if you want, we can read the columnName from the seed.yml: final String columnName = task.getColumnName(); + columns.add(new ColumnConfig("payload", STRING, config)); + columns.add(new ColumnConfig("tag", STRING, config)); + + // Create a Schema for the Page. + Schema schema = new SchemaConfig(columns).toSchema(); + + return resume(task.dump(), schema, taskCount, control); + } + + @Override + public ConfigDiff resume(TaskSource taskSource, + Schema schema, int taskCount, + InputPlugin.Control control) + { + control.run(taskSource, schema, taskCount); + return Exec.newConfigDiff(); + } + + @Override + public void cleanup(TaskSource taskSource, + Schema schema, int taskCount, + List successTaskReports) + { + } + + + // This function will be run in every task. As we say that we distribute each directory as task. + // in this method. we have to load all files in the directory. + @Override + public TaskReport run(TaskSource taskSource, + Schema schema, int taskIndex, + PageOutput output) + { + PluginTask task = taskSource.loadTask(PluginTask.class); + + ArrayList files = task.getFiles().get(taskIndex); + + // Check how many files in this directory on the run. + log.info("The files in the run:" + files); + + // For each file, we create some pages. + // If the size of the file is less than chunkSize, we build just one page for this file. + // If the size of the file is more than chunkSize, we build more than one page for this file. + for (String file : files) + { + try + { + int nRead; + byte[] data = new byte[chunkSize]; + FileInputStream dataIn = new FileInputStream(file); + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + + // Read the data and build the page. 
+ while ((nRead = dataIn.read(data, 0, data.length)) != -1) { + buffer.write(data, 0, nRead); + try (PageBuilder pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output)) + { + pageBuilder.setString(0,buffer.toString());//Base64.encodeBase64String(buffer.toByteArray())); + pageBuilder.setString(1, tagList.get(taskIndex) + new File(file).getCanonicalPath() ); + pageBuilder.addRecord(); + buffer.flush(); + pageBuilder.finish(); + } + } + } catch (IOException ex){ + ex.printStackTrace(); + } + } + + TaskReport taskReport = Exec.newTaskReport(); + taskReport.set("columns", schema.size()); + return taskReport; + } + + @Override + public ConfigDiff guess(ConfigSource config) + { + return Exec.newConfigDiff(); + } + + // This method is for walk through the directory and record the files in the directory. It will compare the filename with the lastPath + // In we want to upload the files in ALPHABETICAL order. than the filename "smaller than" the lastPath will be abandonned. + // Be careful, that since we have alternative for the order. You should be careful what "smaller than" means! + public ArrayList listFiles(PluginTask task,Path pathPrefix,final String lastPath,final String order) + { + //Path pathPrefix = Paths.get(task.getPathPrefix()).normalize(); + final Path directory; + final String fileNamePrefix; + if (Files.isDirectory(pathPrefix)) { + directory = pathPrefix; + fileNamePrefix = ""; + } else { + fileNamePrefix = pathPrefix.getFileName().toString(); + Path d = pathPrefix.getParent(); + directory = (d == null ? CURRENT_DIR : d); + } + + + log.info("Listing local files at directory '{}' filtering filename by prefix '{}'", directory.equals(CURRENT_DIR) ? "." : directory.toString(), fileNamePrefix); + ArrayList filesArray= new ArrayList(); + ArrayList files; + + // Walk the directory, attention: this method does not walk its sub dir! + try { + filesArray= Files.walk(directory) + .filter(Files::isRegularFile) + .map(Path::toFile) + .filter(f -> f.getName().startsWith(fileNamePrefix)) + .map(f -> f.getAbsolutePath()) + .collect(Collectors.toCollection(ArrayList::new)); + //return builder.build(); + } catch (IOException ex){ + throw new RuntimeException("Somethig wrong when load the files"); + } + + + + + + Comparator comparator; + + // Be careful, if the files are copied into the dir, it means that the last Modified time of those files are the same + // In this case the ASCEND_MODIFIED and DESCEND_MODIFIED get the same result. 
+ switch (order) { + case "ASCEND_MODIFIED": comparator = new AscendModifiedSorter(); + break; + case "DESCEND_MODIFIED": comparator = new DescendModifiedSorter(); + break; + case "ASCEND_CREATION": comparator = new AscendCreationSorter(); + break; + case "DESCEND_CREATION": comparator = new DescendModifiedSorter(); + break; + default: comparator = new AlphabeticalSorter(); + break; + } + + // Sort the files for each directory + Collections.sort(filesArray,comparator); + + if (!lastPath.equals("")) { + int ind = filesArray.indexOf(lastPath); + if (ind >= 0 && ind < filesArray.size() ){ + return new ArrayList(filesArray.subList(ind + 1, filesArray.size())); + } + } + + return filesArray; + } + + + + // Static method to return a FileTime of a file + public static FileTime getCreationTime(String filename) throws IOException{ + File file = new File(filename); + Path p = Paths.get(file.getAbsolutePath()); + BasicFileAttributes view = Files.getFileAttributeView(p,BasicFileAttributeView.class).readAttributes(); + FileTime fileTime = view.creationTime(); + //System.out.println("The raw creation time of " +filename+ " is " + fileTime.toString()); + return fileTime; + } + + public static FileTime getLastModifiedTime(String filename) throws IOException{ + File file = new File(filename); + Path p = Paths.get(file.getAbsolutePath()); + BasicFileAttributes view = Files.getFileAttributeView(p,BasicFileAttributeView.class).readAttributes(); + FileTime fileTime = view.lastModifiedTime(); + //System.out.println("The raw last modified time of " +filename+ " is " + fileTime.toString()); + return fileTime; + } + + + // Those sorter is the implementation of the Comparator to help sort the files! + class AlphabeticalSorter implements Comparator { + @Override + public int compare(String f1, String f2) { + return f1.compareTo(f2); + } + } + + + class AscendModifiedSorter implements Comparator { + @Override + public int compare(String f1, String f2) { + try{ + return getLastModifiedTime(f1).compareTo(getLastModifiedTime(f2)); + } catch (IOException ex){ + ex.printStackTrace(); + } + return 0; + } + } + + + class AscendCreationSorter implements Comparator { + @Override + public int compare(String f1, String f2) { + try{ + return getCreationTime(f1).compareTo(getCreationTime(f2)); + } catch (IOException ex){ + ex.printStackTrace(); + } + return 0; + } + } + + class DescendModifiedSorter implements Comparator{ + @Override + public int compare(String f1, String f2) { + try{ + return getLastModifiedTime(f2).compareTo(getLastModifiedTime(f1)); + } catch (IOException ex){ + ex.printStackTrace(); + } + return 0; + } + } + + + class DescendCreationSorter implements Comparator{ + @Override + public int compare(String f1, String f2) { + try{ + return getCreationTime(f2).compareTo(getCreationTime(f1)); + } catch (IOException ex){ + ex.printStackTrace(); + } + return 0; + } + } + +} \ No newline at end of file diff --git a/src/test/java/org/embulk/input/filename/JoinfileOutputPlugin.java b/src/test/java/org/embulk/input/filename/JoinfileOutputPlugin.java new file mode 100644 index 0000000000000000000000000000000000000000..44983993b3e01c7f09444d4469bfd4df40e58cf5 --- /dev/null +++ b/src/test/java/org/embulk/input/filename/JoinfileOutputPlugin.java @@ -0,0 +1,179 @@ +package org.embulk.input.filename; + + +import java.util.List; +import java.util.ArrayList; + +import com.google.common.base.Optional; + +import org.embulk.config.Config; +import org.embulk.config.ConfigDefault; +import org.embulk.config.ConfigDiff; +import 
org.embulk.config.ConfigSource; +import org.embulk.config.Task; +import org.embulk.config.TaskReport; +import org.embulk.config.TaskSource; +import org.embulk.spi.Exec; +import org.embulk.spi.OutputPlugin; +import org.embulk.spi.PageOutput; +import org.embulk.spi.Schema; +import org.embulk.spi.Page; +import org.embulk.spi.TransactionalPageOutput; +import org.slf4j.Logger; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; + + +public class JoinfileOutputPlugin + implements OutputPlugin +{ + public interface PluginTask + extends Task + { + // configuration option 1 (required integer) + @Config("path_prefix") + public String getPathPrefix(); + + // configuration option 2 (optional string, null is not allowed) + @Config("file_ext") + public String getFileExt(); + + + @Config("sum_type") + @ConfigDefault("filename") + public String getSumType(); + + } + + private final Logger log = Exec.getLogger(getClass()); + + private static ArrayList outputs; + + private static ArrayList lastP = new ArrayList (); + + private static String sumType; + + @Override + public ConfigDiff transaction(ConfigSource config, + Schema schema, int taskCount, + OutputPlugin.Control control) + { + PluginTask task = config.loadConfig(PluginTask.class); + + sumType = task.getSumType(); + + // retryable (idempotent) output: + // return resume(task.dump(), schema, taskCount, control); + + // non-retryable (non-idempotent) output: + + String path; + outputs = new ArrayList (); + FileOutputStream output; + for (int i = 0 ; i < taskCount; i ++){ + path = task.getPathPrefix() + i + "." + task.getFileExt(); + try{ + output = new FileOutputStream(new File(path)); + outputs.add(output); + } catch (FileNotFoundException ex) { + throw new RuntimeException (ex); + } + } + + // for the ConfigDiff, we set the last Path of each task is "" as default. 
+ for (int i = 0 ; i< taskCount; i++) + { + lastP.add(""); + } + + + control.run(task.dump()); + + + closeFile(); + return Exec.newConfigDiff(); + } + + @Override + public ConfigDiff resume(TaskSource taskSource, + Schema schema, int taskCount, + OutputPlugin.Control control) + { + throw new UnsupportedOperationException("joinfile output plugin does not support resuming"); + } + + @Override + public void cleanup(TaskSource taskSource, + Schema schema, int taskCount, + List successTaskReports) + { + } + + @Override + public TransactionalPageOutput open(TaskSource taskSource, Schema schema, int taskIndex) + { + PluginTask task = taskSource.loadTask(PluginTask.class); + + final int ind = taskIndex; + + return new TransactionalPageOutput(){ + //private final List filenames = new ArrayList<>() ; + + public void add(Page page){ + //log.info("The ADD: " + page.getStringReferences() + " ## " +page.getValueReferences()); + try { + List pageArray = page.getStringReferences(); + String content = page.getStringReference(0); + String line = page.getStringReference(1) + "\n"; + String tag = page.getStringReference(1); + if (sumType.equals("filename")){ + outputs.get(ind).write(line.getBytes()); + }else{ + outputs.get(ind).write(content.getBytes()); + } + lastP.set(ind ,tag); + + } catch (IOException ex) { + + throw new RuntimeException(ex); + } + } + + public void finish(){ + //log.info("Finished"); + } + + public void close(){ + //log.info("closed"); + } + + public void abort(){ + + } + + public TaskReport commit(){ + return Exec.newTaskReport(); + + } + + }; + // Write your code here :) + //throw new UnsupportedOperationException("JoinfileOutputPlugin.run method is not implemented yet"); + } + + public static void closeFile() + { + for ( FileOutputStream outp: outputs){ + if (outp != null) { + try { + outp.close(); + }catch ( IOException ex ) { + throw new RuntimeException(ex); + } + } + } + } +} diff --git a/src/test/java/org/embulk/input/filename/TestFilenameFileInputPlugin.java b/src/test/java/org/embulk/input/filename/TestFilenameFileInputPlugin.java deleted file mode 100644 index 510953c0172db243794a18603eb4ab4017ba351f..0000000000000000000000000000000000000000 --- a/src/test/java/org/embulk/input/filename/TestFilenameFileInputPlugin.java +++ /dev/null @@ -1,5 +0,0 @@ -package org.embulk.input.filename; - -public class TestFilenameFileInputPlugin -{ -} diff --git a/src/test/java/org/embulk/input/filename/TestFilenameInputPlugin.java b/src/test/java/org/embulk/input/filename/TestFilenameInputPlugin.java new file mode 100644 index 0000000000000000000000000000000000000000..20114ad0384cd2b5d94e762c225fada6770cc5ac --- /dev/null +++ b/src/test/java/org/embulk/input/filename/TestFilenameInputPlugin.java @@ -0,0 +1,403 @@ +package org.embulk.input.filename; + +import com.google.common.collect.ImmutableList; +import java.util.stream.Stream; +import java.util.stream.Collectors; +import java.nio.file.attribute.BasicFileAttributes; +import java.nio.file.attribute.BasicFileAttributeView; +import java.nio.file.attribute.FileTime; +import java.util.Comparator; + + +import org.embulk.config.ConfigSource; +import org.embulk.config.ConfigDiff; +import org.embulk.test.EmbulkTests; +import org.embulk.spi.InputPlugin; +import org.embulk.spi.ParserPlugin; +import org.embulk.spi.OutputPlugin; +import org.embulk.spi.SchemaConfig; +import org.embulk.spi.ColumnConfig; +import org.junit.Rule; +import org.junit.Before; +import org.junit.Test; +import java.nio.file.Path; +import java.nio.file.Paths; +import 
java.nio.file.Files; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Arrays; +import java.util.ListIterator; + + +import static org.embulk.test.EmbulkTests.readSortedFile; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +public class TestFilenameInputPlugin +{ + + // This method is to return the creationTime of the file + public static FileTime getCreationTime(String filename) throws IOException{ + File file = new File(filename); + Path p = Paths.get(file.getAbsolutePath()); + BasicFileAttributes view = Files.getFileAttributeView(p,BasicFileAttributeView.class).readAttributes(); + FileTime fileTime = view.creationTime(); + return fileTime; + } + + // This method is to return the Last Modified time of the file. + public static FileTime getLastModifiedTime(String filename) throws IOException{ + File file = new File(filename); + Path p = Paths.get(file.getAbsolutePath()); + BasicFileAttributes view = Files.getFileAttributeView(p,BasicFileAttributeView.class).readAttributes(); + FileTime fileTime = view.lastModifiedTime(); + return fileTime; + } + + // In this test, we need to use the TestHelper which will simulate the embulk. + // To embulk run with the plugin we want. You need to register the plugin first + // (Just like you need to configure the plugin in the configur.yml) + // + @Rule + public TestHelper embulk = TestHelper.builder() + .registerPlugin(InputPlugin.class,"filename",FilenameInputPlugin.class) + .registerPlugin(OutputPlugin.class,"joinfile",JoinfileOutputPlugin.class) + .build(); + + + @Test + public void testOrderByModifiedTime() throws Exception{ + + ConfigSource execConfig = embulk.newConfig() + .set("min_output_tasks","1"); + + Path path_src = Paths.get("src/test/resources/testModifiedOrder"); + + ArrayList multi_dir = new ArrayList (); + multi_dir.add(path_src.toAbsolutePath().toString()+"/sample_"); + ConfigSource inConfig = embulk.newConfig() + .set("type","filename") + .set("multi_dir",multi_dir) + .set("load_order","ASCEND_MODIFIED"); + + Path tmp = embulk.createTempDir(); + + ConfigSource outConfig = embulk.newConfig() + .set("type","joinfile") + .set("sum_type","filename") + .set("path_prefix",tmp.toString()+"/outputfile") + .set("file_ext","txt"); + + TestHelper.RunResult res = embulk.runAllBuilder(execConfig,inConfig,outConfig); + + //Attention the readAllLines load all lines into memory, it is not recommanded to read a big file. 
+ List lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile0.txt")); + + List actual = Files.walk(path_src) + .filter(Files::isRegularFile) + .map(Path::toAbsolutePath) + .map(Path::toString) + .collect(Collectors.toList()); + + + Collections.sort(actual,new Comparator(){ + @Override + public int compare(String f1, String f2) { + try{ + return getLastModifiedTime(f1).compareTo(getLastModifiedTime(f2)); + } catch (IOException ex){ + ex.printStackTrace(); + } + return 0; + } + }); + + System.out.println("The lines" + lines); + System.out.println("The actual" + actual); + + assertEquals(lines,actual); + + inConfig.set("load_order","DESCEND_MODIFIED"); + res = embulk.runAllBuilder(execConfig,inConfig,outConfig); + lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile0.txt")); + + + + //We reverse the actual files + Collections.reverse(actual); + + System.out.println("The lines" + lines); + System.out.println("The actual" + actual); + + assertEquals(lines,actual); + + } + + + + @Test + public void testTagList() throws Exception{ + + ConfigSource execConfig = embulk.newConfig() + .set("min_output_tasks","1"); + + Path path_src = Paths.get("src/test/resources/testDirList"); + + // Be careful the name of the List should be multi_dir! + List multi_dir = Arrays.asList(path_src.toAbsolutePath().toString()+"/sample/sample_",path_src.toAbsolutePath().toString()+"/example/example_"); + List multi_tag = Arrays.asList("hello","world"); + + ConfigSource inConfig = embulk.newConfig() + .set("type","filename") + .set("load_order","ASCEND_MODIFIED") + .set("multi_dir",multi_dir) + .set("multi_tag",multi_tag); + System.out.println(inConfig); + + Path tmp = embulk.createTempDir(); + + ConfigSource outConfig = embulk.newConfig() + .set("type","joinfile") + .set("sum_type","filename") + .set("path_prefix",tmp.toString()+"/outputfile") + .set("file_ext","txt"); + + TestHelper.RunResult res = embulk.runAllBuilder(execConfig,inConfig,outConfig); + + List lines1 = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile0.txt")); + List lines2 = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile1.txt")); + + //List actual = Files.readAllLines(Paths.get(path_src+"/test.csv")); + + List dir1 = Files.walk(Paths.get(path_src.toAbsolutePath().toString()+"/sample")) + .filter(Files::isRegularFile) + .map(Path::toAbsolutePath) + .map(Path::toString) + .collect(Collectors.toList()); + + + Collections.sort(dir1,new Comparator(){ + @Override + public int compare(String f1, String f2) { + try{ + return getLastModifiedTime(f1).compareTo(getLastModifiedTime(f2)); + } catch (IOException ex){ + ex.printStackTrace(); + } + return 0; + } + }); + + List dir2 = Files.walk(Paths.get(path_src.toAbsolutePath().toString()+"/example")) + .filter(Files::isRegularFile) + .map(Path::toAbsolutePath) + .map(Path::toString) + .collect(Collectors.toList()); + + + Collections.sort(dir2,new Comparator(){ + @Override + public int compare(String f1, String f2) { + try{ + return getLastModifiedTime(f1).compareTo(getLastModifiedTime(f2)); + } catch (IOException ex){ + ex.printStackTrace(); + } + return 0; + } + }); + + + for (ListIterator i = dir1.listIterator(); i.hasNext(); ) + { + i.set(multi_tag.get(0) + i.next()); + } + + for (ListIterator i = dir2.listIterator(); i.hasNext(); ) + { + i.set(multi_tag.get(1) + i.next()); + } + + + + //dir1.addAll(dir2); + System.out.println(lines1); + System.out.println(dir1); + assertEquals(lines1,dir1); + assertEquals(lines2,dir2); + + } + + + @Test + public void testDirList() throws 
Exception{ + + ConfigSource execConfig = embulk.newConfig() + .set("min_output_tasks","1"); + + Path path_src = Paths.get("src/test/resources/testDirList"); + + // Be careful the name of the List should be multi_dir! + List multi_dir = Arrays.asList(path_src.toAbsolutePath().toString()+"/sample/sample_",path_src.toAbsolutePath().toString()+"/example/example_"); + List multi_tag = Arrays.asList("hello","world"); + + ConfigSource inConfig = embulk.newConfig() + .set("type","filename") + .set("load_order","ASCEND_MODIFIED") + .set("multi_dir",multi_dir); + Path tmp = embulk.createTempDir(); + + ConfigSource outConfig = embulk.newConfig() + .set("type","joinfile") + .set("sum_type","filename") + .set("path_prefix",tmp.toString()+"/outputfile") + .set("file_ext","txt"); + + TestHelper.RunResult res = embulk.runAllBuilder(execConfig,inConfig,outConfig); + + List lines1 = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile0.txt")); + List lines2 = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile1.txt")); + + List dir1 = Files.walk(Paths.get(path_src.toAbsolutePath().toString()+"/sample")) + .filter(Files::isRegularFile) + .map(Path::toAbsolutePath) + .map(Path::toString) + .collect(Collectors.toList()); + + + Collections.sort(dir1,new Comparator(){ + @Override + public int compare(String f1, String f2) { + try{ + return getLastModifiedTime(f1).compareTo(getLastModifiedTime(f2)); + } catch (IOException ex){ + ex.printStackTrace(); + } + return 0; + } + }); + + List dir2 = Files.walk(Paths.get(path_src.toAbsolutePath().toString()+"/example")) + .filter(Files::isRegularFile) + .map(Path::toAbsolutePath) + .map(Path::toString) + .collect(Collectors.toList()); + + + Collections.sort(dir2,new Comparator(){ + @Override + public int compare(String f1, String f2) { + try{ + return getLastModifiedTime(f1).compareTo(getLastModifiedTime(f2)); + } catch (IOException ex){ + ex.printStackTrace(); + } + return 0; + } + }); + + + //dir1.addAll(dir2); + System.out.println(lines1); + System.out.println(dir1); + assertEquals(lines1,dir1); + assertEquals(lines2,dir2); + } + + + @Test + public void testContent() throws Exception{ + + ConfigSource execConfig = embulk.newConfig() + .set("min_output_tasks","1"); + + Path path_src = Paths.get("src/test/resources/data"); + + ArrayList multi_dir = new ArrayList (); + multi_dir.add(path_src.toAbsolutePath().toString()+"/test.csv"); + ConfigSource inConfig = embulk.newConfig() + .set("type","filename") + .set("load_order","ALPHABETICAL") + .set("multi_dir",multi_dir); + + Path tmp = embulk.createTempDir(); + + ConfigSource outConfig = embulk.newConfig() + .set("type","joinfile") + .set("sum_type","content") + .set("path_prefix",tmp.toString()+"/outputfile") + .set("file_ext","txt"); + + TestHelper.RunResult res = embulk.runAllBuilder(execConfig,inConfig,outConfig); + + List lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile0.txt")); + + List actual = Files.readAllLines(Paths.get(path_src+"/test.csv")); + //System.out.println("The lines " + lines); + //System.out.println("The actual " + actual); + assertEquals(actual,lines); + } + + @Test + public void testLastPath() throws Exception{ + ConfigSource execConfig = embulk.newConfig() + .set("min_output_tasks","1"); + + Path path_src = Paths.get("src/test/resources/testDirList"); + + // Be careful the name of the List should be multi_dir! 
+ List multi_dir = Arrays.asList(path_src.toAbsolutePath().toString()+"/sample/sample_",path_src.toAbsolutePath().toString()+"/example/example_"); + List multi_tag = Arrays.asList("hello","world"); + + List lastPaths = Arrays.asList(path_src.toAbsolutePath().toString()+"/sample/sample_02.txt",path_src.toAbsolutePath().toString()+"/example/example_01.txt"); + + ConfigSource inConfig = embulk.newConfig() + .set("type","filename") + .set("load_order","ALPHABETICAL") + .set("lastPaths",lastPaths) + .set("multi_dir",multi_dir); + Path tmp = embulk.createTempDir(); + + ConfigSource outConfig = embulk.newConfig() + .set("type","joinfile") + .set("sum_type","filename") + .set("path_prefix",tmp.toString()+"/outputfile") + .set("file_ext","txt"); + + TestHelper.RunResult res = embulk.runAllBuilder(execConfig,inConfig,outConfig); + + List lines1 = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile0.txt")); + List lines2 = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile1.txt")); + + List dir1 = Files.walk(Paths.get(path_src.toAbsolutePath().toString()+"/sample")) + .filter(Files::isRegularFile) + .map(Path::toAbsolutePath) + .map(Path::toString) + .collect(Collectors.toList()); + + + Collections.sort(dir1); + + List dir2 = Files.walk(Paths.get(path_src.toAbsolutePath().toString()+"/example")) + .filter(Files::isRegularFile) + .map(Path::toAbsolutePath) + .map(Path::toString) + .collect(Collectors.toList()); + List fromLastPath = dir2.subList(0,dir2.size()); + + + Collections.sort(dir2); + + + //System.out.println(lines1); + //System.out.println(dir1.subList(2,dir1.size())); + //System.out.println(lines2); + //System.out.println(dir2.subList(1,dir2.size())); + assertEquals(lines1,dir1.subList(2,dir1.size())); + assertEquals(lines2,dir2.subList(1,dir2.size())); + + } +} diff --git a/src/test/java/org/embulk/input/filename/TestHelper.java b/src/test/java/org/embulk/input/filename/TestHelper.java new file mode 100644 index 0000000000000000000000000000000000000000..da664f8c28bcf46f34292d0481702890538bba8f --- /dev/null +++ b/src/test/java/org/embulk/input/filename/TestHelper.java @@ -0,0 +1,535 @@ +/* +// With the embulk test framework, if you want to test the input plugin, it will use the csv parser and file output! 
+// This TestHelper is writed for unit test using many thrid party plugin; +// For example, to test the filename plugin, I need the parser-none-bin and output-joinfile +// To use the plugins, just register them when initialize the embulk + + @Rule + public TestingEmbulk embulk = TestingEmbulk.builder() + .registerPlugin(InputPlugin.class,"filename",FilenameFileInputPlugin.class) + .registerPlugin(ParserPlugin.class,"none-bin",NoneBinParserPlugin.class) + .build(); + +// For the configSource, you can read the yml file in the resources + + embulk.runAllBuilder("Path to your config.yml"); + +// Or you can generate the configSource manually + + ConfigSource inConfig = embulk.newConfig() + .set("type","filename") + .set("path_prefix",rootPath+"/data/test.csv") + .set("parser",embulk.newConfig() + .set("charset","UTF-8") + .set("newline","CRLF") + .set("type","csv") + .set("delimiter",",") + .set("quote","") + .set("columns",embulk.newSchemaConfig("filename:string"))); + + ConfigSource execConfig = embulk.newConfig() + .set("max_threads","1"); + + ConfigSource outConfig = embulk.newConfig() + +//two config are required: inConfig and outConfig, and two are optional: execConfig and filtersConfig + embulk.runAllBuilder(inConfig,outConfig); +//Or: + embulk.runAllBuilder(execConfig,inConfig,outConfig); +//Or: + embulk.runAllBuilder(execConfig,inConfig,filtersConfig,outConfig); + +//If you want to use the TempDiretory for the output path + + + Path tmp = embulk.createTempDir(); + ConfigSource outConfig = embulk.newConfig() + .set("type","joinfile") + .set("path_prefix",+tmp.toString() + "/sample_") + .set("file_ext","txt"); + + // After runing the embulk you can extract the file from tmp to assert that the result is ok. + + +*/ +package org.embulk.input.filename; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.common.io.ByteStreams; + + +import org.embulk.test.EmbulkTests; +import org.embulk.test.PreviewResultInputPlugin; +//import org.embulk.test.TestingBulkLoader; + + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.ArrayList; +import java.util.Collections; + +import org.embulk.EmbulkEmbed; +import org.embulk.config.ConfigDiff; +import org.embulk.config.ConfigLoader; +import org.embulk.config.ConfigSource; +import org.embulk.config.TaskReport; +import org.embulk.spi.SchemaConfig; +import org.embulk.spi.ColumnConfig; +import org.embulk.spi.TempFileException; +import org.embulk.spi.TempFileSpace; +import org.embulk.spi.Schema; +import org.embulk.spi.SchemaConfig; +import org.embulk.exec.PreviewResult; + + +import org.junit.rules.TestRule; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static org.embulk.plugin.InjectedPluginSource.registerPluginTo; + + + +//The import of bulkLoader +import com.google.common.base.Function; +import com.google.common.base.Optional; +//import com.google.common.collect.ImmutableList; +//import com.google.inject.Binder; +import com.google.inject.Inject; +//import com.google.inject.Injector; +//import com.google.inject.Module; +import com.google.inject.util.Modules; 
+//import java.util.List; + +//import org.embulk.config.ConfigSource; +//import org.embulk.config.TaskReport; +import org.embulk.exec.BulkLoader; +import org.embulk.exec.ExecutionResult; +import org.embulk.exec.ForSystemConfig; +import org.embulk.exec.ResumeState; +import org.embulk.spi.Exec; +import org.embulk.spi.ExecSession; +import org.embulk.spi.InputPlugin; +import org.embulk.spi.Schema; +import org.slf4j.Logger; + + +public class TestHelper implements TestRule +{ + public static class Builder{ + private List modules = new ArrayList<>(); + + Builder() + {} + + public Builder registerPlugin(final Class iface, final String name, final Class impl) + { + modules.add(new Module() { + public void configure(Binder binder) + { + registerPluginTo(binder, iface, name, impl); + } + }); + + return this; + } + + public TestHelper build() + { + return new TestHelper(this); + } + } + + public static Builder builder() + { + return new Builder(); + } + + private final List modules; + + private EmbulkEmbed embed; + + private TempFileSpace tempFiles; + + TestHelper(Builder builder) + { + this.modules = ImmutableList.copyOf(builder.modules); + reset(); + } + + public void reset() + { + destroy(); + this.embed = new EmbulkEmbed.Bootstrap() + .addModules(modules) + .overrideModules(TestingBulkLoader.override()) + .initializeCloseable(); + + try { + this.tempFiles = new TempFileSpace(Files.createTempDirectory("embulk-test-temp-").toFile()); + } catch (IOException ex) { + throw new TempFileException(ex); + } + } + + + public void destroy() + { + if (embed != null){ + embed.destroy(); + embed = null; + } + if (tempFiles != null){ + tempFiles.cleanup(); + tempFiles = null; + } + } + + @Override + public Statement apply(Statement base, Description description) + { + return new EmbulkTestingEmbedWatcher().apply(base, description); + } + + private class EmbulkTestingEmbedWatcher extends TestWatcher + { + @Override + protected void starting(Description description) + { + reset(); + } + + @Override + protected void finished(Description description) + { + destroy(); + } + } + + + //This is very strange you need to create a file to create a temp directory + public Path createTempFile(String suffix) + { + return tempFiles.createTempFile(suffix).toPath(); + } + + public Path createTempDir() throws IOException + { + Path tp = Files.createTempDirectory(null); + tp.toFile().deleteOnExit(); + return tp; + } + + // Useless + public Injector injector() + { + return embed.getInjector(); + } + + public ConfigLoader configLoader() + { + return embed.newConfigLoader(); + } + + public ConfigSource newConfig() + { + return configLoader().newConfigSource(); + } + + public SchemaConfig newSchemaConfig(String...configs){ + ImmutableList.Builder schema = ImmutableList.builder(); + for (String column: configs){ + ColumnConfig columnConfig = newColumnConfig(column); + if (columnConfig != null){ + schema.add(columnConfig); + } + } + return new SchemaConfig(schema.build()); + } + + public ColumnConfig newColumnConfig(String column){ + String[] tuple = column.split(":",2); + return new ColumnConfig(newConfig() + .set("name",tuple[0]) + .set("type",tuple[1])); + } + + + //Need to import the EMbulkTests + public ConfigSource loadYamlResource(String name) + { + return configLoader().fromYamlString(EmbulkTests.readResource(name)); + } + + + private static final List SUPPORTED_TYPE = ImmutableList.of("boolean","long","double","string","timestamp","json"); + + public static interface RunResult + { + ConfigDiff getConfigDiff(); + List 
+        List<Throwable> getIgnoredExceptions();
+        Schema getInputSchema();
+        Schema getOutputSchema();
+        List<TaskReport> getInputTaskReports();
+        List<TaskReport> getOutputTaskReports();
+    }
+
+    // Instead of separate InputBuilder, ParserBuilder and OutputBuilder classes,
+    // everything is combined into this single AllBuilder.
+    public class AllBuilder
+    {
+        private ConfigSource inConfig = null;
+        private List<ConfigSource> filtersConfig = ImmutableList.of();
+        private ConfigSource execConfig = null;
+        private ConfigSource outConfig = null;
+        private ConfigSource config = null;
+        //private Path outputPath = null;
+
+        private AllBuilder()
+        {}
+
+        // The parser config should be set inside inConfig.
+        public AllBuilder in(ConfigSource inConfig)
+        {
+            checkNotNull(inConfig, "inConfig");
+            this.inConfig = inConfig.deepCopy();
+            return this;
+        }
+
+        public AllBuilder filters(List<ConfigSource> filtersConfig)
+        {
+            checkNotNull(filtersConfig, "filtersConfig");
+            ImmutableList.Builder<ConfigSource> builder = ImmutableList.builder();
+            for (ConfigSource filter : filtersConfig) {
+                builder.add(filter.deepCopy());
+            }
+
+            this.filtersConfig = builder.build();
+            return this;
+        }
+
+        public AllBuilder exec(ConfigSource execConfig)
+        {
+            checkNotNull(execConfig, "execConfig");
+            this.execConfig = execConfig.deepCopy();
+            return this;
+        }
+
+        public AllBuilder out(ConfigSource outConfig)
+        {
+            checkNotNull(outConfig, "outConfig");
+            this.outConfig = outConfig.deepCopy();
+            return this;
+        }
+
+        //public ConfigDiff guess(){}
+
+        //public PreviewResult preview() throws IOException{}
+
+        public RunResult run() throws IOException
+        {
+            checkState(inConfig != null, "in config must be set");
+            checkState(outConfig != null, "out config must be set");
+
+            ConfigSource config = newConfig()
+                .set("exec", execConfig)
+                .set("in", inConfig)
+                .set("filters", filtersConfig)
+                .set("out", outConfig);
+
+            return (RunResult) embed.run(config);
+        }
+
+        public RunResult runFromYml(String name) throws IOException
+        {
+            ConfigSource config = loadYamlResource(name);
+            return (RunResult) embed.run(config);
+        }
+    }
+
+    private RunResult buildRunResultWithOutput(RunResult result, Path outputDir, Path outputPath) throws IOException
+    {
+        copyToPath(outputDir, outputPath);
+        return result;
+    }
+
+    private void copyToPath(Path outputDir, Path outputPath) throws IOException
+    {
+        try (OutputStream out = Files.newOutputStream(outputPath)) {
+            List<Path> fragments = new ArrayList<>();
+            try (DirectoryStream<Path> stream = Files.newDirectoryStream(outputDir, "fragments_*.csv")) {
+                for (Path fragment : stream) {
+                    fragments.add(fragment);
+                }
+            }
+            Collections.sort(fragments);
+            for (Path fragment : fragments) {
+                try (InputStream in = Files.newInputStream(fragment)) {
+                    ByteStreams.copy(in, out);
+                }
+            }
+        }
+    }
+
+    public AllBuilder allBuilder()
+    {
+        return new AllBuilder();
+    }
+
+    public RunResult runAllBuilder(String name) throws IOException
+    {
+        return allBuilder()
+            .runFromYml(name);
+    }
+
+    public RunResult runAllBuilder(ConfigSource inConfig, ConfigSource outConfig) throws IOException
+    {
+        return allBuilder()
+            .in(inConfig)
+            .out(outConfig)
+            .run();
+    }
+
+    public RunResult runAllBuilder(ConfigSource execConfig, ConfigSource inConfig, ConfigSource outConfig) throws IOException
+    {
+        return allBuilder()
+            .exec(execConfig)
+            .in(inConfig)
+            .out(outConfig)
+            .run();
+    }
+
+    public RunResult runAllBuilder(ConfigSource execConfig, ConfigSource inConfig, List<ConfigSource> filtersConfig, ConfigSource outConfig) throws IOException
+    {
+        return allBuilder()
+            .exec(execConfig)
+            .in(inConfig)
+            .filters(filtersConfig)
+            .out(outConfig)
+            .run();
+    }
+}
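+/*
+// A minimal end-to-end sketch of how TestHelper is meant to be wired into a JUnit test.
+// This is illustrative only: the test class name, the "joinfile" output plugin, the
+// multi_dir/max_threads keys and the fixture paths under src/test/resources/testDirList
+// are assumptions taken from the README and test resources, not part of this helper.
+
+public class TestFilenameFileInputPlugin
+{
+    @Rule
+    public TestHelper embulk = TestHelper.builder()
+        .registerPlugin(InputPlugin.class, "filename", FilenameFileInputPlugin.class)
+        .registerPlugin(ParserPlugin.class, "none-bin", NoneBinParserPlugin.class)
+        // an output plugin (for example "joinfile") has to be registered the same way
+        .build();
+
+    @Test
+    public void uploadsFilesInOrder() throws Exception
+    {
+        Path tmp = embulk.createTempDir();
+
+        ConfigSource execConfig = embulk.newConfig()
+            .set("max_threads", "1");
+        ConfigSource inConfig = embulk.newConfig()
+            .set("type", "filename")
+            .set("multi_dir", ImmutableList.of("src/test/resources/testDirList/sample/sample_"))
+            .set("parser", embulk.newConfig().set("type", "none-bin"));
+        ConfigSource outConfig = embulk.newConfig()
+            .set("type", "joinfile")
+            .set("path_prefix", tmp.toString() + "/result_")
+            .set("file_ext", "txt");
+
+        TestHelper.RunResult result = embulk.runAllBuilder(execConfig, inConfig, outConfig);
+
+        // One input task report is expected per directory; the joined files written under
+        // tmp can then be read back and compared with the fixtures in testDirList/sample.
+        assertFalse(result.getInputTaskReports().isEmpty());
+    }
+}
+*/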
+// The TestingBulkLoader used by TestHelper is defined here.
+
+class TestingBulkLoader
+        extends BulkLoader
+{
+    static Function<List<Module>, List<Module>> override()
+    {
+        return new Function<List<Module>, List<Module>>() {
+            @Override
+            public List<Module> apply(List<Module> modules)
+            {
+                Module override = new Module() {
+                    public void configure(Binder binder)
+                    {
+                        binder.bind(BulkLoader.class).to(TestingBulkLoader.class);
+                        registerPluginTo(binder, InputPlugin.class, "preview_result", PreviewResultInputPlugin.class);
+                    }
+                };
+                return ImmutableList.of(Modules.override(modules).with(ImmutableList.of(override)));
+            }
+        };
+    }
+
+    @Inject
+    public TestingBulkLoader(Injector injector,
+            @ForSystemConfig ConfigSource systemConfig)
+    {
+        super(injector, systemConfig);
+    }
+
+    @Override
+    protected LoaderState newLoaderState(Logger logger, ProcessPluginSet plugins)
+    {
+        return new TestingLoaderState(logger, plugins);
+    }
+
+    protected static class TestingLoaderState
+            extends LoaderState
+    {
+        public TestingLoaderState(Logger logger, ProcessPluginSet plugins)
+        {
+            super(logger, plugins);
+        }
+
+        @Override
+        public ExecutionResult buildExecuteResultWithWarningException(Throwable ex)
+        {
+            ExecutionResult result = super.buildExecuteResultWithWarningException(ex);
+            return new TestingExecutionResult(result, buildResumeState(Exec.session()), Exec.session());
+        }
+    }
+
+    static class TestingExecutionResult
+            extends ExecutionResult
+            implements TestHelper.RunResult
+    {
+        private final Schema inputSchema;
+        private final Schema outputSchema;
+        private final List<TaskReport> inputTaskReports;
+        private final List<TaskReport> outputTaskReports;
+
+        public TestingExecutionResult(ExecutionResult orig,
+                ResumeState resumeState, ExecSession session)
+        {
+            super(orig.getConfigDiff(), orig.isSkipped(), orig.getIgnoredExceptions());
+            this.inputSchema = resumeState.getInputSchema();
+            this.outputSchema = resumeState.getOutputSchema();
+            this.inputTaskReports = buildReports(resumeState.getInputTaskReports(), session);
+            this.outputTaskReports = buildReports(resumeState.getOutputTaskReports(), session);
+        }
+
+        private static List<TaskReport> buildReports(List<Optional<TaskReport>> optionalReports, ExecSession session)
+        {
+            ImmutableList.Builder<TaskReport> reports = ImmutableList.builder();
+            for (Optional<TaskReport> report : optionalReports) {
+                reports.add(report.or(session.newTaskReport()));
+            }
+            return reports.build();
+        }
+
+        @Override
+        public Schema getInputSchema()
+        {
+            return inputSchema;
+        }
+
+        @Override
+        public Schema getOutputSchema()
+        {
+            return outputSchema;
+        }
+
+        @Override
+        public List<TaskReport> getInputTaskReports()
+        {
+            return inputTaskReports;
+        }
+
+        @Override
+        public List<TaskReport> getOutputTaskReports()
+        {
+            return outputTaskReports;
+        }
+    }
+}
+
+
diff --git a/src/test/resources/data/test.csv b/src/test/resources/data/test.csv new file mode 100644 index 0000000000000000000000000000000000000000..d0c22f6568545acbd1d4cedc815a446f10afe6bb --- /dev/null +++ b/src/test/resources/data/test.csv @@ -0,0 +1,2 @@ +ABCDEFG +HIJKL diff --git a/src/test/resources/test.yml b/src/test/resources/test.yml new file mode 100644 index 0000000000000000000000000000000000000000..52cd06465059f101a1f6a4743bd7da131dda6235 --- /dev/null +++ b/src/test/resources/test.yml @@ -0,0 +1,9 @@ +type: filename +parser: + charset: UTF-8 + newline: CRLF + type: csv + delimiter: '^@' + quote: '' + columns: + - {name: filename, type: string} diff --git a/src/test/resources/testDirList/example/example_01.txt b/src/test/resources/testDirList/example/example_01.txt new file mode 100644 index 0000000000000000000000000000000000000000..d36186f83e9865d46842c6db38a0986c07fd6044 --- /dev/null +++ 
b/src/test/resources/testDirList/example/example_01.txt @@ -0,0 +1,2 @@ +0101 +0101 diff --git a/src/test/resources/testDirList/example/example_02.txt b/src/test/resources/testDirList/example/example_02.txt new file mode 100644 index 0000000000000000000000000000000000000000..0d096255e4e07082a0b03cd5eb7ee0d219dfd04e --- /dev/null +++ b/src/test/resources/testDirList/example/example_02.txt @@ -0,0 +1,2 @@ +0202 +0202 diff --git a/src/test/resources/testDirList/sample/sample_01.txt b/src/test/resources/testDirList/sample/sample_01.txt new file mode 100644 index 0000000000000000000000000000000000000000..9d15efcb02611747a2a92711eb7a7444e2d18378 --- /dev/null +++ b/src/test/resources/testDirList/sample/sample_01.txt @@ -0,0 +1,3 @@ +a01 +a01 +a01 diff --git a/src/test/resources/testDirList/sample/sample_02.txt b/src/test/resources/testDirList/sample/sample_02.txt new file mode 100644 index 0000000000000000000000000000000000000000..324a6ec4201c5c1d59dee75a0a29f807d4043205 --- /dev/null +++ b/src/test/resources/testDirList/sample/sample_02.txt @@ -0,0 +1,3 @@ +b02 +b02 +b02 diff --git a/src/test/resources/testDirList/sample/sample_03.txt b/src/test/resources/testDirList/sample/sample_03.txt new file mode 100644 index 0000000000000000000000000000000000000000..72929f17c02505a3d99aecd232d3145a49523abc --- /dev/null +++ b/src/test/resources/testDirList/sample/sample_03.txt @@ -0,0 +1,3 @@ +ccc03 +ccc03 +ccc03 diff --git a/src/test/resources/testDirList/sample/sample_04.txt b/src/test/resources/testDirList/sample/sample_04.txt new file mode 100644 index 0000000000000000000000000000000000000000..eb03fcf8d048915c7e7f81f238a20287c7f56585 --- /dev/null +++ b/src/test/resources/testDirList/sample/sample_04.txt @@ -0,0 +1,6 @@ +dddd04 +dddd04 +dddd04 +ffff06 +gggg08 +hhhh06 diff --git a/src/test/resources/testDirList/sample/sample_05.txt b/src/test/resources/testDirList/sample/sample_05.txt new file mode 100644 index 0000000000000000000000000000000000000000..a80ff81d821de5565808e5f56ad4193c01601707 --- /dev/null +++ b/src/test/resources/testDirList/sample/sample_05.txt @@ -0,0 +1,4 @@ +eeeee05 +eeeee05 +eeeee05 + diff --git a/src/test/resources/testDirList/sample/sample_06.txt b/src/test/resources/testDirList/sample/sample_06.txt new file mode 100644 index 0000000000000000000000000000000000000000..b89a46188ee2e3c5763228da447e070142f0452c --- /dev/null +++ b/src/test/resources/testDirList/sample/sample_06.txt @@ -0,0 +1,3 @@ +fffff06 +fffff06 +fffff06 diff --git a/src/test/resources/testDirList/sample/sample_07.txt b/src/test/resources/testDirList/sample/sample_07.txt new file mode 100644 index 0000000000000000000000000000000000000000..a09451788c3db38d09eb85215bc2e51e714388a7 --- /dev/null +++ b/src/test/resources/testDirList/sample/sample_07.txt @@ -0,0 +1,9 @@ +afdasfgdfagdjg;ashdgklhdg;khdkjgndk;sagbnkadbnkghadskjgnvkdavbdfjbngkj;ldng;khg +hakd;hfehfkajdlgdabdba;hjag;sdgnkdngk;adsngjghkhjlkjljojaldfjlanf;aknhgk;adhg;ajg;lag +asdfgalkdhgkajdbngkahdgkahdkgndksjngkhkhjljiangladfgsdf +adfbkaldfhakdslhfkaldsh +abcdefghijlkjfafhodmjjmkdf +afkhjdofa;j;djfl;ajflkasjdfk;ankfjlndhkajlhgkalhgklahglkl +afhgakdhfgklasdhgkahknkdanfkhkhnkljahdfkanfhjjianlgla +afjljl;j;ajkajkldfakfhakjfdlajfldsjflajdslfjaldjfl +afjlkadsjflajlfjdlasfjlas diff --git a/src/test/resources/testModifiedOrder.yml b/src/test/resources/testModifiedOrder.yml new file mode 100644 index 0000000000000000000000000000000000000000..624d34453932f55949b7c57178058b85325b538d --- /dev/null +++ b/src/test/resources/testModifiedOrder.yml @@ -0,0 +1,5 @@ + 
+type: filename +path_prefix: testModified/sample_ +order_by_modified_time: 2 + diff --git a/src/test/resources/testModifiedOrder/sample_1.txt b/src/test/resources/testModifiedOrder/sample_1.txt new file mode 100644 index 0000000000000000000000000000000000000000..e8183f05f5db68b3934e93f4bf6bed2bb664e0b5 --- /dev/null +++ b/src/test/resources/testModifiedOrder/sample_1.txt @@ -0,0 +1,3 @@ +1 +1 +1 diff --git a/src/test/resources/testModifiedOrder/sample_2.txt b/src/test/resources/testModifiedOrder/sample_2.txt new file mode 100644 index 0000000000000000000000000000000000000000..487b1165348be4c37571ab484762b1d5b43384e2 --- /dev/null +++ b/src/test/resources/testModifiedOrder/sample_2.txt @@ -0,0 +1,4 @@ +2 +2 +2 +2 diff --git a/src/test/resources/testModifiedOrder/sample_3.txt b/src/test/resources/testModifiedOrder/sample_3.txt new file mode 100644 index 0000000000000000000000000000000000000000..37080a7e4cff4fcc93465be93799e8816ab712e2 --- /dev/null +++ b/src/test/resources/testModifiedOrder/sample_3.txt @@ -0,0 +1,5 @@ +3 +3 +3 +3 + diff --git a/src/test/resources/testModifiedOrder/sample_4.txt b/src/test/resources/testModifiedOrder/sample_4.txt new file mode 100644 index 0000000000000000000000000000000000000000..e785149d264fc6730aade7e6d7c2c353f555a745 --- /dev/null +++ b/src/test/resources/testModifiedOrder/sample_4.txt @@ -0,0 +1,4 @@ +4 +4 +4 +4