vmlaker / mpipe
Python API for writing multiprocessing pipelines
Home Page: http://vmlaker.github.io/mpipe
License: MIT License
I'd like to ask: how can I modify this so that I always receive the most recent data in real time? If the downstream stage can't keep up, I want to discard the old data so each fetch gets only the latest item, instead of data piling up in the buffer until memory is exhausted. @vmlaker
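The "latest value only" behavior being asked for can be sketched with the standard library alone (this is not an mpipe feature): use a queue of maxsize 1 and have the producer discard the stale entry before putting a new one, so the consumer always sees the freshest item. The `put_latest` helper name is my own, for illustration; a `multiprocessing.Queue` version needs extra care because of its feeder thread.

```python
import queue

def put_latest(q, item):
    # Drop the stale entry (if any) so only the newest item survives.
    try:
        q.get_nowait()
    except queue.Empty:
        pass
    q.put_nowait(item)

latest = queue.Queue(maxsize=1)
for frame in range(5):      # pretend these are camera frames
    put_latest(latest, frame)
print(latest.get())         # -> 4: only the newest frame remains
```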
I'm trying this for the first time, and the simple example on the main page does not work: http://vmlaker.github.io/mpipe/
Tried on python 3.4.5 and 3.6.1
Windows 7
Anaconda 4.4.0 64-bit
Has this ever been tested on windows?
It errors out with:
...
File "C:\Anaconda3\lib\site-packages\mpipe\Pipeline.py", line 9, in __init__
self._input_stage.build()
File "C:\Anaconda3\lib\site-packages\mpipe\Stage.py", line 99, in build
self._do_stop_task,
File "C:\Anaconda3\lib\site-packages\mpipe\OrderedWorker.py", line 93, in assemble
worker.start()
File "C:\Anaconda3\lib\multiprocessing\process.py", line 105, in start
self._popen = self._Popen(self)
File "C:\Anaconda3\lib\multiprocessing\context.py", line 223, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "C:\Anaconda3\lib\multiprocessing\context.py", line 322, in _Popen
return Popen(process_obj)
File "C:\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
reduction.dump(process_obj, to_child)
File "C:\Anaconda3\lib\multiprocessing\reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'OrderedStage.__init__.<locals>.wclass'
Add Frens Jan Rumph's max_backlog option to documentation. Add related entries to:
Add to the documentation an example of how to distribute an embarrassingly parallel problem to multiple cores. Consider a cascade of concentric for-loops, marshalling the outer for-loop to workers.
Fan-in linkage should not be allowed. For example:
stage1.link(stage2.link(stage3))
stage1.link(stage3) # This is bad.
In the second line above, the link() method should detect that stage3 has already been linked as an output stage, and an exception should be thrown.
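The requested guard could look like the toy sketch below. The class, attribute names, and return value are illustrative only, not mpipe's actual internals; the point is that link() tracks whether a stage already has an upstream parent and raises on fan-in.

```python
class Stage:
    """Toy stage with the guard the issue asks for (illustrative, not mpipe)."""
    def __init__(self, name):
        self.name = name
        self._next_stages = []
        self._has_parent = False

    def link(self, next_stage):
        # Refuse fan-in: a stage may be linked as output only once.
        if next_stage._has_parent:
            raise ValueError(
                '%s is already linked as an output stage' % next_stage.name)
        next_stage._has_parent = True
        self._next_stages.append(next_stage)
        return self   # return self so calls can nest as in the example above

stage1, stage2, stage3 = Stage('stage1'), Stage('stage2'), Stage('stage3')
stage1.link(stage2.link(stage3))   # builds stage1 -> stage2 -> stage3
try:
    stage1.link(stage3)            # fan-in: stage3 already has a parent
except ValueError as e:
    print(e)                       # stage3 is already linked as an output stage
```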
Add to documentation instructions on how to build/deploy/test MPipe in a Virtualenv environment.
Upgrade the version of software to 1.0.8 and release (upload to PyPI).
I'm using mpipe for computer vision processing on camera frames, and it's exactly what I need, but the put()/get() API is nonideal, and I can't figure out a way to have infinite tasks. I read the documentation, but I keep finding that you need to feed the tasks before running; I need to feed tasks while running:
stage1.link(stage2)
...
pipe = Pipeline(...
for number in range(10):
    pipe.put(number)
I tried running a thread in the background to feed a multiprocessing.Queue, but it just stops when the queue gets full. I concluded that you can't run put() while you're running get():
def fill_queue():
    while True:
        if not q.full():
            pipe.put(0)
        else:
            time.sleep(0.01)  # Rest for 10ms, we have a full queue

thread = Thread(target=fill_queue, args=())
thread.daemon = True
thread.start()

while True:
    if q.qsize():
        results... = q.get()
either way, something is very wrong
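The producer/consumer pattern being attempted works with the standard library alone: a bounded queue's put() simply blocks until the consumer frees a slot, so the feeder thread pauses rather than "stopping". This sketch uses stdlib queue/threading only, not mpipe's API; the doubling step stands in for the real per-frame work.

```python
import queue
import threading

q = queue.Queue(maxsize=4)         # bounded buffer: put() blocks when full
N = 20

def fill_queue():
    for n in range(N):
        q.put(n)                   # blocks until there is room
    q.put(None)                    # sentinel: no more tasks

threading.Thread(target=fill_queue, daemon=True).start()

results = []
while True:
    item = q.get()                 # blocks until a task arrives
    if item is None:
        break
    results.append(item * 2)       # stand-in for the real processing
print(len(results))                # -> 20
```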
Add instructions on how to build the Sphinx docs to the documentation.
Pipeline.get returns only one result from its output_stages. Is this by design, or a bug?
def get(self, timeout=None):
    """Return result from the pipeline."""
    result = None
    for stage in self._output_stages:
        result = stage.get(timeout)
    return result
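As written, the loop overwrites result on each iteration, so only the last output stage's value survives. A variant that keeps every stage's result could look like this sketch; FakeStage and get_all are illustrative stand-ins, not mpipe code.

```python
class FakeStage:
    """Stand-in for an output stage; only implements get()."""
    def __init__(self, value):
        self._value = value
    def get(self, timeout=None):
        return self._value

def get_all(output_stages, timeout=None):
    # Collect one result per output stage instead of overwriting.
    return [stage.get(timeout) for stage in output_stages]

print(get_all([FakeStage(1), FakeStage(2)]))   # -> [1, 2]
```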
On operating systems without fork support, wclass in OrderedStage.__init__ is required to be serializable, and that's a problem. Otherwise an exception is raised:
AttributeError: Can't pickle local object 'OrderedStage.__init__.<locals>.wclass'
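The root cause can be shown with pickle alone: on spawn-based platforms everything handed to a child process is pickled, classes are pickled by reference, and a class defined inside a function has no importable reference. This stdlib sketch (names are mine) reproduces the error and shows that a module-level class is fine.

```python
import pickle

def make_local_worker():
    class Worker:            # defined inside a function: not picklable
        pass
    return Worker

class ModuleWorker:          # defined at module level: picklable by reference
    pass

try:
    pickle.dumps(make_local_worker())
except (pickle.PicklingError, AttributeError) as e:
    print('local class fails:', e)

# A module-level class round-trips to the very same class object.
print(pickle.loads(pickle.dumps(ModuleWorker)) is ModuleWorker)
```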
Add the tox virtualenv management into the flow to be able to test for different Python versions.
I've tried a pipeline like this:
stage1 --> stage2 --> stage3
stage1 --> stage3
That is, stage3 receives tasks from both stage1 and stage2. I get some weird results. Does mpipe support such a pipeline?
I'm seeing hangs using OrderedWorker if the payload used in put() is longer than a certain size. Here is an example that will randomly hang on my machine if a single character is added to the payload:
from __future__ import print_function
import random
import mpipe

class OWorker(mpipe.OrderedWorker):
    def doTask(self, value):
        return value

if __name__ == '__main__':
    stage_get = mpipe.Stage(OWorker, 2)
    p = mpipe.Pipeline(stage_get)
    j = 95
    if random.randint(0, 1):
        print("HANGS")
        j += 1
    else:
        print("WORKS")
    for i in list(range(10)):
        d = {'payload': 'a' * j}
        p.put(d)
    p.put(None)
    for r in p.results():
        print(r)
Tested hanging behavior on OS X using Python 2.7.11 and 3.5.1. Latest version of mpipe installed via pip install mpipe.
My understanding is that whenever pipe.put(something) is executed, the item is put into a queue or similar. If we only want the workers to process the somethings, without caring at all about any return value, we can use disable_result, but it seems there is no limit to how much data can be put into the pipeline. If a large number of large items is put into the pipeline, will this cause problems? Is it possible to have only a certain maximum number of items waiting for processing before put(something) blocks?
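The requested behavior is exactly what a bounded queue gives: once maxsize items are waiting, put() blocks (or raises on timeout). This is a stdlib sketch of the concept, not mpipe's API; the max_backlog option mentioned elsewhere in this tracker is the analogous knob in mpipe.

```python
import queue

backlog = queue.Queue(maxsize=3)    # at most 3 items waiting
for i in range(3):
    backlog.put(i)
print(backlog.full())               # -> True: another put() would block

try:
    backlog.put(99, timeout=0.01)   # timeout turns the block into an exception
except queue.Full:
    print('backlog limit reached')
```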
Add a test exercising Frens Jan Rumph's Backlog Limit feature to existing set of tests.
The current modules don't support Windows due to the use of multiprocessing.Lock. A Windows implementation could be done with alternative logic similar to multiprocessing.Event.