Commit 554744bd authored by yu's avatar yu

Merge branch 'fixTheLastPath' into multiThread

fix the multi dir problem and single dir problem
parents 58d3b89e 996b5c7a
...@@ -21,7 +21,11 @@ import java.io.ByteArrayOutputStream; ...@@ -21,7 +21,11 @@ import java.io.ByteArrayOutputStream;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Base64;
//import org.apache.commons.io.IOUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.embulk.config.Config; import org.embulk.config.Config;
import org.embulk.config.ConfigDefault; import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigDiff;
...@@ -57,25 +61,18 @@ public class FilenameInputPlugin ...@@ -57,25 +61,18 @@ public class FilenameInputPlugin
@ConfigDefault("[]") @ConfigDefault("[]")
ArrayList<String> getMultiTag(); ArrayList<String> getMultiTag();
@Config("last_path") @Config("lastPaths")
@ConfigDefault("null") @ConfigDefault("[]")
Optional<String> getLastPath(); ArrayList<String> getLastPaths();
@Config("order_by_modified_time")
@ConfigDefault("0")
int getOrderByModifiedTime();
@Config("order_by_creation_time")
@ConfigDefault("0")
int getOrderByCreationTime();
@Config("chunk_size") @Config("chunk_size")
@ConfigDefault("10485760") @ConfigDefault("10485760")
int getChunkSize(); int getChunkSize();
@Config("load_order")
@ConfigDefault("\"\"")
String getLoadOrder();
@Config("file_size")
@ConfigDefault("null")
Optional<Integer> getFileSize();
@Config("follow_symlinks") @Config("follow_symlinks")
@ConfigDefault("false") @ConfigDefault("false")
...@@ -95,10 +92,9 @@ public class FilenameInputPlugin ...@@ -95,10 +92,9 @@ public class FilenameInputPlugin
private final static Path CURRENT_DIR = Paths.get(".").normalize(); private final static Path CURRENT_DIR = Paths.get(".").normalize();
private static ArrayList<String> tagList; private static ArrayList<String> tagList;
private static ArrayList<String> lastPaths;
private static int chunkSize; private static int chunkSize;
private static ArrayList<String> last_p = new ArrayList<String>();
@Override @Override
public ConfigDiff transaction(ConfigSource config, public ConfigDiff transaction(ConfigSource config,
...@@ -108,6 +104,7 @@ public class FilenameInputPlugin ...@@ -108,6 +104,7 @@ public class FilenameInputPlugin
chunkSize = task.getChunkSize(); chunkSize = task.getChunkSize();
ArrayList<String> dirList = task.getMultiDir(); ArrayList<String> dirList = task.getMultiDir();
ArrayList<String> lastPaths = task.getLastPaths();
ArrayList<ArrayList<String>> allFiles = new ArrayList<ArrayList<String>>(); ArrayList<ArrayList<String>> allFiles = new ArrayList<ArrayList<String>>();
tagList = task.getMultiTag(); tagList = task.getMultiTag();
...@@ -117,20 +114,27 @@ public class FilenameInputPlugin ...@@ -117,20 +114,27 @@ public class FilenameInputPlugin
// If the Number of tags is less than the directories, we say that the default tag is "" // If the Number of tags is less than the directories, we say that the default tag is ""
tagList.add(""); tagList.add("");
} }
while (lastPaths.size() < dirList.size()){
lastPaths.add("");
}
} else { } else {
throw new RuntimeException("The multi_dir should contain at least 1 directory."); throw new RuntimeException("The multi_dir should contain at least 1 directory.");
} }
for ( String dir : dirList){ for (int i =0; i < dirList.size();i++ ){
ArrayList<String> files = listFiles(task,Paths.get(dir).normalize()); String dir = dirList.get(i);
String lastPath = lastPaths.get(i);
String order = task.getLoadOrder();
if (order.equals("")){order = "ALPHABETICAL";}
ArrayList<String> files = listFiles(task,Paths.get(dir).normalize(),lastPath,order);
// Sort the files if each directory // Sort the files if each directory
int order_modified = task.getOrderByModifiedTime();
int order_creation = task.getOrderByCreationTime();
if (order_modified == 0 && order_creation == 0){ if (order.equals("ALPHABETICAL")){
Collections.sort(files); Collections.sort(files);
} else if(order_creation == 0){ } else if(order.equals("ASCEND_MODIFIED") || order.equals("DESCEND_MODIFIED")){
Collections.sort(files,new Comparator<String>(){ Collections.sort(files,new Comparator<String>(){
@Override @Override
public int compare(String f1, String f2) { public int compare(String f1, String f2) {
...@@ -142,10 +146,8 @@ public class FilenameInputPlugin ...@@ -142,10 +146,8 @@ public class FilenameInputPlugin
return 0; return 0;
} }
}); });
if (order.equals("DESCEND_MODIFIED")){ Collections.reverse(files); }
if (order_modified == 1 ) { Collections.reverse(files); } } else if ( order.equals("ASCEND_CREATION") || order.equals("DESCEND_CREATION") ){
} else if (order_modified == 0 ){
Collections.sort(files,new Comparator<String>(){ Collections.sort(files,new Comparator<String>(){
@Override @Override
public int compare(String f1, String f2) { public int compare(String f1, String f2) {
...@@ -158,39 +160,23 @@ public class FilenameInputPlugin ...@@ -158,39 +160,23 @@ public class FilenameInputPlugin
} }
}); });
if ( order_creation == 1 ) { Collections.reverse(files);} if ( order.equals("DESCEND_CREATION") ) { Collections.reverse(files);}
} else { } else {
throw new RuntimeException("Could not order by creation time and lasModified time at the same time"); throw new RuntimeException("Input a correct order");
} }
// End of sort // End of sort
log.info("The files is " + files);
allFiles.add(files); allFiles.add(files);
last_p.add(files.get(0));
} }
int taskCount; int taskCount = allFiles.size();
// If the we upload only one directory, we set each file as a task.
// In this case the max_threads must equal 1 to keep the file uploading order
if (dirList.size() == 1){ task.setFiles(allFiles);
ArrayList<ArrayList<String>> oneFile = new ArrayList<ArrayList<String>> ();
for(String f : allFiles.get(0)){
ArrayList<String> file = new ArrayList<String> ();
file.add(f);
oneFile.add(file);
}
while (tagList.size()< oneFile.size()){
tagList.add(tagList.get(0));
}
task.setFiles(oneFile);
taskCount = oneFile.size();
last_p = new ArrayList<String>();
last_p.add(allFiles.get(0).get(0));
} else{
task.setFiles(allFiles);
taskCount = allFiles.size();
}
ArrayList<ColumnConfig> columns = new ArrayList<ColumnConfig>(); ArrayList<ColumnConfig> columns = new ArrayList<ColumnConfig>();
//final String columnName = task.getColumnName(); //final String columnName = task.getColumnName();
...@@ -203,6 +189,9 @@ public class FilenameInputPlugin ...@@ -203,6 +189,9 @@ public class FilenameInputPlugin
//Schema schema = task.getColumns().toSchema(); //Schema schema = task.getColumns().toSchema();
// number of run() method calls // number of run() method calls
log.info("TASKCOUNT " + taskCount);
return resume(task.dump(), schema, taskCount, control); return resume(task.dump(), schema, taskCount, control);
} }
...@@ -212,10 +201,7 @@ public class FilenameInputPlugin ...@@ -212,10 +201,7 @@ public class FilenameInputPlugin
InputPlugin.Control control) InputPlugin.Control control)
{ {
control.run(taskSource, schema, taskCount); control.run(taskSource, schema, taskCount);
ConfigDiff diff = Exec.newConfigDiff(); return Exec.newConfigDiff();
diff.set("last_path",last_p);
return diff;
} }
@Override @Override
...@@ -234,6 +220,8 @@ public class FilenameInputPlugin ...@@ -234,6 +220,8 @@ public class FilenameInputPlugin
ArrayList<String> files = task.getFiles().get(taskIndex); ArrayList<String> files = task.getFiles().get(taskIndex);
log.info("The files in the run:" + files);
for (String file : files) for (String file : files)
{ {
...@@ -241,7 +229,6 @@ public class FilenameInputPlugin ...@@ -241,7 +229,6 @@ public class FilenameInputPlugin
{ {
int nRead; int nRead;
byte[] data = new byte[chunkSize]; byte[] data = new byte[chunkSize];
String filename = new File(file).getCanonicalPath();
FileInputStream dataIn = new FileInputStream(file); FileInputStream dataIn = new FileInputStream(file);
ByteArrayOutputStream buffer = new ByteArrayOutputStream(); ByteArrayOutputStream buffer = new ByteArrayOutputStream();
...@@ -250,18 +237,12 @@ public class FilenameInputPlugin ...@@ -250,18 +237,12 @@ public class FilenameInputPlugin
try (PageBuilder pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output)) try (PageBuilder pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output))
{ {
pageBuilder.setString(0,buffer.toString());//Base64.encodeBase64String(buffer.toByteArray())); pageBuilder.setString(0,buffer.toString());//Base64.encodeBase64String(buffer.toByteArray()));
pageBuilder.setString(1, tagList.get(taskIndex) + filename ); pageBuilder.setString(1, tagList.get(taskIndex) + new File(file).getCanonicalPath() );
pageBuilder.addRecord(); pageBuilder.addRecord();
buffer.flush(); buffer.flush();
pageBuilder.finish(); pageBuilder.finish();
} }
} }
if (last_p.size() > 1) {
last_p.set(taskIndex,filename);
}
else {
last_p.set(0,filename);
}
} catch (IOException ex){ } catch (IOException ex){
ex.printStackTrace(); ex.printStackTrace();
} }
...@@ -279,7 +260,9 @@ public class FilenameInputPlugin ...@@ -279,7 +260,9 @@ public class FilenameInputPlugin
} }
public ArrayList<String> listFiles(PluginTask task,Path pathPrefix)
public ArrayList<String> listFiles(PluginTask task,Path pathPrefix,String lastPath,String order)
{ {
//Path pathPrefix = Paths.get(task.getPathPrefix()).normalize(); //Path pathPrefix = Paths.get(task.getPathPrefix()).normalize();
final Path directory; final Path directory;
...@@ -295,8 +278,6 @@ public class FilenameInputPlugin ...@@ -295,8 +278,6 @@ public class FilenameInputPlugin
//final ImmutableList.Builder<String> builder = ImmutableList.builder(); //final ImmutableList.Builder<String> builder = ImmutableList.builder();
final ArrayList<String> filesArray = new ArrayList<String>(); final ArrayList<String> filesArray = new ArrayList<String>();
final String lastPath = task.getLastPath().orNull();
final Integer fileSize = task.getFileSize().orNull();
try { try {
log.info("Listing local files at directory '{}' filtering filename by prefix '{}'", directory.equals(CURRENT_DIR) ? "." : directory.toString(), fileNamePrefix); log.info("Listing local files at directory '{}' filtering filename by prefix '{}'", directory.equals(CURRENT_DIR) ? "." : directory.toString(), fileNamePrefix);
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() { Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
...@@ -322,18 +303,28 @@ public class FilenameInputPlugin ...@@ -322,18 +303,28 @@ public class FilenameInputPlugin
@Override @Override
public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) public FileVisitResult visitFile(Path path, BasicFileAttributes attrs)
{ {
if (lastPath != null && path.toString().compareTo(lastPath) <= 0) { try
return FileVisitResult.CONTINUE; {
} else if (path.getFileName().toString().startsWith(".")) { if ( !lastPath.equals("") && order.equals("ALPHABETICAL") && path.toString().compareTo(lastPath) <= 0) {
return FileVisitResult.CONTINUE; return FileVisitResult.CONTINUE;
} else { } else if (!lastPath.equals("") && order.equals("ASCEND_MODIFIED") && getLastModifiedTime(path.toString()).compareTo(getLastModifiedTime(lastPath)) <= 0) {
if (path.getFileName().toString().startsWith(fileNamePrefix)) { return FileVisitResult.CONTINUE;
if (fileSize == null || path.toFile().length() == fileSize) { } else if (!lastPath.equals("") && order.equals("DESCEND_MODIFIED") && getLastModifiedTime(path.toString()).compareTo(getLastModifiedTime(lastPath)) >= 0){
//builder.add(path.toString()); return FileVisitResult.CONTINUE;
filesArray.add(path.toString()); } else if (!lastPath.equals("") && order.equals("ASCEND_CREATION") && getCreationTime(path.toString()).compareTo(getCreationTime(lastPath)) <= 0){
return FileVisitResult.CONTINUE;
} else if (!lastPath.equals("") && order.equals("DESCEND_MODIFIED") && getCreationTime(path.toString()).compareTo(getCreationTime(lastPath)) <= 0) {
return FileVisitResult.CONTINUE;
} else if (path.getFileName().toString().startsWith(".")) {
return FileVisitResult.CONTINUE;
} else {
if (path.getFileName().toString().startsWith(fileNamePrefix)) {
filesArray.add(path.toString());
} }
return FileVisitResult.CONTINUE;
} }
return FileVisitResult.CONTINUE; } catch ( IOException e){
throw new RuntimeException("IOException during the uploading files");
} }
} }
}); });
......
...@@ -77,7 +77,7 @@ public class TestFilenameInputPlugin ...@@ -77,7 +77,7 @@ public class TestFilenameInputPlugin
ConfigSource inConfig = embulk.newConfig() ConfigSource inConfig = embulk.newConfig()
.set("type","filename") .set("type","filename")
.set("multi_dir",multi_dir) .set("multi_dir",multi_dir)
.set("order_by_modified_time","2"); .set("load_order","ASCEND_MODIFIED");
Path tmp = embulk.createTempDir(); Path tmp = embulk.createTempDir();
...@@ -115,7 +115,7 @@ public class TestFilenameInputPlugin ...@@ -115,7 +115,7 @@ public class TestFilenameInputPlugin
//System.out.println("The actual" + actual); //System.out.println("The actual" + actual);
assertEquals(lines,actual); assertEquals(lines,actual);
inConfig.set("order_by_modified_time","1"); inConfig.set("load_order","DESCEND_MODIFIED");
res = embulk.runAllBuilder(execConfig,inConfig,outConfig); res = embulk.runAllBuilder(execConfig,inConfig,outConfig);
lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile.txt")); lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile.txt"));
...@@ -143,7 +143,7 @@ public class TestFilenameInputPlugin ...@@ -143,7 +143,7 @@ public class TestFilenameInputPlugin
ConfigSource inConfig = embulk.newConfig() ConfigSource inConfig = embulk.newConfig()
.set("type","filename") .set("type","filename")
.set("order_by_modified_time","2") .set("load_order","ASCEND_MODIFIED")
.set("multi_dir",multi_dir) .set("multi_dir",multi_dir)
.set("multi_tag",multi_tag); .set("multi_tag",multi_tag);
System.out.println(inConfig); System.out.println(inConfig);
...@@ -236,9 +236,8 @@ public class TestFilenameInputPlugin ...@@ -236,9 +236,8 @@ public class TestFilenameInputPlugin
ConfigSource inConfig = embulk.newConfig() ConfigSource inConfig = embulk.newConfig()
.set("type","filename") .set("type","filename")
.set("order_by_modified_time","2") .set("load_order","ASCEND_MODIFIED")
.set("multi_dir",multi_dir) .set("multi_dir",multi_dir);
.set("path_prefix","/home/chronos/user/Downloads/embulk-input-filename/src/test/resources/testDirList/example/example_");
Path tmp = embulk.createTempDir(); Path tmp = embulk.createTempDir();
ConfigSource outConfig = embulk.newConfig() ConfigSource outConfig = embulk.newConfig()
...@@ -310,9 +309,9 @@ public class TestFilenameInputPlugin ...@@ -310,9 +309,9 @@ public class TestFilenameInputPlugin
multi_dir.add(path_src.toAbsolutePath().toString()+"/test.csv"); multi_dir.add(path_src.toAbsolutePath().toString()+"/test.csv");
ConfigSource inConfig = embulk.newConfig() ConfigSource inConfig = embulk.newConfig()
.set("type","filename") .set("type","filename")
.set("multi_dir",multi_dir) .set("load_order","ALPHABETICAL")
.set("parser",embulk.newConfig().set("type","none-bin")); .set("multi_dir",multi_dir);
Path tmp = embulk.createTempDir(); Path tmp = embulk.createTempDir();
ConfigSource outConfig = embulk.newConfig() ConfigSource outConfig = embulk.newConfig()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment