infinyon / fluvio-client-python
The Fluvio Python Client!
Home Page: https://infinyon.github.io/fluvio-client-python/fluvio.html
License: Apache License 2.0
Right now, to use a `PartitionConsumerStream` as a Python iterator, we work around it by creating a Python class `PartitionConsumerStreamIterator`:

```python
class PartitionConsumerStreamIterator:
    def __init__(self, stream):
        self.stream = stream

    def __iter__(self):
        return self

    def __next__(self):
        return self.stream.next()
```

where `stream` is the `PartitionConsumerStream`. It would be better if `PartitionConsumerStream` itself had the `__iter__` and `__next__` methods.
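A variant of the workaround, sketched here assuming `stream.next()` returns `None` when no more records are available (the issue doesn't say), also raises `StopIteration` so that a `for` loop terminates instead of yielding `None` forever. `FakeStream` is a hypothetical stand-in used only to exercise the class without a cluster.

```python
class PartitionConsumerStreamIterator:
    """Adapts an object with a next() method to Python's iterator protocol.

    Assumption: stream.next() returns None once the stream is exhausted.
    """

    def __init__(self, stream):
        self.stream = stream

    def __iter__(self):
        return self

    def __next__(self):
        record = self.stream.next()
        if record is None:
            # End iteration so `for` loops and list() terminate cleanly.
            raise StopIteration
        return record


class FakeStream:
    """Hypothetical stand-in for PartitionConsumerStream, for local testing."""

    def __init__(self, items):
        self._items = list(items)

    def next(self):
        return self._items.pop(0) if self._items else None
```

Implementing the same two methods directly on the native `PartitionConsumerStream` class would make the wrapper unnecessary.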
This should enable Jupyter Notebooks to work on M1 Macs, since the Python build on an M1 is arm64.
The `rust-cpython` crate that the Fluvio Python client is currently built on is no longer maintained and is EOL for Python 3.11.
Rebuild the Fluvio Python client on top of the PyO3 Rust-to-Python binding framework. For the initial rebuild, a simple wrapping of the fluvio Rust crate to connect, consume, and produce (preferably async) would be a good start.
Right now the Python client doesn't support creating or deleting topics. Let's fix that!
Depends on #195.
Let's make `Fluvio.connect` return a `Result`. That way a failure on connect raises a Python exception that we can catch, rather than just panicking.
When pypa/cibuildwheel#934 is resolved, we should re-add it to publish CI.
Right now we only support connecting with the default profile location. Let's support `connect_with_config`.
Expose the Rust `connect_with_config` to the Python client API.
Per the release schedule, Python 3.7 will no longer receive security patches after June 27th, 2023.
Python 3.12 was released on 2023-10-02; add it to the CI build and publish targets.
3.12 is currently incompatible with the fluvio-client dependencies.
A shift in the smartmodule accumulate output is causing a CI test to fail with the 0.10.14 update (PR #286). Diagnose the root cause and propose a fix.
https://github.com/infinyon/fluvio-client-python/actions/runs/5752971026/job/15595152136
Documentation should be hosted on gh-pages or read-the-docs or something.
Similar to #206, this would enable going from a pandas DataFrame to a Fluvio stream without blocking.
I have a use case in which a Fluvio producer needs to keep track of the offsets (and partitions) that it sent its messages to. To that end, I would like to expose the `RecordMetadata` struct to the Python client.

If I understand correctly, the `TopicProducer.send` method (in Rust) generates a `ProduceOutput` struct, which provides a `wait` method that ultimately yields the desired `RecordMetadata`.

I see two general ways forward to expose such functionality in the Python client:

* Expose `ProduceOutput` and a `RecordMetadata` to the Python world, effectively mirroring the Rust variants. The Rust `ProduceOutput.wait` method consumes `self`, inhibiting any further use of the `ProduceOutput` object. If a `ProduceOutput` also lived in the Python world, such a "consumption" would not be possible (if I understand PyO3 correctly). One option is wrapping the `ProduceOutput` in a `Cell`:

```rust
#[pyclass]
pub struct ProduceOutput {
    inner: Cell<NativeProduceOutput>,
}
```

and then using `take` in the corresponding `wait` method:

```rust
#[pymethods]
impl ProduceOutput {
    fn wait(&self) -> Result<RecordMetadata, FluvioError> {
        // Cell::take moves the value out and leaves Default::default() behind,
        // so this requires NativeProduceOutput: Default.
        let inner = self.inner.take();
        Ok(run_block_on(inner.wait()).map_err(error_to_py_err)?)
    }
}
```

* Don't expose `ProduceOutput` at all, and directly provide a method such as `send_and_wait_for_metadata` (a proper name would need to be found) that calls `wait` on the result itself and then returns a `RecordMetadata` object.

Do you have a good idea how to address this issue? I would be willing to open a PR for either approach (or another one, if you have an idea).
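To make the first option's consume-once semantics concrete from the Python side, here is a pure-Python sketch. `ProduceOutput` and `RecordMetadata` below are hypothetical stand-ins, not the client's API: the pending wait is moved out on the first `wait()` call, much like `Cell::take`, and a second call raises instead of silently waiting on a default value.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RecordMetadata:
    """Hypothetical Python mirror of the Rust RecordMetadata."""

    partition_id: int
    offset: int


class ProduceOutput:
    """Hypothetical wrapper whose wait() may be called only once."""

    def __init__(self, wait_fn):
        # wait_fn stands in for the native, blocking ProduceOutput.wait
        self._wait_fn = wait_fn

    def wait(self) -> RecordMetadata:
        # Move the pending wait out, leaving None behind (like Cell::take).
        wait_fn, self._wait_fn = self._wait_fn, None
        if wait_fn is None:
            raise RuntimeError("ProduceOutput.wait() was already called")
        return wait_fn()
```

Raising on the second call keeps the Rust "consumed" state visible to Python users instead of hiding it.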
Based on #159 (comment), we should pass in a `ConsumerConfig` to the `stream_with_config` API.
This should also use smartmodule names rather than just paths to the wasm blob.
The current consumer only supports `stream`. Let's add `fetch`.
This should be pretty easy.
Similar to how connectors support applying smart modules to records before they're sent over the wire, we should support this in the Python client.
subprocess.CalledProcessError: Command 'fluvio topic create 90c7826f-b63a-4e72-b382-a5e2a581bb4d' returned non-zero exit status 1.
Add python async capable versions of the following fluvio python client capabilities.
PartitionConsumer
MultiplePartitionConsumer
TopicProducer
The producer and consumer capabilities should also be able to be instantiated with config options in order to run with smartmodules. See Fluvio rust examples where config structs can specify smartmodules and their parameters to execute.
fn fluvio_producer_with_config(...)
PartitionConsumer::stream_with_config
MultiplePartitionConsumer::stream_with_config
The async additions should not interfere with the simpler sync versions. If it makes the implementation easier, a set of async classes could be created, e.g. `AsyncPartitionConsumer`, though if possible a single class with methods to create both sync and async produce and consume objects is preferred.
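Until native async bindings exist, one stopgap (an assumption, not the requested design) is to wrap a blocking stream in an async iterator that runs each `next()` call in a worker thread with `asyncio.to_thread` (Python 3.9+). `AsyncStreamAdapter` and `FakeStream` are hypothetical names, and end-of-stream is assumed to be signalled by `next()` returning `None`.

```python
import asyncio


class AsyncStreamAdapter:
    """Hypothetical async iterator over a blocking stream with a next() method."""

    def __init__(self, stream):
        self._stream = stream

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Run the blocking call in a worker thread so the event loop stays free.
        record = await asyncio.to_thread(self._stream.next)
        if record is None:
            raise StopAsyncIteration
        return record


class FakeStream:
    """Hypothetical stand-in for a blocking consumer stream."""

    def __init__(self, items):
        self._items = list(items)

    def next(self):
        return self._items.pop(0) if self._items else None


async def collect(stream):
    return [record async for record in AsyncStreamAdapter(stream)]
```

This keeps the event loop responsive but ties up a thread per pending call; a native implementation on top of the Rust client's async API would avoid that.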
Add the ability to run the Fluvio admin commands.
Map rust crate admin methods https://docs.rs/fluvio/latest/fluvio/struct.FluvioAdmin.html
Since the create, delete, list methods are generic over various object types, they should work for combinations available to the fluvio cli:
topics (TopicSpec)
smartmodules (SmartModuleSpec)
partitions
Slack thread: https://infinyon.slack.com/archives/CQ8UX6SBH/p1684349700779719
A simple producer in Python:

```python
#!/usr/bin/env python3
from datetime import datetime
from fluvio import Fluvio
import time

PARTITION = 0

if __name__ == "__main__":
    # Connect to cluster
    fluvio = Fluvio.connect()
    producer = fluvio.topic_producer("simple")
    start_time = time.time()
    for x in range(10000):
        producer.send_string("{}: timestamp: {}".format(x, datetime.now()))
    # Flush the last entry
    producer.flush()
    print("completed in {} seconds".format(time.time() - start_time))
```
10K records are pushed in 1.2153611183166504 seconds. When the range is increased to 100000 and run as `python3 producer100K.py`, the same code takes 64.67606687545776 seconds to complete, about 53 times longer for 10 times as many records.
The Rust producer doesn't show this behaviour.
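To see whether the per-record cost grows as more records are sent, one option is to time the sends in fixed-size chunks. `time_chunks` is a hypothetical helper; with the script above you would pass `producer.send_string` and `producer.flush`. Like the original script, it flushes only once at the end, and it times that flush separately.

```python
import time
from datetime import datetime


def time_chunks(send, flush, total, chunk_size):
    """Send `total` records; return per-chunk send times and the flush time.

    Roughly flat chunk times suggest a constant per-record cost; steadily
    rising times suggest something that grows with the records already sent.
    """
    timings = []
    for start in range(0, total, chunk_size):
        t0 = time.perf_counter()
        for x in range(start, min(start + chunk_size, total)):
            send("{}: timestamp: {}".format(x, datetime.now()))
        timings.append(time.perf_counter() - t0)
    t0 = time.perf_counter()
    flush()
    return timings, time.perf_counter() - t0
```

For example, `time_chunks(producer.send_string, producer.flush, 100000, 10000)` returns ten chunk timings plus the final flush time.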
See #388 for reference.
```
$ fluvio cloud login --use-oauth2
A web browser has been opened at https://infinyon-cloud.us.auth0.com/activate?user_code=LBHV-NWHQ.
Please proceed with authentication.
```

Let's have something similar for the Python client.
Depends on #199.
We want to build an archive of reference docs from older versions on the fluvio-website. The docs are already generated and hosted; we only need to keep track of each version. This will be easier to accomplish if we don't have to scrape with git.
Related infinyon/fluvio-website#460
This test checks the ability to access partition information, but it needs a topic to check.
In CI, one is implicitly provided, but the test should set one up as a prerequisite.
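One way to make the test set up its own topic is a context manager that creates a uniquely named topic with the `fluvio topic create` / `fluvio topic delete` CLI commands and always cleans up afterwards. `temporary_topic` is a hypothetical helper; its `run` parameter is injectable so it can be exercised without a cluster.

```python
import subprocess
import uuid
from contextlib import contextmanager


@contextmanager
def temporary_topic(run=subprocess.run):
    """Create a uniquely named topic, yield its name, then delete it."""
    name = str(uuid.uuid4())
    # check=True raises CalledProcessError if the CLI command fails.
    run(["fluvio", "topic", "create", name], check=True)
    try:
        yield name
    finally:
        run(["fluvio", "topic", "delete", name], check=True)
```

A test would then wrap its partition checks in `with temporary_topic() as topic:` instead of relying on CI to provide one.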
Description
When creating a `ConsumerConfig`, it raises a `BusError` and stops the rest of the application from executing.
Example Code

```python
import json
from typing import TypedDict

from fluvio import Fluvio, Offset, ConsumerConfig, SmartModuleKind


class FluvioMessage(TypedDict):
    """Fluvio message payload."""

    mqtt_topic: str
    payload: dict


fluvio = Fluvio.connect()
consumer = fluvio.partition_consumer("mqtt", 0)
config = ConsumerConfig()  # <-- raises BusError
config.smartmodule(
    path="./filter.wasm",
    kind=SmartModuleKind.Filter,
    params={"id": str("some-id")},
)
for msg in consumer.stream_with_config(Offset.end(), config):
    message: FluvioMessage = json.loads(bytearray(msg.value()).decode())
```

Expected Behaviour
I can consume with a smart module and its associated configuration.
Current Behaviour
Raises `BusError`.
I am happy to help, just point me to wherever you think I should start looking.
Based on https://www.python.org/dev/peps/pep-0494/#id34, we should drop support for Python 3.6. Once we do that, we can more reasonably do #5, since Python 3.7 made `async`/`await` reserved keywords and added `asyncio.run`.
#209 is the PR
Python 3.13 is targeted for release in 2024-10. Add a CI check workflow to get advance notice of any maintenance that will be needed. A failed build of this prerelease should not stop the other builds.
Fluvio connect docs incorrectly say this:
"If there is no current profile or the ~/.fluvio/config file does not exist, then this will create a new profile with default settings and set it as current, then try to connect to the cluster using those settings."
Relatively simple, it'll just be nice documentation.
Right now you have to do `from fluvio import fluvio_rust`, and the primitives live inside `fluvio_rust`.
Requires adding a Record
type to the python side.
Related:
The error text seems to have been updated for fluvio 0.9.5. This causes macOS CI to fail, so the error text needs to be updated.
Right now, the Python client is only compiled and distributed for x86 and i686 architectures. With infinyon/fluvio#464, we should be able to compile for other architectures such as ARM.
It looks like this shouldn't be too hard for non-native architectures, using emulation.
It looks like we might be able to just add:

```yaml
env:
  CIBW_ARCHS_LINUX: auto aarch64 ppc64le s390x
```

with the correct architectures to the publish CI.
It seems that when Python calls into the Rust code, it ignores the SIGINT signal sent when you press Ctrl-C. This might be an issue with flapigen.
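A common workaround, sketched here as an assumption rather than a fix in the binding itself: CPython only runs its `SIGINT` handler (which raises `KeyboardInterrupt`) when control returns to the interpreter, so a long blocking native call can appear to ignore Ctrl-C. Restoring the default OS action makes Ctrl-C terminate the process immediately. `restore_default_sigint` is a hypothetical helper name.

```python
import signal


def restore_default_sigint():
    """Make Ctrl-C terminate the process even while blocked in native code.

    Trade-off: the process exits without running Python cleanup such as
    `finally` blocks or atexit handlers.
    """
    signal.signal(signal.SIGINT, signal.SIG_DFL)
```

Call it early, from the main thread (`signal.signal` raises `ValueError` in other threads).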
`send` should additionally accept as input types:
`send_all` should additionally accept as input types:
Unless otherwise specified, the elements of `dict` and `list` are required to be JSON serializable.
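The accepted type lists above are not spelled out, so this sketch only assumes `str`, `bytes`/`bytearray`, `dict`, and `list` (the last two follow from the JSON note). `encode_record` is a hypothetical helper showing one way to normalize such inputs to record bytes before sending; it is not the client's API.

```python
import json


def encode_record(value):
    """Hypothetical helper: turn a supported input into record bytes.

    Assumed mapping: bytes/bytearray unchanged, str as UTF-8,
    dict/list as UTF-8 encoded JSON.
    """
    if isinstance(value, (bytes, bytearray)):
        return bytes(value)
    if isinstance(value, str):
        return value.encode("utf-8")
    if isinstance(value, (dict, list)):
        # json.dumps raises TypeError for elements that aren't JSON serializable.
        return json.dumps(value).encode("utf-8")
    raise TypeError("unsupported record type: {}".format(type(value).__name__))
```

A `send_all` variant could apply the same conversion to each element of its input.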