
cytotable's People

Contributors

axiomcura, d33bs, dependabot[bot], gwaybio, kenibrewer, vincerubinetti

cytotable's Issues

Describe tutorial use cases and clarify steps

"This tutorial is a good start, but perhaps when you come around to editing it again you might describe what the user would want to do, and how they can use your library to achieve that. At the moment I can infer how I'd use convert() from the values you specify for the arguments, but having it spelled out explicitly would be nice."

Originally posted by @falquaddoomi in #22 (comment)

Enable convert preset parameter overrides

This issue highlights the need to let explicitly specified parameters override presets. For instance, if a user selects a preset but wants to keep only some of its values and override others, they should be able to pass those parameters directly to convert(). Currently, when a preset is specified, only the preset's parameters appear to be used.

Based on conversations with @gwaybio.
References #61
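A minimal sketch of the requested override behavior, assuming presets are stored as plain dictionaries of convert() keyword arguments. `PRESETS`, its contents, and `resolve_convert_args` are hypothetical illustrations, not CytoTable's actual preset configuration or API. Explicitly passed arguments win; anything left unset falls back to the preset.

```python
from typing import Any, Dict, Optional

# Hypothetical preset registry: names and values are illustrative only,
# not CytoTable's real preset configuration.
PRESETS: Dict[str, Dict[str, Any]] = {
    "example_preset": {
        "compartments": ("cells", "cytoplasm", "nuclei"),
        "metadata": ("image",),
        "chunk_size": 1000,
    }
}


def resolve_convert_args(
    preset: Optional[str] = None, **overrides: Any
) -> Dict[str, Any]:
    """Merge preset values with user overrides (hypothetical helper).

    Overrides set to None are treated as "not specified", so the preset
    value is kept; any other override replaces the preset value.
    """
    resolved: Dict[str, Any] = dict(PRESETS.get(preset, {})) if preset else {}
    for key, value in overrides.items():
        if value is not None:
            resolved[key] = value
    return resolved


# Keep the preset's compartments and metadata, but override chunk_size.
args = resolve_convert_args(preset="example_preset", chunk_size=500, join=None)
print(args["chunk_size"], args["compartments"])
```

Treating `None` as "unset" follows a common Python convention; a real implementation might prefer a dedicated sentinel object so that `None` can still be passed deliberately.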

BinderException: Binder Error: Table "cytoplasm" does not have a column named "Metadata_Cytoplasm_Parent_Cells"

As a follow-up to #52 (comment), I ran into a new error.

Command

import cytotable
from parsl.config import Config
from parsl.executors import HighThroughputExecutor

parsl_config = Config(
    executors=[
        HighThroughputExecutor()
    ]
)

cytotable.convert(
    source_path="s3://cellpainting-gallery/cpg0016-jump/source_1/workspace/backend/Batch1_20221004/UL001643/UL001643.sqlite",
    dest_path="test2.parquet",
    dest_datatype="parquet",
    chunk_size=150000,
    parsl_config=parsl_config
)

Error

BinderException: Binder Error: Table "cytoplasm" does not have a column named "Metadata_Cytoplasm_Parent_Cells"
Full python traceback
BinderException                           Traceback (most recent call last)
File <timed exec>:1

File ~/miniconda3/envs/jump_sc/lib/python3.10/site-packages/cytotable/convert.py:1186, in convert(source_path, dest_path, dest_datatype, source_datatype, metadata, compartments, identifying_columns, concat, join, joins, chunk_columns, chunk_size, infer_common_schema, drop_null, preset, parsl_config, **kwargs)
   1169 # send sources to be written to parquet if selected
   1170 if dest_datatype == "parquet":
   1171     output = _to_parquet(
   1172         source_path=source_path,
   1173         dest_path=dest_path,
   1174         source_datatype=source_datatype,
   1175         metadata=metadata,
   1176         compartments=compartments,
   1177         identifying_columns=identifying_columns,
   1178         concat=concat,
   1179         join=join,
   1180         joins=joins,
   1181         chunk_columns=chunk_columns,
   1182         chunk_size=chunk_size,
   1183         infer_common_schema=infer_common_schema,
   1184         drop_null=drop_null,
   1185         **kwargs,
-> 1186     ).result()
   1188 return output

File ~/miniconda3/envs/jump_sc/lib/python3.10/concurrent/futures/_base.py:458, in Future.result(self, timeout)
    456     raise CancelledError()
    457 elif self._state == FINISHED:
--> 458     return self.__get_result()
    459 else:
    460     raise TimeoutError()

File ~/miniconda3/envs/jump_sc/lib/python3.10/concurrent/futures/_base.py:403, in Future.__get_result(self)
    401 if self._exception:
    402     try:
--> 403         raise self._exception
    404     finally:
    405         # Break a reference cycle with the exception in self._exception
    406         self = None

File ~/miniconda3/envs/jump_sc/lib/python3.10/site-packages/parsl/dataflow/dflow.py:301, in DataFlowKernel.handle_exec_update(self, task_record, future)
    298     raise RuntimeError("done callback called, despite future not reporting itself as done")
    300 try:
--> 301     res = self._unwrap_remote_exception_wrapper(future)
    303 except Exception as e:
    304     logger.debug("Task {} try {} failed".format(task_id, task_record['try_id']))

File ~/miniconda3/envs/jump_sc/lib/python3.10/site-packages/parsl/dataflow/dflow.py:567, in DataFlowKernel._unwrap_remote_exception_wrapper(future)
    565 result = future.result()
    566 if isinstance(result, RemoteExceptionWrapper):
--> 567     result.reraise()
    568 return result

File ~/miniconda3/envs/jump_sc/lib/python3.10/site-packages/parsl/app/errors.py:122, in RemoteExceptionWrapper.reraise(self)
    118 logger.debug("Reraising exception of type {}".format(self.e_type))
    120 v = self.get_exception()
--> 122 reraise(t, v, v.__traceback__)

File ~/miniconda3/envs/jump_sc/lib/python3.10/site-packages/six.py:719, in reraise(tp, value, tb)
    717     if value.__traceback__ is not tb:
    718         raise value.with_traceback(tb)
--> 719     raise value
    720 finally:
    721     value = None

File ~/miniconda3/envs/jump_sc/lib/python3.10/site-packages/parsl/app/errors.py:160, in wrapper()
    158 from parsl.app.errors import RemoteExceptionWrapper
    159 try:
--> 160     return func(*args, **kwargs)
    161 except Exception:
    162     return RemoteExceptionWrapper(*sys.exc_info())

File ~/miniconda3/envs/jump_sc/lib/python3.10/site-packages/cytotable/convert.py:956, in _to_parquet()
    951 # conditional section for merging
    952 # note: join implies a concat, but concat does not imply a join
    953 if join:
    954     # map joined results based on the join groups gathered above
    955     # note: after mapping we end up with a list of strings (task returns str)
--> 956     join_sources_result = [
    957         _join_source_chunk(
    958             # gather the result of concatted sources prior to
    959             # join group merging as each mapped task run will need
    960             # full concat results
    961             sources=results,
    962             dest_path=dest_path,
    963             joins=joins,
    964             # get merging chunks by join columns
    965             join_group=join_group,
    966             drop_null=drop_null,
    967         ).result()
    968         # create join group for querying the concatenated
    969         # data in order to perform memory-safe joining
    970         # per user chunk size specification.
    971         for join_group in _get_join_chunks(
    972             sources=results,
    973             chunk_columns=chunk_columns,
    974             chunk_size=chunk_size,
    975             metadata=metadata,
    976         ).result()
    977     ]
    979     # concat our join chunks together as one cohesive dataset
    980     # return results in common format which includes metadata
    981     # for lineage and debugging
    982     results = _concat_join_sources(
    983         dest_path=dest_path,
    984         join_sources=join_sources_result,
    985         sources=results,
    986     ).result()

File ~/miniconda3/envs/jump_sc/lib/python3.10/site-packages/cytotable/convert.py:967, in <listcomp>()
    951 # conditional section for merging
    952 # note: join implies a concat, but concat does not imply a join
    953 if join:
    954     # map joined results based on the join groups gathered above
    955     # note: after mapping we end up with a list of strings (task returns str)
    956     join_sources_result = [
    957         _join_source_chunk(
    958             # gather the result of concatted sources prior to
    959             # join group merging as each mapped task run will need
    960             # full concat results
    961             sources=results,
    962             dest_path=dest_path,
    963             joins=joins,
    964             # get merging chunks by join columns
    965             join_group=join_group,
    966             drop_null=drop_null,
--> 967         ).result()
    968         # create join group for querying the concatenated
    969         # data in order to perform memory-safe joining
    970         # per user chunk size specification.
    971         for join_group in _get_join_chunks(
    972             sources=results,
    973             chunk_columns=chunk_columns,
    974             chunk_size=chunk_size,
    975             metadata=metadata,
    976         ).result()
    977     ]
    979     # concat our join chunks together as one cohesive dataset
    980     # return results in common format which includes metadata
    981     # for lineage and debugging
    982     results = _concat_join_sources(
    983         dest_path=dest_path,
    984         join_sources=join_sources_result,
    985         sources=results,
    986     ).result()

File ~/miniconda3/envs/jump_sc/lib/python3.10/concurrent/futures/_base.py:458, in result()
    456     raise CancelledError()
    457 elif self._state == FINISHED:
--> 458     return self.__get_result()
    459 else:
    460     raise TimeoutError()

File ~/miniconda3/envs/jump_sc/lib/python3.10/concurrent/futures/_base.py:403, in __get_result()
    401 if self._exception:
    402     try:
--> 403         raise self._exception
    404     finally:
    405         # Break a reference cycle with the exception in self._exception
    406         self = None

File ~/miniconda3/envs/jump_sc/lib/python3.10/site-packages/parsl/dataflow/dflow.py:301, in handle_exec_update()
    298     raise RuntimeError("done callback called, despite future not reporting itself as done")
    300 try:
--> 301     res = self._unwrap_remote_exception_wrapper(future)
    303 except Exception as e:
    304     logger.debug("Task {} try {} failed".format(task_id, task_record['try_id']))

File ~/miniconda3/envs/jump_sc/lib/python3.10/site-packages/parsl/dataflow/dflow.py:567, in _unwrap_remote_exception_wrapper()
    565 result = future.result()
    566 if isinstance(result, RemoteExceptionWrapper):
--> 567     result.reraise()
    568 return result

File ~/miniconda3/envs/jump_sc/lib/python3.10/site-packages/parsl/app/errors.py:122, in reraise()
    118 logger.debug("Reraising exception of type {}".format(self.e_type))
    120 v = self.get_exception()
--> 122 reraise(t, v, v.__traceback__)

File ~/miniconda3/envs/jump_sc/lib/python3.10/site-packages/six.py:719, in reraise()
    717     if value.__traceback__ is not tb:
    718         raise value.with_traceback(tb)
--> 719     raise value
    720 finally:
    721     value = None

File ~/miniconda3/envs/jump_sc/lib/python3.10/site-packages/parsl/app/errors.py:160, in wrapper()
    158 from parsl.app.errors import RemoteExceptionWrapper
    159 try:
--> 160     return func(*args, **kwargs)
    161 except Exception:
    162     return RemoteExceptionWrapper(*sys.exc_info())

File ~/miniconda3/envs/jump_sc/lib/python3.10/site-packages/cytotable/convert.py:565, in _join_source_chunk()
    543 joins += (
    544     "WHERE ("
    545     + ") OR (".join(
   (...)
    561     + ")"
    562 )
    564 # perform compartment joins using duckdb over parquet files
--> 565 result = _duckdb_reader().execute(joins).arrow()
    567 # drop nulls if specified
    568 if drop_null:

BinderException: Binder Error: Table "cytoplasm" does not have a column named "Metadata_Cytoplasm_Parent_Cells"

Details

  • It took 30-60 minutes to process.
  • package versions are as specified here #52 (comment)
  • CytoTable was able to concatenate the chunked files into individual parquet files (e.g. cells.parquet, image.parquet), so the error seems to have occurred at the merge/join step.
  • Perhaps the error occurred because I set an incorrect parameter? If so, I wonder whether there should be a quick check before exhausting all the compute. (This could be a bad suggestion, so please ignore it if so.)
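The quick check suggested above could look something like this sketch, which inspects the source SQLite tables with Python's built-in `sqlite3` module before any heavy processing starts. The helper name `missing_join_columns`, the toy table layout, and the `required` mapping are hypothetical; the in-memory database only mimics the reported situation, where `cytoplasm` lacks `Metadata_Cytoplasm_Parent_Cells`.

```python
import sqlite3
from typing import Dict, Iterable, List


def missing_join_columns(
    conn: sqlite3.Connection, required: Dict[str, Iterable[str]]
) -> List[str]:
    """Return "table.column" entries a join would need but which are absent.

    `required` maps table names to the column names a join query references
    (a hypothetical structure for illustration).
    """
    missing = []
    for table, columns in required.items():
        # PRAGMA table_info yields one row per column; index 1 is the name.
        # Table names are assumed to come from trusted configuration.
        present = {row[1] for row in conn.execute(f"PRAGMA table_info('{table}')")}
        missing.extend(f"{table}.{col}" for col in columns if col not in present)
    return missing


# Toy database mimicking the reported column-name mismatch.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE cytoplasm (ImageNumber INTEGER, Cytoplasm_Parent_Cells INTEGER)"
)
problems = missing_join_columns(
    conn,
    {"cytoplasm": ["ImageNumber", "Metadata_Cytoplasm_Parent_Cells"]},
)
if problems:
    print("Join would fail; missing columns:", problems)
```

A check like this only reads table metadata, so it can fail fast before any data is concatenated.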

Allow data source filtering by directory structure

Provide the capability to filter data sources by their directory structure. For example, if many directories contain similar compartment data, a user should be able to narrow them down to a subset using understandable function arguments.
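One way such filtering could work is sketched below, assuming all sources live under a common root and the user supplies glob-style patterns for the directories to keep or drop. `filter_source_dirs` and its `include`/`exclude` parameters are hypothetical names, not an existing CytoTable API.

```python
import fnmatch
import pathlib
import tempfile
from typing import List, Optional, Sequence


def filter_source_dirs(
    root: pathlib.Path,
    include: Sequence[str] = ("*",),
    exclude: Optional[Sequence[str]] = None,
) -> List[pathlib.Path]:
    """Return subdirectories of `root` whose relative path matches the filters.

    Patterns use fnmatch syntax and are matched against the POSIX-style path
    relative to `root` (e.g. "plate_1/site_2"). Note that fnmatch's "*" also
    matches "/".
    """
    exclude = exclude or ()
    selected = []
    for path in sorted(p for p in root.rglob("*") if p.is_dir()):
        rel = path.relative_to(root).as_posix()
        if any(fnmatch.fnmatch(rel, pat) for pat in include) and not any(
            fnmatch.fnmatch(rel, pat) for pat in exclude
        ):
            selected.append(path)
    return selected


with tempfile.TemporaryDirectory() as tmp:
    root = pathlib.Path(tmp)
    for name in ("plate_1/site_1", "plate_1/site_2", "plate_2/site_1"):
        (root / name).mkdir(parents=True)
    # Keep plate_1's site directories, but skip any site_2 directory.
    chosen = filter_source_dirs(root, include=["plate_1/*"], exclude=["*/site_2"])
    chosen_names = [p.relative_to(root).as_posix() for p in chosen]

print(chosen_names)
```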

Add docsite

  • Include initial documentation within docsite-compatible files
  • Include automatic publishing

DuckDB's `TypeMismatchException` raised in CytoTable's convert() workflow due to `nan` values being stored as strings instead of expected types

CytoTable's convert() function appears to capture `nan` values as strings within the cell-health dataset, causing DuckDB to raise a duckdb.TypeMismatchException error.

Below is the code to replicate the problem:

import pathlib
import cytotable

# sqlite file path
sqlite_file = str(pathlib.Path("./SQ00014613.sqlite").resolve(strict=True))

# execute convert workflow
data = cytotable.convert(
    sqlite_file,
    dest_path="./parquet_data/test.parquet",
    dest_datatype="parquet",
    preset="cellprofiler_sqlite",
    source_datatype="sqlite",
)

link to download data

Traceback

From Prefect:

13:38:45.442 | ERROR   | Task run '_read_data-1' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/prefect/engine.py", line 1533, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 156, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 135, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/cytotable/convert.py", line 59, in _read_data
    _duckdb_with_sqlite()
duckdb.TypeMismatchException: Mismatch Type Error: Invalid type in column "Nuclei_Correlation_Costes_AGP_DNA": expected float or integer, found "nan" of type "text" instead.
13:38:45.680 | ERROR   | Flow run 'wonderful-cuckoo' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/prefect/engine.py", line 665, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 156, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 135, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/cytotable/convert.py", line 730, in _to_parquet
    common_schema = _infer_source_group_common_schema(
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/prefect/tasks.py", line 469, in __call__
    return enter_task_run_engine(
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/prefect/engine.py", line 965, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/prefect/engine.py", line 1114, in get_task_call_return_value
    return await future._result()
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/home/erikserrano/Programs/miniconda3/envs/pycytominer/lib/python3.9/site-packages/prefect/states.py", line 103, in _get_state_result
    raise MissingResult(
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.

It seems that the second exception, raised by Prefect, is caused by the earlier exception thrown by DuckDB, which prevents Prefect from recording the task's result in its state.
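A minimal sketch, using only Python's `sqlite3` module, of how such values can end up in a numeric column and how to detect them. SQLite applies type affinity rather than strict column types, so a string such as `'nan'` inserted into a `REAL` column is kept with storage class `text`, which matches the DuckDB message above. The table below is a toy stand-in for the real data, and converting those values to `NULL` is only one possible cleanup, not necessarily the fix CytoTable adopted.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Declared REAL, but SQLite only applies "affinity": text that is not a
# well-formed number (such as 'nan') is stored as text.
conn.execute("CREATE TABLE nuclei (Nuclei_Correlation_Costes_AGP_DNA REAL)")
conn.executemany("INSERT INTO nuclei VALUES (?)", [(0.5,), ("nan",), (1.25,)])

# typeof() reports the storage class actually used for each value.
text_rows = conn.execute(
    "SELECT COUNT(*) FROM nuclei "
    "WHERE typeof(Nuclei_Correlation_Costes_AGP_DNA) = 'text'"
).fetchone()[0]
print("values stored as text:", text_rows)

# One possible cleanup: replace textual placeholders with NULL so the
# column holds only REAL or NULL values.
conn.execute(
    "UPDATE nuclei SET Nuclei_Correlation_Costes_AGP_DNA = NULL "
    "WHERE typeof(Nuclei_Correlation_Costes_AGP_DNA) = 'text'"
)
types = {
    row[0]
    for row in conn.execute(
        "SELECT typeof(Nuclei_Correlation_Costes_AGP_DNA) FROM nuclei"
    )
}
print(sorted(types))
```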

Include versioning metadata within output files

Include versioning metadata within output files so that outputs from different releases of pycytominer-transform can be distinguished, and so that updates can optionally be applied as the schema changes over time.

Document system and workflow architecture options

Document system and workflow architecture options to improve developer understanding of remote and/or scalable options. For example, see the following reference sketch of a potential future state (from discussions in #10):

flowchart LR
subgraph actor["Actor / Initiator"]
pycytominer-transform
end

subgraph Storage
direction TB
subgraph sourcestorage["Source"]
s3source[S3 or Cloud Source]
end
subgraph deststorage["Destination"]
s3dest[S3 or Cloud Destination]
end
end
subgraph worker["Worker"]
dask[Dask or Ray Cluster]
end
pycytominer-transform --> |initiates work| dask
dask --> |creates data| s3dest
s3source --> |gathers data| dask
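The flow in the diagram (the initiator hands work to a worker pool, and workers gather data from the source and create data at the destination) can be sketched locally with Python's `concurrent.futures`, used here as a stand-in for a Dask or Ray cluster, with local directories standing in for S3 or other cloud storage. This only illustrates the shape of the system under those assumptions; `process_source`, `initiate`, and the file layout are hypothetical.

```python
import pathlib
import tempfile
from concurrent.futures import ThreadPoolExecutor
from typing import List


def process_source(source: pathlib.Path, dest_dir: pathlib.Path) -> pathlib.Path:
    """Worker task: gather data from the source, create data at the destination."""
    text = source.read_text()
    dest = dest_dir / f"{source.stem}.out"
    dest.write_text(text.upper())  # placeholder for a real transformation
    return dest


def initiate(source_dir: pathlib.Path, dest_dir: pathlib.Path) -> List[pathlib.Path]:
    """Initiator: submit one task per source file and wait for the results."""
    sources = sorted(source_dir.glob("*.csv"))
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(process_source, s, dest_dir) for s in sources]
        return [f.result() for f in futures]


with tempfile.TemporaryDirectory() as src, tempfile.TemporaryDirectory() as dst:
    src_dir, dst_dir = pathlib.Path(src), pathlib.Path(dst)
    for name in ("cells", "nuclei"):
        (src_dir / f"{name}.csv").write_text(f"{name}\n")
    outputs = [p.name for p in initiate(src_dir, dst_dir)]

print(outputs)
```

Because the initiator only submits tasks and collects futures, swapping the local pool for a distributed executor would ideally leave `initiate` largely unchanged, which is the separation the diagram aims to document.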

Explore the use of schema for input or output specification

This issue focuses on exploring and possibly implementing the use of schemas as a way to describe input or output data. Specifically, this would be a way to describe column names, data types, and groupings of files using an existing standard of some kind.

Invalid Input Error

Hi @d33bs - I am running into the following error when testing cytotable.convert()

duckdb.InvalidInputException: Invalid Input Error: Attempting to execute an unsuccessful or closed pending query result
Error: Invalid Error: unable to open database file
Full error log
 14:32:42.102 | ERROR   | Task run '_get_source_filepaths-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/engine.py", line 1551, in orchestrate_task_run
    result = await call.aresult()
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/cytotable/sources.py", line 70, in _get_source_filepaths
    for table_name in _duckdb_with_sqlite()
duckdb.InvalidInputException: Invalid Input Error: Attempting to execute an unsuccessful or closed pending query result
Error: Invalid Error: unable to open database file

14:32:42.118 | ERROR   | Task run '_get_source_filepaths-0' - Finished in state Failed('Task run encountered an exception: duckdb.InvalidInputException: Invalid Input Error: Attempting to execute an unsuccessful or closed pending query result\nError: Invalid Error: unable to open database file\n')

14:32:42.119 | ERROR   | Flow run 'berserk-gopher' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/engine.py", line 669, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/cytotable/sources.py", line 215, in _gather_sources
    sources = _get_source_filepaths(
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/tasks.py", line 485, in __call__
    return enter_task_run_engine(
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/engine.py", line 972, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
    return call.result()
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/engine.py", line 1137, in get_task_call_return_value
    return await future._result()
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/futures.py", line 241, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/engine.py", line 1551, in orchestrate_task_run
    result = await call.aresult()
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/cytotable/sources.py", line 70, in _get_source_filepaths
    for table_name in _duckdb_with_sqlite()
duckdb.InvalidInputException: Invalid Input Error: Attempting to execute an unsuccessful or closed pending query result
Error: Invalid Error: unable to open database file

14:32:42.145 | ERROR   | Flow run 'berserk-gopher' - Finished in state Failed('Flow run encountered an exception. duckdb.InvalidInputException: Invalid Input Error: Attempting to execute an unsuccessful or closed pending query result\nError: Invalid Error: unable to open database file\n')

14:32:42.146 | ERROR   | Flow run 'spiked-macaque' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/engine.py", line 669, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/cytotable/convert.py", line 695, in _to_parquet
    sources = _gather_sources(
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/flows.py", line 468, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/engine.py", line 184, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
    return call.result()
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
    return await fn(*args, **kwargs)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/engine.py", line 566, in create_and_begin_subflow_run
    return await terminal_state.result(fetch=True)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/engine.py", line 669, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/cytotable/sources.py", line 215, in _gather_sources
    sources = _get_source_filepaths(
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/tasks.py", line 485, in __call__
    return enter_task_run_engine(
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/engine.py", line 972, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
    return call.result()
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/engine.py", line 1137, in get_task_call_return_value
    return await future._result()
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/futures.py", line 241, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/engine.py", line 1551, in orchestrate_task_run
    result = await call.aresult()
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/waygr/miniforge3/envs/jump_sc/lib/python3.10/site-packages/cytotable/sources.py", line 70, in _get_source_filepaths
    for table_name in _duckdb_with_sqlite()
duckdb.InvalidInputException: Invalid Input Error: Attempting to execute an unsuccessful or closed pending query result
Error: Invalid Error: unable to open database file

14:32:42.162 | ERROR   | Flow run 'spiked-macaque' - Finished in state Failed('Flow run encountered an exception. duckdb.InvalidInputException: Invalid Input Error: Attempting to execute an unsuccessful or closed pending query result\nError: Invalid Error: unable to open database file\n')
Full python traceback
---------------------------------------------------------------------------
InvalidInputException                     Traceback (most recent call last)
Cell In[21], line 1
----> 1 what = cytotable.convert(
      2     source_path=manifest_df.sqlite_file[2],
      3     dest_path="test.parquet",
      4     dest_datatype="parquet"
      5 )

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/cytotable/convert.py:942, in convert(source_path, dest_path, dest_datatype, source_datatype, metadata, compartments, identifying_columns, concat, join, joins, chunk_columns, chunk_size, infer_common_schema, drop_null, preset, task_runner, log_level, **kwargs)
    940 # send sources to be written to parquet if selected
    941 if dest_datatype == "parquet":
--> 942     output = _to_parquet.with_options(task_runner=task_runner)(
    943         source_path=source_path,
    944         dest_path=dest_path,
    945         source_datatype=source_datatype,
    946         metadata=metadata,
    947         compartments=compartments,
    948         identifying_columns=identifying_columns,
    949         concat=concat,
    950         join=join,
    951         joins=joins,
    952         chunk_columns=chunk_columns,
    953         chunk_size=chunk_size,
    954         infer_common_schema=infer_common_schema,
    955         drop_null=drop_null,
    956         **kwargs,
    957     )
    959 return output

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/flows.py:468, in Flow.__call__(self, return_state, wait_for, *args, **kwargs)
    464 parameters = get_call_parameters(self.fn, args, kwargs)
    466 return_type = "state" if return_state else "result"
--> 468 return enter_flow_run_engine_from_flow_call(
    469     self,
    470     parameters,
    471     wait_for=wait_for,
    472     return_type=return_type,
    473 )

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/engine.py:184, in enter_flow_run_engine_from_flow_call(flow, parameters, wait_for, return_type)
    179     retval = from_async.wait_for_call_in_loop_thread(
    180         begin_run, done_callbacks=done_callbacks
    181     )
    183 else:
--> 184     retval = from_sync.wait_for_call_in_loop_thread(
    185         begin_run, done_callbacks=done_callbacks
    186     )
    188 return retval

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py:137, in from_sync.wait_for_call_in_loop_thread(call, timeout, done_callbacks)
    135     waiter.add_done_callback(callback)
    136 waiter.wait()
--> 137 return call.result()

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:173, in Call.result(self, timeout)
    167 def result(self, timeout: Optional[float] = None) -> T:
    168     """
    169     Wait for the result of the call.
    170 
    171     Not safe for use from asynchronous contexts.
    172     """
--> 173     return self.future.result(timeout=timeout)

File ~/miniforge3/envs/jump_sc/lib/python3.10/concurrent/futures/_base.py:451, in Future.result(self, timeout)
    449     raise CancelledError()
    450 elif self._state == FINISHED:
--> 451     return self.__get_result()
    453 self._condition.wait(timeout)
    455 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File ~/miniforge3/envs/jump_sc/lib/python3.10/concurrent/futures/_base.py:403, in Future.__get_result(self)
    401 if self._exception:
    402     try:
--> 403         raise self._exception
    404     finally:
    405         # Break a reference cycle with the exception in self._exception
    406         self = None

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:218, in Call._run_async(***failed resolving arguments***)
    216                 logger.debug("%r using async cancel scope %r", self, ctx)
    217                 ctx.chain(self.cancel_context, bidirectional=True)
--> 218                 result = await coro
    219 except BaseException as exc:
    220     logger.debug("Encountered exception in async call %r", self, exc_info=True)

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/client/utilities.py:40, in inject_client.<locals>.with_injected_client(*args, **kwargs)
     38 async with client_context as new_client:
     39     kwargs.setdefault("client", new_client or client)
---> 40     return await fn(*args, **kwargs)

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/engine.py:258, in create_then_begin_flow_run(flow, parameters, wait_for, return_type, client)
    256     return state
    257 elif return_type == "result":
--> 258     return await state.result(fetch=True)
    259 else:
    260     raise ValueError(f"Invalid return type for flow engine {return_type!r}.")

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/states.py:91, in _get_state_result(state, raise_on_failure)
     86     raise PausedRun("Run paused.")
     88 if raise_on_failure and (
     89     state.is_crashed() or state.is_failed() or state.is_cancelled()
     90 ):
---> 91     raise await get_state_exception(state)
     93 if isinstance(state.data, DataDocument):
     94     result = result_from_state_with_data_document(
     95         state, raise_on_failure=raise_on_failure
     96     )

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/engine.py:669, in orchestrate_flow_run(flow, flow_run, parameters, wait_for, interruptible, client, partial_flow_run_context)
    664         else:
    665             from_async.call_soon_in_new_thread(
    666                 flow_call, timeout=flow.timeout_seconds
    667             )
--> 669         result = await flow_call.aresult()
    671         waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
    672             flow_run_context.task_run_futures, client=client
    673         )
    674 except PausedRun:

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:181, in Call.aresult(self)
    175 async def aresult(self):
    176     """
    177     Wait for the result of the call.
    178 
    179     For use from asynchronous contexts.
    180     """
--> 181     return await asyncio.wrap_future(self.future)

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:194, in Call._run_sync(***failed resolving arguments***)
    192     with cancel_sync_at(self.cancel_context.deadline) as ctx:
    193         ctx.chain(self.cancel_context, bidirectional=True)
--> 194         result = self.fn(*self.args, **self.kwargs)
    196 # Return the coroutine for async execution
    197 if inspect.isawaitable(result):

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/cytotable/convert.py:695, in _to_parquet(source_path, dest_path, source_datatype, metadata, compartments, identifying_columns, concat, join, joins, chunk_columns, chunk_size, infer_common_schema, drop_null, **kwargs)
    652 """
    653 Export data to parquet.
    654 
   (...)
    691         result.
    692 """
    694 # gather sources to be processed
--> 695 sources = _gather_sources(
    696     source_path=source_path,
    697     source_datatype=source_datatype,
    698     targets=list(metadata) + list(compartments),
    699     **kwargs,
    700 )
    702 if not isinstance(sources, Dict):
    703     sources = sources.result()

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/flows.py:468, in Flow.__call__(self, return_state, wait_for, *args, **kwargs)
    464 parameters = get_call_parameters(self.fn, args, kwargs)
    466 return_type = "state" if return_state else "result"
--> 468 return enter_flow_run_engine_from_flow_call(
    469     self,
    470     parameters,
    471     wait_for=wait_for,
    472     return_type=return_type,
    473 )

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/engine.py:184, in enter_flow_run_engine_from_flow_call(flow, parameters, wait_for, return_type)
    179     retval = from_async.wait_for_call_in_loop_thread(
    180         begin_run, done_callbacks=done_callbacks
    181     )
    183 else:
--> 184     retval = from_sync.wait_for_call_in_loop_thread(
    185         begin_run, done_callbacks=done_callbacks
    186     )
    188 return retval

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py:137, in from_sync.wait_for_call_in_loop_thread(call, timeout, done_callbacks)
    135     waiter.add_done_callback(callback)
    136 waiter.wait()
--> 137 return call.result()

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:173, in Call.result(self, timeout)
    167 def result(self, timeout: Optional[float] = None) -> T:
    168     """
    169     Wait for the result of the call.
    170 
    171     Not safe for use from asynchronous contexts.
    172     """
--> 173     return self.future.result(timeout=timeout)

File ~/miniforge3/envs/jump_sc/lib/python3.10/concurrent/futures/_base.py:451, in Future.result(self, timeout)
    449     raise CancelledError()
    450 elif self._state == FINISHED:
--> 451     return self.__get_result()
    453 self._condition.wait(timeout)
    455 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File ~/miniforge3/envs/jump_sc/lib/python3.10/concurrent/futures/_base.py:403, in Future.__get_result(self)
    401 if self._exception:
    402     try:
--> 403         raise self._exception
    404     finally:
    405         # Break a reference cycle with the exception in self._exception
    406         self = None

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:218, in Call._run_async(***failed resolving arguments***)
    216                 logger.debug("%r using async cancel scope %r", self, ctx)
    217                 ctx.chain(self.cancel_context, bidirectional=True)
--> 218                 result = await coro
    219 except BaseException as exc:
    220     logger.debug("Encountered exception in async call %r", self, exc_info=True)

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/client/utilities.py:40, in inject_client.<locals>.with_injected_client(*args, **kwargs)
     38 async with client_context as new_client:
     39     kwargs.setdefault("client", new_client or client)
---> 40     return await fn(*args, **kwargs)

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/engine.py:566, in create_and_begin_subflow_run(flow, parameters, wait_for, return_type, client)
    564     return terminal_state
    565 elif return_type == "result":
--> 566     return await terminal_state.result(fetch=True)
    567 else:
    568     raise ValueError(f"Invalid return type for flow engine {return_type!r}.")

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/states.py:91, in _get_state_result(state, raise_on_failure)
     86     raise PausedRun("Run paused.")
     88 if raise_on_failure and (
     89     state.is_crashed() or state.is_failed() or state.is_cancelled()
     90 ):
---> 91     raise await get_state_exception(state)
     93 if isinstance(state.data, DataDocument):
     94     result = result_from_state_with_data_document(
     95         state, raise_on_failure=raise_on_failure
     96     )

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/engine.py:669, in orchestrate_flow_run(flow, flow_run, parameters, wait_for, interruptible, client, partial_flow_run_context)
    664         else:
    665             from_async.call_soon_in_new_thread(
    666                 flow_call, timeout=flow.timeout_seconds
    667             )
--> 669         result = await flow_call.aresult()
    671         waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
    672             flow_run_context.task_run_futures, client=client
    673         )
    674 except PausedRun:

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:181, in Call.aresult(self)
    175 async def aresult(self):
    176     """
    177     Wait for the result of the call.
    178 
    179     For use from asynchronous contexts.
    180     """
--> 181     return await asyncio.wrap_future(self.future)

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:194, in Call._run_sync(***failed resolving arguments***)
    192     with cancel_sync_at(self.cancel_context.deadline) as ctx:
    193         ctx.chain(self.cancel_context, bidirectional=True)
--> 194         result = self.fn(*self.args, **self.kwargs)
    196 # Return the coroutine for async execution
    197 if inspect.isawaitable(result):

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/cytotable/sources.py:215, in _gather_sources(source_path, source_datatype, targets, **kwargs)
    212 source_path = _build_path(path=source_path, **kwargs)
    214 # gather filepaths which will be used as the basis for this work
--> 215 sources = _get_source_filepaths(
    216     path=source_path, source_datatype=source_datatype, targets=targets
    217 )
    219 # infer or validate the source datatype based on source filepaths
    220 source_datatype = _infer_source_datatype(
    221     sources=sources, source_datatype=source_datatype
    222 )

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/tasks.py:485, in Task.__call__(self, return_state, wait_for, *args, **kwargs)
    481 parameters = get_call_parameters(self.fn, args, kwargs)
    483 return_type = "state" if return_state else "result"
--> 485 return enter_task_run_engine(
    486     self,
    487     parameters=parameters,
    488     wait_for=wait_for,
    489     task_runner=SequentialTaskRunner(),
    490     return_type=return_type,
    491     mapped=False,
    492 )

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/engine.py:972, in enter_task_run_engine(task, parameters, wait_for, return_type, task_runner, mapped)
    970     return from_async.wait_for_call_in_loop_thread(begin_run)
    971 else:
--> 972     return from_sync.wait_for_call_in_loop_thread(begin_run)

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py:137, in from_sync.wait_for_call_in_loop_thread(call, timeout, done_callbacks)
    135     waiter.add_done_callback(callback)
    136 waiter.wait()
--> 137 return call.result()

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:173, in Call.result(self, timeout)
    167 def result(self, timeout: Optional[float] = None) -> T:
    168     """
    169     Wait for the result of the call.
    170 
    171     Not safe for use from asynchronous contexts.
    172     """
--> 173     return self.future.result(timeout=timeout)

File ~/miniforge3/envs/jump_sc/lib/python3.10/concurrent/futures/_base.py:451, in Future.result(self, timeout)
    449     raise CancelledError()
    450 elif self._state == FINISHED:
--> 451     return self.__get_result()
    453 self._condition.wait(timeout)
    455 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File ~/miniforge3/envs/jump_sc/lib/python3.10/concurrent/futures/_base.py:403, in Future.__get_result(self)
    401 if self._exception:
    402     try:
--> 403         raise self._exception
    404     finally:
    405         # Break a reference cycle with the exception in self._exception
    406         self = None

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:218, in Call._run_async(***failed resolving arguments***)
    216                 logger.debug("%r using async cancel scope %r", self, ctx)
    217                 ctx.chain(self.cancel_context, bidirectional=True)
--> 218                 result = await coro
    219 except BaseException as exc:
    220     logger.debug("Encountered exception in async call %r", self, exc_info=True)

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/engine.py:1137, in get_task_call_return_value(task, flow_run_context, parameters, wait_for, return_type, task_runner, extra_task_inputs)
   1135     return await future._wait()
   1136 elif return_type == "result":
-> 1137     return await future._result()
   1138 else:
   1139     raise ValueError(f"Invalid return type for task engine {return_type!r}.")

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/futures.py:241, in PrefectFuture._result(self, timeout, raise_on_failure)
    239 if not final_state:
    240     raise TimeoutError("Call timed out before task finished.")
--> 241 return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/states.py:91, in _get_state_result(state, raise_on_failure)
     86     raise PausedRun("Run paused.")
     88 if raise_on_failure and (
     89     state.is_crashed() or state.is_failed() or state.is_cancelled()
     90 ):
---> 91     raise await get_state_exception(state)
     93 if isinstance(state.data, DataDocument):
     94     result = result_from_state_with_data_document(
     95         state, raise_on_failure=raise_on_failure
     96     )

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/engine.py:1551, in orchestrate_task_run(task, task_run, parameters, wait_for, result_factory, log_prints, interruptible, client)
   1544             logger.debug(
   1545                 f"Beginning execution...", extra={"state_message": True}
   1546             )
   1548             call = from_async.call_soon_in_new_thread(
   1549                 create_call(task.fn, *args, **kwargs)
   1550             )
-> 1551             result = await call.aresult()
   1553 except Exception as exc:
   1554     name = message = None

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:181, in Call.aresult(self)
    175 async def aresult(self):
    176     """
    177     Wait for the result of the call.
    178 
    179     For use from asynchronous contexts.
    180     """
--> 181     return await asyncio.wrap_future(self.future)

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:194, in Call._run_sync(***failed resolving arguments***)
    192     with cancel_sync_at(self.cancel_context.deadline) as ctx:
    193         ctx.chain(self.cancel_context, bidirectional=True)
--> 194         result = self.fn(*self.args, **self.kwargs)
    196 # Return the coroutine for async execution
    197 if inspect.isawaitable(result):

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/cytotable/sources.py:70, in _get_source_filepaths(path, targets, source_datatype)
     50 """
     51 Gather dataset of filepaths from a provided directory path.
     52 
   (...)
     64         Data structure which groups related files based on the compartments.
     65 """
     67 if source_datatype == "sqlite" or (path.is_file() and path.suffix == ".sqlite"):
     68     return {
     69         f"{table_name}.sqlite": [{"table_name": table_name, "source_path": path}]
---> 70         for table_name in _duckdb_with_sqlite()
     71         .execute(
     72             """
     73             /* perform query on sqlite_master table for metadata on tables */
     74             SELECT name as table_name
     75             from sqlite_scan(?, 'sqlite_master')
     76             where type='table'
     77             """,
     78             parameters=[str(path)],
     79         )
     80         .arrow()["table_name"]
     81         .to_pylist()
     82         if any(target.lower() in table_name.lower() for target in targets)
     83     }
     85 # gathers files from provided path using compartments as a filter
     86 sources = [
     87     {"source_path": file}
     88     for file in path.glob("**/*")
   (...)
     93     )
     94 ]

InvalidInputException: Invalid Input Error: Attempting to execute an unsuccessful or closed pending query result
Error: Invalid Error: unable to open database file

The command I used to obtain this error:

# Convert CellProfiler SQLite to parquet
cytotable.convert(
    source_path="'s3://cellpainting-gallery/cpg0016-jump/source_1/workspace/backend/Batch1_20221004/UL001643/UL001643.sqlite'",
    dest_path="test.parquet",
    dest_datatype="parquet"
)

Enhance preset capabilities through JSON or similar

...you might consider a "plugin" system where presets are stored as JSON or some other user-editable format (vs. having to edit a dictionary in the source), then made available as arguments to this function. You could also factor out the application of a preset into a separate function, as applying presets is starting to take up more than a few lines in this core function and I can only imagine it taking up more as you add options.

Originally posted by @falquaddoomi in #65 (review)
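Here is a minimal sketch of the JSON idea. It assumes presets are stored as a JSON object keyed by preset name, and it uses hypothetical helpers `load_presets()` and `apply_preset()`, neither of which is part of CytoTable. Moving the merge into its own function keeps `convert()` short. It also gives explicitly passed values a clear way to override preset values.

```python
import json
from typing import Any, Dict, Optional

# Hypothetical user-editable preset file contents (the preset name is illustrative)
PRESETS_JSON = """
{
  "example_sqlite_preset": {
    "metadata": ["image"],
    "compartments": ["cells", "nuclei", "cytoplasm"],
    "chunk_size": 1000
  }
}
"""


def load_presets(text: str) -> Dict[str, Dict[str, Any]]:
    """Parse a JSON document into a mapping of preset name -> parameters."""
    return json.loads(text)


def apply_preset(
    presets: Dict[str, Dict[str, Any]],
    preset: str,
    overrides: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Return preset parameters, letting any non-None override take precedence."""
    if preset not in presets:
        raise KeyError(f"Unknown preset: {preset!r}")
    params = dict(presets[preset])  # copy so the loaded presets are not mutated
    params.update({k: v for k, v in (overrides or {}).items() if v is not None})
    return params


params = apply_preset(
    load_presets(PRESETS_JSON), "example_sqlite_preset", {"chunk_size": 5000}
)
print(params["chunk_size"])  # 5000
```

Treating `None` as "not specified" lets a caller override only the parameters they care about and inherit the rest from the preset.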

How does CytoTable handle duplicate ImageNumbers?

ImageNumber is guaranteed to be unique only within an Image.csv but not across all Image.csv files in a dataset.

Does pycytominer-transform handle this correctly?

In cytominer-database, we create a column, TableNumber, to identify rows in the Image table uniquely. TableNumber is set to be the hash of the Image.csv file:

https://github.com/cytomining/cytominer-database/blob/5aa00f58e4a31bbbd2a3779c87e7a3620b0030db/cytominer_database/ingest_variable_engine.py#L99
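The following sketch implements the same idea. It assumes TableNumber is a hash of each Image.csv file's bytes. Using MD5 via `hashlib` is an assumption here, since cytominer-database's exact hashing may differ, and `unique_image_keys()` is a hypothetical helper. Pairing `(TableNumber, ImageNumber)` produces keys that stay unique across files even when ImageNumber values repeat.

```python
import csv
import hashlib
import pathlib
import tempfile
from typing import List, Tuple


def table_number(path: pathlib.Path) -> str:
    """Hash the raw bytes of an Image.csv file (MD5 is an assumed choice)."""
    return hashlib.md5(path.read_bytes()).hexdigest()


def unique_image_keys(paths: List[pathlib.Path]) -> List[Tuple[str, int]]:
    """Pair every row's ImageNumber with its file's TableNumber."""
    keys = []
    for path in paths:
        tn = table_number(path)
        with path.open(newline="") as handle:
            for row in csv.DictReader(handle):
                keys.append((tn, int(row["ImageNumber"])))
    return keys


# two plates whose Image.csv files both number images starting at 1
with tempfile.TemporaryDirectory() as tmp:
    paths = []
    for plate, well in (("plate_a", "A01"), ("plate_b", "B02")):
        path = pathlib.Path(tmp, plate, "Image.csv")
        path.parent.mkdir()
        path.write_text(f"ImageNumber,Metadata_Well\n1,{well}\n2,{well}\n")
        paths.append(path)
    keys = unique_image_keys(paths)

print(len({n for _, n in keys}))  # 2 distinct ImageNumbers
print(len(set(keys)))  # 4 distinct (TableNumber, ImageNumber) keys
```

One caveat of content hashing: two byte-identical Image.csv files would receive the same TableNumber.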

Add pytest markers indicating relative size of tests

This issue focuses on using pytest markers to allow more granular testing during development of this repo, for example poetry run pytest -m small to run "small" tests. Markers could also let Dagger-based testing produce reproducible results, for example dagger do test 3.9 pytest small. As a start, small, medium, and large could be used as sizes (based only on very rough estimates). These markers would allow quicker results when developing specific aspects of this project.
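This sketch shows one way to register and apply the markers, using pytest's `pytest_configure` hook and `@pytest.mark`. In a real repository the hook would live in `conftest.py` and the tests in test modules. The marker descriptions and example tests here are placeholders.

```python
import pytest


def pytest_configure(config):
    """conftest.py hook: register size markers so --strict-markers accepts them."""
    for size in ("small", "medium", "large"):
        config.addinivalue_line(
            "markers", f"{size}: rough relative size/duration of a test"
        )


@pytest.mark.small
def test_small_example():
    # fast check suitable for quick development loops
    assert 1 + 1 == 2


@pytest.mark.large
def test_large_example():
    # stand-in for a slower, data-heavy test
    assert sum(range(1_000_000)) == 499999500000
```

Once the markers are registered, `poetry run pytest -m small` selects only the small tests, and `poetry run pytest -m "not large"` skips the slowest ones.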

Enhance convert() default or unspecified parameters with preferential choices

This issue highlights the potential for "smarter" default or unspecified parameter decisions in convert(), so that parameters match, for example, the storage type or otherwise behave in unsurprising ways. For example, if a SQLite file is provided, default parameters could align with patterns found in those data sources to create a better user experience. Currently, many of the defaults stem from CellProfiler CSVs, which sometimes causes unexpected challenges. Inspired by #57.
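One possible shape for this is a hypothetical `infer_preset()` helper. An explicit preset always wins; otherwise a `.sqlite` suffix selects a SQLite-oriented preset. The preset names `"cellprofiler_sqlite"` and `"cellprofiler_csv"` are assumed here for illustration. Only `"cellprofiler_sqlite_pycytominer"` appears elsewhere in these issues.

```python
import pathlib
from typing import Optional, Union


def infer_preset(
    source_path: Union[str, pathlib.Path], preset: Optional[str] = None
) -> str:
    """Choose a preset from the source path when the caller did not specify one."""
    if preset is not None:
        return preset  # explicit choices always take precedence
    # PurePath logic works on local paths and on s3:// style strings alike
    if pathlib.PurePosixPath(str(source_path)).suffix.lower() == ".sqlite":
        return "cellprofiler_sqlite"  # assumed preset name
    return "cellprofiler_csv"  # assumed preset name


print(infer_preset("s3://bucket/plate/plate.sqlite"))  # cellprofiler_sqlite
```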

Enhance documentation and typing for interface composability

"Later on, while documenting the interface (both public and private) I think it'd be helpful for a reader to know how the methods within the library are composed or could be composed. You could do this by calling out methods that rely on the current method or vice versa in the docs (e.g., "called by X(), used as input to Y()"), to at least demonstrate one example of how they could be composed. If you wanted to take this idea further, you could define named types (https://docs.python.org/3/library/typing.html#type-aliases) that are returned or taken as arguments to the methods. For example, to_parquet() takes several arguments whose types are Union[List[str], Tuple[str, ...]]; if you were to instead distinguish each argument as a type of the thing you expect it to take, you could simply read the signatures of to_parquet() and other methods in the library to see how you'd chain them together.

On that note, you could probably save yourself some typing (no pun intended) by aliasing Union[List[str], Tuple[str, ...]] as, say, IterableCollection. ..."

Originally posted by @falquaddoomi in #22 (comment)
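Here is a short sketch of the suggestion. It uses the `IterableCollection` alias named in the review plus hypothetical aliases and functions. The signatures alone show that `normalize_compartments()` feeds `compartment_tables()`. Note that plain aliases only document intent: a type checker treats `CompartmentNames` and `IterableCollection` as the same type.

```python
from typing import List, Tuple, Union

# Alias suggested in the review: a list or tuple of strings.
IterableCollection = Union[List[str], Tuple[str, ...]]

# Hypothetical named aliases documenting what each collection holds.
CompartmentNames = IterableCollection
TableNames = List[str]


def normalize_compartments(compartments: CompartmentNames) -> CompartmentNames:
    """Lowercase compartment names; used as input to compartment_tables()."""
    return tuple(name.lower() for name in compartments)


def compartment_tables(compartments: CompartmentNames) -> TableNames:
    """Called after normalize_compartments(); builds one table key per compartment."""
    return [f"{name}.sqlite" for name in compartments]


print(compartment_tables(normalize_compartments(["Cells", "Nuclei"])))
# ['cells.sqlite', 'nuclei.sqlite']
```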

Rename project to CytoTable

Rename this project to CytoTable from pycytominer-transform.

  • Update all documentation references
  • Update any related code
  • Rename the GitHub repo
  • Update related GitHub repo content and links

Add data type casting controls

This issue focuses on adding data type casting controls to CytoTable based on the needs or preferences of users. For example, if Float64 data types are encountered in source data and Float32 is desired, one should be able to use a parameter or setting of some kind to indicate that the data should be cast as it passes through CytoTable procedures.
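This sketch shows one way casting controls could work. It assumes a hypothetical `cast_map` parameter that maps source types to target types and is applied in the DuckDB SQL that reads each table. In DuckDB, `DOUBLE` is a 64-bit float and `FLOAT` is 32-bit, so `{"DOUBLE": "FLOAT"}` expresses the Float64 to Float32 case. `build_cast_select()` is hypothetical and does not escape quotes inside column names.

```python
from typing import Dict, List


def build_cast_select(
    table: str, columns: Dict[str, str], cast_map: Dict[str, str]
) -> str:
    """Build a DuckDB SELECT that casts any column whose type appears in cast_map."""
    targets = {src.upper(): dst for src, dst in cast_map.items()}
    parts: List[str] = []
    for name, dtype in columns.items():
        target = targets.get(dtype.upper())
        if target is None:
            parts.append(f'"{name}"')  # keep the column's existing type
        else:
            parts.append(f'CAST("{name}" AS {target}) AS "{name}"')
    return f"SELECT {', '.join(parts)} FROM {table}"


sql = build_cast_select(
    "cells",
    {"ImageNumber": "BIGINT", "Cells_AreaShape_Area": "DOUBLE"},
    {"DOUBLE": "FLOAT"},
)
print(sql)
# SELECT "ImageNumber", CAST("Cells_AreaShape_Area" AS FLOAT) AS "Cells_AreaShape_Area" FROM cells
```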

JoinError from remote s3

Thanks @d33bs for addressing #47, which may have solved #43, which describes my initial example application (pasted below again)

# Convert CellProfiler SQLite to parquet
cytotable.convert(
    source_path="'s3://cellpainting-gallery/cpg0016-jump/source_1/workspace/backend/Batch1_20221004/UL001643/UL001643.sqlite'",
    dest_path="test.parquet",
    dest_datatype="parquet"
)

I received the following error (full traceback below):

JoinError: Join failure for task 1 with failed join dependencies from tasks [5]
Full python traceback
---------------------------------------------------------------------------
JoinError                                 Traceback (most recent call last)
Cell In[5], line 1
----> 1 what = cytotable.convert(
      2     source_path="/".join(manifest_df.sqlite_file[2].split("/")[0:-1]),
      3     dest_path="test.parquet",
      4     dest_datatype="parquet"
      5 )

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/cytotable/convert.py:1186, in convert(source_path, dest_path, dest_datatype, source_datatype, metadata, compartments, identifying_columns, concat, join, joins, chunk_columns, chunk_size, infer_common_schema, drop_null, preset, parsl_config, **kwargs)
   1169 # send sources to be written to parquet if selected
   1170 if dest_datatype == "parquet":
   1171     output = _to_parquet(
   1172         source_path=source_path,
   1173         dest_path=dest_path,
   1174         source_datatype=source_datatype,
   1175         metadata=metadata,
   1176         compartments=compartments,
   1177         identifying_columns=identifying_columns,
   1178         concat=concat,
   1179         join=join,
   1180         joins=joins,
   1181         chunk_columns=chunk_columns,
   1182         chunk_size=chunk_size,
   1183         infer_common_schema=infer_common_schema,
   1184         drop_null=drop_null,
   1185         **kwargs,
-> 1186     ).result()
   1188 return output

File ~/miniforge3/envs/jump_sc/lib/python3.10/concurrent/futures/_base.py:458, in Future.result(self, timeout)
    456     raise CancelledError()
    457 elif self._state == FINISHED:
--> 458     return self.__get_result()
    459 else:
    460     raise TimeoutError()

File ~/miniforge3/envs/jump_sc/lib/python3.10/concurrent/futures/_base.py:403, in Future.__get_result(self)
    401 if self._exception:
    402     try:
--> 403         raise self._exception
    404     finally:
    405         # Break a reference cycle with the exception in self._exception
    406         self = None

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/parsl/dataflow/dflow.py:301, in DataFlowKernel.handle_exec_update(self, task_record, future)
    298     raise RuntimeError("done callback called, despite future not reporting itself as done")
    300 try:
--> 301     res = self._unwrap_remote_exception_wrapper(future)
    303 except Exception as e:
    304     logger.debug("Task {} try {} failed".format(task_id, task_record['try_id']))

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/parsl/dataflow/dflow.py:567, in DataFlowKernel._unwrap_remote_exception_wrapper(future)
    565 result = future.result()
    566 if isinstance(result, RemoteExceptionWrapper):
--> 567     result.reraise()
    568 return result

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/parsl/app/errors.py:122, in RemoteExceptionWrapper.reraise(self)
    118 logger.debug("Reraising exception of type {}".format(self.e_type))
    120 v = self.get_exception()
--> 122 reraise(t, v, v.__traceback__)

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/six.py:719, in reraise(tp, value, tb)
    717     if value.__traceback__ is not tb:
    718         raise value.with_traceback(tb)
--> 719     raise value
    720 finally:
    721     value = None

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/parsl/app/errors.py:160, in wrapper()
    158 from parsl.app.errors import RemoteExceptionWrapper
    159 try:
--> 160     return func(*args, **kwargs)
    161 except Exception:
    162     return RemoteExceptionWrapper(*sys.exc_info())

File ~/miniforge3/envs/jump_sc/lib/python3.10/site-packages/cytotable/convert.py:855, in _to_parquet()
    847 from cytotable.sources import _gather_sources
    849 # gather sources to be processed
    850 sources = _gather_sources(
    851     source_path=source_path,
    852     source_datatype=source_datatype,
    853     targets=list(metadata) + list(compartments),
    854     **kwargs,
--> 855 ).result()
    857 # if we already have a file in dest_path, remove it
    858 if pathlib.Path(dest_path).is_file():

File ~/miniforge3/envs/jump_sc/lib/python3.10/concurrent/futures/_base.py:458, in result()
    456     raise CancelledError()
    457 elif self._state == FINISHED:
--> 458     return self.__get_result()
    459 else:
    460     raise TimeoutError()

File ~/miniforge3/envs/jump_sc/lib/python3.10/concurrent/futures/_base.py:403, in __get_result()
    401 if self._exception:
    402     try:
--> 403         raise self._exception
    404     finally:
    405         # Break a reference cycle with the exception in self._exception
    406         self = None

JoinError: Join failure for task 1 with failed join dependencies from tasks [5]

Parsl log file: parsl.log

Default `drop_null=True` causes loss of data

When reviewing the number of segmented single cells per image, I noticed a big discrepancy between the output from CytoTable and CellProfiler.

Looking at the CytoTable convert function, I found that the drop_null parameter defaults to True. This means that a significant number of rows are removed whenever at least one column has a NaN value.

In my original example, I loop through a dictionary of paths for each plate and run convert on each:

import pathlib

from cytotable import convert

# loop through each plate and its set of paths from the dictionary
for plate, info in plate_info_dictionary.items():
    source_path = info["source_path"]
    dest_path = info["dest_path"]
    print(f"Performing merge single cells and conversion on {plate}!")

    # merge single cells and output as parquet file
    convert(
        source_path=source_path,
        dest_path=dest_path,
        dest_datatype=dest_datatype,
        preset=preset,
    )
    print(f"Merged and converted {pathlib.Path(dest_path).name}!")

    # add single cell count per well as metadata column to parquet file and save back to same path
    sc_utils.add_sc_count_metadata_file(
        data_path=dest_path, well_column_name="Image_Metadata_Well", file_type="parquet"
    )
    print(f"Added single cell count as metadata to {pathlib.Path(dest_path).name}!")

With the default drop_null=True, I get this output for the last plate processed in this for loop:

(screenshot of the output, not reproduced here)

After passing drop_null=False to the convert call above, we get this output for the same plate:

(screenshot of the output, not reproduced here)

Since we preprocess the features before analysis, we can drop columns containing NaN values without removing the segmented single cells entirely. The fix should be straightforward: default drop_null to False and let users decide whether they are okay with removing single cells (rows).
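This toy sketch in plain Python makes the trade-off concrete. It is not CytoTable's implementation, which works on DuckDB/Arrow data, and the feature names are illustrative. It contrasts row-wise and column-wise handling of missing values. Dropping rows loses whole cells, while dropping columns keeps every cell and loses only the affected features.

```python
import math
from typing import Any, Dict, List

Row = Dict[str, Any]


def is_missing(value: Any) -> bool:
    """Treat None and float NaN as missing."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def drop_null_rows(rows: List[Row]) -> List[Row]:
    """Row-wise: discard any cell (row) with at least one missing value."""
    return [row for row in rows if not any(is_missing(v) for v in row.values())]


def drop_null_columns(rows: List[Row]) -> List[Row]:
    """Column-wise: discard features missing anywhere, keeping every cell."""
    bad = {k for row in rows for k, v in row.items() if is_missing(v)}
    return [{k: v for k, v in row.items() if k not in bad} for row in rows]


cells = [
    {"ObjectNumber": 1, "Area": 120.0, "Texture": 0.4},
    {"ObjectNumber": 2, "Area": 98.0, "Texture": float("nan")},
    {"ObjectNumber": 3, "Area": 143.0, "Texture": 0.7},
]
print(len(drop_null_rows(cells)))  # 2 cells survive
print(len(drop_null_columns(cells)))  # 3 cells survive, without "Texture"
```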

Develop a cytotable.annotate function to incorporate metadata

Based on the tests, it seemed that the convert function should work the same way as pycytominer's SingleCells, which merges the single cells from the database with the corresponding metadata.

After running this function, I can see that this was not the case.

For clarification: should the convert function merge single cells (i.e., associate objects across compartments to produce one file with all single cells) in addition to converting to parquet?
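Here is a minimal sketch of what a hypothetical `annotate()` step could do, in the spirit of pycytominer's annotate: left-join platemap metadata onto single-cell rows by well. The platemap column names (`well_position`, `treatment`) and the `Metadata_` prefix are assumptions for illustration. `Image_Metadata_Well` is the well column name used earlier in these issues.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def annotate(
    single_cells: List[Row], platemap: List[Row], on: str, platemap_on: str
) -> List[Row]:
    """Left-join platemap metadata onto single-cell rows (hypothetical annotate())."""
    lookup = {row[platemap_on]: row for row in platemap}
    annotated = []
    for cell in single_cells:
        meta = lookup.get(cell[on], {})  # cells without a platemap match get no extras
        # prefix platemap columns so metadata is easy to tell apart from features
        extra = {f"Metadata_{k}": v for k, v in meta.items() if k != platemap_on}
        annotated.append({**extra, **cell})
    return annotated


cells = [
    {"ObjectNumber": 1, "Image_Metadata_Well": "A01"},
    {"ObjectNumber": 2, "Image_Metadata_Well": "B02"},
]
platemap = [{"well_position": "A01", "treatment": "DMSO"}]
out = annotate(cells, platemap, on="Image_Metadata_Well", platemap_on="well_position")
print(out[0]["Metadata_treatment"])  # DMSO
```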

Improve exception visibility within console

When exceptions are encountered, the message displayed in the console often includes Parsl information (which app failed and, workflow-wise, why it failed), but it sometimes omits what specifically failed. Users may then know that something went wrong without knowing how to fix it or share it with others, short of consulting the log file(s). This issue highlights the need to raise nested exceptions from within the Parsl apps to provide better visibility and quicker troubleshooting. Inspired by conversations in #52.
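Here is a minimal sketch of the idea on a plain Python function. A hypothetical `surface_errors()` decorator catches an exception and re-raises it with the app name and original message. It chains the original error with `raise ... from`. Whether and how this composes with Parsl's app decorators is an assumption to verify; the sketch does not use Parsl, and `AppFailure` and `example_app` are hypothetical names.

```python
import functools
from typing import Any, Callable


class AppFailure(Exception):
    """Hypothetical exception type that names the failing app."""


def surface_errors(func: Callable[..., Any]) -> Callable[..., Any]:
    """Re-raise any error with the app name and original message, chaining the cause."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            # `from exc` keeps the original exception available as __cause__
            raise AppFailure(
                f"{func.__name__} failed: {type(exc).__name__}: {exc}"
            ) from exc

    return wrapper


@surface_errors
def example_app(path: str) -> list:
    # stand-in for an app body that fails deep inside a library call
    raise OSError(f"unable to open database file: {path}")
```

Calling `example_app("x.sqlite")` raises `AppFailure` with the message `example_app failed: OSError: unable to open database file: x.sqlite`. The original `OSError` remains reachable through `__cause__`.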

Refine SQL table references for joins parameter

This issue highlights the possibility of making the convert(joins="") parameter more user-friendly by adopting conventions recognizable from other SQL dialects. At the moment, the statement is written in DuckDB SQL and refers to intermediary files, which may be harder to understand than the alternatives. Possible alternatives include substitution variables (e.g., &a_variable_name).
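Substitution variables could be resolved with a small rendering step before the SQL reaches DuckDB. This is a sketch under the assumption that `&name` placeholders map to table expressions. `render_joins`, the placeholder names, and the Parquet file names are hypothetical, while `read_parquet` is DuckDB's Parquet table function.

```python
import re
from typing import Mapping

# Matches placeholders such as &image or &cytoplasm (hypothetical convention).
PLACEHOLDER = re.compile(r"&([A-Za-z_][A-Za-z0-9_]*)")


def render_joins(template: str, tables: Mapping[str, str]) -> str:
    """Replace each &name placeholder with the SQL expression registered for name."""

    def substitute(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name not in tables:
            raise KeyError(f"No table registered for placeholder &{name}")
        return tables[name]

    return PLACEHOLDER.sub(substitute, template)


joins = """
SELECT *
FROM &image AS image
LEFT JOIN &cytoplasm AS cytoplasm
  ON cytoplasm.ImageNumber = image.ImageNumber
"""

# Hypothetical intermediary files produced earlier in a conversion.
tables = {
    "image": "read_parquet('image.parquet')",
    "cytoplasm": "read_parquet('cytoplasm.parquet')",
}
print(render_joins(joins, tables))
```

Users would then write joins against readable names, and the tool would fill in whatever intermediary file paths it actually created.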

Expand testing for MacOS environments

This issue stems from #52, seeking a proactive way to catch potential errors on MacOS systems. Addressing this issue will mean:

  • Adding MacOS automated tests
  • Providing upfront documentation on tested OS environments for compatibility understanding

no_sign_request=True does not skip AWS config ID

I'm receiving an InvalidAccessKeyId error after deleting my AWS access key ID (set via aws configure) from maple. I wanted to try the no_sign_request=True option before adding credentials back to maple, but received the error outlined below.

As suggested in #52 (comment), I ran cytotable.convert(..., no_sign_request=True) (full command below):

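import cytotable
from parsl.config import Config
from parsl.executors import HighThroughputExecutor

# assumed Parsl configuration; the original report does not show its definition
parsl_config = Config(executors=[HighThroughputExecutor()])

cytotable.convert(
    source_path="s3://cellpainting-gallery/cpg0016-jump/source_1/workspace/backend/Batch1_20221004/UL001643",
    dest_path="test2.parquet",
    dest_datatype="parquet",
    chunk_size=150000,
    parsl_config=parsl_config,
    no_sign_request=True,
    preset="cellprofiler_sqlite_pycytominer",
)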

But I received the following error:

exception of type <class 'botocore.exceptions.ClientError'>
1685550430.488742 2023-05-31 10:27:10 MainProcess-248907 HTEX-Queue-Management-Thread-139723934787136 parsl.dataflow.dflow:304 handle_exec_update DEBUG: Task 3 try 0 failed
1685550430.488833 2023-05-31 10:27:10 MainProcess-248907 HTEX-Queue-Management-Thread-139723934787136 parsl.dataflow.dflow:350 handle_exec_update ERROR: Task 3 failed after 0 retry attempts
Traceback (most recent call last):
  File "/home/gway/miniconda3/envs/jump_sc/lib/python3.10/site-packages/parsl/dataflow/dflow.py", line 301, in handle_exec_update
    res = self._unwrap_remote_exception_wrapper(future)
  File "/home/gway/miniconda3/envs/jump_sc/lib/python3.10/site-packages/parsl/dataflow/dflow.py", line 567, in _unwrap_remote_exception_wrapper
    result.reraise()
  File "/home/gway/miniconda3/envs/jump_sc/lib/python3.10/site-packages/parsl/app/errors.py", line 122, in reraise
    reraise(t, v, v.__traceback__)
  File "/home/gway/miniconda3/envs/jump_sc/lib/python3.10/site-packages/six.py", line 719, in reraise
    raise value
  File "/home/gway/miniconda3/envs/jump_sc/lib/python3.10/site-packages/parsl/app/errors.py", line 160, in wrapper
    return func(*args, **kwargs)
  File "/home/gway/miniconda3/envs/jump_sc/lib/python3.10/site-packages/cytotable/sources.py", line 81, in _get_source_filepaths
    if AnyPath(path).is_file()
  File "/home/gway/miniconda3/envs/jump_sc/lib/python3.10/site-packages/cloudpathlib/s3/s3path.py", line 39, in is_file
    return self.client._is_file_or_dir(self) == "file"
  File "/home/gway/miniconda3/envs/jump_sc/lib/python3.10/site-packages/cloudpathlib/s3/s3client.py", line 164, in _is_file_or_dir
    return self._s3_file_query(cloud_path)
  File "/home/gway/miniconda3/envs/jump_sc/lib/python3.10/site-packages/cloudpathlib/s3/s3client.py", line 197, in _s3_file_query
    return next(
  File "/home/gway/miniconda3/envs/jump_sc/lib/python3.10/site-packages/cloudpathlib/s3/s3client.py", line 198, in <genexpr>
    (
  File "/home/gway/miniconda3/envs/jump_sc/lib/python3.10/site-packages/boto3/resources/collection.py", line 81, in __iter__
    for page in self.pages():
  File "/home/gway/miniconda3/envs/jump_sc/lib/python3.10/site-packages/boto3/resources/collection.py", line 171, in pages
    for page in pages:
  File "/home/gway/miniconda3/envs/jump_sc/lib/python3.10/site-packages/botocore/paginate.py", line 269, in __iter__
    response = self._make_request(current_kwargs)
  File "/home/gway/miniconda3/envs/jump_sc/lib/python3.10/site-packages/botocore/paginate.py", line 357, in _make_request
    return self._method(**current_kwargs)
  File "/home/gway/miniconda3/envs/jump_sc/lib/python3.10/site-packages/botocore/client.py", line 530, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/home/gway/miniconda3/envs/jump_sc/lib/python3.10/site-packages/botocore/client.py", line 964, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (InvalidAccessKeyId) when calling the ListObjects operation: The AWS Access Key Id you provided does not exist in our records.
1685550430.490109 2023-

parsl.log

Address column naming differences from Pycytominer-processed CellProfiler SQLite sources

Currently there is a gap between Pycytominer-processed SQLite files (using SingleCells.merge_single_cells()) and CytoTable output (using convert). Pycytominer prefixes specific columns with the string "Metadata_", while CytoTable does not. Addressing this may require a change to the configuration preset, among other things.

Specifically, the following columns are named differently:

Column names that differ between Pycytominer and CytoTable results on the same dataset:

Pycytominer column name               CytoTable column name
Metadata_Cytoplasm_Parent_Cells       Cytoplasm_Parent_Cells
Metadata_Cytoplasm_Parent_Nuclei      Cytoplasm_Parent_Nuclei
Metadata_Cells_Number_Object_Number   Cells_Number_Object_Number
Metadata_Nuclei_Number_Object_Number  Nuclei_Number_Object_Number
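The mismatch in the comparison above is a fixed prefix on four columns, so one stopgap is a rename mapping applied after conversion. This is a minimal sketch. `pycytominer_style_names` is a hypothetical helper, and the column list is copied from the comparison. The resulting dict could be passed to pandas `DataFrame.rename(columns=...)`.

```python
from typing import Dict, Iterable

# Columns that Pycytominer prefixes with "Metadata_" but CytoTable does not,
# taken from the comparison in this issue.
PYCYTOMINER_METADATA_COLUMNS = [
    "Cytoplasm_Parent_Cells",
    "Cytoplasm_Parent_Nuclei",
    "Cells_Number_Object_Number",
    "Nuclei_Number_Object_Number",
]


def pycytominer_style_names(
    columns: Iterable[str],
    to_prefix: Iterable[str] = PYCYTOMINER_METADATA_COLUMNS,
) -> Dict[str, str]:
    """Map CytoTable column names to Pycytominer-style names (hypothetical helper)."""
    targets = set(to_prefix)
    return {c: f"Metadata_{c}" if c in targets else c for c in columns}


mapping = pycytominer_style_names(["Cytoplasm_Parent_Cells", "Cells_AreaShape_Area"])
print(mapping)
```

A preset-level fix would be cleaner, but a mapping like this makes the difference explicit and testable in the meantime.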

Enhance project title documentation

This issue highlights a need to better document the meaning of the project name in the context of the work performed.

"... I wonder if in a future PR you might describe why you chose the name or what it means, ..."

Originally posted by @falquaddoomi in #33 (review)

Addendum: also expand on microscopy-specific elements alongside the more technical aspects.

Add README

  • Include objectives for this repo
  • Provide diagram outlining data flow
  • Provide references for related repos/projects involved

Account for CellProfiler data schema differences across versions

CellProfiler has various releases which may include slightly different output and as a result entail different treatment within pycytominer-transform. Closing this issue may entail addressing the following:

  • Specify which CellProfiler versions are supported
  • Implement pycytominer-transform design for CellProfiler version specification
  • Implement isolated testing for CellProfiler versions with pycytominer-transform output
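Version specification could start with an explicit check of the CellProfiler version against a supported range. This is a minimal sketch. The bounds `MIN_SUPPORTED` and `MAX_SUPPORTED_EXCLUSIVE` are placeholders, not a statement of which versions are actually supported. The parser handles only numeric release strings such as "4.2.1", and the source of the version string is left open.

```python
from typing import Tuple

# Placeholder bounds: the real supported range would be decided by the project.
MIN_SUPPORTED = (3, 1, 0)
MAX_SUPPORTED_EXCLUSIVE = (5, 0, 0)


def parse_version(version: str) -> Tuple[int, ...]:
    """Turn a numeric version string such as '4.2.1' into a comparable tuple."""
    return tuple(int(part) for part in version.split("."))


def check_cellprofiler_version(version: str) -> None:
    """Raise ValueError when a CellProfiler version is outside the supported range."""
    parsed = parse_version(version)
    if not (MIN_SUPPORTED <= parsed < MAX_SUPPORTED_EXCLUSIVE):
        raise ValueError(f"CellProfiler {version} is not a supported version")


check_cellprofiler_version("4.2.1")  # passes silently
```

Tuple comparison keeps the check simple. Pre-release tags (for example "4.2.0rc1") would need a fuller parser such as `packaging.version`.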

Increase documentation on cloud-based sources

This issue highlights a need to expand documentation for cloud-based sources, which bring additional dependencies and expectations (such as credentials). For example, more documentation covering common no_sign_request usage and similar scenarios may be helpful.

Inspired by #61

Increase MacOS compatibility through DuckDB v0.8.0 availability

This issue highlights the need to increase compatibility with MacOS versions 11.x to 12.6 by removing DuckDB version constraints and making the modifications necessary for compatibility with DuckDB v0.8.0. This new version of DuckDB provides compatibility features necessary for MacOS versions earlier than 12.6. This work stems from conversations in #52.

Resolve potential datetime schema concatenation challenges

There may be datetime schema concatenation challenges which need to be resolved in order to join data with related fields.

I suspect the datetime is the column Metadata_AbsTime in the Image.csv files

csvcut -c Metadata_AbsTime ~/Downloads/Image.csv 
# Metadata_AbsTime
# 2021-05-02T16:07:21.633-04:00

It's ok to parse these as text, if that makes things easier (in the short term)

Originally posted by @shntnu in #29 (comment)
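One way to avoid schema conflicts when concatenating Metadata_AbsTime across files is to normalize every value to a single offset and a single text form before the data are combined. This is a sketch under that assumption. `normalize_abstime` is a hypothetical helper. It relies on `datetime.fromisoformat`, which accepts the "2021-05-02T16:07:21.633-04:00" format shown above, and it rejects values that have no UTC offset instead of guessing a time zone.

```python
from datetime import datetime, timezone
from typing import Optional


def normalize_abstime(value: Optional[str]) -> Optional[str]:
    """Parse an ISO 8601 timestamp with a UTC offset and return it as a UTC string.

    Using one offset and one text form for every file gives the column the same
    type everywhere, so per-file tables can be concatenated without conflicts.
    """
    if value is None or value == "":
        return None
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        raise ValueError(f"timestamp has no UTC offset: {value!r}")
    return parsed.astimezone(timezone.utc).isoformat()


print(normalize_abstime("2021-05-02T16:07:21.633-04:00"))
# 2021-05-02T20:07:21.633000+00:00
```

Keeping the result as text matches the short-term suggestion to parse these values as text while still making values from different files comparable.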

Move to SequentialTaskRunner default to avoid Prefect API SQLite write conflicts

Existing functionality within this repo leverages Prefect's ConcurrentTaskRunner to execute workflows through Prefect's ephemeral API. Testing by @shntnu within #8 showed that SQLite write conflicts occur when many concurrent operations are handled. Prefect's non-ephemeral API could avoid this, but it would require further system integration and is not yet part of this tool's design. As a result, the default task runner should be changed to a more reliable option, the SequentialTaskRunner.

Enhance compartment schema specification for compartment data source groups

  • Improve the ability for pycytominer-transform to handle groups of compartment data sources which may have different columns and/or datatypes by inference
  • Do the same for column and datatype specification using a specific expected schema (instead of, for example, relying on the "first" file encountered by a procedure).

Referencing cytominer-database design where appropriate: https://github.com/cytomining/cytominer-database/blob/9cf43400a01b82e4b4bce0901e039b7f24519ffe/README.rst#the-schema-section
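Both bullets can be expressed as small schema operations: a union across all files with conflict detection (instead of trusting the first file encountered), and a comparison against an explicit expected schema. This is a minimal sketch where a schema is a dict from column name to a datatype name. The helper names and datatype strings are hypothetical. A fuller version might promote compatible types (for example INTEGER to DOUBLE) instead of raising.

```python
from typing import Dict, List

Schema = Dict[str, str]  # column name -> datatype name


def unify_schemas(schemas: List[Schema]) -> Schema:
    """Union the columns of several compartment files, rejecting type conflicts."""
    unified: Schema = {}
    for schema in schemas:
        for column, dtype in schema.items():
            if column in unified and unified[column] != dtype:
                raise TypeError(
                    f"Column {column!r} is {unified[column]} in one file and {dtype} in another"
                )
            unified.setdefault(column, dtype)
    return unified


def check_against_expected(schema: Schema, expected: Schema) -> List[str]:
    """List the ways a file's schema deviates from an expected schema."""
    problems = [f"missing column {c!r}" for c in expected if c not in schema]
    problems += [f"unexpected column {c!r}" for c in schema if c not in expected]
    problems += [
        f"{c!r}: expected {expected[c]}, found {schema[c]}"
        for c in schema
        if c in expected and schema[c] != expected[c]
    ]
    return problems


file_a = {"ImageNumber": "INTEGER", "Cells_AreaShape_Area": "DOUBLE"}
file_b = {"ImageNumber": "INTEGER", "Cells_Texture_Contrast": "DOUBLE"}
print(unify_schemas([file_a, file_b]))
print(check_against_expected(file_b, expected=file_a))
```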

Ensure reasonable performance with large CSV datasets

To provide the best utility to Pycytominer users, it's crucial that we ensure reasonable performance when ingesting large amounts of CSV data (whether by number of files, size of files, or both). This issue is intended to help guide this repo toward reasonable performance expectations (computing resources and time).
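A common way to keep memory bounded with large CSV files is to read them in fixed-size chunks, similar in spirit to the chunk_size argument passed to cytotable.convert earlier in this thread. This is a generic standard-library sketch, not CytoTable's implementation. `iter_csv_chunks` is a hypothetical helper.

```python
import csv
import io
from itertools import islice
from typing import Dict, Iterator, List, TextIO


def iter_csv_chunks(handle: TextIO, chunk_size: int) -> Iterator[List[Dict[str, str]]]:
    """Yield lists of at most chunk_size rows so only one chunk is held in memory."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer")
    reader = csv.DictReader(handle)
    while True:
        chunk = list(islice(reader, chunk_size))
        if not chunk:
            return
        yield chunk


sample = io.StringIO("ImageNumber,ObjectNumber\n1,1\n1,2\n1,3\n2,1\n2,2\n")
print([len(chunk) for chunk in iter_csv_chunks(sample, chunk_size=2)])  # [2, 2, 1]
```

Peak memory then depends on chunk_size rather than file size, and measuring time per chunk gives a simple baseline for the performance expectations this issue asks for.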

Initial release

  • Add initial release to GitHub
  • Deploy release to PyPI
  • Deploy release to conda
  • Document release instructions as part of #14
