pax_global_header 0000666 0000000 0000000 00000000064 13317414632 0014516 g ustar 00root root 0000000 0000000 52 comment=5abf85e3b6f7a57e23891feb8e238edda46afc50
fluent-plugin-wendelin-5abf85e3b6f7a57e23891feb8e238edda46afc50/ 0000775 0000000 0000000 00000000000 13317414632 0023432 5 ustar 00root root 0000000 0000000 fluent-plugin-wendelin-5abf85e3b6f7a57e23891feb8e238edda46afc50/.gitignore 0000664 0000000 0000000 00000000030 13317414632 0025413 0 ustar 00root root 0000000 0000000 /.bundle/
/Gemfile.lock
fluent-plugin-wendelin-5abf85e3b6f7a57e23891feb8e238edda46afc50/Gemfile 0000664 0000000 0000000 00000000114 13317414632 0024721 0 ustar 00root root 0000000 0000000 source 'https://rubygems.org'
# gem's dependencies are in .gemspec: the `gemspec` directive below makes
# Bundler take them from fluent-plugin-wendelin.gemspec
gemspec
fluent-plugin-wendelin-5abf85e3b6f7a57e23891feb8e238edda46afc50/LICENSE.txt 0000664 0000000 0000000 00000001126 13317414632 0025255 0 ustar 00root root 0000000 0000000 Copyright (C) 2015 Nexedi SA and Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
fluent-plugin-wendelin-5abf85e3b6f7a57e23891feb8e238edda46afc50/README.md 0000664 0000000 0000000 00000002240 13317414632 0024707 0 ustar 00root root 0000000 0000000 fluent-plugin-wendelin
======================
This is an output plugin for [Fluentd][] that forwards data to the [Wendelin][] system.
[Fluentd]: http://fluentd.org/
[Wendelin]: http://wendelin.io/
Configuration
-------------
A sample configuration that sets up ingestion into Wendelin of all events matching a tag:
```
<match sensor.**>
  @type wendelin
  @id   wendelin_out

  streamtool_uri https://example.com/erp5/portal_ingestion_policies/example_ingestion
  user           USER
  password       PASSWORD
  use_keep_alive true

  # all parameters of BufferedOutput & Output classes are supported too, e.g.
  # `buffer_type`, `flush_interval`, `num_threads`, `log_level`, etc - see
  # their code near:
  #
  #   https://github.com/fluent/fluentd/blob/master/lib/fluent/output.rb#L182
  #
  # logging setup description:
  #
  #   http://docs.fluentd.org/articles/logging

  buffer_type    memory
  #buffer_path   "#{ENV['HOME']}/var"
  flush_interval 5s
</match>
```
See *example/to_wendelin.conf* for a complete setup example.
TODO
----
- HTTPS certificates are currently not verified.
- X.509 certificates instead of user / password.
- cleanup
fluent-plugin-wendelin-5abf85e3b6f7a57e23891feb8e238edda46afc50/example/ 0000775 0000000 0000000 00000000000 13317414632 0025065 5 ustar 00root root 0000000 0000000 fluent-plugin-wendelin-5abf85e3b6f7a57e23891feb8e238edda46afc50/example/to_wendelin.conf 0000664 0000000 0000000 00000003514 13317414632 0030246 0 ustar 00root root 0000000 0000000 # Example configuration of Wendelin output for Fluentd
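## built-in TCP input, so we can accept messages from other fluentd(s) and so
## that fluent-cat works:
## $ echo <json> | fluent-cat <tag>
<source>
  @type forward
  @id   forward_input
</source>

# HTTP input
# http://localhost:8888/<tag>?json=<json>
<source>
  @type http
  @id   http_input
  port  8888
</source>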
## output tag=sensor.** to Wendelin
<match sensor.**>
  @type wendelin
  @id   wendelin_out

  streamtool_uri https://example.com/erp5/portal_ingestion_policies/example_ingestion
  # TODO ^^^ the peer's certificate is currently not checked
  user     USER
  password PASSWORD
  use_keep_alive true

  # all parameters of BufferedOutput & Output classes are supported too, e.g.
  # `buffer_type`, `flush_interval`, `num_threads`, `log_level`, etc - see
  # their code near:
  #
  #   https://github.com/fluent/fluentd/blob/master/lib/fluent/output.rb#L182
  #
  # logging setup description:
  #
  #   http://docs.fluentd.org/articles/logging

  buffer_type    memory
  #buffer_path   "#{ENV['HOME']}/var"
  flush_interval 5s
</match>
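# ---- monitoring & debugging ----

# Listen HTTP for monitoring
# http://localhost:24220/api/plugins
# http://localhost:24220/api/plugins?type=TYPE
# http://localhost:24220/api/plugins?tag=MYTAG
<source>
  @type monitor_agent
  @id   monitor_agent_input
  port  24220
</source>

# Listen DRb for debug
<source>
  @type debug_agent
  @id   debug_agent_input
  bind  127.0.0.1
  port  24230
</source>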
## match tag=debug.** and dump to console
<match debug.**>
  @type stdout
  @id   stdout_output
</match>

## match fluent's internal events
#<match fluent.**>
#  @type null
#</match>

## match not matched logs and write to file
#<match **>
#  @type file
#  path /var/log/fluent/else
#  compress gz
#</match>
fluent-plugin-wendelin-5abf85e3b6f7a57e23891feb8e238edda46afc50/fluent-plugin-wendelin.gemspec 0000664 0000000 0000000 00000001211 13317414632 0031366 0 ustar 00root root 0000000 0000000 Gem::Specification.new do |gem|
gem.name = "fluent-plugin-wendelin"
gem.version = "0.3"
gem.authors = ["Kirill Smelkov"]
gem.email = ["kirr@nexedi.com"]
gem.summary = %q{Fluentd output to Wendelin}
gem.description = %q{Fluentd output plugin to forward data to Wendelin system}
gem.homepage = "https://lab.nexedi.com/nexedi/fluent-plugin-wendelin"
gem.license = "Apache-2.0"
# package every file tracked by git; building the gem therefore has to be
# done from inside a git checkout
gem.files = `git ls-files -z`.split("\x0")
gem.require_paths = ["lib"]
# "~> 0" accepts any fluentd 0.x release (>= 0, < 1); the plugin subclasses
# ObjectBufferedOutput (lib/fluent/plugin/out_wendelin.rb)
gem.add_runtime_dependency "fluentd", "~> 0"
# NOTE(review): net/http and openssl are required by wendelin_client.rb; both
# ship with Ruby itself, which is presumably why no gem dependency is declared
# ? net/http
# ? openssl
end
fluent-plugin-wendelin-5abf85e3b6f7a57e23891feb8e238edda46afc50/lib/ 0000775 0000000 0000000 00000000000 13317414632 0024200 5 ustar 00root root 0000000 0000000 fluent-plugin-wendelin-5abf85e3b6f7a57e23891feb8e238edda46afc50/lib/fluent/ 0000775 0000000 0000000 00000000000 13317414632 0025475 5 ustar 00root root 0000000 0000000 fluent-plugin-wendelin-5abf85e3b6f7a57e23891feb8e238edda46afc50/lib/fluent/plugin/ 0000775 0000000 0000000 00000000000 13317414632 0026773 5 ustar 00root root 0000000 0000000 fluent-plugin-wendelin-5abf85e3b6f7a57e23891feb8e238edda46afc50/lib/fluent/plugin/out_wendelin.rb 0000664 0000000 0000000 00000006037 13317414632 0032022 0 ustar 00root root 0000000 0000000 # Fluentd output plugin to ingest data to Wendelin system
# Copyright (C) 2015 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This plugin hooks into Fluentd in a way similar to out_forward and
# out_secure_forward and outputs the event stream to a Wendelin HTTP endpoint.
require 'fluent/output'
require_relative 'wendelin_client'
module Fluent
  # Buffered Fluentd output that posts every buffer chunk, unmodified, to a
  # Wendelin ingestion endpoint through WendelinClient.
  #
  # NOTE(review): base class not re-verified by the original author; it is
  # the class whose hook is write_objects(tag, chunk), implemented below.
  class WendelinOutput < ObjectBufferedOutput
    Plugin.register_output('wendelin', self)

    # Location of Wendelin's Input Stream Tool,
    # ex http://example.com/erp5/portal_ingestion_policies/example_ingestion
    config_param :streamtool_uri, :string

    # Credentials this fluentd presents to Wendelin. When `user` is not
    # configured, requests are sent without credentials.
    # TODO user/password -> certificate
    config_param :user, :string, :default => nil
    config_param :password, :string, :default => nil

    # true  - reuse one HTTP connection across flushes
    # false - open a fresh connection for every flushed chunk
    config_param :use_keep_alive, :bool, :default => true

    # Build the Wendelin client from the configured endpoint and credentials.
    def configure(conf)
      super
      auth = @user ? { 'user' => @user, 'password' => @password } : {}
      @wendelin = WendelinClient.new(@streamtool_uri, auth, @log)
    end

    # No plugin-specific startup yet.
    def start
      super
      # TODO
    end

    # No plugin-specific teardown yet.
    def shutdown
      super
      # TODO
    end

    # ObjectBufferedOutput hook: send one buffer chunk for `tag` to Wendelin.
    #
    # Runs on a separate output thread (see OutputThread and BufferedOutput).
    # Errors are intentionally not rescued: they unwind to
    # BufferedOutput#try_flush, which retries the chunk up to the retry limit
    # and gives up on it once every attempt has failed.
    #
    # The chunk's bytes - usually a msgpack stream of (timestamp, record)
    # pairs, but sent as-is whatever they are - are ingested under `tag`,
    # used verbatim as the input stream reference; Wendelin processes and
    # translates it on the server side.
    def write_objects(tag, chunk)
      ingest_method = @use_keep_alive ? :ingest_with_keep_alive : :ingest
      @wendelin.public_send(ingest_method, tag, chunk.read)
    end
  end
end
fluent-plugin-wendelin-5abf85e3b6f7a57e23891feb8e238edda46afc50/lib/fluent/plugin/wendelin_client.rb0000664 0000000 0000000 00000013177 13317414632 0032474 0 ustar 00root root 0000000 0000000 # module to ingest data to a Wendelin through HTTP endpoint
# Copyright (C) 2015 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'net/http'
require 'openssl'
# Client that ingests data chunks into Wendelin through its HTTP endpoint
# `<streamtool_uri>/ingest?reference=<reference>`.
#
# Two transfer modes are provided:
#   * #ingest                 - opens a new HTTP(S) connection for every call;
#   * #ingest_with_keep_alive - reuses one persistent connection across calls.
# Both log and re-raise connection errors, and raise on non-2XX responses.
class WendelinClient
  # Timeout, in seconds, for SSL handshake, connection open, reads and (in
  # keep-alive mode) idle keep-alive.
  #
  # Net::HTTP default open timeout is infinity, which results in thread hang
  # forever if other side does not fully establish connection. Default
  # read_timeout is 60 seconds. We go safe way and make sure all timeouts
  # are defined.
  TIMEOUT = 60

  # `streamtool_uri` - URI pointing to portal_input_data_stream "mountpoint"
  # `credentials`    - {'user' => _, 'password' => _} for HTTP basic auth,
  #                    or {} to send no credentials. TODO change to certificate
  # `log`            - logger to use (on_trace / trace / debug / warn, as
  #                    provided by Fluent::Log)
  def initialize(streamtool_uri, credentials, log)
    @streamtool_uri = streamtool_uri
    @credentials = credentials
    @log = log
    # persistent connection used by #ingest_with_keep_alive, opened lazily
    @http = nil
    # A Net::HTTP object must not be used by several threads at once, and the
    # fluentd output may flush from more than one thread (`num_threads`), so
    # every use of the shared @http connection is serialized on this lock.
    @http_lock = Mutex.new
  end

  # Open a persistent connection to the host of `uri` and keep it in @http
  # for reuse by #ingest_with_keep_alive.
  #
  # Returns the started Net::HTTP object.
  def start_request(uri)
    @log.debug "wendelin: starting new HTTP connection to #{uri.hostname}:#{uri.port}"
    @http = Net::HTTP.start(uri.hostname, uri.port,
                            connection_options(uri).merge(:keep_alive_timeout => TIMEOUT))
  end

  # Ingest `data_chunk` to the stream referenced as `reference`, reusing one
  # persistent connection between calls.
  #
  # The connection is opened on first use and reopened whenever it is no
  # longer started. Concurrent callers are serialized (see @http_lock).
  # Returns nil on a 2XX response; raises on connection errors and on any
  # other response (Net::HTTPResponse#value).
  def ingest_with_keep_alive(reference, data_chunk)
    uri = ingest_uri(reference)
    req = build_request(uri, data_chunk)
    trace_request(req)

    res = log_http_errors do
      @http_lock.synchronize do
        start_request(uri) if @http.nil? || !@http.started?
        @http.request(req) # Net::HTTPResponse object
      end
    end
    handle_response(res)
  end

  # Ingest `data_chunk` to the stream referenced as `reference` over a
  # connection opened (and closed) just for this call; see
  # #ingest_with_keep_alive for the connection-reusing variant.
  #
  # Returns nil on a 2XX response; raises on connection errors and on any
  # other response (Net::HTTPResponse#value).
  def ingest(reference, data_chunk)
    uri = ingest_uri(reference)
    req = build_request(uri, data_chunk)
    trace_request(req)

    res = log_http_errors do
      Net::HTTP.start(uri.hostname, uri.port, connection_options(uri)) do |http|
        http.request(req)
      end
    end
    handle_response(res)
  end

  private

  # Net::HTTP.start options shared by both transfer modes.
  def connection_options(uri)
    {
      :use_ssl      => (uri.scheme == 'https'),
      # NOTE = "do not check server cert"
      # TODO move this out to conf parameters
      :verify_mode  => OpenSSL::SSL::VERIFY_NONE,
      :ssl_timeout  => TIMEOUT,
      :open_timeout => TIMEOUT,
      :read_timeout => TIMEOUT,
    }
  end

  # URI of the ingestion endpoint for stream `reference`.
  #
  # The reference is form-encoded so characters such as ' ', '&', '+' or '#'
  # cannot break or alter the query string; references made only of letters,
  # digits, '*', '-', '.' and '_' (typical fluentd tags) are left unchanged.
  def ingest_uri(reference)
    URI("#{@streamtool_uri}/ingest?reference=#{URI.encode_www_form_component(reference)}")
  end

  # POST request carrying `data_chunk` as raw body, with basic auth when
  # credentials were given.
  def build_request(uri, data_chunk)
    req = Net::HTTP::Post.new(uri)
    if @credentials.has_key?('user')
      req.basic_auth @credentials['user'], @credentials['password']
    end
    # When using 'application/x-www-form-urlencoded', Ruby encodes with regex
    # and it is far too slow. Such POST is legit:
    # https://stackoverflow.com/a/14710450
    req.body = data_chunk
    req.content_type = 'application/octet-stream'
    req
  end

  # Dump `req` through @log.on_trace (which decides whether it is emitted).
  def trace_request(req)
    @log.on_trace do
      @log.trace '>>> REQUEST'
      @log.trace "method\t=> #{req.method}"
      @log.trace "path\t=> #{req.path}"
      @log.trace "uri\t=> #{req.uri}"
      @log.trace "body\t=> #{req.body}"
      @log.trace "body_stream\t=> #{req.body_stream}"
      req.each {|h| @log.trace "#{h}:\t#{req[h]}"}
      @log.trace
    end
  end

  # Dump `res` through @log.on_trace (which decides whether it is emitted).
  def trace_response(res)
    @log.on_trace do
      @log.trace '>>> RESPONSE'
      res.each {|h| @log.trace "#{h}:\t#{res[h]}"}
      @log.trace "code\t=> #{res.code}"
      @log.trace "msg\t=> #{res.message}"
      @log.trace "class\t=> #{res.class}"
      @log.trace "body:", res.body
    end
  end

  # Run the block and return its value; on any StandardError (some
  # http/ssl/other connection error) log it and re-raise it to the caller.
  def log_http_errors
    yield
  rescue => e
    @log.warn "HTTP ERROR: #{e.class}: #{e.message}"
    raise
  end

  # Trace `res`; return nil when it is 2XX, otherwise warn and raise the
  # matching Net::HTTP* error via Net::HTTPResponse#value.
  def handle_response(res)
    trace_response(res)
    return if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX

    @log.warn "FAIL: #{res.code} #{res.message}"
    res.value
  end
end
fluent-plugin-wendelin-5abf85e3b6f7a57e23891feb8e238edda46afc50/test/ 0000775 0000000 0000000 00000000000 13317414632 0024411 5 ustar 00root root 0000000 0000000 fluent-plugin-wendelin-5abf85e3b6f7a57e23891feb8e238edda46afc50/test/ingest-torture.rb 0000775 0000000 0000000 00000000725 13317414632 0027740 0 ustar 00root root 0000000 0000000 #!/usr/bin/env ruby
# ingest several data streams concurrently forever
require_relative 'wendelin_test'
# Endlessly ingest small numbered chunks into the stream `stream_ref`,
# one every 0.1 second. Returns only if `ingest` raises.
def runingest(stream_ref)
  0.step do |seq|
    chunk = ", #{seq} (#{stream_ref})"
    # debug: puts "ingest #{stream_ref}, #{chunk}"
    ingest stream_ref, chunk
    sleep 0.1
  end
end
# One ingesting thread per stream. A failure in any stream aborts the whole
# run instead of that stream silently stopping while the other keeps going.
# The main thread then waits on every thread, i.e. forever.
threads = %w[ABC-HELLO-01.cat ABC-HELLO-01.tac].map do |stream_ref|
  Thread.new do
    Thread.current.abort_on_exception = true
    runingest stream_ref
  end
end
threads.each(&:join) # wait forever
fluent-plugin-wendelin-5abf85e3b6f7a57e23891feb8e238edda46afc50/test/ingest.rb 0000775 0000000 0000000 00000000707 13317414632 0026236 0 ustar 00root root 0000000 0000000 #!/usr/bin/env ruby
# manually test erp5_data_streams ingestion via Ruby HTTP lib
# XXX draft/stub
require_relative 'wendelin_test'
require 'msgpack'
require 'time'
# Format time `t` as date, time and UTC offset,
# e.g. t -> 2015-08-11 14:51:50+02:00
def tfmt(t)
  t.strftime('%F %T%:z')
end
t = Time.now
# two (timestamp, record) events, msgpack-serialized back to back
events = [
  [1, {'date' => tfmt(t),     'hello' => 1,      'world' => 2}],
  [2, {'date' => tfmt(t + 1), 'pi'    => 3.1415, 'e'     => 2.71828}],
]
s = events.map(&:to_msgpack).join
# puts s
ingest 'testds', s
fluent-plugin-wendelin-5abf85e3b6f7a57e23891feb8e238edda46afc50/test/wendelin_test.rb 0000664 0000000 0000000 00000001525 13317414632 0027605 0 ustar 00root root 0000000 0000000 # simple ingest() for wendelin tests
# (without need for providing streamtool uri, credentials, etc)
# load wendelin_client.rb from in-tree
# make the in-tree lib/ (next to this test/ directory) requirable
lib_dir = File.expand_path('../lib', File.dirname(__FILE__))
$LOAD_PATH << lib_dir
require 'fluent/plugin/wendelin_client'
require 'fluent/log'
require 'fluent/engine' # for log
# Trace-level logger on stderr, so WendelinClient's request/response dumps
# are visible. Arguments are positional: (output, level).
log = Fluent::Log.new(STDERR, Fluent::Log::LEVEL_TRACE)
# Test endpoint and credentials. The defaults are placeholders (FIXME);
# override them from the environment instead of editing this file:
#   WENDELIN_USER, WENDELIN_PASSWORD, WENDELIN_ERP5_URI
$user = ENV.fetch('WENDELIN_USER', 'user')
$password = ENV.fetch('WENDELIN_PASSWORD', 'password')
$erp5 = ENV.fetch('WENDELIN_ERP5_URI', "https://example.com/erp5") # erp5 root
$streamtool = "#{$erp5}/portal_input_data_streams" # where Input Stream Tool is mounted
$wendelin = WendelinClient.new($streamtool, {'user' => $user, 'password' => $password}, log)
# Ingest `data_chunk` into the stream `input_stream_ref` through the shared
# test client $wendelin.
#
# keep_alive: true goes through WendelinClient#ingest_with_keep_alive, the
# persistent-connection path the fluentd plugin uses when its
# `use_keep_alive` option is on (its default); the default (false) keeps the
# previous one-connection-per-call behaviour.
def ingest(input_stream_ref, data_chunk, keep_alive: false)
  if keep_alive
    $wendelin.ingest_with_keep_alive(input_stream_ref, data_chunk)
  else
    $wendelin.ingest(input_stream_ref, data_chunk)
  end
end