Commit 452e2f8d authored by Roque

Ebulk changes

See merge request !1
parents 428caf79 0d56bdcc
#! /usr/bin/env bash
- DATA_LAKE_URL='https://wendelin.io/'
- DOWN_URL="$DATA_LAKE_URL/"
- ING_POLICY="portal_ingestion_policies/default_embulk"
- ING_URL="$DATA_LAKE_URL$ING_POLICY"
- EBULK_VERSION="1.1.3"
+ DEFAULT_DATA_LAKE_URL='https://wendelin.io/'
+ DOWN_URL="$DEFAULT_DATA_LAKE_URL"
+ ING_POLICY="portal_ingestion_policies/default_ebulk"
+ ING_URL="$DEFAULT_DATA_LAKE_URL$ING_POLICY"
+ EBULK_VERSION="1.1.5"
EMBULK_VERSION="0.9.7"
EBULK_DATA_PATH=~/.ebulk
EBULK_DATASET_FILE_NAME="/.ebulk_dataset"

@@ -49,7 +49,7 @@ ASK="A"
if [ -f "$DATA_LAKE_URL_FILE" ]; then
  URL=$(cat "$DATA_LAKE_URL_FILE" 2>/dev/null)
  if [[ "$URL" != "" ]]; then
-   DOWN_URL="$URL/"
+   DOWN_URL="$URL"
    ING_URL="$URL$ING_POLICY"
  fi
fi

@@ -221,13 +221,22 @@ function setDataLakeUrl {
    echo -e "[INFO] Please enter a valid url.${NC}"
    echo >&2; return 1
  fi
+  if [ "${URL: -1}" != "/" ] ; then
+    URL="$URL/"
+  fi
  echo "$URL" > "$DATA_LAKE_URL_FILE" 2>/dev/null
+  rm -f ${CREDENTIALS_FILE}
+  echo
+  echo "[INFO] Data-lake url set to '$URL'"
}
function defaultDataLakeUrl {
  echo "" > "$DATA_LAKE_URL_FILE" 2>/dev/null
-  DOWN_URL="$DATA_LAKE_URL/"
-  ING_URL="$DATA_LAKE_URL$ING_POLICY"
+  DOWN_URL="$DEFAULT_DATA_LAKE_URL"
+  ING_URL="$DEFAULT_DATA_LAKE_URL$ING_POLICY"
+  rm -f ${CREDENTIALS_FILE}
+  echo
+  echo "[INFO] Data-lake url set to default '$DEFAULT_DATA_LAKE_URL'"
}
function updateConfigFile {

@@ -305,9 +314,9 @@ function runProcess {
  if [ -z "$STATUS" ]; then
    if [ ! -z "$CHUNK" ]; then
      if [ "$CHUNK" -eq "0" ]; then
-       echo "[INFO] Default chunk size: $DEFAULT_CHUNK_SIZE Mb."
+       echo "[INFO] Default chunk size: $DEFAULT_CHUNK_SIZE MB."
      else
-       echo "[INFO] Chunk size set in $CHUNK Mb."
+       echo "[INFO] Chunk size set in $CHUNK MB."
      fi
    fi
    if [ "$DATASET_DESCRIPTION" != "" ] ; then
...
@@ -6,18 +6,9 @@ require 'open-uri'
# class that handles dataset tasks report
class DatasetUtils
-  begin
-    uri = URI(URI.escape("https://wendelin.io/getIngestionConstantsJson"))
-    res = open(uri).read
-    json = JSON.parse(res)
-    EOF = json['split_end_suffix']
-    NONE_EXT = json['none_extension']
-    REFERENCE_SEPARATOR = json['reference_separator']
-  rescue
-    EOF = "EOF"
-    NONE_EXT = "none"
-    REFERENCE_SEPARATOR = "/"
-  end
+  EOF = "EOF"
+  NONE_EXT = "none"
+  REFERENCE_SEPARATOR = "/"
  DATASET_REPORT_FILE = ".dataset-task-report"
  DATASET_TEMP_REPORT_FILE = ".temp-dataset-task-report"

@@ -63,12 +54,26 @@ class DatasetUtils
  OUTPUT_DELETED = "deleted: "
  OUTPUT_RENAMED = "renamed: "
-  MEGA = 1000000
-  CHUNK_SIZE = 50000000 #50mb
+  MEGA = 1048576
+  CHUNK_SIZE = 52428800 #50MB
  RECORD_SEPARATOR = ";"
  DATE_FORMAT = "%Y-%m-%d-%H-%M-%S"

-  def initialize(data_set_directory)
+  def initialize(data_set_directory, url=FALSE)
+    if url
+      begin
+        uri = URI(URI.escape(url + "/ERP5Site_getIngestionConstantsJson"))
+        res = open(uri).read
+        json = JSON.parse(res)
+        @EOF = json['split_end_suffix']
+        @NONE_EXT = json['none_extension']
+        @REFERENCE_SEPARATOR = json['reference_separator']
+      rescue
+        @EOF = EOF
+        @NONE_EXT = NONE_EXT
+        @REFERENCE_SEPARATOR = REFERENCE_SEPARATOR
+      end
+    end
    @data_set_directory = appendSlashTo(data_set_directory)
    @logger = LogManager.instance()
    @task_report_file = @data_set_directory + DATASET_REPORT_FILE

@@ -179,7 +184,7 @@ class DatasetUtils
      if record[1].chomp == RUN_DONE
        if (remove.nil?) || (remove != record[0])
          local_files[record[0]] = {"size" => record[2].chomp, "hash" => record[3].chomp,
-            "status" => record[1].chomp, "modification_date" => record[4].chomp }
+            "status" => record[1].chomp, "modification_date" => record[4].chomp, "large_hash" => record[5].chomp }
        end
      end
    end

@@ -197,7 +202,7 @@ class DatasetUtils
      File.open(@temp_report_file, 'w') {}
    else
      local_files.each do |key, array|
-        record = [key, array["status"], array["size"].to_s, array["hash"], array["modification_date"]].join(RECORD_SEPARATOR)
+        record = [key, array["status"], array["size"].to_s, array["hash"], array["modification_date"], array["large_hash"]].join(RECORD_SEPARATOR)
        File.open(@temp_report_file, 'ab') { |file| file.puts(record) }
      end
    end

@@ -262,9 +267,10 @@ class DatasetUtils
      else
        file_path = referenceToPath(changes[0]["reference"], @data_set_directory, data_set)
      end
-      size = changes[0]["size"]
-      hash = changes[0]["hash"]
-      addToReport(changes[0]["reference"], RUN_DONE, size, hash, data_set, new_reference)
+      size = changes[0]["full-size"]
+      large_hash = changes[0]["large-hash"]
+      hash = getHash(file_path).to_s
+      addToReport(changes[0]["reference"], RUN_DONE, size, hash, large_hash, data_set, new_reference)
      File.delete(@resume_operation_file)
      return TRUE
    end

@@ -376,9 +382,9 @@ class DatasetUtils
    end
  end
-  def saveSplitOperation(operation, reference, eof, hash, chunk_size)
+  def saveSplitOperation(operation, reference, eof, hash, chunk_size, large_hash="NONE")
    file_reference = reference.gsub('/', '__')
-    record = [operation, reference, eof, hash, Integer(chunk_size)].join(RECORD_SEPARATOR)
+    record = [operation, reference, eof, hash, Integer(chunk_size), large_hash].join(RECORD_SEPARATOR)
    File.open(@split_file + "__" + file_reference, 'w') { |file| file.puts(record) }
    deleteSplitOperationControlFile(reference)
  end

@@ -388,9 +394,16 @@ class DatasetUtils
    return File.exist?(@split_file + "__" + file_reference)
  end
-  def getLastSplitOperation(operation, reference, hash, chunk_size)
+  def getLastSplitOperation(operation, reference, hash, chunk_size, large_hash=FALSE)
+    # returns last split operation index (and partial large hash if large_hash is TRUE)
    file_reference = reference.gsub('/', '__')
-    return 0 if not File.exist?(@split_file + "__" + file_reference)
+    if not File.exist?(@split_file + "__" + file_reference)
+      if large_hash
+        return 0, ""
+      else
+        return 0
+      end
+    end
    record = File.open(@split_file + "__" + file_reference).read.chomp.split(RECORD_SEPARATOR)
    if record[0] == operation && record[1] == reference && record[3] == hash && record[4] == Integer(chunk_size).to_s && record[2] != EOF
      # discard if user interrupted (ctrl+c) the operation

@@ -398,16 +411,32 @@ class DatasetUtils
        @logger.warn("Previous split operation attempt for file #{reference} was interrupt by user (aborted tool execution), it will be restarted.", print=TRUE)
        deleteSplitOperationFile(file_reference)
        deleteSplitOperationControlFile(file_reference)
-        return 0
+        if large_hash
+          return 0, ""
+        else
+          return 0
+        end
      end
      createSplitOperationControlFile(file_reference)
-      return record[2].to_i
+      if large_hash
+        return record[2].to_i, record[5]
+      else
+        return record[2].to_i
+      end
    end
-    return 0
+    if large_hash
+      return 0, ""
+    else
+      return 0
+    end
  rescue Exception => e
    @logger.error("An error occurred in getLastSplitOperation method:" + e.to_s)
    @logger.error(e.backtrace)
-    return 0
+    if large_hash
+      return 0, ""
+    else
+      return 0
+    end
  end

  def deleteDiscardChangesFile()

@@ -502,7 +531,7 @@ class DatasetUtils
    return filename, extension, reference
  end
-  def addToReport(reference, status, size, hash, data_set, new_reference=FALSE)
+  def addToReport(reference, status, size, hash, large_hash, data_set, new_reference=FALSE)
    local_files = {}
    begin
      file_path = referenceToPath(reference, @data_set_directory, data_set)

@@ -518,18 +547,19 @@ class DatasetUtils
            reference = new_reference
            file_path = referenceToPath(reference, @data_set_directory, data_set)
            modification_date = File.exist?(file_path) ? File.mtime(file_path).strftime(DATE_FORMAT) : "not-modification-date"
+            large_hash = record[5] if large_hash == ""
          end
          local_files[reference] = {"size" => size, "hash" => hash, "status" => status,
-            "modification_date" => modification_date }
+            "modification_date" => modification_date, "large_hash" => large_hash }
          new_file = FALSE
        else
          local_files[record[0]] = {"size" => record[2].chomp, "hash" => record[3].chomp,
-            "status" => record[1].chomp, "modification_date" => record[4].chomp }
+            "status" => record[1].chomp, "modification_date" => record[4].chomp, "large_hash" => record[5] }
        end
      end
      if new_file
        local_files[reference] = {"size" => size, "hash" => hash, "status" => status,
-          "modification_date" => modification_date }
+          "modification_date" => modification_date, "large_hash" => large_hash }
      end
    rescue Exception => e
      @logger.error("An error occurred in DatasetUtils method 'addToReport':" + e.to_s)

@@ -543,9 +573,21 @@ class DatasetUtils
    saveReport(local_files)
  end
+  def getHashFromChunk(content)
+    begin
+      md5 = Digest::MD5.new
+      md5.update(content)
+      return md5.hexdigest
+    rescue Exception => e
+      @logger.error("An error occurred while getting hash from chunk:" + e.to_s, print=TRUE)
+      @logger.error(e.backtrace)
+      raise e
+    end
+  end
  def getHash(file)
+    return "FILE-NOT-EXISTS" if ! File.exist?(file)
    begin
-      raise "File doesn't exist" if ! File.exist?(file)
      chunk_size = 4 * MEGA
      md5 = Digest::MD5.new
      open(file) do |f|

@@ -608,16 +650,32 @@ class DatasetUtils
    end
  end
-  def isRenamed(file, file_dict_list, file_dict=FALSE)
-    hash = file_dict ? file_dict["hash"] : ""
-    size = file_dict ? file_dict["size"] : (File.size(file).to_s if File.exist?(file))
-    file_dict_list.each do |path, dict|
-      if size == dict["size"].to_s
-        hash = hash != "" ? hash : getHash(file).to_s
-        if hash == dict["hash"]
+  def isLocalChangeRenamed(local_file_path, deleted_file_dict_list)
+    # checking if local_file_path marked as new is also a deleted file, so it's in fact a renamed
+    size = File.size(local_file_path).to_s if File.exist?(local_file_path)
+    deleted_file_dict_list.each do |path, deleted_file_dict|
+      if size == deleted_file_dict["size"].to_s
+        hash = getHash(local_file_path).to_s
+        if hash == deleted_file_dict["hash"]
          old_path = path
          return {"key" => old_path, "size" => size, "hash" => hash}
        end
+      end
+    end
+    return FALSE
+  end
+  def isRemoteChangeRenamed(new_remote_file_dict_list, local_report_file_dict)
+    #checking if report local file missing in remote dataset is one of the remote files marked as new/modified, so it's in fact a remote rename
+    hash = local_report_file_dict["large-hash"]
+    size = local_report_file_dict["size"]
+    new_remote_file_dict_list.each do |path, new_remote_file_dict|
+      if size == new_remote_file_dict["full-size"].to_s
+        hash = hash != "" ? hash : getHash(file).to_s
+        if hash == new_remote_file_dict["large-hash"]
+          old_path = path
+          return {"key" => old_path, "size" => size, "hash" => hash}
+        end
      end
    end
    return FALSE

@@ -643,10 +701,10 @@ class DatasetUtils
    return staged_status["status"] == status
  end
-  def checkRenamed(path, deleted_files, change)
-    renamed_dict = isRenamed(path, deleted_files)
+  def checkRenamed(path, deleted_file_dict_list, change)
+    renamed_dict = isLocalChangeRenamed(path, deleted_file_dict_list)
    if renamed_dict
-      deleted_files.delete(renamed_dict["key"])
+      deleted_file_dict_list.delete(renamed_dict["key"])
      change["status"] = STATUS_RENAMED
      change["new_path"] = change["path"]
      change["path"] = renamed_dict["key"]

@@ -791,37 +849,37 @@ class DatasetUtils
    File.readlines(@task_report_file).each do |line|
      record = line.split(RECORD_SEPARATOR)
      if record[1].chomp == RUN_DONE
-        local_files[record[0]] = {"size" => record[2].chomp, "hash" => record[3].chomp, }
+        local_files[record[0]] = {"size" => record[2].chomp, "hash" => record[3].chomp, "large-hash" => record[5].chomp, }
      end
    end
    data_streams.each do |data_stream|
-      remote_files.push(data_stream["reference"])
-      pending = TRUE
      reference = data_stream["reference"]
+      remote_files.push(reference)
+      pending = TRUE
      if local_files.has_key? reference
        size = local_files[reference]["size"]
-        if size.to_s == data_stream["size"].to_s
-          hash = local_files[reference]["hash"]
-          if hash == data_stream["hash"] or data_stream["hash"] == ""
+        if size.to_s == data_stream["full-size"].to_s
+          hash = local_files[reference]["large-hash"]
+          if hash == data_stream["large-hash"] or data_stream["large-hash"] == ""
            pending = FALSE
          end
        end
      end
      if pending
        local_files.delete(reference)
-        file_path = referenceToPath(data_stream["reference"], @data_set_directory, data_set)
+        file_path = referenceToPath(reference, @data_set_directory, data_set)
        new_changed_files[file_path] = data_stream
      end
    end
    local_files.each do |reference, file_dict|
      if not remote_files.include? reference
-        changed_data_stream = {"reference" => reference, "hash" => DELETE }
+        changed_data_stream = {"reference" => reference, "large-hash" => DELETE }
        file_path = referenceToPath(reference, @data_set_directory, data_set)
-        renamed_dict = isRenamed(file_path, new_changed_files, file_dict)
+        renamed_dict = isRemoteChangeRenamed(new_changed_files, file_dict)
        if renamed_dict
          changed_data_stream = {"reference" => reference, "id" => new_changed_files[renamed_dict["key"]]["id"],
            "new_reference" => new_changed_files[renamed_dict["key"]]["reference"], "status" => STATUS_RENAMED,
-            "size" => renamed_dict["size"], "hash" => renamed_dict["hash"] }
+            "full-size" => renamed_dict["size"], "large-hash" => renamed_dict["hash"] }
          new_changed_files.delete(renamed_dict["key"])
        end
        changed_data_streams.push(changed_data_stream)
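The task-report records handled above gain a sixth field: besides the file reference, status, size, local hash and modification date, each entry now stores a "large hash" (on the ingestion side it is built by concatenating per-chunk MD5 digests; see the input plugin changes below). A minimal sketch of the resulting record layout, with made-up placeholder values:

# Illustrative sketch only: the six-field task-report record after this change.
# Reference, status string, size, digests and date are placeholder values.
RECORD_SEPARATOR = ";"
record = [
  "my_dataset/sample/csv",              # file reference (placeholder)
  "DONE",                               # status (placeholder for the RUN_DONE value)
  "1048576",                            # file size in bytes
  "d41d8cd98f00b204e9800998ecf8427e",   # MD5 of the local file
  "2019-01-01-10-00-00",                # modification date, DATE_FORMAT layout
  "c4ca4238a0b923820dcc509a6f75849b"    # "large hash": concatenated per-chunk MD5s
].join(RECORD_SEPARATOR)
puts record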
...
@@ -138,9 +138,9 @@ module Embulk
            @logger.error("Your current dataset is outdated. Please, run a download to update it before ingest your changes.", print=TRUE)
            puts
            @logger.abortExecution(error=FALSE)
          end
        end
      end
      @logger.info("Supplier: #{task['supplier']}")
      @logger.info("Dataset name: #{task['data_set']}")

@@ -239,13 +239,19 @@ module Embulk
        filename, extension, reference = @dataset_utils.getPathInfo(path, @dataset)
        operation = rename ? DatasetUtils::RENAME : DatasetUtils::INGESTION
        @dataset_utils.saveCurrentOperation(operation, reference, new_reference)
-        resume_split = @dataset_utils.splitOperationFileExist(reference) ? @dataset_utils.getLastSplitOperation(operation, reference, hash, @chunk_size) : 0
+        resume_split, large_hash = 0, ""
+        if @dataset_utils.splitOperationFileExist(reference)
+          resume_split, large_hash = @dataset_utils.getLastSplitOperation(operation, reference, hash, @chunk_size, large_hash=TRUE)
+        end
        each_chunk(path, filename, extension, size, hash, schema[1..-1].map{|elm| elm.name}, @chunk_size, delete, new_reference, resume_split) do |entry|
          @dataset_utils.createSplitOperationControlFile(reference) if split
+          large_hash += entry[8]
+          #no need to send large hash to server
+          entry.pop()
          @page_builder.add(entry)
          if ! delete && ! rename && entry[5] != ""
            split = TRUE
-            @dataset_utils.saveSplitOperation(operation, reference, entry[5], hash, @chunk_size)
+            @dataset_utils.saveSplitOperation(operation, reference, entry[5], hash, @chunk_size, large_hash)
          end
        end
        @page_builder.finish

@@ -269,7 +275,7 @@ module Embulk
          end
        else
          if @dataset_utils.reportFileExist()
-            @dataset_utils.addToReport(reference, return_value, size, hash, task['data_set'], new_reference)
+            @dataset_utils.addToReport(reference, return_value, size, hash, large_hash, task['data_set'], new_reference)
          end
        end
      end

@@ -282,11 +288,11 @@ module Embulk
      def each_chunk(path, filename, extension, size, hash, fields, chunk_size=DatasetUtils::CHUNK_SIZE, delete=FALSE, new_reference=FALSE, resume_split=0)
        if delete
          File.delete(path) if File.exist?(path)
-          values = [@supplier, @dataset, filename, extension, "", DatasetUtils::DELETE, "", ""]
+          values = [@supplier, @dataset, filename, extension, "", DatasetUtils::DELETE, "", "", ""]
          yield(values)
        elsif new_reference
          File.delete(path) if File.exist?(path)
-          values = [@supplier, @dataset, filename, extension, new_reference, DatasetUtils::RENAME, "", ""]
+          values = [@supplier, @dataset, filename, extension, new_reference, DatasetUtils::RENAME, "", "", ""]
          yield(values)
        else
          file_object = File.open(path, "rb")

@@ -297,7 +303,7 @@ module Embulk
            data = next_byte
            if not next_byte
              if first # this means this is an empty file
-                values = [@supplier, @dataset, filename, extension, "", "", size, hash]
+                values = [@supplier, @dataset, filename, extension, "", "", size, hash, hash]
                yield(values)
              end
              break

@@ -320,7 +326,8 @@ module Embulk
              eof = npart.to_s.rjust(3, "0")
            end
            content = Base64.encode64(data)
-            values = [@supplier, @dataset, filename, extension, content, eof, size, hash]
+            chunk_hash = @dataset_utils.getHashFromChunk(data)
+            values = [@supplier, @dataset, filename, extension, content, eof, size, hash, chunk_hash]
            first = FALSE
            yield(values)
          end
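Each row yielded by each_chunk now carries a ninth element, the MD5 of the current chunk returned by getHashFromChunk; the plugin accumulates these into large_hash and pops the element before handing the row to the page builder. A rough standalone sketch of how such a large hash is derived from a file under the 50 MB chunking above (the path is a placeholder, and in the plugin the reading happens inside each_chunk):

require 'digest'

CHUNK_SIZE = 52428800                           # 50 MB, as in DatasetUtils::CHUNK_SIZE
large_hash = ""
File.open("sample.bin", "rb") do |f|            # placeholder path
  while (data = f.read(CHUNK_SIZE))
    large_hash += Digest::MD5.hexdigest(data)   # one digest per chunk, as getHashFromChunk does
  end
end
puts large_hash                                 # concatenation of the per-chunk digests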
...
@@ -94,7 +94,6 @@ module Embulk
        @erp5_url = config.param('erp5_url', :string)
        @data_set = config.param('data_set', :string)
        @logger.info("Dataset name: #{@data_set}")
-        @chunk_size = config.param('chunk_size', :float, default: 0) * DatasetUtils::MEGA
        @output_path = config.param("output_path", :string, :default => nil)
        if not File.directory?(@output_path)
          @logger.error("Output directory not found.", print=TRUE)

@@ -103,14 +102,14 @@ module Embulk
        task = {
          'erp5_url' => @erp5_url,
          'data_set' => @data_set,
-          'chunk_size' => @chunk_size,
+          'chunk_size' => DatasetUtils::CHUNK_SIZE + 10,
          'output_path' => @output_path,
          'tool_dir' => @tool_dir
        }
        if task['chunk_size'] == 0
          task['chunk_size'] = DatasetUtils::CHUNK_SIZE
        end
-        @logger.info("Chunk size set in #{task['chunk_size']/DatasetUtils::MEGA}MB")
+        @logger.info("Download chunk size relies on server file chunks.")
        @dataset_utils = DatasetUtils.new("")
        task['data_set_directory'] = @dataset_utils.appendSlashTo(@output_path)
        @data_set_directory = task['data_set_directory']

@@ -242,7 +241,6 @@ module Embulk
      def initialize(task, schema, index, page_builder)
        super
        @data_set = task['data_set']
-        @chunk_size = task['chunk_size']
        @data_set_directory = task['data_set_directory']
        @wendelin = WendelinClient.new(task['erp5_url'], task['user'], task['password'])
        @logger = LogManager.instance()

@@ -250,46 +248,61 @@ module Embulk
      end

      def run
-        data_stream = task['data_streams'][@index]
-        id = data_stream["id"]
-        reference = data_stream["reference"]
-        size = data_stream["size"]
-        hash = data_stream["hash"]
-        renamed = data_stream["status"] == DatasetUtils::STATUS_RENAMED
-        deleted = hash.to_s == DatasetUtils::DELETE
+        remote_file = task['data_streams'][@index]
+        reference = remote_file["reference"]
+        size = remote_file["full-size"]
+        large_hash = remote_file["large-hash"]
+        data_stream_chunk_list = remote_file["data-stream-list"]
+        renamed = remote_file["status"] == DatasetUtils::STATUS_RENAMED
+        deleted = large_hash.to_s == DatasetUtils::DELETE
        begin
          if deleted
            entry = [reference, "", @data_set, DatasetUtils::DELETE, renamed]
            page_builder.add(entry)
          elsif renamed
-            new_reference = data_stream["new_reference"]
+            new_reference = remote_file["new_reference"]
            entry = [reference, new_reference, @data_set, TRUE, renamed]
            page_builder.add(entry)
          else
-            @logger.info("Discarding local change on '#{data_stream["path"]}'", print=TRUE) if task['discard_changes']
+            chunk_size = data_stream_chunk_list[0]["size"] #first chunk size
+            @logger.info("Discarding local change on '#{remote_file["path"]}'", print=TRUE) if task['discard_changes']
            @logger.info("Getting content from remote #{reference}", print=TRUE)
-            @logger.info("Downloading...", print=TRUE)
-            resume_split = @dataset_utils.splitOperationFileExist(reference) ? @dataset_utils.getLastSplitOperation(DatasetUtils::DOWNLOAD, reference, hash, @chunk_size) : 0
+            resume_split = @dataset_utils.splitOperationFileExist(reference) ? @dataset_utils.getLastSplitOperation(DatasetUtils::DOWNLOAD, reference, large_hash, chunk_size) : 0
            n_chunk = resume_split == 0 ? 0 : resume_split+1
            split = n_chunk > 0
-            @logger.info("Resuming interrupted split download...", print=TRUE) if split
-            @wendelin.eachDataStreamContentChunk(id, @chunk_size, n_chunk) do |chunk|
-              content = chunk.nil? || chunk.empty? ? "" : Base64.encode64(chunk)
-              begin_of_file = n_chunk == 0
-              split = n_chunk > 0
-              @dataset_utils.createSplitOperationControlFile(reference) if split
-              entry = [reference, content, @data_set, begin_of_file, renamed]
-              page_builder.add(entry)
-              @dataset_utils.saveSplitOperation(DatasetUtils::DOWNLOAD, reference, n_chunk, hash, @chunk_size) if split
-              n_chunk += 1
-            end
+            if split
+              @logger.info("Resuming interrupted split download...", print=TRUE)
+            else
+              if data_stream_chunk_list.length > 1
+                @logger.info("Downloading large file split in chunks...", print=TRUE)
+              else
+                @logger.info("Downloading...", print=TRUE)
+              end
+            end
+            data_stream_chunk_list.each_with_index do |data_stream_chunk, index|
+              #skip datastreams/chunks already downloaded
+              if n_chunk == index
+                content = ""
+                @wendelin.eachDataStreamContentChunk(data_stream_chunk["id"], chunk_size + 10, 0, data_stream_chunk_list.length > 1) do |chunk|
+                  content = chunk.nil? || chunk.empty? ? "" : Base64.encode64(chunk)
+                end
+                begin_of_file = n_chunk == 0
+                split = n_chunk > 0
+                @dataset_utils.createSplitOperationControlFile(reference) if split
+                entry = [reference, content, @data_set, begin_of_file, renamed]
+                page_builder.add(entry)
+                @dataset_utils.saveSplitOperation(DatasetUtils::DOWNLOAD, reference, n_chunk, large_hash, chunk_size) if split
+                n_chunk += 1
+              end
+            end
+            @logger.info("Done", print=TRUE) if data_stream_chunk_list.length > 1
          end
          page_builder.finish
          @dataset_utils.deleteSplitOperationFile(reference) if split
        rescue java.lang.OutOfMemoryError
          @logger.logOutOfMemoryError(reference)
          return_value = DatasetUtils::RUN_ABORTED
        rescue Exception => e
          @logger.error(e.to_s, print=TRUE)
          @logger.error(e.backtrace)
          puts "[INFO] For more detailed information, please refer to the log file: " + @logger.getLogPath()

@@ -302,7 +315,9 @@ module Embulk
          if deleted
            @dataset_utils.deleteFromReport(reference, return_value)
          else
-            @dataset_utils.addToReport(reference, return_value, size, hash, task['data_set'], new_reference)
+            file_path = renamed ? @dataset_utils.referenceToPath(new_reference, @data_set_directory, @data_set) : @dataset_utils.referenceToPath(reference, @data_set_directory, @data_set)
+            hash = @dataset_utils.getHash(file_path).to_s
+            @dataset_utils.addToReport(reference, return_value, size, hash, large_hash, task['data_set'], new_reference)
          end
        end
        return {return_value => reference}
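Because the server now describes a remote file as a list of pre-cut data stream chunks, resuming an interrupted download amounts to skipping the chunk indexes that were already written locally and fetching only the remaining entries of data-stream-list. A minimal sketch of that skip logic with illustrative values (in the plugin, resume_split comes from getLastSplitOperation and each list entry carries the data stream "id" and "size"):

resume_split = 2                                     # index of the last chunk already on disk
n_chunk = resume_split == 0 ? 0 : resume_split + 1   # first chunk still missing
chunk_ids = ["c0", "c1", "c2", "c3", "c4"]           # placeholder chunk identifiers
chunk_ids.each_with_index do |chunk_id, index|
  if n_chunk == index                  # indexes below the saved counter are skipped
    puts "fetching #{chunk_id}"        # here the plugin calls eachDataStreamContentChunk
    n_chunk += 1
  end
end
# prints: fetching c3, fetching c4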
...
@@ -60,7 +60,7 @@ module Embulk
            File.open(file_path, write_mode) { |file| file.write(data_chunk) }
          end
        end
      rescue Exception => e
        @logger.error("An error occurred while procesing file.", print=TRUE)
        @logger.error(e.backtrace)
        raise e
...
@@ -34,7 +34,7 @@ class WendelinClient
  def exists(reference)
    checkReferenceChars(reference)
-    uri = URI(URI.escape("#{@erp5_url}/ingestionReferenceExists?reference=#{reference}"))
+    uri = URI(URI.escape("#{@erp5_url}/ERP5Site_checkIngestionReferenceExists?reference=#{reference}"))
    begin
      response = handleRequest(uri)
    rescue Exception => e

@@ -139,14 +139,14 @@ class WendelinClient
    return {"success"=>TRUE, "message"=>"success"}
  end

-  def eachDataStreamContentChunk(id, chunk_size, n_chunk=0)
+  def eachDataStreamContentChunk(id, chunk_size, n_chunk=0, split_operation=FALSE)
    n_part = n_chunk
    done = FALSE
    first = TRUE
    while not done
      start_offset = n_part*chunk_size
      end_offset = n_part*chunk_size+chunk_size
-      uri = URI(URI.escape("#{@erp5_url}getDataStreamChunk?id=#{id}&start_offset=#{start_offset}&end_offset=#{end_offset}"))
+      uri = URI(URI.escape("#{@erp5_url}ERP5Site_getDataStreamChunk?id=#{id}&start_offset=#{start_offset}&end_offset=#{end_offset}"))
      success = FALSE
      n_retry = 0
      while ! success && n_retry < 10

@@ -158,7 +158,11 @@ class WendelinClient
          if first
            yield chunk
          end
-          @logger.info("Done", print=TRUE)
+          if split_operation
+            @logger.info("File chunk downloaded", print=TRUE)
+          else
+            @logger.info("Done", print=TRUE)
+          end
          done = TRUE
        else
          first = FALSE

@@ -181,7 +185,7 @@ class WendelinClient
  end

  def getDataStreams(data_set_reference)
-    uri = URI(URI.escape("#{@erp5_url}getDataStreamList?data_set_reference=#{data_set_reference}"))
+    uri = URI(URI.escape("#{@erp5_url}ERP5Site_getDataStreamList?data_set_reference=#{data_set_reference}"))
    response = handleRequest(uri)
    if response["success"] == FALSE
      @logger.abortExecution()

@@ -235,6 +239,8 @@ class WendelinClient
      return {"success"=>TRUE, "message"=>res.body}
    else
      @logger.error("HTTP FAIL - code: #{res.code}", print=TRUE)
+      @logger.error("During request to " + uri.hostname.to_s, print=TRUE)
+      @logger.error(uri.to_s)
      if res.code == '500' or res.code == '502' or res.code == '503'
        @logger.error(HTTP_MESSAGE_5XX, print=TRUE)
      elsif res.code == '401'
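Every endpoint used by the client is now prefixed with ERP5Site_. For example, the chunk request assembled in eachDataStreamContentChunk takes the following shape; the host, id and chunk size below are placeholders (the download plugin passes the first server chunk's size plus 10 bytes):

erp5_url = "https://example-data-lake.invalid/"   # placeholder host
id = "42"                                         # placeholder data stream id
chunk_size = 52428810                             # assumed 50 MB server chunk + 10
n_part = 0
start_offset = n_part * chunk_size
end_offset = n_part * chunk_size + chunk_size
puts "#{erp5_url}ERP5Site_getDataStreamChunk?id=#{id}&start_offset=#{start_offset}&end_offset=#{end_offset}"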
...
@@ -16,6 +16,7 @@ commands:
  -h, --help                 Tool help
  -r, --readme               Opens README file
  -e, --examples             Shows some tool usage examples
+ -v, --version              Ebulk tool version
  store-credentials          Stores user and password for automatic authentication
  set-data-lake-url          Sets the data lake url where to ingest/download
  default-data-lake-url      Sets the data lake url to default
...