pax_global_header 0000666 0000000 0000000 00000000064 12725262515 0014521 g ustar 00root root 0000000 0000000 52 comment=bef2350e351616b3d52498fc8f52570e1600d13b
fluent-plugin-bin-bef2350e351616b3d52498fc8f52570e1600d13b/ 0000775 0000000 0000000 00000000000 12725262515 0021630 5 ustar 00root root 0000000 0000000 fluent-plugin-bin-bef2350e351616b3d52498fc8f52570e1600d13b/Gemfile 0000664 0000000 0000000 00000000114 12725262515 0023117 0 ustar 00root root 0000000 0000000 source 'https://rubygems.org'
# gem's dependencies are in .gemspec
gemspec
fluent-plugin-bin-bef2350e351616b3d52498fc8f52570e1600d13b/LICENSE.txt 0000664 0000000 0000000 00000001126 12725262515 0023453 0 ustar 00root root 0000000 0000000 Copyright (C) 2015 Nexedi SA and Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
fluent-plugin-bin-bef2350e351616b3d52498fc8f52570e1600d13b/README.md 0000664 0000000 0000000 00000003123 12725262515 0023106 0 ustar 00root root 0000000 0000000 fluent-plugin-system
====================
[Fluentd][] input plugin for binary files based on in_tail.
Usage Example
=============
This example shows how to install and configure fluent-plugin-bin together
with the fluent-plugin-wendelin output plugin to ingest binary files into
[Wendelin][].
sudo apt-get install supervisor
sudo service supervisor restart
sudo apt-get install python-software-properties
sudo apt-add-repository ppa:brightbox/ruby-ng
sudo apt-get update
sudo apt-get install ruby2.0 ruby2.0-dev make
sudo gem install fluentd --no-ri --no-rdoc
sudo mkdir -p /etc/fluent/plugin
cd /etc/fluent/plugin
sudo wget https://lab.nexedi.com/nexedi/fluent-plugin-bin/raw/master/lib/fluent/plugin/in_bin.rb
sudo wget https://lab.nexedi.cn/nexedi/fluent-plugin-wendelin/raw/master/lib/fluent/plugin/out_wendelin.rb
sudo wget https://lab.nexedi.cn/nexedi/fluent-plugin-wendelin/raw/master/lib/fluent/plugin/wendelin_client.rb
Create /etc/fluent/fluentd.conf:
@type wendelin
@id wendelin_out
streamtool_uri https://wendelin_url/portal_ingestion_policies/my_policy
user my_user_name
password my_password
buffer_type memory
flush_interval 1s
disable_retry_limit true
[Fluentd]: http://fluentd.org
[Wendelin]: http://www.wendelin.io fluent-plugin-bin-bef2350e351616b3d52498fc8f52570e1600d13b/fluent-plugin-bin.gemspec 0000664 0000000 0000000 00000001150 12725262515 0026531 0 ustar 00root root 0000000 0000000 Gem::Specification.new do |gem|
gem.name = "fluent-plugin-bin"
gem.version = "0.1"
gem.authors = ["Klaus Wölfel"]
gem.email = ["klaus@nexedi.com"]
gem.summary = %q{Fluentd input plugin for binary files}
gem.description = %q{Fluentd input plugin to read binary files based on in_tail}
gem.homepage = "https://lab.nexedi.com/klaus/fluent-plugin-bin"
gem.license = "APLv2"
gem.files = `git ls-files -z`.split("\x0")
gem.require_paths = ["lib"]
gem.add_runtime_dependency "fluentd", "~> 0.12"
end
fluent-plugin-bin-bef2350e351616b3d52498fc8f52570e1600d13b/lib/ 0000775 0000000 0000000 00000000000 12725262515 0022376 5 ustar 00root root 0000000 0000000 fluent-plugin-bin-bef2350e351616b3d52498fc8f52570e1600d13b/lib/fluent/ 0000775 0000000 0000000 00000000000 12725262515 0023673 5 ustar 00root root 0000000 0000000 fluent-plugin-bin-bef2350e351616b3d52498fc8f52570e1600d13b/lib/fluent/plugin/ 0000775 0000000 0000000 00000000000 12725262515 0025171 5 ustar 00root root 0000000 0000000 fluent-plugin-bin-bef2350e351616b3d52498fc8f52570e1600d13b/lib/fluent/plugin/in_bin.rb 0000664 0000000 0000000 00000013640 12725262515 0026760 0 ustar 00root root 0000000 0000000 # Fluentd input plugin for general hardware monitoring
# Copyright (C) 2016 Nexedi SA and Contributors.
# Klaus Wölfel
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'fluent/input'
module Fluent
class BinInput < NewTailInput
Plugin.register_input('bin', self)
# Appends one chunk read from the watched file to the event stream.
#
# line         - the chunk handed over by the watcher; added unchanged.
# es           - event stream; only #add is called on it.
# tail_watcher - not used by this implementation.
#
# The chunk is added with a nil time. If es.add raises a StandardError the
# failure is logged (warning carrying the dumped chunk, backtrace at debug
# level) instead of being propagated.
def convert_line_to_event(line, es, tail_watcher)
  es.add(nil, line)
rescue => failure
  log.warn line.dump, error: failure.to_s
  log.debug_backtrace(failure.backtrace)
end
# Builds a TailWatcher for +path+, attaches it to @loop and returns it.
#
# path - path of the file to follow.
# pe   - position entry handed through to the watcher.
#
# A LineBufferTimerFlusher is created only when both @multiline_mode and
# @multiline_flush_interval are set; otherwise the watcher receives nil
# in its place.
def setup_watcher(path, pe)
  timer_flusher = nil
  if @multiline_mode && @multiline_flush_interval
    timer_flusher = TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer))
  end

  TailWatcher.new(
    path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @read_lines_limit,
    method(:update_watcher), timer_flusher, &method(:receive_lines)
  ).tap { |watcher| watcher.attach(@loop) }
end
class TailWatcher < NewTailInput::TailWatcher
# Reacts to the watched path appearing, being truncated, being rotated or
# disappearing.
#
# io - the freshly opened file for @path, or nil when no file exists.
#
# First call (no IO handler yet): picks the start position from the
# position entry and installs an IOHandler, or a NullIOHandler when there
# is no file. Later calls: logs the rotation and either replaces the
# handler (truncation / no previous file) or hands the persistent position
# entry back through @update_watcher.
#
# NOTE(review): callers are assumed to ignore the return value; it used to
# be the symbol produced by the nested `def swap_state` — confirm.
def on_rotate(io)
  if @io_handler.nil?
    if io
      # first time
      stat = io.stat
      fsize = stat.size
      inode = stat.ino
      last_inode = @pe.read_inode
      if inode == last_inode
        # rotated file has the same inode number as the last file.
        # assuming one of the following situations:
        #   a) file was once renamed and put back, or
        #   b) symlink or hardlink to the same file was recreated
        # in either case, seek to the saved position
        pos = @pe.read_pos
      elsif last_inode != 0
        # this is a FilePositionEntry and fluentd was started before.
        # read data from the head of the rotated file.
        # logs never duplicate because this file is a rotated new file.
        pos = 0
        @pe.update(inode, pos)
      else
        # this is a MemoryPositionEntry or the first time fluentd started.
        # seek to the end of the file unless reading from head was requested.
        # logs may duplicate without this seek because it is not known
        # whether the file is a pre-existing file or a rotated new file.
        pos = @read_from_head ? 0 : fsize
        @pe.update(inode, pos)
      end
      io.seek(pos)
      @io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
    else
      @io_handler = NullIOHandler.new
    end
  else
    log_msg = "detected rotation of #{@path}"
    log_msg << "; waiting #{@rotate_wait} seconds" if @io_handler.io # wait rotate_time if previous file exists
    @log.info log_msg
    if io
      stat = io.stat
      inode = stat.ino
      if inode == @pe.read_inode # truncated
        @pe.update_pos(stat.size)
        io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
        @io_handler.close
        @io_handler = io_handler
      elsif @io_handler.io.nil? # There is no previous file. Reuse TailWatcher
        @pe.update(inode, io.pos)
        @io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
      else # file is rotated and new file found
        @update_watcher.call(@path, swap_state(@pe))
      end
    else # file is rotated and new file not found
      # Clear RotateHandler to avoid duplicated file watch in same path.
      @rotate_handler = nil
      @update_watcher.call(@path, swap_state(@pe))
    end
  end
end

# Detaches this watcher from the given position entry.
#
# A fresh MemoryPositionEntry is seeded with the inode and position read
# from +pe+ and installed both on this watcher and on the current IO
# handler, so the rotated file keeps being tracked in memory.
#
# Defined as a regular method (not nested inside on_rotate) so that it
# exists as soon as the class is loaded and is not redefined on every
# on_rotate call.
#
# Returns the original +pe+ that was passed in.
def swap_state(pe)
  # Use MemoryPositionEntry for rotated file temporary
  mpe = MemoryPositionEntry.new
  mpe.update(pe.read_inode, pe.read_pos)
  @pe = mpe
  @io_handler.pe = mpe # Don't re-create IOHandler because IOHandler has an internal buffer.
  pe # This pe will be updated in on_rotate after TailWatcher is initialized
end
class IOHandler < NewTailInput::TailWatcher::IOHandler
# Sets up a handler that reads +io+ in binary mode.
#
# io               - the opened file to read; switched to binmode here.
# pe               - position entry stored for later position updates.
# log              - logger; receives an info message when +first+ is truthy.
# read_lines_limit - stored as @read_lines_limit.
# first            - when truthy (default) the "following <path>" message
#                    is logged.
# receive_lines    - block stored as @receive_lines.
def initialize(io, pe, log, read_lines_limit, first = true, &receive_lines)
  @log = log
  if first
    @log.info "following #{io.path}"
  end
  @io, @pe = io, pe
  @read_lines_limit, @receive_lines = read_lines_limit, receive_lines
  @lines = []
  @io.binmode
end
# Reads newly available data from @io and passes it to @receive_lines.
#
# Data is read in chunks of up to 2048 bytes into @lines until either end
# of file is hit or @read_lines_limit chunks are buffered. The buffered
# chunks are then handed to @receive_lines; when it returns a truthy value
# the current file position is stored in @pe and the buffer is cleared,
# otherwise the buffer is kept and reading stops. Reading continues in
# rounds as long as the chunk limit was hit and the chunks were accepted.
#
# Any StandardError is logged through @log and the handler is closed.
def on_notify
  keep_reading = true
  while keep_reading
    keep_reading = false

    if @lines.empty?
      begin
        # stop after @read_lines_limit chunks so a very large file does not
        # use too much memory in one round
        until keep_reading
          @lines << @io.readpartial(2048)
          keep_reading = @lines.size >= @read_lines_limit
        end
      rescue EOFError
        # nothing more to read for now
      end
    end

    next if @lines.empty?

    if @receive_lines.call(@lines)
      @pe.update_pos(@io.pos)
      @lines.clear
    else
      keep_reading = false
    end
  end
rescue => failure
  @log.error failure.to_s
  @log.error_backtrace
  close
end
end
end
end
end