Commit ae5730ef authored by Eteri's avatar Eteri

add mongodb input storage

parent d4df66b8
#! /usr/bin/env bash #! /usr/bin/env bash
DATA_LAKE_URL='https://wendelin.io/'
DATA_LAKE_URL='https://softinst127133.host.vifib.net/erp5/'
DOWN_URL="$DATA_LAKE_URL/" DOWN_URL="$DATA_LAKE_URL/"
ING_URL="$DATA_LAKE_URL/portal_ingestion_policies/wendelin_embulk" ING_URL="$DATA_LAKE_URL/portal_ingestion_policies/wendelin_embulk"
...@@ -31,6 +32,8 @@ HTTP_ING_FILE="$EBULK_DATA_PATH/ingestion-http-config.yml" ...@@ -31,6 +32,8 @@ HTTP_ING_FILE="$EBULK_DATA_PATH/ingestion-http-config.yml"
HTTP_ING_TEMPLATE_FILE="$TOOL_PATH/config/ingestion-http-config_template.yml" HTTP_ING_TEMPLATE_FILE="$TOOL_PATH/config/ingestion-http-config_template.yml"
FTP_ING_FILE="$EBULK_DATA_PATH/ingestion-ftp-config.yml" FTP_ING_FILE="$EBULK_DATA_PATH/ingestion-ftp-config.yml"
FTP_ING_TEMPLATE_FILE="$TOOL_PATH/config/ingestion-ftp-config_template.yml" FTP_ING_TEMPLATE_FILE="$TOOL_PATH/config/ingestion-ftp-config_template.yml"
MONGODB_ING_FILE="$EBULK_DATA_PATH/ingestion-mongodb-config.yml"
MONGODB_ING_TEMPLATE_FILE="$TOOL_PATH/config/ingestion-mongodb-config_template.yml"
GREEN='\033[0;32m' GREEN='\033[0;32m'
ORANGE='\033[0;33m' ORANGE='\033[0;33m'
NC='\033[0m' NC='\033[0m'
...@@ -278,6 +281,9 @@ function updateConfigFile { ...@@ -278,6 +281,9 @@ function updateConfigFile {
FTP_HOST=\"$FTP_HOST\" FTP_HOST=\"$FTP_HOST\"
FTP_USER=\"$FTP_USER\" FTP_USER=\"$FTP_USER\"
FTP_PASSWORD=\"$FTP_PASSWORD\" FTP_PASSWORD=\"$FTP_PASSWORD\"
MONGODB_URI=\"$MONGODB_URI\"
MONGODB_COLLECTION=\"$MONGODB_COLLECTION\"
template="$(cat ${TEMPLATE_FILE})" template="$(cat ${TEMPLATE_FILE})"
rm -f ${FILE} rm -f ${FILE}
...@@ -511,6 +517,25 @@ function askFTPparameters { ...@@ -511,6 +517,25 @@ function askFTPparameters {
fi fi
} }
# Prompt the user for the MongoDB input-storage parameters.
#
# Sets the globals:
#   MONGODB_URI        - MongoDB connection string entered by the user
#   MONGODB_COLLECTION - name of the collection to ingest
# Both names are exported to the configuration template by updateConfigFile.
#
# Exits the script with status 1 when either value is left empty, so that
# callers and wrappers can detect the failure.
function askMONGODBparameters {
  echo "MongoDB URI:"
  echo "* e.g. mongodb://user:password@host:27017/database *"
  # -r keeps backslashes literal (URIs and passwords must not be unescaped).
  read -r -e MONGODB_URI
  if [ -z "$MONGODB_URI" ] ; then
    echo -e "${ORANGE}[ERROR] Empty MongoDB URI.${NC}"
    exit 1
  fi
  echo "MongoDB Collection:"
  echo "* name of the collection to ingest, e.g. myCollection *"
  read -r -e MONGODB_COLLECTION
  # The embulk-input-mongodb plugin requires a collection name.
  if [ -z "$MONGODB_COLLECTION" ] ; then
    echo -e "${ORANGE}[ERROR] Empty collection.${NC}"
    exit 1
  fi
}
function askS3parameters { function askS3parameters {
S3_AUTH_METHOD="basic" S3_AUTH_METHOD="basic"
echo "Bucket name:" echo "Bucket name:"
...@@ -770,9 +795,14 @@ case $OPERATION in ...@@ -770,9 +795,14 @@ case $OPERATION in
PARAMETER_FUNCTION=askFTPparameters PARAMETER_FUNCTION=askFTPparameters
STORAGE_GEM=embulk-input-ftp STORAGE_GEM=embulk-input-ftp
;; ;;
mongodb ) FILE=$MONGODB_ING_FILE
TEMPLATE_FILE=$MONGODB_ING_TEMPLATE_FILE
PARAMETER_FUNCTION=askMONGODBparameters
STORAGE_GEM=embulk-input-mongodb
;;
*) echo -e "${ORANGE}[ERROR] '$STORAGE' storage is not available in ebulk tool yet or it is not a valid storage.${NC}" *) echo -e "${ORANGE}[ERROR] '$STORAGE' storage is not available in ebulk tool yet or it is not a valid storage.${NC}"
echo "[INFO] If you want to configure yourself this storage, you can run the tool with parameter --custom-storage" echo "[INFO] If you want to configure yourself this storage, you can run the tool with parameter --custom-storage"
echo "[INFO] Current Ebulk version has the following storages available: ftp, http, s3." echo "[INFO] Current Ebulk version has the following storages available: ftp, http, s3, mongodb."
echo echo
exit exit
esac esac
......
# MONGODB CONFIGURATION FILE
# PLEASE FILL THE FILE WITH THE CONFIGURATION OF YOUR MONGODB STORAGE
# ONLY THE 'IN' SECTION, OTHERS MUST REMAIN AS THEY ARE

in:
  type: mongodb
  # $MONGODB_URI and $MONGODB_COLLECTION are filled in by the ebulk script
  # from the values entered when the storage parameters are requested.
  uri: $MONGODB_URI
  collection: $MONGODB_COLLECTION

  # PLEASE LEAVE THE SECTIONS BELOW AS THEY ARE (unless you know what you are doing)
  parser:
    type: binary
    supplier: $USER
    data_set: $DATA_SET
    tool_dir: $TOOL_DIR
    chunk_size: $CHUNK
    storage: $STORAGE

out:
  type: wendelin
  # tag and extension are read by the wendelin output plugin
  # (config.param("tag") / config.param("extension")).
  tag: "my_tag"
  extension: "json"
  erp5_url: $ING_URL
  tool_dir: $TOOL_DIR
  data_set: $DATA_SET
  erp5_base_url: $DOWN_URL

exec:
  max_threads: 1
  min_output_tasks: 1
...@@ -11,6 +11,8 @@ module Embulk ...@@ -11,6 +11,8 @@ module Embulk
def self.transaction(config, schema, count, &control) def self.transaction(config, schema, count, &control)
task = { task = {
"erp5_url" => config.param("erp5_url", :string), "erp5_url" => config.param("erp5_url", :string),
"tag" => config.param("tag", :string, :default => nil),
"extension" => config.param("extension", :string, :default => nil),
"path_prefix" => config.param("path_prefix", :string, :default => nil), "path_prefix" => config.param("path_prefix", :string, :default => nil),
"type_input" => config.param("type_input", :string, :default => nil), "type_input" => config.param("type_input", :string, :default => nil),
"data_set" => config.param("data_set", :string, default: nil), "data_set" => config.param("data_set", :string, default: nil),
...@@ -22,7 +24,7 @@ module Embulk ...@@ -22,7 +24,7 @@ module Embulk
@dataset_utils = DatasetUtils.new(Dir.pwd) @dataset_utils = DatasetUtils.new(Dir.pwd)
task["user"], task["password"] = @dataset_utils.getCredentials(task["tool_dir"]) task["user"], task["password"] = @dataset_utils.getCredentials(task["tool_dir"])
if storage_ingestion if storage_ingestion
@wendelin = WendelinClient.new(task["erp5_base_url"], task["user"], task["password"]) @wendelin = WendelinClient.new(task["erp5_base_url"], task["user"], task["password"], task["tag"], task["extension"])
@new_dataset_description = @dataset_utils.datasetDescriptionPendingFileExist(task["data_set"], task["tool_dir"]) @new_dataset_description = @dataset_utils.datasetDescriptionPendingFileExist(task["data_set"], task["tool_dir"])
@dataset_utils.setDatasetDescription(task["data_set"], task["tool_dir"], @wendelin) if @new_dataset_description @dataset_utils.setDatasetDescription(task["data_set"], task["tool_dir"], @wendelin) if @new_dataset_description
end end
...@@ -36,7 +38,7 @@ module Embulk ...@@ -36,7 +38,7 @@ module Embulk
@logger.info("#{done_tasks.length} new file(s) ingested.", print=TRUE) @logger.info("#{done_tasks.length} new file(s) ingested.", print=TRUE)
if(done_tasks.length == count) if(done_tasks.length == count)
@logger.info("Dataset successfully ingested.", print=TRUE) @logger.info("Dataset successfully ingested.", print=TRUE)
@wendelin = WendelinClient.new(task["erp5_base_url"], task["user"], task["password"]) @wendelin = WendelinClient.new(task["erp5_base_url"], task["user"], task["password"], task["tag"], task["extension"])
@wendelin.increaseDatasetVersion(task["data_set"]) @wendelin.increaseDatasetVersion(task["data_set"])
else else
failed_tasks = task_reports.map{|hash| hash[DatasetUtils::RUN_ERROR] || hash[DatasetUtils::RUN_ABORTED] }.flatten.compact failed_tasks = task_reports.map{|hash| hash[DatasetUtils::RUN_ERROR] || hash[DatasetUtils::RUN_ABORTED] }.flatten.compact
...@@ -55,21 +57,28 @@ module Embulk ...@@ -55,21 +57,28 @@ module Embulk
# Set up the per-task state of the output plugin.
#
# Reads the ingestion URL, tag and extension from the task and builds the
# WendelinClient used by the other methods of this plugin. The client is
# created once, with the five-argument constructor
# (url, user, password, tag, extension).
def init
  @erp5_url = task["erp5_url"]
  @tag = task["tag"]
  @extension = task["extension"]
  @logger = LogManager.instance()
  @wendelin = WendelinClient.new(@erp5_url, task["user"], task["password"], @tag, @extension)
end
def add(page) def add(page)
page.each do |record| page.each do |record|
@return_value = DatasetUtils::RUN_ERROR @return_value = DatasetUtils::RUN_ERROR
supplier = (record[0].nil? || record[0].empty?) ? "default" : record[0] supplier = (record[0].nil? || record[0].empty?) ? "default" : record[0]
dataset = (record[1].nil? || record[1].empty?) ? "default" : record[1] dataset = (record[1].nil? || record[1].empty?) ? "default" : record[1]
filename = record[2] filename = record[2]
extension = record[3] extension = record[3]
eof = record[5] eof = record[5]
data_chunk = record[4] data_chunk = record[4]
size = record[6] size = record[6]
hash = record[7] hash = record[7]
begin begin
if eof == DatasetUtils::DELETE if eof == DatasetUtils::DELETE
@reference = [dataset, filename, extension].join(DatasetUtils::REFERENCE_SEPARATOR) @reference = [dataset, filename, extension].join(DatasetUtils::REFERENCE_SEPARATOR)
...@@ -77,7 +86,25 @@ module Embulk ...@@ -77,7 +86,25 @@ module Embulk
elsif eof == DatasetUtils::RENAME elsif eof == DatasetUtils::RENAME
@reference = [dataset, filename, extension].join(DatasetUtils::REFERENCE_SEPARATOR) @reference = [dataset, filename, extension].join(DatasetUtils::REFERENCE_SEPARATOR)
@wendelin.rename(@reference, record[4].to_s) @wendelin.rename(@reference, record[4].to_s)
else
elsif @tag != nil
dataset = @tag
supplier = "default"
filename = record[0]["_id"]
extension = @extension
data_chunk = record[0].to_s
size = record[0].to_s.bytesize
hash = Digest::SHA256.hexdigest record[0].to_s
@reference = [supplier, dataset, filename, extension, eof, size, hash].join(DatasetUtils::REFERENCE_SEPARATOR)
split = ""
ingestion_response = @wendelin.ingest(@reference, data_chunk, split)
if not ingestion_response["success"]
raise ingestion_response["message"]
end
else
@reference = [supplier, dataset, filename, extension, eof, size, hash].join(DatasetUtils::REFERENCE_SEPARATOR) @reference = [supplier, dataset, filename, extension, eof, size, hash].join(DatasetUtils::REFERENCE_SEPARATOR)
split = eof != "" split = eof != ""
ingestion_response = @wendelin.ingest(@reference, data_chunk, split) ingestion_response = @wendelin.ingest(@reference, data_chunk, split)
......
...@@ -17,10 +17,12 @@ class WendelinClient ...@@ -17,10 +17,12 @@ class WendelinClient
HTTP_MEMORY_ERROR = "MEMORY-ERROR" HTTP_MEMORY_ERROR = "MEMORY-ERROR"
HTTP_REFERENCE_EXIST = "REFERENCE-EXIST" HTTP_REFERENCE_EXIST = "REFERENCE-EXIST"
def initialize(erp5_url, user, password) def initialize(erp5_url, user, password, tag, extension)
@erp5_url = erp5_url @erp5_url = erp5_url
@user = user @user = user
@password = password @password = password
@tag = tag
@extension = extension
@banned_references_list = [] @banned_references_list = []
@logger = LogManager.instance() @logger = LogManager.instance()
@last_ingestion = Time.new - 2 @last_ingestion = Time.new - 2
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment