ebulk fix resume initial ingestion

parent c6e035d3
*~
ebulk-data/config/*config.yml
exec:
max_threads: 1
min_output_tasks: 1
in:
type: file
path_prefix: ./csv/
parser:
charset: UTF-8
type: csv
delimiter: ';'
columns:
- {name: id, type: string}
- {name: id2, type: string}
- {name: id3, type: string}
- {name: id4, type: string}
out:
type: wendelin
erp5_url: "https://softinst102878.host.vifib.net/erp5/portal_ingestion_policies/wendelin_embulk"
user: "zope"
password: "asd"
exec:
max_threads: 1
min_output_tasks: 1
in:
type: file
path_prefix: ./csv/
parser:
charset: UTF-8
# newline: CRLF
type: csv
delimiter: ';'
# quote: '"'
# escape: ''
# null_string: 'NULL'
columns:
- {name: id, type: string}
- {name: id2, type: string}
- {name: id3, type: string}
- {name: id4, type: string}
out:
type: wendelin
erp5_url: "https://softinst102878.host.vifib.net/erp5/portal_ingestion_policies/wendelin_embulk"
user: "zope"
password: "asd"
exec:
max_threads: 1
min_output_tasks: 1
in:
type: wendelin
erp5_url: "https://softinst102878.host.vifib.net/erp5/"
user: "asd"
password: "asd"
data_set: "sample"
chunk_size: "50"
output_path: "sample"
tool_dir: "."
out:
type: fif
output_path: "sample"
tool_dir: "."
exec:
max_threads: 1
min_output_tasks: 1
in:
type: wendelin
erp5_url: $DOWN_URL
user: $USER
password: $pwd
data_set: $DATA_SET
chunk_size: $CHUNK
output_path: $DATASET_DIR
tool_dir: $TOOL_DIR
out:
type: fif
output_path: $DATASET_DIR
tool_dir: $TOOL_DIR
exec:
max_threads: 1
min_output_tasks: 1
in:
type: fif
path_prefix: ["input/"]
supplier: [SUPPLIER]
data_set: [DATA_SET]
chunk_size: 0
out:
type: wendelin
erp5_url: 'https://softinst79462.host.vifib.net/erp5/portal_ingestion_policies/wendelin_embulk'
user: [USER]
password: [PASSWORD]
tag: supplier.dataset.filename.extension.end
exec:
max_threads: 1
min_output_tasks: 1
in:
type: fif
path_prefix: [$DATASET_DIR]
supplier: $USER
data_set: $DATA_SET
chunk_size: $CHUNK
erp5_url: $DOWN_URL
user: $USER
password: $pwd
tool_dir: $TOOL_DIR
out:
type: wendelin
erp5_url: $ING_URL
user: $USER
password: $pwd
tool_dir: $TOOL_DIR
# CUSTOM CONFIGURATION FILE
# PLEASE FILL THE FILE WITH THE CONFIGURATION OF YOUR CUSTOM EMBULK PLUGIN
# ONLY THE 'IN' SECTION, OTHERS MUST REMAIN AS THEY ARE
# PLEASE FILL THE 'IN' SECTION ACCORDING TO YOUR PLUGIN
in:
# FOR EXAMPLE CSV FILES
# type: file
# path_prefix: MY_CSV_DIRECTORY
# FOR EXAMPLE AWS-S3 storage:
# type: s3
# bucket: MY_BUCKET
# path_prefix: ""
# access_key_id: MY_KEY_ID
# secret_access_key: MY_SECRET_KEY
# PLEASE LEAVE THE SECTIONS BELOW AS THEY ARE (unless you know what you are doing)
parser:
type: binary
supplier: $USER
data_set: $DATA_SET
tool_dir: $TOOL_DIR
chunk_size: $CHUNK
input_plugin: $STORAGE
out:
type: wendelin
erp5_url: $ING_URL
user: $USER
password: $pwd
exec:
max_threads: 1
min_output_tasks: 1
# FTP CONFIGURATION FILE
# PLEASE FILL THE FILE WITH THE CONFIGURATION OF YOUR FTP STORAGE
# ONLY THE 'IN' SECTION, OTHERS MUST REMAIN AS THEY ARE
in:
type: ftp
host: $FTP_HOST
user: $FTP_USER
password: $FTP_PASSWORD
path_prefix: $FTP_PATH
#ssl_verify: false
#port: 21
# PLEASE LEAVE THE SECTIONS BELOW AS THEY ARE (unless you know what you are doing)
parser:
type: binary
supplier: $USER
data_set: $DATA_SET
tool_dir: $TOOL_DIR
chunk_size: $CHUNK
storage: $STORAGE
out:
type: wendelin
erp5_url: $ING_URL
user: $USER
password: $pwd
exec:
max_threads: 1
min_output_tasks: 1
# HTTP CONFIGURATION FILE
# PLEASE FILL THE FILE WITH THE CONFIGURATION OF YOUR HTTP URL
# ONLY THE 'IN' SECTION, OTHERS MUST REMAIN AS THEY ARE
in:
type: http
url: "http://archive.ics.uci.edu/ml/machine-learning-databases/00000/Donnees%20conso%20autos.txt"
method: "get"
# basic_auth:
# user: MyUser
# password: MyPassword
# params:
# - {name: paramA, value: valueA}
# - {name: paramB, value: valueB}
# PLEASE LEAVE THE SECTIONS BELOW AS THEY ARE (unless you know what you are doing)
parser:
type: binary
supplier: "zope"
data_set: "http"
tool_dir: "."
chunk_size: "50"
storage: "http"
path_prefix:
out:
type: wendelin
erp5_url: "https://softinst102878.host.vifib.net/erp5/portal_ingestion_policies/wendelin_embulk"
user: "zope"
password: "telecom"
exec:
max_threads: 1
min_output_tasks: 1
# HTTP CONFIGURATION FILE
# PLEASE FILL THE FILE WITH THE CONFIGURATION OF YOUR HTTP URL
# ONLY THE 'IN' SECTION, OTHERS MUST REMAIN AS THEY ARE
in:
type: http
url: $HTTP_URL
method: $HTTP_METHOD
# basic_auth:
# user: MyUser
# password: MyPassword
# params:
# - {name: paramA, value: valueA}
# - {name: paramB, value: valueB}
# PLEASE LEAVE THE SECTIONS BELOW AS THEY ARE (unless you know what you are doing)
parser:
type: binary
supplier: $USER
data_set: $DATA_SET
tool_dir: $TOOL_DIR
chunk_size: $CHUNK
storage: $STORAGE
path_prefix: $HTTP_PREFIX
out:
type: wendelin
erp5_url: $ING_URL
user: $USER
password: $pwd
exec:
max_threads: 1
min_output_tasks: 1
exec:
max_threads: 1
min_output_tasks: 1
in:
type: s3
bucket: "roque5"
path_prefix: ""
access_key_id: "AKIAJLY3N4YBNAJMBLGQ"
secret_access_key: "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"
parser:
type: binary
supplier: "zope"
data_set: "encoding"
tool_dir: "."
chunk_size: "5"
input_plugin "s3"
out:
type: wendelin
erp5_url: "https://softinst102878.host.vifib.net/erp5/portal_ingestion_policies/wendelin_embulk"
user: "zope"
password: "telecom"
# S3 CONFIGURATION FILE
# PLEASE FILL THE FILE WITH THE CONFIGURATION OF YOUR S3 BUCKET
# ONLY THE 'IN' SECTION, OTHERS MUST REMAIN AS THEY ARE
in:
type: s3
bucket: $S3_BUCKET
path_prefix: $S3_PREFIX
access_key_id: $S3_ACCESS_KEY
secret_access_key: $S3_SECRET_KEY
auth_method: $S3_AUTH_METHOD
# endpoint:
# region:
# path_match_pattern:
# http_proxy:
# host:
# port:
# PLEASE LEAVE THE SECTIONS BELOW AS THEY ARE (unless you know what you are doing)
parser:
type: binary
supplier: $USER
data_set: $DATA_SET
tool_dir: $TOOL_DIR
chunk_size: $CHUNK
storage: $STORAGE
path_prefix: $S3_PREFIX
out:
type: wendelin
erp5_url: $ING_URL
user: $USER
password: $pwd
exec:
max_threads: 1
min_output_tasks: 1
...@@ -7,6 +7,7 @@ class DatasetUtils ...@@ -7,6 +7,7 @@ class DatasetUtils
DATASET_REPORT_FILE = ".dataset-task-report" DATASET_REPORT_FILE = ".dataset-task-report"
DATASET_COMPLETED_FILE = ".dataset-completed" DATASET_COMPLETED_FILE = ".dataset-completed"
RESUME_OPERATION_FILE = ".resume-operation" RESUME_OPERATION_FILE = ".resume-operation"
INITIAL_INGESTION_FILE = ".initial-ingestion"
RUN_DONE = "done" RUN_DONE = "done"
RUN_ERROR = "error" RUN_ERROR = "error"
...@@ -22,6 +23,7 @@ class DatasetUtils ...@@ -22,6 +23,7 @@ class DatasetUtils
@task_report_file = @data_set_directory + DATASET_REPORT_FILE @task_report_file = @data_set_directory + DATASET_REPORT_FILE
@completed_file = @data_set_directory + DATASET_COMPLETED_FILE @completed_file = @data_set_directory + DATASET_COMPLETED_FILE
@resume_operation_file = @data_set_directory + RESUME_OPERATION_FILE @resume_operation_file = @data_set_directory + RESUME_OPERATION_FILE
@initial_ingestion_file = @data_set_directory + INITIAL_INGESTION_FILE
end end
def getLocalFiles(remove=nil) def getLocalFiles(remove=nil)
...@@ -130,6 +132,18 @@ class DatasetUtils ...@@ -130,6 +132,18 @@ class DatasetUtils
return File.exist?(@task_report_file) return File.exist?(@task_report_file)
end end
def deleteInitialIngestionFile()
File.delete(@initial_ingestion_file) if File.exist?(@initial_ingestion_file)
end
def createInitialIngestionFile()
File.open(@initial_ingestion_file, 'w') {}
end
def initialIngestionFileExist()
return File.exist?(@initial_ingestion_file)
end
def addToReport(reference, status, size, hash, data_set) def addToReport(reference, status, size, hash, data_set)
local_files = {} local_files = {}
begin begin
...@@ -183,7 +197,7 @@ class DatasetUtils ...@@ -183,7 +197,7 @@ class DatasetUtils
end end
def getLocalChanges(files, data_set) def getLocalChanges(files, data_set)
new_files = [] all_files, new_files, modified_files, deleted_files = [], [], [], []
begin begin
if reportFileExist() if reportFileExist()
File.readlines(@task_report_file).each do |line| File.readlines(@task_report_file).each do |line|
...@@ -199,27 +213,31 @@ class DatasetUtils ...@@ -199,27 +213,31 @@ class DatasetUtils
hash = getHash(file_path).to_s hash = getHash(file_path).to_s
if size == record[2].to_s if size == record[2].to_s
if hash != record[3].chomp if hash != record[3].chomp
new_files.push({"path" => file_path, "size" => size, "hash" => hash }) all_files.push({"path" => file_path, "size" => size, "hash" => hash })
modified_files.push(file_path)
end end
else else
new_files.push({"path" => file_path, "size" => size, "hash" => hash }) all_files.push({"path" => file_path, "size" => size, "hash" => hash })
modified_files.push(file_path)
end end
end end
files.delete(file_path) files.delete(file_path)
else else
new_files.push({"path" => file_path, "size" => "", "hash" => DELETE }) all_files.push({"path" => file_path, "size" => "", "hash" => DELETE })
deleted_files.push(file_path)
end end
end end
end end
end end
files.each do |path| files.each do |path|
new_files.push({"path" => path, "size" => "", "hash" => "" }) all_files.push({"path" => path, "size" => "", "hash" => "" })
new_files.push(path)
end end
rescue Exception => e rescue Exception => e
@logger.error("An error occurred in DatasetUtils method 'getLocalChanges':" + e.to_s) @logger.error("An error occurred in DatasetUtils method 'getLocalChanges':" + e.to_s)
@logger.error(e.backtrace) @logger.error(e.backtrace)
end end
return new_files return all_files, new_files, modified_files, deleted_files
end end
def getRemoteChangedDataStreams(data_streams) def getRemoteChangedDataStreams(data_streams)
......
...@@ -10,6 +10,10 @@ module Embulk ...@@ -10,6 +10,10 @@ module Embulk
Plugin.register_input("fif", self) Plugin.register_input("fif", self)
NEW = "New"
MODIFIED = "Modified"
DELETED = "Deleted"
EOF = "EOF" EOF = "EOF"
CHUNK_SIZE = 50000000 #50mb CHUNK_SIZE = 50000000 #50mb
MEGA = 1000000 MEGA = 1000000
...@@ -24,6 +28,21 @@ module Embulk ...@@ -24,6 +28,21 @@ module Embulk
{"name"=>"hash", "type"=>"string"} {"name"=>"hash", "type"=>"string"}
] ]
def self.showChangesList(changes, type, print_short)
if not changes.empty?
puts
@logger.info("#{type} file(s):", print=TRUE)
if print_short and changes.length > 50
limit = changes.length > 130 ? 130/3 : changes.length/3
@logger.info(changes[0, limit], print=TRUE)
@logger.info("....", print=TRUE)
@logger.info(changes[changes.length-limit, changes.length-1], print=TRUE)
else
@logger.info(changes, print=TRUE)
end
end
end
def self.transaction(config, &control) def self.transaction(config, &control)
begin begin
tool_dir = config.param('tool_dir', :string) tool_dir = config.param('tool_dir', :string)
...@@ -36,6 +55,7 @@ module Embulk ...@@ -36,6 +55,7 @@ module Embulk
if task['chunk_size'] == 0 if task['chunk_size'] == 0
task['chunk_size'] = CHUNK_SIZE task['chunk_size'] = CHUNK_SIZE
end end
@data_set = task['data_set']
paths = config.param('path_prefix', :array) paths = config.param('path_prefix', :array)
paths[0] = paths[0].end_with?("/") ? paths[0] : paths[0] + "/" paths[0] = paths[0].end_with?("/") ? paths[0] : paths[0] + "/"
@data_set_directory = paths[0] @data_set_directory = paths[0]
...@@ -50,7 +70,10 @@ module Embulk ...@@ -50,7 +70,10 @@ module Embulk
@logger.info("Checking remote dataset...", print=TRUE) @logger.info("Checking remote dataset...", print=TRUE)
data_stream_dict = @wendelin.getDataStreams(task['data_set']) data_stream_dict = @wendelin.getDataStreams(task['data_set'])
@dataset_utils = DatasetUtils.new(@data_set_directory) @dataset_utils = DatasetUtils.new(@data_set_directory)
if @dataset_utils.reportFileExist() if not @dataset_utils.reportFileExist()
@dataset_utils.createInitialIngestionFile()
else
if not @dataset_utils.initialIngestionFileExist()
@logger.info("Checking local dataset...", print=TRUE) @logger.info("Checking local dataset...", print=TRUE)
if not @dataset_utils.reportUpToDate(data_stream_dict) if not @dataset_utils.reportUpToDate(data_stream_dict)
puts puts
...@@ -59,6 +82,7 @@ module Embulk ...@@ -59,6 +82,7 @@ module Embulk
@logger.abortExecution(error=FALSE) @logger.abortExecution(error=FALSE)
end end
end end
end
if data_stream_dict["status_code"] != 0 if data_stream_dict["status_code"] != 0
@logger.error(data_stream_dict["error_message"], print=TRUE) @logger.error(data_stream_dict["error_message"], print=TRUE)
@logger.abortExecution() @logger.abortExecution()
...@@ -79,20 +103,18 @@ module Embulk ...@@ -79,20 +103,18 @@ module Embulk
@logger.abortExecution() @logger.abortExecution()
end end
task['paths'] = @dataset_utils.getLocalChanges(task['paths'], task['data_set']) task['paths'], new_files, modified_files, deleted_files = @dataset_utils.getLocalChanges(task['paths'], task['data_set'])
if task['paths'].empty? if task['paths'].empty?
puts puts
@logger.info("No changes in '#{@data_set_directory}'. Everything up-to-date.", print=TRUE) @logger.info("No changes in '#{@data_set_directory}'. Everything up-to-date.", print=TRUE)
@logger.abortExecution(error=FALSE) @logger.abortExecution(error=FALSE)
end end
@logger.info("#{task['paths'].length} change(s) detected for ingestion: ", print=TRUE) changes = @dataset_utils.reportFileExist() ? "change" : "new file"
if task['paths'].length > 15 @logger.info("#{task['paths'].length} #{changes}(s) detected for ingestion: ", print=TRUE)
@logger.info(task['paths'][0, 5], print=TRUE) print_short = task['paths'].length > 500
@logger.info(".....", print=TRUE) self.showChangesList(new_files, NEW, print_short)
@logger.info(task['paths'][task['paths'].length-5, task['paths'].length-1], print=TRUE) self.showChangesList(modified_files, MODIFIED, print_short)
else self.showChangesList(deleted_files, DELETED, print_short)
@logger.info(task['paths'], print=TRUE)
end
puts puts
@logger.info("Continue with ingestion? (y/n)", print=TRUE) @logger.info("Continue with ingestion? (y/n)", print=TRUE)
option = gets option = gets
...@@ -101,6 +123,9 @@ module Embulk ...@@ -101,6 +123,9 @@ module Embulk
@logger.info("Ingestion cancelled by user.", print=TRUE) @logger.info("Ingestion cancelled by user.", print=TRUE)
@logger.abortExecution() @logger.abortExecution()
end end
if not @dataset_utils.reportFileExist()
@dataset_utils.createReportFile()
end
columns = [ columns = [
Column.new(0, "supplier", :string), Column.new(0, "supplier", :string),
...@@ -139,9 +164,11 @@ module Embulk ...@@ -139,9 +164,11 @@ module Embulk
@logger.info(task_reports, print=TRUE) @logger.info(task_reports, print=TRUE)
end end
next_config_diff = task_reports.map{|hash| hash["done"]}.flatten.compact next_config_diff = task_reports.map{|hash| hash["done"]}.flatten.compact
@logger.info("#{next_config_diff.length} file(s) ingested.", print=TRUE) changes = @dataset_utils.initialIngestionFileExist() ? "new file" : "change"
@logger.info("#{next_config_diff.length} #{changes}(s) ingested.", print=TRUE)
if(next_config_diff.length == count) if(next_config_diff.length == count)
@logger.info("Dataset successfully ingested.", print=TRUE) @logger.info("Dataset successfully ingested.", print=TRUE)
@wendelin.increaseDatasetVersion(@data_set)
else else
next_config_diff = task_reports.map{|hash| hash["error"]}.flatten.compact next_config_diff = task_reports.map{|hash| hash["error"]}.flatten.compact
puts puts
......
...@@ -24,7 +24,7 @@ module Embulk ...@@ -24,7 +24,7 @@ module Embulk
next [] unless Dir.exist?(path) next [] unless Dir.exist?(path)
Dir[(path + '/**/*').gsub! '//', '/'] Dir[(path + '/**/*').gsub! '//', '/']
}.flatten.select{ |file| File.file?(file) } }.flatten.select{ |file| File.file?(file) }
local_changes = @dataset_utils.getLocalChanges(local_files, data_set) local_changes, a, b, c = @dataset_utils.getLocalChanges(local_files, data_set)
data_set = @data_set.end_with?("/") ? @data_set : @data_set + "/" data_set = @data_set.end_with?("/") ? @data_set : @data_set + "/"
remote_changes = remote_streams.map { |remote| remote_changes = remote_streams.map { |remote|
remote = @data_set_directory + remote["reference"].reverse.sub("/".reverse, ".".reverse).reverse.sub(data_set, "") remote = @data_set_directory + remote["reference"].reverse.sub("/".reverse, ".".reverse).reverse.sub(data_set, "")
...@@ -178,12 +178,15 @@ module Embulk ...@@ -178,12 +178,15 @@ module Embulk
@logger.info("This dataset was already downloaded. What do you want to do?", print=TRUE) @logger.info("This dataset was already downloaded. What do you want to do?", print=TRUE)
puts puts
self.askUserForAction(task, action=UPDATE) self.askUserForAction(task, action=UPDATE)
else elsif not @dataset_utils.initialIngestionFileExist()
puts puts
@logger.info("There was a previous attempt to download this dataset but it did not finish successfully.", print=TRUE) @logger.info("There was a previous attempt to download this dataset but it did not finish successfully.", print=TRUE)
@logger.info("What do you want to do?", print=TRUE) @logger.info("What do you want to do?", print=TRUE)
puts puts
self.askUserForAction(task, action=RESUME) self.askUserForAction(task, action=RESUME)
else
puts
self.askUserForAction(task, action=UPDATE)
end end
else else
dir_entries = Dir.entries(@data_set_directory).length dir_entries = Dir.entries(@data_set_directory).length
...@@ -203,6 +206,7 @@ module Embulk ...@@ -203,6 +206,7 @@ module Embulk
end end
@dataset_utils.createReportFile() @dataset_utils.createReportFile()
end end
@dataset_utils.deleteInitialIngestionFile()
columns = [ columns = [
Column.new(0, "reference", :string), Column.new(0, "reference", :string),
Column.new(1, "data_chunk", :string), Column.new(1, "data_chunk", :string),
......
...@@ -38,6 +38,21 @@ class WendelinClient ...@@ -38,6 +38,21 @@ class WendelinClient
end end
end end
def increaseDatasetVersion(reference)
if reference == ""
@logger.warn("Could not increase data set version because dataset reference is empty.")
else
@logger.info("Increasing dataset version")
uri = URI("#{@erp5_url}/ERP5Site_increaseDatasetVersion?reference=#{reference}")
begin
res = open(uri, http_basic_authentication: [@user, @password]).read
rescue Exception => e
@logger.error("An error occurred while increasing dataset version: " + e.to_s)
@logger.error(e.backtrace)
end
end
end
def ingest(reference, data_chunk) def ingest(reference, data_chunk)
@logger.info("Ingestion reference: #{reference}", print=TRUE) @logger.info("Ingestion reference: #{reference}", print=TRUE)
if Time.new - @last_ingestion < 2 if Time.new - @last_ingestion < 2
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment