
py2k's Introduction

Welcome to Py2k


A high-level Python-to-Kafka API with Schema Registry compatibility and automatic Avro schema creation.

  • Free software: Apache2 license

Installation

Py2K is currently available on PyPI:

pip install py2k

Documentation

You can view additional documentation on our website.

Contributing

Please see the Contribution Guide for more information.

Usage

Minimal Example

from py2k.record import PandasToRecordsTransformer
from py2k.writer import KafkaWriter

# assuming we have a pandas DataFrame, df
records = PandasToRecordsTransformer(df=df, record_name='test_model').from_pandas()

writer = KafkaWriter(
    topic="topic_name",
    schema_registry_config=schema_registry_config,
    producer_config=producer_config
)

writer.write(records)
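
Both configuration dictionaries are passed through to the underlying confluent-kafka clients. A minimal sketch, assuming a local broker and Schema Registry (replace the placeholder addresses with your own):

producer_config = {'bootstrap.servers': 'localhost:9092'}
schema_registry_config = {'url': 'http://localhost:8081'}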

For additional examples, please see the examples folder.

Features

  • Schema Registry Integration
  • Automatic Avro Serialization from pandas DataFrames
  • Automatic Avro Schema generation from pandas DataFrames and Pydantic objects

License

Copyright 2021 ABSA Group Limited

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

py2k's People

Contributors

danwertheimer · dependabot[bot] · vesely-david


py2k's Issues

KafkaModel and DynamicKafkaModel

The current usage example in the README is shown below.

from py2k.models import DynamicKafkaModel
from py2k.writer import KafkaWriter

# assuming we have a pandas DataFrame, df
serialized_df = DynamicKafkaModel(df=df, model_name='test_model').from_pandas()

writer = KafkaWriter(
    topic="topic_name",
    schema_registry_config=schema_registry_config,
    producer_config=producer_config
)

writer.write(serialized_df)

The output from DynamicKafkaModel is called serialized_df, but it is not actually serialized in the context of what the library does, as pointed out in #34.

Also, DynamicKafkaModel only converts pandas DataFrames, whatever their schemas, into key-value records that are then serialized as Avro and dispatched to Kafka.

Given that, having Model as part of the name might be slightly misleading. Something like KafkaFormatter or PandasToKafkaTransformer might be more informative to both users and contributors.

Users should be able to define their own schema namespace

Currently, users have their namespace automatically created for them as python.kafka.<model name>:

py2k/py2k/record.py

Lines 66 to 80 in 14d82e5

@staticmethod
def schema_extra(schema: Dict[str, Any],
                 model: Type['KafkaRecord']) -> None:
    schema['type'] = 'record'
    schema['name'] = schema.pop('title')
    schema['namespace'] = (f'python.kafka.'
                           f'{schema["name"].lower()}')
    schema = process_properties(schema)
    schema.pop('properties')
    # Dynamically generated schemas might not have this field,
    # which is removed anyway.
    if 'required' in schema:
        schema.pop('required')
    update_optional_schema(schema=schema, model=model)

We should allow users to define their own namespace.

Something like:

class MyRecord(KafkaRecord):
    name: str

    @property
    def namespace(self) -> str:
        return 'my.name.space'

or as an argument to PandasToRecordsTransformer, as sketched below.
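
A hypothetical sketch of the argument-based option; the namespace parameter does not exist in the current API:

from py2k.record import PandasToRecordsTransformer

# 'namespace' is a hypothetical argument proposed by this issue;
# it is not part of the current API.
records = PandasToRecordsTransformer(
    df=df,
    record_name='test_model',
    namespace='my.name.space'
).from_pandas()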

Add number of tries while pushing to Kafka

As of now, we push to Kafka in an infinite loop, which might cause problems in the future.
We should consider defining a number of tries after which an exception will be raised.

py2k/py2k/producer.py

Lines 24 to 39 in dfe8d5e

def produce(self, record):
    while True:
        try:
            self._producer.produce(
                topic=self._topic,
                key=record.key_to_avro_dict(),
                value=record.value_to_avro_dict(),
                on_delivery=self._delivery_report
            )
            self._producer.poll(0)
            break
        except BufferError as e:
            print(
                f'Failed to send on attempt {record}. '
                f'Error received {str(e)}')
            self._producer.poll(1)
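
A sketch of a bounded retry loop, assuming a hypothetical max_tries parameter is introduced:

def produce(self, record, max_tries=5):
    # Retry on BufferError up to max_tries times, then give up
    # instead of looping forever.
    for attempt in range(1, max_tries + 1):
        try:
            self._producer.produce(
                topic=self._topic,
                key=record.key_to_avro_dict(),
                value=record.value_to_avro_dict(),
                on_delivery=self._delivery_report
            )
            self._producer.poll(0)
            return
        except BufferError as e:
            print(f'Failed to send on attempt {attempt}. '
                  f'Error received {str(e)}')
            self._producer.poll(1)
    raise BufferError(f'Giving up after {max_tries} attempts')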

Users should be able to update the topic on the writer to a new topic

For example, if I want to write raw data and processed data onto Kafka, into two separate topics, but with the same producer config and schema registry config.

Suggested API

from py2k.writer import KafkaWriter

writer = KafkaWriter(topic='topic1',
                     producer_config=some_producer_config,
                     schema_registry_config=some_sr_config)
writer.write(some_records)

writer.topic = 'new_topic'
# OR
writer.update_topic('new_topic')

writer.write(other_records)
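
A hypothetical implementation sketch, assuming the producer is (re)created on each write() call (as the traceback in a later issue suggests), so only the stored topic needs to change:

class KafkaWriter:
    # ... existing __init__ and write() unchanged ...

    @property
    def topic(self):
        return self._topic

    @topic.setter
    def topic(self, new_topic):
        # Producer and schema registry configs are reused;
        # only the destination topic changes.
        self._topic = new_topic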

Adjust the concept of serialization

In the README, the currently available example is shown below.

from py2k.models import DynamicKafkaModel
from py2k.writer import KafkaWriter

# assuming we have a pandas DataFrame, df
serialized_df = DynamicKafkaModel(df=df, model_name='test_model').from_pandas()

writer = KafkaWriter(
    topic="topic_name",
    schema_registry_config=schema_registry_config,
    producer_config=producer_config
)

writer.write(serialized_df)

This library is about Avro, and in Avro parlance, serialization refers to formatting data as Avro.

In the example above, serialized_df is not in Avro format, so calling it serialized is misleading.

Also, it might be worth changing names across the whole library so that serialization refers only to converting data to Avro, in order to avoid confusing users and contributors.

Work with iterables instead of lists

We've hit memory issues, and the main problem seems to be creating a list of Pydantic models out of a large pandas DataFrame.

The suggestion is to exchange lists for generators as we discussed earlier.
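
A sketch of the generator approach, using a hypothetical from_pandas_iter helper that yields records one row at a time instead of materialising the whole list (KafkaWriter.write would also need to accept any iterable, not just a list):

from typing import Iterator

def from_pandas_iter(df, record_class) -> Iterator:
    # Hypothetical generator variant of from_pandas: yields one
    # record per DataFrame row, keeping memory usage flat.
    for row in df.itertuples(index=False):
        yield record_class(**row._asdict())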

KafkaWriter fails to write data for boolean types

Checks

  • I added a descriptive title to this issue
  • I have searched (google, github) for similar issues and couldn't find anything

Bug

  • py2k version: 1.8.0
  • Python version: 3.9
  • Operating System: macOS

Expected Result

100%|████████████████████| 4/4 [00:00<00:00, 10.09it/s]

Actual Result

Traceback (most recent call last):
  File "/examples/basic_write_from_pandas.py", line 25, in <module>
    writer.write(records)
  File "/folder/anaconda3/envs/py2k/lib/python3.9/site-packages/py2k-1.8.0-py3.9.egg/py2k/writer.py", line 57, in write
    self._create_producer(records)
  File "/folder/anaconda3/envs/py2k/lib/python3.9/site-packages/py2k-1.8.0-py3.9.egg/py2k/writer.py", line 49, in _create_producer
    self._producer = KafkaProducer(self._topic, producer_config)
  File "/folder/anaconda3/envs/py2k/lib/python3.9/site-packages/py2k-1.8.0-py3.9.egg/py2k/producer.py", line 22, in __init__
    self._producer = SerializingProducer(producer_config.dict)
  File "/folder/anaconda3/envs/py2k/lib/python3.9/site-packages/py2k-1.8.0-py3.9.egg/py2k/producer_config.py", line 35, in dict
    config_build['value.serializer'] = self._serializer.value_serializer()
  File "/folder/anaconda3/envs/py2k/lib/python3.9/site-packages/py2k-1.8.0-py3.9.egg/py2k/serializer.py", line 31, in value_serializer
    return AvroSerializer(
  File "/folder/anaconda3/envs/py2k/lib/python3.9/site-packages/confluent_kafka/schema_registry/avro.py", line 174, in __init__
    schema_dict = loads(schema.schema_str)
  File "/folder/anaconda3/envs/py2k/lib/python3.9/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
  File "/folder/anaconda3/envs/py2k/lib/python3.9/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/folder/anaconda3/envs/py2k/lib/python3.9/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 158 (char 157)

Code Snippet

from py2k.record import PandasToRecordsTransformer
from py2k.writer import KafkaWriter
import pandas as pd


df = pd.DataFrame({'name': ['Daniel', 'David', 'Felipe', 'Ruslan'],
                   'is_cool': [False, True, True, True],
                   'value': [27.1, 100.0, 0, 9000.0]})

record_transformer = PandasToRecordsTransformer(
    df, record_name='KafkaRecord')

records = record_transformer.from_pandas()


topic = 'py2k-test-topic'
producer_config = {'bootstrap.servers': '...'}
schema_registry_config = {'url': '...'}

writer = KafkaWriter(topic=topic,
                     schema_registry_config=schema_registry_config,
                     producer_config=producer_config)

writer.write(records)

Schema generation of defaults is incompatible with Avro schema

Checks

  • I added a descriptive title to this issue
  • I have searched (google, github) for similar issues and couldn't find anything

Bug

  • py2k version: 1.9.1
  • Python version: Any
  • Operating System: Any

Expected Result

When generating a schema field with a default, it should look like:

{"name": "xxx", "type": ["null", "boolean"], "default": null}

Actual Result

{"name": "xxx", "type": "boolean", "default": null}

Users cannot inspect the py2k version using py2k.__version__

Checks

  • I added a descriptive title to this issue
  • I have searched (google, github) for similar issues and couldn't find anything

Bug

  • py2k version: 1.8.2
  • Python version: 3.x
  • Operating System: OSX

Expected Result

>>> import py2k

>>> print(py2k.__version__)
'1.8.2'

Actual Result

>>> import py2k
>>> print(py2k.__version__)
AttributeError: module 'py2k' has no attribute '__version__'

Implementation

In __init__.py:

__version__ = '1.8.2'

We should find a way to include this in bump2version; a configuration sketch follows.
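
A sketch of a .bumpversion.cfg file section that would keep the string in sync, assuming bump2version is configured in that file:

[bumpversion]
current_version = 1.8.2

[bumpversion:file:py2k/__init__.py]
search = __version__ = '{current_version}'
replace = __version__ = '{new_version}'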

Possibility to specify whether the key will be included in the value

The current implementation always includes the key data in the value, with no way to choose whether that is necessary.

We propose adding a boolean include_key flag to KafkaWriter so that users can choose whether key data is included in the value.

The default should be False.
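
A hypothetical sketch of the proposed API; include_key is the proposed flag and is not part of the current KafkaWriter signature:

writer = KafkaWriter(
    topic='topic_name',
    schema_registry_config=schema_registry_config,
    producer_config=producer_config,
    include_key=False  # proposed flag: keep key data out of the value
)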
