Commit 26fb4ecf authored by Eteri's avatar Eteri

embulk-input-filename: add first_path and start_date options

parent 774b5abb
......@@ -5,6 +5,9 @@ import java.util.*;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.text.SimpleDateFormat;
import java.text.ParseException;
import java.nio.file.FileSystems;
import java.nio.file.attribute.FileTime;
import java.util.List;
......@@ -111,6 +114,14 @@ public class FilenameFileInputPlugin implements FileInputPlugin
@ConfigDefault("null")
Optional <String> getFileNameSuffix();
@Config("start_date")
@ConfigDefault("null")
Optional<String> getStartDate();
@Config("first_path")
@ConfigDefault("null")
Optional<String> getFirstPath();
@Config("last_path")
@ConfigDefault("null")
Optional<String> getLastPath();
......@@ -282,6 +293,20 @@ public class FilenameFileInputPlugin implements FileInputPlugin
}
final ImmutableList.Builder<String> builder = ImmutableList.builder();
final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss zzz");
final String startDateString = task.getStartDate().orNull();
final Date startDate;
if (startDateString != null) {
try {
startDate = format.parse(startDateString);
} catch (ParseException e) {
throw new RuntimeException(String.format("Cannot parse start_date '%s'", startDateString), e);
}
} else {
startDate = null;
}
final String firstPath = task.getFirstPath().orNull();
final String lastPath = task.getLastPath().orNull();
final Integer fileSize = task.getFileSize().orNull();
final String fileNameSuffix = task.getFileNameSuffix().orNull();
......@@ -304,25 +329,24 @@ public class FilenameFileInputPlugin implements FileInputPlugin
@Override
public FileVisitResult preVisitDirectory(Path path, BasicFileAttributes attrs)
{
if(useLastModified) {
FileTime fileTime;
long fileModTime = Long.MIN_VALUE;
long fileChangeTime = Long.MIN_VALUE;
try {
//fileTime = Files.getLastModifiedTime(path);
fileTime = (FileTime) Files.getAttribute(path, "unix:ctime");
fileModTime = fileTime.toMillis();
fileChangeTime = fileTime.toMillis();
} catch (IOException e) {
log.info("Cannot get the last modified time - {} ", e);
log.info("Cannot get the last changed time - {} ", e);
}
// log.info("########################FROM REVISIT#############################################");
// log.info ("file = {}", path);
// log.info("last_Modified = {}, files time = {}", lastModified, fileModTime);
// log.info("last_Modified = {}, files time = {}", lastModified, fileChangeTime);
if (path.equals(directory) || Files.isDirectory(path)) {
return FileVisitResult.CONTINUE;
} else if (lastModified != null && Long.parseLong(lastModified) > fileModTime) { //if file is older then lastModified
} else if (firstPath != null && path.toString().compareTo(firstPath.substring(0, path.toString().length())) < 0) {
return FileVisitResult.SKIP_SUBTREE;
} else if (lastModified != null && Long.parseLong(lastModified) > fileChangeTime) { //if file is older then lastModified
return FileVisitResult.SKIP_SUBTREE;
} else if (path.getFileName().toString().startsWith(".")) {
return FileVisitResult.SKIP_SUBTREE;
......@@ -338,6 +362,8 @@ public class FilenameFileInputPlugin implements FileInputPlugin
else {
if (path.equals(directory)) {
return FileVisitResult.CONTINUE;
} else if (firstPath != null && path.toString().compareTo(firstPath.substring(0, path.toString().length())) < 0) {
return FileVisitResult.SKIP_SUBTREE;
} else if (lastPath != null && path.toString().compareTo(lastPath.substring(0, path.toString().length())) < 0) {
return FileVisitResult.SKIP_SUBTREE;
} else if (path.getFileName().toString().startsWith(".")) {
......@@ -355,25 +381,38 @@ public class FilenameFileInputPlugin implements FileInputPlugin
@Override
public FileVisitResult visitFile(Path path, BasicFileAttributes attrs)
{
if(useLastModified) {
FileTime fileTime;
long fileModTime = Long.MIN_VALUE;
FileTime fileModifiedTime;
Date fileModifiedDate = null;
try {
fileModifiedTime = (FileTime) Files.getLastModifiedTime(path);
fileModifiedDate = new Date(fileModifiedTime.toMillis());
} catch (IOException e) {
log.info("Cannot get the last modified time - {} ", e);
}
if (firstPath != null && path.toString().compareTo(firstPath) <= 0) {
return FileVisitResult.CONTINUE;
}
if (startDate != null && startDate.compareTo(fileModifiedDate) > 0) {
return FileVisitResult.CONTINUE;
}
if(useLastModified) {
FileTime fileTime;
long fileChangeTime = Long.MIN_VALUE;
try {
//fileTime = Files.getLastModifiedTime(path);
fileTime = (FileTime) Files.getAttribute(path, "unix:ctime");
fileModTime = fileTime.toMillis();
fileChangeTime = fileTime.toMillis();
} catch (IOException e) {
log.info("Cannot get the last modified time - {} ", e);
log.info("Cannot get the last changed time - {} ", e);
}
// log.info("########################FROM VISIT#############################################");
// log.info ("file = {}", path);
// log.info("last_Modified = {}, files time = {}", lastModified, fileModTime);
// log.info("last_Modified = {}, files time = {}", lastModified, fileChangeTime);
if (lastModified != null && Long.parseLong(lastModified) > fileModTime) {
if (lastModified != null && Long.parseLong(lastModified) > fileChangeTime) {
return FileVisitResult.CONTINUE;
} else if (lastModified != null && Long.parseLong(lastModified) == fileModTime && lastPath != null && path.toString().compareTo(lastPath) <= 0 ) {
} else if (lastModified != null && Long.parseLong(lastModified) == fileChangeTime && lastPath != null && path.toString().compareTo(lastPath) <= 0 ) {
return FileVisitResult.CONTINUE;
}
else if (path.getFileName().toString().startsWith(".")) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment