Fluency

High-throughput data ingestion logger for Fluentd and Fluent Bit (and AWS S3 and Treasure Data).

This document is for version 2. If you're looking for a document for version 1, see this.

Ingestion to Fluentd

Features

Install

Gradle

dependencies {
    compile "org.komamitsu:fluency-core:${fluency.version}"
    compile "org.komamitsu:fluency-fluentd:${fluency.version}"
}

Maven

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-core</artifactId>
    <version>${fluency.version}</version>
</dependency>

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-fluentd</artifactId>
    <version>${fluency.version}</version>
</dependency>

Usage

Create Fluency instance

For single Fluentd
// Single Fluentd(localhost:24224 by default)
//   - TCP heartbeat (by default)
//   - Asynchronous flush (by default)
//   - Without ack response (by default)
//   - Flush attempt interval is 600ms (by default)
//   - Initial chunk buffer size is 1MB (by default)
//   - Threshold chunk buffer size to flush is 4MB (by default)
//   - Threshold chunk buffer retention time to flush is 1000 ms (by default)
//   - Max total buffer size is 512MB (by default)
//   - Use off heap memory for buffer pool (by default)
//   - Max retries of sending events is 8 (by default)
//   - Max wait until all buffers are flushed is 10 seconds (by default)
//   - Max wait until the flusher is terminated is 10 seconds (by default)
//   - Socket connection timeout is 5000 ms (by default)
//   - Socket read timeout is 5000 ms (by default)
Fluency fluency = new FluencyBuilderForFluentd().build();
For multiple Fluentd with failover
// Multiple Fluentd(localhost:24224, localhost:24225)
Fluency fluency = new FluencyBuilderForFluentd().build(
        Arrays.asList(
                new InetSocketAddress(24224),
                new InetSocketAddress(24225)));
Enable ACK response mode
// Single Fluentd(localhost:24224)
//   - With ack response
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setAckResponseMode(true);
Fluency fluency = builder.build();
Enable file backup mode

In this mode, Fluency backs up unsent memory buffers to files when closing and resends them when restarting.

// Single Fluentd(localhost:24224)
//   - Backup directory is the temporary directory
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setFileBackupDir(System.getProperty("java.io.tmpdir"));
Fluency fluency = builder.build();
Buffer configuration

Fluency has several parameters that control when the buffer is flushed. This diagram may help in understanding them. (Diagram: buffer-flush)

For high throughput data ingestion with high latency
// Single Fluentd(xxx.xxx.xxx.xxx:24224)
//   - Initial chunk buffer size is 16MB
//   - Threshold chunk buffer size to flush is 64MB
//     Keep this value (BufferRetentionSize) between `Initial chunk buffer size` and `Max total buffer size`
//   - Max total buffer size = 1024MB
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setBufferChunkInitialSize(16 * 1024 * 1024);
builder.setBufferChunkRetentionSize(64 * 1024 * 1024);
builder.setMaxBufferSize(1024 * 1024 * 1024L);
Fluency fluency = builder.build("xxx.xxx.xxx.xxx", 24224);
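
The ordering constraint mentioned in the comment above can be checked before the values are passed to the builder. The following is a minimal, self-contained sketch; `BufferSizeCheck` and `isConsistent` are hypothetical names introduced for illustration and are not part of Fluency. It treats the bounds as inclusive, which is an assumption.

```java
// Hypothetical helper, not part of Fluency: checks the ordering the README
// comment asks for (initial chunk size <= flush threshold <= max total size).
// Treating the bounds as inclusive is an assumption of this sketch.
final class BufferSizeCheck {
    private BufferSizeCheck() {}

    static boolean isConsistent(long chunkInitialSize, long chunkRetentionSize, long maxBufferSize) {
        return chunkInitialSize > 0
                && chunkInitialSize <= chunkRetentionSize
                && chunkRetentionSize <= maxBufferSize;
    }

    public static void main(String[] args) {
        long initial = 16L * 1024 * 1024;    // 16MB
        long retention = 64L * 1024 * 1024;  // 64MB
        long max = 1024L * 1024 * 1024;      // 1024MB
        System.out.println("consistent=" + isConsistent(initial, retention, max));
    }
}
```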
Socket configuration
// Single Fluentd(localhost:24224)
//   - Socket connection timeout is 15000 ms
//   - Socket read timeout is 10000 ms
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setConnectionTimeoutMilli(15000);
builder.setReadTimeoutMilli(10000);
Fluency fluency = builder.build();
Waits on close sequence
// Single Fluentd(localhost:24224)
//   - Max wait until all buffers are flushed is 30 seconds
//   - Max wait until the flusher is terminated is 40 seconds
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setWaitUntilBufferFlushed(30);
builder.setWaitUntilFlusherTerminated(40);
Fluency fluency = builder.build();
Register Jackson modules
// Single Fluentd(localhost:24224)
//   - SimpleModule that has FooSerializer is enabled
SimpleModule simpleModule = new SimpleModule();
simpleModule.addSerializer(Foo.class, new FooSerializer());
FluentdRecordFormatter.Config recordFormatterConfig =
        new FluentdRecordFormatter.Config();
recordFormatterConfig.setJacksonModules(
        Collections.singletonList(simpleModule));
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setRecordFormatter(new FluentdRecordFormatter(recordFormatterConfig));

Fluency fluency = builder.build();
Set a custom error handler
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setErrorHandler(ex -> {
  // Send a notification
});
Fluency fluency = builder.build();

    :

// If flushing events to Fluentd fails and all retries are exhausted, the error handler is called back.
fluency.emit("foo.bar", event);
Send requests over SSL/TLS
// Single Fluentd(localhost:24224)
//   - Enable SSL/TLS
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setSslEnabled(true);

// Or, provide your own SSLSocketFactory at runtime (replace with your own)
builder.setSslSocketFactory(SSLSocketFactory.getDefault());

Fluency fluency = builder.build();

If you want to use a custom truststore, specify the JKS file path using -Djavax.net.ssl.trustStore (and -Djavax.net.ssl.trustStorePassword if needed). You can create a custom truststore like this:

$ keytool -import -file server.crt -alias mytruststore -keystore truststore.jks

For server side configuration, see https://docs.fluentd.org/v1.0/articles/in_forward#how-to-enable-tls/ssl-encryption .

Mutual TLS

See this project.

Other configurations
// Multiple Fluentd(localhost:24224, localhost:24225)
//   - Flush attempt interval is 200ms
//   - Max retry of sending events is 12
//   - Use JVM heap memory for buffer pool
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setFlushAttemptIntervalMillis(200);
builder.setSenderMaxRetryCount(12);
builder.setJvmHeapBufferMode(true);
Fluency fluency = builder.build(
        Arrays.asList(
                new InetSocketAddress(24224),
                new InetSocketAddress(24225)));

Emit event

String tag = "foo_db.bar_tbl";
Map<String, Object> event = new HashMap<String, Object>();
event.put("name", "komamitsu");
event.put("age", 42);
event.put("rate", 3.14);
fluency.emit(tag, event);

If you want to use EventTime as a timestamp, call Fluency#emit with an EventTime object as follows:

int epochSeconds;
int nanoseconds;
    :
EventTime eventTime = EventTime.fromEpoch(epochSeconds, nanoseconds);

// You can also create an EventTime object like this
// EventTime eventTime = EventTime.fromEpochMilli(System.currentTimeMillis());

fluency.emit(tag, eventTime, event);
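
When the timestamp starts out as a `java.time.Instant`, the two values passed to `EventTime.fromEpoch` above can be read directly from it. The sketch below uses only the JDK; `EventTimeParts` and `split` are hypothetical names for illustration, and the resulting values may need a range check or cast depending on the parameter types of the Fluency version in use.

```java
import java.time.Instant;

// Splits an Instant into the epoch-seconds and nanosecond-of-second values
// that the README passes to EventTime.fromEpoch(epochSeconds, nanoseconds).
// No Fluency classes are used here; this class is a hypothetical helper.
final class EventTimeParts {
    private EventTimeParts() {}

    // Returns { epochSeconds, nanosecondsOfSecond }.
    static long[] split(Instant instant) {
        return new long[] { instant.getEpochSecond(), instant.getNano() };
    }

    public static void main(String[] args) {
        long[] parts = split(Instant.now());
        System.out.println("epochSeconds=" + parts[0] + ", nanoseconds=" + parts[1]);
    }
}
```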

Error handling

Fluency#emit keeps buffered data in memory even if a retriable exception happens. If the buffer is full, however, the method throws org.komamitsu.fluency.BufferFullException. There are two ways to handle the exception.

a) Ignore the exception so that main application isn't blocked
try {
    fluency.emit(tag, event);
}
catch (BufferFullException e) {
    // Just log the error and move forward
    logger.warn("Fluency's buffer is full", e);
}
b) Retry until the data is successfully buffered
// Consider setting a maximum retry count as well
while (true) {
    try {
        fluency.emit(tag, event);
        break;
    }
    catch (BufferFullException e) {
        // Log the error, sleep and retry
        logger.warn("Fluency's buffer is full. Retrying", e);
        TimeUnit.SECONDS.sleep(5);
    }
}

Which to choose depends on how important the data is and how long the application can be blocked.
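
Option (b) can be combined with an upper bound on attempts so that the application is never blocked indefinitely. The following is a self-contained sketch, not Fluency's own API: `BoundedEmitRetry`, `Emitter`, and the nested `BufferFullException` are local stand-ins so that the example compiles without the library. In real code the lambda would call `fluency.emit(tag, event)` and the caught type would be `org.komamitsu.fluency.BufferFullException`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.TimeUnit;

// Sketch of "retry until buffered", limited to a maximum number of attempts.
final class BoundedEmitRetry {
    private BoundedEmitRetry() {}

    // Local stand-in for org.komamitsu.fluency.BufferFullException.
    static class BufferFullException extends IOException {
        BufferFullException(String message) { super(message); }
    }

    // Stand-in for a call such as fluency.emit(tag, event).
    interface Emitter {
        void emit() throws IOException;
    }

    // Returns true if the data was buffered within maxAttempts, false otherwise.
    static boolean emitWithRetry(Emitter emitter, int maxAttempts, long sleepMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                emitter.emit();
                return true;
            } catch (BufferFullException e) {
                if (attempt == maxAttempts) {
                    break; // Give up; the caller decides whether to drop or log the event.
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(sleepMillis);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            } catch (IOException e) {
                // Other I/O errors are not retried in this sketch.
                throw new UncheckedIOException(e);
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated emitter: the buffer is "full" for the first two calls.
        boolean buffered = emitWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new BufferFullException("simulated buffer full");
            }
        }, 5, 10L);
        System.out.println("buffered=" + buffered + " after " + calls[0] + " attempts");
    }
}
```

Returning a boolean instead of throwing leaves the final decision (drop, log, or escalate) to the caller, which matches the trade-off described above.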

Wait until buffered data is flushed and release resource

fluency.close();

Check how much memory Fluency has allocated

LOG.debug("Memory size allocated by Fluency is {}", fluency.getAllocatedBufferSize());

Check how much unsent data Fluency is buffering in memory

LOG.debug("Unsent data size buffered by Fluency in memory is {}", fluency.getBufferedDataSize());

Ingestion to Fluentd to use extra features with Java 16 or later

Features

  • UNIX domain socket support

Install

Gradle

dependencies {
    compile "org.komamitsu:fluency-core:${fluency.version}"
    compile "org.komamitsu:fluency-fluentd:${fluency.version}"
    compile "org.komamitsu:fluency-fluentd-ext:${fluency.version}"
}

Maven

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-core</artifactId>
    <version>${fluency.version}</version>
</dependency>

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-fluentd</artifactId>
    <version>${fluency.version}</version>
</dependency>

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-fluentd-ext</artifactId>
    <version>${fluency.version}</version>
</dependency>

Usage

Create Fluency instance to communicate with Fluentd via UNIX domain socket

// Single Fluentd(UNIX socket path: /tmp/fluentd/ingest.socket)
//   - UNIX domain socket heartbeat (by default)
//   - Asynchronous flush (by default)
//   - Without ack response (by default)
//   - Flush attempt interval is 600ms (by default)
//   - Initial chunk buffer size is 1MB (by default)
//   - Threshold chunk buffer size to flush is 4MB (by default)
//   - Threshold chunk buffer retention time to flush is 1000 ms (by default)
//   - Max total buffer size is 512MB (by default)
//   - Use off heap memory for buffer pool (by default)
//   - Max retries of sending events is 8 (by default)
//   - Max wait until all buffers are flushed is 10 seconds (by default)
//   - Max wait until the flusher is terminated is 10 seconds (by default)
Fluency fluency = new FluencyExtBuilderForFluentd().build(Paths.get("/tmp/fluentd/ingest.socket"));

Ingestion to Treasure Data

Features

  • Asynchronous flush
  • Backup of buffered data on local disk
  • Automatic database/table creation

Install

Gradle

dependencies {
    compile "org.komamitsu:fluency-core:${fluency.version}"
    compile "org.komamitsu:fluency-treasuredata:${fluency.version}"
}

Maven

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-core</artifactId>
    <version>${fluency.version}</version>
</dependency>

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-treasuredata</artifactId>
    <version>${fluency.version}</version>
</dependency>

Create Fluency instance

Default configuration
// Asynchronous flush (by default)
// Flush attempt interval is 600ms (by default)
// Initial chunk buffer size is 4MB (by default)
// Threshold chunk buffer size to flush is 64MB (by default)
// Threshold chunk buffer retention time to flush is 30000 ms (by default)
// Max total buffer size is 512MB (by default)
// Use off heap memory for buffer pool (by default)
// Max retries of sending events is 10 (by default)
// Max wait until all buffers are flushed is 10 seconds (by default)
// Max wait until the flusher is terminated is 10 seconds (by default)
Fluency fluency = new FluencyBuilderForTreasureData().build(yourApiKey);
Buffer configuration for high throughput data ingestion with high latency
// Initial chunk buffer size is 32MB
// Threshold chunk buffer size to flush is 256MB
// Threshold chunk buffer retention time to flush is 120 seconds
// Max total buffer size is 1024MB
// Sender's working buffer size 32KB
FluencyBuilderForTreasureData builder = new FluencyBuilderForTreasureData();
builder.setBufferChunkInitialSize(32 * 1024 * 1024);
builder.setMaxBufferSize(1024 * 1024 * 1024L);
builder.setBufferChunkRetentionSize(256 * 1024 * 1024);
builder.setBufferChunkRetentionTimeMillis(120 * 1000);
builder.setSenderWorkBufSize(32 * 1024);
Fluency fluency = builder.build(yourApiKey);
Customize Treasure Data endpoint
Fluency fluency = new FluencyBuilderForTreasureData()
        .build(yourApiKey, tdEndpoint);
Other configurations

Other usage is the same as for ingestion to Fluentd. See Ingestion to Fluentd > Usage above.

Ingestion to AWS S3

Features

  • Asynchronous flush
  • Backup of buffered data on local disk
  • Support for several formats
    • CSV
    • JSONL
    • MessagePack
  • GZIP compression
  • Customizable S3 bucket/key decision rule

Install

Gradle

dependencies {
    compile "org.komamitsu:fluency-core:${fluency.version}"
    compile "org.komamitsu:fluency-aws-s3:${fluency.version}"
}

Maven

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-core</artifactId>
    <version>${fluency.version}</version>
</dependency>

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-aws-s3</artifactId>
    <version>${fluency.version}</version>
</dependency>

Create Fluency instance

Default configuration for JSONL format
// Asynchronous flush (by default)
// Flush attempt interval is 600ms (by default)
// Initial chunk buffer size is 4MB (by default)
// Threshold chunk buffer size to flush is 64MB (by default)
// Threshold chunk buffer retention time to flush is 30000 ms (by default)
// Max total buffer size is 512MB (by default)
// Use off heap memory for buffer pool (by default)
// Sender's working buffer size 8KB (by default)
// Max retries of sending events is 10 (by default)
// Initial retry interval of sending events is 1000 ms (by default)
// Retry backoff factor of sending events is 2.0 (by default)
// Max retry interval of sending events is 30000 ms (by default)
// Max wait until all buffers are flushed is 10 seconds (by default)
// Max wait until the flusher is terminated is 10 seconds (by default)
// Destination S3 bucket is specified by Fluency#emit()'s "tag" parameter (by default)
// Destination S3 key format is "yyyy/MM/dd/HH/mm-ss-SSSSSS" (by default)
// Destination S3 key is decided as UTC (by default)
// GZIP compression is enabled (by default)
// File format is JSONL
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.JSONL);
Fluency fluency = builder.build();
Default configuration for MessagePack format
// File format is MessagePack
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.MESSAGE_PACK);
Fluency fluency = builder.build();
Default configuration for CSV format
// File format is CSV
// Expected columns are "time", "age", "name", "comment"
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.CSV);
builder.setFormatCsvColumnNames(Arrays.asList("time", "age", "name", "comment"));
Fluency fluency = builder.build();
AWS S3 configuration

fluency-aws-s3 follows default credential provider chain. If you want to explicitly specify credentials, use the following APIs.

// AWS S3 region is "us-east-1"
// AWS S3 endpoint is "https://another.s3.endpoi.nt"
// AWS access key id is "ABCDEFGHIJKLMNOPQRST"
// AWS secret access key is "ZaQ1XsW2CdE3VfR4BgT5NhY6"
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.JSONL);
builder.setAwsRegion("us-east-1");
builder.setAwsEndpoint("https://another.s3.endpoi.nt");
builder.setAwsAccessKeyId("ABCDEFGHIJKLMNOPQRST");
builder.setAwsSecretAccessKey("ZaQ1XsW2CdE3VfR4BgT5NhY6");
Fluency fluency = builder.build();
Disable compression
// GZIP compression is disabled
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.JSONL);
builder.setCompressionEnabled(false);
Fluency fluency = builder.build();
Change timezone used in S3 key decision rule
// Destination S3 key is decided as JST
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.JSONL);
builder.setS3KeyTimeZoneId(ZoneId.of("JST", ZoneId.SHORT_IDS));
Fluency fluency = builder.build();
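
To see how the time zone affects the time-based part of the key, the default pattern quoted above can be previewed with the JDK alone. This is a sketch under the assumption that the pattern is interpreted like a `java.time.format.DateTimeFormatter` pattern; `S3KeyPreview` and `keyFor` are hypothetical names, and anything else Fluency may add to the final key is not modelled here.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical preview helper, not part of Fluency.
final class S3KeyPreview {
    private S3KeyPreview() {}

    // The default key pattern quoted in the README.
    static final DateTimeFormatter KEY_FORMAT =
            DateTimeFormatter.ofPattern("yyyy/MM/dd/HH/mm-ss-SSSSSS");

    // Formats the instant in the given zone using the default pattern.
    static String keyFor(Instant time, ZoneId zone) {
        return KEY_FORMAT.withZone(zone).format(time);
    }

    public static void main(String[] args) {
        Instant time = Instant.parse("2024-01-02T03:04:05.123456Z");
        // "JST" is a short ID that ZoneId.SHORT_IDS maps to Asia/Tokyo (UTC+9).
        ZoneId jst = ZoneId.of("JST", ZoneId.SHORT_IDS);
        System.out.println(keyFor(time, ZoneOffset.UTC)); // 2024/01/02/03/04-05-123456
        System.out.println(keyFor(time, jst));            // 2024/01/02/12/04-05-123456
    }
}
```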
Customize S3 destination decision rule
// Destination S3 bucket is "fixed-bucket-name"
// Destination S3 key is the tag followed by the number of whole hours since the UNIX epoch
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.JSONL);
builder.setCustomS3DestinationDecider((tag, time) ->
    new S3DestinationDecider.S3Destination(
        "fixed-bucket-name",
        String.format("%s-%d", tag, time.getEpochSecond() / 3600)
));
Fluency fluency = builder.build();
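
The key expression used in the decider above can be tried in isolation. The sketch below reproduces only the `String.format` call with a plain `java.time.Instant`; `HourlyKey` is a hypothetical name, and the assumption is that the `time` argument of the decider exposes `getEpochSecond()` as in the snippet.

```java
import java.time.Instant;

// Hypothetical helper, not part of Fluency: reproduces the key expression
// from the custom decider so its hourly bucketing can be inspected.
final class HourlyKey {
    private HourlyKey() {}

    static String keyFor(String tag, Instant time) {
        // Integer division by 3600 yields the number of whole hours since the epoch.
        return String.format("%s-%d", tag, time.getEpochSecond() / 3600);
    }

    public static void main(String[] args) {
        System.out.println(keyFor("foo", Instant.ofEpochSecond(7199))); // foo-1
        System.out.println(keyFor("foo", Instant.ofEpochSecond(7200))); // foo-2
    }
}
```

All events within the same hour map to the same key under this rule.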
Buffer configuration for high throughput data ingestion with high latency
// Initial chunk buffer size is 32MB
// Threshold chunk buffer size to flush is 256MB
// Threshold chunk buffer retention time to flush is 120 seconds
// Max total buffer size is 1024MB
// Sender's working buffer size 32KB
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.JSONL);
builder.setBufferChunkInitialSize(32 * 1024 * 1024);
builder.setMaxBufferSize(1024 * 1024 * 1024L);
builder.setBufferChunkRetentionSize(256 * 1024 * 1024);
builder.setBufferChunkRetentionTimeMillis(120 * 1000);
builder.setSenderWorkBufSize(32 * 1024);
Fluency fluency = builder.build();
Retry configuration
// Max retries of sending events is 16
// Initial retry interval of sending events is 500 ms
// Retry backoff factor of sending events is 1.5
// Max retry interval of sending events is 20000 ms
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.JSONL);
builder.setSenderRetryMax(16);
builder.setSenderRetryIntervalMillis(500);
builder.setSenderRetryFactor(1.5f);
builder.setSenderMaxRetryIntervalMillis(20000);
Fluency fluency = builder.build();
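
To get a feel for how these four settings interact, the retry intervals can be tabulated offline. The sketch below assumes that each interval is the previous one multiplied by the backoff factor and capped at the maximum; Fluency's actual schedule may differ (for example in rounding or jitter), and `RetrySchedule` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Fluency: tabulates capped exponential
// backoff intervals under the assumption interval(n) = min(initial * factor^n, max).
final class RetrySchedule {
    private RetrySchedule() {}

    static List<Long> intervalsMillis(int maxRetries, long initialMillis, double factor, long maxMillis) {
        List<Long> result = new ArrayList<>();
        double interval = initialMillis;
        for (int i = 0; i < maxRetries; i++) {
            result.add((long) Math.min(interval, (double) maxMillis));
            // Cap the running value too so it cannot grow without bound.
            interval = Math.min(interval * factor, (double) maxMillis);
        }
        return result;
    }

    public static void main(String[] args) {
        // Values from the retry configuration above: 16 retries, 500 ms, factor 1.5, cap 20000 ms.
        System.out.println(intervalsMillis(16, 500L, 1.5, 20000L));
    }
}
```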
Other configurations

Other usage is the same as for ingestion to Fluentd. See Ingestion to Fluentd > Usage above.

fluency's Issues

What is the minimum Wait values to flush

I have implemented code and it works OK.
I need to reduce execution time on my app and when added Fluency to my code it takes more time to complete.

I changed wait values as follows:

//   - Max wait until all buffers are flushed is 10 seconds
//   - Max wait until the flusher is terminated is 15 seconds
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setWaitUntilBufferFlushed(10);
builder.setWaitUntilFlusherTerminated(15);

With this configuration Fluency doesn't log anything. I suppose this is because it needs more time, but how do I know the minimum value that I can set? Or are the arguments in milliseconds?

Regards
Diego Barriguete

Use custom ObjectMapper

For using fluency in Scala, I need to have additional modules in ObjectMapper. For example,
https://github.com/FasterXML/jackson-module-scala
This module is useful for generating json (msgpack) log from Scala case classes.

In the current code, ObjectMapper is explicitly instantiated here:

return new ObjectMapper(new MessagePackFactory());

So we have no way to configure ObjectMapper modules.

@komamitsu
Do you have any idea on making it extensible? I think some Provider of ObjectMapper is necessary (as in Guice: https://github.com/google/guice/wiki/ProviderBindings)

And also using thread local storage as an ObjectMapper holder is not always a good idea, especially if we have hundreds of threads. Using some session-scoped singleton would be better.

Errors when one of multiple senders fails

We have configuration with multiple senders, but when one sender fails it switches to another one. Works perfectly, but it generates log entry with level=ERROR

As the message says 'Trying to use next sender...' it is not fatal, IMHO it should be level=WARN, only when all senders fail it should be level=error

Whole exception below:

2019-11-14 03:59:37.660 [pool-15-thread-1] ERROR org.komamitsu.fluency.sender.MultiSender - Failed to send: sender=TCPSender{config=Config{baseConfig=Config{baseConfig=Config{senderErrorHandler=null}, host='fluentd2', port=5500, connectionTimeoutMilli=5000, readTimeoutMilli=5000, heartbeaterConfig=Config{baseConfig=Config{host='fluentd2', port=5500, intervalMillis=1000}}, failureDetectorConfig=Config{failureIntervalMillis=3000}, failureDetectorStrategyConfig=org.komamitsu.fluency.sender.failuredetect.PhiAccrualFailureDetectStrategy$Config@53a86df0, waitBeforeCloseMilli=1000}}} NetworkSender{config=Config{baseConfig=Config{senderErrorHandler=null}, host='fluentd2', port=5500, connectionTimeoutMilli=5000, readTimeoutMilli=5000, heartbeaterConfig=Config{baseConfig=Config{host='fluentd2', port=5500, intervalMillis=1000}}, failureDetectorConfig=Config{failureIntervalMillis=3000}, failureDetectorStrategyConfig=org.komamitsu.fluency.sender.failuredetect.PhiAccrualFailureDetectStrategy$Config@53a86df0, waitBeforeCloseMilli=1000}, failureDetector=FailureDetector{failureDetectStrategy=PhiAccrualFailureDetectStrategy{failureDetector=org.komamitsu.failuredetector.PhiAccuralFailureDetector@438662bb} org.komamitsu.fluency.sender.failuredetect.PhiAccrualFailureDetectStrategy@3305a04f, heartbeater=TCPHeartbeater{config=Config{baseConfig=Config{host='fluentd2', port=5500, intervalMillis=1000}}} Heartbeater{config=Config{host='fluentd2', port=5500, intervalMillis=1000}}, lastFailureTimestampMillis=1573703977660, config=Config{failureIntervalMillis=3000}}} org.komamitsu.fluency.sender.TCPSender@47b1e00a. Trying to use next sender...
java.io.IOException: Broken pipe
	at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
	at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
	at sun.nio.ch.IOUtil.write(IOUtil.java:148)
	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
	at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
	at org.komamitsu.fluency.sender.TCPSender.sendBuffers(TCPSender.java:63)
	at org.komamitsu.fluency.sender.TCPSender.sendBuffers(TCPSender.java:32)
	at org.komamitsu.fluency.sender.NetworkSender.sendInternal(NetworkSender.java:109)
	at org.komamitsu.fluency.sender.Sender.sendInternalWithRestoreBufferPositions(Sender.java:68)
	at org.komamitsu.fluency.sender.Sender.send(Sender.java:50)
	at org.komamitsu.fluency.sender.MultiSender.sendInternal(MultiSender.java:58)
	at org.komamitsu.fluency.sender.Sender.sendInternalWithRestoreBufferPositions(Sender.java:68)
	at org.komamitsu.fluency.sender.Sender.send(Sender.java:50)
	at org.komamitsu.fluency.sender.RetryableSender.sendInternal(RetryableSender.java:82)
	at org.komamitsu.fluency.sender.Sender.sendInternalWithRestoreBufferPositions(Sender.java:68)
	at org.komamitsu.fluency.sender.Sender.send(Sender.java:50)
	at org.komamitsu.fluency.buffer.PackedForwardBuffer.flushInternal(PackedForwardBuffer.java:320)
	at org.komamitsu.fluency.buffer.Buffer.flush(Buffer.java:113)
	at org.komamitsu.fluency.flusher.AsyncFlusher$1.run(AsyncFlusher.java:48)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java)
	at java.lang.Thread.run(Thread.java:748)

Issue when used with spring-cloud-sleuth.

Hello
When using fluency on a spring boot project in conjunction with spring-cloud-starter & a non available host the application startup hangs for a while without an apparent reason.
My guess is that during spring startup when a buffer flush is being triggered something happens in between and blocks the execution of the rest of the startup.

Any help will be appreciated.

EventTime: any reason seconds isn't an int?

Thanks for a great project.

What is the reason for seconds in EventTime to be a 64-bit-long-that-is-only-valid-if-in-fact-it-is-a-32-bit-int rather than just an int? I assume it's performance related?

setSslEnabled(false) is not effective

new FluencyBuilderForFluentd().setSslEnabled(false)

Even if we add this config, Fluency (2.0.0 or 2.1.0) still tries to use SSL and produces unreadable data for fluentd:

2019-03-10 11:08:40.804-0700  warn [RetryableSender] Sender failed to send data. sender=RetryableSender{baseSender=SSLSender{config=Config{host='127.0.0.1', port=63224, connectionTimeoutMilli=5000, readTimeoutMilli=5000, waitBeforeCloseMilli=1000} Config{senderErrorHandler=null}} NetworkSender{config=Config{host='127.0.0.1', port=63224, connectionTimeoutMilli=5000, readTimeoutMilli=5000, waitBeforeCloseMilli=1000} Config{senderErrorHandler=null}, failureDetector=null} org.komamitsu.fluency.fluentd.ingester.sender.SSLSender@8b1cbce, retryStrategy=ExponentialBackOffRetryStrategy{config=Config{baseIntervalMillis=400, maxIntervalMillis=30000} Config{maxRetryCount=7}} RetryStrategy{config=Config{baseIntervalMillis=400, maxIntervalMillis=30000} Config{maxRetryCount=7}}, isClosed=false} org.komamitsu.fluency.fluentd.ingester.sender.RetryableSender@553de17d, retry=4
java.net.ConnectException: Connection refused (Connection refused)
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at org.komamitsu.fluency.fluentd.ingester.sender.SSLSocketBuilder.build(SSLSocketBuilder.java:53)
	at org.komamitsu.fluency.fluentd.ingester.sender.SSLSender.getOrCreateSocketInternal(SSLSender.java:68)
	at org.komamitsu.fluency.fluentd.ingester.sender.SSLSender.getOrCreateSocketInternal(SSLSender.java:30)
	at org.komamitsu.fluency.fluentd.ingester.sender.NetworkSender.getOrCreateSocket(NetworkSender.java:75)
	at org.komamitsu.fluency.fluentd.ingester.sender.NetworkSender.sendInternal(NetworkSender.java:100)
	at org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender.sendInternalWithRestoreBufferPositions(FluentdSender.java:74)
	at org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender.sendWithAck(FluentdSender.java:62)
	at org.komamitsu.fluency.fluentd.ingester.sender.RetryableSender.sendInternal(RetryableSender.java:80)
	at org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender.sendInternalWithRestoreBufferPositions(FluentdSender.java:74)
	at org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender.sendWithAck(FluentdSender.java:62)
	at org.komamitsu.fluency.fluentd.ingester.FluentdIngester.ingest(FluentdIngester.java:79)
	at org.komamitsu.fluency.buffer.Buffer.flushInternal(Buffer.java:358)
	at org.komamitsu.fluency.buffer.Buffer.flush(Buffer.java:112)
	at org.komamitsu.fluency.flusher.AsyncFlusher.lambda$new$0(AsyncFlusher.java:45)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

MultiSender only removes senders, never adds them back when available

We run fluent-bit as a daemon set in kubernetes with a service in front of them. To make use of the multiple instances we use MultiSender in fluency. Sometimes when we do rolling upgrades or there is an issue further down in the log chain we will get "Heartbeater - ping(): failed". It seems like once this happens the sender is marked as unavailable and never considered again. Once this happens for all senders we get "AllNodesUnavailableException: All nodes are unavailable". Fluency never recovers from this state even though all fluent-bit instances have recovered.

Use of EventTime result in errors

Hi,

I'm currently trying to figure out how to get log event with millisecond precision with fluentd.
I see that you offer this feature with the use of EventTime object as argument of emit() method.

I tried it but i get errors from fluent forward agent (from docker image https://hub.docker.com/r/fluent/fluentd/ latest version):

2017-11-30 20:45:11 +0000 [warn]: emit transaction failed: error_class=MessagePack::UnknownExtTypeError error="unexpected extension type" location="/usr/lib/ruby/gems/2.3.0/gems/fluentd-0.12.41/lib/fluent/event.rb:149:in `feed_each'" tag="debug"
  2017-11-30 20:45:11 +0000 [warn]: suppressed same stacktrace
2017-11-30 20:45:11 +0000 [error]: forward error error=#<MessagePack::UnknownExtTypeError: unexpected extension type> error_class=MessagePack::UnknownExtTypeError
  2017-11-30 20:45:11 +0000 [error]: suppressed same stacktrace
2017-11-30 20:45:11 +0000 fluent.warn: {"error_class":"MessagePack::UnknownExtTypeError","error":"unexpected extension type","location":"/usr/lib/ruby/gems/2.3.0/gems/fluentd-0.12.41/lib/fluent/event.rb:149:in `feed_each'","tag":"debug","message":"emit transaction failed: error_class=MessagePack::UnknownExtTypeError error=\"unexpected extension type\" location=\"/usr/lib/ruby/gems/2.3.0/gems/fluentd-0.12.41/lib/fluent/event.rb:149:in `feed_each'\" tag=\"debug\""}
2017-11-30 20:45:11 +0000 fluent.error: {"error":"#<MessagePack::UnknownExtTypeError: unexpected extension type>","error_class":"MessagePack::UnknownExtTypeError","message":"forward error error=#<MessagePack::UnknownExtTypeError: unexpected extension type> error_class=MessagePack::UnknownExtTypeError"}
2017-11-30 20:45:11 +0000 [warn]: no patterns matched tag="fluent.error"

I'm using the version 1.6.0 and calling emit like this:

...
EventTime eventTime = EventTime.fromEpochMilli(System.currentTimeMillis());
fluency.emit(tag == null ? "" : tag, eventTime, data);
...

If i don't use EventTime, everything seem alright.

Too many open file error (Tomcat), if fluentd server is down

Config

	FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
	builder.setBufferChunkInitialSize(16 * 1024 * 1024);
	builder.setBufferChunkRetentionSize(64 * 1024 * 1024);
	builder.setMaxBufferSize(1024 * 1024 * 1024L);
	builder.setFlushIntervalMillis(10000);
	fluency = builder.build("xxxxxxxxx", 24224);

Trace:

2019-05-09 10:40:16,588 WARN [pool-2-thread-1] o.k.f.f.i.s.RetryableSender [RetryableSender.java:86] Sender failed to send data. sender=RetryableSender{baseSender=TCPSender{config=Config{host='xxxxxxxxxxxx', port=24224, connectionTimeoutMilli=5000, readTimeoutMilli=5000, waitBeforeCloseMilli=1000} Config{senderErrorHandler=null}} NetworkSender{config=Config{host='xxxxxxxxxxxx', port=24224, connectionTimeoutMilli=5000, readTimeoutMilli=5000, waitBeforeCloseMilli=1000} Config{senderErrorHandler=null}, failureDetector=null} org.komamitsu.fluency.fluentd.ingester.sender.TCPSender@2e130737, retryStrategy=ExponentialBackOffRetryStrategy{config=Config{baseIntervalMillis=400, maxIntervalMillis=30000} Config{maxRetryCount=7}} RetryStrategy{config=Config{baseIntervalMillis=400, maxIntervalMillis=30000} Config{maxRetryCount=7}}, isClosed=false} org.komamitsu.fluency.fluentd.ingester.sender.RetryableSender@25eac167, retry=3
java.net.SocketException: Too many open files
at sun.nio.ch.Net.socket0(Native Method)
at sun.nio.ch.Net.socket(Net.java:411)
at sun.nio.ch.Net.socket(Net.java:404)

sync delivery of log messages

Motivation
Log events in regulated environments contain not only details about an application's behaviour, but also auditing relevant information. An audit log event must not be lost and needs therefore be written synchronously.

Writing synchronously to a local file is not an option, because the applications are executed on a container platform. That means if the container is destroyed for whatever reason, the logfile would be lost.

Fluentd already offers at-least-once semantics if ack responses are enabled. Therefore I want to re-use that mechanism for moving audit logs outside of the container.

Current behaviour
Fluency's current implementation supports Fluentd's ack feature, but only asynchronous buffering of log events. Fluency's Flusher periodically writes buffered log events to Fluentd. However, log events can be lost between buffering and flushing.

Expected behaviour
Fluency can be optionally configured not to buffer (audit) log events, but write them directly to Fluentd.

Discussion: Solution
In a very first Proof-of-Concept implementation I've refactored a Buffer interface and implemented a SyncBuffer, which writes unbuffered to the Ingester. That approach basically solves my requirement.
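The difference between the two buffering styles can be sketched as follows. All type names here (`EventSink`, `EventBuffer`, `BufferingEventBuffer`, `SyncEventBuffer`) are hypothetical and only mirror the idea described above. They are not Fluency's real `Buffer` or `Ingester` types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the component that actually writes to Fluentd.
interface EventSink {
    void write(String tag, byte[] record);
}

// Hypothetical buffer abstraction.
interface EventBuffer {
    void append(String tag, byte[] record);
}

// Buffered variant: events wait in memory until flush() is called,
// so they are lost if the process dies before the next flush.
final class BufferingEventBuffer implements EventBuffer {
    private final EventSink sink;
    private final List<String> tags = new ArrayList<>();
    private final List<byte[]> records = new ArrayList<>();

    BufferingEventBuffer(EventSink sink) {
        this.sink = sink;
    }

    @Override
    public synchronized void append(String tag, byte[] record) {
        tags.add(tag);
        records.add(record);
    }

    synchronized void flush() {
        for (int i = 0; i < tags.size(); i++) {
            sink.write(tags.get(i), records.get(i));
        }
        tags.clear();
        records.clear();
    }
}

// Unbuffered variant: append() returns (or throws) only after the sink handled the event.
final class SyncEventBuffer implements EventBuffer {
    private final EventSink sink;

    SyncEventBuffer(EventSink sink) {
        this.sink = sink;
    }

    @Override
    public void append(String tag, byte[] record) {
        sink.write(tag, record);
    }
}
```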

However, this implementation needs some more love. Before I invest the time I want to know if there is any chance to get such a feature with a PR into Fluency (again #64)?

Why do I get just three log entries, but more when using ConsoleAppender?

When using FluentLogbackAppender, I get just three log entries:
2018-02-06 04:30:24.000000000 -0500 fmshiot.bootstrap: {"msg":"[192.168.39.20] [port_IS_UNDEFINED] [INFO] [bootstrap] [] [] [] [] [8740] [main] [o.s.c.a.AnnotationConfigApplicationContext] [Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@55b0dcab: startup date [Tue Feb 06 17:30:24 CST 2018]; root of context hierarchy] []"}
2018-02-06 04:30:25.000000000 -0500 fmshiot.bootstrap: {"msg":"[192.168.39.20] [port_IS_UNDEFINED] [INFO] [bootstrap] [] [] [] [] [8740] [main] [o.s.b.f.a.AutowiredAnnotationBeanPostProcessor] [JSR-330 'javax.inject.Inject' annotation found and supported for autowiring] []"}
2018-02-06 04:30:25.000000000 -0500 fmshiot.bootstrap: {"msg":"[192.168.39.20] [port_IS_UNDEFINED] [INFO] [bootstrap] [] [] [] [] [8740] [main] [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] [Bean 'configurationPropertiesRebinderAutoConfiguration' of type [org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$798dd570] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)] []"}

but I can get more when using ConsoleAppender, as follows:
2018-02-06 17:30:24.789 [main] INFO o.s.c.a.AnnotationConfigApplicationContext - Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@55b0dcab: startup date [Tue Feb 06 17:30:24 CST 2018]; root of context hierarchy
2018-02-06 17:30:25.127 [main] INFO o.s.b.f.a.AutowiredAnnotationBeanPostProcessor - JSR-330 'javax.inject.Inject' annotation found and supported for autowiring
2018-02-06 17:30:25.188 [main] INFO o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'configurationPropertiesRebinderAutoConfiguration' of type [org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$798dd570] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)

:: Spring Boot :: (v1.5.7.RELEASE)

2018-02-06 17:30:25.696 [main] INFO c.f.f.registry.RegistryApplication - No active profile set, falling back to default profiles: default
2018-02-06 17:30:25.713 [main] INFO o.s.b.c.e.AnnotationConfigEmbeddedWebApplicationContext - Refreshing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@58c34bb3: startup date [Tue Feb 06 17:30:25 CST 2018]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@55b0dcab
2018-02-06 17:30:26.793 [main] INFO o.s.cloud.context.scope.GenericScope - BeanFactory id=737bdab9-1582-3bc0-9e4b-54df761c6ee9
2018-02-06 17:30:26.816 [main] INFO o.s.b.f.a.AutowiredAnnotationBeanPostProcessor - JSR-330 'javax.inject.Inject' annotation found and supported for autowiring
2018-02-06 17:30:26.877 [main] INFO o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'org.springframework.cloud.netflix.metrics.MetricsInterceptorConfiguration$MetricsRestTemplateConfiguration' of type [org.springframework.cloud.netflix.metrics.MetricsInterceptorConfiguration$MetricsRestTemplateConfiguration$$EnhancerBySpringCGLIB$$8fa078b4] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2018-02-06 17:30:26.888 [main] INFO o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration' of type [org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$798dd570] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2018-02-06 17:30:27.224 [main] INFO o.s.b.c.e.t.TomcatEmbeddedServletContainer - Tomcat initialized with port(s): 9080 (http)
2018-02-06 17:30:27.237 [main] INFO o.a.catalina.core.StandardService - Starting service [Tomcat]
2018-02-06 17:30:27.238 [main] INFO o.a.catalina.core.StandardEngine - Starting Servlet Engine: Apache Tomcat/8.5.20
2018-02-06 17:30:27.384 [localhost-startStop-1] INFO o.a.c.c.C.[Tomcat].[localhost].[/] - Initializing Spring embedded WebApplicationContext
2018-02-06 17:30:27.384 [localhost-startStop-1] INFO o.s.web.context.ContextLoader - Root WebApplicationContext: initialization completed in 1671 ms
2018-02-06 17:30:28.291 [localhost-startStop-1] INFO o.s.b.w.s.FilterRegistrationBean - Mapping filter: 'metricsFilter' to: [/]
2018-02-06 17:30:28.292 [localhost-startStop-1] INFO o.s.b.w.s.FilterRegistrationBean - Mapping filter: 'characterEncodingFilter' to: [/
]
2018-02-06 17:30:28.292 [localhost-startStop-1] INFO o.s.b.w.s.FilterRegistrationBean - Mapping filter: 'hiddenHttpMethodFilter' to: [/]
2018-02-06 17:30:28.292 [localhost-startStop-1] INFO o.s.b.w.s.FilterRegistrationBean - Mapping filter: 'httpPutFormContentFilter' to: [/
]
2018-02-06 17:30:28.292 [localhost-startStop-1] INFO o.s.b.w.s.FilterRegistrationBean - Mapping filter: 'requestContextFilter' to: [/]
2018-02-06 17:30:28.292 [localhost-startStop-1] INFO o.s.b.w.s.FilterRegistrationBean - Mapping filter: 'webRequestTraceFilter' to: [/
]
2018-02-06 17:30:28.292 [localhost-startStop-1] INFO o.s.b.w.s.FilterRegistrationBean - Mapping filter: 'servletContainer' to urls: [/eureka/]
2018-02-06 17:30:28.293 [localhost-startStop-1] INFO o.s.b.w.s.FilterRegistrationBean - Mapping filter: 'applicationContextIdFilter' to: [/
]
2018-02-06 17:30:28.293 [localhost-startStop-1] INFO o.s.b.w.s.ServletRegistrationBean - Mapping servlet: 'dispatcherServlet' to [/]
2018-02-06 17:30:28.381 [localhost-startStop-1] INFO c.s.j.s.i.a.WebApplicationImpl - Initiating Jersey application, version 'Jersey: 1.19.1 03/11/2016 02:08 PM'
2018-02-06 17:30:28.451 [localhost-startStop-1] INFO c.n.d.p.DiscoveryJerseyProvider - Using JSON encoding codec LegacyJacksonJson
2018-02-06 17:30:28.453 [localhost-startStop-1] INFO c.n.d.p.DiscoveryJerseyProvider - Using JSON decoding codec LegacyJacksonJson
2018-02-06 17:30:28.662 [localhost-startStop-1] INFO c.n.d.p.DiscoveryJerseyProvider - Using XML encoding codec XStreamXml
..........

Copyright and license in the class files headers

Hi Mitsunori Komatsu,

We would very much like to use your project as a third-party library, but we can't for legal reasons: the class files are missing a copyright and license header.
Could you please add them in a new minor release?

Heartbeater is disabled by default although doc says the opposite

I was a little confused because I thought the heartbeat functionality is enabled by default (as the Readme.md says), but when I investigated I found that it is disabled by default:

private Heartbeater.Instantiator heartbeaterConfig; // Disabled by default
--> org.komamitsu.fluency.sender.TCPSender.Config (line 225)

Is there a technical background for that?

Support sync flush in async flusher

The idea would be a max buffer size (or time delay) before the async flusher switches to a sync flush upon an emit() and blocks return until the pending buffer size decreases below a defined threshold.

The scenario I ran into was processing a large file (several million lines, each converted into a single emit). I was producing data faster than the flusher was flushing it in the background and hence quickly ran into OOM issues (this is a small-footprint app).

While (I think) the SyncFlusher has this type of check built in, it also has the issue that widely spaced emit calls could delay sending a message (i.e., push a bunch of messages in quickly, yet still under the flush interval and buffer threshold, and then stop calling emit()). Those messages will sit there until another emit() occurs.
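The requested back-pressure could be sketched with a counting semaphore: the producer reserves bytes before buffering and blocks when the limit is reached, while the background flusher releases bytes after sending them. `BoundedPendingBytes` and its methods are hypothetical names for illustration and are not part of Fluency.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch (not Fluency code): emit() would call tryReserve() and
// therefore block once too many bytes are pending.
final class BoundedPendingBytes {
    private final Semaphore permits;

    BoundedPendingBytes(int maxPendingBytes) {
        this.permits = new Semaphore(maxPendingBytes);
    }

    // Called by the producer; waits up to timeoutMillis for room and returns false on timeout.
    boolean tryReserve(int bytes, long timeoutMillis) {
        try {
            return permits.tryAcquire(bytes, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Called by the background flusher after the bytes have been sent.
    void release(int bytes) {
        permits.release(bytes);
    }

    int availableBytes() {
        return permits.availablePermits();
    }
}
```

In this sketch a single record larger than the configured limit can never be reserved, so a real implementation would need to handle that case separately.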

Pass log payload into custom error handler

The data that could not be logged is available in the same context where the CustomErrorHandler is called.
It would be great to pass the payload into the error handler.

My use case is to try to emit via Fluency, if that fails, then log the same data to system journald.
So when Fluency reports an error, I need to also grab the payload so I can try again with Journald.
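The use case could look roughly like the sketch below. `PayloadAwareErrorHandler` and `FallbackEmitter` are hypothetical names, and the sketch reports the failure synchronously for simplicity, whereas an asynchronous flusher would report it later from a background thread.

```java
import java.util.Map;

// Hypothetical handler shape: receives the failed payload in addition to the error.
interface PayloadAwareErrorHandler {
    void handle(Throwable error, String tag, Map<String, Object> payload);
}

// Hypothetical wrapper: tries the primary emitter and hands the payload to the
// handler when the primary fails, so the handler can log it elsewhere.
final class FallbackEmitter {
    interface Primary {
        void emit(String tag, Map<String, Object> payload) throws Exception;
    }

    private final Primary primary;
    private final PayloadAwareErrorHandler onError;

    FallbackEmitter(Primary primary, PayloadAwareErrorHandler onError) {
        this.primary = primary;
        this.onError = onError;
    }

    void emit(String tag, Map<String, Object> payload) {
        try {
            primary.emit(tag, payload);
        } catch (Exception e) {
            onError.handle(e, tag, payload);
        }
    }
}
```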

Thanks.

Fluency supports an option to save remaining memory buffer to file when stopping

Even with ACK response mode enabled, Fluency can lose buffered data when the Fluency#close method is called while Fluentd is down and Fluency's retries are exhausted.

In this case, the following option sounds helpful to avoid data loss:

  1. Fluency saves buffered memory data to a specified file path if the option is enabled when it gives up flushing the data after Fluency#close method is called
  2. Fluency loads the file to its memory buffer when launching if the option is enabled
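The two steps above can be sketched with plain JDK file I/O. `BufferBackup` is a hypothetical helper, and the real buffer layout, chunk boundaries, and tags are ignored here: the sketch treats the unsent data as one opaque byte array.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch (not Fluency code): persist unsent bytes on shutdown, reload on startup.
final class BufferBackup {
    // Step 1: called when flushing is abandoned after close().
    static void save(Path file, byte[] unsent) {
        try {
            Files.write(file, unsent);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Step 2: called at launch. Returns the saved bytes and deletes the file,
    // or returns an empty array if nothing was saved.
    static byte[] loadAndDelete(Path file) {
        try {
            if (!Files.exists(file)) {
                return new byte[0];
            }
            byte[] data = Files.readAllBytes(file);
            Files.delete(file);
            return data;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```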

How to flush synchronously to Fluentd?

Hi, I was looking for a way to activate synchronous flush to Fluentd, but I did not find how in the documentation or by looking at the Config class.

Could you please tell me which configuration controls that?

Thank you.
Guillaume

Architectural Doubts

Hi!

I don't know if this is the best place to ask such questions, please let me know if there's a better place.
I've considered Fluentd as a key component of an audit architecture design. I have some questions regarding fluency/fluent-logger-java, but let me first explain the context.

I need to collect logs in many services, so to make developers' lives easier I created a library that abstracts most of the work of formatting output logs and validating data, as well as connecting to the Fluentd server.

arquitetura-auditoria-Page-6

The service can programmatically call audit in the lib or rely on automatic capture of events, and in the library both strategies send the final formatted data to fluentd client (in this diagram fluent-logger-java, as it was my first option at the time). The client sends data to fluentd server, that sends logs to S3 and so on.

In a deployment point of view, I would like to have this:

arquitetura-auditoria-Page-7

Services running on EC2 would call a Fluentd cluster, whereas microservices would rely on a side-car instance of fluentd, so I could distribute load.

Questions

Given the above context, I have some questions.

  1. Initially I tested with TCP input on port 24224, but when deploying Fluentd in Kubernetes we had to use the in_http Input Plugin, and I noticed that fluent-logger-java doesn't support that kind of communication. Is there an option for this input type with Fluency?

  2. Considering that my library still relies on fluent-logger-java (I'm considering switching to Fluency), is it OK to create a FluentLogger for each log operation and close it at the end? Note that I even create a new thread for the call, so that I don't block the current thread if the call to Fluentd has a high timeout. Is this the best way to do this with Fluency?

public class FluentdAuditLogWriter implements AuditLogWriter {

    private final AuditConfiguration auditConfiguration;

    @Override
    public boolean audit(AuditRecordMap auditRecordMap) {

        if(!hasValidaData(auditRecordMap) || !hasRequiredParameters())
            return false;

        FluentdLogWorker worker = new FluentdLogWorker(
                this.auditConfiguration.getApplicationName(),
                this.auditConfiguration.getFluentdServerHost(),
                this.auditConfiguration.getFluentdServerPort(),
                this.auditConfiguration.getFluentdTagPrefix(),
                this.auditConfiguration.getFluentdTag(),
                auditRecordMap);

        new Thread(worker).start();

        return true;
    }

    private boolean hasValidaData(AuditRecordMap auditRecordMap){
        ...
    }

    private boolean hasRequiredParameters(){
        ...
    }

    class FluentdLogWorker implements Runnable {

        private final String tagPrefix;
        private final String serverHost;
        private final String tag;
        private final int serverPort;
        private String applicationName;
        private final AuditRecordMap auditRecordMap;

        FluentdLogWorker(String applicationName, String serverHost, int serverPort, String tagPrefix, String tag,
                         AuditRecordMap auditRecordMap){
            this.applicationName = applicationName;
            this.serverHost = serverHost;
            this.serverPort = serverPort;
            this.tagPrefix = tagPrefix;
            this.tag = tag;
            this.auditRecordMap = auditRecordMap;
        }

        public void run() {
            FluentLogger logger = null;
            try{
                logger = FluentLogger.getLogger(this.tagPrefix,
                        this.serverHost, this.serverPort);
                logger.log(this.tag, auditRecordMap.getAuditRecordMap());
            } catch(Throwable t){
                log.error("[AUDIT] Error logging audit data (FluentdLogWorker)! Details: {}", t.getMessage());
            } finally {
                if(logger != null)
                    logger.close();
            }
        }
    }
}
  3. What is the best way to handle connection errors with Fluency?

  4. What's the recommended way to back up data to local disk when communication errors occur, and to send it later?

  5. My intention is to have some instances of Fluentd behind a load balancer and make Fluency communicate with the LB. Is that possible, or does it only work with TCP and a failover configuration?
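Regarding the question above about creating a FluentLogger and a new thread for each log operation, one common alternative is a single long-lived sender shared by all calls plus one shared worker thread. The sketch below illustrates only that pattern. `SharedAuditDispatcher` is a hypothetical name, and the `Consumer<String>` stands in for whatever logging client is created once at startup.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: one sender and one worker thread shared by all audit calls,
// instead of a new logger and a new Thread per record.
final class SharedAuditDispatcher implements AutoCloseable {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final Consumer<String> sender;   // stands in for a client created once

    SharedAuditDispatcher(Consumer<String> sender) {
        this.sender = sender;
    }

    // Returns immediately; the record is sent on the worker thread in submission order.
    void audit(String record) {
        worker.execute(() -> sender.accept(record));
    }

    // Stops accepting records and waits briefly for queued ones to be sent.
    @Override
    public void close() {
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```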

What happens when the Fluentd server is down?

We are planning to integrate Fluentd with our services, since logging across services has always been a challenge.

As a first step, we are planning to integrate Fluentd with all our microservices. After reading about the limitations of fluent-logger, we have decided to go with Fluency. Thanks a lot for an awesome project.

Since we are interested in integrating Fluency with our services, we would like to know more about it. The main points are:

- What happens if the Fluentd server is down?
- Does it have a buffer? If yes, what is the default size, and how do we specify it?
- How is the communication between Fluency and the Fluentd server managed?
- Would it become a bottleneck for application performance if we don't take care of any particular configuration?

Currently we are using logback.xml files across all the services. We are looking for an approach where we could just add a Fluentd appender to them and the Fluency dependencies to the pom.xml.

Could you please help by providing the necessary details?

Thanks

TLS issues, possible Bug

Hi,

SSLSocketBuilder.java

SSLContext sslContext = SSLContext.getInstance(SSL_PROTOCOL);
sslContext.init(null, null, new SecureRandom());

Why are you creating a new context instead of using the default one? The above sets the keystore and truststore to null, so we are unable to pass them in from the command line.
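For comparison, obtaining a socket factory from the JDK's default context looks like the sketch below. With the standard JSSE provider, the default context is configured from system properties such as `javax.net.ssl.keyStore` and `javax.net.ssl.trustStore`. `DefaultTlsContext` is a hypothetical helper name and does not exist in Fluency.

```java
import java.security.NoSuchAlgorithmException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;

// Hypothetical helper: uses the JVM-wide default SSLContext instead of a freshly
// initialised one, so command-line -Djavax.net.ssl.* settings can take effect.
final class DefaultTlsContext {
    static SSLSocketFactory socketFactory() {
        try {
            return SSLContext.getDefault().getSocketFactory();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("no default SSLContext available", e);
        }
    }
}
```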

Fluency : Sender failed to send data

I am getting the following error from Fluency.

thread-1,logger:o.k.fluency.sender.RetryableSender,message:Sender failed to send data. sender=RetryableSender{baseSender=TCPSender{config=Config{baseConfig=Config{baseConfig=Config{senderErrorHandler=null}, host=td-agent.ap-northeast-1.store-inventory, port=24224, connectionTimeoutMilli=5000, readTimeoutMilli=5000, heartbeaterConfig=null, failureDetectorConfig=Config{failureIntervalMillis=3000}, failureDetectorStrategyConfig=org.komamitsu.fluency.sender.failuredetect.PhiAccrualFailureDetectStrategyConfig@226c5101, waitBeforeCloseMilli=1000}}}

I'm not sure what the exact reason for this is.
Is there any chance of logs being lost here, and is there a way to turn off this warning log?

After a long period of unavailability of fluentd, when fluentd becomes available, fluency stops sending logs without any exceptions

Hello! I have some trouble using Fluency with logback-more-appenders (https://github.com/sndyuk/logback-more-appenders/blob/master/src/test/resources/logback-appenders-fluentd.xml), with the same logback configuration.
I run my Spring Boot application in a Docker container. When Fluentd (td-agent) is available there seems to be no problem. But when I tested how this solution behaves if Fluentd is unavailable for a long time, I saw that after Fluentd was enabled again, the buffer from Fluency was sent to Fluentd, but after that no new logs were sent from the application, and Fluency reported no errors. Why can this happen? Thanks!

What does setSenderMaxRetryCount do?

Hi there, my config is:
fluency = Fluency.defaultFluency(MyConstants.FLUENTD_IP, MyConstants.FLUENTD_PORT,
        new Fluency.Config()
                .setSenderMaxRetryCount(0)
                .setSenderErrorHandler(new SenderErrorHandler() {
                    @Override
                    public void handle(Throwable e) {
                        System.out.println("error");
                        fluentdActive = false;
                    }
                })
);

When I set setSenderMaxRetryCount = 1, it keeps printing "error" many times, but when I set setSenderMaxRetryCount , it prints it just once. I tested with an array containing only 1 element.
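As general background on what a retry count usually bounds, here is a minimal sketch of an exponential back-off schedule. The 400 ms base and 30000 ms cap are the values visible in the log output elsewhere on this page. The class `BackoffSchedule`, the doubling rule, and the exact exhaustion check are assumptions for illustration, not Fluency's implementation.

```java
// Hypothetical sketch of an exponential back-off schedule; not Fluency's code.
final class BackoffSchedule {
    private final long baseIntervalMillis;
    private final long maxIntervalMillis;
    private final int maxRetryCount;

    BackoffSchedule(long baseIntervalMillis, long maxIntervalMillis, int maxRetryCount) {
        this.baseIntervalMillis = baseIntervalMillis;
        this.maxIntervalMillis = maxIntervalMillis;
        this.maxRetryCount = maxRetryCount;
    }

    // Wait before retry number `retry` (0-based): base * 2^retry, capped at the maximum.
    long intervalMillis(int retry) {
        long interval = baseIntervalMillis << Math.min(retry, 30);
        return Math.min(interval, maxIntervalMillis);
    }

    // Assumed rule: give up once the retries already made reach the limit.
    boolean isExhausted(int retriesMade) {
        return retriesMade >= maxRetryCount;
    }
}
```

Under these assumptions a limit of 0 means the first failure is reported immediately, while a higher limit produces one failure report per attempt until the limit is reached.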

Fluency fails with SSLException: Broken pipe

Hi,

I am running into issues sending logs over TCP forward to a Fluentd cluster set up in AWS:

AWS Network load balancer (NLB) with TLS protocol with a self-signed certificate for *.us-east-1.lb.cvent.com
The fluentd cluster itself sits behind the NLB and runs in a Docker container in ECS. The NLB terminates SSL and is supposed to forward requests to the fluentd cluster.

The exception I am getting is below:

Here's how I have built the fluency client:

        FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
        builder.setBufferChunkInitialSize((int) (FileUtils.ONE_MB / 2));
        builder.setBufferChunkRetentionSize((int) (1 * FileUtils.ONE_MB));
        builder.setMaxBufferSize(2 * FileUtils.ONE_MB);
        builder.setBufferChunkRetentionTimeMillis((int) TimeUnit.SECONDS.toMillis(1));
        builder.setJvmHeapBufferMode(true);
        builder.setSenderMaxRetryCount(1);
        builder.setSslEnabled(true);
//        builder.setAckResponseMode(true);
        builder.setConnectionTimeoutMilli((int) TimeUnit.MINUTES.toMillis(2));
        builder.setReadTimeoutMilli((int) TimeUnit.MINUTES.toMillis(2));

        return builder.build("fluentd-sandbox.us-east-1.lb.cvent.com", 24224);
    }
{
	"thread": "pool-4-thread-1",
	"message": "Failed to send 1071 bytes data",
	"level": "ERROR",
	"timestamp": "2020-02-24T16:48:00.654Z",
	"logger": "org.komamitsu.fluency.fluentd.ingester.sender.NetworkSender"
} {
	"exception": "java.net.SocketException: Broken pipe (Write failed)\n\tat java.base/java.net.SocketOutputStream.socketWrite0(Native Method)\n\tat java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)\n\tat java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)\n\tat java.base/sun.security.ssl.SSLSocketOutputRecord.deliver(SSLSocketOutputRecord.java:320)\n\tat java.base/sun.security.ssl.SSLSocketImpl$AppOutputStream.write(SSLSocketImpl.java:983)\n\t... 16 common frames omitted\nWrapped by: javax.net.ssl.SSLProtocolException: Broken pipe (Write failed)\n\tat java.base/sun.security.ssl.Alert.createSSLException(Alert.java:126)\n\tat java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:321)\n\tat java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:264)\n\tat java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:259)\n\tat java.base/sun.security.ssl.SSLSocketImpl$AppOutputStream.write(SSLSocketImpl.java:988)\n\tat java.base/java.io.OutputStream.write(OutputStream.java:122)\n\tat org.komamitsu.fluency.fluentd.ingester.sender.SSLSender.sendBuffers(SSLSender.java:87)\n\tat org.komamitsu.fluency.fluentd.ingester.sender.SSLSender.sendBuffers(SSLSender.java:31)\n\tat org.komamitsu.fluency.fluentd.ingester.sender.NetworkSender.sendInternal(NetworkSender.java:102)\n\tat org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender.sendInternalWithRestoreBufferPositions(FluentdSender.java:74)\n\tat org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender.send(FluentdSender.java:56)\n\tat org.komamitsu.fluency.fluentd.ingester.sender.RetryableSender.sendInternal(RetryableSender.java:77)\n\tat org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender.sendInternalWithRestoreBufferPositions(FluentdSender.java:74)\n\tat org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender.send(FluentdSender.java:56)\n\tat 
org.komamitsu.fluency.fluentd.ingester.FluentdIngester.ingest(FluentdIngester.java:87)\n\tat org.komamitsu.fluency.buffer.Buffer.flushInternal(Buffer.java:357)\n\tat org.komamitsu.fluency.buffer.Buffer.flush(Buffer.java:112)\n\tat org.komamitsu.fluency.flusher.Flusher.runLoop(Flusher.java:66)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\n",
	"thread": "pool-4-thread-1",
	"message": "Sender failed to send data. sender=RetryableSender{baseSender=SSLSender{config=Config{host='fluentd-sandbox.us-east-1.lb.cvent.com', port=24224, connectionTimeoutMilli=120000, readTimeoutMilli=120000, waitBeforeCloseMilli=1000} Config{senderErrorHandler=null}} NetworkSender{config=Config{host='fluentd-sandbox.us-east-1.lb.cvent.com', port=24224, connectionTimeoutMilli=120000, readTimeoutMilli=120000, waitBeforeCloseMilli=1000} Config{senderErrorHandler=null}, failureDetector=null} org.komamitsu.fluency.fluentd.ingester.sender.SSLSender@695a4ee9, retryStrategy=ExponentialBackOffRetryStrategy{config=Config{baseIntervalMillis=400, maxIntervalMillis=30000} Config{maxRetryCount=1}} RetryStrategy{config=Config{baseIntervalMillis=400, maxIntervalMillis=30000} Config{maxRetryCount=1}}, isClosed=false} org.komamitsu.fluency.fluentd.ingester.sender.RetryableSender@52e856da, retry=0",
	"level": "WARN",
	"timestamp": "2020-02-24T16:48:00.658Z",
	"logger": "org.komamitsu.fluency.fluentd.ingester.sender.RetryableSender"
}

I am happy to provide more info as necessary. I am not sure what I am doing wrong with the setup.

incorrect maxBufferSize comparison in BufferPool

The comparison against maxBufferSize in BufferPool is reporting the buffer as full, though there is one more increase possible.

A good example for this can be seen in testBufferFullException.

initialSize: 64
maxBufferSize: 256

The loop is adding 7 elements, 17 bytes each.

This results in: PackedForwardBuffer writing: 17 - size: 128 - remaining: 9

The test then adds one more, which triggers a BufferFullException, which is incorrect at that point. The current buffer is 128 bytes and could be doubled once more to the configured 256 bytes.

It seems that the increase to 128 pushes allocatedSize to 192 at this point: BufferPool.java#L62

As far as I understand, each increase allocates and returns a new, empty buffer, which is then filled with the data from the previous buffer. However, I do not understand why allocatedSize is increased there to the combined sizes of the previous buffers. Basically, this means that you can never use more than 50% of maxBufferSize.
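The arithmetic in this report can be reproduced with a small model. `PoolAccounting` is a hypothetical class that only mimics the two accounting rules being contrasted. It is not the real BufferPool code, and it assumes the initial size does not exceed the maximum.

```java
// Hypothetical model of the accounting described above; not the real BufferPool.
final class PoolAccounting {
    // Largest chunk reachable by doubling from initialSize when the sizes of
    // replaced buffers stay counted against maxBufferSize.
    static int maxChunkCumulative(int initialSize, int maxBufferSize) {
        int current = initialSize;
        long allocated = initialSize;
        while (allocated + 2L * current <= maxBufferSize) {
            current *= 2;
            allocated += current;   // old buffer sizes are never subtracted
        }
        return current;
    }

    // Same doubling, but only the buffer that remains after the copy is counted.
    static int maxChunkLiveOnly(int initialSize, int maxBufferSize) {
        int current = initialSize;
        while (2L * current <= maxBufferSize) {
            current *= 2;
        }
        return current;
    }
}
```

With initialSize 64 and maxBufferSize 256, the cumulative rule stops at a 128-byte chunk (64 + 128 = 192 allocated, and 192 + 256 exceeds 256), while the live-only rule reaches 256.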

Support to HTTP Input

I've searched a bit and couldn't find an answer to this.
Does Fluency (or even fluent-logger-java) support sending logs to an HTTP endpoint when the Fluentd server has the in_http Input Plugin enabled?

Logback.xml Sample

Hi,
I am currently using SLF4J in my Spring Boot microservices and would like to integrate Fluentd with them. Currently I am using fluent-logger as an appender in my logback.xml to propagate the logs to Fluentd. Since it does not support SSL, I would like to explore Fluency. Can Fluency be defined as an appender in my logback.xml with SSL enabled?

Possible Memory Leak in AsyncFlusher

We noticed a massive number of recorded AsyncFlusher objects while using Fluency as a logging adapter.

We use the 0.0.8 Maven artifact for an application that outputs no more than 10 messages per minute.

Fluency is constructed using org.komamitsu.fluency.Fluency.defaultFluency(String host, int port) throws IOException

We counted the instances using YourKit:

bildschirmfoto_2016-03-21_17-04-39

Many ESTABLISHED sockets on server side while using fluency

Created this issue for public knowledge, after having mail contact with komamitsu:

The problem we currently notice is that the server has a rising number of ESTABLISHED sockets to the hosts with the JVM, that log with fluency. All sockets are connected to the in_forward.
On the server side we use the td-agent 2.3.1 on a Ubuntu 14.04 LTS.

The clients still have only one ESTABLISHED socket each. The client Fluency version is the 0.0.9 artifact from the Maven repo.

The workaround is to restart td-agent every 2 days, because otherwise it crashes on reaching the ulimit.

We are instantiating the fluency connection like this:

Fluency logger = Fluency.defaultFluency(config.getServerIp(), config.getServerPort());

This logger instance is accessible via a static Holder instance. So it's just one instance per JVM.

The only methods that are invoked on this object are (This is called via a Runtime ShutdownHook):

logger().close();
for (int i = 0; i < 5; i++)
{
    if (logger().isTerminated())
    {
        break;
    }
    TimeUnit.SECONDS.sleep(5);
}

The other call is:

Map<String, Object> log = new HashMap<String, Object>();
log.put(PROCESS_PROP, processIdentifier);
log.put(HOST_PROP, hostAlias);
log.put(ENVIRONMENT_PROP, environment);
log.put(MESSAGE_PROP, String.format(format, objects));
logger().emit(getName(), log);
//whereas getName() returns a string

The server-config of the td-agent is:

<source>
  @type forward
  port 24224
</source>
<match **>
  type elasticsearch
  host localhost
  port 9200
  include_tag_key true
  logstash_format true
  flush_interval 3s
</match>

Logger prevents application from terminating

The following code successfully sends a message to Fluentd and prints "OK." but prevents my application from exiting after this:

package org.komamitsu.fluency;

import java.io.IOException;
import java.util.HashMap;

public class FluentdTest {
    public static void main(String[] args) {
        Fluency fluency = null;
        try {
            fluency = Fluency.defaultFluency("localhost", 24224,
                    new Fluency.Config()
                            .setBufferChunkInitialSize(4 * 1024 * 1024)
                            .setBufferChunkRetentionSize(16 * 1024 * 1024)
                            .setMaxBufferSize(256 * 1024 * 1024L));
            String tag = "log";
            HashMap<String, Object> event = new HashMap<String, Object>();
            event.put("name", "komamitsu");
            event.put("age", 42);
            event.put("rate", 3.14);
            fluency.emit(tag, event);
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("OK.");
    }
}

Am I doing something wrong?
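One plausible explanation, assuming the logger owns a non-daemon background thread (an assumption, not something this report confirms), is that the JVM stays alive until that thread is stopped. The sketch below shows the effect with a plain executor. `BackgroundLogger` is a hypothetical stand-in and not the Fluency class.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a logger that owns a background flush thread.
final class BackgroundLogger implements AutoCloseable {
    // Executors.newSingleThreadExecutor() uses a non-daemon worker thread,
    // which keeps the JVM alive until the executor is shut down.
    private final ExecutorService flusher = Executors.newSingleThreadExecutor();

    void emit(String message) {
        flusher.execute(() -> { /* a real logger would send the message here */ });
    }

    // Without this call the worker thread keeps running after main() returns.
    @Override
    public void close() {
        flusher.shutdown();
    }

    boolean awaitTermination(long millis) {
        try {
            return flusher.awaitTermination(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

If the same applies here, calling close() on the Fluency instance in a finally block before main() returns would be the corresponding step.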

Writing logs to a normal file when the Fluentd is down.

Hi,
I have been using SLF4J with log appenders so far to write my logs. When I integrated Fluentd, I commented out the appenders that were writing logs to files. I was wondering whether I could write the logs to files as before when Fluentd is down, so that I have a backup and need not worry about losing data.
Please let me know your thoughts.

Add an emit method to output MessagePack encoded data directly

The current emit method requires using Map<String, Object>, and then msgpack-jackson encodes this Map value into a msgpack byte array:
https://github.com/komamitsu/fluency/blob/master/src/main/java/org/komamitsu/fluency/buffer/PackedForwardBuffer.java#L143

If the application can generate msgpack map<string, msgpack value> data in advance, this step can be optimized. For example, we can write a converter of zipkin or opentracing data into msgpack, then use Fluency for sending these application tracing data somewhere through fluentd.

@komamitsu
How about adding such low-level advanced methods like this?
emit(String tag, byte[] mapValue)
emit(String tag, byte[] mapValue, int offset, int len)
emit(String tag, ByteBuffer mapValue)
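The three proposed signatures could delegate to one another as sketched below, so that only the ByteBuffer variant touches the buffer. `RawEmitSketch` is a hypothetical class whose "buffer" is just a list, and nothing here validates that the bytes really are a MessagePack map.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed overloads; the buffer is modelled as a list.
final class RawEmitSketch {
    final List<ByteBuffer> appended = new ArrayList<>();

    void emit(String tag, byte[] mapValue) {
        emit(tag, mapValue, 0, mapValue.length);
    }

    void emit(String tag, byte[] mapValue, int offset, int len) {
        emit(tag, ByteBuffer.wrap(mapValue, offset, len));
    }

    // All overloads end here: the caller supplies an already-encoded MessagePack map,
    // so no Map-to-msgpack conversion is needed at this point.
    void emit(String tag, ByteBuffer mapValue) {
        appended.add(mapValue.slice());   // slice() views only the remaining bytes
    }
}
```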

Fluent-Bit Buffer Warning

I'm running into a constant exception, as if Fluency isn't respecting setBufferChunkRetentionSize. It starts out working fine, but as I increase the request volume the exceptions ramp up.

Exception triggered in fluency when writing to fluent bit.

java.io.IOException: Connection reset by peer

in fluent bit

[2018/04/18 23:13:41] [ warn] [in_fw] fd=18 incoming data exceed limit (32768 bytes)

fluency settings

    .setFlushIntervalMillis(100)
    .setBufferChunkRetentionSize(1*1024 * 1024)
    .setBufferChunkInitialSize(2*1024 * 1024)
    .setMaxBufferSize(32*1024*1024)

ClassNotFoundException: org.komamitsu.failuredetector.PhiAccuralFailureDetector$Builder

Hi, I tried to create a jar for this project by cloning it, importing it into Eclipse as a Gradle project, and then exporting it as a jar file so that I could use it in my other projects.

I wanted to test how it is used, but when I tried to instantiate Fluency using the Fluency.defaultFluency method, it threw a ClassNotFoundException. Where exactly is org.komamitsu.failuredetector located? Apologies if I have just done something wrong. If I made a mistake in creating the jar, could you assist me in making a jar for your project?

Return Error instead of Discarding Data

Scenario:

  1. Fluentd daemon is not running, so logger cannot connect.
  2. No file backup is configured, so filebackup == null
  3. Retries to connect to daemon have been exceeded so failure strategy is to save buffer to filebackup
  4. Here the logger silently discards the data. It should instead return IOException, so that application can handle the situation.

Similarly, here it simply logs an error with AsyncFlusher instead of actually returning the error.
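The requested behaviour can be sketched as follows. `GiveUpPolicy` and its parameters are hypothetical. The sketch only shows the decision made once retries are used up and leaves out how an asynchronous flusher would hand the exception back to the caller.

```java
import java.io.IOException;
import java.util.function.Consumer;

// Hypothetical sketch (not Fluency code) of the behaviour requested above.
final class GiveUpPolicy {
    // Called once retries are used up. `fileBackup` is null when no backup is configured.
    static void onRetriesExhausted(byte[] unsent, Consumer<byte[]> fileBackup) throws IOException {
        if (fileBackup == null) {
            // Surface the loss to the caller instead of silently dropping the data.
            throw new IOException("giving up on " + unsent.length
                    + " unsent bytes; no file backup configured");
        }
        fileBackup.accept(unsent);
    }
}
```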

Sending to Treasure Data by passing MessagePack value fails

[error] java.lang.UnsupportedOperationException
[error]         at java.util.AbstractMap.put(AbstractMap.java:209)
[error]         at org.komamitsu.fluency.treasuredata.recordformat.TreasureDataRecordFormatter.addTimeColumnToMsgpackRecord(TreasureDataRecordFormatter.java:93)
[error]         at org.komamitsu.fluency.treasuredata.recordformat.TreasureDataRecordFormatter.formatFromMessagePack(TreasureDataRecordFormatter.java:108)
[error]         at org.komamitsu.fluency.buffer.Buffer.appendMessagePackMapValueInternal(Buffer.java:249)
[error]         at org.komamitsu.fluency.buffer.Buffer.appendMessagePackMapValue(Buffer.java:274)
[error]         at org.komamitsu.fluency.Fluency.lambda$emit$2(Fluency.java:89)
[error]         at org.komamitsu.fluency.Fluency$Emitter.emit(Fluency.java:57)
[error]         at org.komamitsu.fluency.Fluency.emit(Fluency.java:89)
[error]         at org.komamitsu.fluency.Fluency.emit(Fluency.java:95)
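
The top frame of the trace is java.util.AbstractMap.put, which throws UnsupportedOperationException unless a subclass overrides it. The following is a minimal sketch of that failure mode and one possible workaround, assuming the record reaches the formatter as a read-only map view. TimeColumnSketch, readOnlyView and withTimeColumn are hypothetical names, and the "time" key is an assumption made for illustration; this is not the library's code.

```java
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; it only mirrors the failure seen in the stack trace.
class TimeColumnSketch {
    /** A read-only view: it does not override put(), so AbstractMap.put() throws. */
    static Map<String, Object> readOnlyView(Map<String, Object> source) {
        return new AbstractMap<String, Object>() {
            @Override
            public Set<Map.Entry<String, Object>> entrySet() {
                return Collections.unmodifiableSet(source.entrySet());
            }
        };
    }

    /** Copies the record into a mutable map before adding the time column. */
    static Map<String, Object> withTimeColumn(Map<String, Object> record, long epochSeconds) {
        Map<String, Object> copy = new HashMap<>(record);
        // "time" is an assumed column name; an existing value is left untouched.
        copy.putIfAbsent("time", epochSeconds);
        return copy;
    }
}
```

Calling put on the view directly reproduces the exception, while withTimeColumn leaves the original record unmodified and returns a mutable copy with the extra entry.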
