
logstash-codec-fluent's People

Contributors

colinsurprenant, cosmo0920, electrical, ich199, jakelandis, jordansissel, jsvd, kares, mashhurs, merlindmc, original-brownbear, ph, robbavey, suyograo, yaauie, ycombinator


logstash-codec-fluent's Issues

Codec does not support binary input

I am sending binary data (Protobuf messages, as it happens) to Logstash, and the current implementation of the codec fails to generate events with the appropriate payload.
I'm using a filter downstream to decode the Protobuf, so all that's needed is a sane representation of the binary data.

Given a config of

input {
  tcp {
    codec => fluent
    port => 24224
  }
}

output {
  stdout {
    codec => "rubydebug"
  }
}

and sending a binary message, we get the error:

[2019-02-15T10:33:39,004][ERROR][logstash.codecs.fluent   ] Fluent parse error, original data now in message field {:error=>#<NoMethodError: undefined method `merge' for #<String:0x4698b154>>, :data=>["N321GG_e2p", [[1550160094, "\n\x00\"\x9B\x01\n\a\n\x05%_v\xB4SZ\x8F\x01\n\x8C\x01\b\xFFD\x10\x01\x1A\t\tE#\x01\"\x14\x00\x00\x00\"\x03uid*\x05\r\x01\x00\x00\x7F2\x04iptv:\x05CNN_1R\x05-\x00\x00\x00\x00Z\x05-\x00\x00\x00\x00b\x02\b\x00j\a\n\x05%_v\xB4Sr\x05-\x00\x00\x00\x00z\x02ua\x82\x01\x04hash\x8A\x01$142f168d-374b-4f7e-8097-beeff02d4571\x92\x01\x06seatId\x9A\x01\x03ped"]]]}

Presumably you can replicate this by sending non-UTF-8 data over the Fluentd forward protocol.

It appears the error comes from incomplete handling of the array type in the codec: the code expects the string payload to act like a map, and when that duck meows instead of quacking, the merge call fails.

I am able to hack things into working with the following change to fluent.rb, lines 98-102:

 93     when Array
 94       # Forward
 95       entries.each do |entry|
 96         epochtime = entry[0]
 97         map = entry[1]
 98         if !map.respond_to? :merge  # payload is not a map (e.g. raw binary)
 99           map = {
100             "message" => entry[1]   # wrap the raw payload under a message key
101           }
102         end
103         event = LogStash::Event.new(map.merge(
104                                       LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime),
105                                       "tags" => [ tag ]
106                                     ))
107         yield event
108       end

I'm not much of a Ruby dev, so I'm looking for the best way to implement binary ingest, hopefully in a way that gets merged upstream.
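
One possible refinement of the hack above (a sketch only, not upstream behavior): force a binary-safe encoding on the fallback string so that later UTF-8-assuming code paths don't mangle the payload.

        if !map.respond_to?(:merge)
          # Sketch: wrap the raw payload and mark it as binary so JRuby
          # does not attempt UTF-8 validation on the Protobuf bytes.
          map = { "message" => entry[1].to_s.dup.force_encoding(Encoding::BINARY) }
        end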

Timestamps in event other than `@timestamp` will cause the plugin to crash

Running the following pipeline will cause the codec to crash:

input { stdin {}} 
filter { ruby { code => "event.set('[logstash_timestamp]', Time.now());" } } 
output { stdout { codec => fluent  }}

with the following error:

[2023-01-26T16:34:50,400][ERROR][logstash.javapipeline    ][main] Pipeline worker error, the pipeline will be stopped {:pipeline_id=>"main", :error=>"(NoMethodError) undefined method `to_msgpack' for 2023-01-26T21:34:50.384Z:LogStash::Timestamp", :exception=>Java::OrgJrubyExceptions::NoMethodError, :backtrace=>["org.msgpack.jruby.Packer.write(org/msgpack/jruby/Packer.java:128)"
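
A possible workaround (a sketch, assuming the crash is triggered only by Time/LogStash::Timestamp objects stored in event fields): serialize extra timestamps to strings before they reach the fluent codec, e.g. in the ruby filter itself:

filter {
  ruby {
    # Store a formatted string rather than a Time object, which the
    # msgpack packer cannot serialize.
    code => "event.set('[logstash_timestamp]', Time.now.strftime('%Y-%m-%dT%H:%M:%S.%L%z'));"
  }
}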

fluent codec doesn't work with latest version of fluentd

Hi,

Using Logstash with a tcp input and the fluent codec, with fluentd configured to send log messages to it, doesn't actually work. It looks like the fluent message format may have changed at some point.

This was tested with fluentd 0.12.

If I modify the fluent.rb decode method to look something like

  def decode(data)
    @decoder.feed_each(data) do |obj|
      tag = obj[0].to_s
      puts(obj[1].class)
      entries = obj[1]
      if entries.class == String
        puts('PackedForward')
        entry_decoder = MessagePack::Unpacker.new
        entry_decoder.feed_each(entries) do |entry|
          puts(entry)
          event = LogStash::Event.new(entry[1].merge(
            "@timestamp" => Time.at(entry[0]),
            "tags" => tag
          ))
          yield event
        end
      elsif entries.class == Array
        puts('Forward')
        entries.each do |entry|
          puts(entry)
          event = LogStash::Event.new(entry[1].merge(
            "@timestamp" => Time.at(entry[0]),
            "tags" => tag
          ))
          yield event
        end
      else
        puts('Message')
      end
    end
  end # def decode

Then Logstash works correctly again.

Milliseconds in timestamp are not preserved when setting nanosecond_precision to true

Logstash version: docker image - logstash:8.6.2

input {
    tcp {
        codec => fluent {
          nanosecond_precision => true
        }
        port => 4000
    }

  #  http {
  #      codec => fluent
  #      port => 4000
  #  }

}

output {
  stdout { codec => json }
  stdout { codec => rubydebug }
}

When the fluentd message has a timestamp with nanosecond precision like "2023-05-09T09:39:45.724585Z", this plugin preserves the nanoseconds and prints the timestamp correctly.

When the fluentd message has a timestamp with millisecond precision like "2023-05-09T09:39:45.724Z", this plugin does not preserve the milliseconds and prints the timestamp as "2023-05-09T09:39:45.000Z".
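
For context, a hedged sketch of how the decoded values should relate, assuming the codec unpacks Fluentd's EventTime ext body into seconds and nanoseconds (the ext body is named `payload` here for illustration):

sec, nsec = payload.unpack("NN")        # EventTime ext body: u32 seconds + u32 nanoseconds
time = Time.at(sec, nsec, :nanosecond)
# For "2023-05-09T09:39:45.724Z", nsec is 724_000_000, so the millisecond
# part should round-trip; losing it suggests nsec is being discarded.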

Handle EventTime Forward protocol v1 extension

Currently, Fluentd v1.0's Forward protocol EventTime extension cannot be handled by logstash-codec-fluent.

see: https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#eventtime-ext-format

  • Version: 3.1.5 and 3.2.0
  • Operating System: macOS and Linux
  • Config File (if you have sensitive info, please remove it):
input {
  tcp {
    codec => fluent
    port => 24224
  }
}
output {
  stdout { codec => json }
}
  • Sample Data and Steps to Reproduce:
  1. Install td-agent3
  2. execute fluent-cat with EventTime extension:
$  echo '{"current_version":"v0.14", "versions":{"unstable":0.14, "stable":0.12}}' | fluent-cat my.logs
  • Expected Result

Output a JSON log with a subsecond-precision timestamp, like:

{"@metadata":{"ip_address":"127.0.0.1"},"host":"localhost","versions":{"stable":0.12,"unstable":0.14},"tags":["my.logs"],"current_version":"v0.14","@version":"1","@timestamp":"2018-01-15T06:38:03.432Z","port":53918}
  • Actual Result

Nothing is output when the EventTime msgpack extension is enabled. (This is the default behavior for Fluentd v1.0.) A sketch of registering the extension follows below.

  • Fixing Candidates Patch

#18.
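
For reference, a minimal sketch of handling the extension with the msgpack gem's factory API (per the linked spec, EventTime is ext type 0 with an 8-byte body: big-endian u32 seconds followed by u32 nanoseconds); the names here are illustrative, not the plugin's actual code:

require "msgpack"

factory = MessagePack::Factory.new
factory.register_type(0x00, Time,
  packer:   ->(time)    { [time.to_i, time.nsec].pack("NN") },
  unpacker: ->(payload) { sec, nsec = payload.unpack("NN"); Time.at(sec, nsec, :nanosecond) })

# An unpacker built from this factory yields Time objects for EventTime
# values instead of failing on the unknown extension type.
decoder = factory.unpacker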

bug in input fluentd codec (decode) / MsgPack decoder #7102

moved from elastic/logstash#7102
created by @gasparch


When trying to send logs from Docker to Logstash using the fluentd log driver, I've discovered that Logstash is not able to parse the logs:

11:14:11.739 [Ruby-0-Thread-193: /usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-tcp-4.1.0/lib/logstash/inputs/tcp.rb:124] ERROR logstash.inputs.tcp - An error occurred. Closing connection {:client=>"xxxx.3:36058", :exception=>#<TypeError: no implicit conversion from nil to integer>, :backtrace=>["org/jruby/RubyTime.java:1073:in `at'", "org/logstash/ext/JrubyTimestampExtLibrary.java:216:in `at'", "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-codec-fluent-3.0.2-java/lib/logstash/codecs/fluent.rb:46:in `decode'", "org/msgpack/jruby/MessagePackLibrary.java:195:in `each'", "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-codec-fluent-3.0.2-java/lib/logstash/codecs/fluent.rb:42:in `decode'", "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-tcp-4.1.0/lib/logstash/inputs/tcp.rb:182:in `handle_socket'", "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-tcp-4.1.0/lib/logstash/inputs/tcp.rb:153:in `server_connection_thread'", "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-tcp-4.1.0/lib/logstash/inputs/tcp.rb:151:in `server_connection_thread'"]}

The same Docker container can send logs to Fluentd, where they are processed normally. After some debugging, here is a minimal case that triggers the error.

  • Version:
    Logstash 1:5.4.0-1
  • Operating System:
    CentOS 7, kernel 3.10.0-514.6.1.el7.x86_64
  • Config File (if you have sensitive info, please remove it):

The only file changed from the default installation is /etc/logstash/conf.d/config.conf

input {
        tcp {
                codec => fluent
                port => 4000
        }
}

output {
        stdout { 
                codec => rubydebug 
        }
}
  • Sample Data:

docker run --rm --name 'containName' --log-driver fluentd --log-opt fluentd-address=xxxxx.4:4000 --log-opt tag="test" busybox /bin/sh -c 'while true; do echo "test"; sleep 1; break; done'

  • Steps to Reproduce:

I managed to write minimal code that reproduces the error:

  1. CentOS7, install ruby (ruby 2.0.0p648 (2015-12-16) [x86_64-linux]), and gem install fluent-logger
  2. run irb and issue commands
require 'fluent-logger'
logger = Fluent::Logger::FluentLogger.new(nil, :host => 'xxxxx.4', :port => 4000)

The following command produces a log event in Logstash:

logger.post("some_tag", {"container_id" => "1111111111111111111111111111111" })

The following command causes an error in Logstash:

logger.post("some_tag", {"container_id" => "1111111111111111111111111111111X" })

Adding just one character completely breaks the MsgPack decoder in Logstash.

In the Docker logging case, Logstash wrongly decodes the first half of the packet, decodes container_id incorrectly, and then tries to decode the remainder of the message as a MsgPack packet, failing there (because the remaining junk has no tag/epochtime/etc. fields).
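
The 31-to-32 character boundary is suggestive (an assumption about the cause, not confirmed in the issue): 31 bytes is the maximum length for msgpack's fixstr type, and a 32-byte string is encoded with the newer str8 type (0xd9), which decoders predating the 2013 msgpack spec revision misparse. A quick illustration with the msgpack gem:

require "msgpack"

# The first byte of the encoded string shows which msgpack type was chosen.
puts ("1" * 31).to_msgpack.bytes.first.to_s(16)  # => "bf" (fixstr, length 31)
puts ("1" * 32).to_msgpack.bytes.first.to_s(16)  # => "d9" (str8)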

Codec kills listener on exception

I am sending fluentd logs over UDP; when Logstash is restarted, this error can occur:

{
  :timestamp=>"2015-07-24T00:22:33.841000+0000",
  :message=>"Exception in inputworker",
  "exception"=>#<TypeError: no implicit conversion from nil to integer>,
  "backtrace"=> [
    "org/jruby/RubyTime.java:1036:in `at'", "/opt/logstash/lib/logstash/codecs/fluent.rb:40:in `decode'",
    "org/msgpack/jruby/MessagePackLibrary.java:196:in `each'",
    "/opt/logstash/lib/logstash/codecs/fluent.rb:39:in `decode'",
    "/opt/logstash/lib/logstash/inputs/udp.rb:100:in `inputworker'",
    "/opt/logstash/lib/logstash/inputs/udp.rb:75:in `udp_listener'"
  ],
  :level=>:error
}

I think a rescue that logs the exception is required, rather than exiting?

Something like https://github.com/logstash-plugins/logstash-codec-json/blob/master/lib/logstash/codecs/json.rb#L49
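
A minimal sketch of that suggestion (assuming the same overall shape as the codec's decode method; not the actual plugin code):

def decode(data)
  @decoder.feed_each(data) do |obj|
    # ... existing tag/epochtime/map handling ...
  end
rescue StandardError => e
  # Log and emit a fallback event instead of letting the exception
  # kill the input worker. The tag name is illustrative.
  @logger.error("Fluent parse error, original data now in message field",
                :error => e, :data => data)
  yield LogStash::Event.new("message" => data, "tags" => ["_fluentparsefailure"])
end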

Handle user-defined tags in #encode

Currently, the Fluentd Forward protocol cannot handle an Array (or other non-String class) as the tag value, yet that is what #encode sends when an event has multiple tags.

see: https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1

  • Version: 3.1.5
  • Operating System: macOS and Linux
  • Config File (if you have sensitive info, please remove it):
input {
  generator {
    lines => [
      "line 1",
      "line 2",
      "line 3"
    ]
    count => 3
    tags => ['test', 'logstash']
  }
}
output {
  tcp {
    codec => fluent
    host => localhost
    port => 24224
  }
}
  • Sample Data and Steps to Reproduce:
  1. Install td-agent3
  2. execute td-agent with the following configuration and command:

fluent-logstash-in.conf:

<source>
  @type forward
</source>

<match log>
  @type stdout
</match>
$ td-agent -c fluent-logstash-in.conf
  • Expected Result

JSON-style output appears in the terminal running td-agent:

2018-01-24 12:21:53.000000000 +0900 test.logstash: {"message":"line 3","@timestamp":"2018-01-24T03:21:53.008Z","host":"<hostname>.local","@version":"1","sequence":2}
2018-01-24 12:21:53.000000000 +0900 test.logstash: {"message":"line 2","@timestamp":"2018-01-24T03:21:53.006Z","host":"<hostname>.local","@version":"1","sequence":0}
2018-01-24 12:21:53.000000000 +0900 test.logstash: {"message":"line 1","@timestamp":"2018-01-24T03:21:53.007Z","host":"<hostname>.local","@version":"1","sequence":1}
2018-01-24 12:21:53.000000000 +0900 test.logstash: {"message":"line 3","@timestamp":"2018-01-24T03:21:53.007Z","host":"<hostname>.local","@version":"1","sequence":1}
2018-01-24 12:21:52.000000000 +0900 test.logstash: {"message":"line 1","@timestamp":"2018-01-24T03:21:52.980Z","host":"<hostname>.local","@version":"1","sequence":0}
2018-01-24 12:21:53.000000000 +0900 test.logstash: {"message":"line 2","@timestamp":"2018-01-24T03:21:53.007Z","host":"<hostname>.local","@version":"1","sequence":1}
2018-01-24 12:21:53.000000000 +0900 test.logstash: {"message":"line 2","@timestamp":"2018-01-24T03:21:53.008Z","host":"<hostname>.local","@version":"1","sequence":2}
2018-01-24 12:21:53.000000000 +0900 test.logstash: {"message":"line 3","@timestamp":"2018-01-24T03:21:53.007Z","host":"<hostname>.local","@version":"1","sequence":0}
2018-01-24 12:21:53.000000000 +0900 test.logstash: {"message":"line 1","@timestamp":"2018-01-24T03:21:53.007Z","host":"<hostname>.local","@version":"1","sequence":2}
  • Actual Result

Fluentd complains that it cannot handle the user-defined Array tag from the Logstash configuration:

2018-01-24 11:39:50 +0900 [warn]: #0 emit transaction failed: error_class=TypeError error="no implicit conversion of Array into String" location="/Users/cosmo/GitHub/fluentd/lib/fluent/match.rb:128:in `match'" tag=["test", "logstash"]
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/match.rb:128:in `match'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/match.rb:128:in `match'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/event_router.rb:67:in `match?'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/event_router.rb:244:in `block in find'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/event_router.rb:243:in `each'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/event_router.rb:243:in `each_with_index'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/event_router.rb:243:in `find'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/event_router.rb:111:in `block in match'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/event_router.rb:129:in `get'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/event_router.rb:110:in `match'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/event_router.rb:96:in `emit_stream'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/event_router.rb:87:in `emit'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/plugin/in_forward.rb:337:in `on_message'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/plugin/in_forward.rb:211:in `block in handle_connection'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/plugin/in_forward.rb:248:in `block (3 levels) in read_messages'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/plugin/in_forward.rb:247:in `feed_each'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/plugin/in_forward.rb:247:in `block (2 levels) in read_messages'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/plugin/in_forward.rb:256:in `block in read_messages'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/plugin_helper/server.rb:591:in `on_read_without_connection'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/vendor/bundle/ruby/2.5.0/gems/cool.io-1.5.3/lib/cool.io/io.rb:123:in `on_readable'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/vendor/bundle/ruby/2.5.0/gems/cool.io-1.5.3/lib/cool.io/io.rb:186:in `on_readable'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/vendor/bundle/ruby/2.5.0/gems/cool.io-1.5.3/lib/cool.io/loop.rb:88:in `run_once'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/vendor/bundle/ruby/2.5.0/gems/cool.io-1.5.3/lib/cool.io/loop.rb:88:in `run'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/plugin_helper/event_loop.rb:84:in `block in start'
  2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2018-01-24 11:39:50 +0900 [error]: #0 unexpected error on reading data host="127.0.0.1" port=56161 error_class=TypeError error="no implicit conversion of Array into String"
  2018-01-24 11:39:50 +0900 [error]: #0 suppressed same stacktrace
2018-01-24 11:39:50 +0900 [warn]: #0 emit transaction failed: error_class=TypeError error="no implicit conversion of Array into String" location="/Users/cosmo/GitHub/fluentd/lib/fluent/match.rb:128:in `match'" tag=["test", "logstash"]
  • Fixing Candidates Patch

#21.
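
A sketch of one way #encode could coerce the tag (names illustrative, not the plugin's actual code; the forward protocol requires a String tag, and Fluentd conventionally joins tag parts with "."):

def forwardable_tag(event)
  tag = event.get("tags") || "log"
  case tag
  when Array
    tag.join(".")   # ["test", "logstash"] => "test.logstash"
  when String
    tag
  else
    tag.to_s        # coerce anything else to a String
  end
end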

ArgumentError: Cannot pack type: org.jruby.RubyTime while trying to use the tcp output

From @edwardchuang elastic/logstash#3121

I am trying to export my Logstash log to a fluentd forwarder with:

output {
 tcp {
    codec => "fluent"
    host => "10.88.51.215"
    port => 24224
  }
}

But I got the error message below:

ArgumentError: Cannot pack type: org.jruby.RubyTime
           pack at org/msgpack/jruby/MessagePackLibrary.java:63
         encode at /home/logstash/lib/logstash/codecs/fluent.rb:52
        receive at /home/logstash/lib/logstash/outputs/tcp.rb:143
         handle at /home/logstash/lib/logstash/outputs/base.rb:86
     initialize at (eval):61
           call at org/jruby/RubyProc.java:271
         output at /home/logstash/lib/logstash/pipeline.rb:266
   outputworker at /home/logstash/lib/logstash/pipeline.rb:225
  start_outputs at /home/logstash/lib/logstash/pipeline.rb:152

Any clue?
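
The error suggests the codec hands a Ruby Time (the event timestamp) straight to msgpack, which cannot pack it. A hedged sketch of a fix, assuming the forward protocol's [tag, time, record] message shape and the modern Event API:

def encode(event)
  tag = event.get("tags") || "log"
  epochtime = event.timestamp.to_i            # coerce Time to a packable Integer
  record = event.to_hash
  record.delete(LogStash::Event::TIMESTAMP)   # drop the unpackable Timestamp object
  @on_event.call(event, MessagePack.pack([tag, epochtime, record]))
end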

@timestamp field ignored in actual message

The event structure of Fluentd consists of the following:

  • Tag
  • Time (Epoch time)
  • Record (actual log content, in JSON format)

When a fluentd event is received and decoded, the Time (epoch time) is used as the timestamp when creating the Logstash event. Since the Time is an epoch time in whole seconds, milliseconds can't be set. Even if there is a field called @timestamp in the actual log content, it gets overwritten.

In our case, we set the @timestamp value in the record with a precision greater than epoch seconds, and we do not wish this to be overwritten.

Here is the code setting @timestamp in the plugin

Can we have an option to NOT use the Time to set the Logstash event time, and instead let Logstash use the @timestamp field if it is present in the actual log?

This way we're not limited to whole seconds and can have timestamps with finer precision.
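
A sketch of the requested behavior (a hypothetical option, not something the plugin offers): when the record already carries @timestamp, prefer it over the forward protocol's epoch time.

map = entry[1]
# Assumes the record's @timestamp, if present, is a parseable ISO8601 string.
timestamp = map["@timestamp"] || LogStash::Timestamp.at(epochtime)
event = LogStash::Event.new(map.merge(
          LogStash::Event::TIMESTAMP => timestamp,
          "tags" => [ tag ]))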

No tests / Tests missing

Hi,
this plugin has no tests at all. This should be amended soon so that changes to either the plugin or Logstash core itself are properly tested in the CI environment.

If you are giving this plugin some tests, feel free to update elastic/logstash#3740 so we can keep track of it.
