Commit 6253b2d8 authored by yu's avatar yu

fix the joinfile to join the file for each directory

parent cd1303ab
......@@ -50,7 +50,7 @@ public class JoinfileOutputPlugin
private final Logger log = Exec.getLogger(getClass());
private static FileOutputStream output = null;
private static ArrayList<FileOutputStream> outputs;
private static ArrayList<String> lastP = new ArrayList<String> ();
......@@ -70,15 +70,19 @@ public class JoinfileOutputPlugin
// non-retryable (non-idempotent) output:
String path = task.getPathPrefix() + task.getFileExt();
try {
output = new FileOutputStream(new File(path));
} catch (FileNotFoundException ex) {
throw new RuntimeException (ex);
String path;
outputs = new ArrayList<FileOutputStream> ();
FileOutputStream output;
for (int i = 0 ; i < taskCount; i ++){
path = task.getPathPrefix() + i + "." + task.getFileExt();
try{
output = new FileOutputStream(new File(path));
outputs.add(output);
} catch (FileNotFoundException ex) {
throw new RuntimeException (ex);
}
}
// for the ConfigDiff, we set the last Path of each task is "" as default.
for (int i = 0 ; i< taskCount; i++)
{
......@@ -126,9 +130,9 @@ public class JoinfileOutputPlugin
String line = page.getStringReference(1) + "\n";
String tag = page.getStringReference(1);
if (sumType.equals("filename")){
output.write(line.getBytes());
outputs.get(ind).write(line.getBytes());
}else{
output.write(content.getBytes());
outputs.get(ind).write(content.getBytes());
}
lastP.set(ind ,tag);
......@@ -162,12 +166,14 @@ public class JoinfileOutputPlugin
public static void closeFile()
{
if (output!= null){
try {
output.close();
}catch (IOException ex ) {
throw new RuntimeException(ex);
for ( FileOutputStream outp: outputs){
if (outp != null) {
try {
outp.close();
}catch ( IOException ex ) {
throw new RuntimeException(ex);
}
}
}
}
}
}
......@@ -72,8 +72,7 @@ public class TestFilenameInputPlugin
public void testOrderByModifiedTime() throws Exception{
ConfigSource execConfig = embulk.newConfig()
.set("min_output_tasks","1")
.set("max_threads","1");
.set("min_output_tasks","1");
Path path_src = Paths.get("src/test/resources/testModifiedOrder");
......@@ -90,12 +89,12 @@ public class TestFilenameInputPlugin
.set("type","joinfile")
.set("sum_type","filename")
.set("path_prefix",tmp.toString()+"/outputfile")
.set("file_ext",".txt");
.set("file_ext","txt");
TestHelper.RunResult res = embulk.runAllBuilder(execConfig,inConfig,outConfig);
//Attention the readAllLines load all lines into memory, it is not recommanded to read a big file.
List<String> lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile.txt"));
List<String> lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile0.txt"));
List<String> actual = Files.walk(path_src)
.filter(Files::isRegularFile)
......@@ -116,15 +115,16 @@ public class TestFilenameInputPlugin
}
});
//System.out.println("The lines" + lines);
//System.out.println("The actual" + actual);
System.out.println("The lines" + lines);
System.out.println("The actual" + actual);
assertEquals(lines,actual);
inConfig.set("load_order","DESCEND_MODIFIED");
res = embulk.runAllBuilder(execConfig,inConfig,outConfig);
lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile.txt"));
lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile0.txt"));
// We reverse the actual files
//We reverse the actual files
Collections.reverse(actual);
assertEquals(lines,actual);
......@@ -137,8 +137,7 @@ public class TestFilenameInputPlugin
public void testTagList() throws Exception{
ConfigSource execConfig = embulk.newConfig()
.set("min_output_tasks","1")
.set("max_threads","1");
.set("min_output_tasks","1");
Path path_src = Paths.get("src/test/resources/testDirList");
......@@ -159,11 +158,12 @@ public class TestFilenameInputPlugin
.set("type","joinfile")
.set("sum_type","filename")
.set("path_prefix",tmp.toString()+"/outputfile")
.set("file_ext",".txt");
.set("file_ext","txt");
TestHelper.RunResult res = embulk.runAllBuilder(execConfig,inConfig,outConfig);
List<String> lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile.txt"));
List<String> lines1 = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile0.txt"));
List<String> lines2 = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile1.txt"));
//List<String> actual = Files.readAllLines(Paths.get(path_src+"/test.csv"));
......@@ -218,10 +218,10 @@ public class TestFilenameInputPlugin
dir1.addAll(dir2);
System.out.println(lines);
//dir1.addAll(dir2);
System.out.println(lines1);
System.out.println(dir1);
assertEquals(lines,dir1);
assertEquals(lines1,dir1);
}
......@@ -230,8 +230,7 @@ public class TestFilenameInputPlugin
public void testDirList() throws Exception{
ConfigSource execConfig = embulk.newConfig()
.set("min_output_tasks","1")
.set("max_threads","1");
.set("min_output_tasks","1");
Path path_src = Paths.get("src/test/resources/testDirList");
......@@ -249,11 +248,12 @@ public class TestFilenameInputPlugin
.set("type","joinfile")
.set("sum_type","filename")
.set("path_prefix",tmp.toString()+"/outputfile")
.set("file_ext",".txt");
.set("file_ext","txt");
TestHelper.RunResult res = embulk.runAllBuilder(execConfig,inConfig,outConfig);
List<String> lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile.txt"));
List<String> lines1 = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile0.txt"));
List<String> lines2 = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile1.txt"));
List<String> dir1 = Files.walk(Paths.get(path_src.toAbsolutePath().toString()+"/sample"))
.filter(Files::isRegularFile)
......@@ -294,10 +294,10 @@ public class TestFilenameInputPlugin
});
dir1.addAll(dir2);
System.out.println(lines);
//dir1.addAll(dir2);
System.out.println(lines1);
System.out.println(dir1);
assertEquals(lines,dir1);
assertEquals(lines1,dir1);
}
......@@ -305,8 +305,7 @@ public class TestFilenameInputPlugin
public void testContent() throws Exception{
ConfigSource execConfig = embulk.newConfig()
.set("min_output_tasks","1")
.set("max_threads","1");
.set("min_output_tasks","1");
Path path_src = Paths.get("src/test/resources/data");
......@@ -323,11 +322,11 @@ public class TestFilenameInputPlugin
.set("type","joinfile")
.set("sum_type","content")
.set("path_prefix",tmp.toString()+"/outputfile")
.set("file_ext",".txt");
.set("file_ext","txt");
TestHelper.RunResult res = embulk.runAllBuilder(execConfig,inConfig,outConfig);
List<String> lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile.txt"));
List<String> lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile0.txt"));
List<String> actual = Files.readAllLines(Paths.get(path_src+"/test.csv"));
//System.out.println("The lines " + lines);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment