Commit 78a758bb authored by Eteri's avatar Eteri Committed by Klaus Wölfel

fluent-plugin-wendelin: fix configuration parameters, check for closed connections

parent 72f1a151
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
require 'fluent/output' require 'fluent/output'
require_relative 'wendelin_client' require_relative 'wendelin_client'
module Fluent module Fluent
class WendelinOutput < ObjectBufferedOutput # XXX verify base class class WendelinOutput < ObjectBufferedOutput # XXX verify base class
...@@ -37,8 +38,6 @@ module Fluent ...@@ -37,8 +38,6 @@ module Fluent
config_param :user, :string, :default => nil config_param :user, :string, :default => nil
config_param :password, :string, :default => nil config_param :password, :string, :default => nil
config_param :reference, :string
def configure(conf) def configure(conf)
super super
...@@ -48,7 +47,7 @@ module Fluent ...@@ -48,7 +47,7 @@ module Fluent
credentials['user'] = @user credentials['user'] = @user
credentials['password'] = @password credentials['password'] = @password
end end
@wendelin = WendelinClient.new(@streamtool_uri, credentials, @log, @reference) @wendelin = WendelinClient.new(@streamtool_uri, credentials, @log)
end end
def start def start
...@@ -80,8 +79,8 @@ module Fluent ...@@ -80,8 +79,8 @@ module Fluent
# for input_stream_ref use tag as-is - it will be processed/translated # for input_stream_ref use tag as-is - it will be processed/translated
# further on server by Wendelin # further on server by Wendelin
reference = tag reference = tag
@wendelin.ingest(data_chunk)
@wendelin.ingest(reference, data_chunk)
end end
end end
......
...@@ -19,84 +19,97 @@ ...@@ -19,84 +19,97 @@
require 'net/http' require 'net/http'
require 'openssl' require 'openssl'
# class representing a Wendelin client # class representing a Wendelin client
class WendelinClient class WendelinClient
# `streamtool_uri` - URI pointing to portal_input_data_stream "mountpoint" # `streamtool_uri` - URI pointing to portal_input_data_stream "mountpoint"
# `credentials` # {'user' => _, 'password' => _} TODO change to certificate # `credentials` # {'user' => _, 'password' => _} TODO change to certificate
# `log` - logger to use # `log` - logger to use
def initialize(streamtool_uri, credentials, log, reference) def initialize(streamtool_uri, credentials, log)
@streamtool_uri = streamtool_uri @streamtool_uri = streamtool_uri
@credentials = credentials @credentials = credentials
@log = log @log = log
@reference = reference end
end
# start request in an independent function to keep the connection open # start request in an independent function to keep the connection open
def start_request def start_request(uri)
uri = URI("#{@streamtool_uri}/ingest?reference=#{@reference}") @http = Net::HTTP.start(uri.hostname, uri.port,
@http = Net::HTTP.start(uri.hostname, uri.port, :use_ssl => (uri.scheme == 'https'),
:use_ssl => (uri.scheme == 'https'), :verify_mode => OpenSSL::SSL::VERIFY_NONE,
:verify_mode => OpenSSL::SSL::VERIFY_NONE,
:ssl_timeout => 60, # Net::HTTP default open timeout is infinity, which results
:open_timeout => 60, # in thread hang forever if other side does not fully
:read_timeout => 60, # establish connection. Default read_timeout is 60 seconds.
:keep_alive_timeout => 60,) # We go safe way and make sure all timeouts are defined.
@request = Net::HTTP::Post.new(uri) :ssl_timeout => 60,
:open_timeout => 60,
end :read_timeout => 60,
:keep_alive_timeout => 60,)
# ingest `data_chunk` to a stream referenced as `reference` @request = Net::HTTP::Post.new(uri)
def ingest(data_chunk)
end
# When using 'application/x-www-form-urlencoded', Ruby encodes with regex
# and it is far too slow. Such POST is legit: # ingest `data_chunk` to a stream referenced as `reference`
# https://stackoverflow.com/a/14710450 def ingest(reference, data_chunk)
@request ||= start_request # call start_request if request is undefined uri = URI("#{@streamtool_uri}/ingest?reference=#{reference}")
@request.body = data_chunk
@request.content_type = 'application/octet-stream' # call start_request if request is undefined
if @credentials.has_key?('user') @request ||= start_request(uri)
@request.basic_auth @credentials['user'], @credentials['password']
end # connect again if the connection is not started
if ! @http.started?()
@log.on_trace do start_request(uri)
@log.trace '>>> REQUEST' end
@log.trace "method\t=> #{@request.method}"
@log.trace "path\t=> #{@request.path}" # When using 'application/x-www-form-urlencoded', Ruby encodes with regex
@log.trace "uri\t=> #{@request.uri}" # and it is far too slow. Such POST is legit:
@log.trace "body\t=> #{@request.body}" # https://stackoverflow.com/a/14710450
@log.trace "body_stream\t=> #{@request.body_stream}" @request.body = data_chunk
@request.each {|h| @log.trace "#{h}:\t#{@request[h]}"} @request.content_type = 'application/octet-stream'
@log.trace
end if @credentials.has_key?('user')
@request.basic_auth @credentials['user'], @credentials['password']
begin end
res = @http.request(@request) # Net::HTTPResponse object
end @log.on_trace do
@log.trace '>>> REQUEST'
rescue @log.trace "method\t=> #{@request.method}"
# some http/ssl/other connection error @log.trace "path\t=> #{@request.path}"
@log.warn "HTTP ERROR:" @log.trace "uri\t=> #{@request.uri}"
raise @log.trace "body\t=> #{@request.body}"
@log.trace "body_stream\t=> #{@request.body_stream}"
else @request.each {|h| @log.trace "#{h}:\t#{@request[h]}"}
@log.on_trace do @log.trace
@log.trace '>>> RESPONSE' end
res.each {|h| @log.trace "#{h}:\t#{res[h]}"}
@log.trace "code\t=> #{res.code}" begin
@log.trace "msg\t=> #{res.message}" res = @http.request(@request) # Net::HTTPResponse object
@log.trace "class\t=> #{res.class}" end
@log.trace "body:", res.body
end rescue
# some http/ssl/other connection error
if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX @log.warn "HTTP ERROR:"
#@log.info "ingested ok" raise
else else
@log.warn "FAIL:" @log.on_trace do
res.value @log.trace '>>> RESPONSE'
end res.each {|h| @log.trace "#{h}:\t#{res[h]}"}
end @log.trace "code\t=> #{res.code}"
@log.trace "msg\t=> #{res.message}"
@log.trace "class\t=> #{res.class}"
@log.trace "body:", res.body
end
if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX
#@log.info "ingested ok"
else
@log.warn "FAIL:"
res.value
end
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment