Commit 67e761e4 authored by Eteri's avatar Eteri Committed by Klaus Wölfel

fluent-plugin-wendelin: add keep_alive as an option

parent 3bbc0908
...@@ -20,6 +20,7 @@ A sample configuration to setup ingestion of data for a tag to Wendelin: ...@@ -20,6 +20,7 @@ A sample configuration to setup ingestion of data for a tag to Wendelin:
streamtool_uri <Wendelin_URL>/erp5/portal_ingestion_policies/<YOUR_INGESTION_POLICY_ID> streamtool_uri <Wendelin_URL>/erp5/portal_ingestion_policies/<YOUR_INGESTION_POLICY_ID>
user <your_wendelin_user> user <your_wendelin_user>
password <your_wendelin_password> password <your_wendelin_password>
use_keep_alive <true/false>
# all parameters of BufferedOutput & Output classes are supported too, e.g. # all parameters of BufferedOutput & Output classes are supported too, e.g.
# `buffer_type`, `flush_interval`, `num_threads`, `log_level`, etc - see # `buffer_type`, `flush_interval`, `num_threads`, `log_level`, etc - see
......
...@@ -28,6 +28,8 @@ ...@@ -28,6 +28,8 @@
user <your_wendelin_user> user <your_wendelin_user>
password <your_wendelin_password> password <your_wendelin_password>
use_keep_alive true
# all parameters of BufferedOutput & Output classes are supported too, e.g. # all parameters of BufferedOutput & Output classes are supported too, e.g.
# `buffer_type`, `flush_interval`, `num_threads`, `log_level`, etc - see # `buffer_type`, `flush_interval`, `num_threads`, `log_level`, etc - see
# their code near: # their code near:
......
...@@ -38,6 +38,9 @@ module Fluent ...@@ -38,6 +38,9 @@ module Fluent
config_param :user, :string, :default => nil config_param :user, :string, :default => nil
config_param :password, :string, :default => nil config_param :password, :string, :default => nil
config_param :use_keep_alive, :bool, :default => true
def configure(conf) def configure(conf)
super super
...@@ -80,8 +83,12 @@ module Fluent ...@@ -80,8 +83,12 @@ module Fluent
# further on server by Wendelin # further on server by Wendelin
reference = tag reference = tag
if @use_keep_alive
@wendelin.ingest_with_keep_alive(reference, data_chunk)
else
@wendelin.ingest(reference, data_chunk) @wendelin.ingest(reference, data_chunk)
end end
end
end end
......
...@@ -20,6 +20,7 @@ require 'openssl' ...@@ -20,6 +20,7 @@ require 'openssl'
# class representing a Wendelin client # class representing a Wendelin client
class WendelinClient class WendelinClient
# `streamtool_uri` - URI pointing to portal_input_data_stream "mountpoint" # `streamtool_uri` - URI pointing to portal_input_data_stream "mountpoint"
# `credentials` # {'user' => _, 'password' => _} TODO change to certificate # `credentials` # {'user' => _, 'password' => _} TODO change to certificate
# `log` - logger to use # `log` - logger to use
...@@ -27,36 +28,41 @@ class WendelinClient ...@@ -27,36 +28,41 @@ class WendelinClient
@streamtool_uri = streamtool_uri @streamtool_uri = streamtool_uri
@credentials = credentials @credentials = credentials
@log = log @log = log
# @stop_timer = 0
end end
# start request in an independent function to keep the connection open # start request in an independent function to keep the connection open
def start_request(uri) def start_request(uri)
puts ' START NEW REQUEST'
puts " START NEW REQUEST"
@http = Net::HTTP.start(uri.hostname, uri.port, @http = Net::HTTP.start(uri.hostname, uri.port,
use_ssl: (uri.scheme == 'https'), :use_ssl => (uri.scheme == 'https'),
verify_mode: OpenSSL::SSL::VERIFY_NONE, :verify_mode => OpenSSL::SSL::VERIFY_NONE,
# Net::HTTP default open timeout is infinity, which results # Net::HTTP default open timeout is infinity, which results
# in thread hang forever if other side does not fully # in thread hang forever if other side does not fully
# establish connection. Default read_timeout is 60 seconds. # establish connection. Default read_timeout is 60 seconds.
# We go safe way and make sure all timeouts are defined. # We go safe way and make sure all timeouts are defined.
ssl_timeout: 60, :ssl_timeout => 60,
open_timeout: 60, :open_timeout => 60,
read_timeout: 60, :read_timeout => 60,
keep_alive_timeout: 60) :keep_alive_timeout => 60,)
end end
# ingest `data_chunk` to a stream referenced as `reference` # ingest `data_chunk` to a stream referenced as `reference`
def ingest(reference, data_chunk) def ingest_with_keep_alive(reference, data_chunk)
uri = URI("#{@streamtool_uri}/ingest?reference=#{reference}") uri = URI("#{@streamtool_uri}/ingest?reference=#{reference}")
puts 'uri = ' puts "uri = "
puts uri puts uri
# call start_request if request is undefined # call start_request if request is undefined
@request ||= start_request(uri) @request ||= start_request(uri)
# connect again if the connection is not started # connect again if the connection is not started
start_request(uri) unless @http.started? if ! @http.started?()
start_request(uri)
end
@request = Net::HTTP::Post.new(uri) @request = Net::HTTP::Post.new(uri)
...@@ -66,7 +72,7 @@ class WendelinClient ...@@ -66,7 +72,7 @@ class WendelinClient
@request.body = data_chunk @request.body = data_chunk
@request.content_type = 'application/octet-stream' @request.content_type = 'application/octet-stream'
if @credentials.key?('user') if @credentials.has_key?('user')
@request.basic_auth @credentials['user'], @credentials['password'] @request.basic_auth @credentials['user'], @credentials['password']
end end
...@@ -77,32 +83,102 @@ class WendelinClient ...@@ -77,32 +83,102 @@ class WendelinClient
@log.trace "uri\t=> #{@request.uri}" @log.trace "uri\t=> #{@request.uri}"
@log.trace "body\t=> #{@request.body}" @log.trace "body\t=> #{@request.body}"
@log.trace "body_stream\t=> #{@request.body_stream}" @log.trace "body_stream\t=> #{@request.body_stream}"
@request.each { |h| @log.trace "#{h}:\t#{@request[h]}" } @request.each {|h| @log.trace "#{h}:\t#{@request[h]}"}
@log.trace @log.trace
end end
begin begin
res = @http.request(@request) # Net::HTTPResponse object res = @http.request(@request) # Net::HTTPResponse object
end end
rescue StandardError
rescue
# some http/ssl/other connection error # some http/ssl/other connection error
@log.warn 'HTTP ERROR:' @log.warn "HTTP ERROR:"
raise raise
else else
@log.on_trace do @log.on_trace do
@log.trace '>>> RESPONSE' @log.trace '>>> RESPONSE'
res.each { |h| @log.trace "#{h}:\t#{res[h]}" } res.each {|h| @log.trace "#{h}:\t#{res[h]}"}
@log.trace "code\t=> #{res.code}" @log.trace "code\t=> #{res.code}"
@log.trace "msg\t=> #{res.message}" @log.trace "msg\t=> #{res.message}"
@log.trace "class\t=> #{res.class}" @log.trace "class\t=> #{res.class}"
@log.trace 'body:', res.body @log.trace "body:", res.body
end end
if res.is_a?(Net::HTTPSuccess) # res.code is 2XX if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX
# @log.info "ingested ok" #@log.info "ingested ok"
else else
@log.warn 'FAIL:' @log.warn "FAIL:"
res.value res.value
end end
end end
def ingest(reference, data_chunk)
uri = URI("#{@streamtool_uri}/ingest?reference=#{reference}")
req = Net::HTTP::Post.new(uri)
if @credentials.has_key?('user')
req.basic_auth @credentials['user'], @credentials['password']
end
# When using 'application/x-www-form-urlencoded', Ruby encodes with regex
# and it is far too slow. Such POST is legit:
# https://stackoverflow.com/a/14710450
req.body = data_chunk
req.content_type = 'application/octet-stream'
@log.on_trace do
@log.trace '>>> REQUEST'
@log.trace "method\t=> #{req.method}"
@log.trace "path\t=> #{req.path}"
@log.trace "uri\t=> #{req.uri}"
@log.trace "body\t=> #{req.body}"
@log.trace "body_stream\t=> #{req.body_stream}"
req.each {|h| @log.trace "#{h}:\t#{req[h]}"}
@log.trace
end
begin
# TODO keep connection open (so that every new ingest does not do
# full connect again)
res = Net::HTTP.start(uri.hostname, uri.port,
:use_ssl => (uri.scheme == 'https'),
# NOTE = "do not check server cert"
# TODO move this out to conf parameters
:verify_mode => OpenSSL::SSL::VERIFY_NONE,
# Net::HTTP default open timeout is infinity, which results
# in thread hang forever if other side does not fully
# establish connection. Default read_timeout is 60 seconds.
# We go safe way and make sure all timeouts are defined.
:ssl_timeout => 60,
:open_timeout => 60,
:read_timeout => 60,
) do |http|
http.request(req)
end
rescue
# some http/ssl/other connection error
@log.warn "HTTP ERROR:"
raise
else
@log.on_trace do
@log.trace '>>> RESPONSE'
res.each {|h| @log.trace "#{h}:\t#{res[h]}"}
@log.trace "code\t=> #{res.code}"
@log.trace "msg\t=> #{res.message}"
@log.trace "class\t=> #{res.class}"
@log.trace "body:", res.body
end
if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX
#@log.info "ingested ok"
else
@log.warn "FAIL:"
res.value
end
end
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment