Commit 6a4b0ff6 authored by Klaus Wölfel's avatar Klaus Wölfel

Initial commit

parents
source 'https://rubygems.org'
# gem's dependencies are in .gemspec
gemspec
Copyright (C) 2015 Nexedi SA and Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
fluent-plugin-system
====================
[Fluentd][] input plugin for binary files based on in_tail.
[Fluentd]: http://fluentd.org/
Gem::Specification.new do |gem|
gem.name = "fluent-plugin-bin"
gem.version = "0.1"
gem.authors = ["Klaus Wölfel"]
gem.email = ["klaus@nexedi.com"]
gem.summary = %q{Fluentd input plugin for binary files}
gem.description = %q{Fluentd input plugin to read binary files based on in_tail}
gem.homepage = "https://lab.nexedi.com/klaus/fluent-plugin-bin"
gem.license = "APLv2"
gem.files = `git ls-files -z`.split("\x0")
gem.require_paths = ["lib"]
gem.add_runtime_dependency "fluentd", "~> 0.12"
end
# Fluentd input plugin for general hardware monitoring
# Copyright (C) 2016 Nexedi SA and Contributors.
# Klaus Wölfel <klaus@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'fluent/input'
module Fluent
class BinInput < NewTailInput
Plugin.register_input('bin', self)
def convert_line_to_event(line, es, tail_watcher)
begin
es.add(nil, line)
rescue => e
log.warn line.dump, error: e.to_s
log.debug_backtrace(e.backtrace)
end
end
def setup_watcher(path, pe)
line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, &method(:receive_lines))
tw.attach(@loop)
tw
end
class TailWatcher < NewTailInput::TailWatcher
def on_rotate(io)
if @io_handler == nil
if io
# first time
stat = io.stat
fsize = stat.size
inode = stat.ino
last_inode = @pe.read_inode
if inode == last_inode
# rotated file has the same inode number with the last file.
# assuming following situation:
# a) file was once renamed and backed, or
# b) symlink or hardlink to the same file is recreated
# in either case, seek to the saved position
pos = @pe.read_pos
elsif last_inode != 0
# this is FilePositionEntry and fluentd once started.
# read data from the head of the rotated file.
# logs never duplicate because this file is a rotated new file.
pos = 0
@pe.update(inode, pos)
else
# this is MemoryPositionEntry or this is the first time fluentd started.
# seek to the end of the any files.
# logs may duplicate without this seek because it's not sure the file is
# existent file or rotated new file.
pos = @read_from_head ? 0 : fsize
@pe.update(inode, pos)
end
io.seek(pos)
@io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
else
@io_handler = NullIOHandler.new
end
else
log_msg = "detected rotation of #{@path}"
log_msg << "; waiting #{@rotate_wait} seconds" if @io_handler.io # wait rotate_time if previous file is exist
@log.info log_msg
if io
stat = io.stat
inode = stat.ino
if inode == @pe.read_inode # truncated
@pe.update_pos(stat.size)
io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
@io_handler.close
@io_handler = io_handler
elsif @io_handler.io.nil? # There is no previous file. Reuse TailWatcher
@pe.update(inode, io.pos)
io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
@io_handler = io_handler
else # file is rotated and new file found
@update_watcher.call(@path, swap_state(@pe))
end
else # file is rotated and new file not found
# Clear RotateHandler to avoid duplicated file watch in same path.
@rotate_handler = nil
@update_watcher.call(@path, swap_state(@pe))
end
end
def swap_state(pe)
# Use MemoryPositionEntry for rotated file temporary
mpe = MemoryPositionEntry.new
mpe.update(pe.read_inode, pe.read_pos)
@pe = mpe
@io_handler.pe = mpe # Don't re-create IOHandler because IOHandler has an internal buffer.
pe # This pe will be updated in on_rotate after TailWatcher is initialized
end
end
class IOHandler < NewTailInput::TailWatcher::IOHandler
def initialize(io, pe, log, read_lines_limit, first = true, &receive_lines)
@log = log
@log.info "following #{io.path}" if first
@io = io
@pe = pe
@read_lines_limit = read_lines_limit
@receive_lines = receive_lines
@lines = []
@io.binmode
end
def on_notify
begin
read_more = false
if @lines.empty?
begin
while true
@lines << @io.readpartial(2048)
if @lines.size >= @read_lines_limit
# not to use too much memory in case the file is very large
read_more = true
break
end
end
rescue EOFError
end
end
unless @lines.empty?
if @receive_lines.call(@lines)
@pe.update_pos(@io.pos)
@lines.clear
else
read_more = false
end
end
end while read_more
rescue
@log.error $!.to_s
@log.error_backtrace
close
end
end
end
end
end
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment