Commit 29b893c7 authored by Klaus Wölfel's avatar Klaus Wölfel

Upgrade to fluentd 1.0 plugin API

parent 8702c866
...@@ -19,14 +19,14 @@ ...@@ -19,14 +19,14 @@
# This plugin hooks into Fluentd in a way similiar to out_forward and # This plugin hooks into Fluentd in a way similiar to out_forward and
# out_secure_forward and outputs event stream to a Wendelin HTTP endpoint. # out_secure_forward and outputs event stream to a Wendelin HTTP endpoint.
require 'fluent/output' require 'fluent/plugin/output'
require_relative 'wendelin_client' require_relative 'wendelin_client'
module Fluent module Fluent::Plugin
class WendelinOutput < ObjectBufferedOutput # XXX verify base class class WendelinOutput < Output
Plugin.register_output('wendelin', self) Fluent::Plugin.register_output('wendelin', self)
# where Wendelin's Input Stream Tool is located, # where Wendelin's Input Stream Tool is located,
# ex http://example.com/erp5/portal_ingestion_policies/example_ingestion # ex http://example.com/erp5/portal_ingestion_policies/example_ingestion
...@@ -44,9 +44,17 @@ module Fluent ...@@ -44,9 +44,17 @@ module Fluent
config_param :open_timeout, :integer, :default => 60 config_param :open_timeout, :integer, :default => 60
config_param :read_timeout, :integer, :default => 60 config_param :read_timeout, :integer, :default => 60
config_param :keep_alive_timeout, :integer, :default => 300 config_param :keep_alive_timeout, :integer, :default => 300
config_section :buffer do
config_set_default :chunk_keys, ["tag"]
end
def configure(conf) def configure(conf)
super super
unless @chunk_key_tag
raise Fluent::ConfigError, "buffer chunk key must include 'tag' for wendelin output"
end
credentials = {} credentials = {}
if @user if @user
...@@ -68,12 +76,11 @@ module Fluent ...@@ -68,12 +76,11 @@ module Fluent
# TODO # TODO
end end
# Use normal "Synchronous Buffer" - write out records from a buffer chunk for a tag.
# hooked to ObjectBufferedOutput - write out records from a buffer chunk for a tag.
# #
# NOTE this is called from a separate thread (see OutputThread and BufferedOutput) def write(chunk)
def write_objects(tag, chunk) return if chunk.empty?
# NOTE if this fail and raises -> it will unroll to BufferedOutput#try_flush # NOTE if this fail and raises -> it will unroll to Output#try_flush
# which detects errors and retries outputting logs up to retry maxcount # which detects errors and retries outputting logs up to retry maxcount
# times and aborts outputting current logs if all try fail. # times and aborts outputting current logs if all try fail.
# #
...@@ -86,7 +93,7 @@ module Fluent ...@@ -86,7 +93,7 @@ module Fluent
# for input_stream_ref use tag as-is - it will be processed/translated # for input_stream_ref use tag as-is - it will be processed/translated
# further on server by Wendelin # further on server by Wendelin
reference = tag reference = chunk.metadata.tag
if @use_keep_alive if @use_keep_alive
@wendelin.ingest_with_keep_alive(reference, data_chunk) @wendelin.ingest_with_keep_alive(reference, data_chunk)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment