PyFunctional

Features

PyFunctional makes creating data pipelines easy by using chained functional operators. Here are a few examples of what it can do:

  • Chained operators: seq(1, 2, 3).map(lambda x: x * 2).reduce(lambda x, y: x + y)
  • Expressive and feature complete API
  • Read and write text, csv, json, jsonl, sqlite, gzip, bz2, and lzma/xz files
  • Parallelize "embarrassingly parallel" operations like map easily
  • Complete documentation, rigorous unit test suite, 100% test coverage, and CI which provide robustness

PyFunctional's API takes inspiration from Scala collections, Apache Spark RDDs, and Microsoft LINQ.

Table of Contents

  1. Installation
  2. Examples
    1. Simple Example
    2. Aggregates and Joins
    3. Reading and Writing SQLite3
    4. Data Interchange with Pandas
  3. Writing to Files
  4. Parallel Execution
  5. Github Shortform Documentation
    1. Streams, Transformations, and Actions
    2. Streams API
    3. Transformations and Actions APIs
    4. Lazy Execution
  6. Contributing and Bug Fixes
  7. Changelog

Installation

PyFunctional is available on PyPI and can be installed by running:

# Install from command line
$ pip install pyfunctional

Then in python run: from functional import seq

Examples

PyFunctional is useful for many tasks, and can natively open several common file types. Here are a few examples of what you can do.

Simple Example

from functional import seq

seq(1, 2, 3, 4)\
    .map(lambda x: x * 2)\
    .filter(lambda x: x > 4)\
    .reduce(lambda x, y: x + y)
# 14

# or if you don't like backslash continuation
(seq(1, 2, 3, 4)
    .map(lambda x: x * 2)
    .filter(lambda x: x > 4)
    .reduce(lambda x, y: x + y)
)
# 14

Streams, Transformations and Actions

PyFunctional has three types of functions:

  1. Streams: read data for use by the collections API.
  2. Transformations: transform data from streams with functions such as map, flat_map, and filter.
  3. Actions: cause a series of transformations to evaluate to a concrete value. to_list, reduce, and to_dict are examples of actions.

In the expression seq(1, 2, 3).map(lambda x: x * 2).reduce(lambda x, y: x + y), seq is the stream, map is the transformation, and reduce is the action.

Filtering a list of account transactions

from functional import seq
from collections import namedtuple

Transaction = namedtuple('Transaction', 'reason amount')
transactions = [
    Transaction('github', 7),
    Transaction('food', 10),
    Transaction('coffee', 5),
    Transaction('digitalocean', 5),
    Transaction('food', 5),
    Transaction('riotgames', 25),
    Transaction('food', 10),
    Transaction('amazon', 200),
    Transaction('paycheck', -1000)
]

# Using the Scala/Spark inspired APIs
food_cost = seq(transactions)\
    .filter(lambda x: x.reason == 'food')\
    .map(lambda x: x.amount).sum()

# Using the LINQ inspired APIs
food_cost = seq(transactions)\
    .where(lambda x: x.reason == 'food')\
    .select(lambda x: x.amount).sum()

# Using PyFunctional with fn
from fn import _
food_cost = seq(transactions).filter(_.reason == 'food').map(_.amount).sum()

Aggregates and Joins

The account transactions example could be done easily in pure python using list comprehensions. To show some of the things PyFunctional excels at, take a look at a couple of word count examples.

words = 'I dont want to believe I want to know'.split(' ')
seq(words).map(lambda word: (word, 1)).reduce_by_key(lambda x, y: x + y)
# [('dont', 1), ('I', 2), ('to', 2), ('know', 1), ('want', 2), ('believe', 1)]
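To make the semantics of reduce_by_key concrete, here is a standard-library sketch that computes the same kind of result. The helper name reduce_by_key_sketch is hypothetical and this is not PyFunctional's implementation; the ordering of the real output may differ, so the sketch sorts before printing.

```python
def reduce_by_key_sketch(pairs, func):
    """Combine the values of (key, value) pairs that share a key using func."""
    result = {}
    for key, value in pairs:
        # The first occurrence stores the value; later ones are folded in with func
        result[key] = func(result[key], value) if key in result else value
    return list(result.items())


words = 'I dont want to believe I want to know'.split(' ')
counts = reduce_by_key_sketch(((word, 1) for word in words), lambda x, y: x + y)
print(sorted(counts))
# [('I', 2), ('believe', 1), ('dont', 1), ('know', 1), ('to', 2), ('want', 2)]
```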

In the next example we have chat logs formatted as JSON lines (jsonl) which contain messages and metadata. A typical jsonl file has one valid JSON value on each line. Below are a few lines from examples/chat_logs.jsonl.

{"message":"hello anyone there?","date":"10/09","user":"bob"}
{"message":"need some help with a program","date":"10/09","user":"bob"}
{"message":"sure thing. What do you need help with?","date":"10/09","user":"dave"}

from operator import add
import re
messages = seq.jsonl('examples/chat_logs.jsonl')

# Split words on space and normalize before doing word count
def extract_words(message):
    return re.sub('[^0-9a-z ]+', '', message.lower()).split(' ')


word_counts = messages\
    .map(lambda log: extract_words(log['message']))\
    .flatten().map(lambda word: (word, 1))\
    .reduce_by_key(add).order_by(lambda x: x[1])

Next, let's continue that example but introduce a JSON database of users from examples/users.json. The previous example showed how PyFunctional can do word counts; the next one shows how it can join different data sources.

# First read the json file
users = seq.json('examples/users.json')
#[('sarah',{'date_created':'08/08','news_email':True,'email':'[email protected]'}),...]

email_domains = users.map(lambda u: u[1]['email'].split('@')[1]).distinct()
# ['yahoo.com', 'python.org', 'gmail.com']

# Join users with their messages
message_tuples = messages.group_by(lambda m: m['user'])
data = users.inner_join(message_tuples)
# [('sarah',
#    (
#      {'date_created':'08/08','news_email':True,'email':'[email protected]'},
#      [{'date':'10/10','message':'what is a...','user':'sarah'}...]
#    )
#  ),...]

# From here you can imagine doing more complex analysis

CSV, Aggregate Functions, and Set functions

examples/camping_purchases.csv contains a list of camping purchases. Let's do some cost analysis and compare it to the required camping gear list stored in examples/gear_list.txt.

purchases = seq.csv('examples/camping_purchases.csv')
total_cost = purchases.select(lambda row: int(row[2])).sum()
# 1275

most_expensive_item = purchases.max_by(lambda row: int(row[2]))
# ['4', 'sleeping bag', ' 350']

purchased_list = purchases.select(lambda row: row[1])
gear_list = seq.open('examples/gear_list.txt').map(lambda row: row.strip())
missing_gear = gear_list.difference(purchased_list)
# ['water bottle','gas','toilet paper','lighter','spoons','sleeping pad',...]

In addition to the aggregate functions shown above (sum and max_by) there are many more. Similarly, there are several more set-like functions in addition to difference.

Reading/Writing SQLite3

PyFunctional can read and write to SQLite3 database files. In the example below, users are read from examples/users.db which stores them as rows with columns id:Int and name:String.

db_path = 'examples/users.db'
users = seq.sqlite3(db_path, 'select * from user').to_list()
# [(1, 'Tom'), (2, 'Jack'), (3, 'Jane'), (4, 'Stephan')]

sorted_users = seq.sqlite3(db_path, 'select * from user order by name').to_list()
# [(2, 'Jack'), (3, 'Jane'), (4, 'Stephan'), (1, 'Tom')]

Writing to a SQLite3 database is similarly easy:

import sqlite3
from collections import namedtuple

with sqlite3.connect(':memory:') as conn:
    conn.execute('CREATE TABLE user (id INT, name TEXT)')
    conn.commit()
    User = namedtuple('User', 'id name')

    # Write using a specific query
    seq([(1, 'pedro'), (2, 'fritz')]).to_sqlite3(conn, 'INSERT INTO user (id, name) VALUES (?, ?)')

    # Write by inserting values positionally from a tuple/list into named table
    seq([(3, 'sam'), (4, 'stan')]).to_sqlite3(conn, 'user')

    # Write by inferring schema from namedtuple
    seq([User(name='tom', id=5), User(name='keiga', id=6)]).to_sqlite3(conn, 'user')

    # Write by inferring schema from dict
    seq([dict(name='david', id=7), dict(name='jordan', id=8)]).to_sqlite3(conn, 'user')

    # Read everything back to make sure it wrote correctly
    print(list(conn.execute('SELECT * FROM user')))

    # [(1, 'pedro'), (2, 'fritz'), (3, 'sam'), (4, 'stan'), (5, 'tom'), (6, 'keiga'), (7, 'david'), (8, 'jordan')]

Writing to files

Just as PyFunctional can read from csv, json, jsonl, sqlite3, and text files, it can also write them. For complete API documentation see the collections API table or the official docs.

Compressed Files

PyFunctional will auto-detect files compressed with gzip, lzma/xz, and bz2. It does this by examining the first several bytes of the file, so reading compressed files requires no code changes.

To write compressed files, every to_ function has a parameter compression which can be set to the default None for no compression, gzip or gz for gzip compression, lzma or xz for lzma compression, and bz2 for bz2 compression.
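As an illustration of detection by leading bytes, the sketch below checks a header against the well-known magic numbers of the gzip, bz2, and xz formats. The names MAGIC_BYTES and detect_compression are hypothetical, and PyFunctional's own detection logic may differ. The legacy .lzma container has no fixed magic number and is not handled here.

```python
import bz2
import gzip
import lzma

# Leading "magic" bytes of each container format
MAGIC_BYTES = {
    'gzip': b'\x1f\x8b',
    'bz2': b'BZh',
    'xz': b'\xfd7zXZ\x00',
}


def detect_compression(header):
    """Return 'gzip', 'bz2', 'xz', or None based on the first bytes of a file."""
    for name, magic in MAGIC_BYTES.items():
        if header.startswith(magic):
            return name
    return None


payload = b'hello world\n'
print(detect_compression(gzip.compress(payload)))  # gzip
print(detect_compression(bz2.compress(payload)))   # bz2
print(detect_compression(lzma.compress(payload)))  # xz
print(detect_compression(payload))                 # None
```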

Parallel Execution

The only change required to enable parallelism is to use from functional import pseq instead of from functional import seq and call pseq where you would call seq. The following operations are run in parallel, with more to be implemented in a future release:

  • map/select
  • filter/filter_not/where
  • flat_map

Parallelization uses Python multiprocessing and squashes chains of embarrassingly parallel operations to reduce overhead costs. For example, a sequence of maps and filters would be executed all at once rather than in multiple loops using multiprocessing.
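One way to picture the squashing of chained operations is to fuse the stages into a single function that makes one pass over each chunk of data. The sketch below is an assumption about the general technique, not PyFunctional's code; fuse and the ('map', f) / ('filter', p) stage encoding are hypothetical. It runs the chunks in-process to stay simple, whereas a parallel engine would hand each chunk to a worker.

```python
def fuse(stages):
    """Fuse ('map', f) and ('filter', p) stages into one function over a chunk."""
    def run(chunk):
        out = []
        for item in chunk:
            keep = True
            for kind, func in stages:
                if kind == 'map':
                    item = func(item)
                elif not func(item):  # a 'filter' stage rejected the item
                    keep = False
                    break
            if keep:
                out.append(item)
        return out
    return run


stages = [('map', lambda x: x * 2), ('filter', lambda x: x > 4), ('map', lambda x: x + 1)]
pipeline = fuse(stages)

# Each chunk is independent, so a worker process could handle it in isolation
chunks = [[1, 2], [3, 4]]
result = [item for chunk in chunks for item in pipeline(chunk)]
print(result)  # [7, 9]
```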

Documentation

Shortform documentation is below and full documentation is at docs.pyfunctional.pedro.ai.

Streams API

All of PyFunctional streams can be accessed through the seq object. The primary way to create a stream is by calling seq with an iterable. The seq callable is smart and is able to accept multiple types of parameters as shown in the examples below.

# Passing a list
seq([1, 1, 2, 3]).to_set()
# {1, 2, 3}

# Passing direct arguments
seq(1, 1, 2, 3).map(lambda x: x).to_list()
# [1, 1, 2, 3]

# Passing a single value
seq(1).map(lambda x: -x).to_list()
# [-1]

seq also provides entry to other streams as attribute functions as shown below.

# number range
seq.range(10)

# text file
seq.open('filepath')

# json file
seq.json('filepath')

# jsonl file
seq.jsonl('filepath')

# csv file
seq.csv('filepath')
seq.csv_dict_reader('filepath')

# sqlite3 db and sql query
seq.sqlite3('filepath', 'select * from data')

For more information on the parameters that these functions can take, see the streams documentation.

Transformations and Actions APIs

Below is the complete list of functions which can be called on a stream object from seq. For complete documentation, see the transformation and actions API.

| Function | Description | Type |
| --- | --- | --- |
| map(func)/select(func) | Maps func onto elements of sequence | transformation |
| starmap(func)/smap(func) | Apply func to sequence with itertools.starmap | transformation |
| filter(func)/where(func) | Filters elements of sequence to only those where func(element) is True | transformation |
| filter_not(func) | Filters elements of sequence to only those where func(element) is False | transformation |
| flatten() | Flattens sequence of lists to a single sequence | transformation |
| flat_map(func) | func must return an iterable. Maps func to each element, then merges the result to one flat sequence | transformation |
| group_by(func) | Groups sequence into (key, value) pairs where key=func(element) and value is from the original sequence | transformation |
| group_by_key() | Groups sequence of (key, value) pairs by key | transformation |
| reduce_by_key(func) | Reduces list of (key, value) pairs using func | transformation |
| count_by_key() | Counts occurrences of each key in list of (key, value) pairs | transformation |
| count_by_value() | Counts occurrences of each value in a list | transformation |
| union(other) | Union of unique elements in sequence and other | transformation |
| intersection(other) | Intersection of unique elements in sequence and other | transformation |
| difference(other) | New sequence with unique elements present in sequence but not in other | transformation |
| symmetric_difference(other) | New sequence with unique elements present in sequence or other, but not both | transformation |
| distinct() | Returns distinct elements of sequence. Elements must be hashable | transformation |
| distinct_by(func) | Returns distinct elements of sequence using func as a key | transformation |
| drop(n) | Drop the first n elements of the sequence | transformation |
| drop_right(n) | Drop the last n elements of the sequence | transformation |
| drop_while(func) | Drop elements while func evaluates to True, then returns the rest | transformation |
| take(n) | Returns sequence of first n elements | transformation |
| take_while(func) | Take elements while func evaluates to True, then drops the rest | transformation |
| init() | Returns sequence without the last element | transformation |
| tail() | Returns sequence without the first element | transformation |
| inits() | Returns consecutive inits of sequence | transformation |
| tails() | Returns consecutive tails of sequence | transformation |
| zip(other) | Zips the sequence with other | transformation |
| zip_with_index(start=0) | Zips the sequence with the index starting at start on the right side | transformation |
| enumerate(start=0) | Zips the sequence with the index starting at start on the left side | transformation |
| cartesian(*iterables, repeat=1) | Returns cartesian product from itertools.product | transformation |
| inner_join(other) | Returns inner join of sequence with other. Must be a sequence of (key, value) pairs | transformation |
| outer_join(other) | Returns outer join of sequence with other. Must be a sequence of (key, value) pairs | transformation |
| left_join(other) | Returns left join of sequence with other. Must be a sequence of (key, value) pairs | transformation |
| right_join(other) | Returns right join of sequence with other. Must be a sequence of (key, value) pairs | transformation |
| join(other, join_type='inner') | Returns join of sequence with other as specified by join_type. Must be a sequence of (key, value) pairs | transformation |
| partition(func) | Partitions the sequence into elements which satisfy func(element) and those that don't | transformation |
| grouped(size) | Partitions the elements into groups of size size | transformation |
| sorted(key=None, reverse=False)/order_by(func) | Returns elements sorted according to python sorted | transformation |
| reverse() | Returns the reversed sequence | transformation |
| slice(start, until) | Sequence starting at start and including elements up to until | transformation |
| head(no_wrap=None) / first(no_wrap=None) | Returns first element in sequence (if no_wrap=True, the result will never be wrapped with Sequence) | action |
| head_option(no_wrap=None) | Returns first element in sequence or None if it's empty (if no_wrap=True, the result will never be wrapped with Sequence) | action |
| last(no_wrap=None) | Returns last element in sequence (if no_wrap=True, the result will never be wrapped with Sequence) | action |
| last_option(no_wrap=None) | Returns last element in sequence or None if it's empty (if no_wrap=True, the result will never be wrapped with Sequence) | action |
| len() / size() | Returns length of sequence | action |
| count(func) | Returns count of elements in sequence where func(element) is True | action |
| empty() | Returns True if the sequence has zero length | action |
| non_empty() | Returns True if sequence has non-zero length | action |
| all() | Returns True if all elements in sequence are truthy | action |
| exists(func) | Returns True if func(element) for any element in the sequence is True | action |
| for_all(func) | Returns True if func(element) is True for all elements in the sequence | action |
| find(func) | Returns the element that first evaluates func(element) to True | action |
| any() | Returns True if any element in sequence is truthy | action |
| max() | Returns maximal element in sequence | action |
| min() | Returns minimal element in sequence | action |
| max_by(func) | Returns element with maximal value func(element) | action |
| min_by(func) | Returns element with minimal value func(element) | action |
| sum()/sum(projection) | Returns the sum of elements possibly using a projection | action |
| product()/product(projection) | Returns the product of elements possibly using a projection | action |
| average()/average(projection) | Returns the average of elements possibly using a projection | action |
| aggregate(func)/aggregate(seed, func)/aggregate(seed, func, result_map) | Aggregate using func starting with seed or first element of list then apply result_map to the result | action |
| fold_left(zero_value, func) | Reduces element from left to right using func and initial value zero_value | action |
| fold_right(zero_value, func) | Reduces element from right to left using func and initial value zero_value | action |
| make_string(separator) | Returns string with separator between each str(element) | action |
| dict(default=None) / to_dict(default=None) | Converts a sequence of (Key, Value) pairs to a dictionary. If default is not None, it must be a value or zero argument callable which will be used to create a collections.defaultdict | action |
| list() / to_list() | Converts sequence to a list | action |
| set() / to_set() | Converts sequence to a set | action |
| to_file(path) | Saves the sequence to a file at path with each element on a newline | action |
| to_csv(path) | Saves the sequence to a csv file at path with each element representing a row | action |
| to_jsonl(path) | Saves the sequence to a jsonl file with each element being transformed to json and printed to a new line | action |
| to_json(path) | Saves the sequence to a json file. The contents depend on if the json root is an array or dictionary | action |
| to_sqlite3(conn, tablename_or_query, *args, **kwargs) | Save the sequence to a SQLite3 db. The target table must be created in advance. | action |
| to_pandas(columns=None) | Converts the sequence to a pandas DataFrame | action |
| cache() | Forces evaluation of sequence immediately and caches the result | action |
| for_each(func) | Executes func on each element of the sequence | action |
| peek(func) | Executes func on each element of the sequence but returns the element | transformation |

Lazy Execution

Whenever possible, PyFunctional will compute lazily. This is accomplished by tracking the list of transformations that have been applied to the sequence and only evaluating them when an action is called. In PyFunctional this is called tracking lineage.

Lineage tracking also lets PyFunctional cache the results of a computation to prevent expensive re-computation. Caching is used sparingly, predominantly to preserve sensible behavior. For example, calling size() will cache the underlying sequence. If this was not done and the input was an iterator, then further calls would operate on an expired iterator since it was used to compute the length. Similarly, repr also caches since it is most often used during interactive sessions where it's undesirable to keep recomputing the same value. Below are some examples of inspecting lineage.

def times_2(x):
    return 2 * x

elements = (
   seq(1, 1, 2, 3, 4)
      .map(times_2)
      .peek(print)
      .distinct()
)

elements._lineage
# Lineage: sequence -> map(times_2) -> peek(print) -> distinct

l_elements = elements.to_list()
# Prints: 2
# Prints: 2
# Prints: 4
# Prints: 6
# Prints: 8

elements._lineage
# Lineage: sequence -> map(times_2) -> peek(print) -> distinct -> cache

l_elements = elements.to_list()
# The cached result is returned so times_2 is not called and nothing is printed
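The idea of recording transformations and evaluating them only when an action runs can be sketched in a few lines of plain Python. LazySeq below is a hypothetical toy class written for illustration; it is not PyFunctional's Sequence, and its lineage attribute only mimics the behavior described above.

```python
class LazySeq:
    """Tiny lazy sequence that records its lineage and caches on evaluation."""

    def __init__(self, source, lineage=('sequence',), steps=()):
        self._source = source
        self.lineage = list(lineage)
        self._steps = list(steps)
        self._cache = None

    def map(self, func):
        # Record the step; nothing is computed yet
        return LazySeq(
            self._source,
            self.lineage + ['map(%s)' % func.__name__],
            self._steps + [lambda items: (func(item) for item in items)],
        )

    def to_list(self):
        if self._cache is None:
            items = self._source
            for step in self._steps:
                items = step(items)
            self._cache = list(items)
            self.lineage.append('cache')
        return list(self._cache)


calls = []


def times_2(x):
    calls.append(x)  # remember every invocation so laziness is observable
    return 2 * x


elements = LazySeq([1, 2, 3]).map(times_2)
print(elements.lineage)    # ['sequence', 'map(times_2)']
print(calls)               # [] because nothing has run yet
print(elements.to_list())  # [2, 4, 6]
print(elements.to_list())  # [2, 4, 6], served from the cache
print(len(calls))          # 3, so times_2 ran once per element
print(elements.lineage)    # ['sequence', 'map(times_2)', 'cache']
```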

Files are given special treatment if opened through the seq.open and related APIs. functional.util.ReusableFile implements a wrapper around the standard python file to support multiple iteration over a single file object while correctly handling iteration termination and file closing.
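A rough way to get repeatable iteration over a file is to reopen it on every pass. The class below is a hypothetical sketch of that idea using only the standard library; the real functional.util.ReusableFile may be implemented differently.

```python
import os
import tempfile


class ReusableFileSketch:
    """Iterable that reopens the file for every pass over its lines."""

    def __init__(self, path):
        self.path = path

    def __iter__(self):
        # A fresh handle per iteration; the with block closes it when exhausted
        with open(self.path) as handle:
            for line in handle:
                yield line


with tempfile.TemporaryDirectory() as directory:
    path = os.path.join(directory, 'gear_list.txt')
    with open(path, 'w') as handle:
        handle.write('tent\nstove\n')
    lines = ReusableFileSketch(path)
    first_pass = [line.strip() for line in lines]
    second_pass = [line.strip() for line in lines]

print(first_pass == second_pass == ['tent', 'stove'])  # True
```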

no_wrap option

Even though functions like first() are supposed to return a single element, if the element is an iterable, then it is wrapped into a Sequence. For instance:

>>> s = seq(list(), list())
>>> type(s.first())
<class 'functional.pipeline.Sequence'>

That behaviour can be changed with the no_wrap option:

>>> type(s.first(no_wrap=True))
<class 'list'>

The option is also accepted by seq()/pseq() as well as the Sequence() constructor, for example:

>>> type(seq([list(), list()], no_wrap=True).last())
<class 'list'>

Road Map Idea

  • SQL based query planner and interpreter
  • _ lambda operator

Contributing and Bug Fixes

Any contributions or bug reports are welcome. Thus far, there is a 100% acceptance rate for pull requests and contributors have offered valuable feedback and critique on code. It is great to hear from users of the package, especially what it is used for, what works well, and what could be improved.

To contribute, create a fork of PyFunctional, make your changes, then make sure that the tests pass. In order to be merged, all pull requests must:

  • Pass all the unit tests
  • Pass all the pylint tests, or ignore warnings with an explanation of why it's correct to do so
  • Not significantly reduce coverage without a good reason (coveralls.io)
  • Edit the CHANGELOG.md file in the Next Release heading with changes

Contact

Gitter for chat

Supported Python Versions

  • PyFunctional 1.5 is tested against Python 3.8 to 3.11. PyPy3 is not tested, but bugs are fixed on a best-effort basis.
  • PyFunctional 1.4 supports and is tested against Python 3.6, Python 3.7, and PyPy3
  • PyFunctional 1.4 and above do not support Python 2.7
  • PyFunctional 1.4 works in Python 3.5, but is not tested against it
  • PyFunctional 1.4 and above partially work in Python 3.8; parallel processing currently has issues, but other features work fine
  • PyFunctional 1.3 and below support and were tested against Python 2.7, Python 3.5, Python 3.6, PyPy2, and PyPy3

Changelog

Changelog

About me

To learn more about me (the author) visit my webpage at pedro.ai.

I created PyFunctional while using Python extensively, and finding that I missed the ease of use for manipulating data that Spark RDDs and Scala collections have. The project takes the best ideas from these APIs as well as LINQ to provide an easy way to manipulate data when using Scala is not an option or PySpark is overkill.

Contributors

These people have generously contributed their time to improving PyFunctional

abybaddi009, adrian17, ameier38, artemisart, chrisrink10, chuyuhsu, dependabot[bot], digenis, entilzha, geenen124, guillem96, jsemric, jwilk, kache, kae2312, kchro3, lucidfrontier45, maestro-1, oliverlockwood, piercefreeman, ponytailer, simlei, stephan-rayner, swiergot, thejohnfreeman, timgates42, versae

pyfunctional's Issues

product() should return value on empty lists

In Scala, .product() on an empty list returns 1, 1.0 etc... depending on the type of List's values.

Here, currently seq([]).product() throws.

I think product should take an optional initializer parameter (in case someone used classes with overloaded multiplication) with default value 1 or 1.0... I don't know which though.
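A small sketch of what the proposal could look like, using functools.reduce with an initializer. The standalone product function and its initial parameter are hypothetical and only illustrate the suggestion; they are not the signature the library exposes.

```python
from functools import reduce
from operator import mul


def product(values, initial=1):
    """Multiply values together, returning initial for an empty input."""
    return reduce(mul, values, initial)


print(product([]))         # 1
print(product([2, 3, 4]))  # 24
print(product([], 1.0))    # 1.0
```

With an explicit initializer, classes that overload multiplication can supply their own identity element.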

Implement data passing functions

So far the only way to ingest data into ScalaFunctional is to read through using python defined data structures. It would be helpful to be able to read directly from data formats such as json/sql/csv.

Target milestone for everything completed will be 0.4.0.

This issue will serve as a parent issue for implementing each specific function.

Child issues:
#34 seq.open
#35 seq.range
#36 seq.csv
#37 seq.jsonl
#29 seq.json
#30 to_json
#31 to_csv
#32 to_file
#33 to_jsonl

Investigate Integrating Toolz

Someone on Twitter suggested that I check out toolz. I think it's worth looking into as a way of powering part of the backend. It may help clean up code, make certain things easier to extend, and improve performance.

I've posted to their mailing list expressing interest in collaboration.

Edit documentation and add alias methods for LINQ

Based on a reddit thread, this package would be helpful for users looking for features of LINQ (from .NET) in Python. This is a parent issue for editing documentation to talk about this use case and add new/alias methods (like where and select) common in LINQ.

Add SQLite3 Output Stream

Add a function to_sqlite3 that can write to sqlite3 databases.

Text from prior discussion

Potential API

Writing to sqlite won't be complex if we can supply insertion SQL like below.

the_seq.to_sqlite3("db_path", "insert into test_table (id, name) values (?, ?);")

However, API without query like pandas' to_sql needs some work.

the_seq.to_sqlite3("db_path", "table_name")

Potential API Description

For inserting, the first example seems fine to me. The reason Pandas can do the second one is that it works with structured data for which it knows the types/names. The second query would get translated to something like insert into table_name (col1,col2....) values (?.....); where the columns come from the DataFrame's columns.

The second call could be fairly useful and not too difficult to write. Since we don't keep track of columns, in order to do something like this we would have to enforce that the sequence is a sequence of Tuple/namedtuple/List/dict of the same length/form (for dict this would require a scan to determine all dict fields since that is more friendly than every dict having every column, for list it would require getting max length list). Following that, we could do our best to infer the names of the columns for insert into test_table (id, name) values (?, ?) (from namedtuple._fields or scanning for dict fields) or give up and use insert into test_table values (?, ?).

The second requirement would be to check the input string against a table name regex to determine what should be done.
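The inference described above can be sketched as follows. Everything here is an assumption made for illustration: build_insert and the TABLE_NAME regex are hypothetical names, column names are taken per row rather than by scanning the whole sequence, and they are not validated, so this should not be used with untrusted input.

```python
import re
import sqlite3
from collections import namedtuple

# Hypothetical rule for telling a bare table name apart from a full query
TABLE_NAME = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*$')


def build_insert(table, row):
    """Return (sql, values) for inserting one row into table."""
    if not TABLE_NAME.match(table):
        raise ValueError('not a plain table name: %r' % table)
    if isinstance(row, dict):
        columns = list(row)
        values = [row[column] for column in columns]
    elif hasattr(row, '_fields'):  # namedtuple
        columns = list(row._fields)
        values = list(row)
    else:  # plain tuple or list: fall back to a positional insert
        columns = None
        values = list(row)
    placeholders = ', '.join('?' for _ in values)
    if columns is None:
        sql = 'INSERT INTO %s VALUES (%s)' % (table, placeholders)
    else:
        sql = 'INSERT INTO %s (%s) VALUES (%s)' % (table, ', '.join(columns), placeholders)
    return sql, values


User = namedtuple('User', 'id name')
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE user (id INT, name TEXT)')
for row in [(1, 'pedro'), User(name='tom', id=2), dict(name='david', id=3)]:
    conn.execute(*build_insert('user', row))
rows = list(conn.execute('SELECT id, name FROM user ORDER BY id'))
print(rows)  # [(1, 'pedro'), (2, 'tom'), (3, 'david')]
conn.close()
```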

Using Generators Multiple Times

The general bug or undesirable behavior comes from using a generator from ScalaFunctional twice. This will, of course, make it keep returning nothing since the generator is exhausted. While in general this is expected behavior, there are some functions where this could be prevented.

Specifically, for to_list, list, to_set, set, to_dict, and dict, since the sequence is getting expanded, it should also get stored for future calls. This increases memory use, but only by a constant factor.

In general, need to look at the library and consider where it makes sense to do this (sparingly).

Improve to_file for string writing

In using seq.to_file I have found a common case is to write a collection to a file as a string. I think the right way to expose this is through a delimiter option in to_file. If it is None, the default is to write str(self.to_list()); if it is defined, then it will call self.make_string(separator) and write that.

Create `to_json`

Implement to_json

This should give the option to write values as an array at the json root, or if the sequence is a list of (Key, Value) pairs to write it as a dictionary at the json root.

Child of #19

Create `to_file`

Implement to_file with similar options to builtins.open in write mode

Child of #19

Edit Documentation for LINQ

Add documentation, change summary, and pypi keywords to improve discoverability for users looking for LINQ-like features.

Child to #38

Fix `zip_with_index` Behavior

zip_with_index behavior is inconsistent with how it is defined in spark/scala, and redundant with enumerate. Specifically, it zips with the index on the left hand side of the tuple, instead of the right hand side of the tuple.
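The two tuple orders can be shown with plain Python, without depending on the library's behavior at the time of this issue:

```python
from itertools import count

letters = ['a', 'b', 'c']

# Index on the right-hand side, as in Scala's zipWithIndex
index_on_right = list(zip(letters, count()))
print(index_on_right)  # [('a', 0), ('b', 1), ('c', 2)]

# Index on the left-hand side, as in Python's enumerate
index_on_left = list(enumerate(letters))
print(index_on_left)   # [(0, 'a'), (1, 'b'), (2, 'c')]
```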

Create `to_jsonl`

Implement to_jsonl which matches the implementation of functional.streams.jsonl

Child of #19

Fix count to match Scala docs instead of Spark docs

In the Spark docs, count returns the number of elements from all partitions without using a predicate. In Scala, count returns the number of elements which satisfy some predicate. In general I think it's better to go with Scala definitions over Spark (although things like group_by_key are inspired by Spark). Additionally, len and size already do what Spark's count does.

Create `to_csv`

Implement to_csv with similar interface to python module csv.writer

Child of #19

Underscore similar to fn.py _

Creating to discuss possibly implementing and better integrating something similar to _ in fn.py. From the 0.5.0 milestones:

Another idea is to implement the _ operator from fn.py. It is quite useful, but it's overkill to require the library as a dependency and gimmicky to check if it exists just to import it. This might open doors to integrate it more deeply as well.

Create `seq.json`

In this ticket, implement the seq.json function. This should be styled similar to other functions in functional.streams. The primary decision points are:

  • When given a list at the json root, seq.json will make each element in the json list an element in the sequence
  • When given a dictionary at the json root, seq.json will return a list of (Key, Value) pairs where the keys are the root dictionary keys and values the corresponding values.

The second behavior is consistent with Sequence storing a list rather than other collection types; in the context of functional, a dictionary is best represented as a list of (Key, Value) pairs.
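The two decision points can be sketched with the standard json module. The function name json_to_elements is hypothetical and it takes a string rather than a file path, so this is an illustration of the rule rather than the seq.json implementation.

```python
import json


def json_to_elements(text):
    """Turn a JSON document into a list of sequence elements."""
    root = json.loads(text)
    if isinstance(root, dict):
        # Dictionary root: one (key, value) pair per entry
        return list(root.items())
    if isinstance(root, list):
        # List root: each item becomes an element
        return root
    raise ValueError('expected a list or dictionary at the JSON root')


print(json_to_elements('[1, 2, 3]'))         # [1, 2, 3]
print(json_to_elements('{"a": 1, "b": 2}'))  # [('a', 1), ('b', 2)]
```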

Child of #19

Implement join function

Issue to match with implementing a join function. The implementation should take two sequences with tuples (K, V) and (K, W). The return value is the sequence joined on K to return a sequence of (K, (V, W)) tuples.

Additionally, should implement join_on which creates the keys via the result of a passed function.
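A minimal sketch of the described behavior in plain Python. The handling of duplicate keys (every matching combination is emitted) is an assumption of this sketch, and both function bodies are illustrative rather than the library's implementation.

```python
from collections import defaultdict


def inner_join(left, right):
    """Join (K, V) pairs with (K, W) pairs into (K, (V, W)) tuples."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [
        (key, (left_value, right_value))
        for key, left_value in left
        for right_value in right_by_key.get(key, [])
    ]


def join_on(left, right, key_func):
    """Build the keys with key_func, then join as above."""
    return inner_join(
        [(key_func(item), item) for item in left],
        [(key_func(item), item) for item in right],
    )


print(inner_join([('a', 1), ('b', 2)], [('a', 'x'), ('c', 'y')]))
# [('a', (1, 'x'))]
print(join_on(['apple', 'bob'], ['avocado'], lambda word: word[0]))
# [('a', ('apple', 'avocado'))]
```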

support compression in File I/O functions

It could be useful if stream functions like seq.open, seq.csv, etc. could read compressed files, like Spark's sc.textFile.

Writing a compressed file with to_file, to_csv, etc. would also be great.

Add SQLite3 Input Stream

As described in Pull Request #55, add seq.sqlite3(arg, sql_statement) to input streams API. arg can be any of

  1. Connection string
  2. SQLite connection
  3. SQLite cursor

The input stream comes from the sqlite3 execute(sql_statement) function which returns an iterable of tuple rows
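Accepting the three argument types could look roughly like the sketch below. The function name sqlite3_rows is hypothetical, a string is treated as a database path, and the rows are materialized into a list for simplicity, which the real stream need not do.

```python
import sqlite3
from contextlib import closing


def sqlite3_rows(arg, sql_statement):
    """Run sql_statement against a path, connection, or cursor; return rows."""
    if isinstance(arg, str):
        # Treat a string as a database path and manage the connection here
        with closing(sqlite3.connect(arg)) as conn:
            return list(conn.execute(sql_statement))
    if isinstance(arg, (sqlite3.Connection, sqlite3.Cursor)):
        # Both objects expose execute(), which yields tuple rows
        return list(arg.execute(sql_statement))
    raise ValueError('expected a path, sqlite3.Connection, or sqlite3.Cursor')


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE user (id INT, name TEXT)')
conn.execute("INSERT INTO user VALUES (1, 'Tom')")
print(sqlite3_rows(conn, 'select * from user'))           # [(1, 'Tom')]
print(sqlite3_rows(conn.cursor(), 'select * from user'))  # [(1, 'Tom')]
conn.close()
```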

Broken 0.4.0 on Python 3 due to enum34 bug in wheel distribution

Looking into this. I suspect it's that the wheel is built using python2, which means that, unlike in the source distribution, the code in setup.py that handles the correct version is not functioning correctly. This didn't appear to be a problem before because part of this release fixed using the correct requirements list.

Doing my best to get a good fix out tonight and bump to 0.4.1 to avoid breaking things on pip.

Add functionality to seq

Issue for bookkeeping, already implemented. seq has been modified to support this behavior:

>>> # Already supported
>>> seq([1, 2, 3])
[1, 2, 3]
>>> # Newly added
>>> seq(1, 2, 3)
[1, 2, 3]
>>> seq(1)
[1]
>>> # Behavior changed, used to expand string
>>> seq("abc")
["abc"]

RFC: Name Change

I have long thought that the name ScalaFunctional is not that great, but so far haven't done anything about it. I think that this might be a good time to come up with a better name that suits what it does and direction of the project better. To be clear, the name change is for the distribution name (repository, website, PyPI), the import name will not change, because too many things would break.

I would like to detail where the name came from, why its not that good, and what would be desired in a new name by explaining the overall goals for the project. I plan on posting some name ideas later this week after more thought, but would like to get ideas from others as well.

At the end of the issue, I will explain the logistics of the plan. TLDR: it should not break anything.

ScalaFunctional Name

Origin

  • API heavily inspired by/copies the Scala collections API
  • Second major source of ideas is Apache Spark, which is written in Scala
  • Library itself facilitates functional programming
  • Didn't think too much about the name when making the library since I primarily wanted it on PyPI to use it easily at the company I worked for (i.e., not set up a private PyPI server and share code across projects)
  • Matched the import name functional
  • functional on PyPI has been dead for a long time, but cannot be reclaimed. Since it is dead, a name conflict with it from import functional is unlikely, but it is not possible to claim the dead project's distribution name

Why it's not good

  • This is a Python package, not a Scala package
  • Users may not care/know that it is Scala inspired, so it's confusing
  • Package is focused on data pipelines. This is pretty clear from lots of work to support various input/output data forms (to python collections, files, sql, pandas, and probably more later). The name doesn't highlight that
  • Hurts discoverability by LINQ users, which is a fairly large segment

Overall Goals and Direction

  1. Support creating data pipelines using functional programming concepts
  2. Provide read/write interoperability with all common data sources in the domain of data pipelines
  3. Improve aspects of the functional programming experience in python that enhance the first goal
  4. Let LINQ users seamlessly use the same engine from a familiar API
  5. Provide the above with negligible impact on performance, and the possibility of a parallel execution engine

Currently, the project is doing very well in supporting the first two. The streams/actions API is very complete, and more or less all common data formats/sources are supported (file compression coming in next release). The next possible targets would be SQL DBs with SQLAlchemy or, similar to to_pandas, a way to make an SKLearn node (auto generate a class that satisfies the node API).

I am not quite happy with progress on the third goal, namely making lambda calls more succinct. This is my motivation to at some point natively support something like _ from fn.py. This is paired with the exploratory work I have been doing on a SQL Parser/Compiler. With the code/understanding I have right now, something like below is looking pretty easy:

User = namedtuple('User', 'id name city')
seq([User(1, 'pedro', 'boulder'), User(2, 'fritz', 'seattle')]).filter('id < 1').select('name,city')
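
For comparison, here is what the string-based pipeline above would mean when spelled out with ordinary lambdas. The helper `where_select` is a hypothetical name used only for this sketch, and the second query (`id < 2`) is an added variation, not part of the original example.

```python
from collections import namedtuple

User = namedtuple("User", "id name city")
users = [User(1, "pedro", "boulder"), User(2, "fritz", "seattle")]


def where_select(rows, predicate, fields):
    """Hypothetical helper: keep matching rows, then project the named fields."""
    return [tuple(getattr(row, field) for field in fields)
            for row in rows if predicate(row)]


# Equivalent of filter('id < 1').select('name,city'): no user has id < 1.
no_match = where_select(users, lambda user: user.id < 1, ["name", "city"])

# Added variation: id < 2 keeps only the first user.
first_only = where_select(users, lambda user: user.id < 2, ["name", "city"])
```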

I am fairly confident that as time goes on, the fourth goal will be better and better met.

The last goal has a few things wrapped in:

  • Currently performance is good (tested the other day) amortized over larger collections
  • However, there is no parallel execution support. This hasn't been done because it requires quite a bit of work, and lots of testing.
  • Currently, seq forces an expansion of its input. I would like to provide a family of seq operations that behave slightly differently for particular use cases: seq will stay the default, sseq (stream sequence) will not force expand its input, and pseq (parallel sequence) will provide a parallel execution engine. Of these, sseq is fairly low-hanging fruit

New Name Goals

  • Describes better what the package does: data pipelines, functional programming, chain functional programming...
  • Does not cause confusion with the sources of inspiration, but makes sense given them

New Name Requirements

  • Must be available on PyPI
  • Name is not too similar to existing python package

Names Taken on PyPI

  • functional
  • functionally
  • chainz
  • pipeline
  • linqpy
  • linqish
  • py-linq
  • asq
  • PyLINQ
  • chain
  • fn
  • pipe
  • datapipeline

Plan

  1. Reserve name on PyPI (don't release yet), rename repository to new name, change all references to new name, and place notices wherever needed
  2. Verify that old links redirect to new name (I plan on making a dummy repository to test this behavior)
  3. Make sure readthedocs, travis, and codecov work correctly with new name
  4. Dual release package under new name and ScalaFunctional as 0.6.0
  5. Current plan is to dual release under both names until 1.0, whenever that might be. The import name will not change, only the distribution/repository name. Open to comment on this or any part of the plan

Hopefully I didn't forget anything, open to comments on anything at all (including that name change is not a good idea)

`tail` differing from the Scala version

From documentation: "Selects all elements except the first."

Your version: "get last element".

Any good reason behind it? I can change it to the former, and also implement stuff like inits and tails.
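
For reference, the Scala semantics being proposed can be sketched on plain Python lists as follows. These are illustrative free functions, not the library's methods; unlike Scala, this `tail` returns an empty list for empty input instead of raising.

```python
def tail(items):
    """All elements except the first (Scala's tail)."""
    return items[1:]


def inits(items):
    """Successively shorter prefixes, ending with the empty list."""
    return [items[:end] for end in range(len(items), -1, -1)]


def tails(items):
    """Successively shorter suffixes, ending with the empty list."""
    return [items[start:] for start in range(len(items) + 1)]
```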

Performance Regression with seq.open

Thanks to @adrian17 for finding this.

Using the file here: http://norvig.com/ngrams/enable1.txt, and the output/images below, it's easy to see that LazyFile is creating a 2x overhead. Currently, the culprit seems to be a combination of additional call overhead to next and that next in builtins.open seems to be implemented in C.

$ python3 -m timeit -s "from functional import seq;" "lines = list(open('/Users/pedro/Desktop/enable1.txt')); seq(lines).select(str.rstrip).count(lambda line: len(line) > 20)"
10 loops, best of 3: 101 msec per loop

$ python3 -m timeit -s "from functional import seq" "seq.open('/Users/pedro/Desktop/enable1.txt').select(str.rstrip).count(lambda line: len(line) > 20)"
10 loops, best of 3: 195 msec per loop

Putting these in files and using pygraphviz gives these call graphs (look at the far right, the rest is not relevant):
normal-callgraph
special-callgraph

fold_left and fold_right have incorrect order of arguments in fold function

While working to add aggregate I noticed a major bug which affects fold_left and fold_right. Referencing the scala documentation for foldLeft shows that given a sequence of type A, the passed function should have the type func(x: B, y: A) => B. This means that x should be the current folded value and y should be the next value to fold.

Currently, fold_left and fold_right behave with these arguments reversed, which is inconsistent with both scala and the similar aggregate function defined in the LINQ documentation.

To confirm this behavior:

Scala REPL

List("a", "b", "c").foldLeft("")((current, next) => current + ":" + next)
res3: String = :a:b:c

Python Terminal

In [1]: seq('a', 'b', 'c').fold_left("", lambda current, next: current + ":" + next)
Out[1]: 'c:b:a:'

Correcting this bug introduces a breaking change with all previous versions of ScalaFunctional which contain fold_left, namely 0.2.0, 0.3.0, and 0.3.1. Since this fix is a breaking change, it will not be backported to those versions as patch releases (third number in version), but will be introduced in 0.4.0.
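
The intended argument order can be sketched with functools.reduce, which already passes the accumulated value first and the next element second. The free function `fold_left` below is an illustration of the corrected semantics, not the library's method.

```python
from functools import reduce


def fold_left(items, zero_value, func):
    """Fold from the left; func receives (current_folded_value, next_element)."""
    return reduce(func, items, zero_value)


# Mirrors the Scala REPL example above.
joined = fold_left(["a", "b", "c"], "", lambda current, nxt: current + ":" + nxt)
```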

Create library of common operators

There are quite a few common operators which are passed into functions such as map/filter/reduce. It might be a good idea to compile a library of common operators in functional.ops.
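
As a point of comparison, Python's standard operator module already covers many such operators. The sketch below uses only builtins and the standard library; what a functional.ops module would contain is left open here.

```python
import operator
from functools import reduce

numbers = [1, 2, 3, 4]

# operator.add replaces lambda x, y: x + y
total = reduce(operator.add, numbers)

# operator.neg replaces lambda x: -x
negated = list(map(operator.neg, numbers))

# operator.itemgetter replaces lambda row: row["name"]
names = list(map(operator.itemgetter("name"),
                 [{"name": "pedro"}, {"name": "fritz"}]))
```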

Implement functions with generators

Re-implement/change all the functions in the library to be compatible with generators. Currently, sequential calls to transformations produce a new list between each transformation even if it is only used for the next transformation, not in the result. This is wasteful and could be eliminated using generators.
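
The difference can be sketched in plain Python. Both functions below compute the same result; the function names are illustrative only.

```python
def eager_pipeline(values):
    doubled = [x * 2 for x in values]        # full intermediate list
    large = [x for x in doubled if x > 4]    # second full intermediate list
    return sum(large)


def lazy_pipeline(values):
    doubled = (x * 2 for x in values)        # generator, nothing computed yet
    large = (x for x in doubled if x > 4)    # still nothing computed
    return sum(large)                        # elements flow through one at a time
```

In the lazy version no intermediate list is ever materialized, so memory use stays constant regardless of input size.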

Targeting this to be the large (potentially breaking, hopefully not though) feature of 0.2.0 while 0.1.7 will be used to add more utility functions.

[lineage] Potential performance problems

With this code, I traced all function calls for a simple operation.

import sys
from functional import seq

def tracefunc(frame, event, arg, indent=[0]):
    if event == "call":
        indent[0] += 2
        print (" " * indent[0] + "|", frame.f_code.co_name)
    elif event == "return":
        indent[0] -= 2
    return tracefunc

sys.settrace(tracefunc)

def dummyPredicate(line):
    return True

list(seq([1, 2, 3, 4, 5]).filter(dummyPredicate))

Here are the results for master and lineage-rewrite branches: https://gist.github.com/adrian17/5daa0db38fb4340c9f6e

As you can see, dummyPredicate is called twice as often as it should be - it looks like the base collection is iterated twice.
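
A lighter-weight way to check for this kind of double evaluation is to count calls directly instead of tracing every frame. The `counting` wrapper below is a hypothetical helper, and the sketch uses the builtin filter as a baseline; the same wrapped predicate could be passed to seq(...).filter to compare.

```python
def counting(predicate):
    """Wrap predicate so that every call is counted on wrapper.calls."""
    def wrapper(value):
        wrapper.calls += 1
        return predicate(value)
    wrapper.calls = 0
    return wrapper


counted = counting(lambda line: True)
result = list(filter(counted, [1, 2, 3, 4, 5]))
# With a single pass over five elements, counted.calls is 5;
# a count of 10 would indicate the collection was iterated twice.
```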

Better LINQ Integration

Creating to discuss potential better LINQ integration for 0.6.0. From the milestone summary:

Another possible focus is on LINQ. This could take the form of implementing a limited SQL parser and optimizer using pyparsing. This might also mean giving select, where, and related methods more definition. For example, if the LINQ functions are invoked with calls like select("atr").filter("atr == 1"), be smarter about how they are executed. This is a wide open door; I am looking for thoughts and suggestions on what is of value. The basic concept is to start working on smarter ways of reading data, although this might tread into the territory of much more mature libraries like pandas and its dataframes.

Regression using iterators using PyPy

First reported here: #24

The core issue is when running code like this in pypy

>>> l = seq([1, 2, 3]).union([4, 5])
>>> set(l)
set([])

The result in standard python is different:

>>> l = seq([1, 2, 3]).union([4, 5])
>>> set(l)
set([1, 2, 3, 4, 5])

Looking into this further, I had a suspicion that at the heart of the issue is that on master, union and many other operators return iterators. Once they have been iterated over, they return nothing on any later pass. So it seemed like something was iterating over them before set() got to them. A common culprit for this type of problem is len, so I added some debugging statements and confirmed this is the problem.

For demonstration purposes, below is minimalistic code to replicate the same behavior, followed by the terminal session for that and scalafunctional with the print statements on iter, getitem and len

from collections import Iterable

class A(object):
    def __init__(self, seq):
        self.l = seq
    def __getitem__(self, item):
        print "DEBUG:getitem called"
        return self.l[item]
    def __iter__(self):
        print "DEBUG:iter called"
        return iter(self.l)
    def __len__(self):
        print "DEBUG:len called"
        if isinstance(self.l, Iterable):
            self.l = list(self.l)
        return len(self.l)

class B(object):
    def __init__(self, seq):
        self.l = seq
    def __iter__(self):
        print "DEBUG:iter called"
        return iter(self.l)


print "Calling set(A([1, 2]))"
a = A([1, 2])
print set(a)


print "Calling set(B([1, 2]))"
b = B([1, 2])
print set(b)

print "Calling union"
s = set([1, 2, 3]).union([4, 5])
c = A(iter(s))
print set(c)

Output

$ pypy iterable.py
Calling set(A([1, 2]))
DEBUG:iter called
DEBUG:len called
set([1, 2])
Calling set(B([1, 2]))
DEBUG:iter called
set([1, 2])
Calling union
DEBUG:iter called
DEBUG:len called
set([])
$ python iterable.py
Calling set(A([1, 2]))
DEBUG:iter called
set([1, 2])
Calling set(B([1, 2]))
DEBUG:iter called
set([1, 2])
Calling union
DEBUG:iter called
set([1, 2, 3, 4, 5])

Terminal sessions for scalafunctional

$ python
Python 2.7.9 (default, Jan  7 2015, 11:49:12)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.56)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from functional import seq
>>> l = seq([1, 1, 2, 3, 3]).union([1, 4, 5])
>>> set(l)
DEBUG:iter
set([1, 2, 3, 4, 5])
$ pypy
Python 2.7.9 (9c4588d731b7fe0b08669bd732c2b676cb0a8233, Mar 31 2015, 07:55:22)
[PyPy 2.5.1 with GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.57)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>>> from functional import seq
>>>> l = seq([1, 1, 2, 3, 3]).union([1, 4, 5])
>>>> set(l)
DEBUG:iter
DEBUG:len
DEBUG:iterable expanded in len via list()
set([])

Basically what is happening is that in both examples:

  1. set() results in a call to iter. Since the return type of union is an iterator, the final return value of that call is like iter(iter(resultOfUnion)).
  2. However the object holds a reference to the inner iter(resultOfUnion). When len gets called, it evaluates iter(resultOfUnion) and saves it to l. scalafunctional does this in order to reduce many evaluations of a generator which can cause problems.
  3. This causes a problem when the outer iter is finally called though because there are no elements to call it on.
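
The three steps above can be reproduced deterministically by making each call explicitly. This is a Python 3 sketch with a hypothetical `CachingWrapper` class standing in for the sequence wrapper; it performs by hand the iter-then-len order that PyPy's set() triggered.

```python
class CachingWrapper:
    """Hypothetical stand-in for a sequence wrapper that caches on len()."""

    def __init__(self, data):
        self.data = data

    def __iter__(self):
        # For an iterator, iter() returns the very same object.
        return iter(self.data)

    def __len__(self):
        # Expanding the iterator here drains it for anyone already holding it.
        self.data = list(self.data)
        return len(self.data)


wrapper = CachingWrapper(iter([1, 2, 3]))
outer = iter(wrapper)      # step 1: the consumer asks for an iterator
length = len(wrapper)      # step 2: len() expands and caches the elements
leftover = list(outer)     # step 3: the iterator obtained earlier is now empty
```

After these calls the elements survive only in the cached list on the wrapper, which is why the consumer that asked for the iterator first sees nothing.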

I am unsure of the best way to fix this, but some considerations

  1. Is pypy correct to be calling len when standard python doesn't? Is there a good reason for this (probably)? Moreover, even if I disagree, it seems unlikely that this would be changed.
  2. The current work on lineage-rewrite, #20, and #17 will fix this I think without any specific attention to it.

Given that, I am inclined to follow up with pypy devs to see if this is expected "correct" behavior or something needing fixing. I will also finish up the work on the lineage rewrite, then include the tests using set() and dict(). I am presuming the problems are due to similar issues with iterators. If it is still a problem, then I will have to think more about what to do.

Parallel Execution Engine

Creating issue to discuss potential of implementing a parallel execution engine. From the 0.5.0 milestone this might include:

The first possibility is to abstract the execution engine away so that ScalaFunctional can use either a sequential or parallel execution engine. This would need to be done through a combination of multiprocessing and determining where it could be used without creating massive code duplication. Additionally, this would require writing completely new tests and infrastructure since order is not guaranteed, but expected in the current sequential tests.
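
One way to picture the abstraction is a pipeline whose map step is delegated to an interchangeable engine. The sketch below is only an illustration: the names `sequential_map`, `parallel_map`, and `run_pipeline` are hypothetical, and it uses a thread pool from concurrent.futures rather than multiprocessing so that it runs as-is with a lambda (multiprocessing would require picklable functions).

```python
from concurrent.futures import ThreadPoolExecutor


def sequential_map(func, items):
    return list(map(func, items))


def parallel_map(func, items, workers=4):
    # Executor.map returns results in input order even though the calls
    # may run concurrently.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(func, items))


def run_pipeline(items, engine=sequential_map):
    """Hypothetical pipeline whose map step is delegated to the chosen engine."""
    return sum(engine(lambda x: x * 2, items))


sequential_total = run_pipeline(range(5))
parallel_total = run_pipeline(range(5), engine=parallel_map)
```

Because the engine is just a callable, the rest of the pipeline code is shared between the sequential and parallel paths, which addresses the code duplication concern.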

on_error Functionality

The functionality from https://github.com/jagill/python-chainz#errors would be useful for certain use cases. Wrapping this into Lineage seems like a fairly clean way to accomplish this.
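
A minimal sketch of the general idea, assuming a handler that receives the exception and the offending element. The function name `map_on_error` and its signature are hypothetical; this is neither python-chainz's API nor an existing PyFunctional feature.

```python
def map_on_error(func, items, on_error):
    """Hypothetical: apply func lazily, routing exceptions to on_error."""
    for item in items:
        try:
            yield func(item)
        except Exception as error:
            # The handler decides what to do: log, collect, or re-raise.
            on_error(error, item)


failures = []
parsed = list(map_on_error(int, ["1", "x", "3"],
                           lambda error, item: failures.append(item)))
```

Here the failing element is skipped and recorded, while the rest of the pipeline continues.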

On a side note, this might be a good time to look at making the exceptions raised from evaluating an incorrect user function in a PyFunctional pipeline cleaner. Currently, there is quite a bit of noise when it is very unlikely the core issue is coming from PyFunctional.
