logstash-plugins / logstash-codec-fluent
License: Apache License 2.0
I am sending binary data (Protobuf messages, as it happens) to Logstash, and the current implementation of the codec fails to generate events with the appropriate payload.
I'm using a filter downstream to decode the Protobuf, so all that's needed here is a sane representation of the binary data.
Given a config of
input {
tcp {
codec => fluent
port => 24224
}
}
output {
stdout {
codec => "rubydebug"
}
}
and sending a binary message, we get the error:
[2019-02-15T10:33:39,004][ERROR][logstash.codecs.fluent ] Fluent parse error, original data now in message field {:error=>#<NoMethodError: undefined method `merge' for #<String:0x4698b154>>, :data=>["N321GG_e2p", [[1550160094, "\n\x00\"\x9B\x01\n\a\n\x05%_v\xB4SZ\x8F\x01\n\x8C\x01\b\xFFD\x10\x01\x1A\t\tE#\x01\"\x14\x00\x00\x00\"\x03uid*\x05\r\x01\x00\x00\x7F2\x04iptv:\x05CNN_1R\x05-\x00\x00\x00\x00Z\x05-\x00\x00\x00\x00b\x02\b\x00j\a\n\x05%_v\xB4Sr\x05-\x00\x00\x00\x00z\x02ua\x82\x01\x04hash\x8A\x01$142f168d-374b-4f7e-8097-beeff02d4571\x92\x01\x06seatId\x9A\x01\x03ped"]]]}
Presumably you can replicate this by sending non-UTF-8 data over Fluentd.
The error appears to come from incomplete handling of the array type in the codec: the code expects the payload to quack like a map, and when that duck meows (the payload is a String), the codec fails.
I was able to hack things into working with the following change to fluent.rb, lines 98-102:
 93       when Array
 94         # Forward
 95         entries.each do |entry|
 96           epochtime = entry[0]
 97           map = entry[1]
 98           if !map.respond_to? :merge
 99             map = {
100               "message" => entry[1]
101             }
102           end
103           event = LogStash::Event.new(map.merge(
104             LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime),
105             "tags" => [ tag ]
106           ))
107           yield event
108         end
I'm not much of a Ruby dev, so I'm looking for the best way to implement binary ingest, hopefully in a way that gets merged upstream.
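The guard in the patch above can be exercised standalone; here is a minimal sketch in plain Ruby (no Logstash dependencies), wrapping any non-map payload under a "message" key, as the patch does:

```ruby
# If the decoded payload doesn't act like a map (e.g. it's a binary String),
# wrap it under "message" instead of calling merge on it and crashing.
def normalize_payload(payload)
  return payload if payload.respond_to?(:merge)
  { "message" => payload }
end

normalize_payload({ "k" => "v" })  # => {"k"=>"v"}
normalize_payload("\x9B\x01binary".b)  # wrapped as {"message" => <raw bytes>}
```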
Running the following pipeline will cause the codec to crash:
input { stdin {}}
filter { ruby { code => "event.set('[logstash_timestamp]', Time.now());" } }
output { stdout { codec => fluent }}
with the following error:
[2023-01-26T16:34:50,400][ERROR][logstash.javapipeline ][main] Pipeline worker error, the pipeline will be stopped {:pipeline_id=>"main", :error=>"(NoMethodError) undefined method `to_msgpack' for 2023-01-26T21:34:50.384Z:LogStash::Timestamp", :exception=>Java::OrgJrubyExceptions::NoMethodError, :backtrace=>["org.msgpack.jruby.Packer.write(org/msgpack/jruby/Packer.java:128)"
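A workaround sketch, assuming the offending field is a Ruby Time set by the ruby filter: coerce Time values to ISO 8601 strings before the fluent codec tries to msgpack-encode the event (the field name is illustrative):

```ruby
require 'time'

# msgpack has no packer registered for Time/LogStash::Timestamp by default,
# which is what raises "undefined method `to_msgpack'" above; stringifying
# such fields first keeps the event serializable.
def msgpack_safe(event_hash)
  event_hash.transform_values { |v| v.is_a?(Time) ? v.utc.iso8601 : v }
end

msgpack_safe({ "message" => "hi", "logstash_timestamp" => Time.at(0) })
# => {"message"=>"hi", "logstash_timestamp"=>"1970-01-01T00:00:00Z"}
```

In the pipeline itself, the equivalent fix is to set the field to a string, e.g. `event.set('[logstash_timestamp]', Time.now.iso8601)`.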
Hi,
Using Logstash with a tcp input and the fluent codec, with fluentd configured to send log messages to it, doesn't actually work. It looks like the fluent message format may have changed at some point.
This was tested with fluentd 0.12.
If I modify fluent.rb's decode method to look something like
def decode(data)
  @decoder.feed_each(data) do |obj|
    tag = obj[0].to_s
    puts(obj[1].class)
    entries = obj[1]
    if entries.is_a?(String)
      puts('PackedForward')
      entry_decoder = MessagePack::Unpacker.new
      entry_decoder.feed_each(entries) do |entry|
        puts(entry)
        event = LogStash::Event.new(entry[1].merge(
          "@timestamp" => Time.at(entry[0]),
          "tags" => tag
        ))
        yield event
      end
    elsif entries.is_a?(Array)
      puts('Forward')
      entries.each do |entry|
        puts(entry)
        event = LogStash::Event.new(entry[1].merge(
          "@timestamp" => Time.at(entry[0]),
          "tags" => tag
        ))
        yield event
      end
    else
      puts('Message')
    end
  end
end # def decode
then Logstash works correctly again.
Logstash version: docker image - logstash:8.6.2
input {
tcp {
codec => fluent {
nanosecond_precision => true
}
port => 4000
}
# http {
# codec => fluent
# port => 4000
# }
}
output {
stdout { codec => json }
stdout { codec => rubydebug }
}
When the fluentd message has a timestamp with nanosecond precision, like "2023-05-09T09:39:45.724585Z", this plugin preserves the nanoseconds and prints the timestamp correctly.
When the fluentd message has a timestamp with millisecond precision, like "2023-05-09T09:39:45.724Z", this plugin does not preserve the milliseconds and prints the timestamp as "2023-05-09T09:39:45.000Z".
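A hedged sketch of how sub-second precision can be preserved when building the timestamp, assuming the decoded EventTime yields separate second and nanosecond components (plain Ruby, not the plugin's actual code path):

```ruby
require 'time'

# Combine seconds + nanoseconds so millisecond and nanosecond inputs are
# both preserved; Time.at with the :nanosecond unit needs Ruby >= 2.5.
def event_time_to_time(sec, nsec)
  Time.at(sec, nsec, :nanosecond).utc
end

event_time_to_time(1683625185, 724_000_000).iso8601(3)
# => "2023-05-09T09:39:45.724Z"
```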
Currently, logstash-codec-fluent cannot handle Fluentd v1.0's Forward protocol EventTime extension.
see: https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#eventtime-ext-format
input {
tcp {
codec => fluent
port => 24224
}
}
output {
stdout { codec => json }
}
$ echo '{"current_version":"v0.14", "versions":{"unstable":0.14, "stable":0.12}}' | fluent-cat my.logs
This outputs a JSON log with a sub-second timestamp, like:
{"@metdata":{"ip_address":"127.0.0.1"},"host":"localhost","versions":{"stable":0.12,"unstable":0.14},"tags":["my.logs"],"current_version":"v0.14","@version":"1","@timestamp":"2018-01-15T06:38:03.432Z","port":53918}
Nothing is output when the EventTime msgpack extension is enabled (this is the default behavior for Fluentd v1.0).
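For reference, per the Forward protocol spec linked above, EventTime is msgpack ext type 0 carrying 8 bytes: a big-endian 32-bit seconds value followed by a big-endian 32-bit nanoseconds value. A plain-Ruby sketch of decoding that payload (no msgpack gem needed):

```ruby
# Decode the 8-byte EventTime ext payload: 4-byte big-endian seconds,
# then 4-byte big-endian nanoseconds ('N' = 32-bit big-endian unsigned).
def decode_event_time(ext_data)
  ext_data.unpack('NN')  # => [seconds, nanoseconds]
end

decode_event_time([1516000683, 432_000_000].pack('NN'))
# => [1516000683, 432000000]
```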
#18.
moved from elastic/logstash#7102
created by @gasparch
When trying to send logs from Docker to Logstash using fluent, I discovered that Logstash is not able to parse the logs:
11:14:11.739 [Ruby-0-Thread-193: /usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-tcp-4.1.0/lib/logstash/inputs/tcp.rb:124] ERROR logstash.inputs.tcp - An error occurred. Closing connection {:client=>"xxxx.3:36058", :exception=>#<TypeError: no implicit conversion from nil to integer>, :backtrace=>["org/jruby/RubyTime.java:1073:in `at'", "org/logstash/ext/JrubyTimestampExtLibrary.java:216:in `at'", "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-codec-fluent-3.0.2-java/lib/logstash/codecs/fluent.rb:46:in `decode'", "org/msgpack/jruby/MessagePackLibrary.java:195:in `each'", "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-codec-fluent-3.0.2-java/lib/logstash/codecs/fluent.rb:42:in `decode'", "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-tcp-4.1.0/lib/logstash/inputs/tcp.rb:182:in `handle_socket'", "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-tcp-4.1.0/lib/logstash/inputs/tcp.rb:153:in `server_connection_thread'", "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-tcp-4.1.0/lib/logstash/inputs/tcp.rb:151:in `server_connection_thread'"]}
The same Docker container can send logs to Fluentd, and they are processed normally. After some debugging, here is a minimal case that triggers the error.
The only file changed from the default installation is /etc/logstash/conf.d/config.conf:
input {
tcp {
codec => fluent
port => 4000
}
}
output {
stdout {
codec => rubydebug
}
}
docker run --rm --name 'containName' --log-driver fluentd --log-opt fluentd-address=xxxxx.4:4000 --log-opt tag="test" busybox /bin/sh -c 'while true; do echo "test"; sleep 1; break; done'
I managed to write minimal code that reproduces the error:
gem install fluent-logger
irb
and issue the following commands:
require 'fluent-logger'
logger = Fluent::Logger::FluentLogger.new(nil, :host => 'xxxxx.4', :port => 4000)
Following command produces log in Logstash
logger.post("some_tag", {"container_id" => "1111111111111111111111111111111" })
Following command causes error in Logstash
logger.post("some_tag", {"container_id" => "1111111111111111111111111111111X" })
Adding just one character completely breaks the MsgPack decoder in Logstash.
In the Docker logging case, Logstash manages to (wrongly) decode the first half of the packet, decodes container_id incorrectly, and then tries to decode the remainder of the message as a MsgPack packet and fails there (because the remaining junk has no tag/epochtime/etc. fields).
I am sending fluentd logs over UDP; when Logstash is restarted, this error can occur:
{
:timestamp=>"2015-07-24T00:22:33.841000+0000",
:message=>"Exception in inputworker",
"exception"=>#<TypeError: no implicit conversion from nil to integer>,
"backtrace"=> [
"org/jruby/RubyTime.java:1036:in `at'", "/opt/logstash/lib/logstash/codecs/fluent.rb:40:in `decode'",
"org/msgpack/jruby/MessagePackLibrary.java:196:in `each'",
"/opt/logstash/lib/logstash/codecs/fluent.rb:39:in `decode'",
"/opt/logstash/lib/logstash/inputs/udp.rb:100:in `inputworker'",
"/opt/logstash/lib/logstash/inputs/udp.rb:75:in `udp_listener'"
],
:level=>:error
}
I think the codec should rescue and log the exception rather than exit.
Something like https://github.com/logstash-plugins/logstash-codec-json/blob/master/lib/logstash/codecs/json.rb#L49
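A minimal sketch of that rescue-and-log pattern in plain Ruby (parse here is a stand-in for the msgpack decode, and the _fluentparsefailure tag mirrors the json codec's _jsonparsefailure convention; neither is the plugin's actual API):

```ruby
# Wrap the per-datagram decode so one bad payload yields a tagged fallback
# event instead of killing the input worker.
def safe_decode(data)
  parse(data)
rescue StandardError => e
  { "message" => data, "tags" => ["_fluentparsefailure"], "error" => e.message }
end

# Stand-in decoder that fails the way the codec does on a nil epochtime.
def parse(data)
  raise TypeError, "no implicit conversion from nil to integer" if data.nil?
  { "message" => data }
end

safe_decode(nil)  # returns the tagged fallback event instead of crashing
```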
Currently, the Fluentd Forward protocol cannot handle an Array (or any class other than String) as the tag value.
see: https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1
input {
generator {
lines => [
"line 1",
"line 2",
"line 3"
]
count => 3
tags => ['test', 'logstash']
}
}
output {
tcp {
codec => fluent
host => localhost
port => 24224
}
}
fluent-logstash-in.conf:
<source>
@type forward
</source>
<match log>
@type stdout
</match>
$ td-agent -c fluent-logstash-in.conf
JSON-style output appears in the terminal running td-agent:
2018-01-24 12:21:53.000000000 +0900 test.logstash: {"message":"line 3","@timestamp":"2018-01-24T03:21:53.008Z","host":"<hostname>.local","@version":"1","sequence":2}
2018-01-24 12:21:53.000000000 +0900 test.logstash: {"message":"line 2","@timestamp":"2018-01-24T03:21:53.006Z","host":"<hostname>.local","@version":"1","sequence":0}
2018-01-24 12:21:53.000000000 +0900 test.logstash: {"message":"line 1","@timestamp":"2018-01-24T03:21:53.007Z","host":"<hostname>.local","@version":"1","sequence":1}
2018-01-24 12:21:53.000000000 +0900 test.logstash: {"message":"line 3","@timestamp":"2018-01-24T03:21:53.007Z","host":"<hostname>.local","@version":"1","sequence":1}
2018-01-24 12:21:52.000000000 +0900 test.logstash: {"message":"line 1","@timestamp":"2018-01-24T03:21:52.980Z","host":"<hostname>.local","@version":"1","sequence":0}
2018-01-24 12:21:53.000000000 +0900 test.logstash: {"message":"line 2","@timestamp":"2018-01-24T03:21:53.007Z","host":"<hostname>.local","@version":"1","sequence":1}
2018-01-24 12:21:53.000000000 +0900 test.logstash: {"message":"line 2","@timestamp":"2018-01-24T03:21:53.008Z","host":"<hostname>.local","@version":"1","sequence":2}
2018-01-24 12:21:53.000000000 +0900 test.logstash: {"message":"line 3","@timestamp":"2018-01-24T03:21:53.007Z","host":"<hostname>.local","@version":"1","sequence":0}
2018-01-24 12:21:53.000000000 +0900 test.logstash: {"message":"line 1","@timestamp":"2018-01-24T03:21:53.007Z","host":"<hostname>.local","@version":"1","sequence":2}
Fluentd complains that it cannot handle the user-defined tag from the Logstash configuration:
2018-01-24 11:39:50 +0900 [warn]: #0 emit transaction failed: error_class=TypeError error="no implicit conversion of Array into String" location="/Users/cosmo/GitHub/fluentd/lib/fluent/match.rb:128:in `match'" tag=["test", "logstash"]
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/match.rb:128:in `match'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/match.rb:128:in `match'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/event_router.rb:67:in `match?'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/event_router.rb:244:in `block in find'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/event_router.rb:243:in `each'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/event_router.rb:243:in `each_with_index'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/event_router.rb:243:in `find'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/event_router.rb:111:in `block in match'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/event_router.rb:129:in `get'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/event_router.rb:110:in `match'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/event_router.rb:96:in `emit_stream'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/event_router.rb:87:in `emit'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/plugin/in_forward.rb:337:in `on_message'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/plugin/in_forward.rb:211:in `block in handle_connection'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/plugin/in_forward.rb:248:in `block (3 levels) in read_messages'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/plugin/in_forward.rb:247:in `feed_each'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/plugin/in_forward.rb:247:in `block (2 levels) in read_messages'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/plugin/in_forward.rb:256:in `block in read_messages'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/plugin_helper/server.rb:591:in `on_read_without_connection'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/vendor/bundle/ruby/2.5.0/gems/cool.io-1.5.3/lib/cool.io/io.rb:123:in `on_readable'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/vendor/bundle/ruby/2.5.0/gems/cool.io-1.5.3/lib/cool.io/io.rb:186:in `on_readable'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/vendor/bundle/ruby/2.5.0/gems/cool.io-1.5.3/lib/cool.io/loop.rb:88:in `run_once'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/vendor/bundle/ruby/2.5.0/gems/cool.io-1.5.3/lib/cool.io/loop.rb:88:in `run'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/plugin_helper/event_loop.rb:84:in `block in start'
2018-01-24 11:39:50 +0900 [warn]: #0 /Users/cosmo/GitHub/fluentd/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2018-01-24 11:39:50 +0900 [error]: #0 unexpected error on reading data host="127.0.0.1" port=56161 error_class=TypeError error="no implicit conversion of Array into String"
2018-01-24 11:39:50 +0900 [error]: #0 suppressed same stacktrace
2018-01-24 11:39:50 +0900 [warn]: #0 emit transaction failed: error_class=TypeError error="no implicit conversion of Array into String" location="/Users/cosmo/GitHub/fluentd/lib/fluent/match.rb:128:in `match'" tag=["test", "logstash"]
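One possible fix on the Logstash side, sketched in plain Ruby: coerce an Array of Logstash tags into a single dotted String before encoding. The dotted-join convention is an assumption on my part, though it matches the "test.logstash" tag visible in the td-agent output above:

```ruby
# The Forward protocol requires tag to be a String, so join an Array of
# Logstash tags with '.' (fluentd's usual tag separator) before sending.
def forward_tag(tag)
  tag.is_a?(Array) ? tag.join('.') : tag.to_s
end

forward_tag(['test', 'logstash'])  # => "test.logstash"
forward_tag('my.logs')             # => "my.logs"
```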
#21.
From @edwardchuang elastic/logstash#3121
I am trying to export my Logstash log to a fluentd forwarder with:
output {
tcp {
codec => "fluent"
host => "10.88.51.215"
port => 24224
}
}
But I got the error message below:
ArgumentError: Cannot pack type: org.jruby.RubyTime
pack at org/msgpack/jruby/MessagePackLibrary.java:63
encode at /home/logstash/lib/logstash/codecs/fluent.rb:52
receive at /home/logstash/lib/logstash/outputs/tcp.rb:143
handle at /home/logstash/lib/logstash/outputs/base.rb:86
initialize at (eval):61
call at org/jruby/RubyProc.java:271
output at /home/logstash/lib/logstash/pipeline.rb:266
outputworker at /home/logstash/lib/logstash/pipeline.rb:225
start_outputs at /home/logstash/lib/logstash/pipeline.rb:152
Any clue?
The event structure of Fluentd consists of a tag, a time, and a record.
When a fluentd event is received and decoded, the time (epoch time) is used as the timestamp when creating the Logstash event, which sets the time for the event. Since this is an epoch time, milliseconds can't be represented. Even if there is a field in the actual log content called @timestamp, it gets overwritten.
In our case, we set the @timestamp value in the record with a precision greater than epoch time, and we do not wish this to be overwritten. (Here is the code setting @timestamp in the plugin.)
Can we have an option to NOT use the time to set the Logstash event time, and instead let Logstash use the @timestamp field if it is present in the actual log? That way we are not limited to seconds and can have finer time precision.
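The requested behavior could look something like this sketch (the option name and logic are illustrative, not the plugin's API): prefer a record-supplied @timestamp over the whole-second protocol time.

```ruby
require 'time'

# Prefer the record's own @timestamp (sub-second precision) when present;
# fall back to the protocol's epoch-second time otherwise.
def pick_event_time(record, epochtime, prefer_record_timestamp: true)
  if prefer_record_timestamp && record.key?("@timestamp")
    Time.iso8601(record["@timestamp"])
  else
    Time.at(epochtime)
  end
end

pick_event_time({ "@timestamp" => "2023-05-09T09:39:45.724Z" }, 1683625185).iso8601(3)
# => "2023-05-09T09:39:45.724Z"
```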
Hi,
this plugin has no tests at all. This should be amended soon, so that changes to either the plugin or Logstash core itself are properly tested in the CI environment.
If you add tests to this plugin, feel free to update elastic/logstash#3740 so we can keep track of it.