
aliyun-odps-python-sdk's Introduction

ODPS Python SDK and data analysis framework

PyPI version Docs License Implementation



-----------------

Elegant way to access the ODPS API. Documentation

Installation

The quick way:

pip install pyodps[full]

If you don't need to use Jupyter, just type

pip install pyodps

The dependencies will be installed automatically.

Or from source code (not recommended for production use):

$ virtualenv pyodps_env
$ source pyodps_env/bin/activate
$ pip install git+https://github.com/aliyun/aliyun-odps-python-sdk.git

Dependencies

  • Python (>=2.7), including Python 3+ and PyPy; Python 3.7 recommended
  • setuptools (>=3.0)

Run Tests

  • install pytest
  • copy conf/test.conf.template to odps/tests/test.conf, and fill it with your account
  • run pytest odps

Usage

>>> import os
>>> from odps import ODPS
>>> # Make sure the environment variable ALIBABA_CLOUD_ACCESS_KEY_ID is set
>>> # to your Access Key ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET to your
>>> # Access Key Secret. Hardcoding either in your code is not recommended.
>>> o = ODPS(
>>>     os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
>>>     os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
>>>     project='**your-project**',
>>>     endpoint='**your-endpoint**',
>>> )
>>> dual = o.get_table('dual')
>>> dual.name
'dual'
>>> dual.table_schema
odps.Schema {
  c_int_a                 bigint
  c_int_b                 bigint
  c_double_a              double
  c_double_b              double
  c_string_a              string
  c_string_b              string
  c_bool_a                boolean
  c_bool_b                boolean
  c_datetime_a            datetime
  c_datetime_b            datetime
}
>>> dual.creation_time
datetime.datetime(2014, 6, 6, 13, 28, 24)
>>> dual.is_virtual_view
False
>>> dual.size
448
>>> dual.table_schema.columns
[<column c_int_a, type bigint>,
 <column c_int_b, type bigint>,
 <column c_double_a, type double>,
 <column c_double_b, type double>,
 <column c_string_a, type string>,
 <column c_string_b, type string>,
 <column c_bool_a, type boolean>,
 <column c_bool_b, type boolean>,
 <column c_datetime_a, type datetime>,
 <column c_datetime_b, type datetime>]

DataFrame API

>>> from odps.df import DataFrame
>>> df = DataFrame(o.get_table('pyodps_iris'))
>>> df.dtypes
odps.Schema {
  sepallength           float64
  sepalwidth            float64
  petallength           float64
  petalwidth            float64
  name                  string
}
>>> df.head(5)
|==========================================|   1 /  1  (100.00%)         0s
   sepallength  sepalwidth  petallength  petalwidth         name
0          5.1         3.5          1.4         0.2  Iris-setosa
1          4.9         3.0          1.4         0.2  Iris-setosa
2          4.7         3.2          1.3         0.2  Iris-setosa
3          4.6         3.1          1.5         0.2  Iris-setosa
4          5.0         3.6          1.4         0.2  Iris-setosa
>>> df[df.sepalwidth > 3]['name', 'sepalwidth'].head(5)
|==========================================|   1 /  1  (100.00%)        12s
          name  sepalwidth
0  Iris-setosa         3.5
1  Iris-setosa         3.2
2  Iris-setosa         3.1
3  Iris-setosa         3.6
4  Iris-setosa         3.9

Command-line and IPython enhancement

In [1]: %load_ext odps

In [2]: %enter
Out[2]: <odps.inter.Room at 0x10fe0e450>

In [3]: %sql select * from pyodps_iris limit 5
|==========================================|   1 /  1  (100.00%)         2s
Out[3]:
   sepallength  sepalwidth  petallength  petalwidth         name
0          5.1         3.5          1.4         0.2  Iris-setosa
1          4.9         3.0          1.4         0.2  Iris-setosa
2          4.7         3.2          1.3         0.2  Iris-setosa
3          4.6         3.1          1.5         0.2  Iris-setosa
4          5.0         3.6          1.4         0.2  Iris-setosa

Python UDF Debugging Tool

#file: plus.py
from odps.udf import annotate

@annotate('bigint,bigint->bigint')
class Plus(object):
    def evaluate(self, a, b):
        return a + b

$ cat plus.input
1,1
3,2
$ pyou plus.Plus < plus.input
2
5

Contributing

For a development install, clone the repository and then install from source:

git clone https://github.com/aliyun/aliyun-odps-python-sdk.git
cd aliyun-odps-python-sdk
pip install -r requirements.txt -e .

If you need to modify the frontend code, you must install Node.js/npm. To build and install the frontend code, use

python setup.py build_js
python setup.py install_js

License

Licensed under the Apache License 2.0

aliyun-odps-python-sdk's People

Contributors

hekaisheng, lyman, miunice, qinxuye, wjsi


aliyun-odps-python-sdk's Issues

reader: UnicodeDecodeError: 'utf8' codec can't decode byte 0x89 in position 3: invalid start byte

I have an ODPS string field that actually stores BINARY data (synced from MySQL).

Reading the data with open_record_reader raises an error:

        with download_session.open_record_reader(0,download_session.count) as reader:
            for record in reader:

traceback:

Traceback (most recent call last):
  ...........

    for record in reader:
  File "/Users/silenceper/Library/Python/2.7/lib/python/site-packages/odps/tunnel/tabletunnel/reader.py", line 168, in __next__
    record = self.read()
  File "/Users/silenceper/Library/Python/2.7/lib/python/site-packages/odps/tunnel/tabletunnel/reader.py", line 142, in read
    val = utils.to_text(self._reader.read_string())
  File "/Users/silenceper/Library/Python/2.7/lib/python/site-packages/odps/utils.py", line 266, in to_text
    return binary.decode(encoding)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/encodings/utf_8.py", line 16, in decode
    return codecs.utf_8_decode(input, errors, True)
UnicodeDecodeError: 'utf8' codec can't decode byte 0x89 in position 3: invalid start byte

How to convert a reader to a DataFrame

Hi, given a table and a partition (one partition per day), I obtain a reader; how can I convert this reader into a DataFrame? Thanks.
reader = table.open_reader(partition='20180110')
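One possible approach, sketched under assumptions: a PyODPS table reader yields sequence-like records, which can be collected into pandas. The helper name and column handling below are illustrative, not part of PyODPS:

```python
import pandas as pd

def records_to_dataframe(records, columns):
    """Collect sequence-like records (e.g. from table.open_reader)
    into a pandas DataFrame."""
    return pd.DataFrame([list(r) for r in records], columns=columns)

# With a live connection (credentials assumed configured):
# with table.open_reader(partition="20180110") as reader:
#     df = records_to_dataframe(
#         reader, [c.name for c in table.table_schema.columns])
```

Recent PyODPS readers also expose a to_pandas() method directly; check the documentation for your installed version.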

SQL containing LEFT JOIN raises an error

Running the following script:
%%sql
select a.*
from tablea a left join tableb b
on a.id=b.id
where b.id is null;

It then fails with: ODPS-0130161:Parse exception - line 2:35 cannot recognize input near 'left' 'join' 'tableb' in join type specifier

However, the same statement runs fine when executed directly on ODPS.

Running ODPS SQL through odps.df

Is it possible to execute SQL through the DataFrame API and get the result back as a df?
For example: DataFrame(o.execute_sql('sql query')).
If so, is there sample code, or some other way to do it?
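For reference, a hedged sketch: recent PyODPS versions let you read an SQL instance's result through open_reader and convert it with to_pandas (verify against your installed version; the wrapper function here is illustrative):

```python
def sql_to_pandas(o, query):
    """Run `query` on ODPS and return the result as a pandas DataFrame.
    Requires a configured ODPS entry object `o` and a live connection."""
    with o.execute_sql(query).open_reader() as reader:
        # to_pandas() is available on tunnel-backed result readers
        # in recent PyODPS releases.
        return reader.to_pandas()

# Usage sketch (needs credentials):
# pdf = sql_to_pandas(o, "select * from pyodps_iris limit 10")
```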

UDAF execution process

Looking at UDAFRunner, the UDAF appears to split all the input evenly into two halves, placed in buffer0 and buffer1, which are then combined in the merge function. In practice, however, all of the data may end up in a single buffer, which corrupts my result.

I am writing a function that concatenates strings: given n input strings, it outputs the n strings joined end to end. The code is below.

from odps.udf import annotate, BaseUDAF

@annotate('string->string')
class array2string(BaseUDAF):
    def new_buffer(self):
        return list()

    def iterate(self, buffer, unit):
        buffer.append(unit)

    def merge(self, buffer, pbuffer):
        if len(buffer) == 0:
            buffer.append(pbuffer[0])
            for i in range(len(pbuffer)):
                buffer.append(pbuffer[i])
                buffer.append('')
        else:
            for i in range(len(pbuffer)):
                if 2 * i + 1 < len(buffer):
                    buffer[2 * i + 1] = pbuffer[i]

    def terminate(self, buffer):
        return ';'.join(buffer)

How can I use multiple blocks with the tunnel?

from odps.tunnel import TableTunnel

table = o.get_table('my_table')

tunnel = TableTunnel(o)
upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')

with upload_session.open_record_writer(0) as writer:
    record = table.new_record()
    record[0] = 'test1'
    record[1] = 'id1'
    writer.write(record)

    record = table.new_record(['test2', 'id2'])
    writer.write(record)

upload_session.commit([0])

Only one block_id is used here, and it feels too slow. How can I speed this up? There are a lot of records, so it takes a long time.
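One way to speed up the snippet above is to split the records across several blocks, open one writer per block id, and commit all block ids at the end. The chunking helper below is illustrative; the session calls mirror the issue's code and need live credentials:

```python
def chunk_records(records, n_chunks):
    """Split a list into n_chunks contiguous, near-equal slices
    (one slice per tunnel block)."""
    k, m = divmod(len(records), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        size = k + (1 if i < m else 0)
        chunks.append(records[start:start + size])
        start += size
    return chunks

# Usage sketch (needs a live connection):
# session = tunnel.create_upload_session(table.name, partition_spec='pt=test')
# chunks = chunk_records(all_records, 4)
# for block_id, chunk in enumerate(chunks):
#     with session.open_record_writer(block_id) as writer:
#         for rec in chunk:
#             writer.write(rec)
# session.commit(list(range(len(chunks))))
```

Since blocks are independent until commit, each block id can also be written from its own thread or process.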

Does pyodps.dataframe.value_counts limit results to 10000 rows?

If I use only the DataFrame API, how can I fetch the complete data through it?
Also, with open_reader I can pass limit_enabled=False, but that only applies to one call and has to be configured every time.

When entering a project space via enter, there is no way to set this once in the initial configuration: the source code does not read the options.tunnel.limited_instance_tunnel setting when entering the room. Could an extra option be added so this can be configured?
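A possible global workaround, hedged: PyODPS exposes tunnel-related settings on odps.options; the option name below is taken from recent PyODPS documentation and should be verified against the version you have installed:

```python
from odps import options

# Disable the row limit on instance-tunnel result downloads globally,
# so per-call limit flags are not needed. Verify the option name
# against your installed PyODPS version.
options.tunnel.limit_instance_tunnel = False
```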

Does the pyodps SDK support writing UDFs?

The MaxCompute documentation only covers Java UDFs. I can see from the pyodps SDK that custom functions can be written, but I cannot find documentation for the actual interface.

unknown exception

It looks like an environment issue: the same code runs fine on another machine, but on this one it fails under both Python 3.5 and 2.7.13.
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/requests-2.12.4-py3.5.egg/requests/utils.py", line 792, in check_header_validity
    if not pat.match(value):
TypeError: expected string or bytes-like object

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "./test_aliyun_mc.py", line 55, in <module>
    UploadData()
  File "./test_aliyun_mc.py", line 18, in UploadData
    upload_session = tunnel.create_upload_session(table.name) #, partition_spec='pt=test')
  File "/usr/local/lib/python3.5/site-packages/pyodps-0.5.6-py3.5.egg/odps/tunnel/tabletunnel/tabletunnel.py", line 92, in create_upload_session
    compress_option=compress_option)
  File "/usr/local/lib/python3.5/site-packages/pyodps-0.5.6-py3.5.egg/odps/tunnel/tabletunnel/uploadsession.py", line 60, in __init__
    self._init()
  File "/usr/local/lib/python3.5/site-packages/pyodps-0.5.6-py3.5.egg/odps/tunnel/tabletunnel/uploadsession.py", line 74, in _init
    resp = self._client.post(url, {}, params=params, headers=headers)
  File "/usr/local/lib/python3.5/site-packages/pyodps-0.5.6-py3.5.egg/odps/rest.py", line 115, in post
    return self.request(url, 'post', data=data, **kwargs)
  File "/usr/local/lib/python3.5/site-packages/pyodps-0.5.6-py3.5.egg/odps/rest.py", line 94, in request
    prepared_req = req.prepare()
  File "/usr/local/lib/python3.5/site-packages/requests-2.12.4-py3.5.egg/requests/models.py", line 257, in prepare
    hooks=self.hooks,
  File "/usr/local/lib/python3.5/site-packages/requests-2.12.4-py3.5.egg/requests/models.py", line 303, in prepare
    self.prepare_headers(headers)
  File "/usr/local/lib/python3.5/site-packages/requests-2.12.4-py3.5.egg/requests/models.py", line 427, in prepare_headers
    check_header_validity(header)
  File "/usr/local/lib/python3.5/site-packages/requests-2.12.4-py3.5.egg/requests/utils.py", line 796, in check_header_validity
    "not %s" % (value, type(value)))
requests.exceptions.InvalidHeader: Header value 0 must be of type str or bytes, not <class 'int'>

How much data can DataFrame handle?

My ODPS tables are very large, tens of millions of rows per day. Can DataFrame be used in this situation? I tried to preview with head(5) and it never returned. 😅

Datetime range problem; please provide a solution

Problem description:

  • The SDK supports datetimes in the range 1970-01-01 00:00:00 to 3000-12-31 23:59:59, so a birth-date field with values outside that range cannot be exported (it comes back as None).
# Environment: Python 2.7.13
if __name__ == "__main__":
    xm_odps_connector = OdpsConnector().get_connector()
    chk_sql = "select to_date('1969-12-31 23:59:59', 'yyyy-mm-dd hh:mi:ss') as today from dual;"
    with xm_odps_connector.execute_sql(chk_sql).open_reader() as reader:
        for record in reader:
            dt = Record.get_by_name(record, 'today')
            if dt:
                print(dt.strftime('%Y-%m-%d %H:%M:%S'))
            else:
                print('9999-12-31 00:00:00')

Error message:

Traceback (most recent call last):
  File "odps\tunnel\io\reader_c.pyx", line 159, in odps.tunnel.io.reader_c.BaseTunnelRecordReader._read_datetime
  File "C:\Anaconda3\envs\python27\lib\site-packages\odps\utils.py", line 349, in to_datetime
    return _fromtimestamp(seconds).replace(microsecond=microseconds)
ValueError: timestamp out of range for platform localtime()/gmtime() function
Exception ValueError: 'timestamp out of range for platform localtime()/gmtime() function' in 'odps.tunnel.io.reader_c.BaseTunnelRecordReader._set_datetime' ignored

df.head(n) parses every partition-column value as NoneType

Running df.head(n) directly on a partitioned table: debugging shows the partition column's type is parsed correctly, but all of its values are parsed as NoneType (df._types and df._values were shown as screenshots in the original issue).

Using df['xxx', '<partition column>'].head(n) instead resolves the columns and values correctly.

problems with running unittest in PyCharm

PyCharm complains as follows when I try to run the unit tests.
It seems to be caused by this: http://stackoverflow.com/questions/29501029/managed-to-break-my-venv-is-it-possible-to-fix

Traceback (most recent call last):
  File "/home/lyman/.pyenv/versions/2.7.2/lib/python2.7/site.py", line 62, in <module>
    import os
  File "/home/lyman/.pyenv/versions/2.7.2/lib/python2.7/os.py", line 49, in <module>
    import posixpath as path
  File "/home/lyman/.pyenv/versions/2.7.2/lib/python2.7/posixpath.py", line 17, in <module>
    import warnings
  File "/home/lyman/.pyenv/versions/2.7.2/lib/python2.7/warnings.py", line 8, in <module>
    import types
  File "/home/lyman/workspace/ali/odps/pyodps/odps/types.py", line 20, in <module>
    import re
  File "/home/lyman/.pyenv/versions/2.7.2/lib/python2.7/re.py", line 282, in <module>
    import copy_reg
  File "/home/lyman/.pyenv/versions/2.7.2/lib/python2.7/copy_reg.py", line 7, in <module>
    from types import ClassType as _ClassType
ImportError: cannot import name ClassType

Process finished with exit code 1

Using replace in pyodps raises an error

Calling the replace function fails with "tuple index out of range".
The DataFrame is named list, the column is named p; some rows are empty, some contain numbers, and some contain +86.
The code is list.p.replace('+86', '')

I tried data from another source and hit the same problem.
I also downloaded the iris data, uploaded it to a public server, and replace still failed with the same error.
Is this function itself broken? Or could the documentation include an example of how to use replace?

Statistics documentation issues

1. Many methods in the odps.ml.statistics documentation lack concrete examples and do not describe their return values, which makes them hard to use. Please add documentation so the calls are clearer.
2. A question for the maintainers: is odps.ml the same computation framework as PAI?

What does the endpoint parameter mean?

When initializing (sample code below), what does endpoint mean? If possible, could you give a detailed explanation? Thanks!

from odps import ODPS

o = ODPS('**your-access-id**', '**your-secret-access-key**', '**your-default-project**',
            endpoint='**your-end-point**')

to_pandas raises an error

ODPSError: InstanceId: 20171221102131257g9c5vlu
ODPS-0130071:[0,0] Semantic analysis exception - INT type is not enabled in current mode
ODPS-0130071:[0,0] Semantic analysis exception - INT type is not enabled in current mode

python setup.py install ERROR

python setup.py install
File "setup.py", line 118
with open('requirements.txt') as f:
^
SyntaxError: invalid syntax

Reading data into a DataFrame in batches

I want to read a large amount of data from the database into a local CSV.
The odps.df interface seems to read all the data at once, which runs out of memory.
Currently I can only read via open_reader().
Is there a better way?
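For reference, a bounded-memory sketch: stream records to CSV in fixed-size batches instead of materializing everything at once. The helper is pure Python; the open_reader call in the usage comment mirrors the issue's code and needs a live connection:

```python
import csv

def write_batches_to_csv(records, columns, path, batch_size=10000):
    """Stream an iterable of sequence-like records to a CSV file in
    fixed-size batches, keeping memory usage bounded."""
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(columns)
        batch = []
        for rec in records:
            batch.append(list(rec))
            if len(batch) >= batch_size:
                writer.writerows(batch)
                batch = []
        if batch:
            writer.writerows(batch)

# Usage sketch (needs credentials):
# with table.open_reader() as reader:
#     write_batches_to_csv(
#         reader, [c.name for c in table.table_schema.columns], "dump.csv")
```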

str.replace is not supported as a built-in function

I want to use str.replace for regex replacement, but the compiled SQL contains pyodps_udf_xxxx. Tracing the source shows that strings.Replace is indeed not implemented in the compiler.
strings.Contains does implement both the built-in function and the regex path, so I tried to implement strings.Replace the same way, but after adding the elif branch a breakpoint shows the code is never executed (visit_string_op is never entered). I traced the code but still cannot tell what is wrong; some guidance would be appreciated, thanks!

Missing to_timestamp method when uploading datetime values

/Users/tux/.pyenv/versions/2.7.10/bin/python2.7 /Users/tux/projects/python/other/Aliyun/AliyunODPS/test.py
Traceback (most recent call last):
  File "/Users/tux/projects/python/other/Aliyun/AliyunODPS/test.py", line 22, in <module>
    record = tb.new_record([1, 'daixijun', 'hashpassword', 20, True, datetime.now(), datetime.now()])
  File "/Users/tux/github/aliyun-odps-python-sdk/odps/models/table.py", line 479, in new_record
    return types.Record(schema=self.schema, values=values)
  File "/Users/tux/github/aliyun-odps-python-sdk/odps/types.py", line 358, in __init__
    self._sets(values)
  File "/Users/tux/github/aliyun-odps-python-sdk/odps/types.py", line 380, in _sets
    [self._set(i, value) for i, value in enumerate(values)]
  File "/Users/tux/github/aliyun-odps-python-sdk/odps/types.py", line 369, in _set
    val = validate_value(value, data_type)
  File "/Users/tux/github/aliyun-odps-python-sdk/odps/types.py", line 868, in validate_value
    data_type.validate_value(res)
  File "/Users/tux/github/aliyun-odps-python-sdk/odps/types.py", line 622, in validate_value
    timestamp = self.to_timestamp(val)
AttributeError: 'Datetime' object has no attribute 'to_timestamp'

Process finished with exit code 1

Program frequently raises exceptions on Windows 10

The same program runs fine on Linux, but on Windows 10 it frequently fails at this point:
record = table.new_record([。。。。])
writer.write(record)

Traceback (most recent call last):
  File ".\test_aliyun_mc.py", line 71, in <module>
    UploadData()
  File ".\test_aliyun_mc.py", line 57, in UploadData
    skipped += 1
  File "F:\Python27\lib\site-packages\odps\tunnel\io\writer.py", line 234, in __exit__
    self.close()
  File "F:\Python27\lib\site-packages\odps\tunnel\io\writer.py", line 276, in close
    super(RecordWriter, self).close()
  File "F:\Python27\lib\site-packages\odps\tunnel\io\writer.py", line 227, in close
    super(BaseRecordWriter, self).close()
  File "F:\Python27\lib\site-packages\odps\tunnel\io\writer.py", line 67, in close
    self.flush_all()
  File "F:\Python27\lib\site-packages\odps\tunnel\io\writer.py", line 70, in flush_all
    self.flush()
  File "F:\Python27\lib\site-packages\odps\tunnel\io\writer.py", line 62, in flush
    self.output.write(data)
  File "F:\Python27\lib\site-packages\odps\tunnel\io\stream.py", line 82, in write
    raise_exc(ex_type, ex_value, tb)
  File "F:\Python27\lib\site-packages\odps\lib\lib_utils.py", line 98, in raise_exc
    six.exec_('raise ex_type, ex, tb', glb, locals())
  File "F:\Python27\lib\site-packages\odps\lib\six.py", line 719, in exec_
    exec("""exec code in globs, locs""")
  File "<string>", line 1, in <module>
  File "F:\Python27\lib\site-packages\odps\tunnel\io\stream.py", line 51, in async_func
    self._resp = post_call(self.data_generator())
  File "F:\Python27\lib\site-packages\odps\tunnel\tabletunnel.py", line 258, in upload
    self._client.put(url, data=data, params=params, headers=headers)
  File "F:\Python27\lib\site-packages\odps\rest.py", line 128, in put
    return self.request(url, 'put', data=data, **kwargs)
  File "F:\Python27\lib\site-packages\odps\rest.py", line 107, in request
    proxies=self._proxy)
  File "F:\Python27\lib\site-packages\requests\sessions.py", line 639, in send
    r = adapter.send(request, **kwargs)
  File "F:\Python27\lib\site-packages\requests\adapters.py", line 488, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: [Errno 10054]

Filtering data: how do I handle string columns?

The examples on the wiki all filter on numbers; how do I filter on strings?
I also stored dates as strings, e.g. "20161123"; how do I select the records for that day?

update:
Switching to the filter method reports "keyword can't be an expression":

org.filter(org.product='vt_finance', org.day='20161122', hour='23').head(5)
