Commit c820905b authored by yu's avatar yu

Finish the unit tests for multi_dir and multi_tag.

parent 44cdd3e0
......@@ -38,75 +38,22 @@ import java.util.Comparator;
/**
 * A {@link FileInputStream} that emits a fixed-size name header before the file content.
 *
 * <p>The stream first yields the bytes of the file name (or of the path, depending on the
 * constructor used), zero-padded up to {@link #MAX_NAME_LENGTH} bytes, and only then the
 * bytes of the underlying file.
 *
 * <p>The name is encoded with the platform default charset, as before. A name whose encoded
 * form is longer than {@code MAX_NAME_LENGTH} is clipped to its first {@code MAX_NAME_LENGTH}
 * bytes so the header always has the same size (the clip may fall inside a multi-byte
 * character).
 *
 * <p>NOTE(review): only the three {@code read} overloads are header-aware; other inherited
 * methods such as {@code skip} or {@code available} operate on the raw file — confirm no
 * caller relies on them.
 */
class FilenameFileInputStream extends FileInputStream {
    static int MAX_NAME_LENGTH = 255;

    /** Number of header bytes (name plus padding) handed out so far. */
    int n;

    /** Encoded name bytes, at most {@code MAX_NAME_LENGTH} long. */
    byte[] name;

    /**
     * Opens {@code file} and uses its simple name ({@link File#getName()}) as the header.
     *
     * @throws FileNotFoundException if the file cannot be opened for reading
     */
    FilenameFileInputStream(File file) throws FileNotFoundException {
        super(file);
        n = 0;
        name = headerBytes(file.getName());
    }

    /**
     * Opens the file at {@code path} and uses the path string itself as the header.
     *
     * @throws FileNotFoundException if the file cannot be opened for reading
     */
    FilenameFileInputStream(String path) throws FileNotFoundException {
        super(path);
        n = 0;
        name = headerBytes(path);
    }

    /** Encodes {@code text} and clips it so it never exceeds the fixed header size. */
    private static byte[] headerBytes(String text) {
        byte[] raw = text.getBytes();
        if (raw.length <= MAX_NAME_LENGTH) {
            return raw;
        }
        byte[] clipped = new byte[MAX_NAME_LENGTH];
        System.arraycopy(raw, 0, clipped, 0, MAX_NAME_LENGTH);
        return clipped;
    }

    /**
     * Returns the next header byte, then the next file byte once the header is exhausted.
     *
     * @return a value in {@code 0..255}, or {@code -1} at end of file
     */
    @Override
    public int read() throws IOException {
        if (n < name.length) {
            // Mask to honour the InputStream contract: a raw (signed) byte >= 0x80 would be
            // returned as a negative int, and 0xFF would be mistaken for end of stream.
            return name[n++] & 0xFF;
        }
        if (n < MAX_NAME_LENGTH) {
            n++;
            return 0;
        }
        return super.read();
    }

    @Override
    public int read(byte[] b) throws IOException {
        return read(b, 0, b.length);
    }

    /**
     * Fills {@code b} with the remaining header bytes first and, if there is room left,
     * continues with file content in the same call.
     *
     * @return the number of bytes stored, or {@code -1} at end of file
     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not fit in {@code b}
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (n >= MAX_NAME_LENGTH) {
            return super.read(b, off, len);
        }
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        }
        if (len == 0) {
            return 0;
        }

        int copied = 0;

        // 1. Remaining name bytes.
        int fromName = Math.min(len, name.length - n);
        if (fromName > 0) {
            System.arraycopy(name, n, b, off, fromName);
            n += fromName;
            copied = fromName;
        }

        // 2. Zero padding up to the fixed header size.
        int padding = Math.min(len - copied, MAX_NAME_LENGTH - n);
        for (int k = 0; k < padding; k++) {
            b[off + copied + k] = 0;
        }
        if (padding > 0) {
            n += padding;
            copied += padding;
        }

        // 3. File content, if the caller's buffer still has room. End of file here is not
        //    reported as -1 because header bytes were already delivered in this call.
        if (copied < len) {
            int fromFile = super.read(b, off + copied, len - copied);
            if (fromFile > 0) {
                copied += fromFile;
            }
        }
        return copied;
    }
}
public class FilenameFileInputPlugin implements FileInputPlugin
{
public interface PluginTask extends Task
{
@Config("multi_dir")
@ConfigDefault("null")
Optional<List<String>> getMultiDir();
@ConfigDefault("[]")
List<String> getMultiDir();
@Config("multi_tag")
@ConfigDefault("[]")
List<String> getMultiTag();
@Config("path_prefix")
@ConfigDefault("")
String getPathPrefix();
@Config("last_path")
......@@ -157,67 +104,108 @@ public class FilenameFileInputPlugin implements FileInputPlugin
private final Logger log = Exec.getLogger(getClass());
private final static Path CURRENT_DIR = Paths.get(".").normalize();
public static String theTag = "";
public static List<Integer> tagIndex = new ArrayList<Integer>();
public static List<String> tagList;
@Override
public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control)
{
PluginTask task = config.loadConfig(PluginTask.class);
Optional<List<String>> dirlst = task.getMultiDir();
List<String> allFiles = new ArrayList<String> ();
if (dirlst.isPresent()) {
log.info("The list of dir: " + dirlst);
tagIndex.add(0);
//int s = 0;
List<String> dirList = task.getMultiDir();
tagList = task.getMultiTag();
if ( dirList.size() != 0 ) {
log.info("The list of dir: " + dirList);
while (tagList.size() < dirList.size()){
tagList.add("");
}
} else {
if (task.getPathPrefix().equals("")){
throw new RuntimeException("Please input the path_prefix or the multi_dir");
}
dirList.add(task.getPathPrefix());
log.info("list of dir: " + dirList);
tagList.add("");
}
// list files recursively
List<String> files = listFiles(task);
//Sort the listFiles according to the configuration.
int order_modified = task.getOrderByModifiedTime();
int order_creation = task.getOrderByCreationTime();
ConfigDiff res = Exec.newConfigDiff();
for (int i=0; i< dirList.size();i++)
{
flag = 0;
List<String> files = listFiles(task,Paths.get(dirList.get(i)).normalize());
//Sort the listFiles according to the configuration.
int order_modified = task.getOrderByModifiedTime();
int order_creation = task.getOrderByCreationTime();
if (order_modified == 0 && order_creation == 0){
Collections.sort(files);
} else if(order_creation == 0){
Collections.sort(files,new Comparator<String>(){
@Override
public int compare(String f1, String f2) {
try{
return getLastModifiedTime(f1).compareTo(getLastModifiedTime(f2));
} catch (IOException ex){
ex.printStackTrace();
if (order_modified == 0 && order_creation == 0){
Collections.sort(files);
} else if(order_creation == 0){
Collections.sort(files,new Comparator<String>(){
@Override
public int compare(String f1, String f2) {
try{
return getLastModifiedTime(f1).compareTo(getLastModifiedTime(f2));
} catch (IOException ex){
ex.printStackTrace();
}
return 0;
}
return 0;
}
});
});
if (order_modified == 1 ) { Collections.reverse(files); }
if (order_modified == 1 ) { Collections.reverse(files); }
} else if (order_modified == 0 ){
Collections.sort(files,new Comparator<String>(){
@Override
public int compare(String f1, String f2) {
try{
return getCreationTime(f1).compareTo(getCreationTime(f2));
} catch (IOException ex){
ex.printStackTrace();
} else if (order_modified == 0 ){
Collections.sort(files,new Comparator<String>(){
@Override
public int compare(String f1, String f2) {
try{
return getCreationTime(f1).compareTo(getCreationTime(f2));
} catch (IOException ex){
ex.printStackTrace();
}
return 0;
}
return 0;
}
});
});
if ( order_creation == 1 ) { Collections.reverse(files);}
} else {
throw new RuntimeException("Could not order by creation time and lasModified time at the same time");
}
if ( order_creation == 1 ) { Collections.reverse(files);}
} else {
throw new RuntimeException("Could not order by creation time and lasModified time at the same time");
}
log.info("Loading files {}", files);
task.setFiles(files);
log.info("Loading files {}", files);
allFiles.addAll(files);
//task.setFiles(files);
//s += files.size()
tagIndex.add(allFiles.size());
//taskList.add(task.deepCopy);
// number of processors is same with number of files
int taskCount = task.getFiles().size();
//int taskCount = files.size();
//theTag = tagList.get(i);
//info.log();
//res = resume(task.dump(), taskCount, control);
}
task.setFiles(allFiles);
int taskCount = allFiles.size();
//return res;
return resume(task.dump(), taskCount, control);
}
......@@ -257,9 +245,9 @@ public class FilenameFileInputPlugin implements FileInputPlugin
List<TaskReport> successTaskReports)
{ }
public List<String> listFiles(PluginTask task)
public List<String> listFiles(PluginTask task,Path pathPrefix)
{
Path pathPrefix = Paths.get(task.getPathPrefix()).normalize();
//Path pathPrefix = Paths.get(task.getPathPrefix()).normalize();
final Path directory;
final String fileNamePrefix;
if (Files.isDirectory(pathPrefix)) {
......@@ -328,7 +316,11 @@ public class FilenameFileInputPlugin implements FileInputPlugin
final PluginTask task = taskSource.loadTask(PluginTask.class);
log.info("The task in open: " + taskSource.toString());
log.info("The taskIndex: " + taskIndex);
final String path = task.getFiles().get(taskIndex);
setTag(taskIndex);
log.info("The tag: " + theTag);
return new InputStreamTransactionalFileInput(
task.getBufferAllocator(),
......@@ -350,4 +342,74 @@ public class FilenameFileInputPlugin implements FileInputPlugin
}
};
}
}
/**
 * One-based position of the tag range selected by the most recent {@link #setTag(int)} call
 * ({@code flag - 1} is the index into {@code tagList}); {@code 0} before any call.
 */
public static int flag = 0;

/**
 * Selects the tag for the task at {@code index} and stores it in {@code theTag}.
 *
 * <p>{@code tagIndex} holds cumulative boundaries: entry {@code k} is the index of the first
 * task of range {@code k}, and entry {@code k + 1} is one past its last task. The tag is
 * found by locating the range that contains {@code index}, so the result does not depend on
 * the order in which task indices are passed in, and ranges that contain no tasks (two equal
 * consecutive boundaries) are skipped correctly.
 *
 * <p>NOTE(review): {@code tagIndex}, {@code tagList} and {@code theTag} are shared static
 * state; this method adds no synchronization — confirm callers invoke it from one thread.
 *
 * @param index zero-based task index
 * @throws IndexOutOfBoundsException if no range in {@code tagIndex} contains {@code index}
 */
public static void setTag(int index)
{
    int slot = -1;
    for (int k = 0; k + 1 < tagIndex.size(); k++) {
        if (index >= tagIndex.get(k) && index < tagIndex.get(k + 1)) {
            slot = k;
            break;
        }
    }
    if (slot < 0) {
        throw new IndexOutOfBoundsException(
                "No tag range contains task index " + index + " (boundaries: " + tagIndex + ")");
    }
    // Keep the public cursor consistent with what the previous implementation left behind.
    flag = slot + 1;
    theTag = tagList.get(slot);
}
/**
 * A {@link FileInputStream} that emits a fixed-size header before the file content.
 *
 * <p>The header is the value of {@code theTag} at construction time followed by the file
 * name (or the path, depending on the constructor used), zero-padded up to
 * {@code MAX_NAME_LENGTH} bytes. The bytes of the underlying file follow.
 *
 * <p>The header text is encoded with the platform default charset, as before. If its
 * encoded form is longer than {@code MAX_NAME_LENGTH} it is clipped to the first
 * {@code MAX_NAME_LENGTH} bytes so the header always has the same size (the clip may fall
 * inside a multi-byte character).
 */
class FilenameFileInputStream extends FileInputStream {
    final int MAX_NAME_LENGTH = 255;

    /** Number of header bytes (tag, name and padding) handed out so far. */
    int n;

    /** Encoded header bytes, at most {@code MAX_NAME_LENGTH} long. */
    byte[] name;

    /**
     * Opens {@code file}; the header is {@code theTag} followed by {@link File#getName()}.
     *
     * @throws FileNotFoundException if the file cannot be opened for reading
     */
    FilenameFileInputStream(File file) throws FileNotFoundException {
        super(file);
        n = 0;
        name = headerBytes(theTag + file.getName());
    }

    /**
     * Opens the file at {@code path}; the header is {@code theTag} followed by the path.
     *
     * @throws FileNotFoundException if the file cannot be opened for reading
     */
    FilenameFileInputStream(String path) throws FileNotFoundException {
        super(path);
        n = 0;
        name = headerBytes(theTag + path);
    }

    /** Encodes {@code text} and clips it so it never exceeds the fixed header size. */
    private byte[] headerBytes(String text) {
        byte[] raw = text.getBytes();
        if (raw.length <= MAX_NAME_LENGTH) {
            return raw;
        }
        byte[] clipped = new byte[MAX_NAME_LENGTH];
        System.arraycopy(raw, 0, clipped, 0, MAX_NAME_LENGTH);
        return clipped;
    }

    /**
     * Returns the next header byte, then the next file byte once the header is exhausted.
     *
     * @return a value in {@code 0..255}, or {@code -1} at end of file
     */
    @Override
    public int read() throws IOException {
        if (n < name.length) {
            // Mask to honour the InputStream contract: a raw (signed) byte >= 0x80 would be
            // returned as a negative int, and 0xFF would be mistaken for end of stream.
            return name[n++] & 0xFF;
        }
        if (n < MAX_NAME_LENGTH) {
            n++;
            return 0;
        }
        return super.read();
    }

    @Override
    public int read(byte[] b) throws IOException {
        return read(b, 0, b.length);
    }

    /**
     * Fills {@code b} with the remaining header bytes first and, if there is room left,
     * continues with file content in the same call.
     *
     * @return the number of bytes stored, or {@code -1} at end of file
     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not fit in {@code b}
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (n >= MAX_NAME_LENGTH) {
            return super.read(b, off, len);
        }
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        }
        if (len == 0) {
            return 0;
        }

        int copied = 0;

        // 1. Remaining tag/name bytes.
        int fromName = Math.min(len, name.length - n);
        if (fromName > 0) {
            System.arraycopy(name, n, b, off, fromName);
            n += fromName;
            copied = fromName;
        }

        // 2. Zero padding up to the fixed header size.
        int padding = Math.min(len - copied, MAX_NAME_LENGTH - n);
        for (int k = 0; k < padding; k++) {
            b[off + copied + k] = 0;
        }
        if (padding > 0) {
            n += padding;
            copied += padding;
        }

        // 3. File content, if the caller's buffer still has room. End of file here is not
        //    reported as -1 because header bytes were already delivered in this call.
        if (copied < len) {
            int fromFile = super.read(b, off + copied, len - copied);
            if (fromFile > 0) {
                copied += fromFile;
            }
        }
        return copied;
    }
}
}
\ No newline at end of file
......@@ -13,7 +13,6 @@ import org.apache.commons.codec.binary.Base64;
import org.embulk.config.ConfigSource;
import org.embulk.config.ConfigDiff;
import org.embulk.test.EmbulkTests;
//import TestHelper;
import org.embulk.spi.InputPlugin;
import org.embulk.spi.ParserPlugin;
import org.embulk.spi.OutputPlugin;
......@@ -28,9 +27,10 @@ import java.nio.file.Files;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Arrays;
import java.util.ListIterator;
import static org.embulk.test.EmbulkTests.readSortedFile;
......@@ -40,122 +40,294 @@ import static org.junit.Assert.assertThat;
public class TestFilenameFileInputPlugin
{
/**
 * Reads the creation timestamp of a file through its basic attribute view.
 *
 * @param filename path of the file; resolved to an absolute path before the lookup
 * @return the creation time reported by the file system
 * @throws IOException if the attributes cannot be read
 */
public static FileTime getCreationTime(String filename) throws IOException{
    Path absolute = Paths.get(new File(filename).getAbsolutePath());
    BasicFileAttributeView attributeView =
            Files.getFileAttributeView(absolute, BasicFileAttributeView.class);
    return attributeView.readAttributes().creationTime();
}
/**
 * Looks up when the given file was created.
 *
 * @param filename path of the file; made absolute before the attributes are read
 * @return the file's creation time as reported by its basic attributes
 * @throws IOException if the attributes cannot be read
 */
public static FileTime getCreationTime(String filename) throws IOException{
    String absoluteName = new File(filename).getAbsolutePath();
    BasicFileAttributes attributes =
            Files.readAttributes(Paths.get(absoluteName), BasicFileAttributes.class);
    return attributes.creationTime();
}
/**
 * Reads the last-modified timestamp of a file through its basic attribute view.
 *
 * @param filename path of the file; resolved to an absolute path before the lookup
 * @return the last-modified time reported by the file system
 * @throws IOException if the attributes cannot be read
 */
public static FileTime getLastModifiedTime(String filename) throws IOException{
    Path absolute = Paths.get(new File(filename).getAbsolutePath());
    BasicFileAttributeView attributeView =
            Files.getFileAttributeView(absolute, BasicFileAttributeView.class);
    return attributeView.readAttributes().lastModifiedTime();
}
/**
 * Looks up when the given file was last modified.
 *
 * @param filename path of the file; made absolute before the attributes are read
 * @return the file's last-modified time as reported by its basic attributes
 * @throws IOException if the attributes cannot be read
 */
public static FileTime getLastModifiedTime(String filename) throws IOException{
    String absoluteName = new File(filename).getAbsolutePath();
    BasicFileAttributes attributes =
            Files.readAttributes(Paths.get(absoluteName), BasicFileAttributes.class);
    return attributes.lastModifiedTime();
}
@Rule
public TestHelper embulk = TestHelper.builder()
@Rule
public TestHelper embulk = TestHelper.builder()
.registerPlugin(InputPlugin.class,"filename",FilenameFileInputPlugin.class)
.registerPlugin(ParserPlugin.class,"none-bin",NoneBinParserPlugin.class)
.registerPlugin(OutputPlugin.class,"joinfile",JoinfileOutputPlugin.class)
.build();
@Test
public void testOrderByModifiedTime() throws Exception{
ConfigSource execConfig = embulk.newConfig()
.set("max_threads","1");
Path path_src = Paths.get("src/test/resources/testModifiedOrder");
ConfigSource inConfig = embulk.newConfig()
.set("type","filename")
.set("path_prefix",path_src.toAbsolutePath().toString()+"/sample_")
.set("order_by_modified_time","2")
.set("parser",embulk.newConfig().set("type","none-bin"));
Path tmp = embulk.createTempDir();
ConfigSource outConfig = embulk.newConfig()
.set("type","joinfile")
.set("sum_type","filename")
.set("path_prefix",tmp.toString()+"/outputfile")
.set("file_ext",".txt");
@Test
public void testOrderByModifiedTime() throws Exception{
TestHelper.RunResult res = embulk.runAllBuilder(execConfig,inConfig,outConfig);
ConfigSource execConfig = embulk.newConfig()
.set("max_threads","1");
// Note: readAllLines loads every line into memory, so it is not recommended for big files.
Path path_src = Paths.get("src/test/resources/testModifiedOrder");
ConfigSource inConfig = embulk.newConfig()
.set("type","filename")
.set("path_prefix",path_src.toAbsolutePath().toString()+"/sample_")
.set("order_by_modified_time","2")
.set("parser",embulk.newConfig().set("type","none-bin"));
Path tmp = embulk.createTempDir();
ConfigSource outConfig = embulk.newConfig()
.set("type","joinfile")
.set("sum_type","filename")
.set("path_prefix",tmp.toString()+"/outputfile")
.set("file_ext",".txt");
TestHelper.RunResult res = embulk.runAllBuilder(execConfig,inConfig,outConfig);
// Note: readAllLines loads every line into memory, so it is not recommended for big files.
List<String> lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile.txt"));
List<String> actual = Files.walk(path_src)
.filter(Files::isRegularFile)
.map(Path::toAbsolutePath)
.map(Path::toString)
.collect(Collectors.toList());
Collections.sort(actual,new Comparator<String>(){
@Override
public int compare(String f1, String f2) {
try{
return getLastModifiedTime(f1).compareTo(getLastModifiedTime(f2));
} catch (IOException ex){
ex.printStackTrace();
}
return 0;
}
});
//System.out.println(lines);
//System.out.println(actual);
assertEquals(lines,actual);
inConfig.set("order_by_modified_time","1");
res = embulk.runAllBuilder(execConfig,inConfig,outConfig);
lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile.txt"));
// We reverse the actual files
Collections.reverse(actual);
assertEquals(lines,actual);
List<String> lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile.txt"));
}
List<String> actual = Files.walk(path_src)
.filter(Files::isRegularFile)
.map(Path::toAbsolutePath)
.map(Path::toString)
.collect(Collectors.toList());
@Test
public void testTagList() throws Exception{
ConfigSource execConfig = embulk.newConfig()
.set("max_threads","1");
Collections.sort(actual,new Comparator<String>(){
@Override
public int compare(String f1, String f2) {
try{
return getLastModifiedTime(f1).compareTo(getLastModifiedTime(f2));
} catch (IOException ex){
ex.printStackTrace();
Path path_src = Paths.get("src/test/resources/testDirList");
// Be careful the name of the List should be multi_dir!
List<String> multi_dir = Arrays.asList(path_src.toAbsolutePath().toString()+"/sample/sample_",path_src.toAbsolutePath().toString()+"/example/example_");
List<String> multi_tag = Arrays.asList("hello","world");
ConfigSource inConfig = embulk.newConfig()
.set("type","filename")
.set("order_by_modified_time","2")
.set("multi_dir",multi_dir)
.set("multi_tag",multi_tag)
.set("path_prefix","/home/chronos/user/Downloads/embulk-input-filename/src/test/resources/testDirList/example/example_")
.set("parser",embulk.newConfig().set("type","none-bin"));
System.out.println(inConfig);
Path tmp = embulk.createTempDir();
ConfigSource outConfig = embulk.newConfig()
.set("type","joinfile")
.set("sum_type","filename")
.set("path_prefix",tmp.toString()+"/outputfile")
.set("file_ext",".txt");
TestHelper.RunResult res = embulk.runAllBuilder(execConfig,inConfig,outConfig);
List<String> lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile.txt"));
//List<String> actual = Files.readAllLines(Paths.get(path_src+"/test.csv"));
List<String> dir1 = Files.walk(Paths.get(path_src.toAbsolutePath().toString()+"/sample"))
.filter(Files::isRegularFile)
.map(Path::toAbsolutePath)
.map(Path::toString)
.collect(Collectors.toList());
Collections.sort(dir1,new Comparator<String>(){
@Override
public int compare(String f1, String f2) {
try{
return getLastModifiedTime(f1).compareTo(getLastModifiedTime(f2));
} catch (IOException ex){
ex.printStackTrace();
}
return 0;
}
return 0;
}
});
//System.out.println(lines);
//System.out.println(actual);
assertEquals(lines,actual);
});
List<String> dir2 = Files.walk(Paths.get(path_src.toAbsolutePath().toString()+"/example"))
.filter(Files::isRegularFile)
.map(Path::toAbsolutePath)
.map(Path::toString)
.collect(Collectors.toList());
Collections.sort(dir2,new Comparator<String>(){
@Override
public int compare(String f1, String f2) {
try{
return getLastModifiedTime(f1).compareTo(getLastModifiedTime(f2));
} catch (IOException ex){
ex.printStackTrace();
}
return 0;
}
});
for (ListIterator i = dir1.listIterator(); i.hasNext(); )
{
i.set(multi_tag.get(0) + i.next());
}
for (ListIterator i = dir2.listIterator(); i.hasNext(); )
{
i.set(multi_tag.get(1) + i.next());
}
inConfig.set("order_by_modified_time","1");
res = embulk.runAllBuilder(execConfig,inConfig,outConfig);
lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile.txt"));
// We reverse the actual files
Collections.reverse(actual);
assertEquals(lines,actual);
dir1.addAll(dir2);
//System.out.println(lines);
//System.out.println(dir1);
assertEquals(lines,dir1);
}
}
@Test
public void testBase64() throws Exception{
ConfigSource execConfig = embulk.newConfig()
.set("max_threads","1");
@Test
public void testDirList() throws Exception{
Path path_src = Paths.get("src/test/resources/data");
ConfigSource inConfig = embulk.newConfig()
.set("type","filename")
.set("path_prefix",path_src.toAbsolutePath().toString()+"/test.csv")
.set("parser",embulk.newConfig().set("type","none-bin"));
Path tmp = embulk.createTempDir();
ConfigSource execConfig = embulk.newConfig()
.set("max_threads","1");
ConfigSource outConfig = embulk.newConfig()
.set("type","joinfile")
.set("sum_type","content")
.set("path_prefix",tmp.toString()+"/outputfile")
.set("file_ext",".txt");
Path path_src = Paths.get("src/test/resources/testDirList");
// Be careful the name of the List should be multi_dir!
List<String> multi_dir = Arrays.asList(path_src.toAbsolutePath().toString()+"/sample/sample_",path_src.toAbsolutePath().toString()+"/example/example_");
List<String> multi_tag = Arrays.asList("hello","world");
ConfigSource inConfig = embulk.newConfig()
.set("type","filename")
.set("order_by_modified_time","2")
.set("multi_dir",multi_dir)
.set("path_prefix","/home/chronos/user/Downloads/embulk-input-filename/src/test/resources/testDirList/example/example_")
.set("parser",embulk.newConfig().set("type","none-bin"));
Path tmp = embulk.createTempDir();
ConfigSource outConfig = embulk.newConfig()
.set("type","joinfile")
.set("sum_type","filename")
.set("path_prefix",tmp.toString()+"/outputfile")
.set("file_ext",".txt");
TestHelper.RunResult res = embulk.runAllBuilder(execConfig,inConfig,outConfig);
List<String> lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile.txt"));
List<String> dir1 = Files.walk(Paths.get(path_src.toAbsolutePath().toString()+"/sample"))
.filter(Files::isRegularFile)
.map(Path::toAbsolutePath)
.map(Path::toString)
.collect(Collectors.toList());
Collections.sort(dir1,new Comparator<String>(){
@Override
public int compare(String f1, String f2) {
try{
return getLastModifiedTime(f1).compareTo(getLastModifiedTime(f2));
} catch (IOException ex){
ex.printStackTrace();
}
return 0;
}
});
List<String> dir2 = Files.walk(Paths.get(path_src.toAbsolutePath().toString()+"/example"))
.filter(Files::isRegularFile)
.map(Path::toAbsolutePath)
.map(Path::toString)
.collect(Collectors.toList());
Collections.sort(dir2,new Comparator<String>(){
@Override
public int compare(String f1, String f2) {
try{
return getLastModifiedTime(f1).compareTo(getLastModifiedTime(f2));
} catch (IOException ex){
ex.printStackTrace();
}
return 0;
}
});
dir1.addAll(dir2);
//System.out.println(lines);
//System.out.println(dir1);
assertEquals(lines,dir1);
}
TestHelper.RunResult res = embulk.runAllBuilder(execConfig,inConfig,outConfig);
@Test
public void testBase64() throws Exception{
ConfigSource execConfig = embulk.newConfig()
.set("max_threads","1");
List<String> lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile.txt"));
Path path_src = Paths.get("src/test/resources/data");
ConfigSource inConfig = embulk.newConfig()
.set("type","filename")
.set("path_prefix",path_src.toAbsolutePath().toString()+"/test.csv")
.set("parser",embulk.newConfig().set("type","none-bin"));
Path tmp = embulk.createTempDir();
ConfigSource outConfig = embulk.newConfig()
.set("type","joinfile")
.set("sum_type","content")
.set("path_prefix",tmp.toString()+"/outputfile")
.set("file_ext",".txt");
TestHelper.RunResult res = embulk.runAllBuilder(execConfig,inConfig,outConfig);
List<String> lines = Files.readAllLines(Paths.get(tmp.toString()+"/outputfile.txt"));
List<String> actual = Files.readAllLines(Paths.get(path_src+"/test.csv"));
//System.out.println(lines);
String ans = String.join("\n",actual) + "\n";
String actual_bytes = Base64.encodeBase64String(ans.getBytes());
assertEquals(lines.get(0),actual_bytes);
}
List<String> actual = Files.readAllLines(Paths.get(path_src+"/test.csv"));
//System.out.println(lines);
String ans = String.join("\n",actual) + "\n";
String actual_bytes = Base64.encodeBase64String(ans.getBytes());
assertEquals(lines.get(0),actual_bytes);
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment