Commit df5b2c4a authored by Roque's avatar Roque

ebulk uses new wendelin scripts to get remote data streams count and file list in batches

parent ac0ff279
......@@ -125,7 +125,12 @@ module Embulk
@logger.abortExecution(error=FALSE)
end
@logger.info("Checking remote dataset...", print=TRUE)
data_stream_dict = @wendelin.getDataStreams(task['data_set'])
data_stream_count = @wendelin.getDataStreamCount(@data_set)
if data_stream_count["status_code"] != 0
@logger.error(data_stream_count["error_message"], print=TRUE)
@logger.abortExecution()
end
data_stream_dict = @wendelin.getDataStreamList(@data_set, data_stream_count["result"], 1000)
if data_stream_dict["status_code"] != 0
@logger.error(data_stream_dict["error_message"], print=TRUE)
@logger.abortExecution()
......
......@@ -101,6 +101,7 @@ module Embulk
@logger.setFilename(@tool_dir, "download")
@erp5_url = config.param('erp5_url', :string)
@data_set = config.param('data_set', :string)
@batch_size= config.param('batch_size', :integer)
@logger.info("Dataset name: #{@data_set}")
@output_path = config.param("output_path", :string, :default => nil)
if not File.directory?(@output_path)
......@@ -137,12 +138,18 @@ module Embulk
task['user'], task['password'] = @dataset_utils.getCredentials(@tool_dir)
@wendelin = WendelinClient.new(@erp5_url, task['user'], task['password'])
@logger.info("Getting remote file list from dataset '#{@data_set}'...", print=TRUE)
data_stream_list = @wendelin.getDataStreams(@data_set)
if data_stream_list["status_code"] == 0
if data_stream_list["result"].empty?
data_stream_count = @wendelin.getDataStreamCount(@data_set)
if data_stream_count["status_code"] == 0
if data_stream_count["result"] == 0
@logger.error("No valid data found for data set " + @data_set, print=TRUE)
@logger.abortExecution(error=FALSE)
end
else
@logger.error(data_stream_count["error_message"], print=TRUE)
@logger.abortExecution()
end
data_stream_list = @wendelin.getDataStreamList(@data_set, data_stream_count["result"], @batch_size)
if data_stream_list["status_code"] == 0
task['data_streams'] = data_stream_list["result"]
else
@logger.error(data_stream_list["error_message"], print=TRUE)
......
......@@ -184,6 +184,42 @@ class WendelinClient
end
end
def getDataStreamList(data_set_reference, data_stream_count, batch_size)
data_stream_list = []
total_batches = data_stream_count/batch_size.ceil()
if total_batches > 4
puts
@logger.warn("Due to its size, the remote file list will be fetch in batches. This could take some minutes.", print=TRUE)
end
nbatch = 0
while nbatch <= total_batches
batch_dict = getDataStreamBatch(data_set_reference, batch_size*nbatch, batch_size)
if batch_dict["status_code"] != 0
return batch_dict
end
data_stream_list += batch_dict["result"]
nbatch += 1
end
return_dict = Hash.new
return_dict["status_code"] = 0
return_dict["result"] = data_stream_list
return return_dict
end
def getDataStreamBatch(data_set_reference, offset, batch_size)
uri = URI(URI.escape("#{@erp5_url}ERP5Site_getDataStreamList?data_set_reference=#{data_set_reference}&offset=#{offset}&batch_size=#{batch_size}"))
response = handleRequest(uri)
if response["success"] == FALSE
@logger.abortExecution()
end
str = response["message"]
if not str.nil?
str.gsub!(/(\,)(\S)/, "\\1 \\2")
return YAML::load(str)
end
return {'status_code': 0, 'result': []}
end
def getDataStreams(data_set_reference)
uri = URI(URI.escape("#{@erp5_url}ERP5Site_getDataStreamList?data_set_reference=#{data_set_reference}"))
response = handleRequest(uri)
......@@ -198,6 +234,21 @@ class WendelinClient
return {'status_code': 0, 'result': []}
end
def getDataStreamCount(data_set_reference)
uri = URI(URI.escape("#{@erp5_url}ERP5Site_getDataStreamCount?data_set_reference=#{data_set_reference}"))
response = handleRequest(uri)
if response["success"] == FALSE
@logger.abortExecution()
end
str = response["message"]
if not str.nil?
str.gsub!(/(\,)(\S)/, "\\1 \\2")
return YAML::load(str)
end
return {'status_code': 0, 'result': 0}
end
private
def handleRequest(uri, reference=nil, data_chunk=nil)
req = Net::HTTP::Post.new(uri)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment