Add a command-line parameter to set the dataset description

parent 51c6af91
...@@ -11,6 +11,8 @@ DISCARD_CHANGES_FILE_NAME="/.discard-changes" ...@@ -11,6 +11,8 @@ DISCARD_CHANGES_FILE_NAME="/.discard-changes"
LOG_DIR="$EBULK_DATA_PATH/logs" LOG_DIR="$EBULK_DATA_PATH/logs"
CREDENTIALS_FILE="$EBULK_DATA_PATH/.credentials" CREDENTIALS_FILE="$EBULK_DATA_PATH/.credentials"
CONFIG_FILE="$EBULK_DATA_PATH/.config" CONFIG_FILE="$EBULK_DATA_PATH/.config"
DESCRIPTION_DATASET_PATH_PREFIX="$EBULK_DATA_PATH/.dataset-description"
DESCRIPTION_DATASET_PENDING_PATH_PREFIX="$EBULK_DATA_PATH/.dataset-description-pending"
TOOL_PATH="$(dirname "$0")/ebulk-data" TOOL_PATH="$(dirname "$0")/ebulk-data"
DOWN_FILE="$EBULK_DATA_PATH/download-config.yml" DOWN_FILE="$EBULK_DATA_PATH/download-config.yml"
DOWN_TEMPLATE_FILE="$TOOL_PATH/config/download-config_template.yml" DOWN_TEMPLATE_FILE="$TOOL_PATH/config/download-config_template.yml"
...@@ -96,14 +98,15 @@ function checkParameters { ...@@ -96,14 +98,15 @@ function checkParameters {
DATA_SET=$(cat "$EBULK_DATASET_FILE" 2>/dev/null) DATA_SET=$(cat "$EBULK_DATASET_FILE" 2>/dev/null)
else else
DATA_SET=$(basename "$DATASET_DIR") DATA_SET=$(basename "$DATASET_DIR")
if [ "$DATA_SET" != "." ] ; then
SAVE_DATASET_NAME="TRUE" SAVE_DATASET_NAME="TRUE"
fi fi
fi fi
fi
else else
DATA_SET=$DATASET_DIR DATA_SET=$DATASET_DIR
fi fi
if [ "$DATA_SET" == "." ] ; then
  # "." stands for the current directory: use that directory's name as the
  # dataset name. The inner command substitution is quoted so that a path
  # containing spaces reaches basename as a single argument instead of
  # being word-split into several operands.
  DATA_SET=$(basename "$(pwd)")
fi
re='^[_A-Za-z.0-9-]*$' re='^[_A-Za-z.0-9-]*$'
if ! [[ $DATA_SET =~ $re ]] ; then if ! [[ $DATA_SET =~ $re ]] ; then
if [ "$DATA_SET" = "." ] && [[ -z "$STORAGE" ]] ; then if [ "$DATA_SET" = "." ] && [[ -z "$STORAGE" ]] ; then
...@@ -270,6 +273,22 @@ function runProcess { ...@@ -270,6 +273,22 @@ function runProcess {
echo "[INFO] Chunk size set in $CHUNK Mb." echo "[INFO] Chunk size set in $CHUNK Mb."
fi fi
fi fi
if [ "$DATASET_DESCRIPTION" != "" ] ; then
  # The description option was given: let the user edit the dataset
  # description in a local file before the operation starts.
  echo
  echo -e "** You have chosen to set the dataset description **"
  echo
  read -n 1 -s -r -p "Press any key to continue editing the dataset description"
  echo
  # Both files are per dataset: "<prefix>-<dataset name>".
  DESCRIPTION_DATASET_PATH="$DESCRIPTION_DATASET_PATH_PREFIX-$DATA_SET"
  DESCRIPTION_DATASET_PENDING_PATH="$DESCRIPTION_DATASET_PENDING_PATH_PREFIX-$DATA_SET"
  if [ ! -f "$DESCRIPTION_DATASET_PATH" ]; then
    # First edit for this dataset: seed the file with two empty lines and a
    # hint. Lines starting with '#' are dropped when the file is read back.
    echo "" > "$DESCRIPTION_DATASET_PATH" 2>/dev/null
    echo "" >> "$DESCRIPTION_DATASET_PATH" 2>/dev/null
    echo "# Please write the dataset description. Lines starting with '#' will be ignored." >> "$DESCRIPTION_DATASET_PATH" 2>/dev/null
  fi
  # The pending marker is what the ingestion side checks to decide whether a
  # description has to be sent. Paths are quoted so they survive spaces.
  touch "$DESCRIPTION_DATASET_PENDING_PATH" 2>/dev/null
  vi "$DESCRIPTION_DATASET_PATH" 2>/dev/null
fi
fi fi
updateConfigFile updateConfigFile
echo "[INFO] Starting operation..." echo "[INFO] Starting operation..."
...@@ -281,6 +300,7 @@ function runProcess { ...@@ -281,6 +300,7 @@ function runProcess {
fi fi
$embulk run -I $TOOL_PATH/embulk-wendelin-dataset-tool/lib $FILE $DIFF_COMMAND 2> "$LOG_DIR/error.log" || { $embulk run -I $TOOL_PATH/embulk-wendelin-dataset-tool/lib $FILE $DIFF_COMMAND 2> "$LOG_DIR/error.log" || {
if [ "$STATUS" == \"\" ] ; then if [ "$STATUS" == \"\" ] ; then
if [ "$DATASET_DESCRIPTION" == "" ] ; then
echo echo
echo -e "${ORANGE}[ERROR] Embulk tool stopped its execution.${NC}" echo -e "${ORANGE}[ERROR] Embulk tool stopped its execution.${NC}"
if [ "$STORAGE" != \"\" ] ; then if [ "$STORAGE" != \"\" ] ; then
...@@ -290,6 +310,7 @@ function runProcess { ...@@ -290,6 +310,7 @@ function runProcess {
echo "[INFO] Please check the logs in '$LOG_DIR' directory for more details." echo "[INFO] Please check the logs in '$LOG_DIR' directory for more details."
echo echo
fi fi
fi
} }
} }
...@@ -543,6 +564,8 @@ while [ "$1" != "" ]; do ...@@ -543,6 +564,8 @@ while [ "$1" != "" ]; do
;; ;;
-dc | --discard-changes ) DISCARD_CHANGES=true -dc | --discard-changes ) DISCARD_CHANGES=true
;; ;;
-dd | --set-description ) DATASET_DESCRIPTION=true
;;
-c | --chunk ) shift -c | --chunk ) shift
CHUNK=$1 CHUNK=$1
;; ;;
...@@ -578,7 +601,7 @@ while [ "$1" != "" ]; do ...@@ -578,7 +601,7 @@ while [ "$1" != "" ]; do
shift shift
done done
for ELEMENT in '' '-d' '--directory' '-s' '--storage' '-cs' '--custom-storage' '-a' '--advanced' '-c' '--chunk' '-dc' '--discard-changes'; do for ELEMENT in '' '-d' '--directory' '-s' '--storage' '-cs' '--custom-storage' '-a' '--advanced' '-c' '--chunk' '-dc' '--discard-changes' '-dd' '--set-description'; do
if [ "$ELEMENT" = "$REFERENCE" ]; then if [ "$ELEMENT" = "$REFERENCE" ]; then
REFERENCE="." REFERENCE="."
fi fi
......
...@@ -31,6 +31,8 @@ class DatasetUtils ...@@ -31,6 +31,8 @@ class DatasetUtils
FIRST_INGESTION_FILE = ".first-ingestion" FIRST_INGESTION_FILE = ".first-ingestion"
CREDENTIALS_FILE = ".credentials" CREDENTIALS_FILE = ".credentials"
CONFIG_FILE = ".config" CONFIG_FILE = ".config"
DESCRIPTION_DATASET_PATH_PREFIX=".dataset-description"
DESCRIPTION_DATASET_PENDING_PATH_PREFIX=".dataset-description-pending"
RUN_DONE = "done" RUN_DONE = "done"
RUN_ERROR = "error" RUN_ERROR = "error"
...@@ -88,6 +90,47 @@ class DatasetUtils ...@@ -88,6 +90,47 @@ class DatasetUtils
}.flatten.select{ |file| File.file?(file) } }.flatten.select{ |file| File.file?(file) }
end end
# Removes the "description pending" marker file of a dataset, if present.
#
# The marker is looked up inside tool_dir under the name
# DESCRIPTION_DATASET_PENDING_PATH_PREFIX + "-" + data_set.
#
# data_set - dataset name used as the file name suffix.
# tool_dir - directory that holds the marker file.
#
# Returns the result of File.delete when the marker existed, nil otherwise.
def deleteDatasetDescriptionPendingFile(data_set, tool_dir)
  base_dir = appendSlashTo(tool_dir)
  marker_path = base_dir + DESCRIPTION_DATASET_PENDING_PATH_PREFIX + "-" + data_set
  return unless File.exist?(marker_path)
  File.delete(marker_path)
end
# Tells whether the "description pending" marker file of a dataset exists.
#
# The marker is looked up inside tool_dir under the name
# DESCRIPTION_DATASET_PENDING_PATH_PREFIX + "-" + data_set.
#
# data_set - dataset name used as the file name suffix.
# tool_dir - directory that holds the marker file.
#
# Returns true when the marker file exists, false otherwise.
def datasetDescriptionPendingFileExist(data_set, tool_dir)
  base_dir = appendSlashTo(tool_dir)
  File.exist?(base_dir + DESCRIPTION_DATASET_PENDING_PATH_PREFIX + "-" + data_set)
end
# Reads the locally edited description of a dataset.
#
# The text comes from the file DESCRIPTION_DATASET_PATH_PREFIX + "-" + data_set
# inside tool_dir. Lines starting with "#" are treated as comments and dropped;
# all other lines are kept verbatim, including their line terminators.
#
# data_set - dataset name used as the file name suffix.
# tool_dir - directory that holds the description file.
#
# Returns the description text, or "" when nothing but whitespace is left
# after removing the comment lines.
# Raises the File.readlines error (e.g. Errno::ENOENT) if the file cannot be read.
def getDatasetDescription(data_set, tool_dir)
  description_path = appendSlashTo(tool_dir) + DESCRIPTION_DATASET_PATH_PREFIX + "-" + data_set
  description_text = File.readlines(description_path).reject { |line| line.start_with?("#") }.join
  # readlines keeps the trailing newline, so a line is never "". A freshly
  # seeded file holds only blank lines plus a comment; report that as an
  # empty description so callers comparing against "" do not send blank text.
  description_text.strip.empty? ? "" : description_text
end
# Sends the locally edited dataset description through the given client and
# asks the user whether the ingestion should go on.
#
# When the description text is not empty: the client is asked to set it, the
# pending marker file is deleted, and the user is prompted. Answering "n"
# logs the cancellation and calls @logger.abortExecution(). Any other answer,
# including end of input, continues.
#
# data_set - dataset name.
# tool_dir - directory that holds the description and marker files.
# client   - object responding to setDatasetDescription(data_set, text).
#
# Errors are logged and not propagated, except process-exit and signal
# exceptions, which are re-raised.
def setDatasetDescription(data_set, tool_dir, client)
  begin
    dataset_description_text = getDatasetDescription(data_set, tool_dir)
    if dataset_description_text != ""
      client.setDatasetDescription(data_set, dataset_description_text)
      deleteDatasetDescriptionPendingFile(data_set, tool_dir)
      @logger.info("Continue with ingestion? (y/n)", print=TRUE)
      # gets returns nil at end of input; to_s turns that into an empty answer
      # instead of failing on nil.chomp.
      option = gets.to_s.chomp
      if option == "n"
        @logger.info("Ingestion cancelled by user.", print=TRUE)
        @logger.abortExecution()
      end
    end
  rescue SystemExit, SignalException
    # An exit request or an interrupt (Ctrl-C at the prompt) must stop the
    # run; the broad rescue below would otherwise log it and carry on.
    raise
  rescue Exception => e
    @logger.error("An error occurred in DatasetUtils method 'setDatasetDescription':" + e.to_s)
    @logger.error(e.backtrace)
  end
end
def getCredentials(tool_dir) def getCredentials(tool_dir)
credential_path = appendSlashTo(tool_dir) + CREDENTIALS_FILE credential_path = appendSlashTo(tool_dir) + CREDENTIALS_FILE
if File.exist?(credential_path) if File.exist?(credential_path)
......
...@@ -107,10 +107,11 @@ module Embulk ...@@ -107,10 +107,11 @@ module Embulk
@logger.abortExecution() @logger.abortExecution()
end end
end end
end else
if not @status
user, password = @dataset_utils.getCredentials(tool_dir) user, password = @dataset_utils.getCredentials(tool_dir)
@wendelin = WendelinClient.new(config.param('erp5_url', :string), user, password) @wendelin = WendelinClient.new(config.param('erp5_url', :string), user, password)
@new_dataset_description = @dataset_utils.datasetDescriptionPendingFileExist(@data_set, tool_dir)
@dataset_utils.setDatasetDescription(@data_set, tool_dir, @wendelin) if @new_dataset_description
end end
@logger.info("Checking local files...", print=TRUE) @logger.info("Checking local files...", print=TRUE)
task['paths'] = @dataset_utils.getLocalPaths(paths) task['paths'] = @dataset_utils.getLocalPaths(paths)
...@@ -144,7 +145,7 @@ module Embulk ...@@ -144,7 +145,7 @@ module Embulk
@logger.info("Supplier: #{task['supplier']}") @logger.info("Supplier: #{task['supplier']}")
@logger.info("Dataset name: #{task['data_set']}") @logger.info("Dataset name: #{task['data_set']}")
@logger.info("Chunk size set in #{task['chunk_size']/DatasetUtils::MEGA}MB") @logger.info("Chunk size set in #{task['chunk_size']/DatasetUtils::MEGA}MB")
if task['paths'].empty? and not @dataset_utils.reportFileExist() if task['paths'].empty? and not @dataset_utils.reportFileExist() and not @new_dataset_description
@logger.error("The dataset directory '#{@data_set_directory}' is empty.", print=TRUE) @logger.error("The dataset directory '#{@data_set_directory}' is empty.", print=TRUE)
@logger.error("Could not find any valid file.", print=TRUE) @logger.error("Could not find any valid file.", print=TRUE)
@logger.error("Please make sure your dataset directory contains files for ingestion.", print=TRUE) @logger.error("Please make sure your dataset directory contains files for ingestion.", print=TRUE)
......
...@@ -31,14 +31,26 @@ module Embulk ...@@ -31,14 +31,26 @@ module Embulk
end end
end end
def self.askUserForAction(task, action) def self.askUserForAction(task, action, show_message)
option = @dataset_utils.getConfiguration(action, task['tool_dir']) option = @dataset_utils.getConfiguration(action, task['tool_dir'])
valid_option = option != DatasetUtils::OPTION_ABORT ? TRUE : FALSE valid_option = option != DatasetUtils::OPTION_ABORT ? TRUE : FALSE
if action == DatasetUtils::OPTION_RESUME if action == DatasetUtils::OPTION_RESUME
action_message = "#{DatasetUtils::OPTION_RESUME}: Resume. Continues download from last file." action_message = "#{DatasetUtils::OPTION_RESUME}: Resume. Continues download from last file."
if show_message
puts
@logger.info("There was a previous attempt to download this dataset but it did not finish successfully.", print=TRUE)
@logger.info("What do you want to do?", print=TRUE) if option == DatasetUtils::OPTION_ABORT
puts
end
else else
action = DatasetUtils::OPTION_UPDATE action = DatasetUtils::OPTION_UPDATE
action_message = "#{DatasetUtils::OPTION_UPDATE}: Update. Checks for changes in dataset." action_message = "#{DatasetUtils::OPTION_UPDATE}: Update. Checks for changes in dataset."
if show_message
puts
@logger.info("This dataset was already downloaded.", print=TRUE)
@logger.info("What do you want to do?", print=TRUE) if option == DatasetUtils::OPTION_ABORT
puts
end
end end
while not valid_option while not valid_option
@logger.info("Please select an option [#{action}, #{DatasetUtils::OPTION_DOWNLOAD}, #{DatasetUtils::OPTION_ABORT}]", print=TRUE) @logger.info("Please select an option [#{action}, #{DatasetUtils::OPTION_DOWNLOAD}, #{DatasetUtils::OPTION_ABORT}]", print=TRUE)
...@@ -147,17 +159,10 @@ module Embulk ...@@ -147,17 +159,10 @@ module Embulk
task['data_streams'] = @dataset_utils.getRemoteFileListForDiscardLocalChanges(task['data_streams'], @data_set, task['data_streams'] = @dataset_utils.getRemoteFileListForDiscardLocalChanges(task['data_streams'], @data_set,
check_changes=FALSE, changes=local_changes) check_changes=FALSE, changes=local_changes)
else else
puts self.askUserForAction(task, action=DatasetUtils::OPTION_UPDATE, show_message=TRUE)
@logger.info("This dataset was already downloaded. What do you want to do?", print=TRUE)
puts
self.askUserForAction(task, action=DatasetUtils::OPTION_UPDATE)
end end
elsif not @dataset_utils.partialIngestionFileExist() elsif not @dataset_utils.partialIngestionFileExist()
puts self.askUserForAction(task, action=DatasetUtils::OPTION_RESUME, show_message=TRUE)
@logger.info("There was a previous attempt to download this dataset but it did not finish successfully.", print=TRUE)
@logger.info("What do you want to do?", print=TRUE)
puts
self.askUserForAction(task, action=DatasetUtils::OPTION_RESUME)
else else
if @dataset_utils.discardChangesFileExist() if @dataset_utils.discardChangesFileExist()
puts puts
...@@ -165,7 +170,7 @@ module Embulk ...@@ -165,7 +170,7 @@ module Embulk
@logger.info("Continuing with dataset download.", print=TRUE) @logger.info("Continuing with dataset download.", print=TRUE)
end end
puts puts
self.askUserForAction(task, action=DatasetUtils::OPTION_UPDATE) self.askUserForAction(task, action=DatasetUtils::OPTION_UPDATE, show_message=FALSE)
end end
else else
if @dataset_utils.discardChangesFileExist() if @dataset_utils.discardChangesFileExist()
......
...@@ -18,11 +18,16 @@ module Embulk ...@@ -18,11 +18,16 @@ module Embulk
"tool_dir" => config.param('tool_dir', :string) "tool_dir" => config.param('tool_dir', :string)
} }
storage_ingestion = ! task["type_input"] storage_ingestion = ! task["type_input"]
@logger = LogManager.instance()
@dataset_utils = DatasetUtils.new(Dir.pwd) @dataset_utils = DatasetUtils.new(Dir.pwd)
task["user"], task["password"] = @dataset_utils.getCredentials(task["tool_dir"]) task["user"], task["password"] = @dataset_utils.getCredentials(task["tool_dir"])
if storage_ingestion
@wendelin = WendelinClient.new(task["erp5_base_url"], task["user"], task["password"])
@new_dataset_description = @dataset_utils.datasetDescriptionPendingFileExist(task["data_set"], task["tool_dir"])
@dataset_utils.setDatasetDescription(task["data_set"], task["tool_dir"], @wendelin) if @new_dataset_description
end
task_reports = yield(task) task_reports = yield(task)
next_config_diff = {} next_config_diff = {}
@logger = LogManager.instance()
if task_reports.length > 0 if task_reports.length > 0
@logger.info("Your ingested files will be available in the site in a few minutes. Thank for your patience.", print=TRUE) @logger.info("Your ingested files will be available in the site in a few minutes. Thank for your patience.", print=TRUE)
if storage_ingestion if storage_ingestion
......
require 'base64'
require 'net/http' require 'net/http'
require 'openssl' require 'openssl'
require 'yaml' require 'yaml'
...@@ -86,6 +87,25 @@ class WendelinClient ...@@ -86,6 +87,25 @@ class WendelinClient
end end
end end
# Sets the description of a remote dataset.
#
# Builds the ERP5Site_setDatasetDescription URL for data_set under @erp5_url,
# Base64-encodes description_text and hands both to handleRequest, which
# receives the encoded text as its data chunk and nil as its reference.
# The "message" of a successful response is parsed as JSON; a "status_code"
# other than 0 is treated as a failure.
#
# data_set         - dataset name placed in the "dataset" query parameter.
# description_text - plain description text to send.
#
# Failures are logged (and printed) rather than raised to the caller.
def setDatasetDescription(data_set, description_text)
  puts
  @logger.info("Setting remote dataset description...", TRUE)
  begin
    endpoint = "#{@erp5_url}/ERP5Site_setDatasetDescription?dataset=#{data_set}"
    uri = URI(URI.escape(endpoint))
    payload = Base64.encode64(description_text)
    response = handleRequest(uri, nil, payload)
    raise response["message"] unless response["success"]
    result = JSON.parse(response["message"])
    raise result["message"] unless result["status_code"] == 0
    @logger.info(result["message"], TRUE)
  rescue Exception => error
    puts
    @logger.error("An error occurred while setting dataset description: " + error.to_s, TRUE)
    @logger.error(error.backtrace)
  end
  puts
end
def ingest(reference, data_chunk, split) def ingest(reference, data_chunk, split)
@logger.info("Ingestion reference: #{reference}", print=TRUE) @logger.info("Ingestion reference: #{reference}", print=TRUE)
if split and Time.new - @last_ingestion < 3 if split and Time.new - @last_ingestion < 3
...@@ -180,7 +200,7 @@ class WendelinClient ...@@ -180,7 +200,7 @@ class WendelinClient
req.basic_auth @user, @password req.basic_auth @user, @password
if data_chunk != nil if data_chunk != nil
@logger.info("Setting request form data...", print=TRUE) @logger.info("Setting request form data...", print=TRUE) if reference != nil
begin begin
req.set_form_data('data_chunk' => data_chunk) req.set_form_data('data_chunk' => data_chunk)
rescue java.lang.OutOfMemoryError rescue java.lang.OutOfMemoryError
...@@ -191,7 +211,7 @@ class WendelinClient ...@@ -191,7 +211,7 @@ class WendelinClient
@logger.error(e.backtrace) @logger.error(e.backtrace)
return {"success"=>FALSE, "message"=>HTTP_MESSAGE_EXCEPTION} return {"success"=>FALSE, "message"=>HTTP_MESSAGE_EXCEPTION}
end end
@logger.info("Sending record:'#{reference}'...", print=TRUE) @logger.info("Sending record:'#{reference}'...", print=TRUE) if reference != nil
end end
begin begin
res = Net::HTTP.start(uri.hostname, uri.port, res = Net::HTTP.start(uri.hostname, uri.port,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment