Commit 76aa5470 authored by yu's avatar yu

not fix the last path yet

parent 47f086e0
......@@ -21,7 +21,11 @@ import java.io.ByteArrayOutputStream;
import com.google.common.base.Optional;
import org.apache.commons.codec.binary.Base64;
//import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
......@@ -57,25 +61,19 @@ public class FilenameInputPlugin
@ConfigDefault("[]")
ArrayList<String> getMultiTag();
@Config("last_paths")
@Config("lastPaths")
@ConfigDefault("[]")
ArrayList<String> getLastPaths();
@Config("order_by_modified_time")
@ConfigDefault("0")
int getOrderByModifiedTime();
@Config("order_by_creation_time")
@ConfigDefault("0")
int getOrderByCreationTime();
@Config("order")
@ConfigDefault("ALPHABETICAL")
String getOrder();
@Config("chunk_size")
@ConfigDefault("10485760")
int getChunkSize();
@Config("file_size")
@ConfigDefault("null")
Optional<Integer> getFileSize();
@Config("follow_symlinks")
@ConfigDefault("false")
......@@ -95,10 +93,9 @@ public class FilenameInputPlugin
private final static Path CURRENT_DIR = Paths.get(".").normalize();
private static ArrayList<String> tagList;
private static ArrayList<String> lastPaths;
private static int chunkSize;
private static ArrayList<String> lastPaths = new ArrayList<String>();
@Override
public ConfigDiff transaction(ConfigSource config,
......@@ -108,6 +105,7 @@ public class FilenameInputPlugin
chunkSize = task.getChunkSize();
ArrayList<String> dirList = task.getMultiDir();
ArrayList<String> lastPaths = task.getLastPaths();
ArrayList<ArrayList<String>> allFiles = new ArrayList<ArrayList<String>>();
tagList = task.getMultiTag();
......@@ -117,7 +115,7 @@ public class FilenameInputPlugin
// If the Number of tags is less than the directories, we say that the default tag is ""
tagList.add("");
}
while (lastPaths.size()< dirList.size()){
while (lastPaths.size() < dirList.size()){
lastPaths.add("");
}
} else {
......@@ -126,16 +124,17 @@ public class FilenameInputPlugin
for (int i =0; i<dirList.size(); i++ ){
for (int i =0; i < dirList.size();i++ ){
String dir = dirList.get(i);
String last_path = lastPaths.get(i);
ArrayList<String> files = listFiles(task,Paths.get(dir).normalize(),last_path);
String lastPath = lastPaths.get(i);
String order = task.getOrder();
ArrayList<String> files = listFiles(task,Paths.get(dir).normalize(),lastPath,order);
// Sort the files if each directory
int order_modified = task.getOrderByModifiedTime();
int order_creation = task.getOrderByCreationTime();
if (order_modified == 0 && order_creation == 0){
if (order.equals("ALPHABETICAL"){
Collections.sort(files);
} else if(order_creation == 0){
} else if(order.equals("ASCEND_MODIFIED") || order.equals("DESCEND_MODIFIED"){
Collections.sort(files,new Comparator<String>(){
@Override
public int compare(String f1, String f2) {
......@@ -147,10 +146,9 @@ public class FilenameInputPlugin
return 0;
}
});
if (order_modified == 1 ) { Collections.reverse(files); }
} else if (order_modified == 0 ){
if (order.equals("DESCEND_MODIFIED"){ Collections.reverse(files); }
}
} else if (order.equals("ASCEND_CREATION") || order.equals("DESCEND_CREATION") ){
Collections.sort(files,new Comparator<String>(){
@Override
public int compare(String f1, String f2) {
......@@ -163,15 +161,16 @@ public class FilenameInputPlugin
}
});
if ( order_creation == 1 ) { Collections.reverse(files);}
if ( order.equals("DESCEND_CREATION") ) { Collections.reverse(files);}
} else {
throw new RuntimeException("Could not order by creation time and lasModified time at the same time");
throw new RuntimeException("Input a correct order");
}
// End of sort
log.info("The files is " + files);
allFiles.add(files);
last_p.add(files.get(0));
}
......@@ -179,6 +178,7 @@ public class FilenameInputPlugin
// If the we upload only one directory, we set each file as a task.
// In this case the max_threads must equal 1 to keep the file uploading order
if (dirList.size() == 1){
log.info("size==1");
ArrayList<ArrayList<String>> oneFile = new ArrayList<ArrayList<String>> ();
for(String f : allFiles.get(0)){
ArrayList<String> file = new ArrayList<String> ();
......@@ -190,8 +190,6 @@ public class FilenameInputPlugin
}
task.setFiles(oneFile);
taskCount = oneFile.size();
last_p = new ArrayList<String>();
last_p.add(allFiles.get(0).get(0));
} else{
task.setFiles(allFiles);
taskCount = allFiles.size();
......@@ -208,6 +206,9 @@ public class FilenameInputPlugin
//Schema schema = task.getColumns().toSchema();
// number of run() method calls
log.info("TASKCOUNT " + taskCount);
return resume(task.dump(), schema, taskCount, control);
}
......@@ -217,10 +218,7 @@ public class FilenameInputPlugin
InputPlugin.Control control)
{
control.run(taskSource, schema, taskCount);
ConfigDiff diff = Exec.newConfigDiff();
diff.set("last_path",last_p);
return diff;
return Exec.newConfigDiff();
}
@Override
......@@ -239,6 +237,8 @@ public class FilenameInputPlugin
ArrayList<String> files = task.getFiles().get(taskIndex);
log.info("The files in the run:" + files);
for (String file : files)
{
......@@ -246,7 +246,6 @@ public class FilenameInputPlugin
{
int nRead;
byte[] data = new byte[chunkSize];
String filename = new File(file).getCanonicalPath();
FileInputStream dataIn = new FileInputStream(file);
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
......@@ -255,18 +254,12 @@ public class FilenameInputPlugin
try (PageBuilder pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output))
{
pageBuilder.setString(0,buffer.toString());//Base64.encodeBase64String(buffer.toByteArray()));
pageBuilder.setString(1, tagList.get(taskIndex) + filename );
pageBuilder.setString(1, tagList.get(taskIndex) + new File(file).getCanonicalPath() );
pageBuilder.addRecord();
buffer.flush();
pageBuilder.finish();
}
}
if (last_p.size() > 1) {
last_p.set(taskIndex,filename);
}
else {
last_p.set(0,filename);
}
} catch (IOException ex){
ex.printStackTrace();
}
......@@ -284,7 +277,7 @@ public class FilenameInputPlugin
}
public ArrayList<String> listFiles(PluginTask task,Path pathPrefix, String lastPath)
public ArrayList<String> listFiles(PluginTask task,Path pathPrefix,String lastPath,String order)
{
//Path pathPrefix = Paths.get(task.getPathPrefix()).normalize();
final Path directory;
......@@ -298,27 +291,18 @@ public class FilenameInputPlugin
directory = (d == null ? CURRENT_DIR : d);
}
//final ImmutableList.Builder<String> builder = ImmutableList.builder();
final ArrayList<String> filesArray = new ArrayList<String>();
final Integer fileSize = task.getFileSize().orNull();
try {
log.info("Listing local files at directory '{}' filtering filename by prefix '{}'", directory.equals(CURRENT_DIR) ? "." : directory.toString(), fileNamePrefix);
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
// This method check the dirname
@Override
public FileVisitResult preVisitDirectory(Path path, BasicFileAttributes attrs)
{
if (path.equals(directory)) {
return FileVisitResult.CONTINUE;
} else if (lastPath != "") {
if ( order == 1 && path.toString().compareTo(lastPath.substring(0, path.toString().length())) < 0)
{
return FileVisitResult.SKIP_SUBTREE;
}
else if (order ==2 && path.toString().compareTo(lastPath.substring(0, path.toString().length())) > 0)
{
return FileVisitResult.SKIP_SUBTREE;
}
} else if (lastPath != null && path.toString().compareTo(lastPath.substring(0, path.toString().length())) < 0) {
return FileVisitResult.SKIP_SUBTREE;
} else if (path.getFileName().toString().startsWith(".")) {
return FileVisitResult.SKIP_SUBTREE;
} else {
......@@ -330,20 +314,26 @@ public class FilenameInputPlugin
}
}
// This method check the filename
@Override
public FileVisitResult visitFile(Path path, BasicFileAttributes attrs)
{
if (lastPath != null && path.toString().compareTo(lastPath) <= 0) {
if ( !lastPath.equals("") && order.equals("ALPHABETICAL") && path.toString().compareTo(lastPath) <= 0) {
return FileVisitResult.CONTINUE;
} else if (path.getFileName().toString().startsWith(".")) {
} else if (!lastPath.equals("") && order.equals("ASCEND_MODIFIED") && getLastModifiedTime(pah.toString()).compareTo(getLastModifiedTime(lastPath)) <= 0) {
return FileVisitResult.CONTINUE;
} else if (!lastPath.equals("") && order.equals("DESCEND_MODIFIED") && getLastModifiedTime(pah.toString()).compareTo(getLastModifiedTime(lastPath)) >= 0){
return FileVisitResult.CONTINUE;
} else if (!lastPath.equals("") && order.equals("ASCEND_CREATION") && getLastCreationTime(pah.toString()).compareTo(getLastCreationTime(lastPath)) <= 0){
return FileVisitResult.CONTINUE;
} else if (!lastPath.equals("") && order.equals("DESCEND_MODIFIED") && getLastCreationTime(pah.toString()).compareTo(getLastCreationTime(lastPath)) <= 0) {
return FileVisitResult.CONTINUE;
}
else if (path.getFileName().toString().startsWith(".")) {
return FileVisitResult.CONTINUE;
} else {
if (path.getFileName().toString().startsWith(fileNamePrefix)) {
if (fileSize == null || path.toFile().length() == fileSize) {
//builder.add(path.toString());
filesArray.add(path.toString());
}
}
return FileVisitResult.CONTINUE;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment