Giter VIP home page Giter VIP logo

adlfs's Introduction

Filesystem interface to Azure-Datalake Gen1 and Gen2 Storage

PyPI version shields.io Latest conda-forge version

Quickstart

This package can be installed using:

pip install adlfs

or

conda install -c conda-forge adlfs

The adl:// and abfs:// protocols are included in fsspec's known_implementations registry in fsspec > 0.6.1, otherwise users must explicitly inform fsspec about the supported adlfs protocols.

To use the Gen1 filesystem:

import dask.dataframe as dd

storage_options={'tenant_id': TENANT_ID, 'client_id': CLIENT_ID, 'client_secret': CLIENT_SECRET}

dd.read_csv('adl://{STORE_NAME}/{FOLDER}/*.csv', storage_options=storage_options)

To use the Gen2 filesystem you can use the protocol abfs or az:

import dask.dataframe as dd

storage_options={'account_name': ACCOUNT_NAME, 'account_key': ACCOUNT_KEY}

ddf = dd.read_csv('abfs://{CONTAINER}/{FOLDER}/*.csv', storage_options=storage_options)
ddf = dd.read_parquet('az://{CONTAINER}/folder.parquet', storage_options=storage_options)

Accepted protocol / uri formats include:
'PROTOCOL://container/path-part/file'
'PROTOCOL://container@account.dfs.core.windows.net/path-part/file'

or optionally, if AZURE_STORAGE_ACCOUNT_NAME and an AZURE_STORAGE_<CREDENTIAL> is 
set as an environmental variable, then storage_options will be read from the environmental
variables

To read from a public storage blob you are required to specify the 'account_name'. For example, you can access NYC Taxi & Limousine Commission as:

storage_options = {'account_name': 'azureopendatastorage'}
ddf = dd.read_parquet('az://nyctlc/green/puYear=2019/puMonth=*/*.parquet', storage_options=storage_options)

Details

The package includes pythonic filesystem implementations for both Azure Datalake Gen1 and Azure Datalake Gen2, that facilitate interactions between both Azure Datalake implementations and Dask. This is done leveraging the intake/filesystem_spec base class and Azure Python SDKs.

Operations against both Gen1 Datalake currently only work with an Azure ServicePrincipal with suitable credentials to perform operations on the resources of choice.

Operations against the Gen2 Datalake are implemented by leveraging Azure Blob Storage Python SDK.

Setting credentials

The storage_options can be instantiated with a variety of keyword arguments depending on the filesystem. The most commonly used arguments are:

  • connection_string
  • account_name
  • account_key
  • sas_token
  • tenant_id, client_id, and client_secret are combined for an Azure ServicePrincipal e.g. storage_options={'account_name': ACCOUNT_NAME, 'tenant_id': TENANT_ID, 'client_id': CLIENT_ID, 'client_secret': CLIENT_SECRET}
  • anon: boo, optional. The value to use for whether to attempt anonymous access if no other credential is passed. By default (None), the AZURE_STORAGE_ANON environment variable is checked. False values (false, 0, f) will resolve to False and anonymous access will not be attempted. Otherwise the value for anon resolves to True.
  • location_mode: valid values are "primary" or "secondary" and apply to RA-GRS accounts

For more argument details see all arguments for AzureBlobFileSystem here and AzureDatalakeFileSystem here.

The following environmental variables can also be set and picked up for authentication:

  • "AZURE_STORAGE_CONNECTION_STRING"
  • "AZURE_STORAGE_ACCOUNT_NAME"
  • "AZURE_STORAGE_ACCOUNT_KEY"
  • "AZURE_STORAGE_SAS_TOKEN"
  • "AZURE_STORAGE_TENANT_ID"
  • "AZURE_STORAGE_CLIENT_ID"
  • "AZURE_STORAGE_CLIENT_SECRET"

The filesystem can be instantiated for different use cases based on a variety of storage_options combinations. The following list describes some common use cases utilizing AzureBlobFileSystem, i.e. protocols abfsor az. Note that all cases require the account_name argument to be provided:

  1. Anonymous connection to public container: storage_options={'account_name': ACCOUNT_NAME, 'anon': True} will assume the ACCOUNT_NAME points to a public container, and attempt to use an anonymous login. Note, the default value for anon is True.
  2. Auto credential solving using Azure's DefaultAzureCredential() library: storage_options={'account_name': ACCOUNT_NAME, 'anon': False} will use DefaultAzureCredential to get valid credentials to the container ACCOUNT_NAME. DefaultAzureCredential attempts to authenticate via the mechanisms and order visualized here.
  3. Auto credential solving without requiring storage_options: Set AZURE_STORAGE_ANON to false, resulting in automatic credential resolution. Useful for compatibility with fsspec.
  4. Azure ServicePrincipal: tenant_id, client_id, and client_secret are all used as credentials for an Azure ServicePrincipal: e.g. storage_options={'account_name': ACCOUNT_NAME, 'tenant_id': TENANT_ID, 'client_id': CLIENT_ID, 'client_secret': CLIENT_SECRET}.

Append Blob

The AzureBlobFileSystem accepts all of the Async BlobServiceClient arguments.

By default, write operations create BlockBlobs in Azure, which, once written can not be appended. It is possible to create an AppendBlob using mode="ab" when creating and operating on blobs. Currently, AppendBlobs are not available if hierarchical namespaces are enabled.

adlfs's People

Contributors

albertdefusco avatar almeida-cloves-bcg avatar basnijholt avatar benrutter avatar btel avatar dfigus avatar dtrifiro avatar efiop avatar firecast avatar hayesgb avatar isidentical avatar jacobtomlinson avatar jamesmyatt avatar johnmacnamararseg avatar joostvdoorn avatar jsignell avatar kamalsharma2 avatar keerthiyandaos avatar madhur-tandon avatar manjuransari avatar marcelotrevisani avatar martindurant avatar microft avatar pmrowla avatar raybellwaves avatar rlamy avatar rukartok avatar scott-zhou avatar skshetry avatar tomaugspurger avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

adlfs's Issues

Misleading Exceptions

I just updated to 0.3.0 and am adapting my own libraries to these changes - great to have support for azure blob storage 12!

I noticed that some of the exceptions are overly broad and gives the end-user a misleading error message.

For example, passing invalid credentials will raise a FileNotFoundError, suppressing Azure Blob Storages AuthorizationFailure.

Example:
While running Azurite:

>>> connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=test;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;" # Notice invalid AccountKey here

>>> abfs = AzureBlobFileSystem(account_name="test", connection_string=connection_string)
>>> abfs.ls(".")
azure.core.exceptions.HttpResponseError: Server failed to authenticate the request. Make sure the value of the Authorization header is formed correctly including the signature.
RequestId:63c0f3c2-d764-48dd-b7fa-522322a61e79
Time:2020-05-25T14:01:15.322Z
ErrorCode:AuthorizationFailure
Error:
  
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "<input>", line 1, in <module>
  File "/home/anders/.pyenv/versions/feature_env/lib/python3.8/site-packages/adlfs/core.py", line 534, in ls
    raise FileNotFoundError(f"File {path} does not exist!!")
FileNotFoundError: File  does not exist!!

I would expect to either have a custom exception I can catch, or to let through the Azure exception so that the user can handle it.

As-is, I can't handle an Authentication issue separately from other issues when using adlfs in my libraries.

Feature request: Use adlfs to access public blobs

Originally posted it at https://stackoverflow.com/questions/63856476/adlfs-create-azureblobfilesystem-with-credential-none but should probably post here. Need rep points to create a adlfs tag and fsspec tags...

I'm going through this:
https://azure.microsoft.com/en-us/services/open-datasets/catalog/goes-16/

and I was curious how to do the equivalent of s3fs.S3FileSystem(anon=True) with adlfs.AzureBlobFileSystem.

It seems credential=None is a good candidate (https://docs.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.containerclient?view=azure-python) but as far as i'm aware it hasn't been implemented?

>>> import adlfs
>>> fs = adlfs.AzureBlobFileSystem(credential=None)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/ray/local/bin/anaconda3/envs/test_env/lib/python3.8/site-packages/fsspec/spec.py", line 52, in __call__
    obj = super().__call__(*args, **kwargs)
TypeError: __init__() missing 1 required positional argument: 'account_name'

abfs produces corrupted files when data is big enough

Hi,
Thanks again for working on azure integration.

I've been hitting a tricky adlfs/abfs bug on a dask data extraction pipeline.

I have a bag containing json data spread in several partitions spread over several workers in a dask cluster hosted in Kubernetes using dask-kubernetes. The whole dataset contains ~1-10 million items. When the partition size exceeds a certain threshold (in my case more than 150_000 items), the export to textfiles produces corrupted files:

  • If I export to plain jsonl files, the files contain only a subset of the partition's data and they start with an empty line.
  • If I export to jsonl.gz compressed files, the resulting gzipped files are invalid and can't be read.

I've been unable to reproduce this issue on a local machine using the distributed client, but I'm quite sure the issue comes from adlfs because exporting to amazon s3 using s3fs produces clean files.

I've been trying to reproduce the issue on a distributed cluster in a simple and reproducible way that I could share with you, but I couldn't reproduce the problem. I think that it's my specific pipeline's steps that result in a state where adlfs fails to export the data properly.

Each item in the bag looks like this:

{
    "geohash_aggregation_precision": 8,
    "location": {"type": "Polygon",
                 "coordinates": [[[
                     129.12368774414062,
                     35.201568603515625],
                     [
                         129.12403106689453,
                         35.201568603515625],
                     [
                         129.12403106689453,
                         35.20174026489258],
                     [
                         129.12368774414062,
                         35.20174026489258],
                     [
                         129.12368774414062,
                         35.201568603515625]]]},
    "location_centroid": {"lat": 35.2016544342041,
                          "lon": 129.12385940551758},
    "short_events_count": 1, 
    "long_events_count": 0,
    "total_events_count": 0,
    "utc_datetime": "2019-11-01 00:00:00.000000",
    "local_datetime": "2019-11-01 09:00:00.000000",
    "_id": "KR-wy7b6230-2019-11-01 00:00:00.000000"
}

The partitions are not evenly sized, here is an example of their sizes:

[(0, 82302),
 (1, 59934),
 (2, 230304),
 (3, 46914),
 (4, 114548),
 (5, 140497),
 (6, 69740),
 (7, 80690),
 (8, 134518),
 (9, 99581),
(10, 59099),
(11, 106069)]

In this example partitions 2, 5, 8 result in corrupted files.
I export the data using:

    final_bag.map(json.dumps).to_textfiles(
        'abfs://storage/data/data_part_*.jsonl.gz'
        storage_options=STORAGE_OPTIONS,
        last_endline=True
    )

For the moment I work around the problem by using more and smaller partitions, but because getting balanced partitions efficiently is tricky in dask, the situation is not really ideal.

I'm using:

  • python 3.6.9
  • dask 2.9.1
  • dask_kubernetes master
  • adlfs 0.1.5

Each worker in the dask cluster has:

  • 1 vcpu
  • 7 GiB of ram

xfailing ls test

On master, I wrote a test for ls that's currently failing

The setup in conftest.py is

    bbs = BlockBlobService(
        account_name=USERNAME,
        account_key=KEY,
        custom_domain=f"http://{host}/devstoreaccount1",
    )
    bbs.create_container("data", timeout=1)

    bbs.create_blob_from_bytes("data", "/root/a/file.txt", data)
    bbs.create_blob_from_bytes("data", "/root/b/file.txt", data)

which IIUC is

- root/
  - a/
    - file.txt
  - b/
    - file.txt  

The test is

    fs = adlfs.AzureBlobFileSystem(storage.account_name, "data", storage.account_key)
    assert fs.ls("/") == ["root"]
    assert fs.ls("/root/a/") == ["file.txt"]

Regardless of what we pass to ls, the contents seem the same

(Pdb) pp fs.ls("/")
['/root/a/file.txt', '/root/b/file.txt']
(Pdb) pp fs.ls("/root/")
['/root/a/file.txt', '/root/b/file.txt']

cc @hayesgb

Strange behavior reading from S3 and into ADLS

What happened:

Here is my code

def main():
    source = 'covid19-lake/enigma-aggregation/csv/global/enigma_covid_19_global.csv'
    target = 'data/enigma_covid_19_global.csv'

    s3fs = fsspec.filesystem('s3')
    abfs = fsspec.filesystem('abfs', account_name='...', account_key='...')

    with s3fs.open(source, 'rb') as fp:
        out = fp.read()

    with abfs.open(target, 'wb') as fp:
        fp.write(out)

The following error comes up when I attempt to write the file:

azure.core.exceptions.HttpResponseError: The value for one of the HTTP headers is not in the correct format.
RequestId:51f0e5f3-101e-0007-49d2-534e06000000
Time:2020-07-06T20:19:27.0090111Z
ErrorCode:InvalidHeaderValue
Error:None
HeaderName:Content-Length
HeaderValue:0

This comes from adlfs/core.py +763

758  	    def _upload_chunk(self, final=False, **kwargs):
759  	        data = self.buffer.getvalue()
760  	        length = len(data)
761  	        block_id = len(self._block_list)
762  	        block_id = f"{block_id:07d}"
763  ->	        self.blob_client.stage_block(block_id=block_id, data=data, length=length)
764  	        self._block_list.append(block_id)
765
766  	        if final:
767  	            block_list = [BlobBlock(_id) for _id in self._block_list]
768  	            self.blob_client.commit_block_list(block_list=block_list)

What you expected to happen:

No error is raised.

Minimal Complete Verifiable Example:

# Put your MCVE code here

Anything else we need to know?:

If you run fp.write(out.decode('utf-8').encode('utf-8')) no error is raised.

Environment:

adal==1.2.4
adlfs==0.3.1
azure-core==1.6.0
azure-datalake-store==0.0.48
azure-storage-blob==12.3.2
botocore==1.17.14
certifi==2020.6.20
cffi==1.14.0
chardet==3.0.4
cryptography==2.9.2
docutils==0.15.2
fsspec==0.7.4
idna==2.10
isodate==0.6.0
jmespath==0.10.0
msrest==0.6.17
numpy==1.19.0
oauthlib==3.1.0
pandas==1.0.5
pycparser==2.20
PyJWT==1.7.1
python-dateutil==2.8.1
pytz==2020.1
requests==2.24.0
requests-oauthlib==1.3.0
s3fs==0.4.2
six==1.15.0
urllib3==1.25.9
  • Dask version: N/A
  • Python version: 3.8
  • Operating System: OSX
  • Install method (conda, pip, source): pip

It does not work in a dask cluster

What happened:
The library runs when dask is on a single machine. But when I run it in a dask cluster, error occured.

KilledWorker Traceback (most recent call last)
in
----> 1 ddf.head()

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/dask/dataframe/core.py in head(self, n, npartitions, compute)
1004 Whether to compute the result, default is True.
1005 """
-> 1006 return self._head(n=n, npartitions=npartitions, compute=compute, safe=True)
1007
1008 def _head(self, n, npartitions, compute, safe):

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/dask/dataframe/core.py in _head(self, n, npartitions, compute, safe)
1037
1038 if compute:
-> 1039 result = result.compute()
1040 return result
1041

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
165 dask.base.compute
166 """
--> 167 (result,) = compute(self, traverse=False, **kwargs)
168 return result
169

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
450 postcomputes.append(x.dask_postcompute())
451
--> 452 results = schedule(dsk, keys, **kwargs)
453 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
454

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2712 should_rejoin = False
2713 try:
-> 2714 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2715 finally:
2716 for f in futures.values():

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1991 direct=direct,
1992 local_worker=local_worker,
-> 1993 asynchronous=asynchronous,
1994 )
1995

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
832 else:
833 return sync(
--> 834 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
835 )
836

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
337 if error[0]:
338 typ, exc, tb = error[0]
--> 339 raise exc.with_traceback(tb)
340 else:
341 return result[0]

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/distributed/utils.py in f()
321 if callback_timeout is not None:
322 future = asyncio.wait_for(future, callback_timeout)
--> 323 result[0] = yield future
324 except Exception as exc:
325 error[0] = sys.exc_info()

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1850 exc = CancelledError(key)
1851 else:
-> 1852 raise exception.with_traceback(traceback)
1853 raise exc
1854 if errors == "skip":

KilledWorker: ("('read-parquet-head-1-5-read-parquet-ccef788ad675d1ca53552831cb61f745', 0)", <Worker 'tcp://10.0.0.5:37793', name: tcp://10.0.0.5:37793, memory: 0, processing: 1>)

What you expected to happen:
Should run in a multi-node cluster
Minimal Complete Verifiable Example:

# Put your MCVE code here

import dask.dataframe as dd
STORAGE_OPTIONS={'account_name': 'someaccount', 'account_key': 'some_key'}
ddf = dd.read_parquet('abfs://mltraining/ISDWeather///*.parquet', storage_options=STORAGE_OPTIONS)

#ok up to this point

#103 Then when running with head, error happened
ddf.head()

Anything else we need to know?:

Environment:

  • Dask version: 2.27
  • Python version: 3.6
  • Operating System: Ubuntu
  • Install method (conda, pip, source):
    pip

Not able to install adlfs through pip

When trying to install the newest version of adlfs with pip I get the following error:

Traceback (most recent call last): File "<string>", line 1, in <module> File "C:\Users\SEBAST~1\AppData\Local\Temp\pip-install-30y0kaw9\adlfs\setup.py", line 5, in <module> import versioneer ModuleNotFoundError: No module named 'versioneer' ---------------------------------------- ERROR: Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.

pip version: 19.3.1
python version: 3.7

use of 'az' as a shorter version of 'abfs'

Moved from filesystem_spec to here

This is more of a discussion than a feature request.

Looking at known_implementations in fsspec there are gcs and gs which both point to gcsfs.GCSFileSystem. Could az be created as a shorter version of abfs?

az is common with Azure CLI syntax e.g. docs.microsoft.com/en-us/azure/storage/common/storage-use-azcopy-v10#run-azcopy.
As someone who is very lazy I prefer two key strokes instead of four.

Caveats

Unlike gs and s3, Azure has two classes (adlfs.AzureDatalakeFileSystem and adlfs.AzureBlobFileSystem). I imagine you can't use az for both classes.

Proposed solution

Use az as a shortcut for 'abfs': {'class': 'adlfs.AzureBlobFileSystem'}. Given this is for the latest version of storage (Azure Datalake Gen2 and Azure Blob Storage). In addition my 50 % reduction in key strokes motivation doesn't apply to adl (Azure Datalake Gen1).

ERROR - ... HTTP status code=404, Exception=The specified blob does not exist. ErrorCode: BlobNotFound.

I am running the below code and everything works just fine -- in can process the whole dataset and no parts are missing.

import dask.dataframe as dd
from fsspec.registry import known_implementations
known_implementations['abfs'] = {'class': 'adlfs.AzureBlobFileSystem'}
STORAGE_OPTIONS={'account_name': ACCOUNT_NAME, 'account_key': ACCOUNT_KEY}
df = dd.read_csv(f'abfs://{CONTAINER}/nyctaxi/2015/*.csv', 
                 storage_options=STORAGE_OPTIONS,
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])

Still I am getting this error message as the code is run:

ERROR - Client-Request-ID=bcfef538-1079-11ea-9010-37c2dc712507 Retry policy did not allow for a retry: Server-Timestamp=Tue, 26 Nov 2019 18:22:42 GMT, Server-Request-ID=3807c197-101e-000b-5c86-a41b61000000, HTTP status code=404, Exception=The specified blob does not exist. ErrorCode: BlobNotFound.

It seems to be inconsequential, but I would like to know if it can be avoided.

opening alternates between 'ValueError: I/O operation on closed file' and no exception

What happened:

I am trying to load a xarray.Dataset from a Gen2 filesystem.

The first time I open a file with xarray, it opens without problem. The next call to open the same file raises:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-27-314f04be463e> in <module>
     16         return xr.open_dataset(f).load()
     17 
---> 18 open_da(loc) # runs fine
     19 open_da(loc) # throws
     20 

<ipython-input-27-314f04be463e> in open_da(loc)
     14 def open_da(loc):
     15     with fsspec.open("abfs://" + loc, **STORAGE_OPTIONS) as f:
---> 16         return xr.open_dataset(f).load()
     17 
     18 open_da(loc) # runs fine

~/miniconda3/envs/majoanalysis/lib/python3.8/site-packages/xarray/core/dataset.py in load(self, **kwargs)
    664         for k, v in self.variables.items():
    665             if k not in lazy_data:
--> 666                 v.load()
    667 
    668         return self

~/miniconda3/envs/majoanalysis/lib/python3.8/site-packages/xarray/core/variable.py in load(self, **kwargs)
    379             self._data = as_compatible_data(self._data.compute(**kwargs))
    380         elif not hasattr(self._data, "__array_function__"):
--> 381             self._data = np.asarray(self._data)
    382         return self
    383 

~/miniconda3/envs/majoanalysis/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order)
     81 
     82     """
---> 83     return array(a, dtype, copy=False, order=order)
     84 
     85 

~/miniconda3/envs/majoanalysis/lib/python3.8/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    675 
    676     def __array__(self, dtype=None):
--> 677         self._ensure_cached()
    678         return np.asarray(self.array, dtype=dtype)
    679 

~/miniconda3/envs/majoanalysis/lib/python3.8/site-packages/xarray/core/indexing.py in _ensure_cached(self)
    672     def _ensure_cached(self):
    673         if not isinstance(self.array, NumpyIndexingAdapter):
--> 674             self.array = NumpyIndexingAdapter(np.asarray(self.array))
    675 
    676     def __array__(self, dtype=None):

~/miniconda3/envs/majoanalysis/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order)
     81 
     82     """
---> 83     return array(a, dtype, copy=False, order=order)
     84 
     85 

~/miniconda3/envs/majoanalysis/lib/python3.8/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    651 
    652     def __array__(self, dtype=None):
--> 653         return np.asarray(self.array, dtype=dtype)
    654 
    655     def __getitem__(self, key):

~/miniconda3/envs/majoanalysis/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order)
     81 
     82     """
---> 83     return array(a, dtype, copy=False, order=order)
     84 
     85 

~/miniconda3/envs/majoanalysis/lib/python3.8/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    555     def __array__(self, dtype=None):
    556         array = as_indexable(self.array)
--> 557         return np.asarray(array[self.key], dtype=None)
    558 
    559     def transpose(self, order):

~/miniconda3/envs/majoanalysis/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py in __getitem__(self, key)
     26 
     27     def __getitem__(self, key):
---> 28         return indexing.explicit_indexing_adapter(
     29             key, self.shape, indexing.IndexingSupport.OUTER_1VECTOR, self._getitem
     30         )

~/miniconda3/envs/majoanalysis/lib/python3.8/site-packages/xarray/core/indexing.py in explicit_indexing_adapter(key, shape, indexing_support, raw_indexing_method)
    835     """
    836     raw_key, numpy_indices = decompose_indexer(key, shape, indexing_support)
--> 837     result = raw_indexing_method(raw_key.tuple)
    838     if numpy_indices.tuple:
    839         # index the loaded np.ndarray

~/miniconda3/envs/majoanalysis/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py in _getitem(self, key)
     36         with self.datastore.lock:
     37             array = self.get_array(needs_lock=False)
---> 38             return array[key]
     39 
     40 

~/miniconda3/envs/majoanalysis/lib/python3.8/site-packages/h5netcdf/core.py in __getitem__(self, key)
    144 
    145     def __getitem__(self, key):
--> 146         return self._h5ds[key]
    147 
    148     def __setitem__(self, key, value):

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

~/miniconda3/envs/majoanalysis/lib/python3.8/site-packages/h5py/_hl/dataset.py in __getitem__(self, args)
    541             arr = numpy.ndarray(selection.mshape, dtype=new_dtype)
    542             for mspace, fspace in selection:
--> 543                 self.id.read(mspace, fspace, arr, mtype)
    544             if len(names) == 1:
    545                 arr = arr[names[0]]

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/h5d.pyx in h5py.h5d.DatasetID.read()

h5py/_proxy.pyx in h5py._proxy.dset_rw()

h5py/_proxy.pyx in h5py._proxy.H5PY_H5Dread()

h5py/defs.pyx in h5py.defs.H5Dread()

h5py/h5fd.pyx in h5py.h5fd.H5FD_fileobj_read()

~/miniconda3/envs/majoanalysis/lib/python3.8/site-packages/adlfs/spec.py in readinto(self, b)
   1547         https://docs.python.org/3/library/io.html#io.RawIOBase.readinto
   1548         """
-> 1549         data = self.read(len(b))
   1550         memoryview(b).cast("B")[: len(data)] = data
   1551         return len(data)

~/miniconda3/envs/majoanalysis/lib/python3.8/site-packages/adlfs/spec.py in read(self, length)
   1566             length = self.size - self.loc
   1567         if self.closed:
-> 1568             raise ValueError("I/O operation on closed file.")
   1569         logger.debug("%s read: %i - %i" % (self, self.loc, self.loc + length))
   1570         if length == 0:

ValueError: I/O operation on closed file.

Then when I try to open the file again, it works just fine again. It continues to alternate between exception and no exception.

I tried it wth adlfs latest release and with master.

What you expected to happen:
The file to load without an error.

Minimal Complete Verifiable Example:

import adlfs
import fsspec
import xarray as xr

from majoanalysis.utils import datalake_storage_options
STORAGE_OPTIONS = datalake_storage_options()   # returns {'account_name': '...', 'account_key': '...'}


fs = adlfs.AzureBlobFileSystem(**STORAGE_OPTIONS)
da = xr.DataArray(1)
da.to_netcdf("test.nc")
loc = "majodata/test.nc"
fs.put("test.nc", loc)

# works just fine
with fsspec.open("abfs://" + loc, **STORAGE_OPTIONS) as f:
    x = xr.open_dataset(f).load()

# throws 'ValueError: I/O operation on closed file.'
with fsspec.open("abfs://" + loc, **STORAGE_OPTIONS) as f:
    x = xr.open_dataset(f).load()

Anything else we need to know?:

Environment:

import dask, xarray, fsspec, adlfs
print(dask.__version__, xarray.__version__, fsspec.__version__, adlfs.__version__)

2.22.0 0.16.0 0.8.0 v0.5.0

  • Python version: 3.8.1
  • Operating System: CentOS
  • Install method (conda, pip, source): pip

azure.storage.blob 12.x compatability

https://pypi.org/project/azure-storage-blob/ 12.x was recently released. Some things broke.

In [1]: import adlfs
---------------------------------------------------------------------------
ImportError                               Traceback (most recent call last)
<ipython-input-1-fd1e7da2cebf> in <module>
----> 1 import adlfs

~/sandbox/dask-adlfs/adlfs/__init__.py in <module>
----> 1 from .core import AzureDatalakeFileSystem
      2 from .core import AzureBlobFileSystem, AzureBlobFile
      3
      4 __all__ = ["AzureBlobFileSystem", "AzureBlobFile", "AzureDatalakeFileSystem"]

~/sandbox/dask-adlfs/adlfs/core.py in <module>
      7 from azure.datalake.store import lib, AzureDLFileSystem
      8 from azure.datalake.store.core import AzureDLPath, AzureDLFile
----> 9 from azure.storage.blob import BlockBlobService
     10 from fsspec import AbstractFileSystem
     11 from fsspec.spec import AbstractBufferedFile

ImportError: cannot import name 'BlockBlobService' from 'azure.storage.blob' (/Users/taugspurger/.virtualenvs/filesystems/lib/python3.7/site-packages/azure/storage/blob/__init__.py)

docker compose file doesn't work

Following the notes in the README.md in the tests folder

https://github.com/dask/adlfs/blob/master/adlfs/tests/README.md

When I run docker-compose.yml I get

(adlfs-dev) Ray@UM-404XFVH4 adlfs % docker-compose up
Creating network "adlfs_default" with the default driver
Pulling azurite (mcr.microsoft.com/azure-storage/azurite:)...
latest: Pulling from azure-storage/azurite
cbdbe7a5bc2a: Pull complete
bd07af9ed1a4: Pull complete
3556ccf180b2: Pull complete
089d4748da74: Pull complete
6971638b255d: Pull complete
84e862bb769b: Pull complete
6c4a6b731cae: Pull complete
4f6d04b7d045: Pull complete
1385221e8f87: Pull complete
418c070ee704: Pull complete
ef2c1abd1a8a: Pull complete
720542683141: Pull complete
8ef034eecb73: Pull complete
d8ac6bf2d5e8: Pull complete
Digest: sha256:f1fdee4a45226659c5afa30b8d8d3a479037c701826d32448078f22a7cd011a4
Status: Downloaded newer image for mcr.microsoft.com/azure-storage/azurite:latest
Building test
Step 1/4 : FROM continuumio/miniconda3
ERROR: Service 'test' failed to build: Get https://registry-1.docker.io/v2/continuumio/miniconda3/manifests/latest: unauthorized: incorrect username or password

It's unclear how to set tenant_id and client_id

Trying to use this for the first time. Running the following:

>>> import dask_adlfs
>>> import dask.dataframe as dd
>>> df = dd.read_csv("adl://somedatalakestore.azuredatalakestore.net/somefile.csv")

Yields:

File "<stdin>", line 1, in <module>
File "/usr/local/lib/python2.7/dist-packages/dask/dataframe/io/csv.py", line 391, in read
    **kwargs)
File "/usr/local/lib/python2.7/dist-packages/dask/dataframe/io/csv.py", line 271, in read_pandas
    **(storage_options or {}))
File "/usr/local/lib/python2.7/dist-packages/dask/bytes/core.py", line 138, in read_bytes
    None, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/dask/bytes/core.py", line 394, in get_fs_paths_myopen
    encoding=encoding, errors=errors, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/dask/bytes/core.py", line 260, in __init__
    self.fs = _filesystems[self.protocol](**self.storage_options)
File "/usr/local/lib/python2.7/dist-packages/dask_adlfs/core.py", line 32, in __init__
    self.do_connect()
File "/usr/local/lib/python2.7/dist-packages/dask_adlfs/core.py", line 37, in do_connect
    client_secret=self.client_secret)
File "/usr/local/lib/python2.7/dist-packages/azure/datalake/store/lib.py", line 110, in auth
    raise ValueError("tenant_id and client_id must be supplied for authentication")
ValueError: tenant_id and client_id must be supplied for authentication

These are assigned a default value of None here: https://github.com/dask/dask-adlfs/blob/5d8333ca8cd81571c271901b549a86df66c11230/dask_adlfs/core.py#L23-L27

Not able install adlfs from conda channel

Tried to install adlfs from the defusco conda channel. But, it's failing with the following error:

$conda install -c defusco adlfs

Solving environment: failed

PackagesNotFoundError: The following packages are not available from current channels:

  - adlfs
  - azure-datalake-store[version='>=0.0.46,<0.1']
  - adlfs
  - azure-storage-blob[version='>=2.1.0,<3.0.0']

Current channels:

  - https://conda.anaconda.org/defusco/linux-64
  - https://conda.anaconda.org/defusco/noarch
  - https://repo.anaconda.com/pkgs/main/linux-64
  - https://repo.anaconda.com/pkgs/main/noarch
  - https://repo.anaconda.com/pkgs/free/linux-64
  - https://repo.anaconda.com/pkgs/free/noarch
  - https://repo.anaconda.com/pkgs/r/linux-64
  - https://repo.anaconda.com/pkgs/r/noarch
  - https://repo.anaconda.com/pkgs/pro/linux-64
  - https://repo.anaconda.com/pkgs/pro/noarch

To search for alternate channels that may provide the conda package you're
looking for, navigate to

    https://anaconda.org

and use the search bar at the top of the page.

Please note, I have appended defusco as a channel in my .condarc file:

conda config --append channels defusco 

regression since v0.5.0: RecursionError: maximum recursion depth exceeded

What happened:

I am trying to load a xarray.Dataset from a Gen2 filesystem.

I tried it wth adlfs latest release and with master.

The error does not occur with v0.4.0!!

What you expected to happen:
The file to load without an error.

Minimal Complete Verifiable Example:

This code used to work (in v0.4.0).

url = "abfs://majodata/some_folder/calibration/left.nc"
with fsspec.open(url, **STORAGE_OPTIONS) as f:
    ds = xr.open_dataset(f)

gives

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
~/miniconda3/envs/py38/lib/python3.8/site-packages/xarray/backends/file_manager.py in _acquire_with_cache_info(self, needs_lock)
    197             try:
--> 198                 file = self._cache[self._key]
    199             except KeyError:

~/miniconda3/envs/py38/lib/python3.8/site-packages/xarray/backends/lru_cache.py in __getitem__(self, key)
     52         with self._lock:
---> 53             value = self._cache[key]
     54             self._cache.move_to_end(key)

KeyError: [<class 'h5netcdf.core.File'>, (<adlfs.spec.AzureBlobFile object at 0x2b9c7aa6d790>,), 'r', (('invalid_netcdf', None),)]

During handling of the above exception, another exception occurred:

RecursionError                            Traceback (most recent call last)
<ipython-input-8-9024338e9664> in <module>
     11 with fsspec.open(url, **STORAGE_OPTIONS) as f:
     12 #     x = f.readlines()
---> 13     ds = xr.open_dataset(f)
     14 # with fsspec.open(url, **STORAGE_OPTIONS) as f:
     15 #     ds = xr.open_dataset(f)

~/miniconda3/envs/py38/lib/python3.8/site-packages/xarray/backends/api.py in open_dataset(filename_or_obj, group, decode_cf, mask_and_scale, decode_times, autoclose, concat_characters, decode_coords, engine, chunks, lock, cache, drop_variables, backend_kwargs, use_cftime, decode_timedelta)
    538             store = backends.ScipyDataStore(filename_or_obj, **backend_kwargs)
    539         elif engine == "h5netcdf":
--> 540             store = backends.H5NetCDFStore.open(
    541                 filename_or_obj, group=group, lock=lock, **backend_kwargs
    542             )

~/miniconda3/envs/py38/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py in open(cls, filename, mode, format, group, lock, autoclose, invalid_netcdf, phony_dims)
    143 
    144         manager = CachingFileManager(h5netcdf.File, filename, mode=mode, kwargs=kwargs)
--> 145         return cls(manager, group=group, mode=mode, lock=lock, autoclose=autoclose)
    146 
    147     def _acquire(self, needs_lock=True):

~/miniconda3/envs/py38/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py in __init__(self, manager, group, mode, lock, autoclose)
    104         # todo: utilizing find_root_and_group seems a bit clunky
    105         #  making filename available on h5netcdf.Group seems better
--> 106         self._filename = find_root_and_group(self.ds)[0].filename
    107         self.is_remote = is_remote_uri(self._filename)
    108         self.lock = ensure_lock(lock)

~/miniconda3/envs/py38/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py in ds(self)
    154     @property
    155     def ds(self):
--> 156         return self._acquire()
    157 
    158     def open_store_variable(self, name, var):

~/miniconda3/envs/py38/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py in _acquire(self, needs_lock)
    146 
    147     def _acquire(self, needs_lock=True):
--> 148         with self._manager.acquire_context(needs_lock) as root:
    149             ds = _nc4_require_group(
    150                 root, self._group, self._mode, create_group=_h5netcdf_create_group

~/miniconda3/envs/py38/lib/python3.8/contextlib.py in __enter__(self)
    111         del self.args, self.kwds, self.func
    112         try:
--> 113             return next(self.gen)
    114         except StopIteration:
    115             raise RuntimeError("generator didn't yield") from None

~/miniconda3/envs/py38/lib/python3.8/site-packages/xarray/backends/file_manager.py in acquire_context(self, needs_lock)
    184     def acquire_context(self, needs_lock=True):
    185         """Context manager for acquiring a file."""
--> 186         file, cached = self._acquire_with_cache_info(needs_lock)
    187         try:
    188             yield file

~/miniconda3/envs/py38/lib/python3.8/site-packages/xarray/backends/file_manager.py in _acquire_with_cache_info(self, needs_lock)
    202                     kwargs = kwargs.copy()
    203                     kwargs["mode"] = self._mode
--> 204                 file = self._opener(*self._args, **kwargs)
    205                 if self._mode == "w":
    206                     # ensure file doesn't get overriden when opened again

~/miniconda3/envs/py38/lib/python3.8/site-packages/h5netcdf/core.py in __init__(self, path, mode, invalid_netcdf, phony_dims, **kwargs)
    678                 else:
    679                     self._preexisting_file = mode in {'r', 'r+', 'a'}
--> 680                     self._h5file = h5py.File(path, mode, **kwargs)
    681         except Exception:
    682             self._closed = True

~/miniconda3/envs/py38/lib/python3.8/site-packages/h5py/_hl/files.py in __init__(self, name, mode, driver, libver, userblock_size, swmr, rdcc_nslots, rdcc_nbytes, rdcc_w0, track_order, **kwds)
    404             with phil:
    405                 fapl = make_fapl(driver, libver, rdcc_nslots, rdcc_nbytes, rdcc_w0, **kwds)
--> 406                 fid = make_fid(name, mode, userblock_size,
    407                                fapl, fcpl=make_fcpl(track_order=track_order),
    408                                swmr=swmr)

~/miniconda3/envs/py38/lib/python3.8/site-packages/h5py/_hl/files.py in make_fid(name, mode, userblock_size, fapl, fcpl, swmr)
    171         if swmr and swmr_support:
    172             flags |= h5f.ACC_SWMR_READ
--> 173         fid = h5f.open(name, flags, fapl=fapl)
    174     elif mode == 'r+':
    175         fid = h5f.open(name, h5f.ACC_RDWR, fapl=fapl)

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/h5f.pyx in h5py.h5f.open()

h5py/defs.pyx in h5py.defs.H5Fopen()

h5py/h5fd.pyx in h5py.h5fd.H5FD_fileobj_read()

h5py/h5fd.pyx in h5py.h5fd.H5FD_fileobj_read()

h5py/h5fd.pyx in h5py.h5fd.H5FD_fileobj_read()

h5py/h5fd.pyx in h5py.h5fd.H5FD_fileobj_read()

h5py/h5fd.pyx in h5py.h5fd.H5FD_fileobj_read()

h5py/h5fd.pyx in h5py.h5fd.H5FD_fileobj_read()

h5py/h5fd.pyx in h5py.h5fd.H5FD_fileobj_read()

h5py/h5fd.pyx in h5py.h5fd.H5FD_fileobj_read()

h5py/h5fd.pyx in h5py.h5fd.H5FD_fileobj_read()

h5py/h5fd.pyx in h5py.h5fd.H5FD_fileobj_read()

~/miniconda3/envs/py38/lib/python3.8/site-packages/adlfs/spec.py in readinto(self, b)
   1541 
   1542     def readinto(self, b):
-> 1543         return self.readinto(b)
   1544 
   1545     def read(self, length=-1):

... last 1 frames repeated, from the frame below ...

~/miniconda3/envs/py38/lib/python3.8/site-packages/adlfs/spec.py in readinto(self, b)
   1541 
   1542     def readinto(self, b):
-> 1543         return self.readinto(b)
   1544 
   1545     def read(self, length=-1):

RecursionError: maximum recursion depth exceeded

Anything else we need to know?:

Environment:

import dask, xarray, fsspec, adlfs
print(dask.__version__, xarray.__version__, fsspec.__version__, adlfs.__version__)

2.22.0 0.16.0 0.8.0 v0.5.0

  • Python version: 3.8.1
  • Operating System: CentOS
  • Install method (conda, pip, source): pip

Update to fsspec glob method in v0.6.3 causing failed unit tests

A change in fsspec glob method is causing one of the unit tests in adlfs to fail. Need to determine if this is a cause for concern, or if the unit test can be updated. If its a concern, need to implement a fix before allowing fsspec >=0.6.3 in test requirements, requirements.txt and setup.py

azure-storage-blob 12.4.0 causes import error

What happened:
install adlfs and try to import you get the error:

>>> import adlfs
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/conda/lib/python3.8/site-packages/adlfs/__init__.py", line 1, in <module>
    from .core import AzureDatalakeFileSystem
  File "/opt/conda/lib/python3.8/site-packages/adlfs/core.py", line 14, in <module>
    from azure.storage.blob._models import BlobBlock, BlobPrefix
ImportError: cannot import name 'BlobPrefix' from 'azure.storage.blob._models' (/opt/conda/lib/python3.8/site-packages/azure/storage/blob/_models.py)
>>>

This is definitely tied to the latest version, even 12.4.0b1 was not causing this, which is weird.

Environment:

  • Dask version: 2.22.0
  • Python version: 3.8.0
  • Operating System: linux
  • Install method (conda, pip, source): pip

how to write parquet file to blob?

Posted a question on SO regarding writing a dask.dataframe to a parquet file in blob storage.

https://stackoverflow.com/questions/60765331/moving-data-from-a-database-to-azure-blob-storage

Wonder if anyone here can help?

dd.to_parquet(df=df,
              path='absf://CONTAINER/ACCOUNT_NAME.blob.core.windows.net',
              storage_options={'account_name': 'ACCOUNT_NAME',
                               'account_key': 'ACCOUNT_KEY'})

getting

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-18-c2f308521fc6> in <module>
     11               path='absf://CONTAINER/ACCOUNT_NAME.blob.core.windows.net',
     12               storage_options={'account_name': 'ACCOUNT_NAME',
---> 13                                'account_key': 'ACCOUNT_KEY'})

~\AppData\Local\Continuum\anaconda3\lib\site-packages\dask\dataframe\io\parquet\core.py in to_parquet(df, path, engine, compression, write_index, append, ignore_divisions, partition_on, storage_options, write_metadata_file, compute, **kwargs)
    368     if hasattr(path, "name"):
    369         path = stringify_path(path)
--> 370     fs, _, _ = get_fs_token_paths(path, mode="wb", storage_options=storage_options)
    371     # Trim any protocol information from the path before forwarding
    372     path = fs._strip_protocol(path)

~\AppData\Local\Continuum\anaconda3\lib\site-packages\fsspec\core.py in get_fs_token_paths(urlpath, mode, num, name_function, storage_options, protocol)
    391         protocols, path = split_protocol(urlpath)
    392         protocol = protocol or protocols
--> 393         cls = get_filesystem_class(protocol)
    394 
    395         options = cls._get_kwargs_from_urls(urlpath)

~\AppData\Local\Continuum\anaconda3\lib\site-packages\fsspec\registry.py in get_filesystem_class(protocol)
     85     if protocol not in registry:
     86         if protocol not in known_implementations:
---> 87             raise ValueError("Protocol not known: %s" % protocol)
     88         bit = known_implementations[protocol]
     89         mod, name = bit["class"].rsplit(".", 1)

ValueError: Protocol not known: absf

On windows.
dask==2.10.1
fsspec==0.6.2

I guess for whatever reason it's not picking up the protocol as 'absf' but instead protocol as absf

>>> from fsspec.registry import known_implementations
>>> known_implementations
{'file': {'class': 'fsspec.implementations.local.LocalFileSystem'}, 'memory': {'class': 'fsspec.implementations.memory.MemoryFileSystem'}, 'http': {'class': 'fsspec.implementations.http.HTTPFileSystem', 'err': 'HTTPFileSystem requires "requests" to be installed'}, 'https': {'class': 'fsspec.implementations.http.HTTPFileSystem', 'err': 'HTTPFileSystem requires "requests" to be installed'}, 'zip': {'class': 'fsspec.implementations.zip.ZipFileSystem'}, 'gcs': {'class': 'gcsfs.GCSFileSystem', 'err': 'Please install gcsfs to access Google Storage'}, 'gs': {'class': 'gcsfs.GCSFileSystem', 'err': 'Please install gcsfs to access Google Storage'}, 'sftp': {'class': 'fsspec.implementations.sftp.SFTPFileSystem', 'err': 'SFTPFileSystem requires "paramiko" to be installed'}, 'ssh': {'class': 'fsspec.implementations.sftp.SFTPFileSystem', 'err': 'SFTPFileSystem requires "paramiko" to be installed'}, 'ftp': {'class': 'fsspec.implementations.ftp.FTPFileSystem'}, 'hdfs': {'class': 'fsspec.implementations.hdfs.PyArrowHDFS', 'err': 'pyarrow and local java libraries required for HDFS'}, 'webhdfs': {'class': 'fsspec.implementations.webhdfs.WebHDFS', 'err': 'webHDFS access requires "requests" to be installed'}, 's3': {'class': 's3fs.S3FileSystem', 'err': 'Install s3fs to access S3'}, 'adl': {'class': 'adlfs.AzureDatalakeFileSystem', 'err': 'Install adlfs to access Azure Datalake Gen1'}, 'abfs': {'class': 'adlfs.AzureBlobFileSystem', 'err': 'Install adlfs to access Azure Datalake Gen2 and Azure Blob Storage'}, 'cached': {'class': 'fsspec.implementations.cached.CachingFileSystem'}, 'blockcache': {'class': 'fsspec.implementations.cached.CachingFileSystem'}, 'filecache': {'class': 'fsspec.implementations.cached.WholeFileCacheFileSystem'}, 'dask': {'class': 'fsspec.implementations.dask.DaskWorkerFileSystem', 'err': 'Install dask distributed to access worker file system'}}

@martindurant (sorry for the tag). I'll update the SO Q and/or docs once I get this.

azure-storage-blob >=v12 causes slow, high memory dd.read_csv

dd.read_csv(path, storage_options) on a 15Gb csv file

with adlfs==0.2.4 normal speed and low memory usage
with adlfs==0.3 much slower and high memory usage

I think is related to azure-storage-blob >=v12 and I thought it would be important for you to be aware. with read_parquet didn't find issues.

Any idea? How to best report this upstream (azure)?

KeyError: 'type' using fs.glob

My data is stored in an ADLS gen2 account. It is partitioned by year=*/month=*/part-*.snappy.parquet.

The setup code is:

STORAGE_OPTIONS = {
    'account_name': 'data4dask', 
    'account_key' : 'my_account_key'
}

protocol  = 'abfs'      # use 'adl' for Azure Data Lake Gen 1
container = 'datasets'  #

fs = fsspec.filesystem(protocol, **STORAGE_OPTIONS, container_name=container)

This works:

files = []
for file in fs.glob('noaa/isd/year=*/month=*'):
    files += fs.ls(f'{file}/')
files = [f'{protocol}://{container}/{file}' for file in files if '2019' not in file] # issue with 2019 data - fix wip 
files[-5:]

and returns the paths to the last 5 files.

This would be simpler, but doesn't work:

files = fs.glob('noaa/isd/year=*/month=*/*')
files = [f'{protocol}://{container}/{file}' for file in files if '2019' not in file] # issue with 2019 data - fix wip 
files[-5:]

Stacktrace:

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-21-49b9609aae79> in <module>
      1 files = []
----> 2 files = fs.glob('noaa/isd/year=*/month=*/*')
      3 files = [f'{protocol}://{container}/{file}' for file in files if '2019' not in file] # issue with 2019 data - fix wip
      4 files[-5:]

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/fsspec/spec.py in glob(self, path, **kwargs)
    446             root = ""
    447             depth = 20 if "**" in path else 1
--> 448         allpaths = self.find(root, maxdepth=depth, withdirs=True, **kwargs)
    449         pattern = (
    450             "^"

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/fsspec/spec.py in find(self, path, maxdepth, withdirs, **kwargs)
    369         # TODO: allow equivalent of -name parameter
    370         out = set()
--> 371         for path, dirs, files in self.walk(path, maxdepth, **kwargs):
    372             if withdirs:
    373                 files += dirs

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/adlfs/core.py in walk(self, path, maxdepth, **kwargs)
    506                     d,
    507                     maxdepth=(maxdepth - 1) if maxdepth is not None else None,
--> 508                     **kwargs,
    509                 ):
    510                     yield res

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/adlfs/core.py in walk(self, path, maxdepth, **kwargs)
    506                     d,
    507                     maxdepth=(maxdepth - 1) if maxdepth is not None else None,
--> 508                     **kwargs,
    509                 ):
    510                     yield res

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/adlfs/core.py in walk(self, path, maxdepth, **kwargs)
    471             name = info["name"].rstrip("/")
    472             logging.debug(
--> 473                 f"Test path with name, path, type, size:  {name}, {path}, {info['type']}, {info['size']}"
    474             )
    475             if info["type"] == "directory" and name != path and info["size"] == 0:

KeyError: 'type'

Invalid/empty header with write_csv

What happened:
using dask.to_csv("abfs://...") in a file with several partitions causes random invalid/empty header errors.

The value for one of the HTTP headers is not in the correct format.
RequestId: ...
Time: .....
ErrorCode: InvalidHeaderValue
Error:None
HeaderName:Content-length
HeaderValue:0

Minimal Complete Verifiable Example:

_ = df.to_csv(
    os.path.join(f"abfs://{output_path_csv}", table_name, "part_*.csv"),
    index=False,
    sep="|",
    storage_options=storage_options,
)

Hard to reproduce, all partitions but one fail to write, failing partition has same dtypes and size as others. Code to write to csv that fails above. Error seems to indicate length 0 partition, but when I do len(df.get_partition(x)) I get the same size as other partitions. Also partition that fails is not the last one.

Help on how to debug this?

Environment:

  • Dask version: 2.22.0
  • adlfs version: 0.3.2
  • Python version: 3.8.5
  • Operating System: linux arch 64 bit
  • Install method (conda, pip, source): conda

Buggy docker-compose setup

I think that right now, our docker-compose has a couple issues.

  1. It's caching stuff for too long. A change in the local adlfs repo should cause a rebuild. Right now I don't think it does
  2. Shutdown? Right now you have to ctrl-c to stop the azurite server. Don't know if we want that.

ImportError: cannot import name 'infer_storage_options'

Similar to: intake/intake#388

from dask.bytes.utils import infer_storage_options

Should probably be:

try:
    from dask.bytes.utils import infer_storage_options
except ImportError:
    from dask.bytes.core import infer_storage_options

Possibly consider:

from fsspec.utils import infer_storage_options

as per: dask/dask#5064
Not sure what else changed in dask that would break this plugin

Exception: Server encountered an internal error. Please try again after some time. ErrorCode: InternalError

I've been using this on Dask clusters and I've run into this issue. My data is ~25 GB in compressed parquet files, ~160 GB in uncompressed CSV files. ADLS Gen2 storage account. I consistently hit this issue when I attempt to write out to parquet files partitioned on the stationName column (about 16,000 unique values):

datapath     = f'{protocol}://{container}/noaa-isd-by-station/'
partition_on = ['stationName']
compression  = 'lz4'

df.to_parquet(datapath, 
              compression     = compression, 
              partition_on    = partition_on, 
              write_index     = False,
              storage_options = STORAGE_OPTIONS)

After some time (10s of minutes), I get the following exception:

Exception: Server encountered an internal error. Please try again after some time. ErrorCode: InternalError
<?xml version="1.0" encoding="utf-8"?><Error><Code>InternalError</Code><Message>Server encountered an internal error. Please try again after some time.
RequestId:b6751854-801e-00c2-5a18-fbd6d8000000
Time:2020-03-15T22:23:27.5580439Z</Message></Error>

See full traceback here: https://github.com/lostmygithubaccount/dasky/blob/master/wip/03.cpu-cluster-mm-demo.ipynb

--

To get to this point:

from azureml.core import Workspace, Dataset

ws = Workspace.from_config()

dsetdata = 'noaa-isd-files'
data_url = 'https://azureopendatastorage.blob.core.windows.net/isdweatherdatacontainer/ISDWeather'

if dsetdata not in ws.datasets:
    os.system('sudo chmod 777 /mnt')
    for year in range(2008, 2020+1):
        ds = Dataset.File.from_files(f'{data_url}/year={year}/month=*/*.parquet', validate=False)
        print('Downloading...')
        %time ds.download(f'/mnt/data/isd/year={year}', overwrite=True)
    print('Uploading...')
    %time ws.get_default_datastore().upload('/mnt/data/isd', '/noaa-isd', show_progress=False)
    ds = Dataset.File.from_files((ws.get_default_datastore(), '/noaa-isd/**/*.parquet'))
    ds = ds.register(ws, dsetdata)

This takes a few minutes to download/upload.

Startup Dask cluster. Load data - this takes a minute or two:

STORAGE_OPTIONS = {
    'account_name': run.experiment.workspace.get_default_datastore().account_name,
    'account_key' : run.experiment.workspace.get_default_datastore().account_key
}

protocol  = 'abfs'      # change to 'adl' for gen 1
container = run.experiment.workspace.get_default_datastore().container_name

fs = fsspec.filesystem(protocol, **STORAGE_OPTIONS)

files = fs.glob(f'{container}/noaa-isd/year=*/month=*/*.parquet') # adjust size of data if needed
files = [f'{protocol}://{file}' for file in files]  

df = dd.read_parquet(files, engine='pyarrow', storage_options=STORAGE_OPTIONS).repartition(npartitions=200).persist()
df = df.set_index(dd.to_datetime(df.datetime).dt.floor('d'), sorted=True).persist()

Write out to CSV files, uncompressed. This takes ~30 minutes:

df = df.set_index(dd.to_datetime(df.datetime).dt.floor('d'), sorted=True).persist()

Load data from CSV files - this takes a few minutes:

STORAGE_OPTIONS = {
    'account_name': run.experiment.workspace.get_default_datastore().account_name,
    'account_key' : run.experiment.workspace.get_default_datastore().account_key
}

protocol  = 'abfs' # change to 'adl' for gen 1
container = run.experiment.workspace.get_default_datastore().container_name

datapath  = f'{protocol}://{container}/noaa-isd-csv/*-data.csv'
blocksize = '5GB'
dtypes    = {'cloudCoverage': 'object', 'usaf': 'object'}

df = dd.read_csv(datapath, blocksize=blocksize, dtype=dtypes, storage_options=STORAGE_OPTIONS).repartition(npartitions=1000).persist()

Then attempting to write back to compressed parquet files partitioned on the stationName column so that I can train a simple sklearn model on each partition.

--

See: https://github.com/lostmygithubaccount/dasky

Next Release?

@hayesgb it'd be nice to cut a release with the fixes for fsspec 0.6.0. Anything else that should go in first?

Connect to ADLS Gen2 using Service Principal

Hi,

I'm starting using Dask and I wanted to connecto to ADLS Gen2.
I was not finding any way to do it using a service principal and checking the code I see something that I'm not sure if make sense.

In the condition of line 324 of core.py, we are calling "self._get_token_from_service_principal()" if we don't have certain values, but one of those values is the "client_id", which seems to be required later inside that method to generate the credentials. So I guess that if I include the client_id, this is not going to call "self._get_token_from_service_principal()" and if I do not include it, I won't have the right one when it tries to create the "ServicePrincipalCredentials".

Is this ok? How can I use it correctly?

https://github.com/dask/adlfs/blob/c91106fbe9184e7c75efbc40cf292a7d5b8c8491/adlfs/core.py#L324

ADLFS documentation in Pandas and elsewhere

Hello,

Not sure if a GitHub issue is appropriate for this but not sure how else to ask - what is preventing reading ADLFS in Pandas, cuDF, and other tools as you can with s3? See pandas.read_csv and cudf.io.csv.read_csv.

Since this implements ADLFS in fsspec, is it simply a documentation issue? Is there more that is needed?

Appreciate the work on this a lot!

Issue reading parquet using pyarrow

When trying to read parquet files using Dask==2.15.0 and adlfs==0.3.0 I got exceptions that I didn't have before.

I boiled it down to the following example:
Using Azurite:

>>> import dask.dataframe as dd
>>> import pandas as pd
>>> from azure.storage.blob import BlobServiceClient
>>> conn_str = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey" \
         "=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr" \
           "/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
>>> client: BlobServiceClient = BlobServiceClient.from_connection_string(conn_str)
>>> client.create_container("test")
>>> df = pd.DataFrame(
    {
        "col1": [1, 2, 3, 4],
        "col2": [2, 4, 6, 8],
        "index_key": [1, 1, 2, 2],
        "partition_key": [1, 1, 2, 2],
    }
)
>>> STORAGE_OPTIONS = {"account_name": "devstoreaccount1", "connection_string": conn_str}
>>> dask_dataframe = dd.from_pandas(df, npartitions=1)
>>> dask_dataframe.to_parquet(
    "abfs://test/test_group",
    storage_options=STORAGE_OPTIONS,
    engine="pyarrow"
)
>>> data_out = dd.read_parquet("abfs://test/test_group",
                           engine="pyarrow",
                           storage_options=STORAGE_OPTIONS)
Traceback (most recent call last):
  File "<input>", line 1, in <module>
  File "/home/anders/.local/share/JetBrains/Toolbox/apps/PyCharm-P/ch-0/201.7223.92/plugins/python/helpers/pydev/_pydev_bundle/pydev_umd.py", line 197, in runfile
    pydev_imports.execfile(filename, global_vars, local_vars)  # execute the script
  File "/home/anders/.local/share/JetBrains/Toolbox/apps/PyCharm-P/ch-0/201.7223.92/plugins/python/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "/home/anders/.config/JetBrains/PyCharm2020.1/scratches/scratch.py", line 34, in <module>
    data_out = dd.read_parquet("abfs://test/test_group",
  File "/home/anders/.pyenv/versions/feature_env/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 225, in read_parquet
    meta, statistics, parts = engine.read_metadata(
  File "/home/anders/.pyenv/versions/feature_env/lib/python3.8/site-packages/dask/dataframe/io/parquet/arrow.py", line 267, in read_metadata
    parts, dataset = _determine_dataset_parts(
  File "/home/anders/.pyenv/versions/feature_env/lib/python3.8/site-packages/dask/dataframe/io/parquet/arrow.py", line 198, in _determine_dataset_parts
    dataset = pq.ParquetDataset(paths, filesystem=fs, **dataset_kwargs)
  File "/home/anders/.pyenv/versions/feature_env/lib/python3.8/site-packages/pyarrow/parquet.py", line 1171, in __init__
    self.metadata_path) = _make_manifest(
  File "/home/anders/.pyenv/versions/feature_env/lib/python3.8/site-packages/pyarrow/parquet.py", line 1367, in _make_manifest
    raise OSError('Passed non-file path: {}'
OSError: Passed non-file path: test/test_group

Using fastparquet instead solves the problem - but last I tried, fastparquet didn't handle my filters properly, so I would prefer to be able to use pyarrow

rewrite checkin

@hayesgb , how is your development going? Others would be very glad to see a new working version of datalake/blob support (e.g., #4 )

Clarify role of requirements.txt

As I understand it, your requirements.txt file contains the requirements for the local development environment. Is this correct? In which case, I think it would be better renamed to dev-requirements.txt, moved to "dev" extras, or something else.

EmptyDataError: No columns to parse from file

Hi, I am unable to use the following code for my adls gen2 blob file.

import dask.dataframe as dd
from pkg_resources import parse_version
import fsspec
if parse_version(fsspec.version) < parse_version('0.6.2'):
from fsspec.registry import known_implementations
known_implementations['abfs'] = {'class': 'adlfs.AzureBlobFileSystem'}

STORAGE_OPTIONS={'account_name': 'account_key', 'account_key': 'account_key'}

ddf = dd.read_csv('abfs://{container_name}/test.csv', storage_options=STORAGE_OPTIONS)

Below is the error I am getting
EmptyDataError: No columns to parse from file

abfs gen2 implementation based on azure.storage.filedatalake?

Microsoft has released the beta of https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/storage/azure-storage-file-datalake.

While the current abfs implementation using multi-protocol access works fine, the azure-storage-file-datalake client exposes some nice features for hierarchical namespace like atomic delete and moves semantics for directories.

To get used to the api I have written a demo implementation https://github.com/hoffmann/fsspec_adls

@hayesgb, @martindurant what are your plans for the abfs gen2 implementation?

Directories identified as files in walk()

I'm running adlfs==0.2.4 & fsspec==0.6.2 and observed the following behavior:

Given the folllowing folder structure in ADLS Gen1:

/folder/subfolder1/subfolder1_1
/folder/subfolder1/subfolder1_2
...

If I do a

_, directories, files = next(fs.walk('/folder'))

I get subfolder1 in files and not directories. I did a short debugging and it looks like that the official Microsoft AzureDLFileSystem.ls() returns "type" = "DIRECTORY", but in fsspec's AbstractFileSystem.walk() "type" = "directory" is expected. Therefore all folders are treated as files.

Unable to read a parquet file from blob with Hierarchical namespace enabled

Following on from #48 I am unable to read a parquet file from blob.

I am able to read a csv from the blob but reading a parquet files gives a persistent azure.common.AzureHttpError:

>>> dd.read_parquet('abfs://tmp/tmp.parquet',
                     storage_options=storage_options)
RROR:azure.storage.common.storageclient:Client-Request-ID=591c7559-85aa-11ea-91b3-095b84d992ed Retry policy did not allow for a retry: Server-Timestamp=Thu, 23 Apr 2020 21:36:24 GMT, Server-Request-ID=c751a7f7-a01e-0002-5bb7-194764000000, HTTP status code=500, Exception=Server encountered an internal error. Please try again after some time. ErrorCode: InternalError<?xml version="1.0" encoding="utf-8"?><Error><Code>InternalError</Code><Message>Server encountered an internal error. Please try again after some time.RequestId:c751a7f7-a01e-0002-5bb7-194764000000Time:2020-04-23T21:36:25.4617555Z</Message></Error>.
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/ray/local/bin/anaconda3/envs/adlfs/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 225, in read_parquet
    meta, statistics, parts = engine.read_metadata(
  File "/home/ray/local/bin/anaconda3/envs/adlfs/lib/python3.8/site-packages/dask/dataframe/io/parquet/fastparquet.py", line 202, in read_metadata
    parts, pf, gather_statistics, fast_metadata = _determine_pf_parts(
  File "/home/ray/local/bin/anaconda3/envs/adlfs/lib/python3.8/site-packages/dask/dataframe/io/parquet/fastparquet.py", line 150, in _determine_pf_parts
    pf = ParquetFile(
  File "/home/ray/local/bin/anaconda3/envs/adlfs/lib/python3.8/site-packages/fastparquet/api.py", line 111, in __init__
    with open_with(fn2, 'rb') as f:
  File "/home/ray/local/bin/anaconda3/envs/adlfs/lib/python3.8/site-packages/fsspec/spec.py", line 718, in open
    f = self._open(
  File "/home/ray/local/bin/anaconda3/envs/adlfs/lib/python3.8/site-packages/adlfs/core.py", line 592, in _open
    return AzureBlobFile(
  File "/home/ray/local/bin/anaconda3/envs/adlfs/lib/python3.8/site-packages/adlfs/core.py", line 621, in __init__
    super().__init__(
  File "/home/ray/local/bin/anaconda3/envs/adlfs/lib/python3.8/site-packages/fsspec/spec.py", line 956, in __init__
    self.details = fs.info(path)
  File "/home/ray/local/bin/anaconda3/envs/adlfs/lib/python3.8/site-packages/fsspec/spec.py", line 501, in info
    out = self.ls(path, detail=True, **kwargs)
  File "/home/ray/local/bin/anaconda3/envs/adlfs/lib/python3.8/site-packages/adlfs/core.py", line 488, in ls
    elif self._matches(
  File "/home/ray/local/bin/anaconda3/envs/adlfs/lib/python3.8/site-packages/adlfs/core.py", line 422, in _matches
    gen = self.blob_fs.list_blob_names(
  File "/home/ray/local/bin/anaconda3/envs/adlfs/lib/python3.8/site-packages/azure/storage/blob/baseblobservice.py", line 1362, in list_blob_names
    resp = self._list_blobs(*args, **kwargs)
  File "/home/ray/local/bin/anaconda3/envs/adlfs/lib/python3.8/site-packages/azure/storage/blob/baseblobservice.py", line 1437, in _list_blobs
    return self._perform_request(request, _converter, operation_context=_context)
  File "/home/ray/local/bin/anaconda3/envs/adlfs/lib/python3.8/site-packages/azure/storage/common/storageclient.py", line 446, in _perform_request
    raise ex
  File "/home/ray/local/bin/anaconda3/envs/adlfs/lib/python3.8/site-packages/azure/storage/common/storageclient.py", line 374, in _perform_request
    raise ex
  File "/home/ray/local/bin/anaconda3/envs/adlfs/lib/python3.8/site-packages/azure/storage/common/storageclient.py", line 359, in _perform_request
    _http_error_handler(
  File "/home/ray/local/bin/anaconda3/envs/adlfs/lib/python3.8/site-packages/azure/storage/common/_error.py", line 115, in _http_error_handler
    raise ex
azure.common.AzureHttpError: Server encountered an internal error. Please try again after some time. ErrorCode: InternalError
<?xml version="1.0" encoding="utf-8"?><Error><Code>InternalError</Code><Message>Server encountered an internal error. Please try again after some time.
RequestId:c751a7f7-a01e-0002-5bb7-194764000000
Time:2020-04-23T21:36:25.4617555Z</Message></Error>

This can be tested using

> conda create -n adlfs python=3.8
> conda activate adlfs
> conda install -c conda-forge dask fastparquet
> pip install adlfs
> python

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> from distributed import Client
>>> client = Client()

>>> storage_options = {'account_name': 'ACCOUNT_NAME',
                        'account_key': 'ACCOUNT_KEY'}

>>> d = {'col1': [1, 2, 3, 4], 'col2': [5, 6, 7, 8]}
>>> df = pd.DataFrame(data=d)

>>> df = dd.from_pandas(df, npartitions=2)

>>> dd.to_parquet(df=df,
                  path='abfs://tmp/tmp.parquet',
                  storage_options=storage_options)

>>> dd.read_parquet('abfs://tmp/tmp.parquet',
                     storage_options=storage_options)

You can see the versions on the packages here:

> conda list:

_libgcc_mutex 0.1 main
adal 1.2.2 pypi_0 pypi
adlfs 0.2.4 pypi_0 pypi
azure-common 1.1.25 pypi_0 pypi
azure-datalake-store 0.0.48 pypi_0 pypi
azure-storage-blob 2.1.0 pypi_0 pypi
azure-storage-common 2.1.0 pypi_0 pypi
bokeh 2.0.1 py38h32f6830_0 conda-forge
ca-certificates 2020.4.5.1 hecc5488_0 conda-forge
certifi 2020.4.5.1 py38h32f6830_0 conda-forge
cffi 1.14.0 pypi_0 pypi
chardet 3.0.4 pypi_0 pypi
click 7.1.1 pyh8c360ce_0 conda-forge
cloudpickle 1.3.0 py_0 conda-forge
cryptography 2.9.2 pypi_0 pypi
cytoolz 0.10.1 py38h516909a_0 conda-forge
dask 2.14.0 py_0 conda-forge
dask-core 2.14.0 py_0 conda-forge
distributed 2.14.0 py38h32f6830_0 conda-forge
fastparquet 0.3.3 py38hc1659b7_0 conda-forge
freetype 2.10.1 he06d7ca_0 conda-forge
fsspec 0.6.2 pypi_0 pypi
heapdict 1.0.1 py_0 conda-forge
idna 2.9 pypi_0 pypi
jinja2 2.11.2 pyh9f0ad1d_0 conda-forge
jpeg 9c h14c3975_1001 conda-forge
ld_impl_linux-64 2.33.1 h53a641e_7
libblas 3.8.0 14_openblas conda-forge
libcblas 3.8.0 14_openblas conda-forge
libedit 3.1.20181209 hc058e9b_0
libffi 3.2.1 hd88cf55_4
libgcc-ng 9.1.0 hdf63c60_0
libgfortran-ng 7.3.0 hdf63c60_5 conda-forge
liblapack 3.8.0 14_openblas conda-forge
libllvm8 8.0.1 hc9558a2_0 conda-forge
libopenblas 0.3.7 h5ec1e0e_6 conda-forge
libpng 1.6.37 hed695b0_1 conda-forge
libstdcxx-ng 9.1.0 hdf63c60_0
libtiff 4.1.0 hc7e4089_6 conda-forge
libwebp-base 1.1.0 h516909a_3 conda-forge
llvmlite 0.31.0 py38h4f45e52_1 conda-forge
locket 0.2.0 py_2 conda-forge
lz4-c 1.9.2 he1b5a44_0 conda-forge
markupsafe 1.1.1 py38h1e0a361_1 conda-forge
msgpack-python 1.0.0 py38hbf85e49_1 conda-forge
ncurses 6.2 he6710b0_0
numba 0.48.0 py38hb3f55d8_0 conda-forge
numpy 1.18.1 py38h8854b6b_1 conda-forge
olefile 0.46 py_0 conda-forge
openssl 1.1.1g h516909a_0 conda-forge
packaging 20.1 py_0 conda-forge
pandas 1.0.3 py38hcb8c335_1 conda-forge
partd 1.1.0 py_0 conda-forge
pillow 7.0.0 py38hb39fc2d_0
pip 20.0.2 py38_1
psutil 5.7.0 py38h1e0a361_1 conda-forge
pycparser 2.20 pypi_0 pypi
pyjwt 1.7.1 pypi_0 pypi
pyparsing 2.4.7 pyh9f0ad1d_0 conda-forge
python 3.8.2 hcf32534_0
python-dateutil 2.8.1 py_0 conda-forge
python_abi 3.8 1_cp38 conda-forge
pytz 2019.3 py_0 conda-forge
pyyaml 5.3.1 py38h1e0a361_0 conda-forge
readline 8.0 h7b6447c_0
requests 2.23.0 pypi_0 pypi
setuptools 46.1.3 py38_0
six 1.14.0 py_1 conda-forge
sortedcontainers 2.1.0 py_0 conda-forge
sqlite 3.31.1 h62c20be_1
tblib 1.6.0 py_0 conda-forge
thrift 0.11.0 py38he1b5a44_1001 conda-forge
tk 8.6.8 hbc83047_0
toolz 0.10.0 py_0 conda-forge
tornado 6.0.4 py38h1e0a361_1 conda-forge
typing_extensions 3.7.4.1 py38h32f6830_3 conda-forge
urllib3 1.25.9 pypi_0 pypi
wheel 0.34.2 py38_0
xz 5.2.5 h7b6447c_0
yaml 0.2.4 h516909a_0 conda-forge
zict 2.0.0 py_0 conda-forge
zlib 1.2.11 h7b6447c_3
zstd 1.4.4 h6597ccf_3 conda-forge

TypeError: __init__() got an unexpected keyword argument 'account_name'

I have been using version 0.1.1 from pypi and this code:

import dask.dataframe as dd
from fsspec.registry import known_implementations
known_implementations['abfs'] = {'class': 'adlfs.AzureDatalakeFileSystem'}
STORAGE_OPTIONS={'account_name': ACCOUNT_NAME, 'account_key': ACCOUNT_KEY}
ddf = dd.read_csv(f'abfs://{CONTAINER}/nycflight/*.csv', storage_options=STORAGE_OPTIONS)

shows me this error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-13-27477c1a278c> in <module>
      3 known_implementations['abfs'] = {'class': 'adlfs.AzureDatalakeFileSystem'}
      4 STORAGE_OPTIONS={'account_name': ACCOUNT_NAME, 'account_key': ACCOUNT_KEY}
----> 5 ddf = dd.read_csv(f'abfs://{CONTAINER}/nycflight/*.csv', storage_options=STORAGE_OPTIONS)

/anaconda/envs/dask/lib/python3.6/site-packages/dask/dataframe/io/csv.py in read(urlpath, blocksize, collection, lineterminator, compression, sample, enforce, assume_missing, storage_options, include_path_column, **kwargs)
    576             storage_options=storage_options,
    577             include_path_column=include_path_column,
--> 578             **kwargs
    579         )
    580 

/anaconda/envs/dask/lib/python3.6/site-packages/dask/dataframe/io/csv.py in read_pandas(reader, urlpath, blocksize, collection, lineterminator, compression, sample, enforce, assume_missing, storage_options, include_path_column, **kwargs)
    403         compression=compression,
    404         include_path=include_path_column,
--> 405         **(storage_options or {})
    406     )
    407 

/anaconda/envs/dask/lib/python3.6/site-packages/dask/bytes/core.py in read_bytes(urlpath, delimiter, not_zero, blocksize, sample, compression, include_path, **kwargs)
     91 
     92     """
---> 93     fs, fs_token, paths = get_fs_token_paths(urlpath, mode="rb", storage_options=kwargs)
     94 
     95     if len(paths) == 0:

/anaconda/envs/dask/lib/python3.6/site-packages/fsspec/core.py in get_fs_token_paths(urlpath, mode, num, name_function, storage_options, protocol)
    396         path = cls._strip_protocol(urlpath)
    397         update_storage_options(options, storage_options)
--> 398         fs = cls(**options)
    399 
    400         if "w" in mode:

/anaconda/envs/dask/lib/python3.6/site-packages/fsspec/spec.py in __call__(self, *args, **kwargs)
     51             return self._cache[token]
     52         else:
---> 53             obj = super().__call__(*args, **kwargs)
     54             # Setting _fs_token here causes some static linters to complain.
     55             obj._fs_token_ = token

TypeError: __init__() got an unexpected keyword argument 'account_name'

Any thoughts what I might be doing wrong?

Fix compatibility with fsspec>=0.8.0

Running unit tests with fsspec==0.8.0 causes the following test to fail:
test_mkdir_rm_recursive()

Current fix is to pin requirements to fsspec >= 0.6.0,<0.8

Importing azure.common, which is no longer part of 'install_requires'

What happened:
If AzureBlobFilesystem._get_token_from_service_principal is called, it will raise an import error as azure.common is no longer part of the requirements

https://github.com/dask/adlfs/blob/1778049d1b341d8526a412efcaa70ff7b81b0521/adlfs/core.py#L362

I can't find any reference to token_credentials being used anywhere, so this might just be a dead code path, in which case I can remove it in a PR.

Since I'm not sure what token_credential is actually used for, I don't know what to substitute it with - otherwise I would just make a PR fixing it

Create conda-forge adlfs-feedstock

Following on from #22 I would be interested in creating and maintaining adlfs-feedstock (along with @hayesgb and @martindurant) to build the package on conda-forge.

I've done this for one package (https://github.com/conda-forge/xskillscore-feedstock) and my normal procedure is to release on pypi then use the tar.gz in the meta.yaml.

Following some discussion here #22 (comment) I haven't had the the need to automate this build. I simply change a couple of lines after the package is released on pypi.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.