
fastavro's Introduction

fastavro


Because the Apache Python avro package is written in pure Python, it is relatively slow. In one test case, it takes about 14 seconds to iterate through a file of 10,000 records. By comparison, the Java Avro SDK reads the same file in 1.9 seconds.

The fastavro library was written to offer performance comparable to the Java library. With regular CPython, fastavro uses C extensions which allow it to iterate the same 10,000 record file in 1.7 seconds. With PyPy, this drops to 1.5 seconds (to be fair, the Java benchmark is doing some extra JSON encoding/decoding).

fastavro supports the following Python versions:

  • Python 3.8
  • Python 3.9
  • Python 3.10
  • Python 3.11
  • Python 3.12
  • PyPy3

Supported Features

  • File Writer
  • File Reader (iterating via records or blocks)
  • Schemaless Writer
  • Schemaless Reader
  • JSON Writer
  • JSON Reader
  • Codecs (Snappy, Deflate, Zstandard, Bzip2, LZ4, XZ)
  • Schema resolution
  • Aliases
  • Logical Types
  • Parsing schemas into the canonical form
  • Schema fingerprinting

Missing Features

  • Anything involving Avro's RPC features

Documentation

Documentation is available at http://fastavro.readthedocs.io/en/latest/

Installing

fastavro is available both on PyPI

pip install fastavro

and on the conda-forge conda channel.

conda install -c conda-forge fastavro

Contributing

  • Bugs and new feature requests typically start as GitHub issues where they can be discussed. I try to resolve these as time affords, but PRs are welcome from all.
  • Get approval by discussing the change on the GitHub issue before opening the pull request
  • Tests must be passing for a pull request to be considered

Developer requirements can be installed with pip install -r developer_requirements.txt. If those are installed, you can run the tests with ./run-tests.sh. If you have trouble installing those dependencies, you can run docker build . to run the tests inside a Docker container. This won't test on all versions of Python or on PyPy, so it's possible to still get CI failures after making a pull request, but we can work through those errors if/when they happen. ./run-tests.sh only covers the Cython tests. In order to test the pure Python implementation, comment out python setup.py build_ext --inplace and re-run.

NOTE: Some tests might fail when running the test suite locally. An example of this is the codec tests: if the supporting codec library is not available, the test will fail. These failures can be ignored, since the tests run on pull requests in the correct environments with the correct dependencies set up.

Releasing

We release both to PyPI and to conda-forge.

We assume you have twine installed and that you've created your own fork of fastavro-feedstock.

  • Make sure the tests pass
  • Run make tag
  • Wait for all artifacts to be built and published to the GitHub release
  • Run make publish
  • The conda-forge PR should get created and merged automatically

Changes

See the ChangeLog

Contact

Project Home


fastavro's Issues

License

I do not see any license included, what license is the project under?

Performance Optimizations: Function calls are taxing

Python incurs significant overhead when functions are called.

Looking at just the writer functionality...

One clear optimization here is not to create functions (e.g. Null/None) when they do nothing. Another option is to replace functions that are just thin wrappers (e.g. utf8, long and bytes). Probably the most performant optimization is to simply use an if block rather than a map with a function call. You might modify the mapping to identify the struct pack character rather than calling a function.
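A minimal sketch of that last idea (the mapping and function names here are illustrative, not fastavro's actual internals):

import struct

# Hypothetical mapping from Avro primitive type to a struct format character,
# so the hot path avoids a Python function call per value.
_STRUCT_FORMATS = {
    'float': '<f',   # Avro float: 4 bytes, little-endian IEEE 754
    'double': '<d',  # Avro double: 8 bytes, little-endian IEEE 754
}

def write_primitive(fo, datum, avro_type):
    fmt = _STRUCT_FORMATS.get(avro_type)
    if fmt is not None:
        fo.write(struct.pack(fmt, datum))
    elif avro_type == 'null':
        pass  # null is encoded as zero bytes, so no call is needed at all
    else:
        raise ValueError('type not covered by this sketch: %r' % avro_type)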

Based on my quick look at a cython -a <filename>, and then opening up the html file, I can see that all of the functions are still using their python form rather than a pure C form. So one of the worst performance problems of python (function calls) is not being replaced by Cython. That tells me there's a lot of room for improvement.

The Cython solution is to create a hybrid cython file: pyx. And to really focus on creating types and memory allocation. With enough massaging, it won't look much like python at all (nor will it look like C) but it will be fast. It also ends up ruining DRY and you'll have to maintain two files.

However, if you have to maintain two files anyway, the better approach is probably to move to C or possibly Python's C-API for the slow pieces.

How to append records one by one?

Hello,
As example you have something like

records = [
    {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
    {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
    {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
    {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
]

with open('weather.avro', 'wb') as out:
    writer(out, schema, records)

First of all, it's impossible to call the last line more than one time - I get

Exception in thread "main" org.apache.avro.AvroRuntimeException: java.io.IOException: Invalid sync!
at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:210)
at org.apache.avro.tool.DataFileReadTool.run(DataFileReadTool.java:77)
at org.apache.avro.tool.Main.run(Main.java:84)
at org.apache.avro.tool.Main.main(Main.java:73)
Caused by: java.io.IOException: Invalid sync!
at org.apache.avro.file.DataFileStream.nextRawBlock(DataFileStream.java:293)
at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:198)
... 3 more

in avro tools. And next - I can't see any way to write records one by one (like writer(out, schema, record) or writer.write(out, schema, record)).

Vadym
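On writing records one by one: fastavro.writer accepts any iterable of records, so one workaround (a sketch reusing the schema from the example above, not an official append API) is to pass a generator that yields records as they become available:

import fastavro

def record_stream():
    # Yield records one at a time; fastavro.writer consumes them lazily,
    # so the full list never has to sit in memory.
    yield {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388}
    yield {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389}

with open('weather.avro', 'wb') as out:
    fastavro.writer(out, schema, record_stream())

Since the whole container is still produced by a single writer call, the "Invalid sync!" error from calling writer twice on the same file does not come up.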

Rename fastavro.writer

We currently have fastavro.writer module but also a writer object in __init__.py. This makes it hard while debugging to import the writer module.

Support for pypi

Just wondering if there are any plans to make fastavro pip installable.

Support for snappy codec

Stack trace:

Traceback (most recent call last):
  File "get_sig_set.py", line 34, in <module>
    records = get_records_fastavro("avro-files/" + fname)
  File "get_sig_set.py", line 11, in get_records_fastavro
    records = [record for record in reader]
  File "fastavro/_reader.py", line 432, in _iter_avro (fastavro/_reader.c:9237)
ValueError: Unrecognized codec: u'snappy'

Version: 0.12.2
Python: 2.7.13

Data validation not reliable on Python 3.4.0

Fastavro does not raise if it receives an int value for a string field.
This seems to stem from a Python bug, as it works on 3.4.3.
Not sure if this demands a fix in fastavro, but for the record and other users:
The last test test_int_in_string_raises fails.

from io import BytesIO
import fastavro 

schema = {
  "fields": [
    {
      "name": "str_null",
      "type": ["null", "string"]
    },
    {
      "name": "str",
      "type": "string"
    },
    {
      "name": "integ_null",
      "type": ["null", "int"]
    },
    {
      "name": "integ",
      "type": "int"
    }
  ],
  "namespace": "namespace",
  "name": "missingerror",
  "type": "record"
}

def serialize(schema, *records):
    buffer = BytesIO()
    fastavro.writer(buffer, schema, records)
    serialized = buffer.getvalue()
    return serialized

def test_types_match():
    records = [{'str_null': 'str', 'str': 'str', 'integ_null': 21, 'integ': 21}]
    serialize(schema, *records)


def test_string_in_int_raises():
    records = [{'str_null': 'str', 'str': 'str', 'integ_null': 'str', 'integ': 21}]
    try:
        serialize(schema, *records)
    except:
        pass
    else:
        assert False, "Did not raise"

def test_string_in_int_null_raises():
    records = [{'str_null': 'str', 'str': 'str', 'integ_null': 11, 'integ': 'str'}]
    try:
        serialize(schema, *records)
    except Exception as e:
        pass
    else:
        assert False, "Did not raise"

def test_int_in_string_null_raises():
    records = [{'str_null': 11, 'str': 'str', 'integ_null': 21, 'integ': 21}]
    try:
        serialize(schema, *records)
    except:
        pass
    else:
        assert False, "Did not raise"

def test_int_in_string_raises():
    records = [{'str_null': 'str', 'str': 11, 'integ_null': 21, 'integ': 21}]

    try:
        serialize(schema, *records)
    except:
        pass
    else:
        assert False, "Did not raise"

fastavro gives a default value for unions with nulls that don't have a default value

This is somewhat related to #48. Given this example schema:

protocol test {
  record a {
    int x;
    union {null, int} y;
  }
}

the following should fail:

>>> data = StringIO.StringIO()
>>> fastavro.writer(data, schema, [{'val': {'x': 1}}])

However, it does not, giving a default value of null to a field ('y') without a default value.

This stems from the use of .get(), which returns None if the key is not found.
The field does not have a default value, so datum.get('default') returns None, which is then used as the default value.

This is just mixing up the lack of a value, with a value of null. Instead, the code should check for the field's existence in the datum, and return false if it doesn't exist.

This is related to #37, in that having write_record call validate() on itself would catch this. Although I imagine the overhead might raise some ire, so there are some options.

Some pull requests will be made in a bit.
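A minimal sketch of the distinction being described (get_field_value is a hypothetical helper, not fastavro's code):

def get_field_value(datum, field):
    # Wrong: .get() conflates "no value supplied" with an explicit default of None.
    # value = datum.get(field['name'], field.get('default'))

    # Better: only fall back to a default that actually exists in the field's schema.
    if field['name'] in datum:
        return datum[field['name']]
    if 'default' in field:
        return field['default']
    raise ValueError('no value and no default for %s' % field['name'])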

Validation of Unions

Hey,
I'm currently running into the following problem:
With a schema like this:

protocol test {
  record a {
    int x;
    int y;
  }

  record b {
    int x;
    int y;
    union {null, int} z;
  }

  record test {
    union {a, b} val;
  } 
}

I can create a file with two objects:

>>> data = StringIO.StringIO()
>>> fastavro.writer(data, schema, [{'val': {'x': 1, 'y': 2}},{'val': {'x': 3, 'y':4, 'z':5}}])

However, because of the way unions are validated, both 'val's are serialized as type 'a', and the 'z' field of the second object is silently dropped.

This can be confirmed by reading in the objects:

>>> data.seek(0)
>>> reader = fastavro.reader(data)
>>> next(reader)
{u'val': {u'y': 2, u'x': 1}}
>>> next(reader)
{u'val': {u'y': 4, u'x': 3}}

Do you have any suggestions for a way of dealing with this problem?

Thanks,
Nick

Booleans should be encoded as 0x01 and 0x00 (vs 0x31 and 0x30)

#17 reported a problem with boolean decoding, and it was addressed by #18 and #19, although I believe the correct solution is to encode the boolean as a byte (1 or 0) rather than a string character ('1' or '0') as documented by the spec: "a boolean is written as a single byte whose value is either 0 (false) or 1 (true)"
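Per the spec wording quoted above, the writer should emit a single raw byte; a minimal sketch:

def write_boolean(fo, datum):
    # Spec: a boolean is a single byte whose value is 0x01 (true) or 0x00 (false),
    # not the ASCII characters '1' (0x31) and '0' (0x30).
    fo.write(b'\x01' if datum else b'\x00')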

Faster Union Write

Hi there,
I'm running into a problem where it takes a long time to output my data.
I've done some digging, and a large portion of the time is a result of my schema having a large union of records in it (24 different records, each of which are fairly complex).
This causes validation to be run on each potential candidate for the union until one succeeds.

Is there something that could be done to improve this performance without having to change the schema?
My code knows ahead of time which candidate needs to be used from the union, so if that information could be passed into write_union somehow that would improve the performance significantly.

For example, if instead of simply providing a dictionary for the record, I provided a tuple of the name and the dictionary, then write_union could simply find the desired record based on the name instead of having to run verification.

What are your thoughts on this?
Thanks,
Nick
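A sketch of the proposed calling convention, using the schema and data objects above (hypothetical here; later fastavro versions added a similar tuple notation for unions, so check the docs for the exact form):

# Name the union branch alongside the datum so write_union can pick the
# record type directly instead of validating every candidate.
records = [
    {'val': ('a', {'x': 1, 'y': 2})},
    {'val': ('b', {'x': 3, 'y': 4, 'z': 5})},
]
fastavro.writer(data, schema, records)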

logicalType timestamp-millis gotcha

I find it hard to understand why, when deserializing a logicalType timestamp-millis field to a datetime object, the microsecond property contains the millisecond value, requiring a further division by 1000 to turn it back into the correct value -- as the unit test below shows:

def test_logical_types():
    data1 = {
        'date': datetime.date.today(),
        'timestamp-millis': datetime.datetime.now(),
        'timestamp-micros': datetime.datetime.now(),

    }
    binary = serialize(schema, data1)
    data2 = deserialize(schema, binary)
    assert (data1['date'] == data2['date'])
    assert (data1['timestamp-micros'] == data2['timestamp-micros'])
    assert (int(data1['timestamp-millis'].microsecond / 1000) ==
            int(data2['timestamp-millis'].microsecond))

What was the original thought behind this?

Allow writer to support lists or tuples (or named tuples) in addition to dict as record type

The writer currently requires a dictionary to process each record entry, but the documentation indicates that it's okay to use an iterable, with no requirement imposed on data structure.

In [1]: import fastavro

In [2]: schema = {"type": "record", "fields": [{"name": "name", "type": "string"}, {"name": "type", "type": "string"}]}

In [3]: records = [
   ...:     ("apple", "fruit"),
   ...:     ("banana", "fruit"),
   ...: ]

In [4]: path = "tmp.avro"

In [5]: with open(path, 'wb') as stream:
   ...:     fastavro.writer(stream, schema, records)
   ...:
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-5-3c54c57bc598> in <module>()
      1 with open(path, 'wb') as stream:
----> 2     fastavro.writer(stream, schema, records)
      3

<redacted>/site-packages/fastavro/_writer.so in fastavro._writer.writer (fastavro/_writer.c:12273)()

<redacted>/site-packages/fastavro/_writer.so in fastavro._writer.writer (fastavro/_writer.c:12052)()

<redacted>/site-packages/fastavro/_writer.so in fastavro._writer.Writer.write (fastavro/_writer.c:11527)()

<redacted>/site-packages/fastavro/_writer.so in fastavro._writer.write_data (fastavro/_writer.c:8450)()

<redacted>/site-packages/fastavro/_writer.so in fastavro._writer.write_record (fastavro/_writer.c:8310)()

<redacted>/site-packages/fastavro/_writer.so in fastavro._writer.write_record (fastavro/_writer.c:8117)()

ValueError: no value and no default for name

In [6]: records_as_dict = [
   ...:     {'name': 'apple', 'type': 'fruit'},
   ...:     {'name': 'banana', 'type': 'fruit'},
   ...: ]

In [7]: with open(path, 'wb') as stream:
   ...:     fastavro.writer(stream, schema, records_as_dict)
   ...:

It would make sense that a tuple, list or namedtuple would be an easy (and possibly preferred) data structure to be used in addition to a dictionary. This is especially true if the data is already in a column-based format (e.g. string.split('\t'), csv read, db read, etc.).
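Until such support exists, one workaround is to zip each tuple against the schema's field order before writing; a minimal sketch using the schema, path and records from above (as_dicts is just an illustrative helper):

import fastavro

field_names = [f["name"] for f in schema["fields"]]

def as_dicts(rows):
    # Map each positional tuple/list onto the schema's field order.
    for row in rows:
        yield dict(zip(field_names, row))

with open(path, 'wb') as stream:
    fastavro.writer(stream, schema, as_dicts(records))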

Does fastavro support encoding/decoding without schema?

Is it possible to decode/encode messages without having schema in the message itself? My use case is a lot of small messages, and the schema itself is almost always bigger than the data itself.
I wanted something like apache-avro where I don't have to encode the schema in the message itself, and hopefully use something like schema-registry in the long term.
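As listed under Supported Features above, fastavro has a schemaless writer and reader for exactly this case: the schema is agreed out of band (for example via a schema registry) and only the encoded record goes on the wire. A small example:

from io import BytesIO
import fastavro

schema = {
    "type": "record",
    "name": "Message",
    "fields": [{"name": "id", "type": "long"}],
}

buf = BytesIO()
fastavro.schemaless_writer(buf, schema, {"id": 42})  # no header, no embedded schema, no sync markers
buf.seek(0)
assert fastavro.schemaless_reader(buf, schema) == {"id": 42}  # the reader must supply the same schema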

Zero Bytes when serializing several large strings

When serializing a large list of urls we saw they got serialized with a lot of \x00 in the serialized byte string and we were not able to deserialize it again.
We could reproduce this problem using the following python3 code:

import random
import string
from io import BytesIO
import fastavro

def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))

testdata = []
num = 100

for i in range(50):
    testdata.append({"first":  id_generator(num), "second":  id_generator(num), "third":  id_generator(num), "fourth":  id_generator(num)})

schema = {
  "fields": [
    {
      "name": "first",
      "type": "string"
    },
    {
      "name": "second",
      "type": "string"
    },
    {
      "name": "third",
      "type": "string"
    },
    {
      "name": "fourth",
      "type": "string"
    }
  ],
  "namespace": "namespace",
  "name": "zerobyte",
  "type": "record"
}

def serialize(schema, *records):
    buffer = BytesIO()
    fastavro.writer(buffer, schema, records)
    serialized = buffer.getvalue()
    return serialized

print(serialize(schema, *testdata))

Using the reference python avro implementation, this works fine.
If you serialize the lines separately, it also works fine with fastavro.

Drop support for python 2.6?

2.6 has been end of life for over 4 years now. I noticed there are a few places where special handling is done to support 2.6 that could be simplified by removing that support.

Thoughts @tebeka?

Add support for aliases

Currently, fastavro does not support Avro aliases: http://avro.apache.org/docs/current/spec.html#Aliases

Aliases are required for renaming fields, which is a frequent type of schema evolution. At read time, when a field is absent from the writer schema, the field name is looked up among the field aliases.

If you agree this is a feature worth having, I can start working on a PR.

EDIT: simplified the description of this issue to reduce the scope of aliases.
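For reference, a reader schema that uses an alias to pick up a renamed field could look like this (schema resolution semantics follow the Avro spec; the exact fastavro behaviour depends on the version):

writer_schema = {
    "type": "record", "name": "User",
    "fields": [{"name": "full_name", "type": "string"}],
}

# The reader renamed the field; the alias points back at the writer's field name,
# so resolution can match the two during reads.
reader_schema = {
    "type": "record", "name": "User",
    "fields": [{"name": "name", "type": "string", "aliases": ["full_name"]}],
}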

Support BinaryEncoder

It would be nice (at least for me) to support the equivalent of BinaryEncoder() in Apache's Avro. This work is basically done as far as I can tell (in fact my tests involved commenting code out with no additions). But ultimately it's just a method to print the records without any header or length information. I was able to achieve this by simply commenting out any of the writes unrelated to the record itself.

I commented out the following lines in writer.py:
280, 327, 331, 335

Obviously though, editing the library is not the best option, so a way to do this natively would be nice.

Thanks for your work!

Docstrings

Currently the functions within fastavro are not easy to inspect for interactive users.

In [1]: import fastavro

In [2]: fastavro.writer?
Type:        cython_function_or_method
String form: <cyfunction writer at 0x7ff96e693df0>
Docstring:   <no docstring>

I find myself referring to the source code in order to determine suggested inputs. I suspect that a small amount of effort here would significantly increase impact.

Support for time logicalTypes

As per the Avro 1.8.2 spec (https://avro.apache.org/docs/1.8.2/spec.html#Date), there are a number of date/time related logicalTypes, which are serialized as int or long.

Currently fastavro supports conversions between:

  • datetime.date and logicalType date
  • datetime.datetime and logicalTypes timestamp-millis, timestamp-micros

I'd like support added for conversions between:

  • datetime.time and logicalTypes time-millis, time-micros

This seems pretty straightforward:

>>> t = datetime.time(13, 25, 9, 723743)
>>> int(t.hour * 3600000 + t.minute*60000 + t.second*1000 + t.microsecond/1000)
48309723
>>> t.hour * 3600000000 + t.minute*60000000 + t.second*1000000 + t.microsecond
48309723743

Add support for fixed type for logicalType decimals

I.e.:

  "type": 
    {
        "name": "MyFixedDecimal",
        "type": "fixed",
        "size": 8,
        "logicalType": "decimal",
        "precision": 18,
        "scale": 6
    },

I think what has to be done:

Add "fixed-decimal" to LOGICAL_READERS and LOGICAL_WRITERS

"bytes-decimal" is in both of these, but not "fixed-decimal"

Add functions to the above

For the reader, function read_bytes_decimal should be able to handle the fixed-decimals, as it does size = len(data).

For the writer, will need to make a new function. Can take from apache/avro#82, like the current code.

Add "fixed-decimal" to validate

Add:

if record_type == 'fixed':
    return (
        (isinstance(datum, bytes) and len(datum) == schema['size'])
        or isinstance(datum, decimal.Decimal)
    )

Union calls validate, so you'll need this if use union with "fixed-decimal".

You could monkey patch the reader with:

fastavro._reader.LOGICAL_READERS['fixed-decimal']=fastavro._reader.LOGICAL_READERS['bytes-decimal']

I could attempt a PR.

Add support for writing records asynchronously

Currently the fastavro.writer method expects a list of records and writes them at once. I'd like to be able to write the file header, preserve some context, and then write each record as my application receives data, then make a final call to flush the output when finished.

Composable schemas

The Java Avro Maven plugin lets you compose schemas that may be defined across multiple files using an <import> feature. I was able to do something similar using fastavro with a little utility code:

import json

import fastavro
from fastavro.schema import UnknownType  # import path for UnknownType may vary by fastavro version

def load_schema_recursively(schema_file, schema=None):
    if schema is None:
        with open('schema_files/src/main/avro/{}.avsc'.format(schema_file)) as fd:
            schema = json.load(fd)
    try:
        fastavro.acquaint_schema(schema)
    except UnknownType as e:
        child_name = e.name.split('.')[1]
        load_schema_recursively(child_name)
        load_schema_recursively(schema_file, schema)
    return schema

Would this kind of functionality be useful in fastavro or is this outside of the scope of what this lib does?

Does fastavro do caching?

When sending/receiving many messages with the same schema, does fastavro do any caching to avoid parsing the json definition multiple times?
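Current fastavro versions expose parse_schema (older releases used acquaint_schema), so the caller can parse the JSON definition once and reuse the parsed schema for every message rather than relying on internal caching; a sketch:

from io import BytesIO
import fastavro

# Parse the JSON definition once...
parsed = fastavro.parse_schema({
    "type": "record",
    "name": "Ping",
    "fields": [{"name": "seq", "type": "long"}],
})

# ...and reuse the parsed schema for every message.
for seq in range(3):
    buf = BytesIO()
    fastavro.schemaless_writer(buf, parsed, {"seq": seq})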

Exporting fastavro._writer.Writer

Great library, thanks for your work.

I've got a use case where I do not want to load all of the records into memory before calling fastavro.writer() to write them to a container. Instead, I am creating an instance of fastavro._writer.Writer so I can have one writer object and invoke fastavro._writer.Writer.write and fastavro._writer.Writer.flush myself.

While this works, it feels dirty accessing your private by convention modules.

Are you open to a PR that would add the Writer class to the top-level exports in fastavro/__init__.py?
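For reference, the pattern described above looks roughly like this (it relies on the private module, as noted, so treat it as unofficial; schema and incoming() are placeholders for the caller's schema and record source):

from fastavro._writer import Writer  # private module, as discussed above

with open('out.avro', 'wb') as fo:
    w = Writer(fo, schema)       # one writer object for the whole container file
    for record in incoming():    # incoming() stands in for however records arrive over time
        w.write(record)
    w.flush()                    # flush the final block before the file is closed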

Write support

Just wanted to know if there are plans to add write support?

KeyError when trying to read a named type

Hi,

I am trying to use fastavro to read data created from MySQL binlogs. In the data, I have two similar keys, before and after. Since their schemas are the same, the after type is a name (namespace + name, see the documentation about names) referring to the before key's type.

The error I have is like this:

fastavro/reader.py", line 479, in read_data
    data = READERS[record_type](fo, writer_schema, reader_schema)
KeyError: u'my_namespace.my_name'

Am I missing something or isn't this supported?

Thanks in advance for your help.

Parallel Processing

Ken said:

Hey, I'm using your FastAvro module, and it's working great. I'm processing network security data with it - about 500 million records a day right now, and that might grow to 5-20+ billion a day in a couple of years.

At that volume I need to squeeze a bit more performance out of my avro processing. Do you have any thoughts you can share about the feasibility of using threads, multiprocessing or futures to support read parallelism at the block level?
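One hedged sketch, assuming a fastavro version that exposes block_reader: read and decode blocks sequentially in the main process, and hand each block's records to a process pool for the downstream analysis (which is usually where the CPU time goes); analyze() is a placeholder:

from concurrent.futures import ProcessPoolExecutor
import fastavro

def analyze(records):
    # Placeholder for the per-record processing that actually burns CPU.
    return len(records)

if __name__ == '__main__':
    with open('events.avro', 'rb') as fo, ProcessPoolExecutor() as pool:
        futures = [
            pool.submit(analyze, list(block))        # decode one block's records, hand them off
            for block in fastavro.block_reader(fo)   # blocks themselves are read sequentially
        ]
        print(sum(f.result() for f in futures))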

Invalid Avro file causes very slow processing

I have a program that receives binary files that may or may not be Avro encoded.
My idea was to just try to decode the file using fastavro, and if that failed, treat it like a non-Avro file.

try:
    with open("maybe_an_avro_file", 'rb') as fin:
        reader = fastavro.reader(fin)
except Exception:
    #This file cannot be parsed as Avro, handle differently 
    pass

However, there were a few files that caused fastavro to take a very long time trying to read the block count out of the (non-existent) schema header.

For example, a 2M file consumed a CPU for nearly 1 hour before eventually causing a Python OverflowError.

This is not a huge file, but there seems to be something that is not O(n), as it reads the first few bytes rapidly, then the performance rapidly drops.

  File <REDACTED>
    reader = fastavro.reader(input)
  File "<REDACTED>/fastavro/fastavro/reader.py", line 478, in __init__
    self._header = read_data(fo, HEADER_SCHEMA)
  File "<REDACTED>/fastavro/fastavro/reader.py", line 391, in read_data
    return READERS[record_type](fo, writer_schema, reader_schema)
  File "<REDACTED>/fastavro/fastavro/reader.py", line 334, in read_record
    record[field['name']] = read_data(fo, field['type'])
  File "<REDACTED>/fastavro/fastavro/reader.py", line 391, in read_data
    return READERS[record_type](fo, writer_schema, reader_schema)
  File "<REDACTED>/fastavro/fastavro/reader.py", line 273, in read_map
    block_count = read_long(fo)
  File "<REDACTED>/fastavro/fastavro/reader.py", line 162, in read_long
    n |= (b & 0x7F) << shift

This results in a few questions:

  1. Is there a better way to know whether a file is an Avro formatted file, besides attempting to parse it?
  2. Is the non-linearity concerning?
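On question 1: an Avro object container file always starts with the 4-byte magic b'Obj\x01', so a cheap pre-check avoids feeding arbitrary binary into the header parser; a sketch:

AVRO_MAGIC = b'Obj\x01'  # magic bytes from the Avro object container file spec

def looks_like_avro(path):
    with open(path, 'rb') as fin:
        return fin.read(4) == AVRO_MAGIC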

StopIteration raised on truncated data

The following will raise "StopIteration" instead of a more meaningful exception...

import fastavro
from StringIO import StringIO
fastavro.load(StringIO(), {'fields':[{'type':'boolean', 'name':'a'}], 'type':'record', 'name': 'test'})

This can cause data loss when using fastavro in a for loop. For example, when coupled with Python's Kafka client, which processes messages in batches, a single bad message in the batch will cause all following messages to be discarded with no exceptions or warnings thrown. The message need not be empty; truncated messages can also trigger the error if the truncation occurs at the boundary of two values.

logicalType date conversion is incorrect

The Avro 1.8.2 spec (https://avro.apache.org/docs/1.8.2/spec.html#Date) says that the date logicalType should be an int storing the number of days from the Unix epoch, 1 January 1970 (ISO calendar).

The current fastavro code uses return data.toordinal() which is the number of days since 0 January 0001, i.e.

>>> datetime.date(1,1,1).toordinal()
1
>>> datetime.date(1970,1,1).toordinal()
719163
>>> datetime.date(2017,9,8).toordinal()
736580

A more appropriate conversion would be return (data - datetime.date(1970,1,1)).days:

>>> (datetime.date(1,1,1) - datetime.date(1970,1,1)).days
-719162
>>> (datetime.date(1970,1,1) - datetime.date(1970,1,1)).days
0
>>> (datetime.date(2017,9,8) - datetime.date(1970,1,1)).days
17417

Alternatively, return data.toordinal() - 719163 would probably be the fastest.

Random Access

Maris said:

I need to make my files seekable, e.g. my record has an ID and I want to jump exactly to that record (instead of iterating across the whole file). It works fine; I only need the byte offset of the written file after each record, which is then appended to a dict. In default avro a function called sync() does that (in DataFile.py, class DataFileWriter()). It essentially forces the data to be written and then returns tell() of the file. Is there any chance that something like this will be built in?

Bug: validation fails when writing nullable datetime.date

Small bug. I'm happy to submit a PR, here's the issue:

When writing a datetime.date record to a nullable date field, this will error out:

import fastavro
import datetime

schema = {
    'doc': 'test', 'name': 'test', 'namespace': 'test', 'type': 'record',
    'fields': [{'name': 'date', 'type': ['null', {'type': 'int', 'logicalType': 'date'}]}]}

records = [{'date': datetime.date(2017, 1, 1)}]

with open('test.avro', 'wb') as out:
    fastavro.writer(out, schema, records)

output:

ValueError: datetime.date(2017, 1, 1) (type <class 'datetime.date'>) do not match ['null', {'type': 'int', 'logicalType': 'date'}]

If you write the same data to a non-nullable date field, or else use datetime.datetime, it's fine:

schema = {
    'doc': 'test', 'name': 'test', 'namespace': 'test', 'type': 'record',
    'fields': [{'name': 'date', 'type': {'type': 'int', 'logicalType': 'date'}}]}

records = [{'date': datetime.date(2017, 1, 1)}]

with open('test.avro', 'wb') as out:
    fastavro.writer(out, schema, records)

or

schema = {
    'doc': 'test', 'name': 'test', 'namespace': 'test', 'type': 'record',
    'fields': [{'name': 'date', 'type': ['null', {'type': 'int', 'logicalType': 'date'}]}]}

records = [
    {'date': datetime.datetime(2017, 1, 1)},
    {'date': None}, #Works with Nones too
    ]

with open('test.avro', 'wb') as out:
    fastavro.writer(out, schema, records)

Issue is in fastavro.writer.validate, will put together a PR (should be a small fix I think)

Writing and reading sets boolean to False

I use following schema:

{'fields': [{'name': 'flag', 'type': 'boolean'}],
 'name': 'test',
 'type': 'record'}

to write this datum:

{"flag":True}

Serializing results in this byte string:

b'Obj\x01\x04\x14avro.codec\x08null\x16avro.schema\xa6\x01{"name": "test", "type": "record", "fields": [{"type": "boolean", "name": "flag"}]}\x00:\xea\xf7\x8cV\xa0:4\\\xd0A\x85\xa4\x05\xcf\xe9\x02\x021:\xea\xf7\x8cV\xa0:4\\\xd0A\x85\xa4\x05\xcf\xe9'

and deserializing it gives me this datum:

[{'flag': False}]

i.e. the True has been turned to False. The deserialized schema is intact.

Additionally, writing a datum like {} works just fine (although it should throw an Exception) and when reading back, I again get:

[{'flag': False}]

I'm using python3, but have reproduced the problem on python2 as well. I also tried the pure python version without the compiled cython extensions and have the same problem there. I use a BytesIO as the fo, if this matters.

Avro 1.8 Support

Perhaps this is in the works but I haven't seen any reference to 1.8 yet.

The major addition has been logical types: https://avro.apache.org/docs/1.8.0/spec.html#Logical+Types

In the serialization and deserialization of Avro data, in the presence of a logical type such as timestamp-millis, it would be nice if a datetime object passed inside a dictionary were serialized to a long Unix timestamp. Similarly, you'd get a datetime object back when deserializing, with the correct precision set.

Naturally, logical types should default to types inside the Python standard lib. But because we use arrow objects, it would also be nice to have the ability to add to the serializers to accommodate different types and override the default deserializer for a given logical type.
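For reference, the round trip being asked for looks like this once logical type support exists (a sketch assuming a fastavro version with timestamp-millis support): a datetime goes in, a long travels on the wire, and a datetime comes back at millisecond precision.

import datetime
from io import BytesIO
import fastavro

schema = {
    "type": "record", "name": "Event",
    "fields": [{"name": "ts",
                "type": {"type": "long", "logicalType": "timestamp-millis"}}],
}

buf = BytesIO()
fastavro.writer(buf, schema, [{"ts": datetime.datetime(2015, 6, 7, 15, 30)}])
buf.seek(0)
print(next(fastavro.reader(buf))["ts"])  # a datetime again, at millisecond precision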

Doc incorrectly reports that reader schema validation is unsupported

I just spent a couple hours looking for a good Python package for Avro data (de)serialization. According to the documentation, fastavro was missing reader schema validation (or so I believed) which is an important feature for me. However, after digging inside the reader source code, I discovered that this feature was indeed implemented \o/ (this made me very happy)

I suggest to modify the README and the documentation to reflect this feature. If you would like, I can open a PR.

EDIT: PR was opened

Support for reading from streaming IO

At the moment fastavro.reader calls 'seek' in skip_sync to seek back 16 bytes if it doesn't find an expected sync marker. This stops it working with streaming IO (e.g. from an HTTP connection), which is a shame as avro is really designed to be streamable.

Is this necessary?

From the spec it looks like the sync marker is just there as a data integrity check; you're already told up front how many records and how many bytes to expect in the current block. So IIUC you should always find the sync marker at the end of the block -- or at least if you don't, then it's exception-worthy, not a situation where you should rewind and pretend nothing happened.

Currently you're calling skip_sync at the start of every block; you'll always fail to find a sync marker before the first block, really you should skip it after each block. And if you fail to find it, complain.

Does that sound right?

[EDIT: turns out the purpose of the sync marker is more to allow Hadoop to find the nearest block start in a split chunk of an avro file: https://cwiki.apache.org/confluence/display/AVRO/FAQ#FAQ-Whatisthepurposeofthesyncmarkerintheobjectfileformat -- not something you need to worry about in a straightforward streaming setup starting from the beginning of the file]

Definition differences between python and C

I have a strange error using C compiled fastavro, calling schemaless_reader()

Traceback (most recent call last):
  File "/home/fpietka/.pyenv/versions/fpietka/lib/python3.6/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 173, in _get_decoder_func
    schemaless_reader(payload, schema_dict)
  File "/home/fpietka/.pyenv/versions/fpietka/lib/python3.6/site-packages/fastavro/reader.py", line 606, in schemaless_reader
    acquaint_schema(schema, READERS)
TypeError: _acquaint_schema() takes 1 positional argument but 2 were given

And when I output the definition I have this:

def _acquaint_schema(schema):
    """Add a new schema to the schema repo.

    Parameters
    ----------
    schema: dict
        Schema to add to repo
    """
    _reader.acquaint_schema(schema)
    _writer.acquaint_schema(schema)

Seems the difference is here:
https://github.com/tebeka/fastavro/blob/e752a9559368ea2d12d17f33263a2c80e8c9409f/fastavro/__init__.py#L60-L69

Following up on #101, what do you suggest I do?

Thanks

High CPU Utilisation

I am using fastavro for processing live log records. The frequency at which I am ingesting log records is around 1000 records/second. The problem I am facing is high CPU utilization; it is always around 100%. Any suggestions are most welcome.

does fastavro support bytes Primitive Type?

I'm using another language's Avro library to write some Avro files with a bytes field. I believe the Avro files were written correctly because BigQuery can load them successfully (https://cloud.google.com/bigquery/loading-data#loading_avro_files)

      {
       "type": "bytes", 
       "name": "SenderIP"
      }, 

http://avro.apache.org/docs/1.8.1/spec.html

However, when I use the standard Python avro package or fastavro as a command line tool to view it, it results in a crash like this with utf-8:

{
    "senderInfo": {
        "SenderIP": Traceback (most recent call last):
  File "./.local/bin/fastavro", line 11, in <module>
    sys.exit(main())
  File "/home/.../.local/lib/python2.7/site-packages/fastavro/__main__.py", line 55, in main
    json_dump(record, indent)
  File "/home/.../.local/lib/python2.7/site-packages/fastavro/six.py", line 45, in py2_json_dump
    json.dump(obj, stdout, indent=indent, encoding=_outenc)
  File "/usr/lib/python2.7/json/__init__.py", line 189, in dump
    for chunk in iterable:
  File "/usr/lib/python2.7/json/encoder.py", line 434, in _iterencode
    for chunk in _iterencode_dict(o, _current_indent_level):
  File "/usr/lib/python2.7/json/encoder.py", line 408, in _iterencode_dict
    for chunk in chunks:
  File "/usr/lib/python2.7/json/encoder.py", line 390, in _iterencode_dict
    yield _encoder(value)
  File "/usr/lib/python2.7/json/encoder.py", line 233, in _encoder
    o = o.decode(_encoding)
  File "/usr/lib/python2.7/encodings/utf_8.py", line 16, in decode
    return codecs.utf_8_decode(input, errors, True)
UnicodeDecodeError: 'utf8' codec can't decode byte 0x87 in position 2: invalid start byte

or this if I unset LANG so it defaults to ascii instead of utf-8:

{
    "senderInfo": {
        "SenderIP": Traceback (most recent call last):
  File "./.local/bin/fastavro", line 11, in <module>
    sys.exit(main())
  File "/home/.../.local/lib/python2.7/site-packages/fastavro/__main__.py", line 55, in main
    json_dump(record, indent)
  File "/home/.../.local/lib/python2.7/site-packages/fastavro/six.py", line 45, in py2_json_dump
    json.dump(obj, stdout, indent=indent, encoding=_outenc)
  File "/usr/lib/python2.7/json/__init__.py", line 189, in dump
    for chunk in iterable:
  File "/usr/lib/python2.7/json/encoder.py", line 434, in _iterencode
    for chunk in _iterencode_dict(o, _current_indent_level):
  File "/usr/lib/python2.7/json/encoder.py", line 408, in _iterencode_dict
    for chunk in chunks:
  File "/usr/lib/python2.7/json/encoder.py", line 390, in _iterencode_dict
    yield _encoder(value)
  File "/usr/lib/python2.7/json/encoder.py", line 233, in _encoder
    o = o.decode(_encoding)
UnicodeDecodeError: 'ascii' codec can't decode byte 0x87 in position 2: ordinal not in range(128)

I can confirm that when my older code generated the Avro files without the SenderIP bytes field, avro / fastavro was able to decode them; so the bytes field could be the one that fastavro doesn't understand?

timestamp-millis bug introduced in 0.14. Works in 0.13

The following simple program worked perfectly in fastavro 0.13 and now fails on fastavro 0.14. I presume this is a bug?

from io import BytesIO

import datetime
import fastavro

avro_schema = {
  "type": "record",
  "name": "simple_record",
  "namespace": "org.test",
  "fields": [{
    "name": "name",
    "type": "string"
  }, {
    "name": "timestamp",
    "type": {
      "type": "long",
      "logicalType": "timestamp-millis"
    }
  }]
}

epoch = datetime.datetime.utcfromtimestamp(0)


def datetime_to_epoch_millis(dt):
    return long((dt - epoch).total_seconds() * 1.0e3)


def generate_avro():
    avro_dict = {
        "name": "abc",
        "timestamp": datetime_to_epoch_millis(datetime.datetime(2015, 6, 7, 15, 30, 0))
    }

    with BytesIO() as bytes_io:
        fastavro.schemaless_writer(bytes_io, avro_schema, avro_dict)
        return bytes_io.getvalue()


if __name__ == "__main__":
    print("generate_avro test...")
    avro_binary = generate_avro()
    print("done :)")
    print("length={}".format(len(avro_binary)))
