Commit a9881575 authored by Eteri's avatar Eteri

embulk-input-filename: add last modification date. add python scripts to test uploads

parent 9cec4744
# Report which expected data files are missing from a directory.
#
# One file is expected every five minutes between a start and an end
# timestamp (both inclusive).  Each file is named
# "<year>-<month>-<day>_<hour>-<minute>-<second>.<data name>" and is looked up
# directly inside the directory given with --path.
import argparse
import glob
import os
from datetime import datetime, timedelta, date

# Spacing between two consecutive expected files.
FILE_INTERVAL = timedelta(minutes=5)


def parse_timestamp(date_text, time_text):
    """Build a datetime from "year-month-day" and "hour-minute" strings.

    Seconds are always set to zero.  Raises ValueError when a field is
    missing or is not an integer.
    """
    year, month, day = [int(part) for part in date_text.split("-")[:3]]
    hour, minute = [int(part) for part in time_text.split("-")[:2]]
    return datetime(year, month, day, hour, minute, 0)


def expected_file_paths(path, data_name, start, end, step=FILE_INTERVAL):
    """Return the paths expected between start and end (both inclusive).

    path      -- directory that should contain the files
    data_name -- file extension without the dot, e.g. "002" or "Nsp"
    start/end -- datetime bounds of the range to check
    step      -- timedelta between two consecutive files

    The directory and the file name are combined with os.path.join so the
    result has the same shape as what glob returns, whether or not ``path``
    ends with a separator.
    """
    expected = []
    current = start
    while current <= end:
        stamp = current.strftime("%Y-%m-%d_%H-%M-%S")
        expected.append(os.path.join(path, stamp + "." + data_name))
        current += step
    return expected


def find_missing_files(expected, existing):
    """Return the entries of ``expected`` absent from ``existing``, in order."""
    present = set(existing)  # constant-time membership tests
    return [name for name in expected if name not in present]


def main():
    """Parse the command line and print the missing files and their count."""
    parser = argparse.ArgumentParser(description='Test if files in a given range are on the server')
    parser.add_argument("-path", "--path", help="Specify the path, e.g. /mic/L0444-001/duo/MDA01", required=True)
    parser.add_argument("-data", "--data", help="Specify the data name, e.g. 002 or 06 or Nsp", required=True)
    parser.add_argument("-start_date", "--start_date", help="Specify the start date : year-month-day, e.g. 2017-11-15", required=True)
    parser.add_argument("-start_time", "--start_time", help="Specify the start time : hour-minute, e.g. 11-10", required=True)
    parser.add_argument("-end_date", "--end_date", help="Specify the end date : year-month-day, e.g. 2017-11-17", required=True)
    parser.add_argument("-end_time", "--end_time", help="Specify the end time : hour-minute, e.g. 09-15", required=True)
    args = parser.parse_args()

    start = parse_timestamp(args.start_date, args.start_time)
    end = parse_timestamp(args.end_date, args.end_time)

    list_of_files_to_check = expected_file_paths(args.path, args.data, start, end)
    all_files = glob.glob(os.path.join(args.path, "*." + args.data))
    list_of_missing_files = find_missing_files(list_of_files_to_check, all_files)

    print("List of missing files")
    print(list_of_missing_files)
    print("Number of missing files")
    print(len(list_of_missing_files))


if __name__ == "__main__":
    main()
#!/usr/bin/python
import os
import requests
import hashlib
import argparse
def get_not_uploaded_files_list(path, path_with_dots):
    """Compare one local file with its uploaded copy and record the outcome.

    path           -- local path of the file; it is read to compute its md5 sum
    path_with_dots -- key suffix appended to both request strings

    Reads the module-level names ``request_string_has_bucket_key``,
    ``request_string_md5sum``, ``username`` and ``password``.  Appends
    ``path`` to the module-level list ``files_not_uploaded`` when the
    hasBucketKey request does not answer 'True', or to
    ``uploaded_with_diff_md5sum`` when the server md5 sum differs from the
    local one.

    Returns the two module-level lists
    ``(files_not_uploaded, uploaded_with_diff_md5sum)``.
    """
    credentials = (username, password)

    response = requests.get(request_string_has_bucket_key + path_with_dots, auth=credentials)
    has_bucket_key_res = str(response.text)

    # Hash the local file in chunks and make sure the handle is closed again.
    digest = hashlib.md5()
    with open(path, 'rb') as local_file:
        for chunk in iter(lambda: local_file.read(65536), b''):
            digest.update(chunk)
    md5sum_local_file = digest.hexdigest()

    response = requests.get(request_string_md5sum + path_with_dots, auth=credentials)
    md5sum_uploaded_file = response.text  # md5 sum of the uploaded file

    print(path)
    if has_bucket_key_res != 'True':
        print("NOT UPLOADED")
        files_not_uploaded.append(path)
    elif md5sum_local_file == md5sum_uploaded_file:
        print("local file ", md5sum_local_file)
        print("uploaded file= ", md5sum_uploaded_file)
        print("UPLOADED")
    else:
        print("File is uploaded BUT md5sum is Different")
        print("local file ", md5sum_local_file)
        print("uploaded file= ", md5sum_uploaded_file)
        uploaded_with_diff_md5sum.append(path)
    return files_not_uploaded, uploaded_with_diff_md5sum
# Only files of exactly this many bytes are compared with the server
# ("check only files with the proper size").
PROPER_FILE_SIZE = 1792


def check_files(files, *args, **kwargs):
    """Check every given file against the server and return the problem lists.

    files -- list of local file paths
    start -- optional keyword: first index of the chunk to check
    end   -- optional keyword: index just past the chunk (slice semantics,
             so the file at ``end`` itself is not checked)

    When both ``start`` and ``end`` are given only ``files[start:end]`` is
    checked, otherwise the whole list.  Files whose size differs from
    PROPER_FILE_SIZE, or whose base name starts with a dot, are reported and
    skipped.  For the others the part of the path after 'background/' is
    turned into a dotted key (2017/10/17280145.BMR -> 2017.10.17280145.BMR)
    and handed to get_not_uploaded_files_list.  A path without 'background/'
    raises IndexError.

    Returns ``(sorted not-uploaded files, sorted files with a different
    md5sum)``; both are empty when no file was actually checked.
    """
    start = kwargs.get('start', None)
    end = kwargs.get('end', None)
    chunked = start is not None and end is not None

    selected = files[start:end] if chunked else files
    if chunked:
        skip_message = "Not loaded because of size or hidden"
    else:
        skip_message = "Not uploaded because of size or hidden"

    # Start from empty results so that an empty or fully skipped selection
    # does not leave the returned names unbound.
    not_uploaded = []
    md5_mismatch = []
    for file_path in selected:
        file_size = os.stat(file_path).st_size
        # The hidden test must look at the base name; a full path practically
        # never starts with a dot.
        if file_size != PROPER_FILE_SIZE or os.path.basename(file_path).startswith("."):
            print(file_path)
            print(skip_message)
            continue
        path_after_background = file_path.split('background/')[1]
        path_with_dots = path_after_background.replace('/', '.')
        if chunked:
            print("path_after_background", path_after_background)
            print("path_with_dots", path_with_dots)
        not_uploaded, md5_mismatch = get_not_uploaded_files_list(file_path, path_with_dots)
    return sorted(not_uploaded), sorted(md5_mismatch)
# start
# Parse the command line, collect every non-hidden file below --path and
# report which ones are missing on the server or differ by md5sum.
parser = argparse.ArgumentParser(description='Test if all files are uploaded')
parser.add_argument("-p", "--path", help="Path of the files to be uploaded, e.g. /mic/syscomtestuser/syscom/SYSCOM02-12400555/background/", required=True)
parser.add_argument("-string_md5sum", "--request_string_md5sum", help="Request string to get md5sum, e.g. https://monitoring.woelfel.de/neo-erp5/data_stream_module/26/getMd5sum?key=SYSCOM02-12400555.background.", required=True)
parser.add_argument("-string_has_bucket_key", "--request_string_has_bucket_key", help="Request string to get hasBucketKey value, e.g. https://monitoring.woelfel.de/neo-erp5/data_stream_module/26/hasBucketKey?key=SYSCOM02-12400555.background.", required=True)
parser.add_argument("-user", "--username", help="Username", required=True)
parser.add_argument("-pswd", "--password", help="Password", required=True)
parser.add_argument("-c", "--chunk", help="Check only chunk of files", required=False, action='store_true')
parser.add_argument("-s", "--start", help="Start of the chunk. Used only when -c", required=False, default="")
parser.add_argument("-e", "--end", help="End of the chunk. Used only when -c and -s", required=False, default="")
args = parser.parse_args()

chunk_start = ""
chunk_end = ""
if args.chunk:
    print("chunk is set")
    if not args.start or not args.end:
        # Stop with a non-zero exit status so callers can detect the failure.
        raise SystemExit("Start and/or End of the chunk is not given")
    chunk_start = args.start
    chunk_end = args.end

# These module-level names are read by get_not_uploaded_files_list.
dir_path = args.path
request_string_md5sum = args.request_string_md5sum
request_string_has_bucket_key = args.request_string_has_bucket_key
username = args.username
password = args.password

print("Directory to be uploaded")
print(dir_path)
print("Request string to get md5sum")
print(request_string_md5sum)
print("Request string to get the value of hasBucketKey")
print(request_string_has_bucket_key)
print("Username")
print(username)
# The password is masked on purpose: echoing it would leak the secret into
# terminals and log files.
print("Password")
print("********")

# Example settings used during testing (credentials removed on purpose):
# data_stream_module/26/ : the very first one with 2016
#dir_path = "/mic/syscomtestuser/syscom/SYSCOM02-12400555/background/"
#request_string_md5sum = "https://monitoring.woelfel.de/neo-erp5/data_stream_module/26/getMd5sum?key=SYSCOM02-12400555.background."
#request_string_has_bucket_key = "https://monitoring.woelfel.de/neo-erp5/data_stream_module/26/hasBucketKey?key=SYSCOM02-12400555.background."
# data_stream_module/42/ : the one i deleted
#dir_path = "/mic/L0444-001/syscom/syscom004-14360007/background/"
#request_string_md5sum = "https://monitoring.woelfel.de/neo-erp5/data_stream_module/42/getMd5sum?key=syscom004-14360007.background."
#request_string_has_bucket_key = "https://monitoring.woelfel.de/neo-erp5/data_stream_module/42/hasBucketKey?key=syscom004-14360007.background."
# my own test linked to /data_stream_module/28/
#dir_path = "/home/eteri/data/syscom004-14360007/background/" #my own test
#request_string_md5sum = 'https://softinst84835.host.vifib.net/erp5/data_stream_module/28/getMd5sum?key=SYSCOM013-17090003.background.'
#request_string_has_bucket_key = "https://softinst84835.host.vifib.net/erp5/data_stream_module/28/hasBucketKey?key=SYSCOM013-17090003.background."

# some initializations
# (the two result lists are appended to by get_not_uploaded_files_list)
files_not_uploaded = []
path_with_dots = ""
files_full_path = []
uploaded_with_diff_md5sum = []

# walk through directories, get the files and compare md5sums with uploaded ones
for root, dirs, files in os.walk(dir_path):
    files_full_path.extend(
        os.path.join(root, name) for name in files if not name.startswith('.')
    )
files_full_path = sorted(files_full_path)

if args.chunk and len(files_full_path) != 0:
    # -s and -e must be full paths exactly as produced by the walk above.
    try:
        index_start = files_full_path.index(chunk_start)
        index_end = files_full_path.index(chunk_end)
    except ValueError:
        raise SystemExit("Start and/or End of the chunk was not found below %s" % dir_path)
    files_not_uploaded, uploaded_with_diff_md5sum = check_files(files_full_path, start=index_start, end=index_end)
else:
    files_not_uploaded, uploaded_with_diff_md5sum = check_files(files_full_path)

print("number of not uploaded files : %s" % (len(files_not_uploaded)))
print("list of not uploaded files: %s" % (files_not_uploaded))
print("number of uploaded files with different md5sum : %s" % (len(uploaded_with_diff_md5sum)))
print("list of uploaded files with different md5sum: %s" % (uploaded_with_diff_md5sum))
#!/usr/bin/python
import os
import requests
import hashlib
import argparse
def get_not_uploaded_files_list(path, path_with_dots):
    """Classify one local file as uploaded, not uploaded, or md5-mismatched.

    path           -- local path of the file; it is read to compute its md5 sum
    path_with_dots -- key suffix appended to both request strings

    Uses the module-level ``request_string_has_bucket_key``,
    ``request_string_md5sum``, ``username`` and ``password``.  ``path`` is
    appended to the module-level ``files_not_uploaded`` list when the
    hasBucketKey answer is not 'True', and to ``uploaded_with_diff_md5sum``
    when the server reports a different md5 sum than the local file has.

    Returns ``(files_not_uploaded, uploaded_with_diff_md5sum)``, i.e. the
    two module-level lists themselves.
    """
    auth = (username, password)

    has_bucket_key_res = str(
        requests.get(request_string_has_bucket_key + path_with_dots, auth=auth).text
    )

    # Stream the file through md5 so large files are not loaded in one piece,
    # and close the handle as soon as hashing is done.
    md5 = hashlib.md5()
    with open(path, 'rb') as handle:
        while True:
            chunk = handle.read(65536)
            if not chunk:
                break
            md5.update(chunk)
    md5sum_local_file = md5.hexdigest()

    md5sum_uploaded_file = requests.get(request_string_md5sum + path_with_dots, auth=auth).text

    print(path)
    if has_bucket_key_res != 'True':
        print("NOT UPLOADED")
        files_not_uploaded.append(path)
        return files_not_uploaded, uploaded_with_diff_md5sum

    if md5sum_local_file == md5sum_uploaded_file:
        print("local file ", md5sum_local_file)
        print("uploaded file= ", md5sum_uploaded_file)
        print("UPLOADED")
    else:
        print("File is uploaded BUT md5sum is Different")
        print("local file ", md5sum_local_file)
        print("uploaded file= ", md5sum_uploaded_file)
        uploaded_with_diff_md5sum.append(path)
    return files_not_uploaded, uploaded_with_diff_md5sum
def check_files(files, *args, **kwargs):
    """Check every given event file against the server.

    files -- list of local file paths
    start -- optional keyword: first index of the chunk to check
    end   -- optional keyword: index just past the chunk (slice semantics,
             so the file at ``end`` itself is not checked)

    When both ``start`` and ``end`` are given only ``files[start:end]`` is
    checked, otherwise the whole list.  Files whose base name starts with a
    dot are reported and skipped; no size filter is applied here.  For the
    others the part of the path after 'events/' is turned into a dotted key
    (2017/10/17280145.BMR -> 2017.10.17280145.BMR) and passed to
    get_not_uploaded_files_list.  A path without 'events/' raises IndexError.

    Returns ``(sorted not-uploaded files, sorted files with a different
    md5sum)``; both are empty when no file was actually checked.
    """
    start = kwargs.get('start', None)
    end = kwargs.get('end', None)
    chunked = start is not None and end is not None

    selected = files[start:end] if chunked else files
    if chunked:
        skip_message = "Not loaded because is hidden"
    else:
        skip_message = "Not uploaded because is hidden"

    # Pre-set the results so that an empty or fully skipped selection returns
    # empty lists instead of failing on unbound names.
    not_uploaded = []
    md5_mismatch = []
    for file_path in selected:
        # Test the base name: a full path practically never starts with a dot.
        if os.path.basename(file_path).startswith("."):
            print(file_path)
            print(skip_message)
            continue
        path_after_background = file_path.split('events/')[1]
        path_with_dots = path_after_background.replace('/', '.')
        if chunked:
            print("path_after_background", path_after_background)
            print("path_with_dots", path_with_dots)
        not_uploaded, md5_mismatch = get_not_uploaded_files_list(file_path, path_with_dots)
    return sorted(not_uploaded), sorted(md5_mismatch)
# start
# Read the command line, gather all non-hidden files under --path and print
# the files that are not uploaded or whose uploaded md5sum differs.
parser = argparse.ArgumentParser(description='Test if all files are uploaded')
parser.add_argument("-p", "--path", help="Path of the files to be uploaded, e.g. /mic/syscomtestuser/syscom/SYSCOM02-12400555/background/", required=True)
parser.add_argument("-string_md5sum", "--request_string_md5sum", help="Request string to get md5sum, e.g. https://monitoring.woelfel.de/neo-erp5/data_stream_module/26/getMd5sum?key=SYSCOM02-12400555.background.", required=True)
parser.add_argument("-string_has_bucket_key", "--request_string_has_bucket_key", help="Request string to get hasBucketKey value, e.g. https://monitoring.woelfel.de/neo-erp5/data_stream_module/26/hasBucketKey?key=SYSCOM02-12400555.background.", required=True)
parser.add_argument("-user", "--username", help="Username", required=True)
parser.add_argument("-pswd", "--password", help="Password", required=True)
parser.add_argument("-c", "--chunk", help="Check only chunk of files", required=False, action='store_true')
parser.add_argument("-s", "--start", help="Start of the chunk. Used only when -c", required=False, default="")
parser.add_argument("-e", "--end", help="End of the chunk. Used only when -c and -s", required=False, default="")
args = parser.parse_args()

chunk_start = ""
chunk_end = ""
if args.chunk:
    print("chunk is set")
    if not (args.start and args.end):
        # A non-zero exit status lets calling scripts notice the bad usage.
        raise SystemExit("Start and/or End of the chunk is not given")
    chunk_start, chunk_end = args.start, args.end

# Module-level settings consumed by get_not_uploaded_files_list.
dir_path = args.path
request_string_md5sum = args.request_string_md5sum
request_string_has_bucket_key = args.request_string_has_bucket_key
username = args.username
password = args.password

for label, value in (
    ("Directory to be uploaded", dir_path),
    ("Request string to get md5sum", request_string_md5sum),
    ("Request string to get the value of hasBucketKey", request_string_has_bucket_key),
    ("Username", username),
    # Never echo the real password: it would end up in terminals and logs.
    ("Password", "********"),
):
    print(label)
    print(value)

# Example settings used during testing (credentials removed on purpose):
# data_stream_module/26/ : the very first one with 2016
#dir_path = "/mic/syscomtestuser/syscom/SYSCOM02-12400555/background/"
#request_string_md5sum = "https://monitoring.woelfel.de/neo-erp5/data_stream_module/26/getMd5sum?key=SYSCOM02-12400555.background."
#request_string_has_bucket_key = "https://monitoring.woelfel.de/neo-erp5/data_stream_module/26/hasBucketKey?key=SYSCOM02-12400555.background."
# data_stream_module/42/ : the one i deleted
#dir_path = "/mic/L0444-001/syscom/syscom004-14360007/background/"
#request_string_md5sum = "https://monitoring.woelfel.de/neo-erp5/data_stream_module/42/getMd5sum?key=syscom004-14360007.background."
#request_string_has_bucket_key = "https://monitoring.woelfel.de/neo-erp5/data_stream_module/42/hasBucketKey?key=syscom004-14360007.background."
# my own test linked to /data_stream_module/28/
#dir_path = "/home/eteri/data/syscom004-14360007/background/" #my own test
#request_string_md5sum = 'https://softinst84835.host.vifib.net/erp5/data_stream_module/28/getMd5sum?key=SYSCOM013-17090003.background.'
#request_string_has_bucket_key = "https://softinst84835.host.vifib.net/erp5/data_stream_module/28/hasBucketKey?key=SYSCOM013-17090003.background."

# some initializations
# (get_not_uploaded_files_list appends to the two result lists)
files_not_uploaded = []
path_with_dots = ""
files_full_path = []
uploaded_with_diff_md5sum = []

# walk through directories, get the files and compare md5sums with uploaded ones
for root, dirs, files in os.walk(dir_path):
    for name in files:
        if name.startswith('.'):
            continue  # hidden files are never checked
        files_full_path.append(os.path.join(root, name))
files_full_path.sort()

if args.chunk and len(files_full_path) != 0:
    # -s and -e have to match full paths exactly as found by the walk above.
    try:
        index_start = files_full_path.index(chunk_start)
        index_end = files_full_path.index(chunk_end)
    except ValueError:
        raise SystemExit("Start and/or End of the chunk was not found below %s" % dir_path)
    files_not_uploaded, uploaded_with_diff_md5sum = check_files(files_full_path, start=index_start, end=index_end)
else:
    files_not_uploaded, uploaded_with_diff_md5sum = check_files(files_full_path)

print("number of not uploaded files : %s" % (len(files_not_uploaded)))
print("list of not uploaded files: %s" % (files_not_uploaded))
print("number of uploaded files with different md5sum : %s" % (len(uploaded_with_diff_md5sum)))
print("list of uploaded files with different md5sum: %s" % (uploaded_with_diff_md5sum))
package org.embulk.input.filename; package org.embulk.input.filename;
import java.util.Arrays;
import java.util.Comparator;
import java.nio.file.FileSystems;
import java.nio.file.attribute.FileTime;
import java.util.List; import java.util.List;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
...@@ -68,13 +72,13 @@ class FilenameFileInputStream extends FileInputStream { ...@@ -68,13 +72,13 @@ class FilenameFileInputStream extends FileInputStream {
@Override @Override
public int read(byte[] b) throws IOException { public int read(byte[] b) throws IOException {
return read(b, 0, b.length); return read(b, 0, b.length);
} }
@Override @Override
public int read(byte[] b, int off, int len) throws IOException { public int read(byte[] b, int off, int len) throws IOException {
if (n < MAX_NAME_LENGTH) { if (n < MAX_NAME_LENGTH) {
int i = 0; int i = 0;
int c; int c;
for (; i < len; i++) { for (; i < len; i++) {
c = read(); c = read();
if (c == -1) { if (c == -1) {
...@@ -89,7 +93,7 @@ class FilenameFileInputStream extends FileInputStream { ...@@ -89,7 +93,7 @@ class FilenameFileInputStream extends FileInputStream {
} else { } else {
return super.read(b, off, len); return super.read(b, off, len);
} }
} }
} }
public class FilenameFileInputPlugin implements FileInputPlugin public class FilenameFileInputPlugin implements FileInputPlugin
...@@ -115,15 +119,23 @@ public class FilenameFileInputPlugin implements FileInputPlugin ...@@ -115,15 +119,23 @@ public class FilenameFileInputPlugin implements FileInputPlugin
@Config("max_file_count") @Config("max_file_count")
@ConfigDefault("null") @ConfigDefault("null")
Optional<Integer> getMaxFileCount(); Optional<Integer> getMaxFileCount();
@Config("follow_symlinks") @Config("follow_symlinks")
@ConfigDefault("false") @ConfigDefault("false")
boolean getFollowSymlinks(); boolean getFollowSymlinks();
@Config("ignore_last_file") @Config("use_last_modified")
@ConfigDefault("false") @ConfigDefault("true")
boolean getIgnoreLastFile(); boolean getUseLastModified();
@Config("last_modified")
@ConfigDefault("null")
Optional<String> getLastModified();
// @Config("files_same_mod_time")
// @ConfigDefault("")
// Optional<List> getListFilesSameModTime();
List<String> getFiles(); List<String> getFiles();
void setFiles(List<String> files); void setFiles(List<String> files);
...@@ -143,22 +155,65 @@ public class FilenameFileInputPlugin implements FileInputPlugin ...@@ -143,22 +155,65 @@ public class FilenameFileInputPlugin implements FileInputPlugin
// list files recursively // list files recursively
List<String> files = new ArrayList<String>(listFiles(task)); List<String> files = new ArrayList<String>(listFiles(task));
// only process <= max_file_count number of files
final Integer maxFileCount = task.getMaxFileCount().orNull();
if(task.getUseLastModified()) {
File [] testFiles = new File[files.size()] ;
List<File> dateFileList = new ArrayList<>();
for(int i = 0; i< files.size(); i++){
dateFileList.add(new File(files.get(i)));
testFiles[i] = new File(files.get(i));
}
/** Collections.sort(dateFileList, new Comparator<File>() {
public int compare(File f1, File f2) {
return (f1.lastModified() <= f2.lastModified()) ? -1 : 1;
}
});
**/
Arrays.sort( testFiles, new Comparator()
{
public int compare(Object o1, Object o2) {
if (((File)o1).lastModified() > ((File)o2).lastModified()) {
return -1;
} else if (((File)o1).lastModified() < ((File)o2).lastModified()) {
return +1;
} else {
return 0;
}
}
});
List<String> dateSortedFiles = new ArrayList<String>();
for(File f: dateFileList) {
dateSortedFiles.add(f.toString());
}
task.setFiles(dateSortedFiles);
// number of processors is same with number of files
int taskCount = task.getFiles().size();
return resume(task.dump(), taskCount, control);
}
else {
Collections.sort(files); Collections.sort(files);
if (maxFileCount != null && files.size() > maxFileCount) {
// only process <= max_file_count number of files files = files.subList(0, maxFileCount);
final Integer maxFileCount = task.getMaxFileCount().orNull();
if (maxFileCount != null && files.size() > maxFileCount) {
files = files.subList(0, maxFileCount);
} }
log.info("Loading files {}", files);
log.info("Loading files {}", files); task.setFiles(files);
task.setFiles(files);
// number of processors is same with number of files // number of processors is same with number of files
int taskCount = task.getFiles().size(); int taskCount = task.getFiles().size();
return resume(task.dump(), taskCount, control); return resume(task.dump(), taskCount, control);
} }
}
@Override @Override
public ConfigDiff resume(TaskSource taskSource, public ConfigDiff resume(TaskSource taskSource,
int taskCount, int taskCount,
...@@ -177,14 +232,51 @@ public class FilenameFileInputPlugin implements FileInputPlugin ...@@ -177,14 +232,51 @@ public class FilenameFileInputPlugin implements FileInputPlugin
if (task.getLastPath().isPresent()) { if (task.getLastPath().isPresent()) {
configDiff.set("last_path", task.getLastPath().get()); configDiff.set("last_path", task.getLastPath().get());
} }
if(task.getUseLastModified()){
log.info("File list is empty:try to get last modified time");
if (task.getLastModified().isPresent()) {
log.info("GetLASTMODIFIED is present");
configDiff.set("last_modified", task.getLastModified().get());
log.info("check it {}", task.getLastModified().get());
}
}
} else { } else {
List<String> files = new ArrayList<String>(task.getFiles()); List<String> files = new ArrayList<String>(task.getFiles());
Collections.sort(files); Collections.sort(files);
configDiff.set("last_path", files.get(files.size() - 1)); configDiff.set("last_path", files.get(files.size() - 1));
} log.info("REAL LAST PATH {}", files.get(files.size() - 1));
if(task.getUseLastModified()) {
long last = Long.MIN_VALUE;
long lastFileTime = Long.MIN_VALUE;
int index = 0;
List sameTime = new ArrayList();
for (int i = 0; i < files.size(); i++) {
File f = new File(files.get(i));
if (f.lastModified() >= last){
// if(f.lastModified() == last){
// sameTime.add(f);
// }
last = f.lastModified();
index = i;
}
}
File f = new File(files.get(index));
configDiff.set("last_modified", f.lastModified());
log.info(" FINAL f.lastModified() {}", f.lastModified());
log.info("FINAL file for that time {}", f);
}
}
return configDiff; return configDiff;
} }
@Override @Override
public void cleanup(TaskSource taskSource, public void cleanup(TaskSource taskSource,
...@@ -197,65 +289,142 @@ public class FilenameFileInputPlugin implements FileInputPlugin ...@@ -197,65 +289,142 @@ public class FilenameFileInputPlugin implements FileInputPlugin
Path pathPrefix = Paths.get(task.getPathPrefix()).normalize(); Path pathPrefix = Paths.get(task.getPathPrefix()).normalize();
final Path directory; final Path directory;
final String fileNamePrefix; final String fileNamePrefix;
if (Files.isDirectory(pathPrefix)) { if (Files.isDirectory(pathPrefix)) {
directory = pathPrefix; directory = pathPrefix;
fileNamePrefix = ""; fileNamePrefix = "";
log.info("PRINT in IF :(Files.isDirectory(pathPrefix) directory= {}", directory);
} else { } else {
log.info("PRINT in ELSE:");
fileNamePrefix = pathPrefix.getFileName().toString(); fileNamePrefix = pathPrefix.getFileName().toString();
Path d = pathPrefix.getParent(); Path d = pathPrefix.getParent();
directory = (d == null ? CURRENT_DIR : d); directory = (d == null ? CURRENT_DIR : d);
// log.info("PRINT from ELSE fileNamePrefix {}", fileNamePrefix);
// log.info("PRINT from ELSE Path d {}", d);
// log.info("PRINT from ELSE directory {}", directory);
} }
final ImmutableList.Builder<String> builder = ImmutableList.builder(); final ImmutableList.Builder<String> builder = ImmutableList.builder();
final String lastPath = task.getLastPath().orNull(); final String lastPath = task.getLastPath().orNull();
final Integer fileSize = task.getFileSize().orNull(); final Integer fileSize = task.getFileSize().orNull();
final String fileNameSuffix = task.getFileNameSuffix().orNull(); final String fileNameSuffix = task.getFileNameSuffix().orNull();
// final List listOfUploadedFiles = task.getListFilesSameModTime().orNull();
log.info("PRINT lastPath {}", lastPath);
final String lastModified = task.getLastModified().orNull();
final boolean useLastModified = task.getUseLastModified();
try { try {
log.info("Listing local files at directory '{}' filtering filename by prefix '{}'", directory.equals(CURRENT_DIR) ? "." : directory.toString(), fileNamePrefix); log.info("Listing local files at directory '{}' filtering filename by prefix '{}'", directory.equals(CURRENT_DIR) ? "." : directory.toString(), fileNamePrefix);
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() { Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
@Override @Override
public FileVisitResult preVisitDirectory(Path path, BasicFileAttributes attrs) public FileVisitResult preVisitDirectory(Path path, BasicFileAttributes attrs)
{ {
if (path.equals(directory)) {
return FileVisitResult.CONTINUE; if(useLastModified) {
} else if (lastPath != null && path.toString().compareTo(lastPath.substring(0, path.toString().length())) < 0) { log.info("PRINT lastModified {}", lastModified);
return FileVisitResult.SKIP_SUBTREE; FileTime fileTime;
} else if (path.getFileName().toString().startsWith(".")) { long fileModTime = Long.MIN_VALUE;
return FileVisitResult.SKIP_SUBTREE; try {
} else { fileTime = Files.getLastModifiedTime(path);
if (path.getFileName().toString().startsWith(fileNamePrefix)) { fileModTime = fileTime.toMillis();
return FileVisitResult.CONTINUE; log.info("IN TRY: fileModTime = {}", fileModTime);
} else { } catch (IOException e) {
return FileVisitResult.SKIP_SUBTREE; log.info("Cannot get the last modified time - {} ", e);
}
if (path.equals(directory) || Files.isDirectory(path)) {
return FileVisitResult.CONTINUE;
} else if (lastModified != null && Long.parseLong(lastModified) > fileModTime) { //if file is older then lastModified
return FileVisitResult.SKIP_SUBTREE;
} else if (path.getFileName().toString().startsWith(".")) {
return FileVisitResult.SKIP_SUBTREE;
} else {
if (path.getFileName().toString().startsWith(fileNamePrefix)) {
return FileVisitResult.CONTINUE;
} else {
return FileVisitResult.SKIP_SUBTREE;
}
}
}
else {
if (path.equals(directory)) {
return FileVisitResult.CONTINUE;
} else if (lastPath != null && path.toString().compareTo(lastPath.substring(0, path.toString().length())) < 0) {
return FileVisitResult.SKIP_SUBTREE;
} else if (path.getFileName().toString().startsWith(".")) {
return FileVisitResult.SKIP_SUBTREE;
} else {
if (path.getFileName().toString().startsWith(fileNamePrefix)) {
return FileVisitResult.CONTINUE;
} else {
return FileVisitResult.SKIP_SUBTREE;
}
}
} //end_else lastPath
}
@Override
public FileVisitResult visitFile(Path path, BasicFileAttributes attrs)
{
if(useLastModified) {
FileTime fileTime;
long fileModTime = Long.MIN_VALUE;
try {
fileTime = Files.getLastModifiedTime(path);
fileModTime = fileTime.toMillis();
//log.info("from visit fileModTime {}", fileModTime);
} catch (IOException e) {
log.info("Cannot get the last modified time - {} ", e);
} }
}
}
@Override if (lastModified != null && Long.parseLong(lastModified) > fileModTime) {
public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) return FileVisitResult.CONTINUE;
{ } else if (lastModified != null && Long.parseLong(lastModified) == fileModTime && lastPath != null && path.toString().compareTo(lastPath) <= 0 ) {
if (lastPath != null && path.toString().compareTo(lastPath) <= 0) { return FileVisitResult.CONTINUE;
return FileVisitResult.CONTINUE; }
} else if (path.getFileName().toString().startsWith(".")) { else if (path.getFileName().toString().startsWith(".")) {
return FileVisitResult.CONTINUE; return FileVisitResult.CONTINUE;
} else { } else {
if (path.getFileName().toString().startsWith(fileNamePrefix)) { if (path.getFileName().toString().startsWith(fileNamePrefix)) {
if (fileNameSuffix == null || path.getFileName().toString().endsWith(fileNameSuffix)) { if (fileNameSuffix == null || path.getFileName().toString().endsWith(fileNameSuffix)) {
if (fileSize == null || path.toFile().length() == fileSize) { if (fileSize == null || path.toFile().length() == fileSize) {
builder.add(path.toString()); builder.add(path.toString());
} }
}
}
return FileVisitResult.CONTINUE;
}
}
else{
if (lastPath != null && path.toString().compareTo(lastPath) <= 0) {
return FileVisitResult.CONTINUE;
} else if (path.getFileName().toString().startsWith(".")) {
return FileVisitResult.CONTINUE;
} else {
if (path.getFileName().toString().startsWith(fileNamePrefix)) {
if (fileNameSuffix == null || path.getFileName().toString().endsWith(fileNameSuffix)) {
if (fileSize == null || path.toFile().length() == fileSize) {
builder.add(path.toString());
}
} }
} }
return FileVisitResult.CONTINUE; return FileVisitResult.CONTINUE;
} }
} }
}
}); });
} catch (IOException ex) { } catch (IOException ex) {
throw new RuntimeException(String.format("Failed get a list of local files at '%s'", directory), ex); throw new RuntimeException(String.format("Failed get a list of local files at '%s'", directory), ex);
} }
return builder.build();
return builder.build();
} }
@Override @Override
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment