ebulk: resume command on storage ingestions

parent c298caf0
...@@ -98,7 +98,7 @@ function checkParameters { ...@@ -98,7 +98,7 @@ function checkParameters {
else else
DATA_SET=$DATASET_DIR DATA_SET=$DATASET_DIR
fi fi
re='^[A-Za-z][_A-Za-z.0-9-]*$' re='^[_A-Za-z.0-9-]*$'
if ! [[ $DATA_SET =~ $re ]] ; then if ! [[ $DATA_SET =~ $re ]] ; then
if [ "$DATA_SET" = "." ] && [[ -z "$STORAGE" ]] ; then if [ "$DATA_SET" = "." ] && [[ -z "$STORAGE" ]] ; then
echo echo
...@@ -107,7 +107,7 @@ function checkParameters { ...@@ -107,7 +107,7 @@ function checkParameters {
else else
echo echo
echo -e "${ORANGE}[ERROR] Error in argument: invalid dataset name ${GREEN}'$DATA_SET'${ORANGE}.${NC}" echo -e "${ORANGE}[ERROR] Error in argument: invalid dataset name ${GREEN}'$DATA_SET'${ORANGE}.${NC}"
echo -e "${ORANGE}[ERROR] Dataset name must start with a letter, and only alphanumerics, dots ( . ), underscores ( _ ) and hyphens ( - ) are allowed.${NC}" echo -e "${ORANGE}[ERROR] Only alphanumerics, dots ( . ), underscores ( _ ) and hyphens ( - ) are allowed.${NC}"
echo echo
fi fi
helpReadme >&2; return 1 helpReadme >&2; return 1
...@@ -164,7 +164,8 @@ function updateConfigFile { ...@@ -164,7 +164,8 @@ function updateConfigFile {
sleep 1 sleep 1
fi fi
fi fi
DIFF="-c $EBULK_DATA_PATH/$STORAGE-$USER-$DATA_SET-diff.yml" DIFF_FILE="$EBULK_DATA_PATH/$STORAGE-$USER-$DATA_SET-diff.yml"
DIFF_COMMAND="-c $DIFF_FILE"
else else
$PARAMETER_FUNCTION $PARAMETER_FUNCTION
fi fi
...@@ -230,8 +231,11 @@ function runProcess { ...@@ -230,8 +231,11 @@ function runProcess {
if [ ! -d $LOG_DIR ]; then if [ ! -d $LOG_DIR ]; then
mkdir $LOG_DIR 2>/dev/null mkdir $LOG_DIR 2>/dev/null
fi fi
$embulk run -L $TOOL_PATH/embulk-wendelin-dataset-tool $FILE $DIFF 2> "$LOG_DIR/error.log" || { if [ -z "$RESUME_STORAGE_INGESTION" ]; then
if [ -z "$STATUS" ]; then rm -f ${DIFF_FILE} 2>/dev/null
fi
$embulk run -I $TOOL_PATH/embulk-wendelin-dataset-tool/lib $FILE $DIFF_COMMAND 2> "$LOG_DIR/error.log" || {
if [ "$STATUS" == \"\" ] ; then
echo echo
echo -e "${ORANGE}[ERROR] Embulk tool stopped its execution.${NC}" echo -e "${ORANGE}[ERROR] Embulk tool stopped its execution.${NC}"
if [ "$STORAGE" != \"\" ] ; then if [ "$STORAGE" != \"\" ] ; then
...@@ -317,21 +321,27 @@ function checkSoftware { ...@@ -317,21 +321,27 @@ function checkSoftware {
function checkStoragePlugin { function checkStoragePlugin {
if [ "$STORAGE_GEM" != "" ] ; then if [ "$STORAGE_GEM" != "" ] ; then
echo -n "[INFO] Checking if '$STORAGE' plugin is installed... " echo -n "[INFO] Checking if '$STORAGE' plugin is installed... "
$embulk gem list 2>/dev/null | grep -q "$STORAGE_GEM" 2>/dev/null if [ "$CUSTOM" = false ] ; then
if [ $? == 0 ]; then sleep 1
echo -e "${GREEN}OK${NC}" echo -e "${GREEN}OK${NC}"
sleep 1 sleep 1
else else
echo -e "${ORANGE}NO${NC}"
echo "[INFO] Installing '$STORAGE' plugin..."
$embulk gem install $STORAGE_GEM 2>/dev/null | grep -q "ERROR" 2>/dev/null
$embulk gem list 2>/dev/null | grep -q "$STORAGE_GEM" 2>/dev/null $embulk gem list 2>/dev/null | grep -q "$STORAGE_GEM" 2>/dev/null
if [ $? == 0 ]; then if [ $? == 0 ]; then
echo "[INFO] '$STORAGE' plugin successfully installed." echo -e "${GREEN}OK${NC}"
sleep 1
else else
echo -e "${ORANGE}[ERROR] Could not find a valid Embulk plugin (gem) '$STORAGE_GEM'.${NC}" echo -e "${ORANGE}NO${NC}"
exit echo "[INFO] Installing '$STORAGE' plugin..."
fi $embulk gem install $STORAGE_GEM 2>/dev/null | grep -q "ERROR" 2>/dev/null
$embulk gem list 2>/dev/null | grep -q "$STORAGE_GEM" 2>/dev/null
if [ $? == 0 ]; then
echo "[INFO] '$STORAGE' plugin successfully installed."
else
echo -e "${ORANGE}[ERROR] Could not find a valid Embulk plugin (gem) '$STORAGE_GEM'.${NC}"
exit
fi
fi
fi fi
fi fi
} }
...@@ -483,6 +493,8 @@ while [ "$1" != "" ]; do ...@@ -483,6 +493,8 @@ while [ "$1" != "" ]; do
;; ;;
-a | --advanced ) ADVANCED=true -a | --advanced ) ADVANCED=true
;; ;;
-rs | --resume ) RESUME_STORAGE_INGESTION=true
;;
-dc | --discard-changes ) DISCARD_CHANGES=true -dc | --discard-changes ) DISCARD_CHANGES=true
;; ;;
-c | --chunk ) shift -c | --chunk ) shift
......
...@@ -18,3 +18,6 @@ out: ...@@ -18,3 +18,6 @@ out:
erp5_url: $ING_URL erp5_url: $ING_URL
user: $USER user: $USER
password: $pwd password: $pwd
type_input: "filesystem"
data_set: $DATA_SET
erp5_base_url: $DOWN_URL
...@@ -30,6 +30,8 @@ out: ...@@ -30,6 +30,8 @@ out:
erp5_url: $ING_URL erp5_url: $ING_URL
user: $USER user: $USER
password: $pwd password: $pwd
data_set: $DATA_SET
erp5_base_url: $DOWN_URL
exec: exec:
max_threads: 1 max_threads: 1
......
...@@ -25,6 +25,8 @@ out: ...@@ -25,6 +25,8 @@ out:
erp5_url: $ING_URL erp5_url: $ING_URL
user: $USER user: $USER
password: $pwd password: $pwd
data_set: $DATA_SET
erp5_base_url: $DOWN_URL
exec: exec:
max_threads: 1 max_threads: 1
......
...@@ -27,6 +27,8 @@ out: ...@@ -27,6 +27,8 @@ out:
erp5_url: $ING_URL erp5_url: $ING_URL
user: $USER user: $USER
password: $pwd password: $pwd
data_set: $DATA_SET
erp5_base_url: $DOWN_URL
exec: exec:
max_threads: 1 max_threads: 1
......
...@@ -31,6 +31,8 @@ out: ...@@ -31,6 +31,8 @@ out:
erp5_url: $ING_URL erp5_url: $ING_URL
user: $USER user: $USER
password: $pwd password: $pwd
data_set: $DATA_SET
erp5_base_url: $DOWN_URL
exec: exec:
max_threads: 1 max_threads: 1
......
...@@ -14,12 +14,20 @@ module Embulk ...@@ -14,12 +14,20 @@ module Embulk
"user" => config.param("user", :string, defualt: nil), "user" => config.param("user", :string, defualt: nil),
"password" => config.param("password", :string, default: nil), "password" => config.param("password", :string, default: nil),
"path_prefix" => config.param("path_prefix", :string, :default => nil), "path_prefix" => config.param("path_prefix", :string, :default => nil),
"type_input" => config.param("type_input", :string, :default => nil),
"data_set" => config.param("data_set", :string, default: nil),
"erp5_base_url" => config.param("erp5_base_url", :string, default: nil)
} }
task_reports = yield(task) task_reports = yield(task)
next_config_diff = {} next_config_diff = {}
@logger = LogManager.instance() @logger = LogManager.instance()
if task_reports.length > 0 if task_reports.length > 0
@logger.info("Your ingested files will be available in the site in a few minutes. Thank for your patience.", print=TRUE) @logger.info("Your ingested files will be available in the site in a few minutes. Thank for your patience.", print=TRUE)
# if ingestion was done from a storage different than filesystem, increase dataset version
if not task["type_input"] and task["data_set"] and task["erp5_base_url"]
@wendelin = WendelinClient.new(task["erp5_base_url"], task["user"], task["password"])
@wendelin.increaseDatasetVersion(task["data_set"])
end
else else
@logger.info("No new files where processed for ingestion.", print=TRUE) @logger.info("No new files where processed for ingestion.", print=TRUE)
end end
......
...@@ -19,7 +19,7 @@ commands: ...@@ -19,7 +19,7 @@ commands:
argument: argument:
dataset argument Unique reference for the target dataset dataset argument Unique reference for the target dataset
If empty, current directory will be used as dataset directory and reference If empty, current directory will be used as dataset directory and reference
It must start with a letter, and only alphanumerics, dots ( . ), underscores ( _ ) and hyphens ( - ) are allowed It can only contain alphanumerics, dots ( . ), underscores ( _ ) and hyphens ( - )
* For download, the reference must be one of the available datasets on the site * For download, the reference must be one of the available datasets on the site
* For ingestion, an existing reference will append the files to the corresponding dataset * For ingestion, an existing reference will append the files to the corresponding dataset
* A new reference will create a new dataset on the site * A new reference will create a new dataset on the site
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment