Giter VIP home page Giter VIP logo

fluent-logger-forward-node's Introduction

@fluent-org/logger

Build Status License Docs

Fluent Forward Protocol implementation for Node.js. Built upon fluent-logger-node.

NPM

Install

$ npm install @fluent-org/logger

Client

@fluent-org/logger provides a fully functional client that implements the Forward protocol. It supports reconnection, acknowledgements, timeouts, event retries, and more, and exposes its functionality via a simple typed Promise interface.

For a full list of the client options and methods, see the FluentClient docs

Prerequisites

The fluent daemon should be listening in forward mode.

A simple starting configuration for Fluentd is the following:

<source>
  @type forward
  port 24224
</source>

<match **.*>
  @type stdout
</match>

See the FluentD docs for more info.

A similar starting configuration for Fluent Bit is the following:

[INPUT]
    Name              forward
    Listen            0.0.0.0
    Port              24224
    Buffer_Chunk_Size 1M
    Buffer_Max_Size   6M

[OUTPUT]
    Name   stdout
    Match  *

See the Fluent Bit docs for more info.

Sending an event record to an upstream Fluent server

const FluentClient = require("@fluent-org/logger").FluentClient;
const logger = new FluentClient("tag_prefix", {
  socket: {
    host: "localhost",
    port: 24224,
    timeout: 3000, // 3 seconds
  }
});

The emit method has following signature

emit(data: Record<string, any>): Promise<void>;
emit(data: Record<string, any>, timestamp: number | Date | EventTime): Promise<void>;
emit(label: string, data: Record<string, any>): Promise<void>;
emit(label: string, data: Record<string, any>, timestamp: number | Date | EventTime): Promise<void>;

The returned Promise is resolved once the event is written to the socket, or rejected if an error occurs.

Acknowledgements

The Fluent forward protocol provides explicit support for acknowledgements, which allow the client to be sure that the event reached its destination.

Enabling acknowledgements means that the promise returned by emit will be resolved once the client receives an explicit acknowledgement from the server.

const FluentClient = require("@fluent-org/logger").FluentClient;
const logger = new FluentClient("tag_prefix", {
  ack: {}
});

Event modes

The Fluent forward protocol provides multiple message modes, Message, Forward, PackedForward(default), CompressedPackedForward. The Fluent client supports all of them.

const FluentClient = require("@fluent-org/logger").FluentClient;
const logger = new FluentClient("tag_prefix", {
  eventMode: "Message" | "Forward" | "PackedForward" | "CompressedPackedForward"
});

Disable automatic reconnect

const logger = new FluentClient("tag_prefix", {
  socket: {
    host: "localhost",
    port: 24224,
    timeout: 3000, // 3 seconds
    disableReconnect: true
  }
});
// If you disable reconnections, the socket has to be manually connected, 
// connect() returns a promise, which rejects on connection errors.
logger.connect();

Shared key authentication

Logger configuration:

const logger = new FluentClient("tag_prefix", {
  socket: {
    host: "localhost",
    port: 24224,
    timeout: 3000, // 3 seconds
  },
  security: {
    clientHostname: "client.localdomain",
    sharedKey: "secure_communication_is_awesome"
  }
});

Fluentd configuration:

<source>
  @type forward
  port 24224
  <security>
    self_hostname input.testing.local
    shared_key secure_communication_is_awesome
  </security>
</source>

<match dummy.*>
  @type stdout
</match>

See also the Fluentd examples.

TLS/SSL encryption

Logger configuration:

const logger = new FluentClient("tag_prefix", {
  socket: {
    host: "localhost",
    port: 24224,
    timeout: 3000, // 3 seconds
    tls: {
      ca: fs.readFileSync("/path/to/ca_cert.pem")
    },
  },
  security: {
    clientHostname: "client.localdomain",
    sharedKey: "secure_communication_is_awesome"
  },
});

Fluentd configuration:

<source>
  @type forward
  port 24224
  <transport tls>
    ca_cert_path /path/to/ca_cert.pem
    ca_private_key_path /path/to/ca_key.pem
    ca_private_key_passphrase very_secret_passphrase
  </transport>
  <security>
    self_hostname input.testing.local
    shared_key secure_communication_is_awesome
  </security>
</source>

<match dummy.*>
  @type stdout
</match>

FYI: You can generate certificates using the fluent-ca-generate command since Fluentd 1.1.0.

See also How to enable TLS/SSL encryption.

Mutual TLS Authentication

Logger configuration:

const logger = new FluentClient("tag_prefix", {
  socket: {
    host: "localhost",
    port: 24224,
    timeout: 3000, // 3 seconds
    tls: {
      ca: fs.readFileSync("/path/to/ca_cert.pem"),
      cert: fs.readFileSync("/path/to/client-cert.pem"),
      key: fs.readFileSync("/path/to/client-key.pem"),
      passphrase: "very-secret"
    },
  },
  security: {
    clientHostname: "client.localdomain",
    sharedKey: "secure_communication_is_awesome"
  }
});

Fluentd configuration:

<source>
  @type forward
  port 24224
  <transport tls>
    ca_path /path/to/ca-cert.pem
    cert_path /path/to/server-cert.pem
    private_key_path /path/to/server-key.pem
    private_key_passphrase very_secret_passphrase
    client_cert_auth true
  </transport>
  <security>
    self_hostname input.testing.local
    shared_key secure_communication_is_awesome
  </security>
</source>

<match dummy.*>
  @type stdout
</match>

EventTime support

We can also specify an EventTime as a timestamp. See the EventTime docs

const FluentClient = require("@fluent-org/logger").FluentClient;
const EventTime = require("@fluent-org/logger").EventTime;
const eventTime = new EventTime(1489547207, 745003500); // 2017-03-15 12:06:47 +0900
const logger = new FluentClient("tag_prefix", {
  socket: {
    host: "localhost",
    port: 24224,
    timeout: 3000, // 3 seconds
  }
});
logger.emit("tag", { message: "This is a message" }, eventTime);

Handling errors

The Fluent client will manage errors internally, and reject promises on errors. If you"d like to access the non-user facing internal errors, you can do so by passing errorHandler

const FluentClient = require("@fluent-org/logger").FluentClient;
const logger = new FluentClient("tag_prefix", {
  onSocketError: (err: Error) => {
    console.error("error!", err)
  }
});

Retrying events

Sometimes it makes sense to resubmit events if their initial submission failed. You can do this by specifying eventRetry.

const FluentClient = require("@fluent-org/logger").FluentClient;
const logger = new FluentClient("tag_prefix", {
  eventRetry: {}
});

Server

@fluent-org/logger includes a fully functional forward server which can be used as a downstream Fluent sink.

const FluentServer = require("@fluent-org/logger").FluentServer;

const server = new FluentServer({ listenOptions: { port: 24224 }});

await server.listen();

Fluentd config:

<match pattern>
  @type forward
  send_timeout 60s
  recover_wait 10s
  hard_timeout 60s

  <server>
    name fluent_node 
    host 127.0.0.1
    port 24224
    weight 60
  </server>

  <secondary>
    @type file
    path /var/log/fluent/forward-failed
  </secondary>
</match>

See the FluentD docs for more info.

Alternatively, see the Fluent Bit docs for info on setting up Fluent Bit.

For a full list of the server options and methods, see the FluentServer docs

License

Apache License, Version 2.0.

About NodeJS versions

This package is compatible with NodeJS versions >= 12.

fluent-logger-forward-node's People

Contributors

ffuentes avatar jamiees2 avatar raytung avatar rganatra-kasada avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar

fluent-logger-forward-node's Issues

Node process doesn't exit if the destination is not available

I stopped the fluentd server and tried the below code with node

const FluentClient = require("@fluent-org/logger").FluentClient;

const flDlogger = new FluentClient("test", {
    eventMode: "Message",
    socket: {
        host: "localhost",
        port: 24224,
        timeout: 3000,
    }
});
flDlogger.emit({}).then(() => { console.log("Sent Message Successfully")}).catch((e) => { console.log(e);})

The node process did not timeout/quit until the server becomes available, is there any flag/way to exit the node process if the destination becomes unavailable, i want to try this in AWS lambda and if the destination is not available then the lambda may run always.

Support keep alive property

Hey ๐Ÿ‘‹

There was a change to Fluentd's out_forward awhile ago to support keep alive behaviour fluent/fluentd#2393. Is there any chance of implementing this in this client as well? The use case is identical to the original issue in fluent/fluentd#2188 where we'd like our clients to rebalance its connections over to newly spawned fluentd replicas.

MaxListenersExceededWarning: Possible EventEmitter memory leak detected

Hi folks,

First of all, I want to say thank you for the project and I want to ask about an advice.

Time to time I face the error 'MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 error listeners added to [FluentSocket]. Use emitter.setMaxListeners() to increase limit' and my config is quite straightforward.

this.logger = new FluentClient('ld', { socket: { host: url.hostname, port: parseInt(url.port, 10), }, })

Could you advice what could be a problem?

Proposal: reject connect() on authentication error

Hi, I have a use case where authentication is required:

  const logger = new FluentClient("output", {
    socket: {
      host: "fluentd.local",
      port: 24224,
      timeout: 3000,
      disableReconnect: true,
      tls: {
        ca: require('fs').readFileSync('./fluentd.crt'),
      },
    },
    security: {
      clientHostname: "client.local",
      sharedKey: "incorrect",
      username: "ooo",
      password: "xxx"
    },
  });

  try {
    await logger.connect();
  } catch (err) {
    /* do something with err */
  }

When security is not correctly configured, connect() still resolves.

Though the error could be detected by logger.socketOn('error'), it's not synchronous with connect() and some mechanism to wait for the error (or to ensure no error thrown) is required.

If connect() can reject on error, it would be more easy to determine whether the client is authenticated successfully.

unhandledRejection in `flush` method

When testing in in our production environment I noticed unhandledRejection with follow stack trace logged.

It seems this is around here

I also noticed EPIPE errors but the stack trace is not very helpful to pinpoint where exactly is wrong.

Error: write EPIPE
    at afterWriteDispatched (internal/stream_base_commons.js:156:25)
    at writeGeneric (internal/stream_base_commons.js:147:3)
    at Socket._writeGeneric (net.js:787:11)
    at Socket._write (net.js:799:8)
    at doWrite (_stream_writable.js:403:12)
    at writeOrBuffer (_stream_writable.js:387:5)
    at Socket.Writable.write (_stream_writable.js:318:11)
    at /srv/test/node_modules/@fluent-org/logger/build/src/socket.js:590:45
    at new Promise (<anonymous>)
    at FluentSocket.innerWrite (/srv/test/node_modules/@fluent-org/logger/build/src/socket.js:586:16)
    at FluentSocket.write (/srv/test/node_modules/@fluent-org/logger/build/src/socket.js:630:21)
    at FluentClient.sendNext (/srv/test/node_modules/@fluent-org/logger/build/src/client.js:459:42)
    at FluentClient.syncFlush (/srv/test/node_modules/@fluent-org/logger/build/src/client.js:321:36)
    at /srv/test/node_modules/@fluent-org/logger/build/src/client.js:336:30
    at processTicksAndRejections (internal/process/task_queues.js:79:11)
    at runNextTicks (internal/process/task_queues.js:66:3)
    at processImmediate (internal/timers.js:434:9)
    at process.topLevelDomainCallback (domain.js:138:15)
    at process.callbackTrampoline (internal/async_hooks.js:124:14)

Other stack trace:

Error: write EPIPE
    at WriteWrap.onWriteComplete [as oncomplete] (internal/stream_base_commons.js:94:16)
    at WriteWrap.callbackTrampoline (internal/async_hooks.js:126:14)

Logging stopping to work after a while in Meteor projects

Hi!

I just want to ask about any ideas why the package could not work with Meteor(we still have to maintain some legacy) projects. It started logging but stop after a while (but there is still console.log logs in console). No errors, just silently stop to work and restart after redeploy.

Winston papertrail plugin worked fine (also based on tcp request as far as I know).
Do you have any ideas? :-)

Thanks in advice.

Andrei

Node v14+ support and fix for TypeScript and EventEmitter

Node v14 is the active LTS and it switches to "maintenance" tomorrow, making Node v16 the current LTS as of Tuesday. It would be great to be able to upgrade the project so it builds without errors. When I try, I get the following:

node_modules/@fluent-org/logger/build/src/server.d.ts:63:43 - error TS2507: Type 'typeof EventEmitter' is not a constructor function type.

63 export declare class FluentServer extends EventEmitter {
                                             ~~~~~~~~~~~~

node_modules/@fluent-org/logger/build/src/socket.d.ts:164:43 - error TS2507: Type 'typeof EventEmitter' is not a constructor function type.

164 export declare class FluentSocket extends EventEmitter {
                                              ~~~~~~~~~~~~


Found 2 errors.

From what I see at DefinitelyTyped, the upgrade at 13.4 or 13.5 caused issues. Geckos.io has a simple fix, as does cheap-watch. It basically means you have to change the import line.

// Old
import * as EventEmitter from "events";

// New
import { EventEmitter } from 'events';

Question: How do we handle deferred promise rejections due to dropped entries?

Thanks for the awesome library!

I am experimenting with the situation where Fluent becomes unreachable while events are being emitted. I believe sendQueueMaxLimit is the right way to prevent backpressure, but I noticed that dropped entries cause an unhandledPromiseRejection, and I don't see a way to catch the rejection.

Here is a simple reproduction:

const { FluentClient } = require('@fluent-org/logger')
const logger = new FluentClient('testing', {
  socket: {
    host: 'i-dont-exist.com'
  },
  sendQueueMaxLimit: {
    length: 2,
  },
  ack: {}
})

const run = async () => {
  await Promise.all(Array(3).fill().map(async (_, i) => {
    try {
      await logger.emit('label', {
        hello: i
      })
      console.log('written!', i)
    } catch (err) {
      console.error('uh oh!', err.message)
    }
  }))
}

run()

This results in unhandledPromiseRejection: DroppedError. It seems the dropEntry.deferred.reject here is not bubbled up to the caller.

The only resolution I've found is to add the following:

process.on('unhandledRejection', (reason, promise) => {
  if (reason instanceof FluentError.DroppedError) return
  throw promise
})

Is there a correct way to handle this that I am missing?

Proposal: Ability to configure msgpack encoding, e.g. to ignore undefined properties

Hi ๐Ÿ‘‹

Would it be helpful to anyone else if it were possible to configure the msgpack encoder.

For example msgpack has the option to ignoreUndefinedProperties, which is desirable in certain circumstances, eg. to reduce bandwidth or storage requirements.

Msgpack options could (for example) be provided with the client configuration. Something like:

const client = new FluentClient('tag', {
  msgpackOptions: {
    maxDepth: 42,
    ingnoreUndefined: true,
    // other msgpack options at https://github.com/msgpack/msgpack-javascript/blob/main/src/Encoder.ts#L15
  }
}

Would be happy to throw something together if it's interesting.

Thanks in advance

Configure Connection Error Propagation

Hi, I've just started migrating an app from fluent-logger to this tool, and I'm trying to understand how to configure propagation of connection errors to my app. For context, we have a fluent-bit server with a configurable port setting, so we need to make sure the connection is reliably established (to avoid configuration problems). I have something like this in the app:

class FluentLogger {
  logger: any;
  fluentHost: string;
  fluentPort: number;
  constructor() {
    const FluentClient = require("@fluent-org/logger").FluentClient;
    this.fluentHost = process.env.FLUENTD_ENDPOINT ?? "localhost";
    this.fluentPort = parseInt(process.env.FLUENTD_PORT) || 24224;
    this.logger = new FluentClient("teleop-frontend", {
      socket: {
        host: this.fluentHost,
        port: this.fluentPort,
      },
    });
  }

  emit(event: object, label?: string) {
    if (! this.logger.writable) {
      console.log(
      `Unable to connect to Fluent logger client at ${this.fluentHost}:${this.fluentPort}!`
      );
      return;
    }
    if (label) this.logger.emit(label, event);
    else this.logger.emit(event);
  }
}

This check of FluentClient.writable is the only thing I found in this package which provides some feedback to the status of the connection. I see that there is an internal FluentSocket.onError that serves to emit e.g. connection errors (via FluentSocket.openSocket()) but I can't find any configuration options to actually emit that event to anything. All the other functions I call return void Promises that don't raise on failure; is there some configuration option/step I'm missing?

EDIT: I actually am getting false in all cases when checking FluentClient.writable, even when the host and port are correct (i.e. removing that if (!this.logger.writable) block successfully publishes the event). So I'm not sure what the right way to do this is

I'm using @fluent-org/logger 1.0.10 in a NextJS app, for reference

Reconnection background task keeps running even after calling disconnect()

Hello!

Came across an issue where the background async reconnection task would keep running even after explicitly calling disconnect() in the particular case where the remote fluent server closes the connection first (i.e. before calling disconnect()). This causes the node process to never stop since there's a background async task that never fully resolves if the remote fluent server doesn't come back up. A side effect is also that, if the remote fluent server did come back up, the client will reconnect even though it should have remained disconnected.

Minimal reproduction:

import { FluentClient, FluentServer } from '@fluent-org/logger';
import { hostname } from 'os';

void (async () => {
  const fluentServerPort = 9999;

  const fluentServer = new FluentServer({
    keepalive: true,
    listenOptions: {
      port: fluentServerPort,
    },
  });

  await fluentServer.listen();

  const fluentClient = new FluentClient(undefined, {
    socket: {
      disableReconnect: false,
      host: hostname(),
      port: fluentServerPort,
    },
    eventRetry: {},
    disconnect: {
      waitForPending: false,
      socketDisconnectDelay: 0,
    },
    disableAutoconnect: true,
  });

  await fluentClient.connect();

  await fluentServer.close();

  await fluentClient.shutdown();
})();

This also means that any tests for this behaviour will either hang forever, or time out (if so configured).

TLS/SSL encryption does not work

I've been trying to make TLS/SSL encryption work locally by following the example in the doc, but I cannot. Here are the configs

Fluentd configuration:

<source>
  @type forward
  port 24224
  <transport tls>
    ca_cert_path /path/to/ca_cert.pem
    ca_private_key_path /path/to/ca_key.pem
    ca_private_key_passphrase very_secret_passphrase
  </transport>
</source>

<match **.*>
  @type stdout
</match>

Logger configuration:

const logger = new FluentClient("tag", {
  socket: {
    host: "localhost",
    port: 24224,
    timeout: 3000, // 3 seconds
  },
  tls: {
    ca: fs.readFileSync("/path/to/ca_cert.pem")
  }
});

I can successfully log a message with openssl (command is from the fluentd docs):

echo -e '\x93\xa9debug.tls\xceZr\xbc1\x81\xa3foo\xa3bar' | \
  openssl s_client -connect localhost:24224

When I try with the logger, the emit promise is resolved, but I get the following error in the Fluentd server:

#0 unexpected error before accepting TLS connection by OpenSSL addr="127.0.0.1" host="127.0.0.1" port=56088 error_class=OpenSSL::SSL::SSLError error="SSL_accept returned=1 errno=0 state=error: wrong version number"

I tried messing around with TLS versions, but my current guess is that the ca parameter gets completely ignored. I get the same error even if I excude the ca parameter.

In socket.ts, there is this line: tls.connect({...this.tlsOptions, ...this.socketParams});, but the tls.connect function doesn't actually seem to take in a parameter called ca (per the Node.js TLS API docs). However, tls.createSecureContext does seem to accept ca to override trusted certificates.

I'm not sure if I've missed something in setting this up or if the TLS doesn't work with this client.

Edit

After doing some more digging into the source code, I figured it out.

const logger = new FluentClient("tag", {
  socket: {
    host: "localhost",
    port: 24224,
    timeout: 3000, // 3 seconds,
    tls: {
      ca: fs.readFileSync("/path/to/ca_cert.pem")
    }
  }, 
});

Please correct your documentation.

FluentServer not decoding protocol correctly?

Hi,

Trying to use this as a server/sink to have fluent forward logs into a node app, but nothing seems to get emitted when forwarded in by an actual fluentd..

Using this test code:

#!/usr/bin/node
'use strict';

const FluentServer = require("@fluent-org/logger").FluentServer;
const FluentClient = require("@fluent-org/logger").FluentClient;

const server = new FluentServer({ listenOptions: { port: 24420 }});

const logger = new FluentClient("tag_prefix", {
  socket: {
    host: "localhost",
    port: 24420,
    timeout: 3000, // 3 seconds
  }
});


let main = async ()=>{
        await server.listen();
        logger.emit('tag', { message: 'message' });
};

server.on('entry', (a,b,c) => console.log(a,b,c));

main()

I do immediately see the message forwarded into itself with this test app, however I see nothing emitted when my local fluentd connects and flushes it's buffer.

I've tried the config posted in the readme basically; I have my local syslog as a source tagged as system.

<match system.**>
  @type forward
  send_timeout 60s
  recover_wait 10s
  hard_timeout 60s

  <server>
    name fluent_node 
    host 127.0.0.1
    port 24420
    weight 60
  </server>

  <secondary>
    @type file
    path /var/log/calyptia-fluentd/forward-failed
  </secondary>
</match>

I've also tried <match **> for all messages.. I do not ever see a forward-failed file. The fluentd log complains when my node app isn't listening and then shows that it flushed when it does start listening.

I've setup a copy output to copy to the forward for the node sink and to a local file and can indeed see the log entries in the local file.

2022-05-17 12:16:49 +0000 [warn]: #0 recovered forwarding server 'fluent_node' host="127.0.0.1" port=24420
2022-05-17 12:16:55 +0000 [warn]: #0 retry succeeded. chunk_id="5df3416a0841558c7b24a18a52894e0e"
2022-05-17 12:18:16 +0000 [warn]: #0 failed to flush the buffer. retry_times=0 next_retry_time=2022-05-17 12:18:18 +0000 chunk="5df341e996491df677197139862f6aa5" error_class=Errno::ECONNREFUSED error="Connection refused - connect(2) for \"127.0.0.1\" port 24420"
  2022-05-17 12:18:16 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin_helper/socket.rb:62:in `initialize'
  2022-05-17 12:18:16 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin_helper/socket.rb:62:in `new'
  2022-05-17 12:18:16 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin_helper/socket.rb:62:in `socket_create_tcp'
  2022-05-17 12:18:16 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin/out_forward.rb:410:in `create_transfer_socket'
  2022-05-17 12:18:16 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin/out_forward/connection_manager.rb:46:in `call'
  2022-05-17 12:18:16 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin/out_forward/connection_manager.rb:46:in `connect'
  2022-05-17 12:18:16 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin/out_forward.rb:807:in `connect'
  2022-05-17 12:18:16 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin/out_forward.rb:676:in `send_data'
  2022-05-17 12:18:16 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin/out_forward.rb:365:in `block in write'
  2022-05-17 12:18:16 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin/out_forward/load_balancer.rb:46:in `block in select_healthy_node'
  2022-05-17 12:18:16 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin/out_forward/load_balancer.rb:37:in `times'
  2022-05-17 12:18:16 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin/out_forward/load_balancer.rb:37:in `select_healthy_node'
  2022-05-17 12:18:16 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin_helper/service_discovery/manager.rb:108:in `select_service'
  2022-05-17 12:18:16 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin_helper/service_discovery.rb:82:in `service_discovery_select_service'
  2022-05-17 12:18:16 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin/out_forward.rb:365:in `write'
  2022-05-17 12:18:16 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin/output.rb:1179:in `try_flush'
  2022-05-17 12:18:16 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin/out_forward.rb:353:in `try_flush'
  2022-05-17 12:18:16 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin/output.rb:1500:in `flush_thread_run'
  2022-05-17 12:18:16 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin/output.rb:499:in `block (2 levels) in start'
  2022-05-17 12:18:16 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2022-05-17 12:18:17 +0000 [warn]: #0 failed to flush the buffer. retry_times=1 next_retry_time=2022-05-17 12:18:20 +0000 chunk="5df341e996491df677197139862f6aa5" error_class=Errno::ECONNREFUSED error="Connection refused - connect(2) for \"127.0.0.1\" port 24420"
  2022-05-17 12:18:17 +0000 [warn]: #0 suppressed same stacktrace
2022-05-17 12:18:19 +0000 [warn]: #0 failed to flush the buffer. retry_times=2 next_retry_time=2022-05-17 12:18:24 +0000 chunk="5df341e996491df677197139862f6aa5" error_class=Errno::ECONNREFUSED error="Connection refused - connect(2) for \"127.0.0.1\" port 24420"
  2022-05-17 12:18:19 +0000 [warn]: #0 suppressed same stacktrace
2022-05-17 12:18:23 +0000 [warn]: #0 failed to flush the buffer. retry_times=3 next_retry_time=2022-05-17 12:18:32 +0000 chunk="5df341e996491df677197139862f6aa5" error_class=Errno::ECONNREFUSED error="Connection refused - connect(2) for \"127.0.0.1\" port 24420"
  2022-05-17 12:18:23 +0000 [warn]: #0 suppressed same stacktrace
2022-05-17 12:18:32 +0000 [warn]: #0 failed to flush the buffer. retry_times=4 next_retry_time=2022-05-17 12:18:49 +0000 chunk="5df341e996491df677197139862f6aa5" error_class=Errno::ECONNREFUSED error="Connection refused - connect(2) for \"127.0.0.1\" port 24420"
  2022-05-17 12:18:32 +0000 [warn]: #0 suppressed same stacktrace
2022-05-17 12:18:47 +0000 [warn]: #0 detached forwarding server 'fluent_node' host="127.0.0.1" port=24420 phi=16.00192534264217 phi_threshold=16
2022-05-17 12:18:48 +0000 [warn]: #0 failed to flush the buffer. retry_times=5 next_retry_time=2022-05-17 12:19:21 +0000 chunk="5df341e996491df677197139862f6aa5" error_class=Fluent::Plugin::ForwardOutput::NoNodesAvailable error="no nodes are available"
  2022-05-17 12:18:48 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin/out_forward/load_balancer.rb:55:in `select_healthy_node'
  2022-05-17 12:18:48 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin_helper/service_discovery/manager.rb:108:in `select_service'
  2022-05-17 12:18:48 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin_helper/service_discovery.rb:82:in `service_discovery_select_service'
  2022-05-17 12:18:48 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin/out_forward.rb:365:in `write'
  2022-05-17 12:18:48 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin/output.rb:1179:in `try_flush'
  2022-05-17 12:18:48 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin/out_forward.rb:353:in `try_flush'
  2022-05-17 12:18:48 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin/output.rb:1500:in `flush_thread_run'
  2022-05-17 12:18:48 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin/output.rb:499:in `block (2 levels) in start'
  2022-05-17 12:18:48 +0000 [warn]: #0 /opt/calyptia-fluentd/lib/ruby/gems/3.0.0/gems/fluentd-1.14.6/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2022-05-17 12:19:21 +0000 [warn]: #0 failed to flush the buffer. retry_times=6 next_retry_time=2022-05-17 12:20:22 +0000 chunk="5df341e996491df677197139862f6aa5" error_class=Fluent::Plugin::ForwardOutput::NoNodesAvailable error="no nodes are available"
  2022-05-17 12:19:21 +0000 [warn]: #0 suppressed same stacktrace
2022-05-17 12:19:26 +0000 [warn]: #0 recovered forwarding server 'fluent_node' host="127.0.0.1" port=24420
2022-05-17 12:20:17 +0000 [warn]: #0 detached forwarding server 'fluent_node' host="127.0.0.1" port=24420 phi=16.068666338465153 phi_threshold=16
2022-05-17 12:20:22 +0000 [warn]: #0 failed to flush the buffer. retry_times=7 next_retry_time=2022-05-17 12:22:36 +0000 chunk="5df341e996491df677197139862f6aa5" error_class=Fluent::Plugin::ForwardOutput::NoNodesAvailable error="no nodes are available"
  2022-05-17 12:20:22 +0000 [warn]: #0 suppressed same stacktrace
2022-05-17 12:22:36 +0000 [warn]: #0 failed to flush the buffer. retry_times=8 next_retry_time=2022-05-17 12:26:35 +0000 chunk="5df341e996491df677197139862f6aa5" error_class=Fluent::Plugin::ForwardOutput::NoNodesAvailable error="no nodes are available"
  2022-05-17 12:22:36 +0000 [warn]: #0 suppressed same stacktrace
2022-05-17 12:24:47 +0000 [warn]: #0 recovered forwarding server 'fluent_node' host="127.0.0.1" port=24420
2022-05-17 12:26:35 +0000 [warn]: #0 retry succeeded. chunk_id="5df341e996491df677197139862f6aa5"

Perhaps I'm reading it wrong but retry succeeded with a chunk id sounds like it's definitely pushing a chunk but I'm not getting any event emitted while I do when I push it locally from node.

Any ideas what my issue might be? is there a possible version issue or something?

thanks

`TimeoutOverflowWarning` after many retries

The maxDelay parameter defaults to Infinity:

maxDelay: +Infinity,

In the case where the computed delay is larger than the maximum 32-bit signed integer (2 ** 31 - 1), a TimeoutOverflowWarning is emitted by setTimeout, the value 1 is assumed, and we end up waiting only 1 millisecond before retrying, which causes frequent retries.

I have worked around this issue by passing:

socket: {
  reconnect: {
    maxDelay: 2 ** 31 - 1
  }
}

NodeJS heap out of memory caused by multiple connects and disconnects

Hey ๐Ÿ‘‹

I have a minimal reproducible code that causes NodeJS to crash

index.ts

import * as fluent from '@fluent-org/logger';

const f = new fluent.FluentClient('abc');

f.connect()
  .then(() => f.emit({ 'hello': 'world' }))
  .then(() => f.disconnect())
  .then(() => f.connect())   // <---- Stuck here and then crash
  .catch(() => console.log('should not get here'));

And this is Fluentd's configuration

fluent.conf

<source>
    @type forward
</source>

<match **>
  @type null
</match>

Results

node ./dist/src/index.js
# ... wait for a while

<--- Last few GCs --->

[13592:0x148008000]    18981 ms: Mark-sweep 4042.5 (4128.4) -> 4036.5 (4138.9) MB, 2135.1 / 0.0 ms  (average mu = 0.453, current mu = 0.033) allocation failure scavenge might not succeed
[13592:0x148008000]    21901 ms: Mark-sweep 4052.2 (4138.9) -> 4043.4 (4145.4) MB, 2917.1 / 0.0 ms  (average mu = 0.245, current mu = 0.001) allocation failure scavenge might not succeed


<--- JS stacktrace --->

FATAL ERROR: Reached heap limit Allocation failed - JavaScript heap out of memory
# ... etc

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.