Comments (7)
Hey,
You're on the right path, but a couple things are off in your implementation:
- You're instantiating the `avroEncoder` but not using it anywhere.
- The data you are writing is just the concatenation of the encoded records' bytes, whereas Avro tools expect a valid object container file (unless you use the `fragtojson` command).

We can fix both issues by replacing the `avroType.toBuffer` calls with `write`s to the `avroEncoder`:
```js
var avsc = require('avsc');
var fs = require('fs');

// ... (`avroType` and `sourceData` defined as in your snippet.)
var avroEncoder = new avsc.streams.BlockEncoder(avroType);

// We pipe the encoder to a local file here (but any writable stream would work
// as well, for example an HTTP request).
avroEncoder.pipe(fs.createWriteStream('foo.avro'));

// We write the records to the block encoder, which will take care of
// serializing them into an object container file.
sourceData.forEach(function (data) {
  if (avroType.isValid(data)) {
    avroEncoder.write(data);
  }
});
avroEncoder.end(); // Signal that we are done writing and flush all data.
```
`foo.avro` should now contain a valid object container file, readable using Avro tools (for example via the `tojson` command).
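If you'd rather sanity-check from Node itself, here's a quick sketch using avsc's own file decoder:

```js
// Decode the container file back into records to verify it round-trips.
avsc.createFileDecoder('foo.avro')
  .on('metadata', function (type) { console.log('schema: ' + type.toString()); })
  .on('data', function (record) { console.log(record); });
```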
Finally, in case you are the author of this StackOverflow question, I'll mention that you can use `BlockEncoder`'s `omitHeader` option to append data to an existing Avro container file (and still have it stay valid). A quick example, building on the previous one:
```js
// We need to know the file's synchronization marker. Here we do it by
// reading the header, but in your use-case you might want to cache it
// somewhere else.
var header = avsc.extractFileHeader(path);
var encoder = new avsc.streams.BlockEncoder(type, {
  syncMarker: header.sync,
  omitHeader: true
});

// Since we're writing locally, we pass the `'a'` flag when opening the file.
// Depending on how concatenation works in your use-case, you might
// not need to do anything different here.
encoder.pipe(fs.createWriteStream(path, {
  defaultEncoding: 'binary',
  flags: 'a'
}));

// Write and flush as above...
```
Let me know if anything is still unclear.
---
Nice, yeah, making those changes worked. The problem is, I don't really need to save it to the file system per se. I just need a buffer or stream that I can send off to S3. Using `fs.createWriteStream` actually creates a file on the file system. Is there a way to get a buffer of the Avro contents instead? The AWS documentation says that it accepts a string, buffer, or "streamObject" as input... so maybe I could use a stream of sorts.

As for the StackOverflow question -- indeed I am the author, but writing Avro to a Kinesis stream won't really work. If the schema ever changes, I can't just keep appending, and I don't have a way to trigger Kinesis to flush reliably. I ended up just writing raw JSON to Kinesis and then creating a small AWS Lambda service to do the conversion (which is where this code/problem is from) that gets triggered on a flush event.

Thanks again for your help!
---
A `BlockEncoder` is also a readable stream, so you should be able to just pass that as input.
From a quick look around, it seems you'll likely want to use multipart uploading if your files are large (for example via this library, which will let you `pipe` the encoder directly to it).
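In case it helps, here's a minimal sketch of streaming the encoder straight to S3 with the AWS SDK's managed uploader; the bucket and key names are placeholders, and `avroType`/`sourceData` are assumed from the earlier example:

```js
var avsc = require('avsc');
var AWS = require('aws-sdk');

var s3 = new AWS.S3();
var avroEncoder = new avsc.streams.BlockEncoder(avroType);

// `upload` accepts a readable stream as `Body` and handles multipart
// uploads under the hood (placeholder bucket and key).
s3.upload(
  {Bucket: 'my-bucket', Key: 'foo.avro', Body: avroEncoder},
  function (err, data) {
    if (err) { throw err; }
    console.log('Uploaded to ' + data.Location);
  }
);

sourceData.forEach(function (data) { avroEncoder.write(data); });
avroEncoder.end(); // Flushes remaining data and ends the readable side.
```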
Also, I'm not familiar with Kinesis but it does sound like arbitrary concatenation will cause issues with Avro's container file format. You could still pipe Avro in though (for example using a `RawEncoder`), which would significantly reduce the size of your requests. You could encode a schema version inside the record if you wanted to simplify evolution in the future.
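A minimal sketch of that approach, where `records` is a placeholder array of decoded values and the Kinesis call itself is elided:

```js
// A RawEncoder emits the records' encoded bytes without any container-file
// framing, so concatenated payloads stay decodable with the same schema.
var rawEncoder = new avsc.streams.RawEncoder(avroType);
var chunks = [];
rawEncoder.on('data', function (chunk) { chunks.push(chunk); });
rawEncoder.on('end', function () {
  var payload = Buffer.concat(chunks);
  // `payload` could become e.g. the `Data` of a Kinesis record.
});
records.forEach(function (record) { rawEncoder.write(record); });
rawEncoder.end();
```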
---
Actually, I think I can use S3's `upload` method (instead of `putObject`), which should handle this just fine. I didn't realize the `BlockEncoder` was also a readable stream.
Cool. Closing this issue. Thanks so much for the help @mtth!!! You rock.
---
Hi guys,
I'm trying to achieve almost the same thing, with no luck so far. This is part of my code:
```js
const avro = require('avsc');

const EventAvroType = avro.parse({
  "name": "event",
  "type": "record",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "app", "type": "string"},
    {"name": "event", "type": "string"},
    {"name": "ip", "type": "string"},
    {"name": "dt", "type": "string"},
    {"name": "ts", "type": "long"},
    {"name": "user_id", "type": "string"}
  ]
});

var event = {
  id: 'bad482a7dd1471149879d840c10f50f6a43583dc',
  app: 'my_app',
  event: 'e2',
  ip: '192.168.0.1',
  dt: '2016-01-01T00:00:22-03:00',
  user_id: 'b0d21768-5d6c-11e6-8b77-86f30ca893d3',
  ts: 1451617222000
};

var avroEncoder = new avro.streams.BlockEncoder(EventAvroType);
const buf = EventAvroType.toBuffer(event);
avroEncoder.write(buf);
```
I get the following error:
```
Error: invalid "string": undefined
    at throwInvalidError (/var/www/html/dwh_lambdas/dwh_backup_raw_events/node_modules/avsc/lib/types.js:2687:9)
    at StringType._write (/var/www/html/dwh_lambdas/dwh_backup_raw_events/node_modules/avsc/lib/types.js:742:5)
    at RecordType.writeevent [as _write] (eval at <anonymous> (/var/www/html/dwh_lambdas/dwh_backup_raw_events/node_modules/avsc/lib/types.js:2004:10), <anonymous>:4:6)
    at BlockEncoder._writeValue (/var/www/html/dwh_lambdas/dwh_backup_raw_events/node_modules/avsc/lib/containers.js:384:18)
    at BlockEncoder._writeChunk (/var/www/html/dwh_lambdas/dwh_backup_raw_events/node_modules/avsc/lib/containers.js:445:8)
    at BlockEncoder._write (/var/www/html/dwh_lambdas/dwh_backup_raw_events/node_modules/avsc/lib/containers.js:437:8)
    at doWrite (_stream_writable.js:307:12)
    at writeOrBuffer (_stream_writable.js:293:5)
    at BlockEncoder.Writable.write (_stream_writable.js:220:11)
    at Wrapped.exports.handler (/var/www/html/dwh_lambdas/dwh_backup_raw_events/index.js:52:17)
    at Wrapped.runHandler (/usr/lib/node_modules/lambda-wrapper/index.js:46:18)
    at Wrapped.run (/usr/lib/node_modules/lambda-wrapper/index.js:75:16)
    at Object.<anonymous> (/var/www/html/dwh_lambdas/local-invoke.js:7:8)
    at Module._compile (module.js:556:32)
    at Object.Module._extensions..js (module.js:565:10)
    at Module.load (module.js:473:32)
```
I just want a `Buffer` instance which I will pass to http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Firehose.html#putRecord-property.

I'd appreciate any help!

Thanks
---
Hey @plamen-paskov. I'm not familiar with AWS Firehose, but if you just need a `Buffer`, you can just return the output of `EventAvroType.toBuffer(event)`. It'll contain the encoded bytes of `event`.
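For instance, a minimal sketch assuming the AWS SDK's Firehose client (the delivery stream name is a placeholder):

```js
var AWS = require('aws-sdk');
var firehose = new AWS.Firehose();

// `toBuffer` returns the record's encoded bytes as a Buffer.
var buf = EventAvroType.toBuffer(event);

firehose.putRecord(
  {DeliveryStreamName: 'my-stream', Record: {Data: buf}},
  function (err) { if (err) { throw err; } }
);
```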
The `avroEncoder` is useful if you want to create an Avro container file, typically with multiple records. It will also expect a decoded value rather than a buffer, so you would use it as follows:
```js
var avroEncoder = new avro.streams.BlockEncoder(EventAvroType);
avroEncoder.write(event); // We're writing the event directly instead of a buffer.
// ...
avroEncoder.end();
```
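And if you do want a single `Buffer` of the whole container file rather than a stream, one sketch is to collect the encoder's readable output (attaching the listeners before writing):

```js
var chunks = [];
avroEncoder.on('data', function (chunk) { chunks.push(chunk); });
avroEncoder.on('end', function () {
  // All blocks have been flushed; this Buffer holds the full container file.
  var containerBuf = Buffer.concat(chunks);
});
```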
---
> ```js
> // Since we're writing locally, we pass the `'a'` flag when opening the file.
> // Depending on how concatenation works in your use-case, you might
> // not need to do anything different here.
> encoder.pipe(fs.createWriteStream(path, {
>   defaultEncoding: 'binary',
>   flags: 'a'
> }));
> ```
Will this method of appending new records work if a compression algorithm like snappy or deflate is applied?