Commit a3fe2add authored by yu's avatar yu

Finish the multi thread

parent e226df76
Copyright (C) 2016 Nexedi SA and Contributors
Klaus Wölfel <klaus@nexedi.com>
Licensed under the Apache License, Version 2.0 (the "License"); MIT License
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0 Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
Unless required by applicable law or agreed to in writing, software The above copyright notice and this permission notice shall be
distributed under the License is distributed on an "AS IS" BASIS, included in all copies or substantial portions of the Software.
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
limitations under the License. EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# Filename file input plugin for Embulk # Filename input plugin for Embulk
Embulk filename file input plugin, similar to the local file input, which overloads the FileInputStream read methods to provide the filename in the first bytes of the stream. TODO: Write short description here and build.gradle file.
## Overview ## Overview
* **Plugin type**: file input * **Plugin type**: input
* **Resume supported**: yes * **Resume supported**: yes
* **Cleanup supported**: yes * **Cleanup supported**: yes
* **Guess supported**: no
## Configuration ## Configuration
- **option1**: path_prefix (string, required) - **option1**: description (integer, required)
- **option2**: description (string, default: `"myvalue"`)
- **option3**: description (string, default: `null`)
## Example ## Example
seed.yml:
```yaml ```yaml
exec:
min_output_tasks: 1
in: in:
type: filename type: filename
path_prefix: /path/to/my/files option1: example1
parser: option2: example2
type: none-bin
out:
type: wendelin
tag: my_tag
streamtool_uri: https://my_instance.host.vifib.net:/erp5/portal_ingestion_policies/my_ingestion_policy
user: my_user
password: my_password
```
## Install
```
$ embulk gem install embulk-input-filename embulk-parser-none-bin embulk-output-wendelin
``` ```
## Run
```
$ embulk run seed.yml -c diff.yml
```
## Build ## Build
```
$ ./gradlew package
```
## Build Package
``` ```
$ ./gradlew gem # -t to watch change of files and rebuild continuously $ ./gradlew gem # -t to watch change of files and rebuild continuously
``` ```
...@@ -14,16 +14,20 @@ configurations { ...@@ -14,16 +14,20 @@ configurations {
} }
version = "0.1.0" version = "0.1.0"
sourceCompatibility = 1.8
targetCompatibility = 1.8
dependencies { dependencies {
compile "org.embulk:embulk-core:0.8.23" compile "org.embulk:embulk-core:0.8.27"
provided "org.embulk:embulk-core:0.8.23" provided "org.embulk:embulk-core:0.8.27"
compile "org.embulk:embulk-standards:0.8.23" compile "org.embulk:embulk-standards:0.8.27"
provided "org.embulk:embulk-standards:0.8.23" provided "org.embulk:embulk-standards:0.8.27"
compile "commons-codec:commons-codec:1.9"
// compile "YOUR_JAR_DEPENDENCY_GROUP:YOUR_JAR_DEPENDENCY_MODULE:YOUR_JAR_DEPENDENCY_VERSION" // compile "YOUR_JAR_DEPENDENCY_GROUP:YOUR_JAR_DEPENDENCY_MODULE:YOUR_JAR_DEPENDENCY_VERSION"
testCompile "commons-codec:commons-codec:1.9"
testCompile "junit:junit:4.+" testCompile "junit:junit:4.+"
testCompile "org.embulk:embulk-core:0.8.23:tests" testCompile "org.embulk:embulk-core:0.8.27:tests"
testCompile 'org.embulk:embulk-test:0.8.23' testCompile 'org.embulk:embulk-test:0.8.27'
} }
test { test {
...@@ -31,6 +35,7 @@ test { ...@@ -31,6 +35,7 @@ test {
testLogging.showStandardStreams = true testLogging.showStandardStreams = true
} }
task classpath(type: Copy, dependsOn: ["jar"]) { task classpath(type: Copy, dependsOn: ["jar"]) {
doFirst { file("classpath").deleteDir() } doFirst { file("classpath").deleteDir() }
from (configurations.runtime - configurations.provided + files(jar.archivePath)) from (configurations.runtime - configurations.provided + files(jar.archivePath))
...@@ -66,9 +71,11 @@ task gemPush(type: JRubyExec, dependsOn: ["gem"]) { ...@@ -66,9 +71,11 @@ task gemPush(type: JRubyExec, dependsOn: ["gem"]) {
script "pkg/${project.name}-${project.version}.gem" script "pkg/${project.name}-${project.version}.gem"
} }
task "package"(dependsOn: ["gemspec", "classpath"]) << { task "package"(dependsOn: ["gemspec", "classpath"]) {
println "> Build succeeded." doLast {
println "> You can run embulk with '-L ${file(".").absolutePath}' argument." println "> Build succeeded."
println "> You can run embulk with '-L ${file(".").absolutePath}' argument."
}
} }
task gemspec { task gemspec {
...@@ -79,12 +86,12 @@ task gemspec { ...@@ -79,12 +86,12 @@ task gemspec {
Gem::Specification.new do |spec| Gem::Specification.new do |spec|
spec.name = "${project.name}" spec.name = "${project.name}"
spec.version = "${project.version}" spec.version = "${project.version}"
spec.authors = ["Klaus W\xC3\xB6lfel"] spec.authors = ["yu"]
spec.summary = %[Filename file input plugin for Embulk] spec.summary = %[Filename input plugin for Embulk]
spec.description = %[Reads files stored on Filename.] spec.description = %[Loads records from Filename.]
spec.email = ["klaus@nexedi.com"] spec.email = ["icaiyu0618@gmail.com"]
spec.licenses = ["MIT"] spec.licenses = ["MIT"]
# TODO set this: spec.homepage = "https://github.com/klaus/embulk-input-filename" # TODO set this: spec.homepage = "https://github.com/icaiyu0618/embulk-input-filename"
spec.files = `git ls-files`.split("\n") + Dir["classpath/*.jar"] spec.files = `git ls-files`.split("\n") + Dir["classpath/*.jar"]
spec.test_files = spec.files.grep(%r"^(test|spec)/") spec.test_files = spec.files.grep(%r"^(test|spec)/")
......
# Writes a fixture file made of a repeated 10-character pattern.
#
# ARGV[0] - path of the file to write (opened in 'w' mode, so it is
#           created or truncated).
# ARGV[1] - repeat count, converted with to_i; the pattern "abcdefghij"
#           is written that many times, i.e. 10 * count characters.
target_path = ARGV[0]
# Build the payload before opening the file, as the original does.
payload = "abcdefghij" * ARGV[1].to_i
File.open(target_path, 'w') do |out|
  out.write(payload)
end
#Wed Jan 13 12:41:02 JST 2016 #Sun Jan 08 00:35:58 PST 2017
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-2.10-bin.zip distributionUrl=https\://services.gradle.org/distributions/gradle-3.2.1-bin.zip
...@@ -6,12 +6,30 @@ ...@@ -6,12 +6,30 @@
## ##
############################################################################## ##############################################################################
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. # Attempt to set APP_HOME
DEFAULT_JVM_OPTS="" # Resolve links: $0 may be a link
PRG="$0"
# Need this for relative symlinks.
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG=`dirname "$PRG"`"/$link"
fi
done
SAVED="`pwd`"
cd "`dirname \"$PRG\"`/" >/dev/null
APP_HOME="`pwd -P`"
cd "$SAVED" >/dev/null
APP_NAME="Gradle" APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"` APP_BASE_NAME=`basename "$0"`
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS=""
# Use the maximum available, or set MAX_FD != -1 to use that value. # Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum" MAX_FD="maximum"
...@@ -30,6 +48,7 @@ die ( ) { ...@@ -30,6 +48,7 @@ die ( ) {
cygwin=false cygwin=false
msys=false msys=false
darwin=false darwin=false
nonstop=false
case "`uname`" in case "`uname`" in
CYGWIN* ) CYGWIN* )
cygwin=true cygwin=true
...@@ -40,26 +59,11 @@ case "`uname`" in ...@@ -40,26 +59,11 @@ case "`uname`" in
MINGW* ) MINGW* )
msys=true msys=true
;; ;;
NONSTOP* )
nonstop=true
;;
esac esac
# Attempt to set APP_HOME
# Resolve links: $0 may be a link
PRG="$0"
# Need this for relative symlinks.
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG=`dirname "$PRG"`"/$link"
fi
done
SAVED="`pwd`"
cd "`dirname \"$PRG\"`/" >/dev/null
APP_HOME="`pwd -P`"
cd "$SAVED" >/dev/null
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
# Determine the Java command to use to start the JVM. # Determine the Java command to use to start the JVM.
...@@ -85,7 +89,7 @@ location of your Java installation." ...@@ -85,7 +89,7 @@ location of your Java installation."
fi fi
# Increase the maximum file descriptors if we can. # Increase the maximum file descriptors if we can.
if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
MAX_FD_LIMIT=`ulimit -H -n` MAX_FD_LIMIT=`ulimit -H -n`
if [ $? -eq 0 ] ; then if [ $? -eq 0 ] ; then
if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
...@@ -157,4 +161,9 @@ function splitJvmOpts() { ...@@ -157,4 +161,9 @@ function splitJvmOpts() {
eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS
JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME" JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME"
# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong
if [[ "$(uname)" == "Darwin" ]] && [[ "$HOME" == "$PWD" ]]; then
cd "$(dirname "$0")"
fi
exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@" exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@"
...@@ -8,14 +8,14 @@ ...@@ -8,14 +8,14 @@
@rem Set local scope for the variables with windows NT shell @rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal if "%OS%"=="Windows_NT" setlocal
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS=
set DIRNAME=%~dp0 set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=. if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0 set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME% set APP_HOME=%DIRNAME%
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS=
@rem Find java.exe @rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome if defined JAVA_HOME goto findJavaFromJavaHome
...@@ -46,10 +46,9 @@ echo location of your Java installation. ...@@ -46,10 +46,9 @@ echo location of your Java installation.
goto fail goto fail
:init :init
@rem Get command-line arguments, handling Windowz variants @rem Get command-line arguments, handling Windows variants
if not "%OS%" == "Windows_NT" goto win9xME_args if not "%OS%" == "Windows_NT" goto win9xME_args
if "%@eval[2+2]" == "4" goto 4NT_args
:win9xME_args :win9xME_args
@rem Slurp the command line arguments. @rem Slurp the command line arguments.
...@@ -60,11 +59,6 @@ set _SKIP=2 ...@@ -60,11 +59,6 @@ set _SKIP=2
if "x%~1" == "x" goto execute if "x%~1" == "x" goto execute
set CMD_LINE_ARGS=%* set CMD_LINE_ARGS=%*
goto execute
:4NT_args
@rem Get arguments from the 4NT Shell from JP Software
set CMD_LINE_ARGS=%$
:execute :execute
@rem Setup the command line @rem Setup the command line
......
Embulk::JavaPlugin.register_input( Embulk::JavaPlugin.register_input(
"filename", "org.embulk.input.filename.FilenameFileInputPlugin", "filename", "org.embulk.input.filename.FilenameInputPlugin",
File.expand_path('../../../../classpath', __FILE__)) File.expand_path('../../../../classpath', __FILE__))
package org.embulk.input.filename; package org.embulk.input.filename;
import java.util.List; import java.util.List;
import java.util.Arrays;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.io.File; import java.util.Comparator;
import java.io.FileInputStream; import java.nio.file.attribute.BasicFileAttributeView;
import java.io.FileNotFoundException; import java.nio.file.attribute.BasicFileAttributes;
import java.io.InputStream; import java.nio.file.attribute.FileTime;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor; import java.nio.file.SimpleFileVisitor;
import java.nio.file.FileVisitResult; import java.nio.file.FileVisitResult;
import java.nio.file.attribute.BasicFileAttributes; import java.io.File;
import com.google.common.collect.ImmutableList; import java.io.IOException;
import java.io.FileInputStream;
import java.io.ByteArrayOutputStream;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import org.apache.commons.codec.binary.Base64;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.embulk.config.Config; import org.embulk.config.Config;
import org.embulk.config.ConfigDefault; import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigInject;
import org.embulk.config.ConfigSource;
import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigDiff;
import org.embulk.config.TaskReport; import org.embulk.config.ConfigSource;
import org.embulk.config.Task; import org.embulk.config.Task;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource; import org.embulk.config.TaskSource;
import org.embulk.config.ConfigInject;
import org.embulk.spi.PageBuilder;
import org.embulk.spi.Exec; import org.embulk.spi.Exec;
import org.embulk.spi.FileInputPlugin; import org.embulk.spi.InputPlugin;
import org.embulk.spi.PageOutput;
import org.embulk.spi.Schema;
import org.embulk.spi.SchemaConfig;
import org.embulk.spi.BufferAllocator; import org.embulk.spi.BufferAllocator;
import org.embulk.spi.TransactionalFileInput; import org.embulk.spi.ColumnConfig;
import org.embulk.spi.util.InputStreamTransactionalFileInput; import org.embulk.spi.SchemaConfig;
import org.embulk.standards.LocalFileInputPlugin;
import java.nio.file.attribute.BasicFileAttributeView; import static org.embulk.spi.type.Types.STRING;
import java.nio.file.attribute.FileTime;
import java.util.Comparator;
public class FilenameInputPlugin
public class FilenameFileInputPlugin implements FileInputPlugin implements InputPlugin
{ {
public interface PluginTask
public interface PluginTask extends Task extends Task
{ {
@Config("multi_dir") @Config("multi_dir")
@ConfigDefault("[]") @ConfigDefault("[]")
List<String> getMultiDir(); ArrayList<String> getMultiDir();
@Config("multi_tag") @Config("multi_tag")
@ConfigDefault("[]") @ConfigDefault("[]")
List<String> getMultiTag(); ArrayList<String> getMultiTag();
@Config("path_prefix")
@ConfigDefault("")
String getPathPrefix();
@Config("last_path") @Config("last_path")
@ConfigDefault("null") @ConfigDefault("null")
Optional<String> getLastPath(); Optional<String> getLastPath();
...@@ -67,6 +68,10 @@ public class FilenameFileInputPlugin implements FileInputPlugin ...@@ -67,6 +68,10 @@ public class FilenameFileInputPlugin implements FileInputPlugin
@Config("order_by_creation_time") @Config("order_by_creation_time")
@ConfigDefault("0") @ConfigDefault("0")
int getOrderByCreationTime(); int getOrderByCreationTime();
@Config("chunk_size")
@ConfigDefault("10485760")
int getChunkSize();
@Config("file_size") @Config("file_size")
@ConfigDefault("null") @ConfigDefault("null")
...@@ -75,82 +80,52 @@ public class FilenameFileInputPlugin implements FileInputPlugin ...@@ -75,82 +80,52 @@ public class FilenameFileInputPlugin implements FileInputPlugin
@Config("follow_symlinks") @Config("follow_symlinks")
@ConfigDefault("false") @ConfigDefault("false")
boolean getFollowSymlinks(); boolean getFollowSymlinks();
List<String> getFiles(); ArrayList<ArrayList<String>> getFiles();
void setFiles(List<String> files); void setFiles(ArrayList<ArrayList<String>> allFiles);
@ConfigInject @ConfigInject
BufferAllocator getBufferAllocator(); BufferAllocator getBufferAllocator();
} }
/**
 * Reads the creation timestamp of a file.
 *
 * The name is first turned into an absolute path, then the basic
 * attribute view of that path is queried.
 *
 * @param filename path of the file to inspect (relative or absolute)
 * @return the creation time reported by the file's basic attributes
 * @throws IOException if the attributes cannot be read
 */
public static FileTime getCreationTime(String filename) throws IOException {
    final Path absolutePath = Paths.get(new File(filename).getAbsolutePath());
    final BasicFileAttributeView attributeView =
            Files.getFileAttributeView(absolutePath, BasicFileAttributeView.class);
    return attributeView.readAttributes().creationTime();
}
/**
 * Reads the last-modified timestamp of a file.
 *
 * The name is first turned into an absolute path, then the basic
 * attribute view of that path is queried.
 *
 * @param filename path of the file to inspect (relative or absolute)
 * @return the last-modified time reported by the file's basic attributes
 * @throws IOException if the attributes cannot be read
 */
public static FileTime getLastModifiedTime(String filename) throws IOException {
    final Path absolutePath = Paths.get(new File(filename).getAbsolutePath());
    final BasicFileAttributeView attributeView =
            Files.getFileAttributeView(absolutePath, BasicFileAttributeView.class);
    return attributeView.readAttributes().lastModifiedTime();
}
private final Logger log = Exec.getLogger(getClass()); private final Logger log = Exec.getLogger(getClass());
private final static Path CURRENT_DIR = Paths.get(".").normalize(); private final static Path CURRENT_DIR = Paths.get(".").normalize();
private static ArrayList<String> tagList;
public static String theTag = ""; private static int chunkSize;
public static List<Integer> tagIndex = new ArrayList<Integer>();
public static List<String> tagList;
@Override @Override
public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control) public ConfigDiff transaction(ConfigSource config,
InputPlugin.Control control)
{ {
PluginTask task = config.loadConfig(PluginTask.class); PluginTask task = config.loadConfig(PluginTask.class);
List<String> allFiles = new ArrayList<String> (); chunkSize = task.getChunkSize();
ArrayList<String> dirList = task.getMultiDir();
tagIndex.add(0);
//int s = 0;
List<String> dirList = task.getMultiDir(); ArrayList<ArrayList<String>> allFiles = new ArrayList<ArrayList<String>>();
tagList = task.getMultiTag(); tagList = task.getMultiTag();
if ( dirList.size() != 0 ) { if ( dirList.size() != 0 ){
log.info("The list of dir: " + dirList); log.info ("The list of the directories: " + dirList );
while (tagList.size() < dirList.size()){ while (tagList.size() < dirList.size()){
// If the Number of tags is less than the directories, we say that the default tag is ""
tagList.add(""); tagList.add("");
} }
} else { } else {
if (task.getPathPrefix().equals("")){ throw new RuntimeException("The multi_dir should contain at least 1 directory.");
throw new RuntimeException("Please input the path_prefix or the multi_dir");
}
dirList.add(task.getPathPrefix());
log.info("list of dir: " + dirList);
tagList.add("");
} }
// list files recursively for ( String dir : dirList){
ConfigDiff res = Exec.newConfigDiff(); ArrayList<String> files = listFiles(task,Paths.get(dir).normalize());
for (int i=0; i< dirList.size();i++) // Sort the files if each directory
{
flag = 0;
List<String> files = listFiles(task,Paths.get(dirList.get(i)).normalize());
//Sort the listFiles according to the configuration.
int order_modified = task.getOrderByModifiedTime(); int order_modified = task.getOrderByModifiedTime();
int order_creation = task.getOrderByCreationTime(); int order_creation = task.getOrderByCreationTime();
if (order_modified == 0 && order_creation == 0){ if (order_modified == 0 && order_creation == 0){
Collections.sort(files); Collections.sort(files);
} else if(order_creation == 0){ } else if(order_creation == 0){
...@@ -185,66 +160,111 @@ public class FilenameFileInputPlugin implements FileInputPlugin ...@@ -185,66 +160,111 @@ public class FilenameFileInputPlugin implements FileInputPlugin
} else { } else {
throw new RuntimeException("Could not order by creation time and lasModified time at the same time"); throw new RuntimeException("Could not order by creation time and lasModified time at the same time");
} }
// End of sort
allFiles.add(files);
}
log.info("Loading files {}", files); int taskCount;
allFiles.addAll(files); // If the we upload only one directory, we set each file as a task.
//task.setFiles(files); // In this case the max_threads must equal 1 to keep the file uploading order
if (dirList.size() == 1){
//s += files.size() ArrayList<ArrayList<String>> oneFile = new ArrayList<ArrayList<String>> ();
tagIndex.add(allFiles.size()); for(String f : allFiles.get(0)){
//taskList.add(task.deepCopy); ArrayList<String> file = new ArrayList<String> ();
// number of processors is same with number of files file.add(f);
oneFile.add(file);
//int taskCount = files.size(); }
//theTag = tagList.get(i); while (tagList.size()< oneFile.size()){
//info.log(); tagList.add(tagList.get(0));
//res = resume(task.dump(), taskCount, control); }
task.setFiles(oneFile);
taskCount = oneFile.size();
} else{
task.setFiles(allFiles);
taskCount = allFiles.size();
} }
task.setFiles(allFiles); ArrayList<ColumnConfig> columns = new ArrayList<ColumnConfig>();
//final String columnName = task.getColumnName();
columns.add(new ColumnConfig("payload", STRING, config));
columns.add(new ColumnConfig("tag", STRING, config));
int taskCount = allFiles.size(); Schema schema = new SchemaConfig(columns).toSchema();
//return res;
return resume(task.dump(), taskCount, control);
//Schema schema = task.getColumns().toSchema();
// number of run() method calls
return resume(task.dump(), schema, taskCount, control);
} }
@Override @Override
public ConfigDiff resume(TaskSource taskSource, public ConfigDiff resume(TaskSource taskSource,
int taskCount, Schema schema, int taskCount,
FileInputPlugin.Control control) InputPlugin.Control control)
{
control.run(taskSource, schema, taskCount);
return Exec.newConfigDiff();
}
/**
 * Cleanup hook of the input plugin.
 *
 * The body is empty: none of the arguments are used and nothing is
 * released or deleted here.
 */
@Override
public void cleanup(TaskSource taskSource,
Schema schema, int taskCount,
List<TaskReport> successTaskReports)
{
}
@Override
public TaskReport run(TaskSource taskSource,
Schema schema, int taskIndex,
PageOutput output)
{ {
PluginTask task = taskSource.loadTask(PluginTask.class); PluginTask task = taskSource.loadTask(PluginTask.class);
// Here the taskSource contains all the Configuration of the 'in' ArrayList<String> files = task.getFiles().get(taskIndex);
log.info("The taskSource of the FileName in the ConfigDiff resume: " + taskSource.toString());
// Here will run all the tasks. Each task is to deal with a file.
control.run(taskSource, taskCount); for (String file : files)
{
// build next config try
ConfigDiff configDiff = Exec.newConfigDiff(); {
int nRead;
// last_path byte[] data = new byte[chunkSize];
if (task.getFiles().isEmpty()) { FileInputStream dataIn = new FileInputStream(file);
// keep the last value ByteArrayOutputStream buffer = new ByteArrayOutputStream();
if (task.getLastPath().isPresent()) {
configDiff.set("last_path", task.getLastPath().get()); while ((nRead = dataIn.read(data, 0, data.length)) != -1) {
buffer.write(data, 0, nRead);
try (PageBuilder pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output))
{
pageBuilder.setString(0,buffer.toString());//Base64.encodeBase64String(buffer.toByteArray()));
pageBuilder.setString(1, tagList.get(taskIndex) + new File(file).getCanonicalPath() );
pageBuilder.addRecord();
buffer.flush();
pageBuilder.finish();
}
}
} catch (IOException ex){
ex.printStackTrace();
} }
} else {
List<String> files = new ArrayList<String>(task.getFiles());
log.info("The File order is {}",files);
configDiff.set("last_path", files.get(files.size() - 1));
} }
return configDiff;
TaskReport taskReport = Exec.newTaskReport();
taskReport.set("columns", schema.size());
return taskReport;
} }
@Override @Override
public void cleanup(TaskSource taskSource, public ConfigDiff guess(ConfigSource config)
int taskCount, {
List<TaskReport> successTaskReports) return Exec.newConfigDiff();
{ } }
public List<String> listFiles(PluginTask task,Path pathPrefix)
public ArrayList<String> listFiles(PluginTask task,Path pathPrefix)
{ {
//Path pathPrefix = Paths.get(task.getPathPrefix()).normalize(); //Path pathPrefix = Paths.get(task.getPathPrefix()).normalize();
final Path directory; final Path directory;
...@@ -259,7 +279,7 @@ public class FilenameFileInputPlugin implements FileInputPlugin ...@@ -259,7 +279,7 @@ public class FilenameFileInputPlugin implements FileInputPlugin
} }
//final ImmutableList.Builder<String> builder = ImmutableList.builder(); //final ImmutableList.Builder<String> builder = ImmutableList.builder();
final List<String> filesArray = new ArrayList<String>(); final ArrayList<String> filesArray = new ArrayList<String>();
final String lastPath = task.getLastPath().orNull(); final String lastPath = task.getLastPath().orNull();
final Integer fileSize = task.getFileSize().orNull(); final Integer fileSize = task.getFileSize().orNull();
try { try {
...@@ -309,106 +329,25 @@ public class FilenameFileInputPlugin implements FileInputPlugin ...@@ -309,106 +329,25 @@ public class FilenameFileInputPlugin implements FileInputPlugin
return filesArray; return filesArray;
} }
@Override
public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
{
final PluginTask task = taskSource.loadTask(PluginTask.class);
log.info("The task in open: " + taskSource.toString());
log.info("The taskIndex: " + taskIndex);
final String path = task.getFiles().get(taskIndex);
setTag(taskIndex);
log.info("The tag: " + theTag);
return new InputStreamTransactionalFileInput(
task.getBufferAllocator(),
new InputStreamTransactionalFileInput.Opener() {
public InputStream open() throws IOException
{
return new FilenameFileInputStream(path);
}
})
{
@Override
public void abort()
{ }
@Override
public TaskReport commit()
{
return Exec.newTaskReport();
}
};
}
public static int flag = 0; // End
public static void setTag(int index) public static FileTime getCreationTime(String filename) throws IOException{
{ File file = new File(filename);
if (index == tagIndex.get(flag)) Path p = Paths.get(file.getAbsolutePath());
{ BasicFileAttributes view = Files.getFileAttributeView(p,BasicFileAttributeView.class).readAttributes();
flag+=1; FileTime fileTime = view.creationTime();
} //System.out.println("The raw creation time of " +filename+ " is " + fileTime.toString());
theTag = tagList.get(flag-1); return fileTime;
} }
class FilenameFileInputStream extends FileInputStream { public static FileTime getLastModifiedTime(String filename) throws IOException{
final int MAX_NAME_LENGTH = 255; File file = new File(filename);
int n; Path p = Paths.get(file.getAbsolutePath());
byte[] name; BasicFileAttributes view = Files.getFileAttributeView(p,BasicFileAttributeView.class).readAttributes();
FileTime fileTime = view.lastModifiedTime();
FilenameFileInputStream(File file) throws FileNotFoundException { //System.out.println("The raw last modified time of " +filename+ " is " + fileTime.toString());
super(file); return fileTime;
n = 0;
name = (theTag+file.getName()).getBytes();
}
FilenameFileInputStream(String path) throws FileNotFoundException {
super(path);
n = 0;
name = (theTag+path).getBytes();
}
@Override
public int read() throws IOException {
if (n < name.length) {
byte b = name[n];
n++;
return b;
} else if (n < MAX_NAME_LENGTH) {
n++;
return 0;
} else {
return super.read();
}
}
@Override
public int read(byte[] b) throws IOException {
return read(b, 0, b.length);
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
if (n < MAX_NAME_LENGTH) {
int i = 0;
int c;
for (; i < len; i++) {
c = read();
if (c == -1) {
if ( i == 0 ) {
return -1;
}
break;
}
b[off + i] = (byte)c;
}
return i;
} else {
return super.read(b, off, len);
}
}
} }
} }
\ No newline at end of file
package org.embulk.input.filename; package org.embulk.input.filename;
import java.util.List; import java.util.List;
import java.util.ArrayList;
import com.google.common.base.Optional; import com.google.common.base.Optional;
...@@ -31,14 +33,15 @@ public class JoinfileOutputPlugin ...@@ -31,14 +33,15 @@ public class JoinfileOutputPlugin
public interface PluginTask public interface PluginTask
extends Task extends Task
{ {
// configuration option 1 (required integer)
@Config("path_prefix") @Config("path_prefix")
public String getPathPrefix(); public String getPathPrefix();
// configuration option 2 (optional string, null is not allowed)
@Config("file_ext") @Config("file_ext")
public String getFileExt(); public String getFileExt();
@Config("sum_type") @Config("sum_type")
@ConfigDefault("filename") @ConfigDefault("filename")
public String getSumType(); public String getSumType();
...@@ -49,6 +52,8 @@ public class JoinfileOutputPlugin ...@@ -49,6 +52,8 @@ public class JoinfileOutputPlugin
private static FileOutputStream output = null; private static FileOutputStream output = null;
private static ArrayList<String> lastP = new ArrayList<String> ();
private static String sumType; private static String sumType;
@Override @Override
...@@ -57,19 +62,16 @@ public class JoinfileOutputPlugin ...@@ -57,19 +62,16 @@ public class JoinfileOutputPlugin
OutputPlugin.Control control) OutputPlugin.Control control)
{ {
PluginTask task = config.loadConfig(PluginTask.class); PluginTask task = config.loadConfig(PluginTask.class);
sumType = task.getSumType();
// retryable (idempotent) output: // retryable (idempotent) output:
// return resume(task.dump(), schema, taskCount, control); // return resume(task.dump(), schema, taskCount, control);
// non-retryable (non-idempotent) output: // non-retryable (non-idempotent) output:
log.info("In the transaction " + config);
String path = task.getPathPrefix() + task.getFileExt(); String path = task.getPathPrefix() + task.getFileExt();
sumType = task.getSumType();
log.info("The SumType is: " + sumType);
try { try {
output = new FileOutputStream(new File(path)); output = new FileOutputStream(new File(path));
} catch (FileNotFoundException ex) { } catch (FileNotFoundException ex) {
...@@ -77,12 +79,17 @@ public class JoinfileOutputPlugin ...@@ -77,12 +79,17 @@ public class JoinfileOutputPlugin
} }
// for the ConfigDiff, we set the last Path of each task is "" as default.
for (int i = 0 ; i< taskCount; i++)
{
lastP.add("");
}
control.run(task.dump()); control.run(task.dump());
closeFile(); closeFile();
log.info("In the transaction ");
return Exec.newConfigDiff(); return Exec.newConfigDiff();
} }
...@@ -106,34 +113,37 @@ public class JoinfileOutputPlugin ...@@ -106,34 +113,37 @@ public class JoinfileOutputPlugin
{ {
PluginTask task = taskSource.loadTask(PluginTask.class); PluginTask task = taskSource.loadTask(PluginTask.class);
log.info("In the open " + taskSource.toString()+ " # " + taskIndex); final int ind = taskIndex;
return new TransactionalPageOutput(){ return new TransactionalPageOutput(){
//private final List<String> filenames = new ArrayList<>() ; //private final List<String> filenames = new ArrayList<>() ;
public void add(Page page){ public void add(Page page){
log.info("The ADD: " + page.getStringReferences() + " ## " +page.getValueReferences()); //log.info("The ADD: " + page.getStringReferences() + " ## " +page.getValueReferences());
try { try {
//log.info("The content: " + page.getStringReference(0)); List<String> pageArray = page.getStringReferences();
if (sumType.equals("filename")){ String content = page.getStringReference(0);
String line = page.getStringReference(1) + "\n"; String line = page.getStringReference(1) + "\n";
output.write(line.getBytes()); String tag = page.getStringReference(1);
} else{ if (sumType.equals("filename")){
String line = page.getStringReference(0) + "\n"; output.write(line.getBytes());
output.write(line.getBytes()); }else{
} output.write(content.getBytes());
}
lastP.set(ind ,tag);
} catch (IOException ex) { } catch (IOException ex) {
throw new RuntimeException(ex);
throw new RuntimeException(ex);
} }
} }
public void finish(){ public void finish(){
log.info("Finished"); //log.info("Finished");
} }
public void close(){ public void close(){
log.info("closed"); //log.info("closed");
} }
public void abort(){ public void abort(){
......
package org.embulk.input.filename;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskSource;
import org.embulk.spi.ParserPlugin;
import org.embulk.spi.FileInput;
import org.embulk.spi.PageOutput;
import org.embulk.spi.Schema;
import org.embulk.spi.SchemaConfig;
import org.embulk.spi.Exec;
import org.embulk.spi.PageBuilder;
import org.embulk.spi.util.FileInputInputStream;
import org.embulk.spi.ColumnConfig;
import java.io.IOException;
import java.util.Arrays;
import java.util.ArrayList;
import org.apache.commons.codec.binary.Base64;
import static org.embulk.spi.type.Types.STRING;
import org.slf4j.Logger;
/**
 * Embulk parser plugin that forwards file content without interpreting it.
 *
 * Every input file is expected to start with a fixed-width header of
 * {@link #MAX_NAME_LENGTH} bytes holding a NUL-padded path string. The bytes
 * that follow the header are cut into chunks of at most {@link #CHUNK_SIZE}
 * bytes; each chunk becomes one record with two STRING columns:
 * the Base64-encoded chunk (column name from the "column_name" option) and
 * "tag", which carries the path string read from the header.
 */
public class NoneBinParserPlugin
        implements ParserPlugin
{
    // Width in bytes of the path header at the start of every file.
    // NOTE(review): this is a mutable static; it is left non-final so that any
    // existing code assigning to it keeps compiling -- confirm and make it final.
    static int MAX_NAME_LENGTH = 255;

    // Maximum number of raw (pre-Base64) payload bytes stored in one record: 1 MiB.
    private static final int CHUNK_SIZE = 1024 * 1024;

    // Schema built by transaction(); kept on the instance so that unit tests can
    // convert the parser output back to Java objects with it.
    Schema schema;

    /** Task configuration of this parser. */
    public interface PluginTask
            extends Task //, LineDecoder.DecoderTask //, TimestampParser.Task
    {
        /** Name of the column that receives the Base64-encoded payload. */
        @Config("column_name")
        @ConfigDefault("\"payload\"")
        public String getColumnName();
    }

    private final Logger log;

    public NoneBinParserPlugin()
    {
        this.log = Exec.getLogger(NoneBinParserPlugin.class);
    }

    /**
     * Builds the two-column schema (payload column, "tag") and hands it,
     * together with the dumped task, to the control object.
     *
     * @param config  parser configuration
     * @param control callback that receives the task source and the schema
     */
    @Override
    public void transaction(ConfigSource config, ParserPlugin.Control control)
    {
        PluginTask task = config.loadConfig(PluginTask.class);
        log.info("The ConfigSource is: " + config.toString());

        ArrayList<ColumnConfig> columns = new ArrayList<ColumnConfig>();
        final String columnName = task.getColumnName();
        columns.add(new ColumnConfig(columnName, STRING, config));
        columns.add(new ColumnConfig("tag", STRING, config));

        // The unit tests convert the parser output to Java objects, and that
        // conversion is driven by the schema, so the schema is stored on the
        // instance instead of being used only once here.
        this.schema = new SchemaConfig(columns).toSchema();
        control.run(task.dump(), this.schema);
    }

    /**
     * Reads every file of the input, emitting one record per payload chunk.
     *
     * The PageBuilder is opened in a try-with-resources block so that it is
     * closed even when reading or encoding fails part-way through.
     *
     * @param taskSource dumped task produced by transaction()
     * @param schema     schema of the records to build
     * @param input      files to parse
     * @param output     destination of the built pages
     */
    @Override
    public void run(TaskSource taskSource, Schema schema,
            FileInput input, PageOutput output)
    {
        // The loaded task is not used below; the call is kept so that loading
        // the task source still happens exactly as before.
        taskSource.loadTask(PluginTask.class);
        log.info("The taskSource of the Parser: "+ taskSource.toString());

        FileInputInputStream dataIn = new FileInputInputStream(input);
        // Reused for every file: only the first `len` bytes are ever emitted and
        // the Base64 encoder returns a new String, so stale bytes never leak out.
        byte[] chunk = new byte[CHUNK_SIZE];

        try (PageBuilder pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output)) {
            while (input.nextFile()) {
                String path = readPathHeader(dataIn);

                // Copy the payload byte by byte into the chunk buffer and flush
                // the buffer as one record each time it becomes full.
                int len = 0;
                int next = dataIn.read();
                while (next != -1) {
                    chunk[len] = (byte) next;
                    len += 1;
                    next = dataIn.read();
                    if (len == CHUNK_SIZE) {
                        log.info(path);
                        addRecord(pageBuilder, chunk, path);
                        len = 0;
                    }
                }

                // The remainder of the payload is shorter than a full chunk
                // and still has to be recorded.
                if (len != 0) {
                    addRecord(pageBuilder, Arrays.copyOfRange(chunk, 0, len), path);
                }
            }
            pageBuilder.finish();
        }
    }

    /**
     * Consumes the path header of the current file and returns the path string.
     *
     * Reading stops at end of data, at the first NUL byte (the rest of the
     * header is then skipped), or after MAX_NAME_LENGTH bytes.
     */
    private String readPathHeader(FileInputInputStream dataIn)
    {
        byte[] pathBytes = new byte[MAX_NAME_LENGTH];
        int i = 0;
        for (; i < MAX_NAME_LENGTH; i++) {
            int c = dataIn.read();
            if (c == -1) {
                break;
            }
            if (c == 0) {
                // Skip the padding that fills the header up to MAX_NAME_LENGTH.
                for (int j = i + 1; j < MAX_NAME_LENGTH; j++) {
                    dataIn.read();
                }
                break;
            }
            pathBytes[i] = (byte) c;
        }
        // NOTE(review): decodes with the platform default charset; the charset
        // used by the producer of the header is not visible here -- confirm.
        return new String(Arrays.copyOfRange(pathBytes, 0, i));
    }

    /** Adds one record made of the Base64 form of {@code data} and the path tag. */
    private void addRecord(PageBuilder pageBuilder, byte[] data, String path)
    {
        pageBuilder.setString(0, Base64.encodeBase64String(data));
        pageBuilder.setString(1, path);
        pageBuilder.addRecord();
    }
}
...@@ -8,7 +8,6 @@ import java.nio.file.attribute.BasicFileAttributeView; ...@@ -8,7 +8,6 @@ import java.nio.file.attribute.BasicFileAttributeView;
import java.nio.file.attribute.FileTime; import java.nio.file.attribute.FileTime;
import java.util.Comparator; import java.util.Comparator;
import org.apache.commons.codec.binary.Base64;
import org.embulk.config.ConfigSource; import org.embulk.config.ConfigSource;
import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigDiff;
...@@ -37,7 +36,7 @@ import static org.embulk.test.EmbulkTests.readSortedFile; ...@@ -37,7 +36,7 @@ import static org.embulk.test.EmbulkTests.readSortedFile;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
public class TestFilenameFileInputPlugin public class TestFilenameInputPlugin
{ {
public static FileTime getCreationTime(String filename) throws IOException{ public static FileTime getCreationTime(String filename) throws IOException{
...@@ -59,11 +58,10 @@ public class TestFilenameFileInputPlugin ...@@ -59,11 +58,10 @@ public class TestFilenameFileInputPlugin
@Rule @Rule
public TestHelper embulk = TestHelper.builder() public TestHelper embulk = TestHelper.builder()
.registerPlugin(InputPlugin.class,"filename",FilenameFileInputPlugin.class) .registerPlugin(InputPlugin.class,"filename",FilenameInputPlugin.class)
.registerPlugin(ParserPlugin.class,"none-bin",NoneBinParserPlugin.class)
.registerPlugin(OutputPlugin.class,"joinfile",JoinfileOutputPlugin.class) .registerPlugin(OutputPlugin.class,"joinfile",JoinfileOutputPlugin.class)
.build(); .build();
@Test @Test
public void testOrderByModifiedTime() throws Exception{ public void testOrderByModifiedTime() throws Exception{
...@@ -72,12 +70,14 @@ public class TestFilenameFileInputPlugin ...@@ -72,12 +70,14 @@ public class TestFilenameFileInputPlugin
.set("max_threads","1"); .set("max_threads","1");
Path path_src = Paths.get("src/test/resources/testModifiedOrder"); Path path_src = Paths.get("src/test/resources/testModifiedOrder");
ArrayList<String> multi_dir = new ArrayList<String> ();
multi_dir.add(path_src.toAbsolutePath().toString()+"/sample_");
ConfigSource inConfig = embulk.newConfig() ConfigSource inConfig = embulk.newConfig()
.set("type","filename") .set("type","filename")
.set("path_prefix",path_src.toAbsolutePath().toString()+"/sample_") .set("multi_dir",multi_dir)
.set("order_by_modified_time","2") .set("order_by_modified_time","2");
.set("parser",embulk.newConfig().set("type","none-bin"));
Path tmp = embulk.createTempDir(); Path tmp = embulk.createTempDir();
ConfigSource outConfig = embulk.newConfig() ConfigSource outConfig = embulk.newConfig()
...@@ -110,8 +110,8 @@ public class TestFilenameFileInputPlugin ...@@ -110,8 +110,8 @@ public class TestFilenameFileInputPlugin
} }
}); });
//System.out.println(lines); //System.out.println("The lines" + lines);
//System.out.println(actual); //System.out.println("The actual" + actual);
assertEquals(lines,actual); assertEquals(lines,actual);
inConfig.set("order_by_modified_time","1"); inConfig.set("order_by_modified_time","1");
...@@ -143,10 +143,7 @@ public class TestFilenameFileInputPlugin ...@@ -143,10 +143,7 @@ public class TestFilenameFileInputPlugin
.set("type","filename") .set("type","filename")
.set("order_by_modified_time","2") .set("order_by_modified_time","2")
.set("multi_dir",multi_dir) .set("multi_dir",multi_dir)
.set("multi_tag",multi_tag) .set("multi_tag",multi_tag);
.set("path_prefix","/home/chronos/user/Downloads/embulk-input-filename/src/test/resources/testDirList/example/example_")
.set("parser",embulk.newConfig().set("type","none-bin"));
System.out.println(inConfig); System.out.println(inConfig);
Path tmp = embulk.createTempDir(); Path tmp = embulk.createTempDir();
...@@ -238,9 +235,7 @@ public class TestFilenameFileInputPlugin ...@@ -238,9 +235,7 @@ public class TestFilenameFileInputPlugin
.set("type","filename") .set("type","filename")
.set("order_by_modified_time","2") .set("order_by_modified_time","2")
.set("multi_dir",multi_dir) .set("multi_dir",multi_dir)
.set("path_prefix","/home/chronos/user/Downloads/embulk-input-filename/src/test/resources/testDirList/example/example_") .set("path_prefix","/home/chronos/user/Downloads/embulk-input-filename/src/test/resources/testDirList/example/example_");
.set("parser",embulk.newConfig().set("type","none-bin"));
Path tmp = embulk.createTempDir(); Path tmp = embulk.createTempDir();
ConfigSource outConfig = embulk.newConfig() ConfigSource outConfig = embulk.newConfig()
...@@ -298,17 +293,20 @@ public class TestFilenameFileInputPlugin ...@@ -298,17 +293,20 @@ public class TestFilenameFileInputPlugin
assertEquals(lines,dir1); assertEquals(lines,dir1);
} }
@Test @Test
public void testBase64() throws Exception{ public void testContent() throws Exception{
ConfigSource execConfig = embulk.newConfig() ConfigSource execConfig = embulk.newConfig()
.set("max_threads","1"); .set("max_threads","1");
Path path_src = Paths.get("src/test/resources/data"); Path path_src = Paths.get("src/test/resources/data");
ArrayList<String> multi_dir = new ArrayList<String> ();
multi_dir.add(path_src.toAbsolutePath().toString()+"/test.csv");
ConfigSource inConfig = embulk.newConfig() ConfigSource inConfig = embulk.newConfig()
.set("type","filename") .set("type","filename")
.set("path_prefix",path_src.toAbsolutePath().toString()+"/test.csv") .set("multi_dir",multi_dir)
.set("parser",embulk.newConfig().set("type","none-bin")); .set("parser",embulk.newConfig().set("type","none-bin"));
Path tmp = embulk.createTempDir(); Path tmp = embulk.createTempDir();
...@@ -324,10 +322,10 @@ public class TestFilenameFileInputPlugin ...@@ -324,10 +322,10 @@ public class TestFilenameFileInputPlugin
List<String> lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile.txt")); List<String> lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile.txt"));
List<String> actual = Files.readAllLines(Paths.get(path_src+"/test.csv")); List<String> actual = Files.readAllLines(Paths.get(path_src+"/test.csv"));
//System.out.println(lines); //System.out.println("The lines " + lines);
String ans = String.join("\n",actual) + "\n"; //System.out.println("The actual " + actual);
String actual_bytes = Base64.encodeBase64String(ans.getBytes()); assertEquals(actual,lines);
assertEquals(lines.get(0),actual_bytes);
} }
} }
dddd04 dddd04
dddd04 dddd04
dddd04 dddd04
ffff06
gggg08
hhhh06
afdasfgdfagdjg;ashdgklhdg;khdkjgndk;sagbnkadbnkghadskjgnvkdavbdfjbngkj;ldng;khg
hakd;hfehfkajdlgdabdba;hjag;sdgnkdngk;adsngjghkhjlkjljojaldfjlanf;aknhgk;adhg;ajg;lag
asdfgalkdhgkajdbngkahdgkahdkgndksjngkhkhjljiangladfgsdf
adfbkaldfhakdslhfkaldsh
abcdefghijlkjfafhodmjjmkdf
afkhjdofa;j;djfl;ajflkasjdfk;ankfjlndhkajlhgkalhgklahglkl
afhgakdhfgklasdhgkahknkdanfkhkhnkljahdfkanfhjjianlgla
afjljl;j;ajkajkldfakfhakjfdlajfldsjflajdslfjaldjfl
afjlkadsjflajlfjdlasfjlas
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment