
fluvio-client-python's People

Contributors

atesio-goetz, bencleary, dariogoetz, dependabot[bot], digikata, morenol, nacardin, ozgrakkurt, shylock-hg, simlay, taziksh, urbit-pilled, viniarck

fluvio-client-python's Issues

Python Iterator Cleanup

Right now, to use a PartitionConsumerStream as a Python iterator, we work around it by creating a Python class PartitionConsumerStreamIterator:

class PartitionConsumerStreamIterator:
    def __init__(self, stream):
        self.stream = stream

    def __iter__(self):
        return self

    def __next__(self):
        return self.stream.next()

where stream is the PartitionConsumerStream. It would be better if PartitionConsumerStream itself had the __iter__ and __next__ methods.
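
As a rough sketch of the desired shape, the class below is a hypothetical stand-in, not the real binding. The name IterableStream, the list-backed storage, and the assumption that next() returns None at the end of the stream are all illustrative only:

```python
class IterableStream:
    """Hypothetical stand-in for a stream that is itself a Python iterator."""

    def __init__(self, records):
        # A plain list stands in for the records the native stream would yield.
        self._records = list(records)
        self._pos = 0

    def next(self):
        # Mirrors the existing next() method; returning None when exhausted
        # is an assumption made for this sketch.
        if self._pos >= len(self._records):
            return None
        record = self._records[self._pos]
        self._pos += 1
        return record

    def __iter__(self):
        return self

    def __next__(self):
        record = self.next()
        if record is None:
            raise StopIteration
        return record
```

With both methods on the stream itself, a plain `for record in stream:` loop works without the separate wrapper class.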

Rebuild fluvio-client-python on PyO3

The rust-cpython crate that the Fluvio Python client is currently built on is no longer maintained and is EOL for Python 3.11.

Rebuild the Fluvio Python client on top of the PyO3 Rust-to-Python binding framework. For the initial rebuild, a simple wrapping of the fluvio Rust crate that can connect, consume and produce (preferably async) would be a good start.

Add documentation

Documentation should be hosted on gh-pages or read-the-docs or something.

Expose `RecordMetadata` after `send`

I have a use-case in which a fluvio producer needs to keep track of the offsets (and partitions) that it sent its messages to. To that end, I would like to expose the RecordMetadata struct to the python client.

If I understand correctly, the TopicProducer.send method (in rust) generates a ProduceOutput struct, which provides a wait method in the end yielding the desired RecordMetadata.

I see two general ways forward to expose such functionality in the python client:

  1. Introduce both a ProduceOutput and a RecordMetadata to the Python world, effectively mirroring the Rust variants.
    This has a weird caveat, though: the ProduceOutput.wait method consumes self, preventing any further use of the ProduceOutput object. If a ProduceOutput also lived in the Python world, such a "consumption" would not be possible (if I understand PyO3 correctly).
    This could be worked around by wrapping the "inner" Rust version of the ProduceOutput in a Cell:
#[pyclass]
pub struct ProduceOutput {
    inner: Cell<NativeProduceOutput>,
}

and then using take in the corresponding wait method:

#[pymethods]
impl ProduceOutput {
    fn wait(&self) -> Result<RecordMetadata, FluvioError> {
        let inner = self.inner.take();
        Ok(run_block_on(inner.wait()).map_err(error_to_py_err)?)
    }
}
  2. Another option would be to not expose ProduceOutput and directly provide some method send_and_wait_for_metadata (a proper name would need to be found) that calls wait on the result itself and then returns a RecordMetadata object.
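
To make the two options concrete from the Python side, here is a minimal sketch using pure-Python stand-ins. Nothing here is the real binding: the ProduceOutput class below, the callable used as the "inner" value, and the dict used as metadata are all hypothetical. It only illustrates the consume-once behaviour of option 1 and the convenience wrapper of option 2:

```python
class ProduceOutput:
    """Hypothetical Python-side view of a consume-once handle (option 1)."""

    def __init__(self, inner):
        # `inner` stands in for the native ProduceOutput; here it is a
        # zero-argument callable that returns the metadata.
        self._inner = inner

    def wait(self):
        # Take the inner value so that a second call cannot reuse it.
        inner, self._inner = self._inner, None
        if inner is None:
            raise RuntimeError("ProduceOutput.wait() was already called")
        return inner()


def send_and_wait_for_metadata(send, key, value):
    """Option 2: hide ProduceOutput and return the metadata directly."""
    return send(key, value).wait()
```

Option 2 keeps the consume-once detail out of the public API, at the cost of always blocking until the metadata is available.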

Do you have a good idea of how to address this issue?

I would be willing to add a PR for either of the approaches (or another one, if you have an idea).

Python client async and admin upgrade

Async

Add python async capable versions of the following fluvio python client capabilities.

  • PartitionConsumer

    • stream
    • stream_with_config
  • MultiplePartitionConsumer

    • stream
    • stream_with_config
  • TopicProducer

    • send
    • flush
    • send_all

The producer and consumer capabilities should also be able to be instantiated with config options in order to run with smartmodules. See Fluvio rust examples where config structs can specify smartmodules and their parameters to execute.

The async additions should not interfere with the simpler sync versions. If it makes the implementation easier, a set of async classes could be created, e.g. AsyncPartitionConsumer. If possible, though, a single class is preferred, with methods to create sync and async produce and consume objects.
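
One possible way to layer an async interface over an existing blocking stream without touching the sync path is sketched below. This is not the client's API: AsyncStreamAdapter and collect are hypothetical names, and the sketch assumes Python 3.9+ for asyncio.to_thread. A native async binding would likely avoid the worker thread entirely.

```python
import asyncio

# Sentinel marking exhaustion: StopIteration must not be raised across a
# thread/future boundary, so next() is called with a default instead.
_DONE = object()


class AsyncStreamAdapter:
    """Hypothetical wrapper exposing a blocking iterator as an async iterator."""

    def __init__(self, blocking_iterable):
        self._it = iter(blocking_iterable)

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Run the blocking next() in a worker thread so the event loop stays free.
        item = await asyncio.to_thread(next, self._it, _DONE)
        if item is _DONE:
            raise StopAsyncIteration
        return item


async def collect(stream):
    # Example consumer: gather every record from the wrapped stream.
    return [record async for record in AsyncStreamAdapter(stream)]
```

Because the adapter only wraps an iterator, the sync stream remains usable on its own, which matches the requirement that the async additions stay out of the way of the sync versions.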

Admin

Add the ability to run the Fluvio admin commands.

Map rust crate admin methods https://docs.rs/fluvio/latest/fluvio/struct.FluvioAdmin.html

  • connect
  • connect_with_config

Since the create, delete, list methods are generic over various object types, they should work for combinations available to the fluvio cli:

  • topics (TopicSpec)

    • create
    • delete
    • list
    • list_with_params
    • watch
  • smartmodules (SmartModuleSpec)

    • create
    • delete
    • list
    • watch
  • partitions

    • list

Performance degradation on python producer.send with large number of records

slack thread: https://infinyon.slack.com/archives/CQ8UX6SBH/p1684349700779719

simple producer in python:

#!/usr/bin/env python3
from datetime import datetime
from fluvio import Fluvio
import time 

PARTITION = 0

if __name__ == "__main__":
   # Connect to cluster
   fluvio = Fluvio.connect()

   producer = fluvio.topic_producer("simple")

   start_time = time.time()

   for x in range(10000):
       producer.send_string("{}: timestamp: {}".format(x, datetime.now()))

   # Flush the last entry
   producer.flush()
   print("completed in {} seconds".format(time.time() - start_time))

10K records are pushed in 1.2153611183166504 seconds. If you increase the range to 100000 and run
python3 producer100K.py
the same code takes 64.67606687545776 seconds to complete.
The Rust producer doesn't show this behaviour.
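
To see where the slowdown sets in, it may help to time the sends in fixed-size chunks rather than as one total. The helper below is a hypothetical diagnostic, not part of the client. It takes any callable as `send`, so the example uses `list.append` as a stand-in; pass `producer.send_string` to measure the real producer.

```python
import time


def time_in_chunks(send, total, chunk_size):
    """Return the elapsed seconds for each consecutive chunk of `send` calls."""
    timings = []
    for start in range(0, total, chunk_size):
        t0 = time.perf_counter()
        for i in range(start, min(start + chunk_size, total)):
            send("{}: payload".format(i))
        timings.append(time.perf_counter() - t0)
    return timings


if __name__ == "__main__":
    sent = []
    # `sent.append` is a stand-in; swap in producer.send_string to measure
    # the real client.
    for n, seconds in enumerate(time_in_chunks(sent.append, 100000, 10000)):
        print("chunk {}: {:.4f} s".format(n, seconds))
```

If later chunks take steadily longer than earlier ones, the cost per send is growing with the number of records already sent, which would be consistent with the non-linear totals reported above.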

Support fluvio-cloud login with oauth flow

$ fluvio cloud login --use-oauth2
A web browser has been opened at https://infinyon-cloud.us.auth0.com/activate?user_code=LBHV-NWHQ.
Please proceed with authentication.

Let's have something similar for the Python client.

Depends on #199.

Add docs to release artifacts

We want to build an archive of reference docs from older versions on the fluvio-website. The docs are already generated and hosted, so we only need to keep track of each version. This will be easier to accomplish if we don't have to scrape with git.

Related infinyon/fluvio-website#460

Creating a ConsumerConfig raises a BusError

Description
When creating a ConsumerConfig it raises a BusError and stops the rest of the application from executing.

Example Code

import json
from typing import TypedDict

from fluvio import Fluvio, Offset, ConsumerConfig, SmartModuleKind

class FluvioMessage(TypedDict):
    """Fluvio message payload."""
    mqtt_topic: str
    payload: dict


fluvio = Fluvio.connect()
consumer = fluvio.partition_consumer("mqtt", 0)
config = ConsumerConfig() # <-- raises BusError
config.smartmodule(
        path="./filter.wasm",
        kind=SmartModuleKind.Filter,
        params={"id": str("some-id")},
)
for msg in consumer.stream_with_config(Offset.end(), config):
      message: FluvioMessage = json.loads(bytearray(msg.value()).decode())

Expected Behaviour
I can consume with a smart module, and associated configuration.

Current Behaviour
Raises BusError.

I am happy to help; just point me to wherever you think I should start looking 👍

add optional CI build for Python 3.13

Python 3.13 is targeted for release in 2024-10. Add a CI workflow that checks the build against it, to get advance notice of any maintenance that will be needed. A failed build of this prerelease should not stop the other builds.

Fix PyPi release

Right now you have to do from fluvio import fluvio_rust, and the primitives only exist inside fluvio_rust.

Update to fluvio 0.9.5

The error text seems to have been updated for fluvio 0.9.5. This causes macOS CI to fail, so the error text needs to be updated.

Add Support for Fluvio Admin API

Add the ability to run the Fluvio admin commands.

Map rust crate admin methods https://docs.rs/fluvio/latest/fluvio/struct.FluvioAdmin.html

  • connect
  • connect_with_config

Since the create, delete, list methods are generic over various object types, they should work for combinations available to the fluvio cli:

  • topics (TopicSpec)

    • create
    • delete
    • list
    • list_with_params
    • watch
  • smartmodules (SmartModuleSpec)

    • create
    • delete
    • list
    • watch
  • partitions

    • list

TopicProducer send should accept multiple types

send should additionally accept the following input types:

  • dict
  • list
  • string

Unless otherwise specified, the dict, list and string values are required to be JSON serializable.

send_all should additionally accept the following input types:

  • dict
  • list

Unless otherwise specified, the elements of dict and list are required to be JSON serializable.
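
One way this could look is a small normalization step applied before a value reaches the existing bytes-based send. The helper below is a sketch only: encode_record is a hypothetical name, and treating plain strings as UTF-8 text rather than JSON-quoting them is an assumption the issue does not settle.

```python
import json


def encode_record(value):
    """Normalize a value to bytes before handing it to the producer (hypothetical helper)."""
    if isinstance(value, (bytes, bytearray)):
        return bytes(value)
    if isinstance(value, str):
        # Assumption: plain strings are sent as UTF-8 text, not JSON-quoted.
        return value.encode("utf-8")
    if isinstance(value, (dict, list)):
        # json.dumps raises TypeError if any element is not JSON serializable.
        return json.dumps(value).encode("utf-8")
    raise TypeError("unsupported record type: {}".format(type(value).__name__))
```

Rejecting unknown types with a TypeError keeps a non-serializable input from being silently stringified.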
