Giter VIP home page Giter VIP logo

nestjs-google-pubsub-microservice's Introduction

NestJS Google Cloud Pub/Sub Microservice Transport

Nest Logo

Google Cloud Pub/Sub

Pub/Sub is an asynchronous messaging service that decouples services that produce events from services that process events.

You can use Pub/Sub as messaging-oriented middleware or event ingestion and delivery for streaming analytics pipelines.

Pub/Sub offers durable message storage and real-time message delivery with high availability and consistent performance at scale

Installation

To start building Pub/Sub-based microservices, first install the required packages:

$ npm i --save @google-cloud/pubsub nestjs-google-pubsub-microservice

Overview

To use the Pub/Sub transporter, pass the following options object to the createMicroservice() method:

const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  ApplicationModule,
  {
    strategy: new GCPubSubServer({
      topic: 'cats_topic',
      subscription: 'cats_subscription',
      client: {
        projectId: 'microservice',
      },
    }),
  },
);

Options

The options property is specific to the chosen transporter. The GCloud Pub/Sub transporter exposes the properties described below.

topic Topic name which your server subscription will belong to
subscription Subscription name which your server will listen to
replyTopic Topic name which your client subscription will belong to
replySubscription Subscription name which your client will listen to
noAck If false, manual acknowledgment mode enabled
init If false, topics and subscriptions will not be created, only validated
checkExistence If false, topics and subscriptions will not be checked, only used. This only applies when init is false
useAttributes Only applicable for client. If true, pattern and correlationId will be sent via message attributes. This is useful if message consumer is not NestJs microservice or you have message filtering on subscription
client Additional client options (read more here)
publisher Additional topic publisher options (read more here)
subscriber Additional subscriber options (read more here)
scopedEnvKey Scope topics and subscriptions to avoid losing messages when several people are working on the same code base. Will prefixes topics and subscriptions with this key (read more here)

Client

const client = new GCPubSubClient({
  client: {
    apiEndpoint: 'localhost:8681',
    projectId: 'microservice',
  },
});
client
  .send('pattern', 'Hello world!')
  .subscribe((response) => console.log(response));

Context

In more sophisticated scenarios, you may want to access more information about the incoming request. When using the Pub/Sub transporter, you can access the GCPubSubContext object.

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: GCPubSubContext) {
  console.log(`Pattern: ${context.getPattern()}`);
}

To access the original Pub/Sub message (with the attributes, data, ack and nack), use the getMessage() method of the GCPubSubContext object, as follows:

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: GCPubSubContext) {
  console.log(context.getMessage());
}

Message acknowledgement

To make sure a message is never lost, Pub/Sub supports message acknowledgements. An acknowledgement is sent back by the consumer to tell Pub/Sub that a particular message has been received, processed and that Pub/Sub is free to delete it. If a consumer dies (its subscription is closed, connection is closed, or TCP connection is lost) without sending an ack, Pub/Sub will understand that a message wasn't processed fully and will re-deliver it.

To enable manual acknowledgment mode, set the noAck property to false:

{
  replyTopic: 'cats_topic_reply',
  replySubscription: 'cats_subscription_reply',
  noAck: false,
  client: {
    projectId: 'microservice',
  },
},

When manual consumer acknowledgements are turned on, we must send a proper acknowledgement from the worker to signal that we are done with a task.

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: GCPubSubContext) {
  const originalMsg = context.getMessage();

  originalMsg.ack();
}

Shutdown

Pub/Sub requires a graceful shutdown properly configured in order to work correctly, otherwise some messages acknowledges can be lost. Therefore, don't forget to call client close:

export class GCPubSubController implements OnApplicationShutdown {
  client: ClientProxy;

  constructor() {
    this.client = new GCPubSubClient({});
  }

  onApplicationShutdown() {
    return this.client.close();
  }
}

nestjs-google-pubsub-microservice's People

Contributors

cristobalgvera avatar daviddlv avatar fpierre avatar gperdomor avatar mz-hyeok avatar p-fedyukovich avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar

nestjs-google-pubsub-microservice's Issues

Only usable with Pub/Sub Admin permissions

This microservice only works with Pub/Sub Admin permission, but in many use cases, this does not follow the least permission policy.
For some reason, it tries to create a topic and subscription if they do not exist, would be great if this would be optional.

multiple subscribers + Listen to a topic - Difference between emit and send

Hi, Is it possible to have multiple subscriptions without having to setup multiple app.connectMicroservice().

Our goal is to set a topic per action, for example one topic for create-user another one for update-user....

Plus we need to listen to a specific pattern example @MessagePattern('create-user'). By doing this it means sending a message like this :

const client: ClientProxy = new GCPubSubClient({
      topic: 'create-user',
      client: {projectId: 'my-project'},
    });
    try {
      return client.emit('create-user', data);
    } finally {
      client.close();
    }

As you can see, i have to specify 'create-user' as a topic and as well has message pattern. So is it possible to listen to a topic and not a pattern ?

Also last question, what is the difference between client.emit() and client.send() . Not really clear from the docs.

Summary:

  • Best way to have multiple subscribers
  • How to listen to a topic not a pattern
  • what is the difference between client.emit() and client.send()

Thanks

Why is server.send called when only listening?

I've got a weird error, so far I only implemented this listener:

@MessagePattern({ cmd: 'test' })
listen(@Payload() payload: any, @Ctx() context: GCPubSubContext) {
  console.log('Payload:', payload);
  console.log('Context:', context);
}

And the server is configured with topic, subscription, and a client (keyFile, projectId).

Whenever a message arrives, I also get this error:

Error: A name must be specified for a topic.
    at PubSub.topic (/Users/me/myproject/node_modules/@google-cloud/pubsub/src/pubsub.ts:1435:13)
    at GCPubSubServer.sendMessage (/Users/me/myproject/node_modules/nestjs-google-pubsub-microservice/lib/gc-pubsub.server.ts:223:7)
    at publish (/Users/me/myproject/node_modules/nestjs-google-pubsub-microservice/lib/gc-pubsub.server.ts:215:22)
    at /Users/me/myproject/node_modules/@nestjs/microservices/server/server.js:49:31
    at processTicksAndRejections (node:internal/process/task_queues:77:11)

Which is weird as '/@nestjs/microservices/server/server.js:49' belongs to the 'send' function and I'm not trying to send anything. Unfortunately there is also no hint to any app code in the stack trace.

Any ideas?

Extend support for Cloud Events

Hi,
I'm very much liking the transporter and wanted it to support cloud events specification.

I have some pubsub subscriptions which deal with google generated cloud events and the data is base 64 encoded. Unfortunately i cannot specify any pattern here to filter out the messages of my interest.

Can you extend the support for this?
It will be based on an assumption that if the event payload follows the cloud event spec, then the data is base64 encoded. If this can be resolved by providing base64 decoder func in the event pattern decorator, that would be awesome.

Thanks for the effort.
Happy to contribute.

Bug: PubSub client is not connected: topic undefined does not exist

I'm experiencing an awkward bug.
I've two microservices with the same MessageService class

export class MessageService implements OnApplicationShutdown, MessageServiceInterface {
  private client: GCPubSubClient;
  private readonly projectId: string;
  private readonly apiEndpoint: string;
  constructor(private config: ConfigService) {
    this.projectId = this.config.get('PUBSUB_PROJECT_ID') ?? this.config.getOrThrow('GCP_PROJECT_ID');
    this.apiEndpoint = this.config.getOrThrow('PUBSUB_HOST');
  }

  createClient(topic: string, subscription?: string): GCPubSubClient {
    return (this.client = new GCPubSubClient({
      init: false,
      checkExistence: true,
      noAck: false,
      topic: topic,
      subscription: subscription,
      publisher: {
        batching: { maxMessages: 10 },
      },
      client: {
        projectId: this.projectId,
        apiEndpoint: this.apiEndpoint,
      },
    }));
  }

  public onApplicationShutdown(): Promise<void> {
    return this.client.close();
  }
}

And then I'm using it in one on my controllers in the next way

await firstValueFrom(
          this.messageService
            .createClient(this.paymentsCalculatorSuccLevTopic)
            .emit(this.paymentsCalculatorSuccLevPattern, data),
        )

In one of the services it's working perfectly but in the other I get all the time the next error:
PubSub client is not connected: topic undefined does not exist

I've confirmed that the topic is there, and that the connection is done and all the configuration is working.
Also, if I call the PubSub functions like the example above it works with no problem

await this.messageService
          .createClient()
          .topic(this.paymentsCalculatorSuccLevTopic)
          .publishMessage({
            json: {
              pattern: this.paymentsCalculatorSuccLevPattern,
              data,
            },
          });

Here is a log of the client

GCPubSubClient {
  routingMap: Map(0) {},
  options: {
    init: false,
    checkExistence: true,
    noAck: false,
    topic: 'payment_calculator_suc_lev_test',
    subscription: undefined,
    publisher: { batching: [Object] },
    client: { projectId: 'emulator', apiEndpoint: '127.0.0.1:8085' }
  },
  logger: Logger { context: 'GCPubSubClient', options: {} },
  client: null,
  replySubscription: null,
  topic: null,
  clientConfig: { projectId: 'emulator', apiEndpoint: '127.0.0.1:8085' },
  scopedEnvKey: '',
  topicName: 'payment_calculator_suc_lev_test',
  subscriberConfig: {
    topic: Topic {
      getSubscriptionsStream: [Function (anonymous)],
      name: 'projects/emulator/topics/payment_calculator_update_test',
      publisher: [Publisher],
      pubsub: [PubSub],
      parent: [PubSub],
      request: [Function: bound request],
      iam: [IAM],
      metadata: [Object]
    }
  },
  publisherConfig: { batching: { maxMessages: 10 } },
  replyTopicName: 'undefined',
  replySubscriptionName: 'undefined',
  noAck: false,
  init: false,
  useAttributes: false,
  checkExistence: true,
  serializer: IdentitySerializer {},
  deserializer: IncomingResponseDeserializer {}
}

The log from the other microservice looks the same but just with different topic and subscription.

Do you know what could be happening?

There is no matching event handler defined in the remote service. Event pattern: undefined error message

Hi I tried to use the library but I keep getting "There is no matching event handler defined in the remote service. Event pattern: undefined" error when a new message arrives.

my code is like this.

main.ts

async function bootstrap() {
    const port = process.env.PORT || 3000;

    const configApp = await NestFactory.create(AppModule);
    const configService = configApp.get(ConfigService);
    const loggerService = new LoggerService();

    const app = await NestFactory.create(AppModule, {
        logger: loggerService
    });

    const microservice2 = app.connectMicroservice({
        strategy: new GCPubSubServer({
            topic: 'test-latency',
            subscription: 'test-latency-pull',
            noAck: true,
            client: {
                projectId: 'cmor-baas-dev'
            }
        })
    });

    await app.startAllMicroservices();
    microservice2.useLogger(loggerService);

    await app.listen(port);
}
bootstrap();

Then in app.controller

    @MessagePattern('notifications')
    getNotifications(@Payload() data: any, @Ctx() context: GCPubSubContext) {
        console.log(`Pattern: ${context.getPattern()}`);
        console.log(context.getMessage());
        console.log(JSON.stringify(data));
    }

image

nestjs:8.2.5 (I tried with 8.0.0 as well)
@google-cloud/pubsub: 2.19.4
nestjs-google-pubsub-microservice: 2.2.0

Readme regarding client.close is not quite clear

This part of the readme is not really clear:

export class GCPubSubController implements OnApplicationShutdown {
  client: ClientProxy;

  constructor() {
    this.client = new GCPubSubClient({});
  }

  onApplicationShutdown() {
    return this.client.close();
  }
}

I assume that we might need to close the same instance of the pub/sub client created during the call of the GCPubSubServer constructor. However, it would be truly helpful and appreciated to have the author's insight on this matter.

Decouple subscription creation from topic creation during "init"?

Code:

const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  ApplicationModule,
  {
    strategy: new GCPubSubServer({
      topic: 'projects/ALPHA/topics/FOO',
      subscription: 'projects/BETA/subscriptions/FOO',
      init: true,
      checkExistence: true,
      client: {
        projectId: 'BETA',
      },
    }),
  },
);

Context:

There is a topic FOO in ALPHA project (outside of my team's ownership) that I want to subscribe to from BETA (project my team actually owns). I don't have permissions inside ALPHA except pubsub.topics.attachSubscription.
A combination of init=true and checkExistence=true fails with PERMISSION_DENIED (no pubsub.topics.list or pubsub.topics.get), if I flip init to false it also doesn't help since the subscription FOO doesn't exist yet ๐Ÿคท

Error

/app/node_modules/@grpc/grpc-js/src/call.ts:82
                     |   const error = new Error(message);
                     |                 ^
                     | Error: 7 PERMISSION_DENIED: User not authorized to perform this action.
                     |     at callErrorFromStatus (/app/node_modules/@grpc/grpc-js/src/call.ts:82:17)
                     |     at Object.onReceiveStatus (/app/node_modules/@grpc/grpc-js/src/client.ts:360:55)
                     |     at Object.onReceiveStatus (/app/node_modules/@grpc/grpc-js/src/client-interceptors.ts:458:34)
                     |     at Object.onReceiveStatus (/app/node_modules/@grpc/grpc-js/src/client-interceptors.ts:419:48)
                     |     at /app/node_modules/@grpc/grpc-js/src/resolving-call.ts:132:24
                     |     at processTicksAndRejections (node:internal/process/task_queues:77:11)
                     | for call at
                     |     at ServiceClientImpl.makeUnaryRequest (/app/node_modules/@grpc/grpc-js/src/client.ts:325:42)
                     |     at ServiceClientImpl.<anonymous> (/app/node_modules/@grpc/grpc-js/src/make-client.ts:189:15)
                     |     at /app/node_modules/@google-cloud/pubsub/src/v1/publisher_client.ts:307:25
                     |     at /app/node_modules/google-gax/build/src/normalCalls/timeout.js:44:16
                     |     at repeat (/app/node_modules/google-gax/build/src/normalCalls/retries.js:80:25)
                     |     at /app/node_modules/google-gax/build/src/normalCalls/retries.js:118:13
                     |     at OngoingCall.call (/app/node_modules/google-gax/build/src/call.js:67:27)
                     |     at NormalApiCaller.call (/app/node_modules/google-gax/build/src/normalCalls/normalApiCaller.js:34:19)
                     |     at /app/node_modules/google-gax/build/src/createApiCall.js:84:30
                     |     at processTicksAndRejections (node:internal/process/task_queues:95:5)

Expected behavior

init and checkExistence moved into topic and subscription so we could make something like this work:

const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  ApplicationModule,
  {
    strategy: new GCPubSubServer({
      topic: {
        name: 'projects/ALPHA/topics/FOO', # <path> might be a better "name" for this
	init: false,
      },
      subscription: {
        name: 'projects/BETA/subscriptions/FOO',
	init: true,
        checkExistence: true,
      },
    }),
  },
);

Alternatively topicInit and subscriptionInit would work as well.

Thanks for all the work you put in nestjs-google-pubsub-microservice ๐Ÿฅ‡ ๐Ÿš€

Multiple topic subscription and healthcheck

Hi there,

Interested in using the package, I was wondering a couple thing, maybe you can help me out:

  • How would I subscribe to multiple topics (auto-discovery based on my controllers)
  • How would I ensure the subscriptions are healthy (maybe using @nestjs/terminus)

Thanks!

Error: 4 DEADLINE_EXCEEDED: Deadline exceeded Pub Sub & NestJs

I am facing this issue in one of my projects. There are 800 errors from past 1 day. I tried researching on this but not sure what's the root cause.

Error: 4 DEADLINE_EXCEEDED: Deadline exceeded
    
at Object.callErrorFromStatus (/app/node_modules/@grpc/grpc-js/build/src/call.js:31:19)
    
at Object.onReceiveStatus (/app/node_modules/@grpc/grpc-js/build/src/client.js:422:49)
    
at Object.onReceiveStatus (/app/node_modules/@grpc/grpc-js/build/src/client-interceptors.js:328:181)
    
at /app/node_modules/@grpc/grpc-js/build/src/call-stream.js:188:78
    
at process.processTicksAndRejections (node:internal/process/task_queues:77:11)
for call at
    
at ServiceClientImpl.makeBidiStreamRequest (/app/node_modules/@grpc/grpc-js/build/src/client.js:406:34)
    
at ServiceClientImpl.<anonymous> (/app/node_modules/@grpc/grpc-js/build/src/make-client.js:105:19)
    
at /app/node_modules/dd-trace/packages/datadog-instrumentations/src/grpc/client.js:162:25
    
at AsyncResource.runInAsyncScope (node:async_hooks:204:9)
    
at callMethod (/app/node_modules/dd-trace/packages/datadog-instrumentations/src/grpc/client.js:151:26)
    
at ServiceClientImpl.wrapped [as streamingPull] (/app/node_modules/dd-trace/packages/datadog-instrumentations/src/grpc/client.js:92:12)
    
at MessageStream._fillStreamPool (/app/node_modules/@google-cloud/pubsub/build/src/message-stream.js:184:35)

Any help would be appreciated.
Thanks

Please provide a full example

Hi there

Would you please provide a full example in this repo or medium website. For example it's not clear where to put that const client = xx and ...

It will be very helpful

Thank you in advance

No DTO validation possible?

Hi, I've a RPC Controller, expecting for a specific request:

@Controller()
export class EnableSyncRpcController {
  @MessagePattern({ cmd: 'EnableSyncRpcController' })
  async handle(@Payload() request: EnableSyncRpcDto) {
    // ...
  }
}

and its DTO:

import { IsNotEmpty, IsString } from 'class-validator';

export class EnableSyncRpcDto {
  @IsNotEmpty()
  @IsString()
  id!: string;
}

But whatever the payload I give to the send method, there is no error thrown from the DTO.
Any insight on this?

request-repsonse pattern with multiple clients and servers

In an architecture where i have multiple replicas of an http-server and multiple replicas of a microservice.

Say that each http server gets a request via REST and send a message to a microservice to perform a task and waits for its response.
How would you go about it using this lib as a transport?

specifically - if there are multiple http server clients that send messages to the microservices- how do i know that the same http-server that sends a message also gets the response and not the other http-server replica?

do i need to have a unique replyTopice / replySubscription per http-service (aka the publishing client)?

Add support for ClientsModule.register

When talking about ClientsModule, I mean the one exported from @nestjs/microservices package.

Currently, ClientsModule.register call does not support this package. It would be good to support it tho because it would make migrating between different message brokers very easy.

If agreed, I could make a PR that would add the support.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.