
Comments (7)

mtth commented on May 24, 2024

Hey,

You're on the right path, but a couple of things are off in your implementation:

  • You're instantiating the avroEncoder but not using it anywhere.
  • The data you are writing is just the concatenation of the encoded records' bytes, whereas Avro tools expect a valid object container file (unless you use the fragtojson command).

We can fix both issues by substituting the avroType.toBuffer calls with writes to the avroEncoder:

var avsc = require('avsc');
var fs = require('fs');
// ... (parsing the schema into `avroType` and building `sourceData` elided)
var avroEncoder = new avsc.streams.BlockEncoder(avroType);

// We pipe the encoder to a local file here (but any writable stream would work
// as well, for example an HTTP request).
avroEncoder.pipe(fs.createWriteStream('foo.avro'));

// We write the records to the block encoder, which will take care of serializing them
// into an object container file.
sourceData.forEach(function (data) {
  if (avroType.isValid(data)) {
    avroEncoder.write(data);
  }
});
avroEncoder.end(); // To signal that we are done writing and flush all data.

foo.avro should now contain a valid file, readable using Avro tools (for example via the tojson command).
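
If you want to sanity-check the output from Node itself, you can read the file back with avsc's file decoder. A minimal sketch:

avsc.createFileDecoder('foo.avro')
  .on('data', function (record) {
    // Each `record` is a decoded value from the container file.
    console.log(record);
  });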

Finally, in case you are the author of this StackOverflow question, I'll mention that you can use BlockEncoder's omitHeader option to append data to an existing Avro container file (and still have it stay valid).

A quick example, building on the previous one:

// We need to know the file's synchronization marker. Here we do it by
// reading the header, but in your use-case you might want to cache it
// somewhere else.
var header = avsc.extractFileHeader(path);

var encoder = new avsc.streams.BlockEncoder(type, {
  syncMarker: header.sync,
  omitHeader: true
});

// Since we're writing locally, we pass the `'a'` flag when opening the file.
// Depending on how concatenation works in your use-case, you might
// not need to do anything different here.
encoder.pipe(fs.createWriteStream(path, {
  defaultEncoding: 'binary',
  flags: 'a'
}));

// Write and flush as above...

Let me know if anything is still unclear.


ee99ee commented on May 24, 2024

Nice, yeah, making those changes worked. The problem is, I don't really need to save it to the file system per se. I just need a buffer or stream that I can send off to S3. Using fs.createWriteStream actually creates a file on the file system. Is there a way to get a buffer of the Avro contents instead? The AWS documentation says that it accepts a string, buffer, or "streamObject" as an input... so maybe I could use a stream of sorts.

As for the StackOverflow question -- indeed I am the author, but writing Avro to a Kinesis stream won't really work: if the schema ever changes, I can't just keep appending, and I don't have a way to trigger Kinesis to flush reliably. I ended up just writing raw JSON to Kinesis and then creating a small AWS Lambda service to do the conversion (which is where this code/problem is from) that gets triggered on the flush event.

Thanks again for your help!


mtth commented on May 24, 2024

A BlockEncoder is also a readable stream, so you should be able to just pass that as input.

From taking a quick look around, it seems you'll likely want to use multipart uploading if your files are large (for example via this library, which will let you pipe the encoder directly to it).
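
For example, here's a rough sketch using the AWS SDK's s3.upload method, which accepts a readable stream as the Body and buffers it into multipart chunks internally; the bucket and key names below are placeholders:

var avsc = require('avsc');
var AWS = require('aws-sdk');

var s3 = new AWS.S3();
var avroEncoder = new avsc.streams.BlockEncoder(avroType);

// `upload` consumes the encoder's readable side and streams it to S3.
s3.upload({
  Bucket: 'my-bucket', // Placeholder bucket name.
  Key: 'foo.avro', // Placeholder key.
  Body: avroEncoder
}, function (err) {
  if (err) {
    console.error(err);
  }
});

// Write records and end the encoder as in the earlier example.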

Also, I'm not familiar with Kinesis but it does sound like arbitrary concatenation will cause issues with Avro's container file format. You could still pipe Avro in though (for example using a RawEncoder), which would significantly reduce the size of your requests. You could encode a schema version inside the record if you wanted to simplify evolution in the future.
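
As a sketch of that last suggestion: a RawEncoder emits just the concatenated encoded bytes of the records, without the container file's header and sync markers, so downstream concatenation stays valid (`record` here stands in for any decoded value matching the type):

var rawEncoder = new avsc.streams.RawEncoder(avroType);
rawEncoder.on('data', function (chunk) {
  // `chunk` holds raw encoded bytes with no container framing; forward it
  // to Kinesis here (e.g. as a record's data blob).
});
rawEncoder.write(record);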


ee99ee commented on May 24, 2024

Actually, I think I can use S3's upload method (instead of putObject), which should handle this just fine. I didn't realize the BlockEncoder was also a readable stream.

Cool. Closing this issue. Thanks so much for the help @mtth!!! You rock.


plamen-paskov commented on May 24, 2024

Hi guys,
I'm trying to achieve almost the same thing, with no luck so far. This is part of my code:

const avro = require('avsc');
const EventAvroType = avro.parse({
    "name": "event",
    "type": "record",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "app", "type": "string"},
        {"name": "event", "type": "string"},
        {"name": "ip", "type": "string"},
        {"name": "dt", "type": "string"},
        {"name": "ts", "type": "long"},
        {"name": "user_id", "type": "string"}
    ]
});

var event = {
    id: 'bad482a7dd1471149879d840c10f50f6a43583dc',
    app: 'my_app',
    event: 'e2',
    ip: '192.168.0.1',
    dt: '2016-01-01T00:00:22-03:00',
    user_id: 'b0d21768-5d6c-11e6-8b77-86f30ca893d3',
    ts: 1451617222000 
};

var avroEncoder = new avro.streams.BlockEncoder(EventAvroType);
const buf = EventAvroType.toBuffer(event);
avroEncoder.write(buf);

I get the following error:

Error: invalid "string": undefined
    at throwInvalidError (/var/www/html/dwh_lambdas/dwh_backup_raw_events/node_modules/avsc/lib/types.js:2687:9)
    at StringType._write (/var/www/html/dwh_lambdas/dwh_backup_raw_events/node_modules/avsc/lib/types.js:742:5)
    at RecordType.writeevent [as _write] (eval at <anonymous> (/var/www/html/dwh_lambdas/dwh_backup_raw_events/node_modules/avsc/lib/types.js:2004:10), <anonymous>:4:6)
    at BlockEncoder._writeValue (/var/www/html/dwh_lambdas/dwh_backup_raw_events/node_modules/avsc/lib/containers.js:384:18)
    at BlockEncoder._writeChunk (/var/www/html/dwh_lambdas/dwh_backup_raw_events/node_modules/avsc/lib/containers.js:445:8)
    at BlockEncoder._write (/var/www/html/dwh_lambdas/dwh_backup_raw_events/node_modules/avsc/lib/containers.js:437:8)
    at doWrite (_stream_writable.js:307:12)
    at writeOrBuffer (_stream_writable.js:293:5)
    at BlockEncoder.Writable.write (_stream_writable.js:220:11)
    at Wrapped.exports.handler (/var/www/html/dwh_lambdas/dwh_backup_raw_events/index.js:52:17)
    at Wrapped.runHandler (/usr/lib/node_modules/lambda-wrapper/index.js:46:18)
    at Wrapped.run (/usr/lib/node_modules/lambda-wrapper/index.js:75:16)
    at Object.<anonymous> (/var/www/html/dwh_lambdas/local-invoke.js:7:8)
    at Module._compile (module.js:556:32)
    at Object.Module._extensions..js (module.js:565:10)
    at Module.load (module.js:473:32)

I just want a Buffer instance which I will pass to http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Firehose.html#putRecord-property
I'd appreciate any help!
Thanks


mtth commented on May 24, 2024

Hey @plamen-paskov. I'm not familiar with AWS Firehose, but if you just need a Buffer, you can return the output of EventAvroType.toBuffer(event) directly: it contains the encoded bytes of event.
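
For instance, a minimal sketch assuming the AWS SDK's Firehose client and a placeholder delivery stream name:

var AWS = require('aws-sdk');
var firehose = new AWS.Firehose();

var buf = EventAvroType.toBuffer(event); // Buffer with the encoded record.
firehose.putRecord({
  DeliveryStreamName: 'my-stream', // Placeholder name.
  Record: {Data: buf}
}, function (err) {
  if (err) {
    console.error(err);
  }
});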

The avroEncoder is useful if you want to create an Avro container file, typically with multiple records. It also expects a decoded value rather than a buffer, so you would use it as follows:

var avroEncoder = new avro.streams.BlockEncoder(EventAvroType);
avroEncoder.write(event); // We're writing the event directly instead of a buffer.
// ...
avroEncoder.end();


sbwrege2z commented on May 24, 2024

// Since we're writing locally, we pass the 'a' flag when opening the file.
// Depending on how concatenation works in your use-case, you might
// not need to do anything different here.
encoder.pipe(fs.createWriteStream(path, {
  defaultEncoding: 'binary',
  flags: 'a'
}));

Will this method of appending new records work if there is a compression algorithm like snappy or deflate applied?

