matrus2 / dynamodb-stream-elasticsearch
Missing blueprint for AWS Lambda. Reads stream from AWS DynamoDB and writes it to ElasticSearch.
License: MIT License
For performance reasons, and in order to reduce execution time, it would be good if the records could be aggregated per operation and the Bulk API then used to send all of the operations in a single request instead of one by one.
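A rough sketch (not the library's actual implementation) of what that aggregation could look like, assuming the standard DynamoDB Streams record shape; the index name and the unmarshalling step are simplified assumptions:

```javascript
// Aggregate DynamoDB Streams records into a single Bulk API body.
const buildBulkBody = (records, index) =>
  records.flatMap((record) => {
    const id = Object.values(record.dynamodb.Keys)[0].S;
    if (record.eventName === "REMOVE") {
      return [{ delete: { _index: index, _id: id } }];
    }
    // INSERT / MODIFY: index the new image (unmarshalling omitted here)
    return [{ index: { _index: index, _id: id } }, record.dynamodb.NewImage];
  });
// The resulting array can then be sent with a single client.bulk({ body }) call.
```

This keeps one HTTP round trip per Lambda invocation instead of one per record.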
Is it possible to pass a function that returns a promise as the "transform" function?
i.e. my use case is that the stream is triggered when table1 gets a change. I would also need some data from table2 to augment the results in ES.
Fetching from table2 requires an async operation. For example, this would be an async function as the transformer:
const transform = async (body) => {
  const otherId = body.otherId;
  const otherObject = await PersistedObject.findWithId(otherId);
  return { ...body, extraProperty: otherObject.name };
};
Hello,
Thanks for this useful package, I am having a problem and I am not sure if it is down to me or the plugin.
Before inserting an item in my Lambda I check if the index exists and, if it does not exist, I create it with a mapping configuration.
After the items get inserted from DynamoDB, though, the mapping that I set at index creation time is completely overridden. In particular, I am trying to use an English analyzer for some text fields, but when I call the get mapping API the mapping for the index does not contain any analyzer mapping.
Am I doing something wrong?
When trying to run the Lambda connected to a DynamoDB Stream, it correctly fetches the events from DynamoDB, but it is not able to write into ES, failing with the following error.
The ES is publicly accessible and allows * in its policy JSON for PoC purposes.
Any idea?
Thanks!
2020-05-28T04:12:28.790Z 583cc742-a8c5-49ac-9933-7f3549820978 ERROR Invoke Error
{
"errorType": "Error",
"errorMessage": "Error Error: ResponseError: security_exception",
"stack": [
"Error: Error Error: ResponseError: security_exception",
" at _homogeneousError (/var/runtime/CallbackContext.js:13:12)",
" at postError (/var/runtime/CallbackContext.js:30:54)",
" at callback (/var/runtime/CallbackContext.js:42:7)",
" at /var/runtime/CallbackContext.js:105:16",
" at /var/task/streams.js:12:7",
" at processTicksAndRejections (internal/process/task_queues.js:97:5)"
]
}
After removing a field from an item in DynamoDB, it does not get removed in Elasticsearch when using useBulk: true.
With useBulk: false the attribute gets removed properly from the document.
Hi there,
I've only just noticed that types are still present in the codebase. At first I thought it would be fine, as they are only deprecated in ES 7.10, but I am actually hitting issues with some geo_point mappings on my ES cluster: because of the now-wrong way the documents are saved, the engine can't recognise my mapping and throws errors.
Give the user the possibility to set the refresh parameter.
Currently it is set to true by default.
Hello 👋
I’d like to look into refactoring or providing some type definitions for this library. Okay, so I’ve been using this for a good while now, and I’m super happy with it! One benefit is that it just works: I just drop the thing into a Lambda function and off we go.
Recently, the industry has (IMHO) praised TypeScript and we’ve seen many projects adopting it. I don’t think that alone is reason enough to refactor this project, but there's also the fact that we are now seeing Pulumi, which uses TypeScript to define infrastructure, and this project could be an essential tool to make that experience as good as it can be.
I would be happy to rough out a starting point for the refactor (keeping the current API, of course), but before I do, I would like to check with any developers that find this issue (maybe searching for types for the library) and the maintainers. Should we do any of this? What other gains are there? What about pitfalls?
After an update, the type is always "_doc"
Your documentation mentions that if nothing is defined, the type & the index are equal to the table name.
It was true with 2.3.0 but not anymore.
So this generates the following error during execution:
Rejecting mapping update to [user] as the final mapping would have more than 1 type: [_doc, user]
Here is the first item in my actual ES:
"hits": {
"total": 1,
"max_score": 1,
"hits": [
{
"_index": "user",
"_type": "user",
"_id": "5c61386b-d7bb-4349-9655-1c81e222c060",
Could you add the opportunity to set config options beyond endpoint, e.g. credentials, like this https://stackoverflow.com/a/53708779/6011421 (from Lambda)?
Temporarily, I ported part of the code from node_modules into my Lambda to be able to configure the Elasticsearch parameters. Hope this helps improve your code.
(+) 1 vulnerabilities found
│ Path │ [email protected] > [email protected] > [email protected]
│ More Info │ https://nodesecurity.io/advisories/577
Reference:
elastic/elasticsearch-js#630
The Elasticsearch long field supports 64-bit signed integers.
https://www.elastic.co/guide/en/elasticsearch/reference/7.16/number.html
For example, I stored a large integer in DynamoDB from a Java application, and after DynamoDB Streams was started, when the Lambda using this library processed the record, some digits were missing in the document saved in Elasticsearch.
This happens because the DynamoDB record is converted to the JavaScript Number type when it is unmarshalled.
The integer part of the Number type only supports integers up to 53 bits (9007199254740991 to be exact), so if this value is exceeded, it will be rounded.
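A quick Node.js illustration of the rounding (the stored value here is a made-up example just above the safe-integer limit):

```javascript
// Why 64-bit integers from DynamoDB lose digits when unmarshalled into
// JavaScript Numbers: doubles only have a 53-bit integer mantissa.
const stored = "9007199254740993";           // 2^53 + 1, fits in an ES long
const asNumber = Number(stored);             // rounds to nearest double
console.log(Number.isSafeInteger(asNumber)); // false
console.log(asNumber);                       // 9007199254740992 — last digit lost
// BigInt preserves the value exactly:
const asBigInt = BigInt(stored);
console.log(asBigInt.toString() === stored); // true
```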
When using useBulk: true together with refresh: false, the library still sends ?refresh=true in the bulk request.
This was observed in version 2.10.0, and it looks like the error still exists: the wrong key, toUpsert, is set instead of refresh.
I tried to start a PR, but I get version conflicts when attempting to install the node modules, which is, to be honest, too much context for me to delve into right now, so I'm hoping just opening an issue is okay.
at 12:45:30 ❯ npm install
npm ERR! code ERESOLVE
npm ERR! ERESOLVE unable to resolve dependency tree
npm ERR!
npm ERR! While resolving: [email protected]
npm ERR! Found: @elastic/[email protected]
npm ERR! node_modules/@elastic/elasticsearch
npm ERR! @elastic/elasticsearch@"7.10.0" from the root project
npm ERR!
npm ERR! Could not resolve dependency:
npm ERR! peer @elastic/elasticsearch@">=5.6.16 <=7.9.1" from @acuris/[email protected]
npm ERR! node_modules/@acuris/aws-es-connection
npm ERR! @acuris/aws-es-connection@"2.1.0" from the root project
npm ERR!
npm ERR! Fix the upstream dependency conflict, or retry
npm ERR! this command with --force, or --legacy-peer-deps
npm ERR! to accept an incorrect (and potentially broken) dependency resolution.
We have a cluster which is configured to use a user and password for authentication. In previous versions of your package we were able to use it via the testMode constant, e.g.:
pushStream({
  event,
  endpoint: "https://" + ES_DOMAIN,
  testMode: true,
  index: "dynamodb-sources",
  type: "dynamo",
  transformFunction: modifyRecord,
  useBulk: true,
  elasticSearchOptions: {
    auth: {
      username: ES_USER,
      password: ES_PASS
    }
  }
})
In this commit this flag is removed: c99e008#diff-ac1fa144d8a26929871e9bd0a2dc01736ee07d205abedd47bf638c69907fdfd7R8 .
It would be nice to still have some flag that allows skipping the default AWSConnection and connecting using user and password authentication.
Getting this error; it looks like it has something to do with the async keyword.
Function Logs
Syntax error in module 'Products/DynamoDBTrigger/index': SyntaxError
exports.pushStream = async (
SyntaxError: Unexpected token (
at createScript (vm.js:56:10)
at Object.runInThisContext (vm.js:97:10)
at Module._compile (module.js:542:28)
at Object.Module._extensions..js (module.js:579:10)
at Module.load (module.js:487:32)
at tryModuleLoad (module.js:446:12)
at Function.Module._load (module.js:438:3)
at Module.require (module.js:497:17)
at require (internal/module.js:20:19)
at Object. (/var/task/Products/DynamoDBTrigger/index.js:1:86)
Hello,
First of all, thanks for the library; I have been using it for a month and it is really great.
Now I have a problem that is more an issue with ES than a problem with the library (once again, thanks so much).
The problem is that, sending the info to ES in a basic way, ES breaks the fields into many parts and makes ordering almost useless. I also have a bit of a problem with search, because I would love to perform partial searches with the wildcard *, so if I have a user John Doe I can search "Joh" and I will find, along with others, John Doe.
I was reading this link
https://stackoverflow.com/questions/34493947/elasticsearch-is-not-sorting-the-results
It seems a nice solution. I know that this library supports a transform function, so I imagine I could do this operation there, adding fields to the object in order to make ordering easier?
Any suggestion on how to enable the wildcard too?
Thanks again for the help and the fantastic library
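For what it's worth, a hypothetical transform along the lines of that Stack Overflow answer might look like this; the field names (name, name_raw) are assumptions for illustration:

```javascript
// Adds an unanalyzed, lowercased copy of the name so that sorting and
// wildcard queries operate on the whole value rather than analyzed tokens.
const transform = (body) => ({
  ...body,
  name_raw: (body.name || "").toLowerCase()
});

// transform({ name: "John Doe" }) → { name: "John Doe", name_raw: "john doe" }
```

In the index mapping, name_raw would then be declared as a keyword field so that wildcard queries like `joh*` match against the full value.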
When the same record has both an INSERT and a REMOVE event in the events list, pushStream just inserts the document and does not remove it.
{
"Records": [
{
"eventID": "586a2bf8b63d6c530ee0f4ebba4007c1",
"eventName": "INSERT",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "us-east-2",
"dynamodb": {
"ApproximateCreationDateTime": 1662442966,
"Keys": {
"_key": {
"S": "7360f3c9a7_project"
}
},
"NewImage": {},
"SequenceNumber": "2308098300000000006482232216",
"SizeBytes": 2316,
"StreamViewType": "NEW_IMAGE"
},
"eventSourceARN": "arn:aws:dynamodb:us-east-"
},
{
"eventID": "4861b620ed2fbc718695b46ecd2e35aa",
"eventName": "REMOVE",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "us-east-2",
"dynamodb": {
"ApproximateCreationDateTime": 1662442966,
"Keys": {
"_key": {
"S": "7360f3c9a7_project"
}
},
"SequenceNumber": "2308098400000000006482232285",
"SizeBytes": 22,
"StreamViewType": "NEW_IMAGE"
},
"eventSourceARN": "arn:aws:dynamodb:us-east-"
}
]
}
This event list just adds the document; it does not remove it.
Hi,
I just set up the Lambda according to the example in the readme.
Every time the Lambda tries to push an event to ES I get the error below:
Invoke Error { "errorType": "Error", "errorMessage": "Error Error: ResponseError: Response Error", "stack": [ "Error: Error Error: ResponseError: Response Error", " at _homogeneousError (/var/runtime/CallbackContext.js:13:12)", " at postError (/var/runtime/CallbackContext.js:30:54)", " at callback (/var/runtime/CallbackContext.js:42:7)", " at /var/runtime/CallbackContext.js:105:16", " at /var/task/dynamodbtoelasticsearch.js:13:9", " at processTicksAndRejections (internal/process/task_queues.js:97:5)" ] }
This is the event that comes in from DynamoDb stream:
{
"Records": [
{
"eventID": "85768e0266331b54312689af3a20ee54",
"eventName": "INSERT",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "eu-west-1",
"dynamodb": {
"ApproximateCreationDateTime": 1591888165,
"Keys": {
"id": {
"S": "a2d60de1-dac9-4ace-b291-34e7650a25a3"
},
"account": {
"S": "pj5bdbYHjSUoKfucCtHcRZ"
}
},
"NewImage": {
"comments": {
"L": [
{
"M": {
"text": {
"S": "Tijdens haar stage op de afdeling Moordzaken van het plaatselijk Openbaar Ministerie, kwam Allison DuBois erachter dat ze een speciale gave had. Als ze de foto's en ander materiaal van de plaats delict klaarlegde voor de rechtszaak, zag ze de misdaad door de ogen van de dader. In plaats van haar gave te onderdrukken, besloot ze haar mogelijkheden ten volle te benutten. Nu helpt ze justitie bij het vinden van lichamen, het ontdekken van bewijs en het oplossen van zaken."
}
}
}
]
},
"seriesindex": {
"N": "0"
},
"languages": {
"L": [
{
"M": {
"code": {
"S": "nld"
}
}
}
]
},
"datas": {
"L": [
{
"M": {
"name": {
"S": "a2d60de1-dac9-4ace-b291-34e7650a25a3"
},
"format": {
"S": "epub"
}
}
}
]
},
"title": {
"S": "Kus ze niet vaarwel"
},
"type": {
"S": "book"
},
"tags": {
"L": [
{
"M": {
"name": {
"S": "Esoterie"
}
}
}
]
},
"cover": {
"M": {
"name": {
"S": "a2d60de1-dac9-4ace-b291-34e7650a25a3"
},
"format": {
"S": "jpg"
}
}
},
"lastmodified": {
"S": "2011-09-04T11:56:38.237+02:00"
},
"ratings": {
"L": [
{
"M": {
"value": {
"N": "10"
}
}
}
]
},
"publishers": {
"L": [
{
"M": {
"name": {
"S": "A.W. Bruna LeV."
}
}
}
]
},
"id": {
"S": "a2d60de1-dac9-4ace-b291-34e7650a25a3"
},
"publishingdate": {
"S": "2010-03-07T00:00:00+01:00"
},
"account": {
"S": "pj5bdbYHjSUoKfucCtHcRZ"
},
"authors": {
"L": [
{
"M": {
"name": {
"S": "Allison Dubois"
}
}
}
]
}
},
"SequenceNumber": "100000000058339634495",
"SizeBytes": 1014,
"StreamViewType": "NEW_IMAGE"
},
"eventSourceARN": "arn:aws:dynamodb:eu-west-1:xxxxxxxxxxxx:table/xxxxxxxxx/stream/2020-06-11T14:11:26.034"
}
]
}
The error is coming from the await es.index({ index, type, id, body, refresh })
I tried many times with different settings, changed the config, etc., but I keep getting the same error. I also enabled error logging for ES but that did not provide any insights.
Love using your library, but I hit a problem that I am not able to solve.
I'm trying to push a simple DynamoDB stream to Elasticsearch, running Elasticsearch 7.1, but I'm getting an error. How can I solve this problem?
Error:
ResponseError: mapper_parsing_exception at IncomingMessage.<anonymous> (/var/task/node_modules/@elastic/elasticsearch/lib/Transport.js:232:25) at IncomingMessage.emit (events.js:327:22) at IncomingMessage.EventEmitter.emit (domain.js:482:12) at endReadableNT (_stream_readable.js:1221:12) at processTicksAndRejections (internal/process/task_queues.js:84:21) { meta: { body: { error: [Object], status: 400 }, statusCode: 400, headers: { date: 'Thu, 27 Aug 2020 17:01:06 GMT', 'content-type': 'application/json; charset=UTF-8', 'content-length': '457', connection: 'keep-alive', 'access-control-allow-origin': '*' }, warnings: null, meta: { context: null, request: [Object], name: 'elasticsearch-js', connection: [Object], attempts: 0, aborted: false } } } { "Records": [ { "eventID": "f528a40a996f0952a9a99902b902d98c", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "eu-west-1", "dynamodb": { "ApproximateCreationDateTime": 1598547664, "Keys": { "id": { "S": "e5271750-5933-42ad-8d0c-d6ba75676067" } }, "NewImage": { "changeDate": { "S": "2020-08-27T17:01:04.240Z" }, "description": { "S": "wow23" }, "id": { "S": "e5271750-5933-42ad-8d0c-d6ba75676067" }, "title": { "S": "me3" }, "type": { "S": "request" }, "createDate": { "S": "2020-08-27T17:01:04.240Z" }, "status": { "S": "active" }, "tags": { "L": [ { "S": "#test" } ] } }, "SequenceNumber": "15241300000000009393215864", "SizeBytes": 204, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:eu-west-1:990669775596:table/Request-gitwanted-api-dev/stream/2020-08-24T10:45:06.825" } ] }
I had been using dynamo2es-lambda and saw this module is more lightweight and relies on fewer dependencies than the other module. However, this module routes all records to a fixed index and type in Elasticsearch.
The other module, dynamo2es-lambda, has the concept of indexField and typeField in its configuration, which allows each incoming record to be routed to a different index or type depending on the record's contents. I am wondering if this feature will be difficult to add.
Example:
DynamoDB source table: "fruit_samples"
Configuration:
indexField: ["genus", "species"]
typeField: "genus"
separator: "-"
Elasticsearch destinations:
index: "citrus-citrus_limon", type: "citrus"
index: "citrus-citrus_sinensis", type: "citrus"
index: "prunus-prunus_domestica", type: "prunus"
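A minimal sketch of how such routing could be derived from a record, mirroring dynamo2es-lambda's indexField/separator idea; the option names and the flat record shape are assumptions:

```javascript
// Build a per-record index name by joining the configured fields.
const buildIndexName = (record, indexFields, separator = "-") =>
  indexFields.map((field) => String(record[field]).toLowerCase()).join(separator);

// buildIndexName({ genus: "Citrus", species: "citrus_limon" }, ["genus", "species"])
// → "citrus-citrus_limon"
```

A hook like this could replace the fixed `index` option when a list of fields is supplied instead of a string.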
Hello,
I am trying your library but every time I perform an operation on the DB I am getting the following error:
{
"errorType": "Error",
"errorMessage": "Error Error: MapperParsingException[object mapping for [lesson] tried to parse field [description] as object, but got EOF, has a concrete value been provided to it?] :: {\"path\":\"/language/lesson/156f7a4b-91b8-4bbb-bdb8-7085a344894e\",\"query\":{\"refresh\":\"true\",\"timeout\":\"5m\"},\"body\":\"{\\\"description\\\":\\\"Micia miao del gatto\\\",\\\"lessonId\\\":\\\"156f7a4b-91b8-4bbb-bdb8-7085a344894e\\\",\\\"language\\\":\\\"en-GB\\\",\\\"video\\\":\\\"http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4\\\",\\\"title\\\":\\\"Micia theory\\\",\\\"type\\\":\\\"lessons\\\",\\\"tags\\\":[\\\"it\\\",\\\"games\\\",\\\"videogames\\\",\\\"software\\\",\\\"friendship\\\"]}\",\"statusCode\":400,\"response\":\"{\\\"error\\\":\\\"MapperParsingException[object mapping for [lesson] tried to parse field [description] as object, but got EOF, has a concrete value been provided to it?]\\\",\\\"status\\\":400}\"}",
"stack": [
"Error: Error Error: MapperParsingException[object mapping for [lesson] tried to parse field [description] as object, but got EOF, has a concrete value been provided to it?] :: {\"path\":\"/language/lesson/156f7a4b-91b8-4bbb-bdb8-7085a344894e\",\"query\":{\"refresh\":\"true\",\"timeout\":\"5m\"},\"body\":\"{\\\"description\\\":\\\"Micia miao del gatto\\\",\\\"lessonId\\\":\\\"156f7a4b-91b8-4bbb-bdb8-7085a344894e\\\",\\\"language\\\":\\\"en-GB\\\",\\\"video\\\":\\\"http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4\\\",\\\"title\\\":\\\"Micia theory\\\",\\\"type\\\":\\\"lessons\\\",\\\"tags\\\":[\\\"it\\\",\\\"games\\\",\\\"videogames\\\",\\\"software\\\",\\\"friendship\\\"]}\",\"statusCode\":400,\"response\":\"{\\\"error\\\":\\\"MapperParsingException[object mapping for [lesson] tried to parse field [description] as object, but got EOF, has a concrete value been provided to it?]\\\",\\\"status\\\":400}\"}",
" at _homogeneousError (/var/runtime/CallbackContext.js:12:12)",
" at postError (/var/runtime/CallbackContext.js:29:51)",
" at callback (/var/runtime/CallbackContext.js:40:7)",
" at /var/runtime/CallbackContext.js:103:16",
" at i.then.catch.e (/var/task/src/elastic_search/handler.js:9:261565)",
" at process._tickCallback (internal/process/next_tick.js:68:7)"
]
}
Thanks in advance for the help and the library :)
While the useBulk option is present in the JS code, it's missing in the typings.
So I was thinking: let's say that index & type are not defined, maybe we could just set both of them to the DynamoDB table name?
And if only an index is defined, duplicate it to type?
Hi there,
as I wrote the bulk logic last year I've realized that there's an issue: the bulk operation on ES does not throw an error like the other ones, but rather 'collects' the errors in an object.
We should throw instead.
I have a PR ready 😄
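The fix could be sketched roughly like this, assuming the @elastic/elasticsearch bulk response shape ({ errors, items }) — this is an illustration, not the actual PR:

```javascript
// Bulk responses report per-item failures instead of throwing, so we
// inspect the items and raise an Error if any operation failed.
const throwOnBulkErrors = ({ errors, items }) => {
  if (!errors) return;
  const failed = items
    .map((item) => item.index || item.delete || item.update)
    .filter((op) => op && op.error);
  throw new Error(`Bulk request failed for ${failed.length} item(s)`);
};
```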
I'm no Lambda expert, but I believe I have configured everything correctly according to the instructions provided.
My environment variables:
ES_ENDPOINT = https://my-endpoint-url.us-west-2.es.amazonaws.com
INDEX = highlights-index
TYPE = Highlights
And the logs:
2020-10-14T19:11:23.449Z c4a28f98-ebb0-45ec-83bc-6fa20da6035b INFO Received event: 10 Records
END RequestId: c4a28f98-ebb0-45ec-83bc-6fa20da6035b
REPORT RequestId: c4a28f98-ebb0-45ec-83bc-6fa20da6035b Duration: 30029.74 ms Billed Duration: 30000 ms Memory Size: 1088 MB Max Memory Used: 38 MB
2020-10-14T19:11:53.471Z c4a28f98-ebb0-45ec-83bc-6fa20da6035b Task timed out after 30.03 seconds
Hello, it seems like there is a minor mistake in the TypeScript definition: the pushStream function actually returns a Promise, not void. It results in an error when trying to then or catch it.
This package is based on a soon-to-be-deprecated version of the official Elasticsearch client. Do you have any plan for migration? Do you accept PRs?
Hello,
Thanks for this package it works great.
I am having some issues with the Serverless Framework: when building the Lambda function with Webpack it creates a 20 MB artefact. The AWS SDK is included in Lambda and it is not something that we need to have as a direct dependency. It is still needed for testing, but it should be possible for packages to exclude it from the artefact when deploying with Webpack.
There is a related issue here serverless-heaven/serverless-webpack#292
Their solution is to use peerDependencies
The most correct way is that modules that require the aws-sdk define it as a peerDependency. So the le-* dependencies should have aws-sdk as a peerDependency, which in turn requires the using module (your service) to have it as either a devDependency or a dependency to fulfill the requirement.
Then the forceExclude will work as expected.
This is the same as with serverless-webpack and the webpack dependency.
As long as the modules have the SDK in their production dependencies, version conflicts and even duplications of the aws-sdk can additionally occur.
How would you feel about this change?
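For reference, the suggested shape in this package's package.json would be something like the following; the version range is an assumption:

```json
{
  "peerDependencies": {
    "aws-sdk": "^2.0.0"
  },
  "devDependencies": {
    "aws-sdk": "^2.0.0"
  }
}
```

With that, consumers deploying via serverless-webpack can forceExclude the SDK from the bundle while tests still resolve it locally.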
We are running into an issue where items are not being removed from the ES index properly, so we are getting a bunch of "ghost" documents that have no matching item in DynamoDB.
Just wondering if this is an issue that anyone else has seen? It's intermittent, so the only thing I can think of is that it may be a race condition when creates and deletes happen close together, although the stream should be delivering the data in order, AFAIK.
Hello,
First of all, thanks for your good work. I'd like to use this module to load our DDB data into the Amazon ES service.
But I am getting this error log in CloudWatch:
{
"errorMessage": "Authorization Exception :: {\"path\":\"/fliphire/application/060aeb59-d120-4160-a365-ef1aaeb34f503bb9bfb0-8ff7-40a5-9c52-35210683a18d\",\"query\":{\"refresh\":true},\"statusCode\":403,\"response\":\"\"}",
"errorType": "Error",
"stackTrace": [
"exports.pushStream (/var/task/node_modules/dynamodb-stream-elasticsearch/src/index.js:41:17)",
"<anonymous>",
"process._tickDomainCallback (internal/process/next_tick.js:228:7)"
]
}
Here is a code snippet showing how I am using your library:
'use strict';
/**
 * Lambda trigger for DDB table stream to load data from DDB to ES
 */
const { pushStream } = require('dynamodb-stream-elasticsearch');

const { ES_ENDPOINT, INDEX } = process.env;

module.exports.handler = async (event) => {
  console.log(JSON.stringify(event));
  try {
    await pushStream({ event, endpoint: ES_ENDPOINT, index: INDEX, type: 'application' });
    console.log(`Successfully processed ${event.Records.length} records.`);
  } catch (error) {
    console.error(`${error}`);
    throw error;
  }
};
Can you help me why I am getting this?
Thanks
Hello @matrus2. Thank you for your code. Could you add the opportunity to choose which fields to send to ElasticSearch (from Lambda)? That is, the ability to write to ElasticSearch only some fields from the stream image object.
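In the meantime, the library's existing transformFunction hook can be used to forward only selected fields; a sketch, where the field names are assumptions:

```javascript
// Returns a transform that keeps only the allowed keys of each document.
const pickFields = (allowed) => (body) =>
  Object.fromEntries(
    Object.entries(body).filter(([key]) => allowed.includes(key))
  );

const transform = pickFields(["id", "title", "status"]);
// transform({ id: "1", title: "x", secret: "hide" }) → { id: "1", title: "x" }
```

The resulting transform can then be passed as transformFunction to pushStream.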