
streamz's Introduction

Streams

Do not use. This is an exploratory project to learn more about what it takes to build typical real-time systems.

Other projects you should probably consider instead:

  1. RxPY
  2. Flink
  3. Beam

Basic Explanation

This implements a trivial subset of an API commonly found in streaming data systems. You can have a Stream and create various other streams from it using common modifiers like map, filter, scan, etc. Eventually you create a sink that consumes results.

from operator import add
from streams import Stream

def inc(x):
    return x + 1

source = Stream()

L = []
stream = source.map(inc).scan(add)
stream.sink(L.append)
stream.sink(print)

Now, as you feed data into the source, all of the operations trigger as necessary:

>>> for i in range(3):
...     source.emit(i)
1
3
6

>>> L
[1, 3, 6]

You can use the typical map/filter/scan syntax. Everything can have multiple subscribers at any point in the stream.
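To make the push-based design concrete, here is a minimal pure-Python sketch of how `map`, `filter`, `scan`, and `sink` could be wired together. The `ToyStream` class is hypothetical and written only for illustration; it is not how the library itself is implemented, and it ignores backpressure and asynchrony.

```python
from operator import add


class ToyStream:
    """Hypothetical push-based stream: a sketch, not the library's code."""

    def __init__(self):
        self.downstreams = []  # callables that receive every emitted element

    def emit(self, x):
        # Push x to each subscriber in the order they subscribed
        for push in self.downstreams:
            push(x)

    def sink(self, func):
        # Any callable can subscribe; a stream may have many subscribers
        self.downstreams.append(func)

    def map(self, func):
        child = ToyStream()
        self.sink(lambda x: child.emit(func(x)))
        return child

    def filter(self, predicate):
        child = ToyStream()
        self.sink(lambda x: child.emit(x) if predicate(x) else None)
        return child

    def scan(self, func):
        child = ToyStream()
        state = []  # empty until the first element arrives

        def push(x):
            if state:
                state[0] = func(state[0], x)  # fold x into the running state
            else:
                state.append(x)               # first element seeds the state
            child.emit(state[0])

        self.sink(push)
        return child


def inc(x):
    return x + 1


source = ToyStream()
L = []
stream = source.map(inc).scan(add)
stream.sink(L.append)  # two subscribers on the same stream
stream.sink(print)
for i in range(3):
    source.emit(i)     # prints 1, 3, 6
```

Each operator simply registers a callback on its parent, so attaching a second sink (or a second `map`) anywhere in the chain is just another entry in that parent's `downstreams` list.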

Backpressure

Everything also responds to backpressure: if the sink blocks, the source blocks too (although you can add buffers if desired). Everything also supports asynchronous workloads with Tornado coroutines, so you can use async/await if you prefer (or gen.coroutine/yield in Python 2).

from tornado import gen

async def f(result):
    await gen.sleep(0.1)  # placeholder for non-blocking work with result

stream.sink(f)  # f might impose waits, e.g. while a database ingests results

async def feed():
    for i in range(10):
        await source.emit(i)  # waiting at the sink is felt here at the source

This means that if the sinks can't keep up, the sources stop pushing data into the system, which keeps unprocessed data from building up.
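The backpressure behaviour can be sketched with a toy in which `emit` awaits each subscriber before returning. This version is hypothetical and uses the standard-library `asyncio` instead of Tornado; `AsyncToyStream` and `slow_sink` are illustrative names only.

```python
import asyncio


class AsyncToyStream:
    """Hypothetical toy: emit() awaits every subscriber before returning."""

    def __init__(self):
        self.downstreams = []

    async def emit(self, x):
        for push in self.downstreams:
            await push(x)  # a slow subscriber delays whoever called emit()

    def map(self, func):
        child = AsyncToyStream()

        async def push(x):
            await child.emit(func(x))

        self.downstreams.append(push)
        return child

    def sink(self, coro_func):
        self.downstreams.append(coro_func)


async def main():
    log = []
    source = AsyncToyStream()

    async def slow_sink(result):
        await asyncio.sleep(0.01)  # stands in for, e.g., a database write
        log.append(("stored", result))

    source.map(lambda x: x * 10).sink(slow_sink)
    for i in range(3):
        await source.emit(i)       # returns only after slow_sink has finished
        log.append(("emitted", i))
    return log


log = asyncio.run(main())
print(log)
```

Because each `await source.emit(i)` returns only after `slow_sink` has stored the result, the log alternates between storing and emitting. A buffer would decouple the two by letting `emit` return before the sink finishes.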

Recursion and Feedback

By connecting sources to sinks you can create feedback loops. Here is a tiny web crawler:

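import requests
from streams import Stream

def get_list_of_links(html):
    """User-supplied: return the list of URLs found in a page's HTML."""
    raise NotImplementedError

source = Stream()
pages = source.unique()
content = (pages.map(requests.get)
                .map(lambda x: x.content))
links = (content.map(get_list_of_links)
                .concat())
links.sink(source.emit)  # feed discovered links back into the source

pages.sink(print)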

>>> source.emit('http://github.com')
http://github.com
http://github.com/features
http://github.com/business
http://github.com/explore
http://github.com/pricing
...

This was not an intentional feature of the system. It just fell out from the design.
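Feedback works because a sink can be any callable, including another stream's `emit`. The sketch below replaces HTTP requests with a hypothetical in-memory link graph (`LINKS`) and re-implements just enough of a toy stream (`unique`, `map`, `concat`) to show the loop terminating once `unique` has seen every page. None of these classes are the library's own code.

```python
class ToyStream:
    """Hypothetical minimal stream with just enough operators for a feedback loop."""

    def __init__(self):
        self.downstreams = []

    def emit(self, x):
        for push in self.downstreams:
            push(x)

    def sink(self, func):
        self.downstreams.append(func)

    def map(self, func):
        child = ToyStream()
        self.sink(lambda x: child.emit(func(x)))
        return child

    def unique(self):
        # Pass each element through only the first time it is seen
        child, seen = ToyStream(), set()

        def push(x):
            if x not in seen:
                seen.add(x)
                child.emit(x)

        self.sink(push)
        return child

    def concat(self):
        # Flatten: emit each item of an incoming list separately
        child = ToyStream()

        def push(items):
            for item in items:
                child.emit(item)

        self.sink(push)
        return child


LINKS = {  # hypothetical link graph standing in for real web pages
    "a": ["b", "c"],
    "b": ["a", "c"],
    "c": ["d"],
    "d": [],
}

source = ToyStream()
pages = source.unique()
visited = []
pages.sink(visited.append)       # attached first, so pages are recorded on arrival
links = pages.map(LINKS.get).concat()
links.sink(source.emit)          # feedback: discovered links re-enter the source
source.emit("a")
print(visited)
```

In this toy, each fed-back element is handled by a nested (recursive) call, so a very large link graph could hit Python's recursion limit. The order in which subscribers are attached also determines the order in which pages are recorded.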

Dask

Everything above runs with normal Python in the main thread or, optionally, in a Tornado event loop. Alternatively, this library plays well with Dask: you can scatter data to the cluster, map and scan things there, gather results back, etc.

(source.to_dask()
       .scatter()
       .map(func)   # Runs on a cluster
       .scan(func)  # Runs on a cluster
       .gather()
       .sink(...))
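The scatter/map/scan/gather pattern keeps intermediate results on the cluster as futures and only brings values back at `gather`. As a rough, hypothetical analogue that runs without a Dask cluster, the sketch below uses `concurrent.futures.ThreadPoolExecutor` in place of Dask; `pipeline` and `_apply` are illustrative names, not part of either library.

```python
from concurrent.futures import ThreadPoolExecutor
from operator import add


def inc(x):
    return x + 1


def _apply(func, *futures):
    # Runs on a worker thread: wait for the input futures, then apply func
    return func(*(f.result() for f in futures))


def pipeline(values, executor):
    """Rough analogue of scatter().map(inc).scan(add).gather() using a thread pool."""
    gathered = []
    state = None  # future holding the running scan result
    for v in values:
        remote = executor.submit(lambda x: x, v)                # 'scatter'
        mapped = executor.submit(_apply, inc, remote)           # 'map' in the pool
        if state is None:
            state = mapped                                      # first element seeds the scan
        else:
            state = executor.submit(_apply, add, state, mapped) # 'scan' in the pool
        gathered.append(state.result())                         # 'gather' back to the caller
    return gathered


with ThreadPoolExecutor(max_workers=2) as executor:
    print(pipeline(range(3), executor))
```

Here tasks explicitly wait on futures submitted earlier to the same pool; since the pool hands out tasks in submission order this does not deadlock, but Dask's `Client.submit` resolves futures passed as arguments for you, so no explicit waiting is needed there.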

Less Trivial Example

import json
from operator import add

import pandas as pd
from streams import Stream

source = Stream()
output = open('out', 'w')

s = (source.map(json.loads)        # Parse lines of JSON data
           .timed_window(0.050)    # Collect data into 50ms batches
           .filter(len)            # Remove any batches that didn't have data
           .to_dask().scatter()    # Send to cluster
           .map(pd.DataFrame)      # Convert to pandas dataframes on the cluster
           .map(pd.DataFrame.sum)  # Sum over the rows of each batch on the cluster
           .scan(add)              # Maintain running sum of all data on the cluster
           .gather()               # Collect results back to local machine
           .map(str)               # Convert to string
           .sink(output.write))    # Write to file

from some_kafka_library import KafkaReader

topic = KafkaReader().subscribe('data')

while True:
    for line in topic:
        source.emit(line)
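The windowing, filtering, and running-sum steps can be traced without Kafka, Dask, or pandas. This hypothetical sketch buckets records by a supplied timestamp (the real `timed_window` batches by wall-clock arrival time), drops empty windows the way `.filter(len)` does, sums each key per batch in place of `pd.DataFrame.sum`, and keeps a running total like `.scan(add)`. All function names are illustrative.

```python
import json
from collections import defaultdict


def window(records, interval):
    """Group (timestamp, item) pairs into batches covering `interval` seconds each."""
    batches = defaultdict(list)
    for t, item in records:
        batches[int(t // interval)].append(item)
    # Only windows that received data exist, mirroring .filter(len)
    return [batches[k] for k in sorted(batches)]


def column_sums(batch):
    """Per-key totals for one batch (stand-in for pd.DataFrame(batch).sum())."""
    totals = {}
    for row in batch:
        for key, value in row.items():
            totals[key] = totals.get(key, 0) + value
    return totals


def running_totals(lines, interval=0.050):
    """lines: (timestamp, json_text) pairs; returns the running sum after each batch."""
    parsed = [(t, json.loads(text)) for t, text in lines]  # like .map(json.loads)
    state = {}
    results = []
    for batch in window(parsed, interval):
        for key, value in column_sums(batch).items():      # like .scan(add)
            state[key] = state.get(key, 0) + value
        results.append(dict(state))                        # snapshot for the sink
    return results


lines = [(0.00, '{"a": 1}'), (0.01, '{"a": 2, "b": 1}'), (0.12, '{"a": 3}')]
for total in running_totals(lines):
    print(total)
```

The window from 0.05s to 0.10s receives no records, so it never produces a batch and the running total simply carries over to the next non-empty window.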

What's missing?

This is still a toy library. It has never been used for anything, so presumably many things are wrong. I've tried to build a simple system that can grow as use cases arrive. Here are some obvious things that are missing:

  1. A lot of API. I recommend looking at the Rx or Flink APIs to get a sense of what people often need.
  2. Integration with collections like lists, numpy arrays, or pandas dataframes. For example, we should be able to think about streams of lists of things. In this case seq_stream.map(func) would apply the function across every element in the constituent lists.
  3. Thinking about time. It would be nice to be able to annotate elements with things like event and processing time, and have this information pass through operations like map.
  4. Multi-stream operations like zip and joins
  5. More APIs for common endpoints like Kafka
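Item 2's proposed semantics, where `seq_stream.map(func)` applies `func` inside each list rather than to the list itself, can be sketched with plain generators. `map_elements` is a hypothetical helper, not an existing API.

```python
def map_elements(func, batches):
    """Hypothetical: apply func to every element of each list, preserving batch boundaries."""
    for batch in batches:
        yield [func(x) for x in batch]


batches = [[1, 2], [], [3, 4, 5]]
print(list(map_elements(lambda x: x * 10, batches)))  # [[10, 20], [], [30, 40, 50]]
```

Keeping the batch boundaries intact means downstream operators can still see how elements were grouped, while user functions are written for single elements.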

Some things I like

  1. It's small
  2. It scales down and is very lightweight. Common operations can be used without concurrency, without event loops, without Dask. Nothing but pure Python.
  3. I think that it can probably combine well with Dask to do very large things

streamz's People

Contributors

cj-wright, danielballan, mrocklin


streamz's Issues

Decorating

It might be nice to decorate the emits of some stream functions (e.g. for saving things into the databroker as they pass by).
What would be the best way to do this?
