Commit e320f937 authored by Klaus Wölfel's avatar Klaus Wölfel

Adapt to fluentd 0.14

parent f69d1063
...@@ -11,5 +11,5 @@ Gem::Specification.new do |gem| ...@@ -11,5 +11,5 @@ Gem::Specification.new do |gem|
gem.files = `git ls-files -z`.split("\x0") gem.files = `git ls-files -z`.split("\x0")
gem.require_paths = ["lib"] gem.require_paths = ["lib"]
gem.add_runtime_dependency "fluentd", "~> 0.12" gem.add_runtime_dependency "fluentd", "~> 0.14"
end end
...@@ -15,12 +15,12 @@ ...@@ -15,12 +15,12 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
require 'fluent/input' require 'fluent/plugin/in_tail'
module Fluent module Fluent::Plugin
class BinInput < NewTailInput class BinInput < TailInput
Plugin.register_input('bin', self) Fluent::Plugin.register_input('bin', self)
def convert_line_to_event(line, es, tail_watcher) def convert_line_to_event(line, es, tail_watcher)
begin begin
...@@ -34,11 +34,14 @@ module Fluent ...@@ -34,11 +34,14 @@ module Fluent
def setup_watcher(path, pe) def setup_watcher(path, pe)
line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, &method(:receive_lines)) tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, &method(:receive_lines))
tw.attach(@loop) tw.attach do |watcher|
timer_execute(:in_tail_timer_trigger, 1, &watcher.method(:on_notify)) if watcher.enable_watch_timer
event_loop_attach(watcher.stat_trigger)
end
tw tw
end end
class TailWatcher < NewTailInput::TailWatcher class TailWatcher < TailInput::TailWatcher
def on_rotate(io) def on_rotate(io)
if @io_handler == nil if @io_handler == nil
if io if io
...@@ -104,7 +107,7 @@ module Fluent ...@@ -104,7 +107,7 @@ module Fluent
def swap_state(pe) def swap_state(pe)
# Use MemoryPositionEntry for rotated file temporary # Use MemoryPositionEntry for rotated file temporary
mpe = MemoryPositionEntry.new mpe = TailInput::MemoryPositionEntry.new
mpe.update(pe.read_inode, pe.read_pos) mpe.update(pe.read_inode, pe.read_pos)
@pe = mpe @pe = mpe
@io_handler.pe = mpe # Don't re-create IOHandler because IOHandler has an internal buffer. @io_handler.pe = mpe # Don't re-create IOHandler because IOHandler has an internal buffer.
...@@ -113,7 +116,7 @@ module Fluent ...@@ -113,7 +116,7 @@ module Fluent
end end
end end
class IOHandler < NewTailInput::TailWatcher::IOHandler class IOHandler < TailInput::TailWatcher::IOHandler
def initialize(io, pe, log, read_lines_limit, first = true, &receive_lines) def initialize(io, pe, log, read_lines_limit, first = true, &receive_lines)
@log = log @log = log
@log.info "following #{io.path}" if first @log.info "following #{io.path}" if first
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment