Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
E
ebulk
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Roque
ebulk
Commits
469eb486
Commit
469eb486
authored
Jul 09, 2019
by
Roque
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
[WIP] http requests are done using curl command
parent
41070fa8
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
94 additions
and
77 deletions
+94
-77
ebulk-data/embulk-wendelin-dataset-tool/lib/embulk/wendelin_client.rb
...mbulk-wendelin-dataset-tool/lib/embulk/wendelin_client.rb
+94
-77
No files found.
ebulk-data/embulk-wendelin-dataset-tool/lib/embulk/wendelin_client.rb
View file @
469eb486
...
@@ -4,6 +4,7 @@ require 'openssl'
...
@@ -4,6 +4,7 @@ require 'openssl'
require
'yaml'
require
'yaml'
require
'open-uri'
require
'open-uri'
require_relative
'filelogger'
require_relative
'filelogger'
require
'open3'
# class representing a Wendelin client
# class representing a Wendelin client
class
WendelinClient
class
WendelinClient
...
@@ -24,6 +25,37 @@ class WendelinClient
...
@@ -24,6 +25,37 @@ class WendelinClient
@banned_references_list
=
[]
@banned_references_list
=
[]
@logger
=
LogManager
.
instance
()
@logger
=
LogManager
.
instance
()
@last_ingestion
=
Time
.
new
-
2
@last_ingestion
=
Time
.
new
-
2
begin
# check credentials
uri
=
URI
(
URI
.
escape
(
"
#{
@erp5_url
}
"
))
uri
=
URI
(
URI
.
escape
(
"
#{
uri
.
scheme
}
://
#{
uri
.
host
}
/erp5/getDataStreamList?data_set_reference="
))
scheme
=
uri
.
scheme
==
'https'
?
"https://"
:
"http://"
check_credential_url
=
uri
.
to_s
.
sub!
scheme
,
scheme
+
@user
+
":"
+
@password
+
"@"
command
=
"curl --request GET '"
+
check_credential_url
+
"'"
stdout
,
stderr
,
status
=
Open3
.
capture3
(
command
)
@logger
.
info
(
"Checking credentials..."
)
print
"[INFO] Checking credentials..."
if
status
.
success?
if
stdout
.
include?
"'"
stdout
=
stdout
.
gsub!
"'"
,
'"'
end
dict
=
JSON
.
parse
(
stdout
)
if
dict
[
"status_code"
]
==
1
@logger
.
error
(
dict
[
"error_message"
],
print
=
TRUE
)
@logger
.
abortExecution
()
end
@logger
.
info
(
"OK"
)
puts
"OK"
else
@logger
.
error
(
HTTP_MESSAGE_400
,
print
=
TRUE
)
@logger
.
error
(
stderr
)
@logger
.
abortExecution
()
end
rescue
Exception
=>
e
@logger
.
error
(
"HTTP ERROR: "
+
e
.
to_s
,
print
=
TRUE
)
@logger
.
error
(
e
.
backtrace
)
@logger
.
abortExecution
()
end
end
end
def
checkReferenceChars
(
reference
)
def
checkReferenceChars
(
reference
)
...
@@ -36,7 +68,9 @@ class WendelinClient
...
@@ -36,7 +68,9 @@ class WendelinClient
checkReferenceChars
(
reference
)
checkReferenceChars
(
reference
)
uri
=
URI
(
URI
.
escape
(
"
#{
@erp5_url
}
/ingestionReferenceExists?reference=
#{
reference
}
"
))
uri
=
URI
(
URI
.
escape
(
"
#{
@erp5_url
}
/ingestionReferenceExists?reference=
#{
reference
}
"
))
begin
begin
res
=
open
(
uri
,
http_basic_authentication:
[
@user
,
@password
]).
read
response
=
handleRequest
(
uri
,
nil
,
nil
,
non_json_content
=
true
)
res
=
response
[
"message"
]
if
response
[
"success"
]
==
TRUE
raise
response
[
"message"
]
if
response
[
"success"
]
==
FALSE
rescue
Exception
=>
e
rescue
Exception
=>
e
@logger
.
error
(
"An error occurred while checking if reference exists: "
+
e
.
to_s
)
@logger
.
error
(
"An error occurred while checking if reference exists: "
+
e
.
to_s
)
@logger
.
error
(
e
.
backtrace
)
@logger
.
error
(
e
.
backtrace
)
...
@@ -53,7 +87,7 @@ class WendelinClient
...
@@ -53,7 +87,7 @@ class WendelinClient
@logger
.
info
(
"Deletion requested for reference
#{
reference
}
"
,
print
=
TRUE
)
@logger
.
info
(
"Deletion requested for reference
#{
reference
}
"
,
print
=
TRUE
)
checkReferenceChars
(
reference
)
checkReferenceChars
(
reference
)
uri
=
URI
(
URI
.
escape
(
"
#{
@erp5_url
}
/ERP5Site_invalidateIngestionObjects?reference=
#{
reference
}
"
))
uri
=
URI
(
URI
.
escape
(
"
#{
@erp5_url
}
/ERP5Site_invalidateIngestionObjects?reference=
#{
reference
}
"
))
res
=
handleRequest
(
uri
)
res
=
handleRequest
(
uri
,
nil
,
nil
,
non_json_content
=
true
)
if
res
[
"success"
]
==
FALSE
if
res
[
"success"
]
==
FALSE
@logger
.
abortExecution
()
@logger
.
abortExecution
()
end
end
...
@@ -65,7 +99,7 @@ class WendelinClient
...
@@ -65,7 +99,7 @@ class WendelinClient
checkReferenceChars
(
reference
)
checkReferenceChars
(
reference
)
checkReferenceChars
(
new_reference
)
checkReferenceChars
(
new_reference
)
uri
=
URI
(
URI
.
escape
(
"
#{
@erp5_url
}
/ERP5Site_renameIngestion?reference=
#{
reference
}
&new_reference=
#{
new_reference
}
"
))
uri
=
URI
(
URI
.
escape
(
"
#{
@erp5_url
}
/ERP5Site_renameIngestion?reference=
#{
reference
}
&new_reference=
#{
new_reference
}
"
))
res
=
handleRequest
(
uri
)
res
=
handleRequest
(
uri
,
nil
,
nil
,
non_json_content
=
true
)
if
res
[
"success"
]
==
FALSE
if
res
[
"success"
]
==
FALSE
@logger
.
abortExecution
()
@logger
.
abortExecution
()
end
end
...
@@ -78,8 +112,9 @@ class WendelinClient
...
@@ -78,8 +112,9 @@ class WendelinClient
else
else
@logger
.
info
(
"Increasing dataset version"
)
@logger
.
info
(
"Increasing dataset version"
)
begin
begin
uri
=
URI
(
URI
.
escape
(
"
#{
@erp5_url
}
/ERP5Site_increaseDatasetVersion?reference=
#{
reference
}
"
))
uri
=
URI
(
URI
.
escape
(
"
#{
@erp5_url
}
/ERP5Site_increaseDatasetVersion?reference=
#{
reference
}
"
))
res
=
open
(
uri
,
http_basic_authentication:
[
@user
,
@password
]).
read
response
=
handleRequest
(
uri
,
nil
,
nil
,
non_json_content
=
true
)
raise
response
[
"message"
]
if
response
[
"success"
]
==
FALSE
rescue
Exception
=>
e
rescue
Exception
=>
e
@logger
.
error
(
"An error occurred while increasing dataset version: "
+
e
.
to_s
)
@logger
.
error
(
"An error occurred while increasing dataset version: "
+
e
.
to_s
)
@logger
.
error
(
e
.
backtrace
)
@logger
.
error
(
e
.
backtrace
)
...
@@ -150,31 +185,27 @@ class WendelinClient
...
@@ -150,31 +185,27 @@ class WendelinClient
success
=
FALSE
success
=
FALSE
n_retry
=
0
n_retry
=
0
while
!
success
&&
n_retry
<
10
while
!
success
&&
n_retry
<
10
begin
response
=
handleRequest
(
uri
,
nil
,
nil
,
non_json_content
=
true
)
res
=
open
(
uri
,
http_basic_authentication:
[
@user
,
@password
])
{
if
response
[
"success"
]
==
TRUE
|
content
|
chunk
=
response
[
"message"
]
chunk
=
content
.
read
()
if
chunk
.
nil?
||
chunk
.
empty?
if
chunk
.
nil?
||
chunk
.
empty?
if
first
if
first
yield
chunk
yield
chunk
end
end
@logger
.
info
(
"Done"
,
print
=
TRUE
)
@logger
.
info
(
"Done"
,
print
=
TRUE
)
done
=
TRUE
done
=
TRUE
else
else
first
=
FALSE
first
=
FALSE
n_part
+=
1
n_part
+=
1
yield
chunk
yield
chunk
end
end
success
=
TRUE
}
else
success
=
TRUE
@logger
.
error
(
"Error downloading data: "
+
response
[
"message"
],
print
=
TRUE
)
rescue
Exception
=>
e
n_retry
+=
1
exception
=
e
@logger
.
info
(
"Retrying
#{
n_retry
}
/10..."
,
print
=
TRUE
)
@logger
.
error
(
"Error downloading data: "
+
e
.
to_s
,
print
=
TRUE
)
sleep
30
@logger
.
error
(
e
.
backtrace
)
end
n_retry
+=
1
@logger
.
info
(
"Retrying
#{
n_retry
}
/10..."
,
print
=
TRUE
)
sleep
30
end
end
end
raise
exception
if
not
success
raise
exception
if
not
success
end
end
...
@@ -195,55 +226,41 @@ class WendelinClient
...
@@ -195,55 +226,41 @@ class WendelinClient
end
end
private
private
def
handleRequest
(
uri
,
reference
=
nil
,
data_chunk
=
nil
)
def
handleRequest
(
uri
,
reference
=
nil
,
data_chunk
=
nil
,
non_json_content
=
false
)
req
=
Net
::
HTTP
::
Post
.
new
(
uri
)
scheme
=
uri
.
scheme
==
'https'
?
"https://"
:
"http://"
req
.
basic_auth
@user
,
@password
url
=
uri
.
to_s
.
sub!
scheme
,
scheme
+
@user
+
":"
+
@password
+
"@"
begin
command
=
"curl --request POST '"
+
url
+
"'"
if
data_chunk
!=
nil
if
data_chunk
!=
nil
@logger
.
info
(
"Setting request form data..."
,
print
=
TRUE
)
if
reference
!=
nil
command
+=
" --data 'data_chunk="
+
data_chunk
+
"'"
begin
req
.
set_form_data
(
'data_chunk'
=>
data_chunk
)
rescue
java
.
lang
.
OutOfMemoryError
@logger
.
logOutOfMemoryError
(
reference
)
return
{
"success"
=>
FALSE
,
"message"
=>
HTTP_MEMORY_ERROR
}
rescue
Exception
=>
e
@logger
.
error
(
"Error setting form data: "
+
e
.
to_s
,
print
=
TRUE
)
@logger
.
error
(
e
.
backtrace
)
return
{
"success"
=>
FALSE
,
"message"
=>
HTTP_MESSAGE_EXCEPTION
}
end
@logger
.
info
(
"Sending record:'
#{
reference
}
'..."
,
print
=
TRUE
)
if
reference
!=
nil
end
end
begin
stdout
,
stderr
,
status
=
Open3
.
capture3
(
command
)
#) + " >&2")
res
=
Net
::
HTTP
.
start
(
uri
.
hostname
,
uri
.
port
,
if
status
.
success?
:use_ssl
=>
(
uri
.
scheme
==
'https'
),
# check response (if ok, it should be empty or json)
:verify_mode
=>
OpenSSL
::
SSL
::
VERIFY_NONE
,
if
not
stdout
.
nil?
and
not
stdout
.
empty?
:ssl_timeout
=>
300
,
:open_timeout
=>
300
,
:read_timeout
=>
300
,
begin
)
do
|
http
|
if
not
non_json_content
http
.
request
(
req
)
if
stdout
.
include?
"'"
end
stdout
=
stdout
.
gsub!
"'"
,
'"'
rescue
Exception
=>
e
end
@logger
.
error
(
"HTTP ERROR: "
+
e
.
to_s
,
print
=
TRUE
)
dict
=
JSON
.
parse
(
stdout
)
@logger
.
error
(
e
.
backtrace
)
end
return
{
"success"
=>
FALSE
,
"message"
=>
HTTP_MESSAGE_EXCEPTION
}
rescue
Exception
=>
e
@logger
.
error
(
"EXCEPTION: "
+
e
.
to_s
,
print
=
TRUE
)
@logger
.
error
(
HTTP_MESSAGE_400
,
print
=
TRUE
)
@logger
.
error
(
stdout
)
end
end
@logger
.
info
(
"Done"
)
return
{
"success"
=>
TRUE
,
"message"
=>
stdout
}
else
else
if
res
.
kind_of?
(
Net
::
HTTPSuccess
)
# res.code is 2XX
@logger
.
error
(
HTTP_MESSAGE_OTHER
,
print
=
TRUE
)
@logger
.
info
(
"Done"
)
@logger
.
error
(
stderr
)
return
{
"success"
=>
TRUE
,
"message"
=>
res
.
body
}
end
else
rescue
Exception
=>
e
@logger
.
error
(
"HTTP FAIL - code:
#{
res
.
code
}
"
,
print
=
TRUE
)
@logger
.
error
(
"HTTP ERROR: "
+
e
.
to_s
,
print
=
TRUE
)
if
res
.
code
==
'500'
or
res
.
code
==
'502'
or
res
.
code
==
'503'
@logger
.
error
(
e
.
backtrace
)
@logger
.
error
(
HTTP_MESSAGE_5XX
,
print
=
TRUE
)
return
{
"success"
=>
FALSE
,
"message"
=>
e
.
to_s
}
elsif
res
.
code
==
'401'
end
@logger
.
error
(
HTTP_MESSAGE_401
,
print
=
TRUE
)
@logger
.
abortExecution
()
elsif
res
.
code
==
'400'
@logger
.
error
(
HTTP_MESSAGE_400
,
print
=
TRUE
)
@logger
.
abortExecution
()
else
@logger
.
error
(
HTTP_MESSAGE_OTHER
,
print
=
TRUE
)
end
return
{
"success"
=>
FALSE
,
"message"
=>
HTTP_MESSAGE_NOT_2XX
}
end
end
end
end
end
end
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment