Commit 8ef3d8ee authored by root's avatar root

add the modified_time order

parent b907380b
...@@ -32,6 +32,10 @@ import org.embulk.spi.TransactionalFileInput; ...@@ -32,6 +32,10 @@ import org.embulk.spi.TransactionalFileInput;
import org.embulk.spi.util.InputStreamTransactionalFileInput; import org.embulk.spi.util.InputStreamTransactionalFileInput;
import org.embulk.standards.LocalFileInputPlugin; import org.embulk.standards.LocalFileInputPlugin;
import java.nio.file.attribute.BasicFileAttributeView;
import java.nio.file.attribute.FileTime;
import java.util.Comparator;
class FilenameFileInputStream extends FileInputStream { class FilenameFileInputStream extends FileInputStream {
...@@ -68,13 +72,13 @@ class FilenameFileInputStream extends FileInputStream { ...@@ -68,13 +72,13 @@ class FilenameFileInputStream extends FileInputStream {
@Override @Override
public int read(byte[] b) throws IOException { public int read(byte[] b) throws IOException {
return read(b, 0, b.length); return read(b, 0, b.length);
} }
@Override @Override
public int read(byte[] b, int off, int len) throws IOException { public int read(byte[] b, int off, int len) throws IOException {
if (n < MAX_NAME_LENGTH) { if (n < MAX_NAME_LENGTH) {
int i = 0; int i = 0;
int c; int c;
for (; i < len; i++) { for (; i < len; i++) {
c = read(); c = read();
if (c == -1) { if (c == -1) {
...@@ -89,7 +93,7 @@ class FilenameFileInputStream extends FileInputStream { ...@@ -89,7 +93,7 @@ class FilenameFileInputStream extends FileInputStream {
} else { } else {
return super.read(b, off, len); return super.read(b, off, len);
} }
} }
} }
public class FilenameFileInputPlugin implements FileInputPlugin public class FilenameFileInputPlugin implements FileInputPlugin
...@@ -103,6 +107,14 @@ public class FilenameFileInputPlugin implements FileInputPlugin ...@@ -103,6 +107,14 @@ public class FilenameFileInputPlugin implements FileInputPlugin
@Config("last_path") @Config("last_path")
@ConfigDefault("null") @ConfigDefault("null")
Optional<String> getLastPath(); Optional<String> getLastPath();
@Config("order_by_modified_time")
@ConfigDefault("0")
int getOrderByModifiedTime();
@Config("order_by_creation_time")
@ConfigDefault("0")
int getOrderByCreationTime();
@Config("file_size") @Config("file_size")
@ConfigDefault("null") @ConfigDefault("null")
...@@ -119,6 +131,24 @@ public class FilenameFileInputPlugin implements FileInputPlugin ...@@ -119,6 +131,24 @@ public class FilenameFileInputPlugin implements FileInputPlugin
BufferAllocator getBufferAllocator(); BufferAllocator getBufferAllocator();
} }
public static FileTime getCreationTime(String filename) throws IOException{
File file = new File(filename);
Path p = Paths.get(file.getAbsolutePath());
BasicFileAttributes view = Files.getFileAttributeView(p,BasicFileAttributeView.class).readAttributes();
FileTime fileTime = view.creationTime();
//System.out.println("The raw creation time of " +filename+ " is " + fileTime.toString());
return fileTime;
}
public static FileTime getLastModifiedTime(String filename) throws IOException{
File file = new File(filename);
Path p = Paths.get(file.getAbsolutePath());
BasicFileAttributes view = Files.getFileAttributeView(p,BasicFileAttributeView.class).readAttributes();
FileTime fileTime = view.lastModifiedTime();
//System.out.println("The raw last modified time of " +filename+ " is " + fileTime.toString());
return fileTime;
}
private final Logger log = Exec.getLogger(getClass()); private final Logger log = Exec.getLogger(getClass());
private final static Path CURRENT_DIR = Paths.get(".").normalize(); private final static Path CURRENT_DIR = Paths.get(".").normalize();
...@@ -130,6 +160,47 @@ public class FilenameFileInputPlugin implements FileInputPlugin ...@@ -130,6 +160,47 @@ public class FilenameFileInputPlugin implements FileInputPlugin
// list files recursively // list files recursively
List<String> files = listFiles(task); List<String> files = listFiles(task);
//Sort the listFiles according to the configuration.
int order_modified = task.getOrderByModifiedTime();
int order_creation = task.getOrderByCreationTime();
if (order_modified == 0 && order_creation == 0){
Collections.sort(files);
} else if(order_creation == 0){
Collections.sort(files,new Comparator<String>(){
@Override
public int compare(String f1, String f2) {
try{
return getLastModifiedTime(f1).compareTo(getLastModifiedTime(f2));
} catch (IOException ex){
ex.printStackTrace();
}
return 0;
}
});
if (order_modified == 1 ) { Collections.reverse(files); }
} else if (order_modified == 0 ){
Collections.sort(files,new Comparator<String>(){
@Override
public int compare(String f1, String f2) {
try{
return getCreationTime(f1).compareTo(getCreationTime(f2));
} catch (IOException ex){
ex.printStackTrace();
}
return 0;
}
});
if ( order_creation == 1 ) { Collections.reverse(files);}
} else {
throw new RuntimeException("Could not order by creation time and lasModified time at the same time");
}
log.info("Loading files {}", files); log.info("Loading files {}", files);
task.setFiles(files); task.setFiles(files);
...@@ -144,12 +215,12 @@ public class FilenameFileInputPlugin implements FileInputPlugin ...@@ -144,12 +215,12 @@ public class FilenameFileInputPlugin implements FileInputPlugin
FileInputPlugin.Control control) FileInputPlugin.Control control)
{ {
PluginTask task = taskSource.loadTask(PluginTask.class); PluginTask task = taskSource.loadTask(PluginTask.class);
log.info("The taskSource of the FileName in the ConfigDiff resume: " + taskSource.toString());
control.run(taskSource, taskCount); control.run(taskSource, taskCount);
log.info("Filename 1 stop point");
// build next config // build next config
ConfigDiff configDiff = Exec.newConfigDiff(); ConfigDiff configDiff = Exec.newConfigDiff();
log.info("Filename 2 stop point");
// last_path // last_path
if (task.getFiles().isEmpty()) { if (task.getFiles().isEmpty()) {
// keep the last value // keep the last value
...@@ -158,10 +229,10 @@ public class FilenameFileInputPlugin implements FileInputPlugin ...@@ -158,10 +229,10 @@ public class FilenameFileInputPlugin implements FileInputPlugin
} }
} else { } else {
List<String> files = new ArrayList<String>(task.getFiles()); List<String> files = new ArrayList<String>(task.getFiles());
Collections.sort(files); log.info("The File order is {}",files);
configDiff.set("last_path", files.get(files.size() - 1)); configDiff.set("last_path", files.get(files.size() - 1));
} }
log.info("FileName 3 stop point");
return configDiff; return configDiff;
} }
...@@ -185,7 +256,8 @@ public class FilenameFileInputPlugin implements FileInputPlugin ...@@ -185,7 +256,8 @@ public class FilenameFileInputPlugin implements FileInputPlugin
directory = (d == null ? CURRENT_DIR : d); directory = (d == null ? CURRENT_DIR : d);
} }
final ImmutableList.Builder<String> builder = ImmutableList.builder(); //final ImmutableList.Builder<String> builder = ImmutableList.builder();
final List<String> filesArray = new ArrayList<String>();
final String lastPath = task.getLastPath().orNull(); final String lastPath = task.getLastPath().orNull();
final Integer fileSize = task.getFileSize().orNull(); final Integer fileSize = task.getFileSize().orNull();
try { try {
...@@ -219,8 +291,9 @@ public class FilenameFileInputPlugin implements FileInputPlugin ...@@ -219,8 +291,9 @@ public class FilenameFileInputPlugin implements FileInputPlugin
return FileVisitResult.CONTINUE; return FileVisitResult.CONTINUE;
} else { } else {
if (path.getFileName().toString().startsWith(fileNamePrefix)) { if (path.getFileName().toString().startsWith(fileNamePrefix)) {
if (fileSize == null || path.toFile().length() == fileSize) { if (fileSize == null || path.toFile().length() == fileSize) {
builder.add(path.toString()); //builder.add(path.toString());
filesArray.add(path.toString());
} }
} }
return FileVisitResult.CONTINUE; return FileVisitResult.CONTINUE;
...@@ -230,7 +303,8 @@ public class FilenameFileInputPlugin implements FileInputPlugin ...@@ -230,7 +303,8 @@ public class FilenameFileInputPlugin implements FileInputPlugin
} catch (IOException ex) { } catch (IOException ex) {
throw new RuntimeException(String.format("Failed get a list of local files at '%s'", directory), ex); throw new RuntimeException(String.format("Failed get a list of local files at '%s'", directory), ex);
} }
return builder.build(); //return builder.build();
return filesArray;
} }
@Override @Override
......
type: filename
path_prexfix: testModified/sample_
order_by_modified_time: 2
parser:
type: none-bin
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment