
fastify-sse-v2's Introduction

Fastify SSE Plugin


Fastify plugin for sending Server-sent events.

For [email protected] use [email protected]!

How to use?

yarn add fastify-sse-v2

Register fastify-sse-v2 plugin into your fastify instance:

import fastify from "fastify";
import {FastifySSEPlugin} from "fastify-sse-v2";

const server = fastify();
server.register(FastifySSEPlugin);

Sending events from AsyncIterable source

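import fastify from "fastify";
import {FastifySSEPlugin} from "fastify-sse-v2";

const server = fastify();
server.register(FastifySSEPlugin);

// Resolves after the given number of milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

server.get("/", function (req, res) {
    res.sse((async function * source () {
          for (let i = 0; i < 10; i++) {
            // The await is required; without it the loop does not pause between events.
            await sleep(2000);
            yield {id: String(i), data: "Some message"};
          }
    })());
});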

Sending individual events

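import fastify from "fastify";
import {FastifySSEPlugin} from "fastify-sse-v2";

const server = fastify();
server.register(FastifySSEPlugin);

// Resolves after the given number of milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

server.get("/", async function (req, res) {
    for (let i = 0; i < 10; i++) {
      await sleep(2000);
      res.sse({id: String(i), data: "Some message"});
    }
});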

fastify.get('/listenForChanges', {}, (request, reply) => {
    const listenStream = fastify.db.watch('doc-uuid')
        .on('data', (data)=>reply.sse({ data: JSON.stringify(data) }))
        .on('delete', () => reply.sse({ event: 'close' }))
    request.socket.on('close', ()=>listenStream.end())
})

Note
  • When sending individual events, the connection is kept open until you call reply.sseContext.source.end() to terminate the stream.
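
The note above can be sketched as a small helper that sends events one at a time and then ends the stream. This is a minimal sketch, not part of the plugin: `sendThenClose` and its parameters are hypothetical names, and `reply` is assumed to expose `sse()` and `sseContext.source.end()` as the note describes.

```javascript
// Resolves after `ms` milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical helper: sends `count` events one by one, then ends the stream.
// Assumes `reply` has sse() and sseContext.source.end(), as described in the note.
async function sendThenClose(reply, count, delayMs) {
  try {
    for (let i = 0; i < count; i++) {
      await sleep(delayMs);
      reply.sse({ id: String(i), data: "Some message" });
    }
  } finally {
    // Without this call the connection stays open after the last event.
    // Optional chaining guards the case where no event was ever sent.
    reply.sseContext?.source.end();
  }
}
```

Inside a route handler this would be called as `await sendThenClose(res, 10, 2000)`. The `finally` block is a design choice so that the stream is also ended when sending fails partway through.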
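
Sending events from EventEmitters

import fastify from "fastify";
import {FastifySSEPlugin} from "fastify-sse-v2";
import {EventEmitter, on} from "events";

const server = fastify();
server.register(FastifySSEPlugin);

// Emitter that application code fires "update" events on.
const eventEmitter = new EventEmitter();

server.get("/", function (req, res) {
    res.sse(
        (async function* () {
            // events.on yields the emit arguments as an array, hence the destructuring.
            for await (const [event] of on(eventEmitter, "update")) {
                yield {
                    event: event.name,
                    data: JSON.stringify(event),
                };
            }
        })()
    );
});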

Note
  • To remove event listeners (or do other cleanup) when the client closes the connection, listen for the socket's close event: request.socket.on('close', () => abortController.abort());
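
The note mentions an abortController without showing where it comes from. One way to wire it up, as a sketch under assumptions, is to pass an AbortSignal to Node's `events.on`, which removes its listeners when the signal aborts. `updatesUntilAbort` is a hypothetical helper name, and the commented route assumes a Fastify `server` and an EventEmitter named `emitter` already exist.

```javascript
const { EventEmitter, on } = require("events");

// Hypothetical helper: turns "update" events into SSE messages until `signal` aborts.
async function* updatesUntilAbort(emitter, signal) {
  try {
    // Passing { signal } makes events.on() detach its listeners when the signal aborts.
    for await (const [event] of on(emitter, "update", { signal })) {
      yield { event: event.name, data: JSON.stringify(event) };
    }
  } catch (err) {
    // An abort surfaces as an AbortError; treat it as a normal end of stream.
    if (err.name !== "AbortError") throw err;
  }
}

// Usage inside a route (assumes `server` and `emitter` are defined elsewhere):
// server.get("/", function (req, res) {
//   const abortController = new AbortController();
//   req.socket.on("close", () => abortController.abort());
//   res.sse(updatesUntilAbort(emitter, abortController.signal));
// });
```

Because each request gets its own AbortController, closing one connection only detaches that connection's listeners from the shared emitter.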

fastify-sse-v2's People

Contributors

binamra7, dependabot[bot], edenhermelin, github-actions[bot], likev, masarykadam, morrigan, mpetrunic, nahuel, nmlinaric, shuirong, simenb, sinedied, thomasdingemanse


fastify-sse-v2's Issues

FST_ERR_PROMISE_NOT_FULFILLED

I haven't tried any of the other examples besides the asyncGenerator, so this may only be specific to that example? I'm receiving the following error when I have a route defined as below.

Promise may not be fulfilled with 'undefined' when statusCode is not 204

app.get('/simulator/state', async (request, response) => {
    response.sse(
      (async function* source() {
        const state = simulator.state()
        yield { data: JSON.stringify(state) }
        await sleepFor(10000)
      })()
    )
})

session close after 60 seconds of "idle" time?

Hello,

I've begun using your fastify-sse-v2.

My curl client terminates after ~60 seconds of idle time:

retry: 3000

* transfer closed with outstanding read data remaining
* Closing connection 0
curl: (18) transfer closed with outstanding read data remaining

By "idle" I mean that the SSE server hasn't received any events in 60 seconds (quite likely in this app) and thus hasn't sent any on to the curl client.

  1. Does your plugin close the session after 60 seconds of no events?
  2. If so, is this value configurable?
  3. If not, do you know where this close is occurring?

I do realize that a real-world client would be reconnecting when this occurs.

Thanks for this plugin, and for your help.

Example for cleaning up anonymous function (redis pubsub)

Hey, I am using the implementation for redis, according to your docs:

import {FastifySSEPlugin} from "fastify-sse-v2";
import {on} from "events";
import { server } from '../index.js';

const server = fastify();
server.register(FastifySSEPlugin);

server.get("/", function (req, res) {
    const { redis } = server;
    res.sse(
  (async function* () {
    for await (const event of on(redis, "message")) {
      yield {
        type: event.name,
        data: JSON.stringify(event),
      };
    }
  })()
);
});

This implementation works fine, but after several requests, I am getting the error MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 finish listeners added to [ServerResponse]. Use emitter.setMaxListeners() to increase limit

As you have written in the docs, a cleanup should happen on connection close.
I am wondering how I should unset the listener on an anonymous function.

Is there some example?

Thanks in advance.

No way to pass in headers or CORS options

Thanks for the plugin! My team is attempting to implement this for our needs using the Nestjs framework. It works pretty well except for the matter of CORS in the browser. It would be great if you could pass in an options object for customizing headers, etc. For our needs, I think we have a workaround, but it would be nice if we could modify this at a global level.

Thanks!

Possibly missing configs or instructions

I have a pretty vanilla Fastify install that works great ("fastify": "^3.9.2"), however when running the first example

res.sse((async function * source () {
      for (let i = 0; i < 10; i++) {
        sleep(2000);
        yield {id: String(i), data: "Some message"};
      }
})());

My server dumps all the output at once, only after the 20-second loop completes. Basically it does not stream, and the EventStream tab in Chrome dev tools is absent until the GET request completes with status 200 and returns 10 lines of messages at once with 'Content-Type': 'text/event-stream'.

I am curious if I missed some other fastify config that needs to be enabled for SSE to work as expected. Also, here is my definition of the sleep function:
const sleep = (waitTimeInMs) => new Promise(resolve => setTimeout(resolve, waitTimeInMs))
Any pointers at what is going on would be greatly appreciated.
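
One thing visible in the snippet quoted in this issue is that sleep(2000) is called without await, so the promise it returns is discarded and the loop does not pause between events. Whether that accounts for the buffering described here is not certain. The sketch below shows the awaited form using the same sleep definition; the `count` and `delayMs` parameters are added for illustration only.

```javascript
const sleep = (waitTimeInMs) => new Promise((resolve) => setTimeout(resolve, waitTimeInMs));

// Yields `count` messages, pausing `delayMs` before each one.
async function* source(count, delayMs) {
  for (let i = 0; i < count; i++) {
    // Without `await`, the promise is discarded and the loop never pauses.
    await sleep(delayMs);
    yield { id: String(i), data: "Some message" };
  }
}

// Usage inside a route handler: res.sse(source(10, 2000));
```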

[QUESTION] Client reconnection

What is the expected behaviour when I have an active client connection and the server restarts? Should the client auto-reconnect?

TypeError: Cannot read properties of undefined (reading 'source')

TypeError: Cannot read properties of undefined (reading 'source')
    at _Reply.sse (/Users/kev/code_base/instinctmethod/server/node_modules/fastify-sse-v2/lib/plugin.js:41:33)
    at IncomingMessage.<anonymous> (file:///Users/kev/code_base/instinctmethod/server/src/route.ts:200:15)
    at IncomingMessage.emit (node:events:513:28)
    at IncomingMessage.emit (node:domain:489:12)
    at IncomingMessage.Readable.read (node:internal/streams/readable:539:10)
    at flow (node:internal/streams/readable:1023:34)
    at emitReadable_ (node:internal/streams/readable:604:3)
    at processTicksAndRejections (node:internal/process/task_queues:81:21)

Complete code:

  void server.post('/call/gpt_message/send_stream', {}, async (i: FastifyRequest, o: FastifyReply) => {
      const opt = i.input();
      const row = await gpt_message_ensure(opt, i);
      const stream = (await Gpt_message.send(row.id, true));
      stream.on('data', (chunk: any) => {
        const payloads = chunk.toString().split('\n\n');
        for (const payload of payloads) {
          if (payload.includes('[DONE]')) return;
          let content: string | undefined;
          if (payload.startsWith('data:')) {
            try {
              const json = JSON.parse(payload.replace('data: ', ''));
              content = json.choices[0].delta?.content;
            } catch (error) {
              console.log(chunk.toString());
              console.log(`Error with JSON.parse and ${payload}.\n${error}`);
            }
          }

          if (content) {
            o.sse({ id: '1', data: chunk } as any); // this line throws
          }
        }
      });

      stream.on('end', () => {
        setTimeout(() => {
          console.log('\nStream done');
          o.send({ message: 'Stream done' });
        }, 10);
      });

      stream.on('error', (err: Error) => {
        console.log(err);
        o.send(err);
      });

    },
  )

SSE connection never close when not using async iterable

When trying your examples with [email protected], only the async iterable one ever completes.

The examples with individual events work for sending events, but the connection is kept open and never closes.
Is there something I missed? Should I close the connection manually from the server? If so, how do I do it?

I put up an example repo to illustrate this: https://github.com/sinedied/fastify-sse-test/blob/main/src/routes/test/index.ts

After running the server, run these URLs in a browser:

Add support for Node events.on asyncIterable

I'm pretty new to fastify, SSEs, and node events, so please bear with me.

In the 'Sending events from EventEmmiters' example, it shows how to use event-iterator to convert an EventEmitter to an asyncIterable. Recently having delved into this, I found that node can already convert its EventEmitter into an asyncIterable through using events.on which "Returns: <AsyncIterator> that iterates eventName events emitted by the emitter". Here is the code for a mostly working example:

// inside a routes plugin

// contrived and simplified example
const ee = new EventEmitter();

fastify.get("/data", (_request, reply) => {
  reply.sse(on(ee, "update"));
});

fastify.post(
  "/api",
  async (request, reply) => {
    // do something...
    const update = doSomething(request.body)
    ee.emit("update", {
      type: "ping",
      data: JSON.stringify({ update }),
    });
    reply.status(200).send();
  }
);

Like I mentioned, this is mostly working. Where it breaks down is due to how EventEmitters collect emit args into an array so when transformAsyncIterable in sse.ts gets a message from the source, it is an array not just an object like serializeSSEEvent expects.
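
The mismatch described here can also be handled outside the plugin by adapting the iterable before passing it to reply.sse. The following is a minimal sketch under that assumption; `firstEmitArg` is a hypothetical name, and it keeps only the first emit argument.

```javascript
// Hypothetical adapter: events.on() yields each emission as an array of emit arguments.
// This passes the first argument through and leaves non-array messages untouched.
async function* firstEmitArg(source) {
  for await (const message of source) {
    yield Array.isArray(message) ? message[0] : message;
  }
}

// Usage (assumes `ee`, `on`, and `reply` from the example above):
// reply.sse(firstEmitArg(on(ee, "update")));
```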

So I'm opening this issue to 1) see if there's interest in supporting node events.on for converting to an asyncIterable, and 2) what would be the best approach for handling a message that is an array of emit arguments.

For my needs, I'm able to assume that the first arg is all that matters:

async function* transformAsyncIterable(source) {
  for await (const message of source) {
+    if(Array.isArray(message)) {
+      yield serializeSSEEvent(message[0]);
+      continue; // skip the generic yield below so the array is not sent a second time
+    }
    yield serializeSSEEvent(message);
  }

  yield serializeSSEEvent({
    event: "end",
    data: "Stream closed"
  });
}

but that is likely a pretty naive solution. I wonder if instead the whole array should be serialized (eg. if(Array.isArray(message)) { yield message.map(serializeSSEEvent) }) or if the payload should reduce them down to a single payload; I'm basing this idea off of what little I know about SSEs but the MDN docs say:

When the EventSource receives multiple consecutive lines that begin with data:, it concatenates them, inserting a newline character between each one.

So maybe it makes sense to accumulate them all and send them as data. I'm open to feedback and learning, thanks!
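
To make the MDN behaviour quoted above concrete, here is a sketch of how a payload containing newlines maps onto several data: lines on the wire. `toSSEFrame` is a hypothetical function written for illustration; it is not the plugin's serializeSSEEvent, whose output may differ.

```javascript
// Hypothetical serializer: writes one `data:` line per line of payload,
// so the client reassembles them with a newline between each.
function toSSEFrame({ id, event, data = "" }) {
  const lines = [];
  if (event !== undefined) lines.push(`event: ${event}`);
  if (id !== undefined) lines.push(`id: ${id}`);
  for (const line of String(data).split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }
  // A blank line terminates the event.
  return lines.join("\n") + "\n\n";
}
```

Under this scheme, accumulating several emit arguments into one event would amount to joining their serialized forms with newlines before passing them as `data`.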

Sample code doesn't work

I'm referring to this code
and I'm getting this error:

UnhandledPromiseRejectionWarning: TypeError: source is not async iterable
    at transformAsyncIterable (node_modules\fastify-sse-v2\lib\sse.js:36:31)
    at transformAsyncIterable.next ...

to make it work, I had to do something like this

server.get("/", async function (req, res) {
    res.sse(await (async function * source () {
          for (let i = 0; i < 10; i++) {
            //sleep(2000);
            yield {id: String(i), data: "Some message"};
          }
    })())
});

not sure if I got the same intended functionality, but at least now it works
Node v12.16.3

Event emitter example does not work

I tried to run the code from the last example in the README:

import {FastifySSEPlugin} from "fastify-sse-v2";
import EventIterator from "event-iterator";

const server = fastify();
server.register(FastifySSEPlugin);

server.get("/", function (req, res) {
    const eventEmitter = new EventEmitter();
    res.sse(new EventIterator(
                (push) => {
                  eventEmitter.on("some_event", push)
                  return () => eventEmitter.removeEventListener("some_event", push)
                }
        )
    );
});

I changed "some event" to the events emitted by my event emitter ("scan"). But I still get the following error:

TypeError [ERR_INVALID_ARG_TYPE]: The "listener" argument must be of type function. Received an instance of Object
        at checkListener (events.js:111:11)
        at _addListener (events.js:348:3)
        at EventEmitter.addListener (events.js:406:10)
        at D:\Development\new-project\server\routes\index.js:25:21

Does this only work with certain versions of event-iterator? Or is there another issue I overlooked?

Handle error thrown when an async iterable is used

When I'm using reply.sse(myAsyncGenerator), if myAsyncGenerator function throws an exception I cannot catch it and gracefully return an error, even if the call is enclosed in a try / catch.

Example:

      try {
          const generator = await getMyAsyncGenerator();
          reply.sse(generator);
        } else {
          return await askApproach.run(question, overrides ?? {});
        }
      } catch (error) {
        // it's never going here
      }

Is there something I'm missing, or should I simply not use this form if I want to catch generator errors?
If I handle the async generator manually, I'm able to catch the error:

      try {
          const generator = await getMyAsyncGenerator();
          for await (const chunk of generator) {
            reply.sse(chunk);  
          }
          reply.sseContext.source.end();
        } else {
          return await askApproach.run(question, overrides ?? {});
        }
      } catch (error) {
        // here I can gracefully handle the error
      }

Thanks
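
A likely reason the first try / catch never fires is that the generator is only consumed after reply.sse has returned, so the error surfaces outside the enclosing block; this is an assumption about the plugin's behaviour, not something confirmed here. One way to keep the iterable form is to wrap the generator so that a failure becomes a final event. `withErrorEvent`, the `onError` callback, and the "stream-error" event name are all hypothetical.

```javascript
// Hypothetical wrapper: forwards messages from `source` and, if it throws,
// reports the error and emits a final "stream-error" event instead of rethrowing.
async function* withErrorEvent(source, onError = () => {}) {
  try {
    yield* source;
  } catch (error) {
    onError(error);
    yield {
      event: "stream-error",
      data: JSON.stringify({ message: String(error.message) }),
    };
  }
}

// Usage (assumes `reply` and `generator` from the example above):
// reply.sse(withErrorEvent(generator, (error) => console.error(error)));
```

The event is deliberately not named "error", since browser EventSource clients already use that name for connection failures.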

headers/mime-type not sent in sample from Readme

Plugin headers like Cache-Control or mime-type text/event-stream aren't sent, check this sample taken from the Readme:

const {FastifySSEPlugin} = require("fastify-sse-v2");
const fastify = require("fastify")

const server = fastify();
server.register(FastifySSEPlugin);

server.get("/", function (req, res) {
    res.sse((async function * source () {
          for (let i = 0; i < 2; i++) {
            yield {id: String(i), data: "Some message"};
          }
    })());
});

server.listen(9000)

Trying it:

/tmp/test$ curl -v  http://localhost:9000/
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 9000 (#0)
> GET / HTTP/1.1
> Host: localhost:9000
> User-Agent: curl/7.58.0
> Accept: */*
> 
< HTTP/1.1 200 OK
< Date: Wed, 22 Jul 2020 21:57:04 GMT
< Connection: keep-alive
< Transfer-Encoding: chunked
< 
retry: 3000

id: 0
data: Some message

id: 1
data: Some message

event: end
data: Stream closed

* Connection #0 to host localhost left intact
/tmp/test$ 

Versions used:

node: v14.6.0
fastify: 3.1.1
fastify-sse-v2: 2.01

[QUESTION] List connected clients

Is there a property on the plugin to get a list of connected clients? Or, should I wrap this package in a local plugin, and track the client connections myself?
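
Nothing on this page says whether the plugin exposes such a list. The second option mentioned in the question, tracking connections yourself, could look roughly like the sketch below. `createClientRegistry`, `track`, and `broadcast` are hypothetical names, and the cleanup relies on the request.socket 'close' event mentioned in the README note.

```javascript
// Hypothetical registry: remembers each open SSE reply and forgets it when its socket closes.
function createClientRegistry() {
  const clients = new Set();
  return {
    clients,
    // Call from the route handler once the SSE stream has been started.
    track(request, reply) {
      clients.add(reply);
      request.socket.on("close", () => clients.delete(reply));
    },
    // Sends one event to every tracked client.
    broadcast(message) {
      for (const reply of clients) reply.sse(message);
    },
  };
}

// Usage (assumes a Fastify `server` with the plugin registered):
// const registry = createClientRegistry();
// server.get("/events", (request, reply) => {
//   reply.sse({ data: "connected" });
//   registry.track(request, reply);
// });
```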

How to know connection is closed on sse?

Hey quick question here,
I'm trying to dynamically create event emitters when an SSE connection starts, but I have no idea how to listen for the close event (e.g. when the user closes the browser tab) so I can properly destroy the event emitter. Is there something I'm missing?

Export `EventMessage` interface

Could this be exported? I'm using this repo for a project and need to use EventMessage as a type.
My usage example:

    const eventIterator = new EventIterator<EventMessage>(({ push, stop }) => {....}
