ebulk: features to handle modifications an deletions

parent 2912f542
*~
ebulk-data/config/*config.yml
...@@ -221,10 +221,10 @@ function checkCurl { ...@@ -221,10 +221,10 @@ function checkCurl {
function checkSoftware { function checkSoftware {
# CHECK JAVA VERSION # CHECK JAVA VERSION
if type -p java >/dev/null; then if [[ -n "$JAVA_HOME" ]] && [[ -x "$JAVA_HOME/bin/java" ]]; then
_java=java
elif [[ -n "$JAVA_HOME" ]] && [[ -x "$JAVA_HOME/bin/java" ]]; then
_java="$JAVA_HOME/bin/java" _java="$JAVA_HOME/bin/java"
elif type -p java >/dev/null; then
_java=java
else else
javaNotInstalled >&2; return 1 javaNotInstalled >&2; return 1
fi fi
......
require_relative 'filelogger'
require 'digest/md5'
# class that handles dataset tasks report
class DatasetUtils
DATASET_REPORT_FILE = ".dataset-task-report"
DATASET_COMPLETED_FILE = ".dataset-completed"
RESUME_OPERATION_FILE = ".resume-operation"
RUN_DONE = "done"
RUN_ERROR = "error"
RUN_ABORTED = "aborted"
DELETE = "DELETE"
INGESTION = "ingestion"
MEGA = 1000000
def initialize(data_set_directory)
@data_set_directory = data_set_directory
@logger = LogManager.instance()
@task_report_file = @data_set_directory + DATASET_REPORT_FILE
@completed_file = @data_set_directory + DATASET_COMPLETED_FILE
@resume_operation_file = @data_set_directory + RESUME_OPERATION_FILE
end
def getLocalFiles(remove=nil)
local_files = {}
begin
File.readlines(@task_report_file).each do |line|
record = line.split(";")
if record[1].chomp == RUN_DONE
if (remove.nil?) || (remove != record[0])
local_files[record[0]] = {"size" => record[2].chomp, "hash" => record[3].chomp, "status" => record[1].chomp, "modification_date" => record[4].chomp }
end
end
end
rescue Exception => e
@logger.error("An error occurred in DatasetUtils method 'getLocalFiles':" + e.to_s)
@logger.error(e.backtrace)
end
return local_files
end
def saveReport(local_files)
begin
File.delete(@task_report_file) if File.exist?(@task_report_file)
if local_files.empty?
File.open(@task_report_file, 'w') {}
else
local_files.each do |key, array|
File.open(@task_report_file, 'ab') { |file| file.puts(key+";"+array["status"]+";"+array["size"].to_s+";"+array["hash"]+";"+array["modification_date"]) }
end
end
rescue Exception => e
@logger.error("An error occurred in DatasetUtils method 'saveReport':" + e.to_s)
@logger.error(e.backtrace)
end
end
def removeCurrentOperation()
if File.exist?(@resume_operation_file)
File.delete(@resume_operation_file)
end
end
def saveCurrentOperation(operation, reference)
if File.exist?(@resume_operation_file)
File.delete(@resume_operation_file)
end
File.open(@resume_operation_file, 'w') { |file| file.puts(operation+";"+reference) }
end
def reportUpToDate(data_stream_dict)
begin
if not reportFileExist() and not completedFileExist()
# directory never downloaded -new or used for partial ingestions-
return TRUE
end
if reportFileExist() and not completedFileExist()
# download not finished
return FALSE
end
if data_stream_dict["status_code"] == 2
return FALSE
end
if data_stream_dict["status_code"] != 0
return TRUE
end
changes = getRemoteChangedDataStreams(data_stream_dict["result"])
if changes.empty?
return TRUE
elsif changes.length == 1
# check if the unique detected change corresponds to an interrumped ingestion
if File.exist?(@resume_operation_file)
operation=File.open(@resume_operation_file).read.chomp.split(";")
if operation[0] == INGESTION
if operation[1] == changes[0]["reference"]
File.delete(@resume_operation_file)
return TRUE
end
end
end
end
return FALSE
rescue Exception => e
@logger.error("An error occurred in DatasetUtils method 'reportUpToDate':" + e.to_s)
@logger.error(e.backtrace)
return FALSE
end
end
def deleteCompletedFile()
File.delete(@completed_file) if File.exist?(@completed_file)
end
def createCompletedFile()
File.open(@completed_file, 'w') {}
end
def completedFileExist()
return File.exist?(@completed_file)
end
def createReportFile()
File.open(@task_report_file, 'w') {}
end
def reportFileExist()
return File.exist?(@task_report_file)
end
def addToReport(reference, status, size, hash, data_set)
local_files = {}
begin
data_set = data_set.end_with?("/") ? data_set : data_set + "/"
file_path = @data_set_directory + reference.reverse.sub("/".reverse, ".".reverse).reverse.sub(data_set, "")
file_path = file_path[0...-5] if file_path.end_with?(".none")
modification_date = File.exist?(file_path) ? File.mtime(file_path).strftime("%Y-%m-%d-%H-%M-%S") : "not-modification-date"
if not reportFileExist()
File.open(@task_report_file, 'w') {}
end
new_file = TRUE
File.readlines(@task_report_file).each do |line|
record = line.split(";")
if reference.to_s == record[0].to_s
local_files[reference] = {"size" => size, "hash" => hash, "status" => status, "modification_date" => modification_date }
new_file = FALSE
else
local_files[record[0]] = {"size" => record[2].chomp, "hash" => record[3].chomp, "status" => record[1].chomp, "modification_date" => record[4].chomp }
end
end
if new_file
local_files[reference] = {"size" => size, "hash" => hash, "status" => status, "modification_date" => modification_date }
end
rescue Exception => e
@logger.error("An error occurred in DatasetUtils method 'addToReport':" + e.to_s)
@logger.error(e.backtrace)
end
saveReport(local_files)
end
def deleteFromReport(reference, status)
local_files = getLocalFiles(remove=reference)
saveReport(local_files)
end
def getHash(file)
begin
chunk_size = 4 * MEGA
md5 = Digest::MD5.new
open(file) do |f|
while chunk=f.read(chunk_size)
md5.update(chunk)
end
end
return md5.hexdigest
rescue Exception => e
@logger.error("An error occurred while getting hash of file " + file + ":" + e.to_s, print=TRUE)
@logger.error(e.backtrace)
raise e
end
end
def getLocalChanges(files, data_set)
new_files = []
begin
if reportFileExist()
File.readlines(@task_report_file).each do |line|
record = line.split(";")
if record[1].chomp == RUN_DONE
data_set = data_set.end_with?("/") ? data_set : data_set + "/"
file_path = @data_set_directory + record[0].reverse.sub("/".reverse, ".".reverse).reverse.sub(data_set, "")
file_path = file_path[0...-5] if file_path.end_with?(".none")
if files.include? file_path
modification_date = File.mtime(file_path).strftime("%Y-%m-%d-%H-%M-%S")
if modification_date != record[4].chomp
size = File.size(file_path).to_s
hash = getHash(file_path).to_s
if size == record[2].to_s
if hash != record[3].chomp
new_files.push({"path" => file_path, "size" => size, "hash" => hash })
end
else
new_files.push({"path" => file_path, "size" => size, "hash" => hash })
end
end
files.delete(file_path)
else
new_files.push({"path" => file_path, "size" => "", "hash" => DELETE })
end
end
end
end
files.each do |path|
new_files.push({"path" => path, "size" => "", "hash" => "" })
end
rescue Exception => e
@logger.error("An error occurred in DatasetUtils method 'getLocalChanges':" + e.to_s)
@logger.error(e.backtrace)
end
return new_files
end
def getRemoteChangedDataStreams(data_streams)
pending_data_streams = []
begin
if reportFileExist()
local_files = {}
remote_files = []
File.readlines(@task_report_file).each do |line|
record = line.split(";")
if record[1].chomp == RUN_DONE
local_files[record[0]] = {"size" => record[2].chomp, "hash" => record[3].chomp, }
end
end
data_streams.each do |data_stream|
remote_files.push(data_stream["reference"])
pending = TRUE
reference = data_stream["reference"]
if local_files.has_key? reference
size = local_files[reference]["size"]
if size.to_s == data_stream["size"].to_s
hash = local_files[reference]["hash"]
if hash == data_stream["hash"] or data_stream["hash"] == ""
pending = FALSE
end
end
end
if pending
local_files.delete(reference)
pending_data_streams.push(data_stream)
end
end
local_files.each do |key, array|
if not remote_files.include? key
pending_data_streams.push({"reference" => key, "hash" => DELETE })
end
end
end
rescue Exception => e
@logger.error("An error occurred in DatasetUtils method 'getRemoteChangedDataStreams':" + e.to_s)
@logger.error(e.backtrace)
end
return pending_data_streams
end
end
...@@ -38,12 +38,14 @@ class LogManager ...@@ -38,12 +38,14 @@ class LogManager
log(message, print, type=ERROR) log(message, print, type=ERROR)
end end
def abortExecution() def abortExecution(error=TRUE)
puts
info("PROCESS ABORTED") info("PROCESS ABORTED")
if error
puts
unless @path.nil? unless @path.nil?
puts "PROCESS ABORTED : For more detailed information, please refer to the log file '#{@path}'" puts "PROCESS ABORTED : For more detailed information, please refer to the log file '#{@path}'"
end end
end
exec("Process.kill 9, Process.pid >/dev/null 2>&1") exec("Process.kill 9, Process.pid >/dev/null 2>&1")
end end
......
require_relative '../wendelin_client' require_relative '../wendelin_client'
require_relative '../dataset_utils'
require_relative '../filelogger' require_relative '../filelogger'
require 'digest/md5'
module Embulk module Embulk
module Input module Input
class Fifinput < InputPlugin class Fifinput < InputPlugin
TASK_REPORT_FILE = ".ingestion-task-report"
COMPLETED_FILE = ".ingestion-completed"
RUN_DONE = "done"
RUN_ERROR = "error"
RUN_ABORTED = "aborted"
Plugin.register_input("fif", self) Plugin.register_input("fif", self)
def self.getTaskReportFilename(data_set_directory)
return data_set_directory + TASK_REPORT_FILE
end
def self.getCompletedFilename(data_set_directory)
return data_set_directory + COMPLETED_FILE
end
EOF = "EOF" EOF = "EOF"
CHUNK_SIZE = 50000000 #50mb CHUNK_SIZE = 50000000 #50mb
MEGA = 1000000 MEGA = 1000000
...@@ -32,53 +19,11 @@ module Embulk ...@@ -32,53 +19,11 @@ module Embulk
{"name"=>"file", "type"=>"string"}, {"name"=>"file", "type"=>"string"},
{"name"=>"extension", "type"=>"string"}, {"name"=>"extension", "type"=>"string"},
{"name"=>"data_chunk", "type"=>"string"}, {"name"=>"data_chunk", "type"=>"string"},
{"name"=>"eof", "type"=>"string"} {"name"=>"eof", "type"=>"string"},
{"name"=>"size", "type"=>"string"},
{"name"=>"hash", "type"=>"string"}
] ]
def self.filterDoneTasks(files, task_report_file, data_set, root_path)
data_set = data_set.end_with?("/") ? data_set : data_set + "/"
ingested_files = []
File.readlines(task_report_file).each do |line|
record = line.split(";")
if record[1].chomp == RUN_DONE
ingested = record[0].sub(data_set, root_path)
ingested_files.push(ingested)
end
end
pending_files = []
files.each do |file|
if ! ingested_files.include? file
pending_files.push(file)
end
end
return pending_files
end
def self.filterExistingFiles(files, data_streams, data_set, root_path)
data_set = data_set.end_with?("/") ? data_set : data_set + "/"
existing_files = []
data_streams["result"].each do |data_stream|
file_path = root_path + data_stream[1].reverse.sub("/".reverse, ".".reverse).reverse.sub(data_set, "")
if file_path.end_with?(".none")
file_path = file_path[0...-5]
end
existing_files.push(file_path)
end
filtered_files = files - existing_files
ignored_files = files - filtered_files
if not ignored_files.empty?
puts
@logger.info("There are files in the local directory that already exist in data set and will be ignored for ingestion.", print=TRUE)
puts
end
if filtered_files.empty?
@logger.info("The dataset directory '#{root_path}' does not contain any new file for the data set '#{data_set}'.", print=TRUE)
@logger.info("Please make sure your dataset directory contains new files for ingestion.", print=TRUE)
@logger.abortExecution()
end
return filtered_files
end
def self.transaction(config, &control) def self.transaction(config, &control)
begin begin
tool_dir = config.param('tool_dir', :string) tool_dir = config.param('tool_dir', :string)
...@@ -93,18 +38,29 @@ module Embulk ...@@ -93,18 +38,29 @@ module Embulk
end end
paths = config.param('path_prefix', :array) paths = config.param('path_prefix', :array)
paths[0] = paths[0].end_with?("/") ? paths[0] : paths[0] + "/" paths[0] = paths[0].end_with?("/") ? paths[0] : paths[0] + "/"
@data_set_directory = paths[0]
task['inputs'] = paths task['inputs'] = paths
@logger.info("Getting local files for ingestion...", print=TRUE)
task['paths'] = paths.map {|path| task['paths'] = paths.map {|path|
next [] unless Dir.exist?(path) next [] unless Dir.exist?(path)
Dir[(path + '/**/*').gsub! '//', '/'] Dir[(path + '/**/*').gsub! '//', '/']
}.flatten.select{ |file| File.file?(file) } }.flatten.select{ |file| File.file?(file) }
@wendelin = WendelinClient.new(config.param('erp5_url', :string), config.param('user', :string), config.param('password', :string)) @wendelin = WendelinClient.new(config.param('erp5_url', :string), config.param('user', :string), config.param('password', :string))
data_stream_list = @wendelin.getDataStreams(task['data_set']) @logger.info("Checking remote dataset...", print=TRUE)
if data_stream_list["status_code"] == 0 data_stream_dict = @wendelin.getDataStreams(task['data_set'])
task['data_streams'] = data_stream_list["result"] @dataset_utils = DatasetUtils.new(@data_set_directory)
else if @dataset_utils.reportFileExist()
@logger.error(data_stream_list["error_message"], print=TRUE) @logger.info("Checking local dataset...", print=TRUE)
if not @dataset_utils.reportUpToDate(data_stream_dict)
puts
@logger.error("Your current dataset is outdated. Please, run a download to update it before ingest your changes.", print=TRUE)
puts
@logger.abortExecution(error=FALSE)
end
end
if data_stream_dict["status_code"] != 0
@logger.error(data_stream_dict["error_message"], print=TRUE)
@logger.abortExecution() @logger.abortExecution()
end end
...@@ -116,32 +72,20 @@ module Embulk ...@@ -116,32 +72,20 @@ module Embulk
@logger.info("Please try manual ingestion or update manually the ingestion configuration file.", print=TRUE) @logger.info("Please try manual ingestion or update manually the ingestion configuration file.", print=TRUE)
@logger.abortExecution() @logger.abortExecution()
end end
if task['paths'].empty? if task['paths'].empty? and not @dataset_utils.reportFileExist()
@logger.error("The dataset directory '#{task['inputs'][0]}' is empty.", print=TRUE) @logger.error("The dataset directory '#{task['inputs'][0]}' is empty.", print=TRUE)
@logger.error("Could not find any valid file.", print=TRUE) @logger.error("Could not find any valid file.", print=TRUE)
@logger.error("Please make sure your dataset directory contains files for ingestion.", print=TRUE) @logger.error("Please make sure your dataset directory contains files for ingestion.", print=TRUE)
@logger.abortExecution() @logger.abortExecution()
end end
@data_set_directory = paths[0] task['paths'] = @dataset_utils.getLocalChanges(task['paths'], task['data_set'])
task_report_file = getTaskReportFilename(@data_set_directory) if task['paths'].empty?
completed_file = getCompletedFilename(@data_set_directory)
delete_task_report = FALSE
if File.file?(task_report_file)
task['paths'] = filterDoneTasks(task['paths'], task_report_file, task['data_set'], @data_set_directory)
if File.file?(completed_file)
delete_task_report = TRUE
else
puts puts
@logger.info("There was a previous attempt to ingest this dataset but it did not finish successfully.", print=TRUE) @logger.info("No changes in '#{@data_set_directory}'. Everything up-to-date.", print=TRUE)
@logger.info("Resuming ingestion...", print=TRUE) @logger.abortExecution(error=FALSE)
end end
else @logger.info("#{task['paths'].length} change(s) detected for ingestion: ", print=TRUE)
File.open(task_report_file, 'w') {}
end
task['paths'] = filterExistingFiles(task['paths'], data_stream_list, task['data_set'], @data_set_directory)
@logger.info("#{task['paths'].length} new file(s) detected for ingestion: ", print=TRUE)
if task['paths'].length > 15 if task['paths'].length > 15
@logger.info(task['paths'][0, 5], print=TRUE) @logger.info(task['paths'][0, 5], print=TRUE)
@logger.info(".....", print=TRUE) @logger.info(".....", print=TRUE)
...@@ -158,19 +102,15 @@ module Embulk ...@@ -158,19 +102,15 @@ module Embulk
@logger.abortExecution() @logger.abortExecution()
end end
if delete_task_report
File.delete(task_report_file) if File.exist?(task_report_file)
File.open(task_report_file, 'w') {}
end
File.delete(completed_file) if File.exist?(completed_file)
columns = [ columns = [
Column.new(0, "supplier", :string), Column.new(0, "supplier", :string),
Column.new(1, "data_set", :string), Column.new(1, "data_set", :string),
Column.new(2, "file", :string), Column.new(2, "file", :string),
Column.new(3, "extension", :string), Column.new(3, "extension", :string),
Column.new(4, "data_chunk", :string), Column.new(4, "data_chunk", :string),
Column.new(5, "eof", :string) Column.new(5, "eof", :string),
Column.new(6, "size", :string),
Column.new(7, "hash", :string)
] ]
commit_reports = yield(task, columns, task['paths'].length) commit_reports = yield(task, columns, task['paths'].length)
...@@ -200,13 +140,8 @@ module Embulk ...@@ -200,13 +140,8 @@ module Embulk
end end
next_config_diff = task_reports.map{|hash| hash["done"]}.flatten.compact next_config_diff = task_reports.map{|hash| hash["done"]}.flatten.compact
@logger.info("#{next_config_diff.length} file(s) ingested.", print=TRUE) @logger.info("#{next_config_diff.length} file(s) ingested.", print=TRUE)
#if next_config_diff.length > 0
# @logger.info("FIF INPUT - Your ingested files will be available in the site in a few minutes. Thank for your patience.", print=TRUE)
#end
if(next_config_diff.length == count) if(next_config_diff.length == count)
@logger.info("Dataset successfully ingested.", print=TRUE) @logger.info("Dataset successfully ingested.", print=TRUE)
completed_file = getCompletedFilename(@data_set_directory)
File.open(completed_file, 'w') {}
else else
next_config_diff = task_reports.map{|hash| hash["error"]}.flatten.compact next_config_diff = task_reports.map{|hash| hash["error"]}.flatten.compact
puts puts
...@@ -233,47 +168,78 @@ module Embulk ...@@ -233,47 +168,78 @@ module Embulk
@input_dirs = task['inputs'] @input_dirs = task['inputs']
@data_set_directory = task['inputs'][0] @data_set_directory = task['inputs'][0]
@logger = LogManager.instance() @logger = LogManager.instance()
@dataset_utils = DatasetUtils.new(@data_set_directory)
end end
def run def run
path = task['paths'][@index] file_dict = task['paths'][@index]
@logger.info("Processing file #{path.to_s}", print=TRUE) @logger.info("Processing file #{file_dict["path"].to_s}", print=TRUE)
begin begin
each_chunk(path, schema[1..-1].map{|elm| elm.name}, @chunk_size) do |entry| path = file_dict["path"]
size = file_dict["size"]
hash = file_dict["hash"]
delete = hash == DatasetUtils::DELETE
if size == "" and hash == "" #new file
size = File.size(path)
hash = @dataset_utils.getHash(path)
end
extension = File.extname path
if path.start_with?(@input_dirs[0])
filename = path.sub(@input_dirs[0], "")
filename = filename.reverse.sub(extension.reverse, "").reverse
end
extension.gsub! '.', ''
extension = extension == "" ? "none" : extension
dataset_file = file_dict["path"].sub(@data_set_directory, @dataset.end_with?("/") ? @dataset : @dataset + "/")
if extension != "none"
old_pattern = filename + '.' + extension
new_pattern = filename + '/' + extension
dataset_file = dataset_file.reverse.sub(old_pattern.reverse, new_pattern.reverse).reverse
else
dataset_file += "/" + extension
end
@dataset_utils.saveCurrentOperation(DatasetUtils::INGESTION, dataset_file)
each_chunk(path, filename, extension, size, hash, schema[1..-1].map{|elm| elm.name}, @chunk_size, delete) do |entry|
@page_builder.add(entry) @page_builder.add(entry)
end end
@page_builder.finish @page_builder.finish
rescue java.lang.OutOfMemoryError rescue java.lang.OutOfMemoryError
@logger.logOutOfMemoryError(path) @logger.logOutOfMemoryError(file_dict["path"])
return_value = RUN_ABORTED return_value = DatasetUtils::RUN_ABORTED
rescue Exception => e rescue Exception => e
@logger.error("An error occurred during file ingestion: " + e.to_s, print=TRUE) @logger.error("An error occurred during file ingestion: " + e.to_s, print=TRUE)
@logger.error(e.backtrace) @logger.error(e.backtrace)
puts "[INFO] For more detailed information, please refer to the log file: " + @logger.getLogPath() puts "[INFO] For more detailed information, please refer to the log file: " + @logger.getLogPath()
return_value = RUN_ERROR return_value = DatasetUtils::RUN_ERROR
else else
return_value = RUN_DONE return_value = DatasetUtils::RUN_DONE
end end
task_report_file = Fifinput.getTaskReportFilename(@data_set_directory)
dataset_file = path.sub(@data_set_directory, @dataset.end_with?("/") ? @dataset : @dataset + "/")
File.open(task_report_file, 'ab') { |file| file.puts(dataset_file+";"+return_value+";") }
return {return_value => path}
end
private
def each_chunk(path, fields, chunk_size=CHUNK_SIZE)
extension = File.extname path
if path.start_with?(@input_dirs[0]) # update reports if operation successfully ended
filename = path.sub(@input_dirs[0], "") if return_value == DatasetUtils::RUN_DONE
filename = filename.reverse.sub(extension.reverse, "").reverse if delete
if @dataset_utils.reportFileExist()
@dataset_utils.deleteFromReport(dataset_file, return_value)
end
else
if @dataset_utils.reportFileExist()
@dataset_utils.addToReport(dataset_file, return_value, size, hash, task['data_set'])
end
end
end end
extension.gsub! '.', '' @dataset_utils.removeCurrentOperation()
if extension == "" return {return_value => file_dict["path"]}
extension = "none"
end end
private
def each_chunk(path, filename, extension, size, hash, fields, chunk_size=CHUNK_SIZE, delete=FALSE)
if delete
values = [@supplier, @dataset, filename, extension, "", DatasetUtils::DELETE, "", ""]
yield(values)
else
file_object = File.open(path, "rb") file_object = File.open(path, "rb")
npart = 0 npart = 0
next_byte = file_object.read(1) next_byte = file_object.read(1)
...@@ -281,26 +247,30 @@ module Embulk ...@@ -281,26 +247,30 @@ module Embulk
while true while true
data = next_byte data = next_byte
if not next_byte if not next_byte
if first if first # this means this is an empty file
values = [@supplier, @dataset, filename, extension, "", EOF] values = [@supplier, @dataset, filename, extension, "", "", size, hash]
yield(values) yield(values)
end end
break break
end end
first = FALSE
data += file_object.read(chunk_size) data += file_object.read(chunk_size)
next_byte = file_object.read(1) next_byte = file_object.read(1)
if not next_byte if not next_byte
eof = EOF eof = EOF
if first # this means that the whole file will be ingested at once (not split)
eof = ""
end
else else
npart += 1 npart += 1
eof = npart.to_s.rjust(3, "0") eof = npart.to_s.rjust(3, "0")
end end
content = Base64.encode64(data) content = Base64.encode64(data)
values = [@supplier, @dataset, filename, extension, content, eof] values = [@supplier, @dataset, filename, extension, content, eof, size, hash]
first = FALSE
yield(values) yield(values)
end end
end end
end end
end end
end
end end
require_relative '../wendelin_client' require_relative '../wendelin_client'
require_relative '../dataset_utils'
require 'fileutils' require 'fileutils'
module Embulk module Embulk
...@@ -14,46 +15,53 @@ module Embulk ...@@ -14,46 +15,53 @@ module Embulk
DOWNLOAD = "D" DOWNLOAD = "D"
ABORT = "A" ABORT = "A"
TASK_REPORT_FILE = ".download-task-report"
COMPLETED_FILE = ".download-completed"
RUN_DONE = "done"
RUN_ERROR = "error"
RUN_ABORTED = "aborted"
Plugin.register_input("wendelin", self) Plugin.register_input("wendelin", self)
def self.getTaskReportFilename(data_set_directory) def self.warnConflicts(remote_streams, data_set, action)
return data_set_directory + TASK_REPORT_FILE if not remote_streams.empty?
end paths = [@data_set_directory.end_with?("/") ? @data_set_directory : @data_set_directory + "/"]
local_files = paths.map {|path|
def self.getCompletedFilename(data_set_directory) next [] unless Dir.exist?(path)
return data_set_directory + COMPLETED_FILE Dir[(path + '/**/*').gsub! '//', '/']
end }.flatten.select{ |file| File.file?(file) }
local_changes = @dataset_utils.getLocalChanges(local_files, data_set)
def self.getPendingDataStreams(data_streams, task_report_file) data_set = @data_set.end_with?("/") ? @data_set : @data_set + "/"
donwloaded_data_streams = [] remote_changes = remote_streams.map { |remote|
File.readlines(task_report_file).each do |line| remote = @data_set_directory + remote["reference"].reverse.sub("/".reverse, ".".reverse).reverse.sub(data_set, "")
record = line.split(";") remote.end_with?(".none") ? remote[0...-5] : remote
if record[1].chomp == RUN_DONE }
donwloaded_data_streams.push(record[0]) conflicts = local_changes.select{ |conflict| remote_changes.include? conflict["path"] }.map{ |conflict| conflict["path"] }
end # check scenario where the last version file exists but not in report
# (due download interrumped right after save the file but before add it to report)
if action == RESUME and conflicts.length == 1 and File.exist?(conflicts[0])
@logger.warn("The file #{conflicts[0]} was detected as false positive conflict and it was not informed to user.")
conflicts = []
end
if not conflicts.empty?
@logger.warn("CONFLICT: there are conflicts with some of your local changes.", print=TRUE)
puts "** press key **"
option = gets
@logger.warn("Conflicted files:", print=TRUE)
@logger.warn(conflicts, print=TRUE)
puts
@logger.warn("Your local conflicted files will be overwritten by download.", print=TRUE)
@logger.warn("Do you want to continue? (y/n)", print=TRUE)
option = gets
option = option.chomp
if option == "n" or option == "N"
@logger.info("Download cancelled by user.", print=TRUE)
@logger.abortExecution(error=FALSE)
end end
pending_data_streams = []
data_streams.each do |data_stream|
if ! donwloaded_data_streams.include? data_stream[1]
pending_data_streams.push(data_stream)
end end
end end
return pending_data_streams
end end
def self.askUserForAction(task, completed_file, task_report_file, action) def self.askUserForAction(task, action)
if action == RESUME if action == RESUME
action_message = "#{RESUME}: Resume. Continues download from last file." action_message = "#{RESUME}: Resume. Continues download from last file."
else else
action = UPDATE action = UPDATE
action_message = "#{UPDATE}: Update. Checks for new files." action_message = "#{UPDATE}: Update. Checks for changes in dataset."
end end
valid_option = FALSE valid_option = FALSE
while not valid_option while not valid_option
...@@ -71,8 +79,9 @@ module Embulk ...@@ -71,8 +79,9 @@ module Embulk
end end
case option case option
when action when action
task['data_streams'] = getPendingDataStreams(task['data_streams'], task_report_file) task['data_streams'] = @dataset_utils.getRemoteChangedDataStreams(task['data_streams'])
File.delete(completed_file) if File.exist?(completed_file) self.warnConflicts(task['data_streams'], task['data_set'], action)
@dataset_utils.deleteCompletedFile()
if task['data_streams'].empty? if task['data_streams'].empty?
@logger.info("No new files in dataset.", print=TRUE) @logger.info("No new files in dataset.", print=TRUE)
@logger.info("Your downloaded dataset is already up to date.", print=TRUE) @logger.info("Your downloaded dataset is already up to date.", print=TRUE)
...@@ -90,8 +99,8 @@ module Embulk ...@@ -90,8 +99,8 @@ module Embulk
if ebulk_file_content != "" if ebulk_file_content != ""
File.open(ebulk_file, 'w') { |file| file.write(ebulk_file_content) } File.open(ebulk_file, 'w') { |file| file.write(ebulk_file_content) }
end end
File.delete(completed_file) if File.exist?(completed_file) @dataset_utils.deleteCompletedFile()
File.open(task_report_file, 'w') {} @dataset_utils.createReportFile()
when ABORT when ABORT
@logger.abortExecution() @logger.abortExecution()
end end
...@@ -137,34 +146,44 @@ module Embulk ...@@ -137,34 +146,44 @@ module Embulk
@logger.info("Chunk size set in #{task['chunk_size']/MEGA}MB") @logger.info("Chunk size set in #{task['chunk_size']/MEGA}MB")
@data_set_directory = @output_path.end_with?("/") ? @output_path : @output_path + "/" @data_set_directory = @output_path.end_with?("/") ? @output_path : @output_path + "/"
task['data_set_directory'] = @data_set_directory task['data_set_directory'] = @data_set_directory
@dataset_utils = DatasetUtils.new(@data_set_directory)
@logger.info("Getting remote file list from dataset '#{@data_set}'...", print=TRUE)
data_stream_list = @wendelin.getDataStreams(@data_set)
n_retry = 0
while data_stream_list["status_code"] == 2 and n_retry < 6
sleep 10
data_stream_list = @wendelin.getDataStreams(@data_set) data_stream_list = @wendelin.getDataStreams(@data_set)
n_retry += 1
end
if data_stream_list["status_code"] == 0 if data_stream_list["status_code"] == 0
if data_stream_list["result"].empty? if data_stream_list["result"].empty?
@logger.error("No valid data found for data set " + @data_set, print=TRUE) @logger.error("No valid data found for data set " + @data_set, print=TRUE)
@logger.info("Please use a valid dataset reference from the list of datasets available in the site.", print=TRUE) @logger.abortExecution(error=FALSE)
@logger.abortExecution()
end end
task['data_streams'] = data_stream_list["result"] task['data_streams'] = data_stream_list["result"]
elsif data_stream_list["status_code"] == 2
@logger.error("Dataset '#{@data_set}' has files recently ingested waiting for processing.", print=TRUE)
@logger.error("Please retry in some minutes.", print=TRUE)
@logger.abortExecution(error=FALSE)
else else
@logger.error(data_stream_list["error_message"], print=TRUE) @logger.error(data_stream_list["error_message"], print=TRUE)
@logger.abortExecution() @logger.abortExecution()
end end
@logger.info("Done", print=TRUE)
task_report_file = getTaskReportFilename(@data_set_directory) if @dataset_utils.reportFileExist()
completed_file = getCompletedFilename(@data_set_directory) if @dataset_utils.completedFileExist()
if File.file?(task_report_file)
if File.file?(completed_file)
puts puts
@logger.info("This dataset was already downloaded. What do you want to do?", print=TRUE) @logger.info("This dataset was already downloaded. What do you want to do?", print=TRUE)
puts puts
self.askUserForAction(task, completed_file, task_report_file, action=UPDATE) self.askUserForAction(task, action=UPDATE)
else else
puts puts
@logger.info("There was a previous attempt to download this dataset but it did not finish successfully.", print=TRUE) @logger.info("There was a previous attempt to download this dataset but it did not finish successfully.", print=TRUE)
@logger.info("What do you want to do?", print=TRUE) @logger.info("What do you want to do?", print=TRUE)
puts puts
self.askUserForAction(task, completed_file, task_report_file, action=RESUME) self.askUserForAction(task, action=RESUME)
end end
else else
dir_entries = Dir.entries(@data_set_directory).length dir_entries = Dir.entries(@data_set_directory).length
...@@ -179,27 +198,16 @@ module Embulk ...@@ -179,27 +198,16 @@ module Embulk
option = option.chomp option = option.chomp
if option == "n" if option == "n"
@logger.info("Download cancelled by user.", print=TRUE) @logger.info("Download cancelled by user.", print=TRUE)
@logger.abortExecution() @logger.abortExecution(error=FALSE)
end
end
ebulk_file = @data_set_directory + "/.ebulk_dataset"
ebulk_file_content = ""
if File.file?(ebulk_file)
ebulk_file_content = File.read(ebulk_file)
end end
FileUtils.rm_rf(@data_set_directory)
unless File.directory?(@data_set_directory)
FileUtils.mkdir_p(@data_set_directory)
end end
if ebulk_file_content != "" @dataset_utils.createReportFile()
File.open(ebulk_file, 'w') { |file| file.write(ebulk_file_content) }
end
File.open(task_report_file, 'w') {}
end end
columns = [ columns = [
Column.new(0, "reference", :string), Column.new(0, "reference", :string),
Column.new(1, "data_chunk", :string), Column.new(1, "data_chunk", :string),
Column.new(2, "data_set", :string) Column.new(2, "data_set", :string),
Column.new(3, "mode", :string)
] ]
resume(task, columns, task['data_streams'].length, &control) resume(task, columns, task['data_streams'].length, &control)
rescue Exception => e rescue Exception => e
...@@ -225,18 +233,19 @@ module Embulk ...@@ -225,18 +233,19 @@ module Embulk
@logger.info("Full task report:") @logger.info("Full task report:")
@logger.info(task_reports) @logger.info(task_reports)
end end
next_config_diff = task_reports.map{|hash| hash[RUN_DONE]}.flatten.compact next_config_diff = task_reports.map{|hash| hash[DatasetUtils::RUN_DONE]}.flatten.compact
if(next_config_diff.length == count) if(next_config_diff.length == count)
if(count > 0)
@logger.info("Dataset successfully downloaded.", print=TRUE) @logger.info("Dataset successfully downloaded.", print=TRUE)
@logger.info("#{count} files downloaded.", print=TRUE) @logger.info("#{count} files processed.", print=TRUE)
@logger.info("The files were saved in dataset directory: " + @data_set_directory, print=TRUE) @logger.info("Dataset files are in dataset directory: " + @data_set_directory, print=TRUE)
completed_file = getCompletedFilename(@data_set_directory) end
File.open(completed_file, 'w') {} @dataset_utils.createCompletedFile()
if count > 10 if count > 10
next_config_diff = {} next_config_diff = {}
end end
end end
return {RUN_DONE => next_config_diff} return {DatasetUtils::RUN_DONE => next_config_diff}
end end
def initialize(task, schema, index, page_builder) def initialize(task, schema, index, page_builder)
...@@ -246,37 +255,51 @@ module Embulk ...@@ -246,37 +255,51 @@ module Embulk
@data_set_directory = task['data_set_directory'] @data_set_directory = task['data_set_directory']
@wendelin = WendelinClient.new(task['erp5_url'], task['user'], task['password']) @wendelin = WendelinClient.new(task['erp5_url'], task['user'], task['password'])
@logger = LogManager.instance() @logger = LogManager.instance()
@dataset_utils = DatasetUtils.new(@data_set_directory)
end end
def run def run
data_stream = task['data_streams'][@index] data_stream = task['data_streams'][@index]
id = data_stream[0] id = data_stream["id"]
ref = data_stream[1] ref = data_stream["reference"]
@logger.info("Getting content from remote #{ref}", print=TRUE) size = data_stream["size"]
hash = data_stream["hash"]
begin begin
@wendelin.eachDataStreamContentChunk(id, @chunk_size) do |chunk| if hash.to_s == DatasetUtils::DELETE
if chunk.nil? || chunk.empty? @logger.info("Deleting #{ref}", print=TRUE)
content = "" entry = [ref, "", @data_set, hash.to_s]
page_builder.add(entry)
else else
content = Base64.encode64(chunk) @logger.info("Getting content from remote #{ref}", print=TRUE)
end n_chunk = 0
entry = [ref, content, @data_set] @wendelin.eachDataStreamContentChunk(id, @chunk_size) do |chunk|
content = chunk.nil? || chunk.empty? ? "" : Base64.encode64(chunk)
begin_of_file = n_chunk == 0
entry = [ref, content, @data_set, begin_of_file]
page_builder.add(entry) page_builder.add(entry)
page_builder.finish n_chunk += 1
end end
end
page_builder.finish
rescue java.lang.OutOfMemoryError rescue java.lang.OutOfMemoryError
@logger.logOutOfMemoryError(ref) @logger.logOutOfMemoryError(ref)
return_value = RUN_ABORTED return_value = DatasetUtils::RUN_ABORTED
rescue Exception => e rescue Exception => e
@logger.error("An error occurred during processing: " + e.to_s, print=TRUE) @logger.error(e.to_s, print=TRUE)
@logger.error(e.backtrace) @logger.error(e.backtrace)
puts "[INFO] For more detailed information, please refer to the log file: " + @logger.getLogPath() puts "[INFO] For more detailed information, please refer to the log file: " + @logger.getLogPath()
return_value = RUN_ERROR return_value = DatasetUtils::RUN_ERROR
else else
return_value = RUN_DONE return_value = DatasetUtils::RUN_DONE
end
# update reports if operation successfully ended
if return_value == DatasetUtils::RUN_DONE
if hash.to_s == DatasetUtils::DELETE
@dataset_utils.deleteFromReport(ref, return_value)
else
@dataset_utils.addToReport(ref, return_value, size, hash, task['data_set'])
end
end end
task_report_file = Wendelininput.getTaskReportFilename(@data_set_directory)
File.open(task_report_file, 'ab') { |file| file.puts(ref+";"+return_value+";") }
return {return_value => ref} return {return_value => ref}
end end
end end
......
require 'base64' require 'base64'
require 'fileutils' require 'fileutils'
require_relative '../dataset_utils'
require_relative '../filelogger' require_relative '../filelogger'
module Embulk module Embulk
...@@ -39,14 +40,23 @@ module Embulk ...@@ -39,14 +40,23 @@ module Embulk
if ref.end_with?(".none") if ref.end_with?(".none")
ref = ref[0...-5] ref = ref[0...-5]
end end
file_path = data_set_directory + ref
write_mode = 'ab'
if record[3] == DatasetUtils::DELETE
File.delete(file_path) if File.exist?(file_path)
else
if record[3] == TRUE.to_s
write_mode = 'w'
end
dirname = File.dirname(data_set_directory + ref) dirname = File.dirname(data_set_directory + ref)
unless File.directory?(dirname) unless File.directory?(dirname)
FileUtils.mkdir_p(dirname) FileUtils.mkdir_p(dirname)
end end
File.open(data_set_directory + ref, 'ab') { |file| file.write(data_chunk) } File.open(file_path, write_mode) { |file| file.write(data_chunk) }
end
end end
rescue Exception => e rescue Exception => e
@logger.error("An error occurred while writing file.", print=TRUE) @logger.error("An error occurred while procesing file.", print=TRUE)
@logger.error(e.backtrace) @logger.error(e.backtrace)
raise e raise e
end end
......
require 'base64' require 'base64'
require_relative '../wendelin_client' require_relative '../wendelin_client'
require_relative '../dataset_utils'
module Embulk module Embulk
module Output module Output
...@@ -41,11 +42,18 @@ module Embulk ...@@ -41,11 +42,18 @@ module Embulk
extension = record[3] extension = record[3]
eof = record[5] eof = record[5]
data_chunk = record[4] data_chunk = record[4]
reference = [supplier, dataset, filename, extension, eof].join("/") size = record[6]
hash = record[7]
begin begin
if eof == DatasetUtils::DELETE
reference = [dataset, filename, extension].join("/")
@wendelin.delete(reference)
else
reference = [supplier, dataset, filename, extension, eof, size, hash].join("/")
if not @wendelin.ingest(reference, data_chunk) if not @wendelin.ingest(reference, data_chunk)
raise "could not ingest" raise "could not ingest"
end end
end
rescue Exception => e rescue Exception => e
raise e raise e
@logger.error(e.backtrace) @logger.error(e.backtrace)
......
...@@ -40,7 +40,9 @@ module Embulk ...@@ -40,7 +40,9 @@ module Embulk
Column.new(2, "file", :string), Column.new(2, "file", :string),
Column.new(3, "extension", :string), Column.new(3, "extension", :string),
Column.new(4, "data_chunk", :string), Column.new(4, "data_chunk", :string),
Column.new(5, "eof", :string) Column.new(5, "eof", :string),
Column.new(6, "size", :string),
Column.new(7, "hash", :string)
] ]
yield(task, columns) yield(task, columns)
...@@ -78,22 +80,27 @@ module Embulk ...@@ -78,22 +80,27 @@ module Embulk
data = next_byte data = next_byte
if not next_byte if not next_byte
if first if first
values = [task['supplier'], task['data_set'], filename, extension, "", EOF] # this means this is an empty file
values = [task['supplier'], task['data_set'], filename, extension, "", "", "", ""]
yield(values) yield(values)
end end
break break
end end
first = FALSE
data += file.read(chunk_size) data += file.read(chunk_size)
next_byte = file.read(1) next_byte = file.read(1)
if not next_byte if not next_byte
eof = EOF eof = EOF
if first
# this means that the whole file will be ingested at once (not split)
eof = ""
end
else else
npart += 1 npart += 1
eof = npart.to_s.rjust(3, "0") eof = npart.to_s.rjust(3, "0")
end end
content = Base64.encode64(data) content = Base64.encode64(data)
values = [task['supplier'], task['data_set'], filename, extension, content, eof] values = [task['supplier'], task['data_set'], filename, extension, content, eof, "", ""]
first = FALSE
yield(values) yield(values)
end end
end end
......
...@@ -16,11 +16,6 @@ class WendelinClient ...@@ -16,11 +16,6 @@ class WendelinClient
@last_ingestion = Time.new - 2 @last_ingestion = Time.new - 2
end end
def removeEOF(reference)
root = reference.dup
return root[0...root.rindex('/')]
end
def exists(reference) def exists(reference)
uri = URI("#{@erp5_url}/ingestionReferenceExists?reference=#{reference}") uri = URI("#{@erp5_url}/ingestionReferenceExists?reference=#{reference}")
begin begin
...@@ -34,20 +29,25 @@ class WendelinClient ...@@ -34,20 +29,25 @@ class WendelinClient
end end
end end
def delete(reference)
@logger.info("Deletion requested for reference #{reference}", print=TRUE)
uri = URI("#{@erp5_url}/ERP5Site_invalidateIngestionObjects?reference=#{reference}")
res = handleRequest(uri)
if res == FALSE
@logger.abortExecution()
end
end
def ingest(reference, data_chunk) def ingest(reference, data_chunk)
@logger.info("Ingestion reference: #{reference}", print=TRUE) @logger.info("Ingestion reference: #{reference}", print=TRUE)
if @banned_references_list.include? removeEOF(reference)
return FALSE
end
if Time.new - @last_ingestion < 2 if Time.new - @last_ingestion < 2
# avoid send ingestions to close (specially for split ones) # avoid send ingestions to close (specially for split ones)
sleep 3 sleep 2
end end
if exists(reference) if exists(reference)
@logger.info("There is another ingestion already done for the pair data_set-filename. Reference "\ @logger.info("There is another ingestion already done for the pair data_set-filename. Reference "\
+ removeEOF(reference), print=TRUE) + reference, print=TRUE)
@logger.info("Rename your reference or delete the older ingestion.", print=TRUE) @logger.info("Rename your reference or delete the older ingestion.", print=TRUE)
@banned_references_list << removeEOF(reference)
return FALSE return FALSE
end end
if reference.include? "#" or reference.include? "+" if reference.include? "#" or reference.include? "+"
...@@ -91,7 +91,6 @@ class WendelinClient ...@@ -91,7 +91,6 @@ class WendelinClient
end end
def getDataStreams(data_set_reference) def getDataStreams(data_set_reference)
@logger.info("Getting file list for dataset '#{data_set_reference}'", print=TRUE)
uri = URI("#{@erp5_url}getDataStreamList?data_set_reference=#{data_set_reference}") uri = URI("#{@erp5_url}getDataStreamList?data_set_reference=#{data_set_reference}")
str = handleRequest(uri) str = handleRequest(uri)
if str == FALSE if str == FALSE
...@@ -115,7 +114,6 @@ class WendelinClient ...@@ -115,7 +114,6 @@ class WendelinClient
req.set_form_data('data_chunk' => data_chunk) req.set_form_data('data_chunk' => data_chunk)
rescue java.lang.OutOfMemoryError rescue java.lang.OutOfMemoryError
@logger.logOutOfMemoryError(reference) @logger.logOutOfMemoryError(reference)
@banned_references_list << removeEOF(reference)
return FALSE return FALSE
end end
@logger.info("Sending record:'#{reference}'...", print=TRUE) @logger.info("Sending record:'#{reference}'...", print=TRUE)
...@@ -125,7 +123,7 @@ class WendelinClient ...@@ -125,7 +123,7 @@ class WendelinClient
res = Net::HTTP.start(uri.hostname, uri.port, res = Net::HTTP.start(uri.hostname, uri.port,
:use_ssl => (uri.scheme == 'https'), :use_ssl => (uri.scheme == 'https'),
:verify_mode => OpenSSL::SSL::VERIFY_NONE, :verify_mode => OpenSSL::SSL::VERIFY_NONE,
:ssl_timeout => 32000, :open_timeout => 32000, :read_timeout => 32000, :ssl_timeout => 20, :open_timeout => 20, :read_timeout => 20,
) do |http| ) do |http|
http.request(req) http.request(req)
end end
...@@ -135,7 +133,7 @@ class WendelinClient ...@@ -135,7 +133,7 @@ class WendelinClient
return FALSE return FALSE
else else
if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX
@logger.info("Done", print=TRUE) @logger.info("Done")
return res.body return res.body
else else
@logger.error("HTTP FAIL - code: #{res.code}", print=TRUE) @logger.error("HTTP FAIL - code: #{res.code}", print=TRUE)
...@@ -146,7 +144,6 @@ class WendelinClient ...@@ -146,7 +144,6 @@ class WendelinClient
@logger.abortExecution() @logger.abortExecution()
else else
@logger.error("Sorry, an error ocurred. If the error persists, please contact the administrator.", print=TRUE) @logger.error("Sorry, an error ocurred. If the error persists, please contact the administrator.", print=TRUE)
#@logger.error(res.value)
end end
return FALSE return FALSE
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment