Commit 774b5abb authored by Eteri

embulk-input-filename: use the last status-change time (ctime) instead of the last-modified time

parent a9881575
package org.embulk.input.filename; package org.embulk.input.filename;
import java.io.*;
import java.util.*;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
import java.nio.file.FileSystems; import java.nio.file.FileSystems;
...@@ -133,9 +136,6 @@ public class FilenameFileInputPlugin implements FileInputPlugin ...@@ -133,9 +136,6 @@ public class FilenameFileInputPlugin implements FileInputPlugin
@ConfigDefault("null") @ConfigDefault("null")
Optional<String> getLastModified(); Optional<String> getLastModified();
// @Config("files_same_mod_time")
// @ConfigDefault("")
// Optional<List> getListFilesSameModTime();
List<String> getFiles(); List<String> getFiles();
void setFiles(List<String> files); void setFiles(List<String> files);
...@@ -160,60 +160,42 @@ public class FilenameFileInputPlugin implements FileInputPlugin ...@@ -160,60 +160,42 @@ public class FilenameFileInputPlugin implements FileInputPlugin
final Integer maxFileCount = task.getMaxFileCount().orNull(); final Integer maxFileCount = task.getMaxFileCount().orNull();
if(task.getUseLastModified()) { if(task.getUseLastModified()) {
File [] testFiles = new File[files.size()] ; File [] testFiles = new File[files.size()] ;
List<File> dateFileList = new ArrayList<>(); List<File> dateFileList = new ArrayList<>();
for(int i = 0; i< files.size(); i++){ for(int i = 0; i< files.size(); i++){
dateFileList.add(new File(files.get(i))); dateFileList.add(new File(files.get(i)));
testFiles[i] = new File(files.get(i)); testFiles[i] = new File(files.get(i));
} }
/** Collections.sort(dateFileList, new Comparator<File>() {
public int compare(File f1, File f2) {
return (f1.lastModified() <= f2.lastModified()) ? -1 : 1;
}
});
**/
Arrays.sort( testFiles, new Comparator()
{
public int compare(Object o1, Object o2) {
if (((File)o1).lastModified() > ((File)o2).lastModified()) {
return -1;
} else if (((File)o1).lastModified() < ((File)o2).lastModified()) {
return +1;
} else {
return 0;
}
}
});
List<String> dateSortedFiles = new ArrayList<String>(); List<String> dateSortedFiles = new ArrayList<String>();
for(File f: dateFileList) { for(File f: dateFileList) {
dateSortedFiles.add(f.toString()); dateSortedFiles.add(f.toString());
} }
Collections.sort(dateSortedFiles);
task.setFiles(dateSortedFiles); task.setFiles(dateSortedFiles);
// number of processors is same with number of files // number of processors is same with number of files
log.info("Loading files {}", dateSortedFiles);
int taskCount = task.getFiles().size(); int taskCount = task.getFiles().size();
return resume(task.dump(), taskCount, control); return resume(task.dump(), taskCount, control);
} }
else { else {
Collections.sort(files); Collections.sort(files);
if (maxFileCount != null && files.size() > maxFileCount) { if (maxFileCount != null && files.size() > maxFileCount) {
files = files.subList(0, maxFileCount); files = files.subList(0, maxFileCount);
}
log.info("Loading files {}", files);
task.setFiles(files);
// number of processors is same with number of files
int taskCount = task.getFiles().size();
return resume(task.dump(), taskCount, control);
} }
log.info("Loading files {}", files);
task.setFiles(files);
// number of processors is same with number of files
int taskCount = task.getFiles().size();
return resume(task.dump(), taskCount, control);
} }
}
@Override @Override
public ConfigDiff resume(TaskSource taskSource, public ConfigDiff resume(TaskSource taskSource,
int taskCount, int taskCount,
...@@ -233,51 +215,51 @@ public class FilenameFileInputPlugin implements FileInputPlugin ...@@ -233,51 +215,51 @@ public class FilenameFileInputPlugin implements FileInputPlugin
configDiff.set("last_path", task.getLastPath().get()); configDiff.set("last_path", task.getLastPath().get());
} }
if(task.getUseLastModified()){ if(task.getUseLastModified()){
log.info("File list is empty:try to get last modified time");
if (task.getLastModified().isPresent()) { if (task.getLastModified().isPresent()) {
log.info("GetLASTMODIFIED is present");
configDiff.set("last_modified", task.getLastModified().get()); configDiff.set("last_modified", task.getLastModified().get());
log.info("check it {}", task.getLastModified().get());
} }
} }
} else { } else {
List<String> files = new ArrayList<String>(task.getFiles()); List<String> files = new ArrayList<String>(task.getFiles());
Collections.sort(files); Collections.sort(files);
configDiff.set("last_path", files.get(files.size() - 1)); configDiff.set("last_path", files.get(files.size() - 1));
log.info("REAL LAST PATH {}", files.get(files.size() - 1));
if(task.getUseLastModified()) { if(task.getUseLastModified()) {
long last = Long.MIN_VALUE; long last = Long.MIN_VALUE;
long lastFileTime = Long.MIN_VALUE; long lastFileTime = Long.MIN_VALUE;
int index = 0; int index = 0;
List sameTime = new ArrayList(); List sameTime = new ArrayList();
for (int i = 0; i < files.size(); i++) { for (int i = 0; i < files.size(); i++) {
File f = new File(files.get(i)); File f = new File(files.get(i));
if (f.lastModified() >= last){ Path f_name = Paths.get(f.getPath());
// if(f.lastModified() == last){
// sameTime.add(f); try {
// } FileTime ctime = (FileTime) Files.getAttribute(f_name, "unix:ctime");
last = f.lastModified(); long ctime_long = ctime.toMillis();
index = i; if (ctime_long >= last){
last = ctime_long;
index = i;
}
} catch (IOException e) {
log.info("Cannot get the unix:ctime - {} ", e);
} }
} }
File f = new File(files.get(index)); File f = new File(files.get(index));
configDiff.set("last_modified", f.lastModified()); Path f_name = Paths.get(f.getPath());
log.info(" FINAL f.lastModified() {}", f.lastModified()); try {
log.info("FINAL file for that time {}", f); FileTime ctime = (FileTime) Files.getAttribute(f_name, "unix:ctime");
configDiff.set("last_modified", ctime.toMillis());
} catch (IOException e) {
log.info("Cannot get the unix:ctime - {} ", e);
}
} }
} }
return configDiff; return configDiff;
} }
@Override @Override
public void cleanup(TaskSource taskSource, public void cleanup(TaskSource taskSource,
int taskCount, int taskCount,
...@@ -293,28 +275,28 @@ public class FilenameFileInputPlugin implements FileInputPlugin ...@@ -293,28 +275,28 @@ public class FilenameFileInputPlugin implements FileInputPlugin
if (Files.isDirectory(pathPrefix)) { if (Files.isDirectory(pathPrefix)) {
directory = pathPrefix; directory = pathPrefix;
fileNamePrefix = ""; fileNamePrefix = "";
log.info("PRINT in IF :(Files.isDirectory(pathPrefix) directory= {}", directory);
} else { } else {
log.info("PRINT in ELSE:");
fileNamePrefix = pathPrefix.getFileName().toString(); fileNamePrefix = pathPrefix.getFileName().toString();
Path d = pathPrefix.getParent(); Path d = pathPrefix.getParent();
directory = (d == null ? CURRENT_DIR : d); directory = (d == null ? CURRENT_DIR : d);
// log.info("PRINT from ELSE fileNamePrefix {}", fileNamePrefix);
// log.info("PRINT from ELSE Path d {}", d);
// log.info("PRINT from ELSE directory {}", directory);
} }
final ImmutableList.Builder<String> builder = ImmutableList.builder(); final ImmutableList.Builder<String> builder = ImmutableList.builder();
final String lastPath = task.getLastPath().orNull(); final String lastPath = task.getLastPath().orNull();
final Integer fileSize = task.getFileSize().orNull(); final Integer fileSize = task.getFileSize().orNull();
final String fileNameSuffix = task.getFileNameSuffix().orNull(); final String fileNameSuffix = task.getFileNameSuffix().orNull();
// final List listOfUploadedFiles = task.getListFilesSameModTime().orNull();
log.info("PRINT lastPath {}", lastPath);
final String lastModified = task.getLastModified().orNull(); final String lastModified = task.getLastModified().orNull();
final boolean useLastModified = task.getUseLastModified(); final boolean useLastModified = task.getUseLastModified();
/* final Path yyy = Paths.get("/mic/L0444-001/syscom/syscom004-14360007/events/2017/12/04/17338004.XMR");
try {
FileTime t = (FileTime) Files.getAttribute(yyy, "unix:ctime");
log.info("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!time for {} = {}", yyy, t.toMillis());
} catch (IOException e) {
log.info("Cannot get the unix:ctime - {} ", e);
}
*/
try { try {
log.info("Listing local files at directory '{}' filtering filename by prefix '{}'", directory.equals(CURRENT_DIR) ? "." : directory.toString(), fileNamePrefix); log.info("Listing local files at directory '{}' filtering filename by prefix '{}'", directory.equals(CURRENT_DIR) ? "." : directory.toString(), fileNamePrefix);
...@@ -324,17 +306,19 @@ public class FilenameFileInputPlugin implements FileInputPlugin ...@@ -324,17 +306,19 @@ public class FilenameFileInputPlugin implements FileInputPlugin
{ {
if(useLastModified) { if(useLastModified) {
log.info("PRINT lastModified {}", lastModified);
FileTime fileTime; FileTime fileTime;
long fileModTime = Long.MIN_VALUE; long fileModTime = Long.MIN_VALUE;
try { try {
fileTime = Files.getLastModifiedTime(path); //fileTime = Files.getLastModifiedTime(path);
fileTime = (FileTime) Files.getAttribute(path, "unix:ctime");
fileModTime = fileTime.toMillis(); fileModTime = fileTime.toMillis();
log.info("IN TRY: fileModTime = {}", fileModTime);
} catch (IOException e) { } catch (IOException e) {
log.info("Cannot get the last modified time - {} ", e); log.info("Cannot get the last modified time - {} ", e);
} }
// log.info("########################FROM REVISIT#############################################");
// log.info ("file = {}", path);
// log.info("last_Modified = {}, files time = {}", lastModified, fileModTime);
if (path.equals(directory) || Files.isDirectory(path)) { if (path.equals(directory) || Files.isDirectory(path)) {
return FileVisitResult.CONTINUE; return FileVisitResult.CONTINUE;
} else if (lastModified != null && Long.parseLong(lastModified) > fileModTime) { //if file is older then lastModified } else if (lastModified != null && Long.parseLong(lastModified) > fileModTime) { //if file is older then lastModified
...@@ -375,13 +359,17 @@ public class FilenameFileInputPlugin implements FileInputPlugin ...@@ -375,13 +359,17 @@ public class FilenameFileInputPlugin implements FileInputPlugin
FileTime fileTime; FileTime fileTime;
long fileModTime = Long.MIN_VALUE; long fileModTime = Long.MIN_VALUE;
try { try {
fileTime = Files.getLastModifiedTime(path); //fileTime = Files.getLastModifiedTime(path);
fileTime = (FileTime) Files.getAttribute(path, "unix:ctime");
fileModTime = fileTime.toMillis(); fileModTime = fileTime.toMillis();
//log.info("from visit fileModTime {}", fileModTime);
} catch (IOException e) { } catch (IOException e) {
log.info("Cannot get the last modified time - {} ", e); log.info("Cannot get the last modified time - {} ", e);
} }
// log.info("########################FROM VISIT#############################################");
// log.info ("file = {}", path);
// log.info("last_Modified = {}, files time = {}", lastModified, fileModTime);
if (lastModified != null && Long.parseLong(lastModified) > fileModTime) { if (lastModified != null && Long.parseLong(lastModified) > fileModTime) {
return FileVisitResult.CONTINUE; return FileVisitResult.CONTINUE;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment