Commit de0dfe93 authored by yu's avatar yu

finished the modification of the sorter

parent f83e6c97
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
## Configuration ## Configuration
- **multi_dir**: description (ArrayList<String>, required) - **multi_dir**: description (ArrayList<String>, required)
- **mulit_tag**: description (ArrayList<String>, default: `[]`) - **multi_tag**: description (ArrayList<String>, default: `[]`)
- **load_order**: description (String, default: `ALPHABETICAL`) - **load_order**: description (String, default: `ALPHABETICAL`)
- **chunk_size**: description (int, default: `10485760(10M)`) - **chunk_size**: description (int, default: `10485760(10M)`)
...@@ -115,7 +115,7 @@ exec: ...@@ -115,7 +115,7 @@ exec:
min_output_tasks: 1 min_output_tasks: 1
in: in:
type: filename type: filename
mulit_dir: ["../sample/sample_","../example/example_"] multi_dir: ["../sample/sample_","../example/example_"]
multi_tag: ["tag1","tag2"] multi_tag: ["tag1","tag2"]
load_order: ASCEND_MODIFIED load_order: ASCEND_MODIFIED
chunk_size: 1000 chunk_size: 1000
......
...@@ -14,6 +14,7 @@ This plugin should be used with the WendlinPlugin. ...@@ -14,6 +14,7 @@ This plugin should be used with the WendlinPlugin.
*/ */
package org.embulk.input.filename; package org.embulk.input.filename;
import java.util.stream.Collectors;
import java.util.List; import java.util.List;
import java.util.Arrays; import java.util.Arrays;
...@@ -156,6 +157,8 @@ public class FilenameInputPlugin ...@@ -156,6 +157,8 @@ public class FilenameInputPlugin
// Now to time to read all files from each directory. // Now to time to read all files from each directory.
String order = task.getLoadOrder();
for (int i =0; i < dirList.size();i++ ){ for (int i =0; i < dirList.size();i++ ){
String dir = dirList.get(i); String dir = dirList.get(i);
String lastPath = lastPaths.get(i); String lastPath = lastPaths.get(i);
...@@ -163,34 +166,10 @@ public class FilenameInputPlugin ...@@ -163,34 +166,10 @@ public class FilenameInputPlugin
// We have to sort the files before we set them to the tasks. // We have to sort the files before we set them to the tasks.
// Here we get the parameter about the order // Here we get the parameter about the order
Comparator comparator;
String order = task.getLoadOrder();
switch (order) {
case "ASCEND_MODIFIED": comparator = AscendModifiedSorter.getComparator();
break;
case "DESCEND_MODIFIED": comparator = DescendModifiedSorter.getComparator();
break;
case "ASCEND_CREATION": comparator = AscendCreationSorter.getComparator();
break;
case "DESCEND_CREATION": comparator = DescendModifiedSorter.getComparator();
break;
default: comparator = new Comparator<String>();
break;
}
// This method return the files in a directory,which is already sorted in the this method
// This method return the files in a directory,however the files in the varibale files is in random order. we have to sort them next
ArrayList<String> files = listFiles(task,Paths.get(dir).normalize(),lastPath,order); ArrayList<String> files = listFiles(task,Paths.get(dir).normalize(),lastPath,order);
// Sort the files for each directory
Collections.sort(files,comparator);
// End of sort
log.info("The files is " + files); log.info("The files is " + files);
// add the files to the big Array. // add the files to the big Array.
...@@ -306,61 +285,55 @@ public class FilenameInputPlugin ...@@ -306,61 +285,55 @@ public class FilenameInputPlugin
directory = (d == null ? CURRENT_DIR : d); directory = (d == null ? CURRENT_DIR : d);
} }
final ArrayList<String> filesArray = new ArrayList<String>();
try {
log.info("Listing local files at directory '{}' filtering filename by prefix '{}'", directory.equals(CURRENT_DIR) ? "." : directory.toString(), fileNamePrefix);
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(Path path, BasicFileAttributes attrs)
{
if (path.equals(directory)) {
return FileVisitResult.CONTINUE;
} else if (lastPath != null && path.toString().compareTo(lastPath.substring(0, path.toString().length())) < 0) {
return FileVisitResult.SKIP_SUBTREE;
} else if (path.getFileName().toString().startsWith(".")) {
return FileVisitResult.SKIP_SUBTREE;
} else {
if (path.getFileName().toString().startsWith(fileNamePrefix)) {
return FileVisitResult.CONTINUE;
} else {
return FileVisitResult.SKIP_SUBTREE;
}
}
}
@Override log.info("Listing local files at directory '{}' filtering filename by prefix '{}'", directory.equals(CURRENT_DIR) ? "." : directory.toString(), fileNamePrefix);
public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) ArrayList<String> filesArray= new ArrayList<String>();
{ ArrayList<File> files;
try
{ // Walk the directory, attention: this method does not walk its sub dir!
if ( !lastPath.equals("") && order.equals("ALPHABETICAL") && path.toString().compareTo(lastPath) <= 0) { try {
return FileVisitResult.CONTINUE; filesArray= Files.walk(directory)
} else if (!lastPath.equals("") && order.equals("ASCEND_MODIFIED") && getLastModifiedTime(path.toString()).compareTo(getLastModifiedTime(lastPath)) <= 0) { .filter(Files::isRegularFile)
return FileVisitResult.CONTINUE; .map(Path::toFile)
} else if (!lastPath.equals("") && order.equals("DESCEND_MODIFIED") && getLastModifiedTime(path.toString()).compareTo(getLastModifiedTime(lastPath)) >= 0){ .filter(f -> f.getName().startsWith(fileNamePrefix))
return FileVisitResult.CONTINUE; .map(f -> f.getAbsolutePath())
} else if (!lastPath.equals("") && order.equals("ASCEND_CREATION") && getCreationTime(path.toString()).compareTo(getCreationTime(lastPath)) <= 0){ .collect(Collectors.toCollection(ArrayList::new));
return FileVisitResult.CONTINUE;
} else if (!lastPath.equals("") && order.equals("DESCEND_MODIFIED") && getCreationTime(path.toString()).compareTo(getCreationTime(lastPath)) <= 0) {
return FileVisitResult.CONTINUE;
} else if (path.getFileName().toString().startsWith(".")) {
return FileVisitResult.CONTINUE;
} else {
if (path.getFileName().toString().startsWith(fileNamePrefix)) {
filesArray.add(path.toString());
}
return FileVisitResult.CONTINUE;
}
} catch ( IOException e){
throw new RuntimeException("IOException during the uploading files");
}
}
});
} catch (IOException ex) {
throw new RuntimeException(String.format("Failed get a list of local files at '%s'", directory), ex);
}
//return builder.build(); //return builder.build();
} catch (IOException ex){
throw new RuntimeException("Somethig wrong when load the files");
}
Comparator comparator;
// Be careful, if the files are copied into the dir, it means that the last Modified time of those files are the same
// In this case the ASCEND_MODIFIED and DESCEND_MODIFIED get the same result.
switch (order) {
case "ASCEND_MODIFIED": comparator = new AscendModifiedSorter();
break;
case "DESCEND_MODIFIED": comparator = new DescendModifiedSorter();
break;
case "ASCEND_CREATION": comparator = new AscendCreationSorter();
break;
case "DESCEND_CREATION": comparator = new DescendModifiedSorter();
break;
default: comparator = new AlphabeticalSorter();
break;
}
// Sort the files for each directory
Collections.sort(filesArray,comparator);
if (!lastPath.equals("")) {
int ind = filesArray.indexOf(lastPath);
if (ind >= 0 && ind < filesArray.size() ){
return new ArrayList(filesArray.subList(ind + 1, filesArray.size()));
}
}
return filesArray; return filesArray;
} }
...@@ -384,15 +357,18 @@ public class FilenameInputPlugin ...@@ -384,15 +357,18 @@ public class FilenameInputPlugin
//System.out.println("The raw last modified time of " +filename+ " is " + fileTime.toString()); //System.out.println("The raw last modified time of " +filename+ " is " + fileTime.toString());
return fileTime; return fileTime;
} }
}
interface Sorter {
public static Comparator getComparator();
}
class AscendModifiedSorter implements Sorter { // Those sorter is the implementation of the Comparator to help sort the files!
public static Comparator getComparator(){ class AlphabeticalSorter implements Comparator<String> {
return new Comparator<String>(){ @Override
public int compare(String f1, String f2) {
return f1.compareTo(f2);
}
}
class AscendModifiedSorter implements Comparator<String> {
@Override @Override
public int compare(String f1, String f2) { public int compare(String f1, String f2) {
try{ try{
...@@ -402,16 +378,10 @@ class AscendModifiedSorter implements Sorter { ...@@ -402,16 +378,10 @@ class AscendModifiedSorter implements Sorter {
} }
return 0; return 0;
} }
}); }
}
}
class AscendCreationSorter implements Sorter { class AscendCreationSorter implements Comparator<String> {
public static Comparator getComparator(){
return new Comparator<String>(){
@Override @Override
public int compare(String f1, String f2) { public int compare(String f1, String f2) {
try{ try{
...@@ -421,40 +391,31 @@ class AscendCreationSorter implements Sorter { ...@@ -421,40 +391,31 @@ class AscendCreationSorter implements Sorter {
} }
return 0; return 0;
} }
}); }
}
}
class DescendModifiedSorter implements Sorter { class DescendModifiedSorter implements Comparator<String>{
public static Comparator getComparator(){
return new Comparator<String>(){
@Override @Override
public int compare(String f1, String f2) { public int compare(String f1, String f2) {
try{ try{
return - getLastModifiedTime(f1).compareTo(getLastModifiedTime(f2)); return getLastModifiedTime(f2).compareTo(getLastModifiedTime(f1));
} catch (IOException ex){ } catch (IOException ex){
ex.printStackTrace(); ex.printStackTrace();
} }
return 0; return 0;
} }
}); }
}
}
class DescendCreationSorter implements Sorter { class DescendCreationSorter implements Comparator<String>{
public static Comparator getComparator(){
return new Comparator<String>(){
@Override @Override
public int compare(String f1, String f2) { public int compare(String f1, String f2) {
try{ try{
return - getCreationTime(f1).compareTo(getCreationTime(f2)); return getCreationTime(f2).compareTo(getCreationTime(f1));
} catch (IOException ex){ } catch (IOException ex){
ex.printStackTrace(); ex.printStackTrace();
} }
return 0; return 0;
} }
}); }
}
} }
\ No newline at end of file
...@@ -124,9 +124,14 @@ public class TestFilenameInputPlugin ...@@ -124,9 +124,14 @@ public class TestFilenameInputPlugin
res = embulk.runAllBuilder(execConfig,inConfig,outConfig); res = embulk.runAllBuilder(execConfig,inConfig,outConfig);
lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile0.txt")); lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile0.txt"));
//We reverse the actual files //We reverse the actual files
Collections.reverse(actual); Collections.reverse(actual);
System.out.println("The lines" + lines);
System.out.println("The actual" + actual);
assertEquals(lines,actual); assertEquals(lines,actual);
} }
...@@ -222,6 +227,7 @@ public class TestFilenameInputPlugin ...@@ -222,6 +227,7 @@ public class TestFilenameInputPlugin
System.out.println(lines1); System.out.println(lines1);
System.out.println(dir1); System.out.println(dir1);
assertEquals(lines1,dir1); assertEquals(lines1,dir1);
assertEquals(lines2,dir2);
} }
...@@ -298,6 +304,7 @@ public class TestFilenameInputPlugin ...@@ -298,6 +304,7 @@ public class TestFilenameInputPlugin
System.out.println(lines1); System.out.println(lines1);
System.out.println(dir1); System.out.println(dir1);
assertEquals(lines1,dir1); assertEquals(lines1,dir1);
assertEquals(lines2,dir2);
} }
...@@ -334,5 +341,63 @@ public class TestFilenameInputPlugin ...@@ -334,5 +341,63 @@ public class TestFilenameInputPlugin
assertEquals(actual,lines); assertEquals(actual,lines);
} }
@Test
public void testLastPath() throws Exception{
ConfigSource execConfig = embulk.newConfig()
.set("min_output_tasks","1");
Path path_src = Paths.get("src/test/resources/testDirList");
// Be careful the name of the List should be multi_dir!
List<String> multi_dir = Arrays.asList(path_src.toAbsolutePath().toString()+"/sample/sample_",path_src.toAbsolutePath().toString()+"/example/example_");
List<String> multi_tag = Arrays.asList("hello","world");
List<String> lastPaths = Arrays.asList(path_src.toAbsolutePath().toString()+"/sample/sample_02.txt",path_src.toAbsolutePath().toString()+"/example/example_01.txt");
ConfigSource inConfig = embulk.newConfig()
.set("type","filename")
.set("load_order","ALPHABETICAL")
.set("lastPaths",lastPaths)
.set("multi_dir",multi_dir);
Path tmp = embulk.createTempDir();
ConfigSource outConfig = embulk.newConfig()
.set("type","joinfile")
.set("sum_type","filename")
.set("path_prefix",tmp.toString()+"/outputfile")
.set("file_ext","txt");
TestHelper.RunResult res = embulk.runAllBuilder(execConfig,inConfig,outConfig);
List<String> lines1 = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile0.txt"));
List<String> lines2 = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile1.txt"));
List<String> dir1 = Files.walk(Paths.get(path_src.toAbsolutePath().toString()+"/sample"))
.filter(Files::isRegularFile)
.map(Path::toAbsolutePath)
.map(Path::toString)
.collect(Collectors.toList());
Collections.sort(dir1);
List<String> dir2 = Files.walk(Paths.get(path_src.toAbsolutePath().toString()+"/example"))
.filter(Files::isRegularFile)
.map(Path::toAbsolutePath)
.map(Path::toString)
.collect(Collectors.toList());
List<String> fromLastPath = dir2.subList(0,dir2.size());
Collections.sort(dir2);
//System.out.println(lines1);
//System.out.println(dir1.subList(2,dir1.size()));
//System.out.println(lines2);
//System.out.println(dir2.subList(1,dir2.size()));
assertEquals(lines1,dir1.subList(2,dir1.size()));
assertEquals(lines2,dir2.subList(1,dir2.size()));
}
} }
...@@ -2,5 +2,4 @@ ...@@ -2,5 +2,4 @@
3 3
3 3
3 3
3
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment