
sns-sqs-big-payload's Introduction


sns-sqs-big-payload

SQS/SNS producer/consumer library. Provides the ability to pass large payloads through S3.

Motivation

Aspecto helps modern development teams solve production issues before they evolve. We collect real production data and perform deep API analysis over it to autogenerate tests and monitor services stability. As a result, we often need to handle large payloads which can't be used with SQS & SNS due to the hard size limit. This library was developed to overcome this challenge - it enables you to manage Amazon SNS & SQS message payloads with Amazon S3 when dealing with payloads larger than 256KB. Key functionality includes:

  • Control whether message payloads are always stored in Amazon S3 or only when a message's size exceeds 256KB.
  • Send a message that references a single message object stored in an Amazon S3 bucket.
  • Get the corresponding message object from an Amazon S3 bucket.
  • Handle large messages passing from SNS to SQS via an S3 bucket in the middle.

Installation

npm install sns-sqs-big-payload

Important:

Make sure you also have aws-sdk installed: it's listed as a peer dependency, so it won't be installed automatically.

Usage

The library exports 3 clients:

  • SNS producer
  • SQS producer
  • SQS consumer

All 3 clients live in the same repository since they share a similar contract for sending payloads via S3.

SNS Producer

import { SnsProducer } from 'sns-sqs-big-payload';

const snsProducer = SnsProducer.create({
    topicArn: '<topic-arn>',
    region: 'us-east-1',
    // to enable sending large payloads (>256KiB) through S3
    largePayloadThoughS3: true,
    // Opt-in to enable compatibility with
    // Amazon SQS Extended Client Java Library (and other compatible libraries).
    // see https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-s3-messages.html
    extendedLibraryCompatibility: false,
    s3EndpointUrl: '...',
});

await snsProducer.publishJSON({
    // ...
});

SQS Producer

import { SqsProducer } from 'sns-sqs-big-payload';

const sqsProducer = SqsProducer.create({
    queueUrl: '...',
    region: 'us-east-1',
    // to enable sending large payloads (>256KiB) through S3
    largePayloadThoughS3: true,
    // Opt-in to enable compatibility with
    // Amazon SQS Extended Client Java Library (and other compatible libraries).
    // see https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-s3-messages.html
    extendedLibraryCompatibility: false,
    s3Bucket: '...',
});

await sqsProducer.sendJSON({
    // ...
});

SQS Consumer

import { SqsConsumer, SqsConsumerEvents } from 'sns-sqs-big-payload';

const sqsConsumer = SqsConsumer.create({
    queueUrl: '...',
    region: 'us-east-1',
    // to enable loading payloads from S3 automatically
    getPayloadFromS3: true,
    s3Bucket: '...',
    // if the queue is subscribed to an SNS topic,
    // the message will arrive wrapped in an SNS envelope,
    // so we need to unwrap it first
    transformMessageBody: (body) => {
        const snsMessage = JSON.parse(body);
        return snsMessage.Message;
    },
    // if you expect a JSON payload, use the `parsePayload` hook to parse it
    parsePayload: (raw) => JSON.parse(raw),
    // message handler, payload already parsed at this point
    handleMessage: async ({ payload }) => {
        // ...
    },
    // Opt-in to enable compatibility with
    // Amazon SQS Extended Client Java Library (and other compatible libraries).
    // see https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-s3-messages.html
    extendedLibraryCompatibility: false,
});

// to subscribe to events
sqsConsumer.on(SqsConsumerEvents.messageProcessed, () => {
    // ...
});

sqsConsumer.start();

// to stop processing
sqsConsumer.stop();

  • The queue is polled continuously for messages using long polling.
  • Messages are deleted from the queue once the handler function has completed successfully.
  • Throwing an error (or returning a rejected promise) from the handler function will leave the message on the queue. An SQS redrive policy can be used to move messages that cannot be processed to a dead-letter queue.
  • By default, messages are processed 10 at a time; a new batch won't be received until the previous one is processed. To adjust the number of messages processed in parallel, use the batchSize option, as in the sketch below.
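
A minimal sketch of tuning this (assuming, per the note above, that batchSize caps how many messages are received and processed per polling cycle; SQS itself allows at most 10 per receive call):

import { SqsConsumer } from 'sns-sqs-big-payload';

const consumer = SqsConsumer.create({
    queueUrl: '...',
    region: 'us-east-1',
    // assumption: batchSize = max messages received (and processed
    // in parallel) per polling cycle; SQS allows at most 10
    batchSize: 5,
    handleMessage: async ({ payload }) => {
        // ...
    },
});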

Usage in lambda

If you have a Lambda function subscribed to an SQS queue, you can use SqsConsumer there too. This is a short guide.
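
A minimal sketch, based on the processMessage call shown in the issues below (the event shape and the per-record loop are assumptions about your trigger configuration):

import { SqsConsumer } from 'sns-sqs-big-payload';

const sqsConsumer = SqsConsumer.create({
    queueUrl: '...',
    region: 'us-east-1',
    getPayloadFromS3: true,
    s3Bucket: '...',
    parsePayload: (raw) => JSON.parse(raw),
    handleMessage: async ({ payload }) => {
        // ...
    },
});

// Lambda delivers a batch of SQS records in event.Records; hand each one
// to the consumer instead of polling. Note the lowercased-keys caveat
// discussed in the issues below.
export const handler = async (event: { Records: any[] }) => {
    for (const record of event.Records) {
        await sqsConsumer.processMessage(record);
    }
};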

Compatibility with libraries in other languages

If you turn on extendedLibraryCompatibility, the library will be compatible with the Amazon SQS Extended Client Java Library (and other libraries that implement the same S3 payload format).

Please be careful: this mode is not compatible with the standard mode due to differences in the S3 payload format.

Credentials

By default the consumer will look for AWS credentials in the places specified by the AWS SDK. The simplest option is to export your credentials as environment variables:

export AWS_SECRET_ACCESS_KEY=...
export AWS_ACCESS_KEY_ID=...

If you need to specify your credentials manually, you can use a pre-configured instance of the AWS SQS client:

import { SqsConsumer } from 'sns-sqs-big-payload';
import * as aws from 'aws-sdk';

aws.config.update({
    region: 'us-east-1',
    accessKeyId: '...',
    secretAccessKey: '...',
});

const consumer = SqsConsumer.create({
    queueUrl: 'https://sqs.us-east-1.amazonaws.com/account-id/queue-name',
    handleMessage: async (message) => {
        // ...
    },
    sqs: new aws.SQS(),
});

consumer.start();

Events and logging

SqsConsumer has an internal EventEmitter; you can subscribe to events like this:

sqsConsumer.on(SqsConsumerEvents.messageProcessed, () => {
    // ...
});

It sends the following events:

| Event | Params | Description |
| ----- | ------ | ----------- |
| started | None | Fires when the polling is started |
| message-received | message | Fires when a message is received (one per message, not per batch) |
| message-processed | message | Fires after a message is successfully processed and removed from the queue |
| batch-processed | None | Fires after the current batch of messages is processed |
| poll-ended | None | Fires after a polling cycle ends; useful for graceful shutdown |
| stopped | None | Fires when the polling stops |
| error | {err, message} | Fires in case of a processing error |
| s3-payload-error | {err, message} | Fires when an error occurs while downloading the payload from S3 |
| s3-extended-payload-error | {err, message} | Fires when an S3 payload read in extended-compatibility mode is not in the expected format |
| processing-error | {err, message} | Fires when an error occurs during processing (only inside the handleMessage function) |
| connection-error | err | Fires when a connection error occurs during polling (retriable) |
| payload-parse-error | err | Fires when an error occurs while parsing the payload |

If you're using TypeScript, you can also use this enum:

enum SqsConsumerEvents {
    started = 'started',
    messageReceived = 'message-received',
    messageProcessed = 'message-processed',
    batchProcessed = 'batch-processed',
    pollEnded = 'poll-ended',
    stopped = 'stopped',
    error = 'error',
    s3PayloadError = 's3-payload-error',
    s3ExtendedPayloadError = 's3-extended-payload-error',
    processingError = 'processing-error',
    connectionError = 'connection-error',
    payloadParseError = 'payload-parse-error',
}

You may subscribe to those events to add logging, for example:
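
A sketch wiring a few of the events above to the console (console stands in for whatever logger you use; the event params match the table above):

sqsConsumer.on(SqsConsumerEvents.processingError, ({ err, message }) => {
    console.error('handleMessage failed', err, message);
});

sqsConsumer.on(SqsConsumerEvents.connectionError, (err) => {
    console.warn('connection error while polling (retriable)', err);
});

sqsConsumer.on(SqsConsumerEvents.pollEnded, () => {
    console.info('polling cycle ended; safe to shut down');
});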

Testing

Since this library relies heavily on AWS APIs, isolated tests with mocks are of limited value. We recommend testing against localstack or against real SQS queues and SNS topics.

To run localstack on macOS:

TMPDIR=/private$TMPDIR docker-compose up
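
To point the library at localstack instead of real AWS, you can pass a pre-configured SQS client, as in the Credentials section above (a sketch; the edge port 4566 and the queue URL are assumptions about your localstack setup, and the S3 client may need similar endpoint configuration when testing large payloads):

import * as aws from 'aws-sdk';
import { SqsConsumer } from 'sns-sqs-big-payload';

// assumption: localstack's edge endpoint listens on localhost:4566
const endpoint = 'http://localhost:4566';

const consumer = SqsConsumer.create({
    // assumed localstack queue URL; the account id is a dummy value
    queueUrl: `${endpoint}/000000000000/test-queue`,
    sqs: new aws.SQS({ endpoint, region: 'us-east-1' }),
    handleMessage: async ({ payload }) => {
        // ...
    },
});

consumer.start();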

To run unit tests:

npm test

sns-sqs-big-payload's People

Contributors

aspecto-system, dependabot[bot], frederikp, mottibec, mzahor, onderceylan


sns-sqs-big-payload's Issues

Nothing returned for large data

Hi,

I noticed everything works fine except when I send data that goes through S3; then I get nothing back.

In handleMessage, I did:

handleMessage: async ({ payload }) => { console.log({ payload }) },

Payload would return undefined. This only happens when I send data over 256KB that requires S3. When I check the SQS messages and poll, I can see something like:

{"S3Payload":{"Id":"3eaff789-c556-4cbd-8381-acde38e2508a","Bucket":"sqs-event-payload","Key":"3eaff789-c556-4cbd-8381-acde38e2508a.json","Location":"https://event-payload.s3.amazonaws.com/3eaff789-c556-4cbd-8381-acde38e2508a.json"}}

I have getPayloadFromS3 set to true, but it doesn't seem like the message is downloaded. Using transformMessageBody returns the raw message as saved in SQS, that is, an S3Payload object containing Id, Bucket, Key and Location.

But on calling handleMessage, I get nothing. Any idea what I am doing wrong?

Some of the event handlers cannot be unit tested

Hi again!

I'd like to cover the error handlers of my implementation with unit tests, but some of them are not reachable. AFAIK it's not possible to test them with unit tests currently, as we can't reproduce the error scenarios through the public interface of SqsConsumer.

Please see the code below for examples of unreachable code blocks:

sqsConsumer.on(SqsConsumerEvents.error, err => {
  errorHandler('error', queueType, err, logger);
});

sqsConsumer.on(SqsConsumerEvents.connectionError, err => {
  errorHandler('connection_error', queueType, err, logger);
});

sqsConsumer.on(SqsConsumerEvents.s3PayloadError, err => {
  errorHandler('s3_payload_error', queueType, err, logger);
});

Proposed solution

To make libraries that depend on event handlers more test-friendly, the interfaces that implement EventEmitter could either be updated to publicly expose the EventEmitter instance, or the class could extend EventEmitter directly.

Once such a change is in place, it would be possible to trigger the events and hence test the event handlers, as in the example below:

sqsConsumer.emit(SqsConsumerEvents.error, new Error('test'));

Please let me know what you think, and if you'd be interested in a PR.
Cheers!

Lambda usage is broken

I've created this issue as a general one for Lambda problems.
Since we don't use Lambda mode in our organization, it was never properly tested.
I think the two main issues are lowercased keys and error handling.
Lowercased keys can be fixed with an additional callback; error handling is a matter of documentation.
We may not have spare time to fix this in the near future, but we'll gladly accept a PR.

#15 #17 #16

Prefix the JSON file name

Hi,
I want to prefix the JSON file name with a custom value.

async sendJSON(message: unknown, options: SqsMessageOptions = {}, jsonPrefix: string = ''): Promise<any> {
    const messageBody = JSON.stringify(message);
    const msgSize = Buffer.byteLength(messageBody, 'utf-8');

    if ((msgSize > this.messageSizeThreshold && this.largePayloadThoughS3) || this.allPayloadThoughS3) {
        const payloadId = uuid();
        const payloadKey = `${jsonPrefix}${payloadId}.json`;
        // ...

AWS SDK V3

Since AWS is ending v2 support*, I was wondering if there is any plan to migrate to the latest v3 of the AWS SDK for Node.

Otherwise, we could potentially work on that migration, should that be something you would be open to?

btw: Thank you for this project, it's been really helpful to us 🙏

*
NOTE: We are formalizing our plans to enter AWS SDK for JavaScript (v2) into maintenance mode in 2023.
Please migrate your code to use AWS SDK for JavaScript (v3).
For more information, check the migration guide at https://a.co/7PzMCcy

Could you add an s3Key prefix to the SqsProducer and SnsProducer classes?

To whom it may concern,
Your code is helpful.
It may give users more flexibility if they can define which directory in the S3 bucket the files are uploaded to.
May I suggest adding

private s3BucketPrefix: string;
this.s3BucketPrefix = s3BucketPrefix;

to the SqsProducer and SnsProducer classes, and, in the publishJSON or sendJSON function, modifying payloadKey:

const payloadKey = `${this.s3BucketPrefix}/${payloadId}.json`;

Regards,
Arthur

BatchRequest functionality in SQS Producer

I already posted this as an idea in the discussions category, but I'm posting it here to make it more visible to other folks.

I was very impressed with this library and almost decided to use it for our use case, which is similar to yours at Aspecto, but we currently use the SQS.sendMessageBatch function to send batches of messages. It would be nice to implement the same functionality in the SQS producer of this library, so we can send messages to SQS in batches and cut down the number of network calls/connections.

amazon-sqs-java-extended-client already supports this functionality: https://github.com/awslabs/amazon-sqs-java-extended-client-lib/blob/master/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java#L759

Let me know your thoughts about this idea.
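
For reference, a minimal sketch of the plain SDK v2 batch call described above (queue URL and payloads are placeholders); the request is for SqsProducer to offer the same, with S3 offloading applied per entry:

import * as aws from 'aws-sdk';

const sqs = new aws.SQS({ region: 'us-east-1' });
const messages = [{ hello: 'world' }, { foo: 'bar' }]; // example payloads

// one network call for up to 10 messages
await sqs
    .sendMessageBatch({
        QueueUrl: 'https://sqs.us-east-1.amazonaws.com/account-id/queue-name',
        Entries: messages.map((message, i) => ({
            Id: String(i), // must be unique within the batch
            MessageBody: JSON.stringify(message),
        })),
    })
    .promise();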

`extendedLibraryCompatibility` option is not compatible with AWS extended client library for Java

The same problem was reported earlier in issue #19, but the fix does not make sns-sqs-big-payload compatible with the JSON schema used by https://github.com/awslabs/amazon-sqs-java-extended-client-lib.

What's the issue?

The Amazon S3 reference in the message body of sns-sqs-big-payload does not match the JSON schema of the Java library.

The JSON structure that the Java client produces is in the following format:

[
  "software.amazon.payloadoffloading.PayloadS3Pointer",
  {
    "s3BucketName": "extended-client-bucket",
    "s3Key": "xxxx-xxxxx-xxxxx-xxxxxx"
  }
]

Whereas the format that sns-sqs-big-payload expects when extendedLibraryCompatibility is enabled is as follows:

{
  "s3BucketName": "extended-client-bucket",
  "s3Key": "xxxx-xxxxx-xxxxx-xxxxxx.json"
}

There are two notable differences:

  1. The Java library produces an array, but sns-sqs-big-payload expects an object.
  2. sns-sqs-big-payload uses the S3 key directly to fetch the object, and since the Java library's message doesn't have a .json extension in the value of s3Key, it cannot resolve the object from S3.

How to reproduce?

Assuming there is an object in S3 named 7ac71e8a-35bd-420f-b644-97abf2317e1d.json, running the following script reproduces the errors in the library.

import {SQS} from 'aws-sdk';
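// sqsConfig is assumed to be defined elsewhere with your region and queueUrl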
const sqs = new SQS({region: sqsConfig.region});
await sqs
  .sendMessage({
    QueueUrl: sqsConfig.queueUrl,
    MessageBody:
      '["software.amazon.payloadoffloading.PayloadS3Pointer",{"s3BucketName":"extended-client-bucket","s3Key":"7ac71e8a-35bd-420f-b644-97abf2317e1d"}]',
    MessageAttributes: {
      SQSLargePayloadSize: {
        StringValue: '5198',
        StringListValues: [],
        BinaryListValues: [],
        DataType: 'Number',
      },
    },
  })
  .promise();

The first error you'd hit is "There were 2 validation errors:\n* MissingRequiredParameter: Missing required key 'Bucket' in params\n* MissingRequiredParameter: Missing required key 'Key' in params", because the lib cannot map the Key and Bucket fields in the payload due to the JSON schema mismatch.

And if you get past that with a manual payload transformation, the lib will hit a "NoSuchKey: The specified key does not exist." error, because the S3 key name lacks the .json extension.

What's the fix?

When the opt-in extendedLibraryCompatibility option is used, the library should respect the JSON schema that the AWS client library produces, and it should append the additional postfix when fetching the S3 object.
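
For illustration, a schema-tolerant pointer parser might look like this (a hypothetical helper, not part of the library; field names come from the JSON examples above):

function parseS3Pointer(body: string): { Bucket: string; Key: string } {
    const parsed = JSON.parse(body);
    // the Java client wraps the pointer in a [className, pointer] array,
    // while this library writes a bare object
    const pointer = Array.isArray(parsed) ? parsed[1] : parsed;
    return {
        Bucket: pointer.s3BucketName,
        // note: the Java client's s3Key carries no .json suffix
        Key: pointer.s3Key,
    };
}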

License questions/problems

Hi there,
as you can see, I started using and extending your library (#26).

I looked into the license because I thought some of the consumer interface/options/code is quite similar to the BBC consumer: https://github.com/bbc/sqs-consumer

If you took some of their code to start this project, maybe you should consider stating that in the README, or it might even be necessary to include their license, or at least their lines, in your license file.

Copyright (c) 2017-present British Broadcasting Corporation

All rights reserved

(http://www.bbc.co.uk) and sqs-consumer Contributors

But maybe you just happen to have very similar naming of options and such, because both are obviously derived from the same AWS docs.

Nonetheless, looking at your license file I see:

Copyright [yyyy] [name of copyright owner]

  1. You only need to include the part of the license starting with that line.
  2. You should actually fill in the year and owner information.

I wasn't sure if I should even open this issue, because I don't care too much personally. But I thought that as more people start using it, you might want to know about this so you can clean it up.

Cheers and thanks for the great library.

Use same Json schema as Java library

Is it possible to use the same schema as the Java libraries for SNS and SQS, so we can produce/consume with Node.js and consume/produce with Java?
https://docs.aws.amazon.com/sns/latest/dg/large-message-payloads.html
https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-s3-messages.html

They use the following JSON:

[
  "software.amazon.payloadoffloading.PayloadS3Pointer",
  {
    "s3BucketName": "extended-client-bucket",
    "s3Key": "xxxx-xxxxx-xxxxx-xxxxxx"
  }
]

Or at least maybe make it configurable.

Thanks

How to use inside of a Lambda?

I'd like to use this inside a Lambda that's triggered by an SQS queue. How would I do that? I don't think polling is an appropriate solution here, as AWS has already handed me the SQS message.

SqsConsumer with Lambda didn't work

Hello, I tried to use the package with my Lambda functions. The SqsProducer works fine; it sends a JSON payload to the S3 bucket. My problem is when I need to consume a message that was sent via S3. I followed this example: https://github.com/aspecto-io/sns-sqs-big-payload/blob/HEAD/docs/usage-in-lambda.md

But it never runs the handleMessage function and always returns an undefined response. The transformMessageBody runs, but with the body param undefined too.

My getMessage function should return the message payload.

  async getMessage (messageProps) {
    const sqsConsumer = SqsConsumer.create({
      region: 'us-east-1',
      getPayloadFromS3: true,
      s3Bucket: 'MY-S3-BUCKET',
      transformMessageBody: (body) => {
        console.log('transformMessageBody', body)
        const snsMessage = JSON.parse(body);
        return snsMessage.Message;
      },
      parsePayload: (raw) => JSON.parse(raw),
      handleMessage: async ({ payload }) => {
        console.log('handleMessage', payload)
      },
    });

    const result = await sqsConsumer.processMessage(messageProps);
    return result;
  }

This is my entire message payload:

{
    "Records": [
        {
            "messageId": "238bba99-76e3-4fe2-b777-****",
            "receiptHandle": "BLABLABAL",
            "body": "{\"S3Payload\":{\"Id\":\"a51bed3c-e0d2-45f1-a37a-***\",\"Bucket\":\MY-S3-BUCKET\",\"Key\":\"a51bed3c-e0d2-45f1-a37a-*****.json\",\"Location\":\"https://MY-S3-BUCKET.s3.amazonaws.com/a51bed3c-e0d2-45f1-a37a-****.json\"}}",
            "attributes": {
                "ApproximateReceiveCount": "8",
                "SentTimestamp": "1604921763468",
                "SenderId": "AROA4VG6N6C5XCF3YA7S3:BABALBALBAL",
                "ApproximateFirstReceiveTimestamp": "1604921763468"
            },
            "messageAttributes": {},
            "md5OfBody": "4a125225a9e7016440****",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-1:*****:queueName",
            "awsRegion": "us-east-1"
        }
    ]
}

I send the first object of the Records array, like this:

eventBody = await this.queueService.getMessage(event.Records[0]);

I really believe I missed something, but I don't know where. Could you help me?

Thanks.

SNSProducer does not support MessageAttributes when publishing SNS messages

Thanks for making this library available, it is really useful. We are looking to adopt this library in our project and came across a small issue. We have an event-driven architecture where we use a lot of SNS + SQS to route and fan out messages to different consumers. As multiple SQS queues might be subscribed to an SNS topic, we use MessageAttributes to filter out only the events each consumer needs to process and reduce the number of Lambda invocations.

We currently publish this way:

sns.publish({
  TopicArn: routerTopicArn,
  Message: JSON.stringify(event),
  MessageAttributes: {
    eventName: {
      DataType: 'String',
      StringValue: event.eventName || '',
    },
  },
});

And our SQS subscription filter policy to SNS looks something like this:

{
  "eventName": [
    "validation.create",
    "activity.process"
  ]
}

When looking closer at your SNSProducer, we noticed you expose a sendJSON method that creates the JSON string and publishes the message, but it does not allow us to pass any other inputs to the underlying SNS publish method, in our case MessageAttributes.

Would it be possible to add this support? I am happy to provide a patch for this if you accept contributions.
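
For illustration, the requested API could look something like this (purely hypothetical; the library's publish methods accept no such second argument today):

await snsProducer.publishJSON(event, {
    // hypothetical pass-through of native SNS publish inputs
    MessageAttributes: {
        eventName: {
            DataType: 'String',
            StringValue: event.eventName || '',
        },
    },
});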

Typo in README file

The SNS publish method used in the README file is wrong: await snsProducer.sendJSON({

Fix: await snsProducer.publishJSON({

Compress the JSON before sending it through SQS

From a quick skim of the source code, it doesn't look like the JSON is compressed before sending.

This seems like a simple new feature that would allow more messages to go via SQS without needing to be uploaded to S3. A sketch of the idea follows.
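
A sketch of the idea using Node's built-in zlib (this is not library functionality; base64 keeps the body a valid SQS string at the cost of some of the size win):

import { gzipSync, gunzipSync } from 'zlib';

const payload = { some: 'large object' }; // example payload

// producer side: compress the JSON and base64-encode it
// so the result is still a valid SQS string body
const body = gzipSync(JSON.stringify(payload)).toString('base64');

// consumer side: reverse the steps before parsing
const restored = JSON.parse(
    gunzipSync(Buffer.from(body, 'base64')).toString('utf-8'),
);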

Lambda makes the event/records lowercase keys therefore a JSON.parse() error is thrown

Hi, AWS Lambda lowercases all the keys in the record object, and your Consumer class tries to read message.Body, which is undefined because the key Body is actually body; as a result, an "Invalid JSON at position 0 character 'u'" error is thrown.

I think you cannot change it to body directly, since the implementation works with uppercase keys outside Lambda. What I would suggest is passing a key like integration: 'lambda' when an instance of the class is created, so the class knows it's being used in the context of Lambda, i.e. with lowercase record (event) keys. A workaround sketch follows.
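
Until something like that exists, one workaround is to re-map the record before handing it to processMessage (a sketch; field names follow the standard Lambda SQS event shown in the issue above):

// re-map a Lambda SQS record (lowercase keys) to the SDK-style casing
// the consumer reads; nested attribute keys may need the same treatment
const toSdkMessage = (record: any) => ({
    MessageId: record.messageId,
    ReceiptHandle: record.receiptHandle,
    Body: record.body,
    MessageAttributes: record.messageAttributes,
});

await sqsConsumer.processMessage(toSdkMessage(event.Records[0]));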

SQS Producer doesn't support sending MessageAttributes & MessageSystemAttributes when sending a message to SQS

Thanks to all of the maintainers for creating & maintaining such a helpful library. There is a similar open issue for SNS, and we need the same functionality for SQS as well, as we are using AWS SQS as a Lambda trigger and filtering messages based on the sent MessageAttributes.

The sendJSON method in the SQS producer doesn't currently support the MessageAttributes & MessageSystemAttributes sent by the caller.

Can we add this support as well? I would be more than happy to contribute & raise an MR for the fix.

Thanks!

SqsProducer.create timeout

I'm trying to use SqsProducer to send an SQS message inside my Lambda function, but my function times out when using the very simple code from the README.

    const sqsProducer = SqsProducer.create({
        queueUrl: "https://sqs.us-east-1.amazonaws.com/1234567890/my-sqs",
        region: process.env.AWS_REGION,        
        largePayloadThoughS3: true,        
        extendedLibraryCompatibility: true,
        s3Bucket: "my-s3",
    });

Any idea why this may happen?
