Commit b9d9f30d authored by Kirill Smelkov's avatar Kirill Smelkov

First draft version of code

This was pre-tested and know to ingest data ok, modulo some known TODO.
parent 3768f079
/.bundle/
/Gemfile.lock
source 'https://rubygems.org'
# gem's dependencies are in .gemspec
gemspec
Copyright (C) 2015 Nexedi SA and Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
fluent-plugin-wendelin
======================
This is output plugin for [Fluentd][] to forward data to [Wendelin][] system.
[Fluentd]: http://fluentd.org/
[Wendelin]: http://wendelin.io/
Configuration
-------------
A sample configuration to setup ingestion of data for a tag to Wendelin:
```apache
<match your.sensor.tag>
@type wendelin
@id wendelin_out
streamtool_uri <Wendelin_URL>/erp5/portal_ingestion_policies/<YOUR_INGESTION_POLICY_ID>
user <your_wendelin_user>
password <your_wendelin_password>
# all parameters of BufferedOutput & Output classes are supported too, e.g.
# `buffer_type`, `flush_interval`, `num_threads`, `log_level`, etc - see
# their code near:
#
# https://github.com/fluent/fluentd/blob/master/lib/fluent/output.rb#L182
#
# logging setup description:
#
# http://docs.fluentd.org/articles/logging
buffer_type memory
#buffer_path "#{ENV['HOME']/var}"
flush_interval 5s
</match>
```
See *example/to_wendelin.conf* for fully setup example.
TODO
----
- make content to be posted over HTTP as raw (instead of urlencode data).
- HTTPS certificates are currently not verified.
- X.509 certificates instead of user / password.
- cleanup
# Example configuration of Wendelin output for Fluentd
## built-in TCP input, so we can accept messages from other fluentd(s) and so
## that fluent-cat works:
## $ echo <json|msgpack> | fluent-cat <tag>
<source>
@type forward
@id forward_input
</source>
# HTTP input
# http://localhost:8888/<tag>?json=<json>
<source>
@type http
@id http_input
port 8888
</source>
## output tag=sensor.** to Wendelin
<match <YOUR_SENSOR_TAG>.**>
@type wendelin
@id wendelin_out
streamtool_uri <Wendelin_URL>/erp5/portal_ingestion_policies/<YOUR_INGESTION_POLICY_ID>
# TODO ^^^ do not check peer's certificate
user <your_wendelin_user>
password <your_wendelin_password>
# all parameters of BufferedOutput & Output classes are supported too, e.g.
# `buffer_type`, `flush_interval`, `num_threads`, `log_level`, etc - see
# their code near:
#
# https://github.com/fluent/fluentd/blob/master/lib/fluent/output.rb#L182
#
# logging setup description:
#
# http://docs.fluentd.org/articles/logging
buffer_type memory
#buffer_path "#{ENV['HOME']/var}"
flush_interval 5s
</match>
# ---- monitoring & debugging ----
# Listen HTTP for monitoring
# http://localhost:24220/api/plugins
# http://localhost:24220/api/plugins?type=TYPE
# http://localhost:24220/api/plugins?tag=MYTAG
<source>
@type monitor_agent
@id monitor_agent_input
port 24220
</source>
# Listen DRb for debug
<source>
@type debug_agent
@id debug_agent_input
bind 127.0.0.1
port 24230
</source>
## match tag=debug.** and dump to console
<match debug.**>
@type stdout
@id stdout_output
</match>
## match fluent's internal events
#<match fluent.**>
# @type null
#</match>
## match not matched logs and write to file
#<match **>
# @type file
# path /var/log/fluent/else
# compress gz
#</match>
Gem::Specification.new do |gem|
gem.name = "fluent-plugin-wendelin"
gem.version = "0.1.alpha1"
gem.authors = ["Kirill Smelkov"]
gem.email = ["kirr@nexedi.com"]
gem.summary = %q{Fluentd output to Wendelin}
gem.description = %q{Fluentd output plugin to forward data to Wendelin system}
gem.homepage = "https://lab.nexedi.cn/nexedi/fluent-plugin-wendelin"
gem.license = "APLv2"
gem.files = `git ls-files -z`.split("\x0")
gem.require_paths = ["lib"]
gem.add_runtime_dependency "fluentd", "~> 0"
# ? net/http
# ? openssl
end
# Fluentd output plugin to ingest data to Wendelin system
# Copyright (C) 2015 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This plugin hooks into Fluentd in a way similiar to out_forward and
# out_secure_forward and outputs event stream to a Wendelin HTTP endpoint.
require 'fluent/output'
require_relative 'wendelin_client'
module Fluent
class WendelinOutput < ObjectBufferedOutput # XXX verify base class
Plugin.register_output('wendelin', self)
# where Wendelin's Input Stream Tool is located,
# ex http://example.com/erp5/portal_ingestion_policies/example_ingestion
config_param :streamtool_uri, :string
# credentials to authenticate this fluentd to wendelin
# by default credentials are not used
# TODO user/password -> certificate
config_param :user, :string, :default => nil
config_param :password, :string, :default => nil
def configure(conf)
super
credentials = {}
if @user
credentials['user'] = @user
credentials['password'] = @password
end
@wendelin = WendelinClient.new(@streamtool_uri, credentials)
end
def start
super
# TODO
end
def shutdown
super
# TODO
end
# hooked to ObjectBufferedOutput - write out records from a buffer chunk for a tag.
#
# NOTE this is called from a separate thread (see OutputThread and BufferedOutput)
def write_objects(tag, chunk)
# NOTE if this fail and raises -> it will unroll to BufferedOutput#try_flush
# which detects errors and retries outputting logs up to retry maxcount
# times and aborts outputting current logs if all try fail.
#
# This way, we don't need to code rescue here.
# NOTE tag is 1, and chunk stores an event stream, usually [] of
# (timestamp, record) in msgpack, but it general it could be arbitrary
# data - we send it as-is.
data_chunk = chunk.read()
# for input_stream_ref use tag as-is - it will be processed/translated
# further on server by Wendelin
reference = tag
@wendelin.ingest(reference, data_chunk)
end
end
end
# module to ingest data to a Wendelin through HTTP endpoint
# Copyright (C) 2015 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'net/http'
require 'openssl'
# show request/response traces or not
$trace = false
# class representing a Wendelin client
class WendelinClient
# `streamtool_uri` - URI pointing to portal_input_data_stream "mountpoint"
# `credentials` # {'user' => _, 'password' => _} TODO change to certificate
def initialize(streamtool_uri, credentials)
@streamtool_uri = streamtool_uri
@credentials = credentials
end
# ingest `data_chunk` to a stream referenced as `reference`
def ingest(reference, data_chunk)
uri = URI("#{@streamtool_uri}/ingest?reference=#{reference}")
req = Net::HTTP::Post.new(uri)
if @credentials.has_key?('user')
req.basic_auth @credentials['user'], @credentials['password']
end
# TODO ensure content-type is 'raw', e.g. this way
# (but then querystring ?reference=... is lost)
# req.body = data_chunk
# req.content_type = 'application/octet-stream'
req.set_form_data('data_chunk' => data_chunk)
if $trace
puts '>>> REQUEST'
puts "method\t=> #{req.method}"
puts "path\t=> #{req.path}"
puts "uri\t=> #{req.uri}"
puts "body\t=> #{req.body}"
puts "body_stream\t=> #{req.body_stream}"
req.each {|h| puts "#{h}:\t#{req[h]}"}
puts
end
begin
# TODO keep connection open (so that every new ingest does not do
# full connect again)
res = Net::HTTP.start(uri.hostname, uri.port,
:use_ssl => (uri.scheme == 'https'),
# NOTE = "do not check server cert"
# TODO move this out to conf parameters
:verify_mode => OpenSSL::SSL::VERIFY_NONE
) do |http|
http.request(req)
end
rescue
# some http/ssl/other connection error
puts "HTTP ERROR:"
raise
else
if $trace
puts '>>> RESPONSE'
res.each {|h| puts "#{h}:\t#{res[h]}"}
puts "code\t=> #{res.code}"
puts "msg\t=> #{res.message}"
puts "class\t=> #{res.class}"
puts "body:", res.body
end
if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX
#puts "ingested ok"
else
puts "FAIL:"
res.value
end
end
end
end
#!/usr/bin/env ruby
# ingest several data streams concurently forever
require_relative 'wendelin_test'
def runingest(stream_ref)
i = 0
while true
data_chunk = ", #{i} (#{stream_ref})"
#puts "ingest #{stream_ref}, #{data_chunk}"
ingest stream_ref, data_chunk
i = i + 1
sleep 0.1
end
end
t31 = Thread.new { runingest 'ABC-HELLO-01.cat' }
t32 = Thread.new { runingest 'ABC-HELLO-01.tac' }
t31.join # wait forever
#!/usr/bin/env ruby
# manually test erp5_data_streams ingestion via Ruby HTTP lib
# XXX draft/stub
require_relative 'wendelin_test'
require 'msgpack'
#puts [1,2,'hello'].to_msgpack
ingest 'ABC-HELLO-01.cat', [1,2,'hello'].to_msgpack
#ingest 'ABC-HELLO-01.cat', ', hello'
#ingest 'ABC-HELLO-01.tac', ', magic'
# simple ingest() for wendelin tests
# (without need for providing streamtool uri, credentials, etc)
# load wendelin_clinet.rb from in-tree
here = File.dirname(__FILE__)
$LOAD_PATH << File.expand_path(File.join(here, '../lib'))
require 'fluent/plugin/wendelin_client'
# FIXME hardcoded
$user = 'user'
$password = 'password'
# FIXME hardcoded & shortcut for tests
$erp5 = "https://example.com/erp5" # erp5 root
$streamtool = "#{$erp5}/portal_input_data_streams" # where Input Stream Tool is mounted
$wendelin = WendelinClient.new($streamtool, {'user' => $user, 'password' => $password})
def ingest(input_stream_ref, data_chunk)
$wendelin.ingestDataChunk(input_stream_ref, data_chunk)
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment