ebulk: discard-changes feature

- new --discard-changes parameter for the pull command
- new help and example files
- several fixes
parent f7de6621
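A short usage sketch of the new option (dataset name and path are placeholders; -dc is the short form added below):
ebulk pull <DATASET> --discard-changes
ebulk pull <DATASET> -dc -d <PATH>
* discards local changes by checking the remote dataset, then continues the download of the affected files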
......@@ -4,6 +4,10 @@ DOWN_URL='https://softinst104003.host.vifib.net/erp5/'
ING_URL='https://softinst104003.host.vifib.net/erp5/portal_ingestion_policies/wendelin_embulk'
EBULK_DATA_PATH=~/.ebulk
EBULK_DATASET_FILE_NAME="/.ebulk_dataset"
DATASET_REPORT_FILE_NAME="/.dataset-task-report"
DATASET_COMPLETE_FILE_NAME="/.dataset-completed"
DISCARD_CHANGES_FILE_NAME="/.discard-changes"
LOG_DIR="$EBULK_DATA_PATH/logs"
TOOL_PATH="$(dirname "$0")/ebulk-data"
DOWN_FILE="$EBULK_DATA_PATH/download-config.yml"
......@@ -46,6 +50,11 @@ function checkParameters {
fi
if [ "$STORAGE" = "" ] ; then
if [ ! -d "$DATASET_DIR" ]; then
if [ "$STATUS" ]; then
echo
echo -e "${ORANGE}[ERROR] ${GREEN}'$DATASET_DIR'${ORANGE} is not a dataset directory.${NC}"
echo >&2; return 1
fi
echo
mkdir "$DATASET_DIR" 2>/dev/null
if [ ! $? -eq 0 ]; then
......@@ -56,16 +65,29 @@ function checkParameters {
helpReadme >&2; return 1
fi
fi
EBULK_DATASET_FILE="$DATASET_DIR/.ebulk_dataset"
EBULK_DATASET_FILE="$DATASET_DIR$EBULK_DATASET_FILE_NAME"
if [[ $DATASET_DIR != $REFERENCE ]]; then
if [ "$REFERENCE" = "." ] ; then
REFERENCE=$(basename "$DATASET_DIR")
fi
DATA_SET=$REFERENCE
if [ -f "$EBULK_DATASET_FILE" ]; then
PREVIOUS_DATA_SET=$(cat "$EBULK_DATASET_FILE" 2>/dev/null)
if [[ "$PREVIOUS_DATA_SET" != "$REFERENCE" ]]; then
DATASET_REPORT_FILE="$DATASET_DIR$DATASET_REPORT_FILE_NAME"
if [ -f "$DATASET_REPORT_FILE" ]; then
rm -f ${DATASET_REPORT_FILE}
fi
DATASET_COMPLETE_FILE="$DATASET_DIR$DATASET_COMPLETE_FILE_NAME"
if [ -f "$DATASET_COMPLETE_FILE" ]; then
rm -f ${DATASET_COMPLETE_FILE}
fi
fi
fi
echo $REFERENCE > "$EBULK_DATASET_FILE" 2>/dev/null
else
if [ -f "$EBULK_DATASET_FILE" ]; then
DATA_SET=$(cat "$DATASET_DIR/.ebulk_dataset" 2>/dev/null)
DATA_SET=$(cat "$EBULK_DATASET_FILE" 2>/dev/null)
else
DATA_SET=$(basename "$DATASET_DIR")
if [ "$DATA_SET" != "." ] ; then
......@@ -403,7 +425,7 @@ function askS3parameters {
}
function stage {
EBULK_DATASET_FILE="./.ebulk_dataset"
EBULK_DATASET_FILE=".$EBULK_DATASET_FILE_NAME"
if [ ! -f "$EBULK_DATASET_FILE" ]; then
echo
echo -e "${ORANGE}[ERROR] You are not in a dataset directory."
......@@ -461,6 +483,8 @@ while [ "$1" != "" ]; do
;;
-a | --advanced ) ADVANCED=true
;;
-dc | --discard-changes ) DISCARD_CHANGES=true
;;
-c | --chunk ) shift
CHUNK=$1
;;
......@@ -490,7 +514,7 @@ while [ "$1" != "" ]; do
shift
done
for ELEMENT in '' '-d' '--directory' '-s' '--storage' '-cs' '--custom-storage' '-a' '--advanced' '-c' '--chunk'; do
for ELEMENT in '' '-d' '--directory' '-s' '--storage' '-cs' '--custom-storage' '-a' '--advanced' '-c' '--chunk' '-dc' '--discard-changes'; do
if [ "$ELEMENT" = "$REFERENCE" ]; then
REFERENCE="."
fi
......@@ -554,7 +578,13 @@ case $OPERATION in
fi
echo "### DATASET DOWNLOAD ###"
echo
echo -e "** The dataset will be downloaded in the specified directory: $DATASET_DIR"
if [ "$DISCARD_CHANGES" != "" ] ; then
DISCARD_CHANGES_FILE="$DATASET_DIR$DISCARD_CHANGES_FILE_NAME"
touch "$DISCARD_CHANGES_FILE" 2>/dev/null
echo -e "** Discard all local changes in directory: $DATASET_DIR"
else
echo -e "** The dataset will be downloaded in the specified directory: $DATASET_DIR"
fi
echo
read -n 1 -s -r -p "Press any key to continue"
echo
......@@ -588,8 +618,9 @@ case $OPERATION in
PARAMETER_FUNCTION=askFTPparameters
STORAGE_GEM=embulk-input-ftp
;;
*) echo -e "${ORANGE}[ERROR] '$STORAGE' storage is not available in ebulk tool yet.${NC}"
*) echo -e "${ORANGE}[ERROR] '$STORAGE' storage is not available in ebulk tool yet or it is not a valid storage.${NC}"
echo "[INFO] If you want to configure yourself this storage, you can run the tool with parameter --custom-storage"
echo "[INFO] Current Ebulk version has the following storages available: ftp, http, s3."
echo
exit
esac
......
......@@ -11,6 +11,7 @@ in:
user: $USER
password: $pwd
tool_dir: $TOOL_DIR
status: $STATUS
out:
type: wendelin
......
......@@ -11,15 +11,18 @@ class DatasetUtils
RESUME_OPERATION_FILE = ".resume-operation"
INITIAL_INGESTION_FILE = ".initial-ingestion"
STAGED_FILE = ".staged"
DISCARD_CHANGES_FILE = ".discard-changes"
RUN_DONE = "done"
RUN_ERROR = "error"
RUN_ABORTED = "aborted"
DELETE = "DELETE"
RENAME = "RENAME"
INGESTION = "ingestion"
ADD = "add"
REMOVE = "remove"
STATUS_NEW = "new"
STATUS_RENAMED = "renamed"
STATUS_MODIFIED = "modified"
STATUS_DELETED = "deleted"
STAGE_ADD="add"
......@@ -31,6 +34,7 @@ class DatasetUtils
OVERWRITE = "overwrite: "
OUTPUT_MODIFIED = "modified: "
OUTPUT_DELETED = "deleted: "
OUTPUT_RENAMED = "renamed: "
MEGA = 1000000
EOF = "EOF"
......@@ -38,6 +42,7 @@ class DatasetUtils
NONE_EXT = "none"
REFERENCE_SEPARATOR = "/"
RECORD_SEPARATOR = ";"
DATE_FORMAT = "%Y-%m-%d-%H-%M-%S"
def initialize(data_set_directory)
@data_set_directory = data_set_directory
......@@ -48,25 +53,27 @@ class DatasetUtils
@resume_operation_file = @data_set_directory + RESUME_OPERATION_FILE
@initial_ingestion_file = @data_set_directory + INITIAL_INGESTION_FILE
@staged_file = @data_set_directory + STAGED_FILE
@discard_changes_file = @data_set_directory + DISCARD_CHANGES_FILE
end
def getLocalPaths(paths)
return paths.map {|path|
next [] unless Dir.exist?(path)
Dir[(path + '/**/*').gsub! '//', '/']
}.flatten.select{ |file| File.file?(file) }
next [] unless Dir.exist?(path)
Dir[(path + '/**/*').gsub! '//', '/']
}.flatten.select{ |file| File.file?(file) }
end
def getLocalFiles(remove=nil)
local_files = {}
begin
File.readlines(@task_report_file).each do |line|
record = line.split(RECORD_SEPARATOR)
if record[1].chomp == RUN_DONE
if (remove.nil?) || (remove != record[0])
local_files[record[0]] = {"size" => record[2].chomp, "hash" => record[3].chomp, "status" => record[1].chomp, "modification_date" => record[4].chomp }
record = line.split(RECORD_SEPARATOR)
if record[1].chomp == RUN_DONE
if (remove.nil?) || (remove != record[0])
local_files[record[0]] = {"size" => record[2].chomp, "hash" => record[3].chomp,
"status" => record[1].chomp, "modification_date" => record[4].chomp }
end
end
end
end
rescue Exception => e
@logger.error("An error occurred in DatasetUtils method 'getLocalFiles':" + e.to_s)
......@@ -79,11 +86,12 @@ class DatasetUtils
begin
File.delete(@temp_report_file) if File.exist?(@temp_report_file)
if local_files.empty?
File.open(@temp_report_file, 'w') {}
File.open(@temp_report_file, 'w') {}
else
local_files.each do |key, array|
File.open(@temp_report_file, 'ab') { |file| file.puts(key+RECORD_SEPARATOR+array["status"]+RECORD_SEPARATOR+array["size"].to_s+RECORD_SEPARATOR+array["hash"]+RECORD_SEPARATOR+array["modification_date"]) }
end
local_files.each do |key, array|
record = [key, array["status"], array["size"].to_s, array["hash"], array["modification_date"]].join(RECORD_SEPARATOR)
File.open(@temp_report_file, 'ab') { |file| file.puts(record) }
end
end
FileUtils.cp_r(@temp_report_file, @task_report_file, :remove_destination => true)
rescue Exception => e
......@@ -98,43 +106,47 @@ class DatasetUtils
end
end
def saveCurrentOperation(operation, reference)
def saveCurrentOperation(operation, reference, new_reference)
if File.exist?(@resume_operation_file)
File.delete(@resume_operation_file)
end
File.open(@resume_operation_file, 'w') { |file| file.puts(operation+RECORD_SEPARATOR+reference) }
record = new_reference ? [operation, reference, new_reference].join(RECORD_SEPARATOR) : [operation, reference].join(RECORD_SEPARATOR)
File.open(@resume_operation_file, 'w') { |file| file.puts(record) }
end
def reportUpToDate(data_stream_dict)
def reportUpToDate(data_stream_dict, data_set)
begin
if not reportFileExist() and not completedFileExist()
# directory never downloaded -new or used for partial ingestions-
return TRUE
end
if reportFileExist() and not completedFileExist()
# download not finished
return FALSE
end
if data_stream_dict["status_code"] == 2
return FALSE
end
if data_stream_dict["status_code"] != 0
return TRUE
end
changes = getRemoteChangedDataStreams(data_stream_dict["result"])
# directory never downloaded -new or used for partial ingestions-
return TRUE if not reportFileExist() and not completedFileExist()
# download not finished
return FALSE if reportFileExist() and not completedFileExist()
return FALSE if data_stream_dict["status_code"] == 2
return TRUE if data_stream_dict["status_code"] != 0
return TRUE if data_stream_dict["result"].empty?
changes = getRemoteChangedDataStreams(data_stream_dict["result"], data_set)
if changes.empty?
return TRUE
return TRUE
elsif changes.length == 1
# check if the unique detected change corresponds to an interrumped ingestion
if File.exist?(@resume_operation_file)
# check if the unique detected change corresponds to an interrupted ingestion
if File.exist?(@resume_operation_file)
operation=File.open(@resume_operation_file).read.chomp.split(RECORD_SEPARATOR)
if operation[0] == INGESTION
if operation[1] == changes[0]["reference"]
File.delete(@resume_operation_file)
return TRUE
if operation[1] == changes[0]["reference"]
@logger.info("File '#{operation[1]}' was detected as a change in remote dataset, but it is a false positive.")
@logger.info("That file operation ingestion was interrupted right after successfully ingest it but local report is outdated.")
@logger.info("Dataset is up to date, file operation was reported as ingested in local report.", print=TRUE)
new_reference = operation[0] == RENAME ? changes[0]["new_reference"] : FALSE
if new_reference
file_path = referenceToPath(new_reference, @data_set_directory, data_set)
else
file_path = referenceToPath(changes[0]["reference"], @data_set_directory, data_set)
end
size = changes[0]["size"]
hash = changes[0]["hash"]
addToReport(changes[0]["reference"], RUN_DONE, size, hash, data_set, new_reference)
File.delete(@resume_operation_file)
return TRUE
end
end
end
end
return FALSE
rescue Exception => e
......@@ -147,33 +159,36 @@ class DatasetUtils
def showChanges(changes, status)
changes.each do |change|
if status != ""
status_output = status
status_output = status
elsif change["status"] == STATUS_NEW
status_output = OUTPUT_NEW
status_output = OUTPUT_NEW
elsif change["status"] == STATUS_MODIFIED
status_output = OUTPUT_MODIFIED
status_output = OUTPUT_MODIFIED
elsif change["status"] == STATUS_DELETED
status_output = OUTPUT_DELETED
status_output = OUTPUT_DELETED
elsif change["status"] == STATUS_RENAMED
status_output = OUTPUT_RENAMED
else
status_output = "no-status"
status_output = "change: "
end
path = status != OVERWRITE ? change["path"] : change
@logger.info(" #{status_output}#{path}", print=TRUE)
new_path = change["status"] == STATUS_RENAMED ? " --> #{change["new_path"]}" : ""
@logger.info(" #{status_output}#{path}#{new_path}", print=TRUE)
end
end
def showChangesList(changes, message, print_short, status="")
if not changes.empty?
if message and message != ""
@logger.info(message, print=TRUE)
@logger.info(message, print=TRUE)
end
if print_short and changes.length > 200
limit = changes.length > 300 ? 100 : changes.length/3
showChanges(changes[0, limit], status)
puts "....."
showChanges(changes[changes.length-limit, changes.length-1], status)
limit = changes.length > 300 ? 100 : changes.length/3
showChanges(changes[0, limit], status)
puts "....."
showChanges(changes[changes.length-limit, changes.length-1], status)
else
showChanges(changes, status)
showChanges(changes, status)
end
end
end
......@@ -206,6 +221,14 @@ class DatasetUtils
puts
end
def deleteDiscardChangesFile()
File.delete(@discard_changes_file) if File.exist?(@discard_changes_file)
end
def discardChangesFileExist()
return File.exist?(@discard_changes_file)
end
def deleteCompletedFile()
File.delete(@completed_file) if File.exist?(@completed_file)
end
......@@ -223,7 +246,13 @@ class DatasetUtils
end
def reportFileExist()
return File.exist?(@task_report_file)
return TRUE if File.exist?(@task_report_file)
if File.exist?(@temp_report_file)
FileUtils.cp_r(@temp_report_file, @task_report_file)
return TRUE
else
return FALSE
end
end
def deleteInitialIngestionFile()
......@@ -272,26 +301,34 @@ class DatasetUtils
return filename, extension, reference
end
def addToReport(reference, status, size, hash, data_set)
def addToReport(reference, status, size, hash, data_set, new_reference=FALSE)
local_files = {}
begin
file_path = referenceToPath(reference, @data_set_directory, data_set)
modification_date = File.exist?(file_path) ? File.mtime(file_path).strftime("%Y-%m-%d-%H-%M-%S") : "not-modification-date"
modification_date = File.exist?(file_path) ? File.mtime(file_path).strftime(DATE_FORMAT) : "not-modification-date"
if not reportFileExist()
File.open(@task_report_file, 'w') {}
File.open(@task_report_file, 'w') {}
end
new_file = TRUE
File.readlines(@task_report_file).each do |line|
record = line.split(RECORD_SEPARATOR)
if reference.to_s == record[0].to_s
local_files[reference] = {"size" => size, "hash" => hash, "status" => status, "modification_date" => modification_date }
record = line.split(RECORD_SEPARATOR)
if reference.to_s == record[0].to_s
if new_reference
reference = new_reference
file_path = referenceToPath(reference, @data_set_directory, data_set)
modification_date = File.exist?(file_path) ? File.mtime(file_path).strftime(DATE_FORMAT) : "not-modification-date"
end
local_files[reference] = {"size" => size, "hash" => hash, "status" => status,
"modification_date" => modification_date }
new_file = FALSE
else
local_files[record[0]] = {"size" => record[2].chomp, "hash" => record[3].chomp, "status" => record[1].chomp, "modification_date" => record[4].chomp }
end
else
local_files[record[0]] = {"size" => record[2].chomp, "hash" => record[3].chomp,
"status" => record[1].chomp, "modification_date" => record[4].chomp }
end
end
if new_file
local_files[reference] = {"size" => size, "hash" => hash, "status" => status, "modification_date" => modification_date }
local_files[reference] = {"size" => size, "hash" => hash, "status" => status,
"modification_date" => modification_date }
end
rescue Exception => e
@logger.error("An error occurred in DatasetUtils method 'addToReport':" + e.to_s)
......@@ -306,13 +343,14 @@ class DatasetUtils
end
def getHash(file)
return "FILE-NOT-EXISTS" if ! File.exist?(file)
begin
chunk_size = 4 * MEGA
md5 = Digest::MD5.new
open(file) do |f|
while chunk=f.read(chunk_size)
md5.update(chunk)
end
while chunk=f.read(chunk_size)
md5.update(chunk)
end
end
return md5.hexdigest
rescue Exception => e
......@@ -338,7 +376,7 @@ class DatasetUtils
data_streams.each do |data_stream|
file_path = referenceToPath(data_stream["reference"], @data_set_directory, dataset)
if files.include? file_path
conflicts.push(file_path.sub(@data_set_directory, "./"))
conflicts.push(file_path.sub(@data_set_directory, "./"))
end
end
return conflicts
......@@ -369,6 +407,21 @@ class DatasetUtils
end
end
def isRenamed(file, file_dict_list, file_dict=FALSE)
hash = file_dict ? file_dict["hash"] : ""
size = file_dict ? file_dict["size"] : (File.size(file).to_s if File.exist?(file))
file_dict_list.each do |path, dict|
if size == dict["size"].to_s
hash = hash != "" ? hash : getHash(file).to_s
if hash == dict["hash"]
old_path = path
return {"key" => old_path, "size" => size, "hash" => hash}
end
end
end
return FALSE
end
def isStaged(path, staged_dict, status)
return FALSE if staged_dict.nil? || staged_dict.empty?
staged_status = {"index" => -1, "status" => ""}
......@@ -389,40 +442,53 @@ class DatasetUtils
return staged_status["status"] == status
end
def checkRenamed(path, deleted_files, change)
renamed_dict = isRenamed(path, deleted_files)
if renamed_dict
deleted_files.delete(renamed_dict["key"])
change["status"] = STATUS_RENAMED
change["new_path"] = change["path"]
change["path"] = renamed_dict["key"]
change["size"] = renamed_dict["size"]
change["hash"] = renamed_dict["hash"]
end
return change
end
def getLocalChanges(files, data_set, staged, partial_ingestion=FALSE)
staged_changes, untracked_changes = [], []
staged_dict = getStagedRecords() if staged
deleted_files = {}
begin
if reportFileExist()
File.readlines(@task_report_file).each do |line|
record = line.split(RECORD_SEPARATOR)
File.readlines(@task_report_file).each do |line|
record = line.split(RECORD_SEPARATOR)
if record[1].chomp == RUN_DONE
file_path = referenceToPath(record[0], @data_set_directory, data_set)
if files.include? file_path
modification_date = File.mtime(file_path).strftime("%Y-%m-%d-%H-%M-%S")
if files.include? file_path
modification_date = File.mtime(file_path).strftime(DATE_FORMAT)
if staged && isStaged(file_path, staged_dict, STAGE_REMOVE)
staged_changes.push({"path" => file_path, "size" => "", "hash" => DELETE, "status" => STATUS_DELETED })
elsif modification_date != record[4].chomp
size = File.size(file_path).to_s
hash = getHash(file_path).to_s
size = File.size(file_path).to_s
hash = getHash(file_path).to_s
change = {"path" => file_path, "size" => size, "hash" => hash, "status" => STATUS_MODIFIED }
if size == record[2].to_s
if hash != record[3].chomp
if size == record[2].to_s
if hash != record[3].chomp
staged && isStaged(file_path, staged_dict, STAGE_ADD) ? staged_changes.push(change) : untracked_changes.push(change)
end
else
end
else
staged && isStaged(file_path, staged_dict, STAGE_ADD) ? staged_changes.push(change) : untracked_changes.push(change)
end
end
end
files.delete(file_path)
else
else
if not partial_ingestion
change = {"path" => file_path, "size" => "", "hash" => DELETE, "status" => STATUS_DELETED }
staged && isStaged(file_path, staged_dict, STAGE_REMOVE) ? staged_changes.push(change) : untracked_changes.push(change)
deleted_files[file_path] = {"size" => record[2].to_s, "hash" => record[3].chomp}
end
end
end
end
end
end
end
untrucked_deletions = []
files.each do |path|
......@@ -433,6 +499,7 @@ class DatasetUtils
if File.exist?(path)
# check scenario where new files were created inside a directory staged as deleted
if File.mtime(path) > File.mtime(@staged_file)
change = checkRenamed(path, deleted_files, change)
untracked_changes.push(change)
else
File.delete(path)
......@@ -441,13 +508,19 @@ class DatasetUtils
end
untrucked_deletions.push(path)
else
change = checkRenamed(path, deleted_files, change)
isStaged(path, staged_dict, STAGE_ADD) ? staged_changes.push(change) : untracked_changes.push(change)
end
else
change = checkRenamed(path, deleted_files, change)
untracked_changes.push(change)
end
end
updateStagedDeletions(untrucked_deletions)
deleted_files.each do |deleted_path, dict|
change = {"path" => deleted_path, "size" => "", "hash" => DELETE, "status" => STATUS_DELETED }
staged && isStaged(deleted_path, staged_dict, STAGE_REMOVE) ? staged_changes.push(change) : untracked_changes.push(change)
end
updateStagedDeletions(untrucked_deletions) if staged
rescue Exception => e
@logger.error("An error occurred in DatasetUtils method 'getLocalChanges':" + e.to_s)
@logger.error(e.backtrace)
......@@ -462,8 +535,54 @@ class DatasetUtils
return TRUE
end
def getRemoteChangedDataStreams(data_streams)
pending_data_streams = []
def getRemoteFileListForDiscardLocalChanges(data_streams, data_set, check_changes=FALSE, changes=[])
remotes_for_discard_local = []
if changes.empty?
paths = [appendSlashTo(@data_set_directory)]
local_files = getLocalPaths(paths)
ignore, local_changes = getLocalChanges(local_files, data_set, staged=FALSE)
else
local_changes = changes
end
return local_changes if check_changes
local_changes.each do |change|
f, e, reference = getPathInfo(change["path"], data_set)
case change["status"]
when STATUS_NEW
@logger.info("Discarding local change on '#{change["path"]}'", print=TRUE)
@logger.info("New file removed.", print=TRUE)
puts
File.delete(change["path"]) if File.exist?(change["path"])
when STATUS_RENAMED
@logger.info("Discarding local change on '#{change["new_path"]}'", print=TRUE)
@logger.info("File moved/renamed back to '#{change["path"]}'.", print=TRUE)
puts
new_reference = reference
f, e, reference = getPathInfo(change["new_path"], data_set)
unless File.directory?(File.dirname(change["path"]))
FileUtils.mkdir_p(File.dirname(change["path"]))
end
FileUtils.mv(change["new_path"], change["path"]) if File.exist?(change["new_path"])
when STATUS_MODIFIED
data_stream = data_streams.find{|data_stream| data_stream["reference"] == reference }
if data_stream
data_stream["path"] = change["path"]
remotes_for_discard_local.push(data_stream)
end
when STATUS_DELETED
data_stream = data_streams.find{|data_stream| data_stream["reference"] == reference }
if data_stream
data_stream["path"] = change["path"]
remotes_for_discard_local.push(data_stream)
end
end
end
return remotes_for_discard_local
end
def getRemoteChangedDataStreams(data_streams, data_set)
changed_data_streams = []
new_changed_files = {}
begin
if reportFileExist()
local_files = {}
......@@ -489,19 +608,32 @@ class DatasetUtils
end
if pending
local_files.delete(reference)
pending_data_streams.push(data_stream)
file_path = referenceToPath(data_stream["reference"], @data_set_directory, data_set)
new_changed_files[file_path] = data_stream
end
end
local_files.each do |key, array|
if not remote_files.include? key
pending_data_streams.push({"reference" => key, "hash" => DELETE })
local_files.each do |reference, file_dict|
if not remote_files.include? reference
changed_data_stream = {"reference" => reference, "hash" => DELETE }
file_path = referenceToPath(reference, @data_set_directory, data_set)
renamed_dict = isRenamed(file_path, new_changed_files, file_dict)
if renamed_dict
changed_data_stream = {"reference" => reference, "id" => new_changed_files[renamed_dict["key"]]["id"],
"new_reference" => new_changed_files[renamed_dict["key"]]["reference"], "status" => STATUS_RENAMED,
"size" => renamed_dict["size"], "hash" => renamed_dict["hash"] }
new_changed_files.delete(renamed_dict["key"])
end
changed_data_streams.push(changed_data_stream)
end
end
new_changed_files.each do |path, data_stream|
changed_data_streams.push(data_stream)
end
end
rescue Exception => e
@logger.error("An error occurred in DatasetUtils method 'getRemoteChangedDataStreams':" + e.to_s)
@logger.error(e.backtrace)
end
return pending_data_streams
return changed_data_streams
end
end
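The isRenamed check above infers that a file was renamed when a newly seen local path matches a previously reported, now missing, file by size and MD5 hash. A standalone sketch of that idea, with illustrative names that are not part of the plugin's API:

require 'digest'

# Pair a newly added file with a deleted report entry when size and MD5 match;
# this is the heuristic used to report a rename instead of a delete plus an add.
def find_rename_candidate(new_path, deleted_entries)
  return nil unless File.exist?(new_path)
  size = File.size(new_path).to_s
  hash = nil
  deleted_entries.each do |old_path, info|
    next unless info["size"].to_s == size           # cheap size check first
    hash ||= Digest::MD5.file(new_path).hexdigest   # hash lazily, at most once
    return old_path if hash == info["hash"]
  end
  nil
end

# deleted = { "data/old_name.csv" => { "size" => "42", "hash" => "0cc175b9..." } }
# find_rename_candidate("data/new_name.csv", deleted)  # => "data/old_name.csv" or nil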
......@@ -19,7 +19,7 @@ module Embulk
{"name"=>"eof", "type"=>"string"},
{"name"=>"size", "type"=>"string"},
{"name"=>"hash", "type"=>"string"}
]
]
def self.status(task, push=FALSE)
partial_ingestion = @dataset_utils.initialIngestionFileExist()
......@@ -94,6 +94,7 @@ module Embulk
@dataset_utils = DatasetUtils.new(@data_set_directory)
@status = config.param('status', :string, default: FALSE)
@status = @status == "" ? FALSE : @status
@dataset_utils.deleteDiscardChangesFile()
if @status
if not @dataset_utils.initialIngestionFileExist()
if not @dataset_utils.reportFileExist()
......@@ -102,7 +103,7 @@ module Embulk
@logger.abortExecution()
elsif not @dataset_utils.completedFileExist()
puts
@logger.error("There is an interrumped download operation in dataset directory. Please resume the download first.", print=TRUE)
@logger.error("There is an interrupted download operation in dataset directory. Please resume the download first.", print=TRUE)
@logger.abortExecution()
end
end
......@@ -120,7 +121,7 @@ module Embulk
if data_stream_dict["status_code"] != 0
@logger.error(data_stream_dict["error_message"], print=TRUE)
@logger.abortExecution()
end
end
task['data_streams'] = data_stream_dict["result"]
if not @dataset_utils.reportFileExist()
......@@ -128,11 +129,11 @@ module Embulk
else
if not @dataset_utils.initialIngestionFileExist()
@logger.info("Checking local dataset...", print=TRUE)
if not @dataset_utils.reportUpToDate(data_stream_dict)
puts
@logger.error("Your current dataset is outdated. Please, run a download to update it before ingest your changes.", print=TRUE)
puts
@logger.abortExecution(error=FALSE)
if not @dataset_utils.reportUpToDate(data_stream_dict, @data_set)
puts
@logger.error("Your current dataset is outdated. Please, run a download to update it before ingest your changes.", print=TRUE)
puts
@logger.abortExecution(error=FALSE)
end
end
end
......@@ -145,12 +146,12 @@ module Embulk
@logger.error("Could not find any valid file.", print=TRUE)
@logger.error("Please make sure your dataset directory contains files for ingestion.", print=TRUE)
@logger.abortExecution()
end
end
self.status(task, push=TRUE)
@logger.info("Continue with ingestion? (y/n)", print=TRUE)
option = gets
option = option.chomp
option = gets
option = option.chomp
if option == "n"
@logger.info("Ingestion cancelled by user.", print=TRUE)
@logger.abortExecution()
......@@ -160,20 +161,20 @@ module Embulk
end
columns = [
Column.new(0, "supplier", :string),
Column.new(1, "data_set", :string),
Column.new(0, "supplier", :string),
Column.new(1, "data_set", :string),
Column.new(2, "file", :string),
Column.new(3, "extension", :string),
Column.new(4, "data_chunk", :string),
Column.new(5, "eof", :string),
Column.new(6, "size", :string),
Column.new(7, "hash", :string)
]
]
commit_reports = yield(task, columns, task['paths'].length)
done = commit_reports.map{|hash| hash["done"]}.flatten.compact
done = commit_reports.map{|hash| hash["done"]}.flatten.compact
resume(task, columns, task['paths'].length, &control)
resume(task, columns, task['paths'].length, &control)
rescue Exception => e
@logger.error("An error occurred during operation: " + e.to_s, print=TRUE)
@logger.error(e.backtrace)
......@@ -184,9 +185,9 @@ module Embulk
def self.resume(task, columns, count, &control)
@logger = LogManager.instance()
task_reports = yield(task, columns, count)
@dataset_utils.showTaskReport(task_reports)
next_config_diff = task_reports.map{|hash| hash["done"]}.flatten.compact
task_reports = yield(task, columns, count)
next_config_diff = task_reports.map{|hash| hash[DatasetUtils::RUN_DONE]}.flatten.compact
@dataset_utils.showTaskReport(next_config_diff)
element_output = @dataset_utils.initialIngestionFileExist() ? "new file" : "change"
@logger.info("#{next_config_diff.length} #{element_output}(s) ingested.", print=TRUE)
if(next_config_diff.length == count)
......@@ -194,7 +195,7 @@ module Embulk
@wendelin.increaseDatasetVersion(@data_set)
@dataset_utils.deleteStagedFile()
else
failed_tasks = task_reports.map{|hash| hash["error"]}.flatten.compact
failed_tasks = task_reports.map{|hash| hash[DatasetUtils::RUN_ERROR] || hash[DatasetUtils::RUN_ABORTED] }.flatten.compact
@dataset_utils.showTaskErrors(failed_tasks)
end
next_config_diff = {}
......@@ -202,9 +203,9 @@ module Embulk
end
def initialize(task, schema, index, page_builder)
super
@supplier = task['supplier']
@dataset = task['data_set']
super
@supplier = task['supplier']
@dataset = task['data_set']
@chunk_size = task['chunk_size']
@data_set_directory = task['data_set_directory']
@logger = LogManager.instance()
......@@ -219,16 +220,19 @@ module Embulk
size = file_dict["size"]
hash = file_dict["hash"]
delete = hash == DatasetUtils::DELETE
rename = file_dict["status"] == DatasetUtils::STATUS_RENAMED
if size == "" and hash == "" #new file
size = File.size(path)
hash = @dataset_utils.getHash(path)
end
new_filename, new_extension, new_reference = @dataset_utils.getPathInfo(file_dict["new_path"], @dataset) if rename
filename, extension, reference = @dataset_utils.getPathInfo(path, @dataset)
@dataset_utils.saveCurrentOperation(DatasetUtils::INGESTION, reference)
each_chunk(path, filename, extension, size, hash, schema[1..-1].map{|elm| elm.name}, @chunk_size, delete) do |entry|
operation = rename ? DatasetUtils::RENAME : DatasetUtils::INGESTION
@dataset_utils.saveCurrentOperation(operation, reference, new_reference)
each_chunk(path, filename, extension, size, hash, schema[1..-1].map{|elm| elm.name}, @chunk_size, delete, new_reference) do |entry|
@page_builder.add(entry)
end
@page_builder.finish
end
@page_builder.finish
rescue java.lang.OutOfMemoryError
@logger.logOutOfMemoryError(path)
return_value = DatasetUtils::RUN_ABORTED
......@@ -247,7 +251,7 @@ module Embulk
end
else
if @dataset_utils.reportFileExist()
@dataset_utils.addToReport(reference, return_value, size, hash, task['data_set'])
@dataset_utils.addToReport(reference, return_value, size, hash, task['data_set'], new_reference)
end
end
end
......@@ -257,29 +261,33 @@ module Embulk
private
def each_chunk(path, filename, extension, size, hash, fields, chunk_size=DatasetUtils::CHUNK_SIZE, delete=FALSE)
def each_chunk(path, filename, extension, size, hash, fields, chunk_size=DatasetUtils::CHUNK_SIZE, delete=FALSE, new_reference=FALSE)
if delete
File.delete(path) if File.exist?(path)
values = [@supplier, @dataset, filename, extension, "", DatasetUtils::DELETE, "", ""]
yield(values)
elsif new_reference
File.delete(path) if File.exist?(path)
values = [@supplier, @dataset, filename, extension, new_reference, DatasetUtils::RENAME, "", ""]
yield(values)
else
file_object = File.open(path, "rb")
file_object = File.open(path, "rb")
npart = 0
next_byte = file_object.read(1)
next_byte = file_object.read(1)
first = TRUE
while true
data = next_byte
if not next_byte
while true
data = next_byte
if not next_byte
if first # this means this is an empty file
values = [@supplier, @dataset, filename, extension, "", "", size, hash]
yield(values)
end
break
end
data += file_object.read(chunk_size)
next_byte = file_object.read(1)
if not next_byte
eof = DatasetUtils::EOF
data += file_object.read(chunk_size)
next_byte = file_object.read(1)
if not next_byte
eof = DatasetUtils::EOF
if first # this means that the whole file will be ingested at once (not split)
eof = ""
end
......
......@@ -38,43 +38,43 @@ module Embulk
end
def self.askUserForAction(task, action)
if action == RESUME
action_message = "#{RESUME}: Resume. Continues download from last file."
else
action = UPDATE
action_message = "#{UPDATE}: Update. Checks for changes in dataset."
end
valid_option = FALSE
while not valid_option
@logger.info("Please select an option [#{action}, #{DOWNLOAD}, #{ABORT}]", print=TRUE)
@logger.info(action_message, print=TRUE)
@logger.info("#{DOWNLOAD}: Download. Downloads the dataset from scratch.", print=TRUE)
@logger.info("#{ABORT}: Abort operation.", print=TRUE)
option = gets
option = option.chomp
if not [action, DOWNLOAD, ABORT].include? option
if action == RESUME
action_message = "#{RESUME}: Resume. Continues download from last file."
else
action = UPDATE
action_message = "#{UPDATE}: Update. Checks for changes in dataset."
end
valid_option = FALSE
while not valid_option
@logger.info("Please select an option [#{action}, #{DOWNLOAD}, #{ABORT}]", print=TRUE)
@logger.info(action_message, print=TRUE)
@logger.info("#{DOWNLOAD}: Download. Downloads the dataset from scratch.", print=TRUE)
@logger.info("#{ABORT}: Abort operation.", print=TRUE)
option = gets
option = option.chomp
if not [action, DOWNLOAD, ABORT].include? option
@logger.info("Invalid option", print=TRUE)
else
else
valid_option = TRUE
end
end
case option
when action
end
end
case option
when action
@logger.info("Checking remote changes and posible local conflicts...", print=TRUE) if action != RESUME
task['data_streams'] = @dataset_utils.getRemoteChangedDataStreams(task['data_streams'])
task['data_streams'] = @dataset_utils.getRemoteChangedDataStreams(task['data_streams'], @data_set)
self.warnConflicts(task['data_streams'], task['data_set']) if action != RESUME
@dataset_utils.deleteCompletedFile()
if task['data_streams'].empty?
@logger.info("Your downloaded dataset is already up to date.", print=TRUE)
end
when DOWNLOAD
if task['data_streams'].empty?
@logger.info("Your downloaded dataset is already up to date.", print=TRUE)
end
when DOWNLOAD
@logger.info("Checking remote files and posible local conflicts...", print=TRUE)
self.warnConflicts(task['data_streams'], task['data_set'])
@dataset_utils.deleteCompletedFile()
@dataset_utils.createReportFile()
when ABORT
when ABORT
@logger.abortExecution()
end
end
end
def self.transaction(config, &control)
......@@ -95,7 +95,6 @@ module Embulk
@logger.abortExecution()
end
@wendelin = WendelinClient.new(@erp5_url, @user, @password)
task = {
'erp5_url' => @erp5_url,
'data_set' => @data_set,
......@@ -113,6 +112,17 @@ module Embulk
task['data_set_directory'] = @dataset_utils.appendSlashTo(@output_path)
@data_set_directory = task['data_set_directory']
@dataset_utils = DatasetUtils.new(@data_set_directory)
if @dataset_utils.reportFileExist() && @dataset_utils.completedFileExist() && @dataset_utils.discardChangesFileExist() && ! @dataset_utils.initialIngestionFileExist()
task['discard_changes'] = @dataset_utils.discardChangesFileExist()
local_changes = @dataset_utils.getRemoteFileListForDiscardLocalChanges([], @data_set, check_changes=TRUE)
if local_changes.empty?
puts
@logger.info("No local changes to discard.", print=TRUE)
puts
@dataset_utils.deleteDiscardChangesFile()
@logger.abortExecution(error=FALSE)
end
end
@logger.info("Getting remote file list from dataset '#{@data_set}'...", print=TRUE)
data_stream_list = @wendelin.getDataStreams(@data_set)
if data_stream_list["status_code"] == 0
......@@ -129,10 +139,26 @@ module Embulk
if @dataset_utils.reportFileExist()
if @dataset_utils.completedFileExist()
puts
@logger.info("This dataset was already downloaded. What do you want to do?", print=TRUE)
puts
self.askUserForAction(task, action=UPDATE)
if task['discard_changes']
puts
@logger.warn("All your local changes will be discarded.", print=TRUE)
@logger.warn("Do you want to continue? (y/n)", print=TRUE)
option = gets
option = option.chomp
if option == "n" or option == "N"
@logger.info("Download cancelled by user.", print=TRUE)
@dataset_utils.deleteDiscardChangesFile()
@logger.abortExecution(error=FALSE)
end
@dataset_utils.deleteStagedFile()
task['data_streams'] = @dataset_utils.getRemoteFileListForDiscardLocalChanges(task['data_streams'], @data_set,
check_changes=FALSE, changes=local_changes)
else
puts
@logger.info("This dataset was already downloaded. What do you want to do?", print=TRUE)
puts
self.askUserForAction(task, action=UPDATE)
end
elsif not @dataset_utils.initialIngestionFileExist()
puts
@logger.info("There was a previous attempt to download this dataset but it did not finish successfully.", print=TRUE)
......@@ -140,10 +166,20 @@ module Embulk
puts
self.askUserForAction(task, action=RESUME)
else
if @dataset_utils.discardChangesFileExist()
puts
@logger.info("Discard changes feature do not apply in current dataset directory status.", print=TRUE)
@logger.info("Continuing with dataset download.", print=TRUE)
end
puts
self.askUserForAction(task, action=UPDATE)
end
else
if @dataset_utils.discardChangesFileExist()
puts
@logger.info("Discard changes feature do not apply in current dataset directory status.", print=TRUE)
@logger.info("Continuing with dataset download.", print=TRUE)
end
if not @dataset_utils.dirEmpty(@data_set_directory)
puts
@logger.info("Dataset download directory is not empty! Its files could be overwritten: " + @data_set_directory, print=TRUE)
......@@ -151,8 +187,8 @@ module Embulk
option = gets
option = option.chomp
if option == "n"
@logger.info("Download cancelled by user.", print=TRUE)
@logger.abortExecution(error=FALSE)
@logger.info("Download cancelled by user.", print=TRUE)
@logger.abortExecution(error=FALSE)
end
@logger.info("Checking remote files and posible local conflicts...", print=TRUE)
self.warnConflicts(task['data_streams'], task['data_set'])
......@@ -160,11 +196,13 @@ module Embulk
@dataset_utils.createReportFile()
end
@dataset_utils.deleteInitialIngestionFile()
@dataset_utils.deleteDiscardChangesFile()
columns = [
Column.new(0, "reference", :string),
Column.new(1, "data_chunk", :string),
Column.new(2, "data_set", :string),
Column.new(3, "mode", :string)
Column.new(3, "mode", :string),
Column.new(4, "rename", :string)
]
resume(task, columns, task['data_streams'].length, &control)
rescue Exception => e
......@@ -177,14 +215,18 @@ module Embulk
def self.resume(task, columns, count, &control)
@logger = LogManager.instance()
task_reports = yield(task, columns, count)
@dataset_utils.showTaskReport(task_reports)
next_config_diff = task_reports.map{|hash| hash[DatasetUtils::RUN_DONE]}.flatten.compact
task_reports = yield(task, columns, count)
@dataset_utils.showTaskReport(task_reports) if not task['discard_changes']
next_config_diff = task_reports.map{|hash| hash[DatasetUtils::RUN_DONE]}.flatten.compact
if(next_config_diff.length == count)
if(count > 0)
@logger.info("Dataset successfully downloaded.", print=TRUE)
@logger.info("#{count} files processed.", print=TRUE)
@logger.info("Dataset files are in dataset directory: " + @data_set_directory, print=TRUE)
if task['discard_changes']
@logger.info("All local changes were discarded.", print=TRUE)
else
@logger.info("Dataset successfully downloaded.", print=TRUE)
@logger.info("#{count} files processed.", print=TRUE)
@logger.info("Dataset files are in dataset directory: " + @data_set_directory, print=TRUE)
end
end
@dataset_utils.createCompletedFile()
else
......@@ -200,8 +242,8 @@ module Embulk
end
def initialize(task, schema, index, page_builder)
super
@data_set = task['data_set']
super
@data_set = task['data_set']
@chunk_size = task['chunk_size']
@data_set_directory = task['data_set_directory']
@wendelin = WendelinClient.new(task['erp5_url'], task['user'], task['password'])
......@@ -215,27 +257,33 @@ module Embulk
ref = data_stream["reference"]
size = data_stream["size"]
hash = data_stream["hash"]
renamed = data_stream["status"] == DatasetUtils::STATUS_RENAMED
deleted = hash.to_s == DatasetUtils::DELETE
begin
if hash.to_s == DatasetUtils::DELETE
@logger.info("Deleting #{ref}", print=TRUE)
entry = [ref, "", @data_set, hash.to_s]
if deleted
entry = [ref, "", @data_set, DatasetUtils::DELETE, renamed]
page_builder.add(entry)
elsif renamed
new_reference = data_stream["new_reference"]
entry = [ref, new_reference, @data_set, TRUE, renamed]
page_builder.add(entry)
else
@logger.info("Discarding local change on '#{data_stream["path"]}'", print=TRUE) if task['discard_changes']
@logger.info("Getting content from remote #{ref}", print=TRUE)
n_chunk = 0
@wendelin.eachDataStreamContentChunk(id, @chunk_size) do |chunk|
content = chunk.nil? || chunk.empty? ? "" : Base64.encode64(chunk)
begin_of_file = n_chunk == 0
entry = [ref, content, @data_set, begin_of_file]
entry = [ref, content, @data_set, begin_of_file, renamed]
page_builder.add(entry)
n_chunk += 1
end
end
page_builder.finish
page_builder.finish
rescue java.lang.OutOfMemoryError
@logger.logOutOfMemoryError(ref)
@logger.logOutOfMemoryError(ref)
return_value = DatasetUtils::RUN_ABORTED
rescue Exception => e
rescue Exception => e
@logger.error(e.to_s, print=TRUE)
@logger.error(e.backtrace)
puts "[INFO] For more detailed information, please refer to the log file: " + @logger.getLogPath()
......@@ -244,13 +292,13 @@ module Embulk
return_value = DatasetUtils::RUN_DONE
end
if return_value == DatasetUtils::RUN_DONE
if hash.to_s == DatasetUtils::DELETE
if deleted
@dataset_utils.deleteFromReport(ref, return_value)
else
@dataset_utils.addToReport(ref, return_value, size, hash, task['data_set'])
@dataset_utils.addToReport(ref, return_value, size, hash, task['data_set'], new_reference)
end
end
return {return_value => ref}
return {return_value => ref}
end
end
end
......
......@@ -11,15 +11,15 @@ module Embulk
def self.transaction(config, schema, count, &control)
@logger = LogManager.instance()
task = { "output_path" => config.param("output_path", :string, :default => nil) }
task = { "output_path" => config.param("output_path", :string, :default => nil) }
if File.directory?(task['output_path'])
else
@logger.error("Output directory not found.", print=TRUE)
@logger.abortExecution()
end
task_reports = yield(task)
next_config_diff = {}
return next_config_diff
task_reports = yield(task)
next_config_diff = {}
return next_config_diff
end
def init
......@@ -32,29 +32,37 @@ module Embulk
def add(page)
begin
page.each do |record|
page.each do |record|
reference = record[0]
data_chunk = Base64.decode64(record[1])
@dataset_utils = DatasetUtils.new("")
data_set_directory = @dataset_utils.appendSlashTo(@output_path)
file_path = @dataset_utils.referenceToPath(reference, data_set_directory, record[2])
write_mode = 'ab'
if record[3] == DatasetUtils::DELETE
@logger.info("Deleting '#{file_path}'", print=TRUE)
File.delete(file_path) if File.exist?(file_path)
elsif record[4] == TRUE.to_s # if renamed
new_file_path = @dataset_utils.referenceToPath(record[1], data_set_directory, record[2])
@logger.info("Renaming '#{file_path}' to '#{new_file_path}'", print=TRUE)
unless File.directory?(File.dirname(new_file_path))
FileUtils.mkdir_p(File.dirname(new_file_path))
end
FileUtils.mv(file_path, new_file_path) if File.exist?(file_path)
else
data_chunk = Base64.decode64(record[1])
if record[3] == TRUE.to_s
write_mode = 'w'
write_mode = 'w'
end
dirname = File.dirname(file_path)
unless File.directory?(dirname)
FileUtils.mkdir_p(dirname)
FileUtils.mkdir_p(dirname)
end
File.open(file_path, write_mode) { |file| file.write(data_chunk) }
end
end
end
rescue Exception => e
@logger.error("An error occurred while procesing file.", print=TRUE)
@logger.error(e.backtrace)
@logger.error("An error occurred while procesing file.", print=TRUE)
@logger.error(e.backtrace)
raise e
end
end
......@@ -66,8 +74,8 @@ module Embulk
end
def commit
task_report = {}
return task_report
task_report = {}
return task_report
end
end
......
......@@ -9,33 +9,33 @@ module Embulk
Plugin.register_output("wendelin", self)
def self.transaction(config, schema, count, &control)
task = {
"erp5_url" => config.param("erp5_url", :string),
"user" => config.param("user", :string, defualt: nil),
"password" => config.param("password", :string, default: nil),
"path_prefix" => config.param("path_prefix", :string, :default => nil),
}
task_reports = yield(task)
next_config_diff = {}
@logger = LogManager.instance()
task = {
"erp5_url" => config.param("erp5_url", :string),
"user" => config.param("user", :string, defualt: nil),
"password" => config.param("password", :string, default: nil),
"path_prefix" => config.param("path_prefix", :string, :default => nil),
}
task_reports = yield(task)
next_config_diff = {}
@logger = LogManager.instance()
@logger.info("Your ingested files will be available in the site in a few minutes. Thank for your patience.", print=TRUE)
return next_config_diff
return next_config_diff
end
def init
credentials = {}
@erp5_url = task["erp5_url"]
credentials = {}
@erp5_url = task["erp5_url"]
@user = task["user"]
@password = task["password"]
@logger = LogManager.instance()
@wendelin = WendelinClient.new(@erp5_url, @user, @password)
@password = task["password"]
@logger = LogManager.instance()
@wendelin = WendelinClient.new(@erp5_url, @user, @password)
end
def close
end
def add(page)
page.each do |record|
page.each do |record|
supplier = (record[0].nil? || record[0].empty?) ? "default" : record[0]
dataset = (record[1].nil? || record[1].empty?) ? "default" : record[1]
filename = record[2]
......@@ -48,18 +48,21 @@ module Embulk
if eof == DatasetUtils::DELETE
reference = [dataset, filename, extension].join(DatasetUtils::REFERENCE_SEPARATOR)
@wendelin.delete(reference)
elsif eof == DatasetUtils::RENAME
reference = [dataset, filename, extension].join(DatasetUtils::REFERENCE_SEPARATOR)
@wendelin.rename(reference, record[4].to_s)
else
reference = [supplier, dataset, filename, extension, eof, size, hash].join(DatasetUtils::REFERENCE_SEPARATOR)
split = eof != ""
if not @wendelin.ingest(reference, data_chunk, split)
raise "could not ingest"
raise "could not ingest"
end
end
rescue Exception => e
raise e
@logger.error(e.backtrace)
@logger.error(e.backtrace)
end
end
end
end
def finish
......@@ -69,8 +72,8 @@ module Embulk
end
def commit
task_report = {}
return task_report
task_report = {}
return task_report
end
end
......
......@@ -24,42 +24,42 @@ module Embulk
tool_dir = config.param('tool_dir', :string, default: ".")
@logger = LogManager.instance()
@logger.setFilename(tool_dir, "parser")
task = {
task = {
chunk_size: config.param('chunk_size', :float, default: 0) * DatasetUtils::MEGA,
supplier: config.param("supplier", :string, default: "parser"),
data_set: config.param("data_set", :string),
input_plugin: config.param("storage", :string, default: "parser"),
date: Time.now.strftime("%Y-%m-%d_%H-%M-%S")
}
}
if task['chunk_size'] == 0
task['chunk_size'] = DatasetUtils::CHUNK_SIZE
end
columns = [
Column.new(0, "supplier", :string),
Column.new(1, "data_set", :string),
Column.new(0, "supplier", :string),
Column.new(1, "data_set", :string),
Column.new(2, "file", :string),
Column.new(3, "extension", :string),
Column.new(4, "data_chunk", :string),
Column.new(5, "eof", :string),
Column.new(6, "size", :string),
Column.new(7, "hash", :string)
]
]
yield(task, columns)
yield(task, columns)
end
def run(file_input)
@index = Index.instance().get()
@index = Index.instance().get()
@logger = LogManager.instance()
while file = file_input.next_file
while file = file_input.next_file
begin
filename = "file_from_#{task['input_plugin']}_#{task['date']}"
each_chunk(file, filename, task['chunk_size']) do |record|
@page_builder.add(record)
end
@page_builder.finish
@page_builder.add(record)
end
@page_builder.finish
Index.instance().increase()
rescue java.lang.OutOfMemoryError
rescue java.lang.OutOfMemoryError
@logger.logOutOfMemoryError(path)
return
rescue Exception => e
......@@ -67,18 +67,18 @@ module Embulk
@logger.error(e.backtrace)
puts "[INFO] For more detailed information, please refer to the log file: " + @logger.getLogPath()
end
end
end
end
private
def each_chunk(file, filename, chunk_size=DatasetUtils::CHUNK_SIZE)
extension = @index.to_s.rjust(3, "0")
npart = 0
next_byte = file.read(1)
next_byte = file.read(1)
first = TRUE
while true
data = next_byte
if not next_byte
while true
data = next_byte
if not next_byte
if first
# this means this is an empty file
values = [task['supplier'], task['data_set'], filename, extension, "", "", "", ""]
......@@ -86,10 +86,10 @@ module Embulk
end
break
end
data += file.read(chunk_size)
next_byte = file.read(1)
if not next_byte
eof = DatasetUtils::EOF
data += file.read(chunk_size)
next_byte = file.read(1)
if not next_byte
eof = DatasetUtils::EOF
if first
# this means that the whole file will be ingested at once (not split)
eof = ""
......
......@@ -16,8 +16,15 @@ class WendelinClient
@last_ingestion = Time.new - 2
end
def checkReferenceChars(reference)
if ["&", ";", "#", "%", '"', "+"].any? { |char| reference.include?(char) }
raise "invalid char in filename. Following chars are not allowed for filenames: \& \; \% \" \+ \# Please rename it."
end
end
def exists(reference)
uri = URI("#{@erp5_url}/ingestionReferenceExists?reference=#{reference}")
checkReferenceChars(reference)
uri = URI(URI.escape("#{@erp5_url}/ingestionReferenceExists?reference=#{reference}"))
begin
res = open(uri, http_basic_authentication: [@user, @password]).read
rescue Exception => e
......@@ -34,11 +41,25 @@ class WendelinClient
def delete(reference)
@logger.info("Deletion requested for reference #{reference}", print=TRUE)
uri = URI("#{@erp5_url}/ERP5Site_invalidateIngestionObjects?reference=#{reference}")
checkReferenceChars(reference)
uri = URI(URI.escape("#{@erp5_url}/ERP5Site_invalidateIngestionObjects?reference=#{reference}"))
res = handleRequest(uri)
if res == FALSE
@logger.abortExecution()
end
@logger.info("Remote file successfully ingested.", print=TRUE)
end
def rename(reference, new_reference)
@logger.info("Rename requested for reference #{reference}, new reference #{new_reference}", print=TRUE)
checkReferenceChars(reference)
checkReferenceChars(new_reference)
uri = URI(URI.escape("#{@erp5_url}/ERP5Site_renameIngestion?reference=#{reference}&new_reference=#{new_reference}"))
res = handleRequest(uri)
if res == FALSE
@logger.abortExecution()
end
@logger.info("Remote file successfully renamed.", print=TRUE)
end
def increaseDatasetVersion(reference)
......@@ -46,12 +67,12 @@ class WendelinClient
@logger.warn("Could not increase data set version because dataset reference is empty.")
else
@logger.info("Increasing dataset version")
uri = URI("#{@erp5_url}/ERP5Site_increaseDatasetVersion?reference=#{reference}")
uri = URI(URI.escape("#{@erp5_url}/ERP5Site_increaseDatasetVersion?reference=#{reference}"))
begin
res = open(uri, http_basic_authentication: [@user, @password]).read
res = open(uri, http_basic_authentication: [@user, @password]).read
rescue Exception => e
@logger.error("An error occurred while increasing dataset version: " + e.to_s)
@logger.error(e.backtrace)
@logger.error("An error occurred while increasing dataset version: " + e.to_s)
@logger.error(e.backtrace)
end
end
end
......@@ -63,21 +84,13 @@ class WendelinClient
sleep 3
end
if exists(reference)
@logger.info("There is another ingestion already done for the pair dataset-filename. Reference "\
+ reference, print=TRUE)
@logger.info("There is another ingestion already done for the pair dataset-filename. Reference "\
+ reference, print=TRUE)
@logger.info("Rename your file or download the full dataset to make local changes.", print=TRUE)
return FALSE
end
if reference.include? "#" or reference.include? "+"
raise "invalid chars in file name. Please rename it."
end
begin
uri = URI("#{@erp5_url}/ingest?reference=#{reference}")
rescue Exception => e
@logger.error("An error occurred while generating url: " + e.to_s)
@logger.error(e.backtrace)
raise "invalid chars in file name. Please rename it."
return FALSE
end
checkReferenceChars(reference)
uri = URI(URI.escape("#{@erp5_url}/ingest?reference=#{reference}"))
response = handleRequest(uri, reference, data_chunk)
if response == FALSE
return FALSE
......@@ -88,28 +101,28 @@ class WendelinClient
end
def eachDataStreamContentChunk(id, chunk_size)
uri = URI("#{@erp5_url}#{id}/getData")
uri = URI(URI.escape("#{@erp5_url}#{id}/getData"))
@logger.info("Downloading...", print=TRUE)
first = TRUE
res = open(uri, http_basic_authentication: [@user, @password]) {
|content|
while true
chunk = content.read(chunk_size)
if chunk.nil?
while true
chunk = content.read(chunk_size)
if chunk.nil?
if first
yield chunk
end
@logger.info("Done", print=TRUE)
break
end
break
end
first = FALSE
yield chunk
end
yield chunk
end
}
end
def getDataStreams(data_set_reference)
uri = URI("#{@erp5_url}getDataStreamList?data_set_reference=#{data_set_reference}")
uri = URI(URI.escape("#{@erp5_url}getDataStreamList?data_set_reference=#{data_set_reference}"))
str = handleRequest(uri)
if str == FALSE
@logger.abortExecution()
......@@ -127,44 +140,44 @@ class WendelinClient
req.basic_auth @user, @password
if data_chunk != nil
@logger.info("Setting request form data...", print=TRUE)
begin
req.set_form_data('data_chunk' => data_chunk)
rescue java.lang.OutOfMemoryError
@logger.logOutOfMemoryError(reference)
return FALSE
end
@logger.info("Sending record:'#{reference}'...", print=TRUE)
@logger.info("Setting request form data...", print=TRUE)
begin
req.set_form_data('data_chunk' => data_chunk)
rescue java.lang.OutOfMemoryError
@logger.logOutOfMemoryError(reference)
return FALSE
end
@logger.info("Sending record:'#{reference}'...", print=TRUE)
end
begin
res = Net::HTTP.start(uri.hostname, uri.port,
:use_ssl => (uri.scheme == 'https'),
:verify_mode => OpenSSL::SSL::VERIFY_NONE,
:ssl_timeout => 300, :open_timeout => 300, :read_timeout => 300,
) do |http|
res = Net::HTTP.start(uri.hostname, uri.port,
:use_ssl => (uri.scheme == 'https'),
:verify_mode => OpenSSL::SSL::VERIFY_NONE,
:ssl_timeout => 300, :open_timeout => 300, :read_timeout => 300,
) do |http|
http.request(req)
end
rescue Exception => e
@logger.error("HTTP ERROR: " + e.to_s, print=TRUE)
@logger.error(e.backtrace)
@logger.error("HTTP ERROR: " + e.to_s, print=TRUE)
@logger.error(e.backtrace)
return FALSE
else
if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX
if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX
@logger.info("Done")
return res.body
else
@logger.error("HTTP FAIL - code: #{res.code}", print=TRUE)
else
@logger.error("HTTP FAIL - code: #{res.code}", print=TRUE)
if res.code == '500' or res.code == '502' or res.code == '503'
@logger.error("Internal Server Error: if the error persists, please contact the administrator.", print=TRUE)
@logger.error("Internal Server Error: if the error persists, please contact the administrator.", print=TRUE)
elsif res.code == '401'
@logger.error("Unauthorized access. Please check your user credentials and try again.", print=TRUE)
@logger.error("Unauthorized access. Please check your user credentials and try again.", print=TRUE)
@logger.abortExecution()
else
@logger.error("Sorry, an error ocurred. If the error persists, please contact the administrator.", print=TRUE)
@logger.error("Sorry, an error ocurred. If the error persists, please contact the administrator.", print=TRUE)
end
return FALSE
end
end
end
end
end
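The new checkReferenceChars guard rejects references whose characters would break the ingestion URLs that are now built with URI.escape. A standalone restatement of that check; the character list is taken from the diff, the method name is illustrative:

# Characters that are not accepted in ingestion references (see checkReferenceChars).
FORBIDDEN_REFERENCE_CHARS = ["&", ";", "#", "%", '"', "+"]

def valid_reference?(reference)
  FORBIDDEN_REFERENCE_CHARS.none? { |char| reference.include?(char) }
end

valid_reference?("my_dataset/report/csv")   # => true
valid_reference?("my_dataset/a&b/csv")      # => false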
ebulk ingest-download tool examples
Basic ingestion/download
ebulk pull <DATASET>
* downloads the content of target dataset
ebulk push <DATASET>
* ingests files into the target dataset
ebulk pull <DATASET> -d <PATH>
* downloads the content of target dataset in target PATH
* future operations on PATH directory will use the DATASET reference implicitly
ebulk push <DATASET> -c 20
* ingests files into the <DATASET> splitting them in chunks of 20MB
ebulk push <DATASET> -s <STORAGE>
* ingests the content of the input storage [http, ftp, s3] into the target dataset
ebulk push <DATASET> -s <STORAGE> --advanced
* allows the user to edit the configuration file of the selected storage
ebulk push <DATASET> --custom-storage
* user can install and configure a new input plugin storage
Manage local changes
ebulk status <DATASET>
* checks local changes of target dataset
ebulk add <PATH>
* marks files in path for ingestion
ebulk remove <PATH>
* marks files in path for deletion
ebulk reset <PATH>
* resets marked files in path
ebulk pull --discard-changes
* discards local changes by checking the remote dataset
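A possible sequence when local edits should be thrown away, assuming the dataset was already downloaded (behaviour as documented above):
ebulk status <DATASET>
* lists which local files are new, modified, deleted or renamed
ebulk pull <DATASET> --discard-changes
* after confirmation, new files are removed, renamed files are moved back, and modified or deleted files are re-downloaded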
ebulk ingest-download tool help
usage: ebulk <command> <dataset> [options...]
ebulk [-h|--help] [-r|--readme] [-e|--examples] <command> [<args>]
[-d|--directory <path>] [-c|--chunk <size>]
[-s|--storage <storage>] [-cs|--custom-storage]
[-a|--advanced] [-dc|--discard-changes]
commands:
pull <dataset> Downloads the content of the target dataset from the site into the output folder
push <dataset> Ingests the content of the input folder into a target dataset on the site
-h, --help Tool help
-r, --readme Opens README file
pull [<dataset>] Downloads the content of the target dataset from the site into the output location
push [<dataset>] Ingests the content of the input location into a target dataset on the site
status [<dataset>] Lists the local changes of target dataset
add <path> Marks new or modified files in path for ingestion
remove <path> Marks files in path for removal
reset <path> Resets marked files in path
-h, --help Tool help
-r, --readme Opens README file
-e, --examples Shows some tool usage examples
argument:
dataset Mandatory. Unique reference for the target dataset
dataset argument Unique reference for the target dataset
If empty, current directory will be used as dataset directory and reference
It must start with a letter, and only alphanumerics, dots ( . ), underscores ( _ ) and hyphens ( - ) are allowed
* For download, the reference must be one of the available datasets on the site
* For ingestion, an existing reference will append the files to the corresponding dataset
* A new reference will create a new dataset on the site
It could be a path, then the last directory will be interpreted as the reference
It can be a path; in that case, that directory will be used as the dataset reference
e.g. pull my_directory/sample/ --> dataset reference will be "sample"
options:
-d, --directory <path> Besides the dataset reference, sets the dataset directory and links that location to the reference
-c, --chunk <chunk> Sets the chunk size (in megabytes) to split large files
-c, --chunk <size> Sets the chunk size (in megabytes) to split large files
-s, --storage <storage> Uses the selected input storage from this set: [http, ftp, s3]
-cs, --custom-storage Allows the user to set a new input storage.
-a, --advanced Allows editing the Embulk configuration file of the input storage
examples:
ebulk pull <DATASET>
* downloads the content of target dataset
ebulk push <DATASET>
* ingests files into the target dataset
ebulk pull <DATASET> -d <PATH>
* downloads the content of target dataset in target PATH
* future operations on PATH directory will use the DATASET reference implicitly
ebulk push <DATASET> -c 20
* ingests files into the <DATASET> splitting them in chunks of 20MB
ebulk push <DATASET> -s <STORAGE>
* ingests the content of the input storage [http, ftp, s3] into the target dataset
ebulk push <DATASET> -s <STORAGE> --advanced
* allows the user to edit the configuration file of the selected storage
ebulk push <DATASET> --custom-storage
* user can install and configure a new input plugin storage
-dc, --discard-changes Discards local changes by checking the remote dataset