Giter VIP home page Giter VIP logo

deco's Introduction

Decorated Concurrency

A simplified parallel computing model for Python. DECO automatically parallelizes Python programs, and requires minimal modifications to existing serial programs.

Install using pip:

pip install deco

Documentation

You can reference the Wiki on Github for slightly more in-depth documentation.

General Usage

Using DECO is as simple as finding, or creating, two functions in your Python program. The first function is the one we want to run in parallel, and is decorated with @concurrent. The second function is the function which calls the @concurrent function and is decorated with @synchronized. Decorating the second function is optional, but provides some very cool benefits. Let's take a look at an example.

@concurrent # We add this for the concurrent function
def process_lat_lon(lat, lon, data):
  #Does some work which takes a while
  return result

@synchronized # And we add this for the function which calls the concurrent function
def process_data_set(data):
  results = defaultdict(dict)
  for lat in range(...):
    for lon in range(...):
      results[lat][lon] = process_lat_lon(lat, lon, data)
  return results

That's it, two lines of changes is all we need in order to parallelize this program. Now this program will make use of all the cores on the machine it's running on, allowing it to run significantly faster.

What it does

  • The @concurrent decorator uses multiprocessing.pool to parallelize calls to the target function
  • Indexed based mutation of function arguments is handled automatically, which pool cannot do
  • The @synchronized decorator automatically inserts synchronization events
  • It also automatically refactors assignments of the results of @concurrent function calls to happen during synchronization events

Limitations

  • The @concurrent decorator will only speed up functions that take longer than ~1ms
    • If they take less time your code will run slower!
  • By default, @concurrent function arguments/return values must be pickleable for use with multiprocessing
  • The @synchronized decorator only works on 'simple' functions, make sure the function meets the following criteria
    • Only calls, or assigns the result of @concurrent functions to indexable objects such as:
      • concurrent(...)
      • result[key] = concurrent(...)
    • Never indirectly reads objects that get assigned to by calls of the @concurrent function

How it works

For an in depth discussion of the mechanisms at work, we wrote a paper for a class which can be found here.

As an overview, DECO is mainly just a smart wrapper for Python's multiprocessing.pool. When @concurrent is applied to a function it replaces it with calls to pool.apply_async. Additionally when arguments are passed to pool.apply_async, DECO replaces any index mutable objects with proxies, allowing it to detect and synchronize mutations of these objects. The results of these calls can then be obtained by calling wait() on the concurrent function, invoking a synchronization event. These events can be placed automatically in your code by using the @synchronized decorator on functions that call @concurrent functions. Additionally while using @synchronized, you can directly assign the result of concurrent function calls to index mutable objects. These assignments get refactored by DECO to automatically occur during the next synchronization event. All of this means that in many cases, parallel programming using DECO appears exactly the same as simpler serial programming.

deco's People

Contributors

alex-sherman avatar cgxeiji avatar garrettwilkin avatar joshuapostel avatar pddenhar avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

deco's Issues

generator argument

I'm doing some heavy work on csv and would like to pass a generator into the @concurrent function, but script just stalls, so when I swap it out for a list, it actually runs.

AttributeError: 'Tuple' object has no attribute 'value'

I'm using Python 3.4 on Ubuntu.

I'm testing out deco in a quick program I'm writing located here.

However, I'm getting this cryptic error:

runfile('/home/alexpetralia/Desktop/boston_events/main.py', wdir='/home/alexpetralia/Desktop/boston_events')
Traceback (most recent call last):

  File "<ipython-input-4-b3482a262993>", line 1, in <module>
    runfile('/home/alexpetralia/Desktop/boston_events/main.py', wdir='/home/alexpetralia/Desktop/boston_events')

  File "/usr/lib/python3/dist-packages/spyderlib/widgets/externalshell/sitecustomize.py", line 586, in runfile
    execfile(filename, namespace)

  File "/usr/lib/python3/dist-packages/spyderlib/widgets/externalshell/sitecustomize.py", line 48, in execfile
    exec(compile(open(filename, 'rb').read(), filename, 'exec'), namespace)

  File "/home/alexpetralia/Desktop/boston_events/main.py", line 71, in <module>
    x = populate_df()

  File "/usr/local/lib/python3.4/dist-packages/deco/conc.py", line 47, in __call__
    rewriter.visit(node.body[0])

  File "/usr/lib/python3.4/ast.py", line 245, in visit
    return visitor(node)

  File "/usr/local/lib/python3.4/dist-packages/deco/astutil.py", line 85, in visit_FunctionDef
    self.generic_visit(node)

  File "/usr/local/lib/python3.4/dist-packages/deco/astutil.py", line 55, in generic_visit
    super(NodeTransformer, self).generic_visit(node)

  File "/usr/lib/python3.4/ast.py", line 253, in generic_visit
    self.visit(item)

  File "/usr/lib/python3.4/ast.py", line 245, in visit
    return visitor(node)

  File "/usr/local/lib/python3.4/dist-packages/deco/astutil.py", line 68, in generic_visit
    name = child.targets[0].value

AttributeError: 'Tuple' object has no attribute 'value'

Any ideas what may be causing this?

Assignment attempted on something that is not index based

I am using a dictionary to store the contents of two files to enable parallel loading of both file in a dictionary, yet I get an error:


@concurrent
def readfile(path):
    return codecs.open(path, encoding='utf-8').read()


@synchronized
def readdocs():
    d=dict()
    d['source']=readfile(r'./../accessmysql/t1')
    d['target']=readfile(r'./../accessmysql/t2')
    return d
if __name__ == '__main__':
    d=readdocs()

Giver error:
ValueError: Assignment attempted on something that is not index based

cannot pass **kwargs to concurrent?

I found this neat library super useful for speeding up (at least 10x from testing) Pandas read_csv for reading large files and concat the resulting data frames.

However, I found when reading in YAML config for passing **kwargs to read_csv, suddenly it doesn't work.

Example:

from typing import List
from pathlib import Path
from pandas import DataFrame, read_csv, concat
from deco import concurrent, synchronized

@concurrent
def read_csv_worker(path: Path, **kwargs) -> DataFrame:
    return read_csv(path, **kwargs)

@synchronized
def read_csv_dispatcher(paths: List[Path], **kwargs) -> DataFrame:
    data = list()
    for p in paths:
        data.append(read_csv_worker(p, **kwargs))
    return concat(data)

Fails, but:

from typing import List
from pathlib import Path
from pandas import DataFrame, read_csv, concat
from deco import concurrent, synchronized

@concurrent
def read_csv_worker(path: Path, kwargs) -> DataFrame:
    return read_csv(path, **kwargs)
    
@synchronized
def read_csv_dispatcher(paths: List[Path], **kwargs) -> DataFrame:
    data = list()
    for p in paths:
        data.append(read_csv_worker(p, kwargs))
    return concat(data)

works fine. Do the decorators not support **kwargs pass-throughs?

Memory isn't freed when creating and returning large objects in concurrent function

First of all thanks for the great tool, it's really easy to use.

However I've found a strange behaviour when creating and returning relatively large structures (e.g. a list with 1000000) in the concurrent function and returning them to the synchronized function. The memory allocated for the list in the concurrent function is simply never freed again. I guess some reference is leftover maybe!? I've build a little stand-alone code to reproduce the problem:

The setup looks like the following:

import multiprocessing
from deco import concurrent, synchronized


@concurrent
def my_conc():
    tmp = range(1000000)
    return tmp


@synchronized
def my_sync(my_dict):
    new_dict = {}

    for key, value in my_dict.iteritems():
        new_dict[key] = my_conc()


def main():
    cpus = multiprocessing.cpu_count()
    my_dict = {}
    for i in xrange(cpus):
        my_dict[i] = 0

    for i in xrange(100):
        print i
        my_sync(my_dict)


if __name__ == '__main__':
    main()

So depending on the number of cpus I build n lists with 1000000 ints, and call the synchronized consecutively in a for loop. The allocated memory basically increases until all of it is used and my pc starts swapping...

As soon as I remove the decoraters everything works fine (although not concurrent) ;). Also this only happens if I return tmp in the my_conc() function. Once I replace it with 'return 0' everythings fine again.

I'm sorry if I misunderstood some limitation of the tool - it's my first time using parallel processing in python.

Thanks in advance!

conc_test.py was eror in my python 3

here i was run conc_test.py

$ python conc_test.py
Traceback (most recent call last):
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/synchronize.py", line 29, in
from _multiprocessing import SemLock, sem_unlink
ImportError: cannot import name 'SemLock'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "conc_test.py", line 29, in
test_size(SIZE)
File "/storage/emulated/0/Download/termuxData/deco/deco/conc.py", line 61, in call
return self.f(*args, **kwargs)
File "", line 5, in test_size
File "/storage/emulated/0/Download/termuxData/deco/deco/conc.py", line 110, in assign
self.assigns.append((target, self(*args, **kwargs)))
File "/storage/emulated/0/Download/termuxData/deco/deco/conc.py", line 121, in call
self.concurrency = self.conc_constructor(*self.conc_args, **self.conc_kwargs)
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/context.py", line 119, in Pool
context=self.get_context())
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/pool.py", line 156, in init
self._setup_queues()
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/pool.py", line 249, in _setup_queues
self._inqueue = self._ctx.SimpleQueue()
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/context.py", line 112, in SimpleQueue
return SimpleQueue(ctx=self.get_context())
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/queues.py", line 315, in init
self._rlock = ctx.Lock()
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/context.py", line 66, in Lock
from .synchronize import Lock
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/synchronize.py", line 34, in
" function, see issue 3770.")
ImportError: This platform lacks a functioning sem_open implementation, therefore, the required synchronization primitives needed will not function, see issue 3770.

what i forgoted to modul install?

Bug with deco function call

The following example seems to not work:

@concurrent
def get_region(region_filepath, variables_to_get):
	return {}

@synchronized
def get_data(variable_list, regions):
	return_variables = {}
	for key in variable_list:
		return_variables[key] = []
	region_dicts = []
	for lat_lon in regions:
		region_dicts.append(get_region(root_dir + lat_lon, variable_list))
	#get_region.wait()
	print("len", len(region_dicts), regions)
	#consolidate all the regions
	for region in region_dicts:
		for key in region.keys():
			return_variables[key].extend(region[key])
	return return_variables

function(concurrent) in synchronized which is passed by value can not work

hi,

guys, thanks for your great lib. I read it from blog https://www.peterbe.com/plog/deco

I have a question on usage of synchronized, below code works fine.
and I made small modification, it does not work now.
The reason I am doing so is that I want to pass function to run as a parameter.
I am wondering if you could kindly explain why.
Thanks!

Working version:

import time, requests

urls = """
https://www.peterbe.com/plog/blogitem-040212-1
https://www.peterbe.com/plog/geopy-distance-calculation-pitfall
https://www.peterbe.com/plog/app-for-figuring-out-the-best-car-for-you
https://www.peterbe.com/plog/Mvbackupfiles
https://www.peterbe.com/plog/swedish-holidays-explaine
https://www.peterbe.com/plog/wing-ide-versus-jed
https://www.peterbe.com/plog/worst-flash-site-of-the-year-2010
""".strip().splitlines()

@concurrent
def download(url, data):
    t0 = time.time()
    assert requests.get(url).status_code == 200
    t1 = time.time()
    data[url] = t1-t0
  
@synchronized
def run(data):
    for url in urls:
        download(url, data)
    print(data)

t0 = time.time()
data = {}
run(data)
print(data)
t1 = time.time()
print ("TOOK", t1-t0)
print ("WOULD HAVE TAKEN", sum(data.values()), "seconds")

not working version:

import time, requests

urls = """
https://www.peterbe.com/plog/blogitem-040212-1
https://www.peterbe.com/plog/geopy-distance-calculation-pitfall
https://www.peterbe.com/plog/app-for-figuring-out-the-best-car-for-you
https://www.peterbe.com/plog/Mvbackupfiles
https://www.peterbe.com/plog/swedish-holidays-explaine
https://www.peterbe.com/plog/wing-ide-versus-jed
https://www.peterbe.com/plog/worst-flash-site-of-the-year-2010
""".strip().splitlines()

@concurrent
def download(url, data):
    t0 = time.time()
    assert requests.get(url).status_code == 200
    t1 = time.time()
    data[url] = t1-t0
  
function_map = {
    'download': download
}

@synchronized
def run(data):
    for url in urls:
        function_map['download'](url, data)
    print(data)

t0 = time.time()
data = {}
run(data)
print(data)
t1 = time.time()
print ("TOOK", t1-t0)
print ("WOULD HAVE TAKEN", sum(data.values()), "seconds")

Feature request: allow .append() to list instead of direct index assignment

The following code works:

@concurrent
def scrape(link):
    # do work
    pass

x = [None] * 100000 
@synchronized
def populate_df(links):
    for i, link in enumerate(links):
        x[i] = scrape(link)
    return x

However, this code does not:

@concurrent
def scrape(link):
    # do work
    pass

@synchronized
def populate_df(links):
    x = []
    for i, link in enumerate(links):
        x.append(scrape(link))
    return x

Instead it returns a list, one element for each element in the initial links list:

 <multiprocessing.pool.ApplyResult at 0x7f784ba424a8>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42550>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba425f8>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba426a0>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42748>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba427f0>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42898>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42940>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba429e8>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42a90>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42b38>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42be0>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42c88>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42d30>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42dd8>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42e80>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42f28>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42fd0>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba480b8>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba48160>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba48208>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba482b0>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba48358>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba48400>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba484a8>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba48550>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba485f8>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba486a0>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba48748>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba487f0>]

Would it be possible to get append to work?

ValueError: If using all scalar values, you must pass an index

getting following error:

@concurrent
def createtraindata(filpath):
    print filpath

@synchronized
def iterate(fils):
    test={}
    for f in fils:
        test[f[0]]=createtraindata(f[0])
    return test


Traceback (most recent call last):                                                                                                                                                     
  File "createdata.py", line 183, in <module>                                                                                                                                          
    print iterate(fils)                                                                                                                                                                
  File "/home/ttt/anaconda2/lib/python2.7/site-packages/deco/conc.py", line 58, in __call__                                                                                       
    return self.f(*args, **kwargs)                                                                                                                                                     
  File "<string>", line 257, in iterate                                                                                                                                                
  File "/home/ttt/anaconda2/lib/python2.7/site-packages/deco/conc.py", line 129, in wait                                                                                          
    results.append(self.results.pop().get())                                                                                                                                           
  File "/home/ttt/anaconda2/lib/python2.7/site-packages/deco/conc.py", line 144, in get                                                                                           
    result, operations = self.async_result.get()                                                                                                                                       
  File "/home/ttt/anaconda2/lib/python2.7/multiprocessing/pool.py", line 567, in get                                                                                              
    raise self._value                                                                                                                                                                  
ValueError: If using all scalar values, you must pass an index  

Question: do processes automatically stop when done?

Hi,

This looks like a great tool :-)

Just wondering about the memory performance of potentially spinning up lots of processes.

Also, once a set of processes are spun up to handle the users parallel requirements, do they get destroyed? is thatpossible or does that not make sense to do?

Hopefully you can help me understand the answer to these questions.

Thanks!

Support pluggable pool classes

As a deco user, I would like to be able to specify the type of pool to be used for concurrent work so that I can use the concurrency model (threads, processes, etc.) most suited to my workload.

For example, if I'm doing network IO bound work, I may want to use the multiprocessing.pool.ThreadPool rather than multiprocessing.Pool in order to avoid the overhead of creating new Python processes. I may want to add some features to Pool, say auditing & profiling, so I've created a subclass of Pool to add the features, and I would like to use it with deco.

The change would be to add a supported argument to concurrent decorator-- something like concurrent(pool_class = SomeClass). It appears that this will only require a change to the __init__ method of concurrent in order to check for the presence of this argument and set the instance local pool class, and a single line change to __call__-- if self.pool is None: self.pool = self.pool_class(....).

Does this sound reasonable to you all?

'Name' object is not iterable

Hey,

I am looking at deco and thought would be cool to use.

What I am doing is I have a parent thread who creates a csv writes headers to the file and then spawns child threads to go to a rest api and request data until there is no more data to request. At which point they return a list of results and the parent thread takes the results and writes them after the header

I keep hitting the same error. The error is a bit meaningless and there are no comments in the code so I cant make head or tails of what is going on?

Any thoughts?

Stack below:

Error
Traceback (most recent call last):
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\unittest2\case.py", line 67, in testPartExecutor
yield
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\unittest2\case.py", line 625, in run
testMethod()
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\freezegun\api.py", line 451, in wrapper
result = func(_args, *_kwargs)
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\mock\mock.py", line 1305, in patched
return func(_args, *_keywargs)
File "D:\Projects\Python\dbexport\tests\test_dbexport.py", line 95, in test_something
export(database, output_dir=actual_dir)
File "D:\Projects\Python\dbexport\dbexport\dbexport.py", line 53, in export
export_dbrw(db_details, start_time, end_time, target_directory)
File "D:\Projects\Python\dbexport\dbexport\dbrw_export.py", line 129, in export_dbrw
run_dbrw_query(query.get('query_name'), bindings, write_headers, (csvwriter,), append_records, (csvwriter,))
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\deco\conc.py", line 47, in call
rewriter.visit(node.body[0])
File "C:\Python27\Lib\ast.py", line 241, in visit
return visitor(node)
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\deco\astutil.py", line 85, in visit_FunctionDef
self.generic_visit(node)
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\deco\astutil.py", line 55, in generic_visit
super(NodeTransformer, self).generic_visit(node)
File "C:\Python27\Lib\ast.py", line 249, in generic_visit
self.visit(item)
File "C:\Python27\Lib\ast.py", line 241, in visit
return visitor(node)
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\deco\astutil.py", line 55, in generic_visit
super(NodeTransformer, self).generic_visit(node)
File "C:\Python27\Lib\ast.py", line 249, in generic_visit
self.visit(item)
File "C:\Python27\Lib\ast.py", line 241, in visit
return visitor(node)
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\deco\astutil.py", line 55, in generic_visit
super(NodeTransformer, self).generic_visit(node)
File "C:\Python27\Lib\ast.py", line 251, in generic_visit
self.visit(value)
File "C:\Python27\Lib\ast.py", line 241, in visit
return visitor(node)
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\deco\astutil.py", line 55, in generic_visit
super(NodeTransformer, self).generic_visit(node)
File "C:\Python27\Lib\ast.py", line 251, in generic_visit
self.visit(value)
File "C:\Python27\Lib\ast.py", line 241, in visit
return visitor(node)
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\deco\astutil.py", line 57, in generic_visit
returns = [i for i, child in enumerate(node.body) if type(child) is ast.Return]
TypeError: 'Name' object is not iterable

Cheers, Nice piece of kit (When I did get it to work)

Nested concurrency

Is it possible to have a @Synchronised function whose body calls another function which is decorated with @Synchronised decorator?


from deco import *
from time import sleep
@concurrent
def bconc():
    sleep(3)
    return 1

@concurrent
def fconc():
    sleep(10)
    return 2
@synchronized
def bar():
    bd={}
    for i in range(5):
        bd[i]=bconc()
    return bd



@synchronized
def foo():
    bd=bar()
    fd={}
    for i in range(5):
        fd[i]=fconc()
    return fd,bd


print foo()

site-packages\deco\conc.py", line 57, in __call__
    out = compile(self.ast, "<string>", "exec")
TypeError: required field "lineno" missing from stmt

astutil.py line 68 `name = child.targets[0].value` "'Name' object has no attribute 'value'" (Py 3.4, 3.5)

On trying to adapt an existing program to use deco, I'm getting an error on the first (& only) call to the synchronized function:

my_fn( _list_of_dicts, _dict, _int1, _int2, _str )

with args of the indicated types. Traceback starting from that call is:

File "~/_VENVs/resizer-with-deco/lib/python3.5/site-packages/deco/conc.py", line 47, in __call__
  rewriter.visit(node.body[0])
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/ast.py", line 245, in visit
  return visitor(node)
File "~/_VENVs/resizer-with-deco/lib/python3.5/site-packages/deco/astutil.py", line 85, in visit_FunctionDef
  self.generic_visit(node)
File "~/_VENVs/resizer-with-deco/lib/python3.5/site-packages/deco/astutil.py", line 55, in generic_visit
  super(NodeTransformer, self).generic_visit(node)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/ast.py", line 253, in generic_visit
  self.visit(item)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/ast.py", line 245, in visit
return visitor(node)
File "~/_VENVs/resizer-with-deco/lib/python3.5/site-packages/deco/astutil.py", line 55, in generic_visit
  super(NodeTransformer, self).generic_visit(node)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/ast.py", line 253, in generic_visit
  self.visit(item)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/ast.py", line 245, in visit
  return visitor(node)
File "~/_VENVs/resizer-with-deco/lib/python3.5/site-packages/deco/astutil.py", line 68, in generic_visit
  name = child.targets[0].value
AttributeError: 'Name' object has no attribute 'value'

I haven't yet stripped things down to a more basic example.

cannot import concurrent

from PIL import Image
from deco import concurrent, synchronized
import time

@Concurrent
def slow(index):
time.sleep(5)

@synchronized
def run():
for index in list('123'):
slow(index)

run()

When I ran the code above, I get error msg below:
"C:\Program Files\Anaconda3\python.exe" C:/Users/zlan1/PycharmProjects/hellow/parallel_computing/deco.py
Traceback (most recent call last):
File "C:/Users/zlan1/PycharmProjects/hellow/parallel_computing/deco.py", line 2, in
from deco import concurrent, synchronized
File "C:\Users\zlan1\PycharmProjects\hellow\parallel_computing\deco.py", line 2, in
from deco import concurrent, synchronized
ImportError: cannot import name 'concurrent'

Process finished with exit code 1

Processes don't exit upon completion

I'm using deco with Ubuntu 16 and Python 3. When I run my script, I can successfully spin up 6 processes but they don't exit when the script is finished running. Next time I run it, it spins up another 6 processes. So in this example, I'll have 12+ instance of python running.

Is there a way to gracefully shut them down that I'm missing?

Can deco decorates nested functions?

I try to do something like the below snippet since each time I want to use deco on a function f I have to define two functions, g with @concurrent and h with @synchronized. The return value of the function with @concurrent is 'decorated' so I need a g instead of putting @concurrent on f directly, and h is usually perform a for loop.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import numpy as np
import deco


def parallel(fn, items):

    @deco.concurrent
    def _concurrent(item):
        return fn(item)

    @deco.synchronized
    def _parallel(items):
        return [_concurrent(item) for item in items]

    return _parallel(items)


def f(x):
    return np.random.randn(x)


parallel(f, list(range(1000)))

However, I got the following error

Traceback (most recent call last):
  File "./test.py", line 25, in <module>
    parallel(f, list(range(1000)))
  File "./test.py", line 18, in parallel
    return _parallel(items)
  File "/usr/local/lib/python3.6/site-packages/deco/conc.py", line 61, in __call__
    return self.f(*args, **kwargs)
  File "<string>", line 3, in _parallel
  File "<string>", line 3, in <listcomp>
NameError: name '_concurrent' is not defined

Is it possible to use deco on nested functions? Thanks.

Pass arguments of @concurrent to pool

Rather than making a bunch of redundant arguments for the @concurrent decorator, pass the arguments directly to pool's constructor.

This also avoids a Python 2.6 incompatability pointed out by @gst

Logging Broke

I have logging in my @concurrent function and it doesn't seem to work, is it possible the logging level is not being changed for all the processes? When I do a print statement, it seems to work just fine with printing to screen. I'm using the logging module, I had the logger=logging.getLogger() as a global, then placed it into the function as well and it still doesn't work.

Synchronized concurrent method?

@concurrent
def preprocess(txt):
    txt = rep.replace(txt)
    txt = txt.split(u'\n')
    return txt


@concurrent
@synchronized
def readfile(path):
    txt= codecs.open(path, encoding='utf-8').read().strip()
    return preprocess(txt)

In the above code I want preprocess to be called individually hence even that is marked as concurrent and since I am using it in readfile I have marked it as both
@Concurrent
@synchronized

However error thrown :TypeError: unsupported operand type(s) for -: 'synchronized' and 'int'

thread/process count

I just wanted to verify this, since it doesn't seem to be documented.

In conc_test.py I see the use of test.processes, does this set the max thread/process count for that specific concurrent function? What does the test.p = None refer too?

Version on PyPI seems not to import conc correcly

When installing the version available on PyPI the statement import conc on deco/__init__.py does not import deco.py correctly, eventually failing on usage. The master version seems to have corrected that issue.

It may be worth making a new release with this issue fixed.

dict object has no attribute iteritems

Hello,

Here is a program I quickly put together in order to play around with Deco:

import collections
import time
from deco import concurrent, synchronized

@concurrent
def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    time.sleep(2)
    return x + y

@synchronized
def print_sum():
    results = collections.defaultdict(dict)
    results[1] = compute(1, 2)
    results[2] = compute(1, 2)
    results[3] = compute(1, 2)
    results[4] = compute(1, 2)
    final_result = results[1] + results[2] + results[3] + results[4]
    print("combined results {}".format(final_result))

print_sum()

Note: I know it's not great, but wanted something quick to play with

But I get the following error...

AttributeError: 'dict' object has no attribute 'iteritems'

...which suggests the code isn't suitable for Python 3 as per this Stack Overflow response to a similar error: http://stackoverflow.com/a/30418498

Am I mistaken, or missing something obvious?

Thanks.

@synchronized with generators

Because of the index-based limitation, this is currently not supported:

@synchronized
for thing in range(10):
    concurrent_function(thing)

Obviously there's a simple workaround in most cases to just use list(generator). Do you plan to support generators in the future or is this a race condition problem?

Decorators from deco cannot be used on class methods

Hey,

I know this because, well, it's not implemented.

However since standalone methods are objects in python just as class methods are, what are the main sticking point preventing that from being usable on class methods? Could we get some sort of "what needs to happen" for that to be possible?

I would see a real use for that on class methods. Say you have a DataObject that contains a Gig of data in numpy arrays or something. Maybe that object also defines operations that can be perform on its data. It would be great from a software architecture perspective to keep the object-oriented code and those methods in the class definition, but still be able to parallele process it like you allow it to be done on standalone methods.

Am I am making sense?

IndentationError: unexpected indent

Python 3.5.1 / deco from pip installed from github

Test case:

from deco import concurrent, synchronized
import time


class DecoClass:

    @concurrent
    def work(self):
        time.sleep(0.1)

    @synchronized
    def run(self):
        for _ in range(100):
            self.work()

if __name__ == "__main__":
    start = time.time()
    x = DecoClass()
    x.run()
    print("Executing in serial should take 10 seconds")
    print("Executing in parallel took:", time.time() - start, "seconds")

The error message:

  File "<unknown>", line 1
    @synchronized
    ^
IndentationError: unexpected indent

Example of a dynamic work load.

Is it possible to have your concurrent function accept new work items after the concurrent function has been started already? Perhaps with a work queue for instance?

I gather from the documentation and examples this is not possible at this time.

Nice work on a clean , simple way of enabling multiprocessing in a straight forward way!

Thank you,
Rob

Deco with pandas data structures

While experimenting with deco and pandas I was hoping that the code below would work.

The intention was to simulate parallel-processing of a dummy pandas.DataFrame, where vectorized implementation is supposedly not possible.

import pandas as pd
n = 1000
df = pd.DataFrame({'str': 'Row'+pd.Series(range(n)).astype(str),
                   'num': pd.np.random.randint(1,10,n)})
df.head()

Produces:

   num   str
0    3  Row0
1    8  Row1
2    1  Row2
3    2  Row3
4    9  Row4

Now trying to join cells in each row with:

from deco import *

@concurrent
def join_str(series, sep=', '):
    return sep.join(map(str, series))

@synchronized
def joiner(df, cols, sep=', '):
    joined = pd.Series(index=df.index)
    for row in df.index:
        joined[row] = join_str(df.loc[row, cols], sep=sep)
    return joined

joiner(df, ['str','num'])

Gives an error Assignment attempted on something that is not index based:

ValueError                                Traceback (most recent call last)
..........

D:\Anaconda\envs\py2k\lib\site-packages\deco\astutil.pyc in subscript_name(node)
     38             return node.id
     39         elif type(node) is ast.Subscript:
---> 40             return SchedulerRewriter.subscript_name(node.value)
     41         raise ValueError("Assignment attempted on something that is not index based")
     42 

D:\Anaconda\envs\py2k\lib\site-packages\deco\astutil.pyc in subscript_name(node)
     39         elif type(node) is ast.Subscript:
     40             return SchedulerRewriter.subscript_name(node.value)
---> 41         raise ValueError("Assignment attempted on something that is not index based")
     42 
     43     def is_concurrent_call(self, node):

ValueError: Assignment attempted on something that is not index based

Which is somewhat strange given that assignment is index based.

Is this because deco doesn't 'understand' this particular data structure (pandas.Series) or is there a problem in the code?

KeyError with peterbe.com easy example on WinPython 3.6.3.0-64

I'm getting this in Jupyter notebok 5.7.8 with WinPython 3.6.8-64 on Windows 10. This is the first deco'd example from here


RemoteTraceback Traceback (most recent call last)
RemoteTraceback:
"""
Traceback (most recent call last):
File "c:\wpy64-368zro\python-3.6.8.amd64\lib\multiprocessing\pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "c:\wpy64-368zro\python-3.6.8.amd64\lib\site-packages\deco\conc.py", line 10, in concWrapper
result = concurrent.functions[f](*args, **kwargs)
KeyError: 'slow'
"""

The above exception was the direct cause of the following exception:

KeyError Traceback (most recent call last)
in
4 slow(index)
5
----> 6 run()

c:\wpy64-368zro\python-3.6.8.amd64\lib\site-packages\deco\conc.py in call(self, *args, **kwargs)
59 exec(out, scope)
60 self.f = scope[self.orig_f.name]
---> 61 return self.f(*args, **kwargs)
62
63

in run()

c:\wpy64-368zro\python-3.6.8.amd64\lib\site-packages\deco\conc.py in wait(self)
134 results = []
135 while self.results:
--> 136 result, operations = self.results.pop().get()
137 self.apply_operations(operations)
138 results.append(result)

c:\wpy64-368zro\python-3.6.8.amd64\lib\site-packages\deco\conc.py in get(self)
154
155 def get(self):
--> 156 return self.async_result.get(3e+6)
157
158 def result(self):

c:\wpy64-368zro\python-3.6.8.amd64\lib\multiprocessing\pool.py in get(self, timeout)
642 return self._value
643 else:
--> 644 raise self._value
645
646 def _set(self, i, obj):

KeyError: 'slow'

# after.py

from deco import concurrent, synchronized

@concurrent
def slow(index):
    time.sleep(5)

@synchronized
def run():
    for index in list('123'):
        slow(index)

run()

Support for PyPy processes

It could be interesting to offload the parallel jobs to PyPy processes instead of the main Python interpreter, which would be mostly CPython if one wants to plot data, use Pandas or any other library that is not (yet) supported by PyPy. In this use case I see PyPy as a fast computation backend similar to Numba and its @jit decorator. Furthermore, Numpy arrays could be passed in parameters and handled by Numpypy, provided that pickling/unpickling is compatible between the two.
For it to work it is necessary to use multiprocessing.set_start_method('spawn') and set_executable() to set the interpreter to PyPy. Unfortunately this option of starting a fresh Python interpreter process is only available from Python 3.4, and PyPy only supports 3.3 for now. There is also this multiprocess fork of multiprocessing, which uses dill for a better serialization, so it could be worth integrating the py3.5 version of multiprocess into deco, so that set_start_method can be back-ported to an older version of Python and available in PyPy.
What do you think?

Implement free variable capture

Motivated by the use of module global variables, like logging.Logger instances, deco should be able to detect free variable reference in its concurrent functions and pass in/proxy any it encounters. This functionality should be optional, perhaps even allowing the specification of exactly which free variables to capture.

A simple test is the following:

from deco import *

global_var = False

@concurrent
def my_conc():
    return global_var

if __name__ == '__main__':
    global_var = True
    result = my_conc().get()
    print(result) #Should print True, but prints False unless using concurrent.threaded

Please add a LICENSE file

Thanks for publishing this code, I would really like to play with it!
Sadly, without an explicit license, I can't really do so.

"a proper software license is a necessary evil for any code you plan to release to the public (...) Because I did not explicitly indicate a license, I declared an implicit copyright without explaining how others could use my code." Coding Horror

GH has made this easy.

wiki problems

  1. you def process_url and then call process_lat_lon
  2. will by modified typo

Handle invalid access to @concurrent results

Loops like the following are currently not allowed in @synchronized functions, but will not throw a useful exception.

@synchronized
def run():
    x = []
    for i in range(...):
        x.append(conccurent(i))
    return x

Handling more cases of @concurrent result uses is a good idea, but more importantly users should be warned when their function is invalid.

using futures.ascompleted construct

How is it possible to process the results like the futures.ascompleted construct , so as soon as the result is available it can be processed?

Instead of doing function decoration cant we do at lower granularity and mark the section of code rather then delegating it as a separate function?

Google App Engine / Pandas Request Failing:

Hi guys,

I'm on Windows so forgive me in advance if that's the cause of any problems I'm also a beginner when it comes to async io and so I might just be misunderstanding something.

I'm making a request to the google app-engine (google analytics) using the pandas ga module, which uses OAuth to communicate with the analytics portion of the app engine.

Here's the code I had written:

import pandas.io.ga as ga
import pandas as pd
from deco import concurrent, synchronized
import time

@concurrent
def d_fetch(date, hour):
        t0 = time.time()
        data[str(date)+'h'+str(hour)] = [
            ga.read_ga(
            account_id  = "xxx",
            profile_id  = "xxx",
            property_id = "UA-xxx",
            metrics     = ['sessions','hits', 'bounces'],
            dimensions  = ['date', 'hour', 'minute', 'medium', 'keyword'],
            start_date  = date,
            end_date    = date,
            index_col = 0,
            filters = "hour==" + '{0:02d}'.format(hour))]
        t1 = time.time()
        data[str(date)+'h'+str(hour)].append(round(t1-t0,2))
        print str(date)+str(hour)+": completed in "+str(round(t1-t0,2))+" secs."

@synchronized
def run(data, dates):
    for date in dates:
        for hour in xrange(24):
            d_fetch(date, hour)

if __name__ == "__main__":
    somemute = {}
    date_range = pd.date_range(start='5/8/2016', end='5/8/2016', freq='D')

    t0 = time.time()
    run(somemute, date_range)
    t1 = time.time()
    print "TOOK", round(t1-t0,2)

And the error that was being raised:

image

Thanks!
Matt

Python 3 support

Hi, just wondering if you plan to update this to work on Python 3?

NameError: name 'concurrent' is not defined

This was my program.

@concurrent
def add_to_list(num):
    num = num + 1
    return num

x = []
@synchronized
def loop_to_add():
    for i in xrange(1, 100000):
        x.append(add_to_list(i))
    return x

loop_to_add()
print "Done"

It shows the error -


NameError Traceback (most recent call last)
in ()
1 #%%timeit
----> 2 @Concurrent
3 def add_to_list(num):
4 num = num + 1
5 return num
NameError: name 'concurrent' is not defined

I think I am doing something silly here. But I cannot seem to figure out what.

Processor limit?

I've been testing Deco on a AWS instance with 36 cores and but Deco seems to max out at 16-18. Is there a limit to how many processes Deco can run?

Thanks
screen shot 2018-02-14 at 3 38 59 pm

setup.py problem (Python 3.5)

I downloaded the archive and thought I would install from the directory containing setup, in a Py3.5 virtual env. Just trying the--help option gives:

$ ./setup.py --help
from: can't read /var/mail/distutils.core
./setup.py: line 3: syntax error near unexpected token `newline'
./setup.py: line 3: `setup('

pip install deco works just fine, so this is a minor issue. However, the PyPI version of deco seems to be not up to date.

Issues With Hanging Processes / Restart

Hi,

Really love this idea, this is the way parallel processing should be :)

I'm having an issue where the same code will sometimes work as expected and create many processes, however, sometimes it appears it will get stuck with one process.

Wondering if anyone else is having/has had this issue and if there is a fix.

I've had this issue on osx, windows and through python and ipython qt console.

High memory usage - how to reduce?

deco seems to make my program use more memory than I expected (regardless of the number of workers)

Let's say I have code like this (left out details for clarity)

@concurrent
def slow(index):
    ... do something

def run():
    for index in iterator_w_200K_items:
        slow(index)
    slow.wait()

It seems like the iterator is being read all the way through at once (and pending jobs created). So it's using too much memory. (To verify I replaced iterator_w_200K_items with iterator_w_2K_items and memory usage went way down.)

Is there a way I can have deco work in smaller sized chunks?

I hope that makes sense.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.