pax_global_header 0000666 0000000 0000000 00000000064 12600762117 0014514 g ustar 00root root 0000000 0000000 52 comment=87bb68f1138301f18defad9775d539d731f03d30
fluent-plugin-wendelin-87bb68f1138301f18defad9775d539d731f03d30/ 0000775 0000000 0000000 00000000000 12600762117 0022764 5 ustar 00root root 0000000 0000000 fluent-plugin-wendelin-87bb68f1138301f18defad9775d539d731f03d30/.gitignore 0000664 0000000 0000000 00000000030 12600762117 0024745 0 ustar 00root root 0000000 0000000 /.bundle/
/Gemfile.lock
fluent-plugin-wendelin-87bb68f1138301f18defad9775d539d731f03d30/Gemfile 0000664 0000000 0000000 00000000114 12600762117 0024253 0 ustar 00root root 0000000 0000000 source 'https://rubygems.org'
# gem's dependencies are in .gemspec
gemspec
fluent-plugin-wendelin-87bb68f1138301f18defad9775d539d731f03d30/LICENSE.txt 0000664 0000000 0000000 00000001126 12600762117 0024607 0 ustar 00root root 0000000 0000000 Copyright (C) 2015 Nexedi SA and Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
fluent-plugin-wendelin-87bb68f1138301f18defad9775d539d731f03d30/README.md 0000664 0000000 0000000 00000002316 12600762117 0024245 0 ustar 00root root 0000000 0000000 fluent-plugin-wendelin
======================
This is an output plugin for [Fluentd][] to forward data to a [Wendelin][] system.
[Fluentd]: http://fluentd.org/
[Wendelin]: http://wendelin.io/
Configuration
-------------
A sample configuration to set up ingestion of data for a tag to Wendelin:
```apache
@type wendelin
@id wendelin_out
streamtool_uri /erp5/portal_ingestion_policies/
user
password
# all parameters of BufferedOutput & Output classes are supported too, e.g.
# `buffer_type`, `flush_interval`, `num_threads`, `log_level`, etc - see
# their code near:
#
# https://github.com/fluent/fluentd/blob/master/lib/fluent/output.rb#L182
#
# logging setup description:
#
# http://docs.fluentd.org/articles/logging
buffer_type memory
#buffer_path "#{ENV['HOME']}/var"
flush_interval 5s
```
See *example/to_wendelin.conf* for a complete example setup.
TODO
----
- make content to be posted over HTTP as raw (instead of urlencode data).
- HTTPS certificates are currently not verified.
- X.509 certificates instead of user / password.
- cleanup
fluent-plugin-wendelin-87bb68f1138301f18defad9775d539d731f03d30/example/ 0000775 0000000 0000000 00000000000 12600762117 0024417 5 ustar 00root root 0000000 0000000 fluent-plugin-wendelin-87bb68f1138301f18defad9775d539d731f03d30/example/to_wendelin.conf 0000664 0000000 0000000 00000003463 12600762117 0027603 0 ustar 00root root 0000000 0000000 # Example configuration of Wendelin output for Fluentd
## built-in TCP input, so we can accept messages from other fluentd(s) and so
## that fluent-cat works:
## $ echo | fluent-cat
# HTTP input
# http://localhost:8888/?json=
## output tag=sensor.** to Wendelin
<match sensor.**>
@type wendelin
@id wendelin_out
streamtool_uri /erp5/portal_ingestion_policies/
# TODO ^^^ do not check peer's certificate
user
password
# all parameters of BufferedOutput & Output classes are supported too, e.g.
# `buffer_type`, `flush_interval`, `num_threads`, `log_level`, etc - see
# their code near:
#
# https://github.com/fluent/fluentd/blob/master/lib/fluent/output.rb#L182
#
# logging setup description:
#
# http://docs.fluentd.org/articles/logging
buffer_type memory
#buffer_path "#{ENV['HOME']}/var"
flush_interval 5s
</match>
# ---- monitoring & debugging ----
# Listen HTTP for monitoring
# http://localhost:24220/api/plugins
# http://localhost:24220/api/plugins?type=TYPE
# http://localhost:24220/api/plugins?tag=MYTAG
# Listen DRb for debug
## match tag=debug.** and dump to console
@type stdout
@id stdout_output
## match fluent's internal events
#
# @type null
#
## match not matched logs and write to file
#
# @type file
# path /var/log/fluent/else
# compress gz
#
fluent-plugin-wendelin-87bb68f1138301f18defad9775d539d731f03d30/fluent-plugin-wendelin.gemspec 0000664 0000000 0000000 00000001213 12600762117 0030722 0 ustar 00root root 0000000 0000000 Gem::Specification.new do |gem|
# Gem identity and descriptive metadata.
gem.name = "fluent-plugin-wendelin"
gem.version = "0.1.alpha1"
gem.authors = ["Kirill Smelkov"]
gem.email = ["kirr@nexedi.com"]
gem.summary = %q{Fluentd output to Wendelin}
gem.description = %q{Fluentd output plugin to forward data to Wendelin system}
gem.homepage = "https://lab.nexedi.com/nexedi/fluent-plugin-wendelin"
# SPDX identifier for the Apache License, Version 2.0 (see LICENSE.txt);
# RubyGems warns about license values that are not SPDX identifiers.
gem.license = "Apache-2.0"
# Package every file tracked by git (NUL-separated listing, safe for odd names).
gem.files = `git ls-files -z`.split("\x0")
gem.require_paths = ["lib"]
gem.add_runtime_dependency "fluentd", "~> 0"
# ? net/http
# ? openssl
end
fluent-plugin-wendelin-87bb68f1138301f18defad9775d539d731f03d30/lib/ 0000775 0000000 0000000 00000000000 12600762117 0023532 5 ustar 00root root 0000000 0000000 fluent-plugin-wendelin-87bb68f1138301f18defad9775d539d731f03d30/lib/fluent/ 0000775 0000000 0000000 00000000000 12600762117 0025027 5 ustar 00root root 0000000 0000000 fluent-plugin-wendelin-87bb68f1138301f18defad9775d539d731f03d30/lib/fluent/plugin/ 0000775 0000000 0000000 00000000000 12600762117 0026325 5 ustar 00root root 0000000 0000000 fluent-plugin-wendelin-87bb68f1138301f18defad9775d539d731f03d30/lib/fluent/plugin/out_wendelin.rb 0000664 0000000 0000000 00000005540 12600762117 0031352 0 ustar 00root root 0000000 0000000 # Fluentd output plugin to ingest data to Wendelin system
# Copyright (C) 2015 Nexedi SA and Contributors.
# Kirill Smelkov
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This plugin hooks into Fluentd in a way similar to out_forward and
# out_secure_forward and outputs event stream to a Wendelin HTTP endpoint.
require 'fluent/output'
require_relative 'wendelin_client'
module Fluent
# Fluentd buffered output which hands every flushed chunk over to Wendelin
# through a WendelinClient.
class WendelinOutput < ObjectBufferedOutput # XXX verify base class
  Plugin.register_output('wendelin', self)

  # location of Wendelin's Input Stream Tool,
  # ex http://example.com/erp5/portal_ingestion_policies/example_ingestion
  config_param :streamtool_uri, :string

  # credentials this fluentd authenticates to wendelin with;
  # not used unless `user` is set
  # TODO user/password -> certificate
  config_param :user, :string, :default => nil
  config_param :password, :string, :default => nil

  # Let the base class process `conf`, then create the Wendelin client.
  # Credentials are passed to the client only when `user` is configured.
  def configure(conf)
    super
    auth = @user ? {'user' => @user, 'password' => @password} : {}
    @wendelin = WendelinClient.new(@streamtool_uri, auth)
  end

  # Nothing beyond the base class behaviour yet.
  def start
    super
    # TODO
  end

  # Nothing beyond the base class behaviour yet.
  def shutdown
    super
    # TODO
  end

  # Hook of ObjectBufferedOutput - write out records of a buffer chunk for a tag.
  #
  # NOTE this is called from a separate thread (see OutputThread and BufferedOutput)
  def write_objects(tag, chunk)
    # NOTE no rescue here on purpose: an exception unrolls to
    #      BufferedOutput#try_flush, which detects errors, retries the output
    #      up to the retry max count and aborts the current logs if every
    #      attempt fails.

    # NOTE there is a single tag, and chunk holds an event stream - usually
    #      [] of (timestamp, record) in msgpack, but in general it could be
    #      arbitrary data - so the chunk content is sent as-is.
    #
    #      The tag is used as-is for the input stream reference - it will be
    #      processed/translated further on the server by Wendelin.
    @wendelin.ingest(tag, chunk.read)
  end
end
end
fluent-plugin-wendelin-87bb68f1138301f18defad9775d539d731f03d30/lib/fluent/plugin/wendelin_client.rb0000664 0000000 0000000 00000006305 12600762117 0032021 0 ustar 00root root 0000000 0000000 # module to ingest data to a Wendelin through HTTP endpoint
# Copyright (C) 2015 Nexedi SA and Contributors.
# Kirill Smelkov
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'net/http'
require 'openssl'
# Debug switch: when truthy, WendelinClient#ingest prints the request it sends
# and the response it receives via `puts`.
$trace = false
# class representing a Wendelin client
# Client which ingests data into a Wendelin system through its HTTP endpoint.
class WendelinClient
  # `streamtool_uri` - URI pointing to portal_input_data_stream "mountpoint"
  # `credentials`    - {'user' => _, 'password' => _}; HTTP basic auth is used
  #                    only when the 'user' key is present.
  #                    TODO change to certificate
  def initialize(streamtool_uri, credentials)
    @streamtool_uri = streamtool_uri
    @credentials    = credentials
  end

  # Ingest `data_chunk` to a stream referenced as `reference`.
  #
  # The chunk is POSTed form-encoded (field `data_chunk`) to
  # `<streamtool_uri>/ingest?reference=<reference>`.
  #
  # Returns nil when the server answers 2XX. Connection-level errors are
  # re-raised as-is; any other response raises through Net::HTTPResponse#value.
  def ingest(reference, data_chunk)
    uri = ingest_uri(reference)
    req = build_request(uri, data_chunk)
    trace_request(req) if $trace

    res = perform(uri, req)
    trace_response(res) if $trace

    return if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX

    puts "FAIL:"
    res.value # raises for a non-2XX response
  end

  private

  # URI of the ingest endpoint for `reference`.
  def ingest_uri(reference)
    # The reference must be escaped: a raw ' ', '&' or '#' would either make
    # URI() raise or be parsed as a separate query parameter / fragment.
    escaped = URI.encode_www_form_component(reference)
    URI("#{@streamtool_uri}/ingest?reference=#{escaped}")
  end

  # POST request carrying `data_chunk`, with basic auth when configured.
  def build_request(uri, data_chunk)
    req = Net::HTTP::Post.new(uri)
    if @credentials.has_key?('user')
      req.basic_auth @credentials['user'], @credentials['password']
    end

    # TODO ensure content-type is 'raw', e.g. this way
    #      (but then querystring ?reference=... is lost)
    # req.body = data_chunk
    # req.content_type = 'application/octet-stream'
    req.set_form_data('data_chunk' => data_chunk)
    req
  end

  # Send `req` over a fresh connection and return the response.
  def perform(uri, req)
    # TODO keep connection open (so that every new ingest does not do
    #      full connect again)
    Net::HTTP.start(uri.hostname, uri.port,
                    :use_ssl => (uri.scheme == 'https'),
                    # NOTE = "do not check server cert" - insecure
                    # TODO move this out to conf parameters
                    :verify_mode => OpenSSL::SSL::VERIFY_NONE) do |http|
      http.request(req)
    end
  rescue
    # some http/ssl/other connection error
    puts "HTTP ERROR:"
    raise
  end

  # Dump the outgoing request (debugging aid).
  def trace_request(req)
    puts '>>> REQUEST'
    puts "method\t=> #{req.method}"
    puts "path\t=> #{req.path}"
    puts "uri\t=> #{req.uri}"
    puts "body\t=> #{req.body}"
    puts "body_stream\t=> #{req.body_stream}"
    req.each {|h| puts "#{h}:\t#{req[h]}"}
    puts
  end

  # Dump the received response (debugging aid).
  def trace_response(res)
    puts '>>> RESPONSE'
    res.each {|h| puts "#{h}:\t#{res[h]}"}
    puts "code\t=> #{res.code}"
    puts "msg\t=> #{res.message}"
    puts "class\t=> #{res.class}"
    puts "body:", res.body
  end
end
fluent-plugin-wendelin-87bb68f1138301f18defad9775d539d731f03d30/test/ 0000775 0000000 0000000 00000000000 12600762117 0023743 5 ustar 00root root 0000000 0000000 fluent-plugin-wendelin-87bb68f1138301f18defad9775d539d731f03d30/test/ingest-torture.rb 0000775 0000000 0000000 00000000725 12600762117 0027272 0 ustar 00root root 0000000 0000000 #!/usr/bin/env ruby
# ingest several data streams concurrently forever
require_relative 'wendelin_test'
# Endlessly ingest numbered test chunks into the stream `stream_ref`,
# pausing 0.1 second after every chunk. Never returns on its own.
def runingest(stream_ref)
  seq = 0
  loop do
    ingest stream_ref, ", #{seq} (#{stream_ref})"
    seq += 1
    sleep 0.1
  end
end
# feed two streams in parallel, one thread per stream
cat_feeder = Thread.new do
  runingest 'ABC-HELLO-01.cat'
end
tac_feeder = Thread.new do
  runingest 'ABC-HELLO-01.tac'
end
cat_feeder.join # wait forever
fluent-plugin-wendelin-87bb68f1138301f18defad9775d539d731f03d30/test/ingest.rb 0000775 0000000 0000000 00000000707 12600762117 0025570 0 ustar 00root root 0000000 0000000 #!/usr/bin/env ruby
# manually test erp5_data_streams ingestion via Ruby HTTP lib
# XXX draft/stub
require_relative 'wendelin_test'
require 'msgpack'
require 'time'
# t -> 2015-08-11 14:51:50+02:00
# Format time `t` as date, time and UTC offset, e.g. 2015-08-11 14:51:50+02:00
def tfmt(t)
  layout = '%Y-%m-%d %H:%M:%S%:z'
  t.strftime(layout)
end
# build two sample (id, record) events, one second apart, serialize each with
# msgpack and ingest them concatenated as one chunk into the 'testds' stream
now = Time.now
events = [
  [1, {'date' => tfmt(now), 'hello' => 1, 'world' => 2}],
  [2, {'date' => tfmt(now + 1), 'pi' => 3.1415, 'e' => 2.71828}],
]
payload = events.map { |event| event.to_msgpack }.join
# puts payload
ingest 'testds', payload
fluent-plugin-wendelin-87bb68f1138301f18defad9775d539d731f03d30/test/wendelin_test.rb 0000664 0000000 0000000 00000001320 12600762117 0027130 0 ustar 00root root 0000000 0000000 # simple ingest() for wendelin tests
# (without need for providing streamtool uri, credentials, etc)
# load wendelin_client.rb from in-tree
# make the in-tree lib/ directory loadable and load the client from it
here = File.dirname(__FILE__)
$LOAD_PATH << File.expand_path(File.join(here, '../lib'))
require 'fluent/plugin/wendelin_client'

# Test connection settings. The defaults below are placeholders; each one can
# be overridden from the environment so that a real server can be targeted
# without editing this file.
$user     = ENV.fetch('WENDELIN_USER', 'user')
$password = ENV.fetch('WENDELIN_PASSWORD', 'password')

# FIXME shortcut for tests
$erp5       = ENV.fetch('WENDELIN_ERP5_URL', "https://example.com/erp5") # erp5 root
$streamtool = "#{$erp5}/portal_input_data_streams" # where Input Stream Tool is mounted

# shared client used by ingest()
$wendelin = WendelinClient.new($streamtool, {'user' => $user, 'password' => $password})
# Ingest `data_chunk` into the stream `input_stream_ref` through the shared
# `$wendelin` client; returns whatever the client's ingest returns.
def ingest(input_stream_ref, data_chunk)
  client = $wendelin
  client.ingest(input_stream_ref, data_chunk)
end