Giter VIP home page Giter VIP logo

ray_beam_runner's Introduction

Ray-based Apache Beam Runner

This is a WIP proof of concept implementation undergoing frequent breaking changes and should not be used in production.

The Portable Ray Beam Runner

The directory ray_beam_runner/portability contains a prototype for an implementation of a Beam runner for Ray that relies on Beam's portability framework.

Install and build from source

To install the existing Ray Beam runner from a clone of this repository, you can follow the next steps:

# First create a virtual environment to install and run Python dependencies
virtualenv venv
. venv/bin/activate

# Install development dependencies for the project
pip install -r requirements_dev.txt

# Create a local installation to include test dependencies
pip install -e .[test]
# Or if saw error messages like "zsh: no matches found: .[test]", try:
pip install -e '.[test]'

Testing

The project has extensive unit tests that can run on a local environment. Tests that verify the basic runner functionality exist in the file ray_beam_runner/portability/ray_runner_test.py.

To run the runner functionality test suite for the Ray Beam Runner, you can run the following command:

pytest ray_beam_runner/portability/ray_runner_test.py

To run all local unit tests, you can simply run pytest from the root directory.

Found issues? Want to help?

Please file any problems with the runner in this repository's issue section. If you would like to help, please look at the issues as well. You can pick up one of them and try to implement it.

Performance testing

# TODO: Write these tests and document how to run them.

ray_beam_runner's People

Contributors

flyingimer avatar iasoon avatar jjyao avatar krfricke avatar pabloem avatar rkenmi avatar valiantljk avatar wilsonwang371 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

ray_beam_runner's Issues

Running TFX tests failed with error

I am executing TFX code on top of beam on ray. I saw the following error. Need further investigation.

2023-01-11 22:06:21,746 INFO worker.py:1538 -- Started a local Ray instance.
(ray_execute_bundle pid=932030) 2023-01-11 22:06:26.138460: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
(ray_execute_bundle pid=932030) To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
(ray_execute_bundle pid=932030) 2023-01-11 22:06:26.273821: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
(ray_execute_bundle pid=932030) 2023-01-11 22:06:26.273868: I tensorflow/compiler/xla/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
(ray_execute_bundle pid=932030) 2023-01-11 22:06:27.034191: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
(ray_execute_bundle pid=932030) 2023-01-11 22:06:27.034290: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory
(ray_execute_bundle pid=932030) 2023-01-11 22:06:27.034306: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.
INFO:absl:MetadataStore with DB connection initialized
ERROR:absl:Execution 6 failed.
INFO:absl:Cleaning up stateless execution info.
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 624, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/beam/beam_dag_runner.py", line 90, in process
    self._run_node()
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/beam/beam_dag_runner.py", line 99, in _run_node
    launcher.Launcher(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/portable/launcher.py", line 573, in launch
    executor_output = self._run_executor(execution_info)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/portable/launcher.py", line 448, in _run_executor
    executor_output = self._executor_operator.run_executor(execution_info)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/portable/beam_executor_operator.py", line 112, in run_executor
    return python_executor_operator.run_with_executor(execution_info, executor)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/portable/python_executor_operator.py", line 58, in run_with_executor
    result = executor.Do(execution_info.input_dict, output_dict,
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/components/example_gen/base_example_gen_executor.py", line 283, in Do
    (example_split
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/pipeline.py", line 597, in __exit__
    self.result = self.run()
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/pipeline.py", line 574, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/ray_fn_runner.py", line 198, in run_pipeline
    return self.execute_pipeline(stage_context, stages)
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/ray_fn_runner.py", line 227, in execute_pipeline
    result = self._run_stage(runner_execution_context, bundle_ctx, queue)
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/ray_fn_runner.py", line 274, in _run_stage
    ) = self._run_bundle(
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/ray_fn_runner.py", line 348, in _run_bundle
    result = ray.get(next(result_generator))
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/ray/_private/worker.py", line 2309, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ValueError): ray::ray_execute_bundle() (pid=932030, ip=10.189.114.217)
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/execution.py", line 113, in ray_execute_bundle
    result_future = worker_handler.control_conn.push(instruction_request)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 380, in push
    response = self.worker.do_instruction(request)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 597, in do_instruction
    return getattr(self, request_type)(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 635, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1003, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 1021, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/worker/operations.py", line 1030, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/common.py", line 1432, in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
  File "apache_beam/runners/common.py", line 815, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 877, in apache_beam.runners.common.PerWindowInvoker._should_process_window_for_sdf
  File "apache_beam/runners/common.py", line 884, in apache_beam.runners.common.PerWindowInvoker._should_process_window_for_sdf
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/sdf_utils.py", line 62, in __init__
    raise ValueError(
ValueError: Initialize ThreadsafeRestrictionTracker requiresRestrictionTracker.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "./feature_skewness.py", line 75, in <module>
    BeamDagRunner().run(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/portable/tfx_runner.py", line 124, in run
    return self.run_with_ir(pipeline_pb, run_options=run_options_pb, **kwargs)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/beam/beam_dag_runner.py", line 297, in run_with_ir
    logging.info('Node %s is scheduled.', node_id)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/pipeline.py", line 597, in __exit__
    self.result = self.run()
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/pipeline.py", line 574, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 199, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 212, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 442, in run_stages
    bundle_results = self._execute_bundle(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 770, in _execute_bundle
    self._run_bundle(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 999, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1309, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 380, in push
    response = self.worker.do_instruction(request)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 597, in do_instruction
    return getattr(self, request_type)(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 635, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1003, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 324, in apache_beam.runners.worker.operations.GeneralPurposeConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 905, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1507, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 624, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/beam/beam_dag_runner.py", line 90, in process
    self._run_node()
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/beam/beam_dag_runner.py", line 99, in _run_node
    launcher.Launcher(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/portable/launcher.py", line 573, in launch
    executor_output = self._run_executor(execution_info)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/portable/launcher.py", line 448, in _run_executor
    executor_output = self._executor_operator.run_executor(execution_info)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/portable/beam_executor_operator.py", line 112, in run_executor
    return python_executor_operator.run_with_executor(execution_info, executor)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/portable/python_executor_operator.py", line 58, in run_with_executor
    result = executor.Do(execution_info.input_dict, output_dict,
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/components/example_gen/base_example_gen_executor.py", line 283, in Do
    (example_split
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/pipeline.py", line 597, in __exit__
    self.result = self.run()
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/pipeline.py", line 574, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/ray_fn_runner.py", line 198, in run_pipeline
    return self.execute_pipeline(stage_context, stages)
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/ray_fn_runner.py", line 227, in execute_pipeline
    result = self._run_stage(runner_execution_context, bundle_ctx, queue)
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/ray_fn_runner.py", line 274, in _run_stage
    ) = self._run_bundle(
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/ray_fn_runner.py", line 348, in _run_bundle
    result = ray.get(next(result_generator))
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/ray/_private/worker.py", line 2309, in get
    raise value.as_instanceof_cause()
RuntimeError: ray.exceptions.RayTaskError(ValueError): ray::ray_execute_bundle() (pid=932030, ip=10.189.114.217)
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/execution.py", line 113, in ray_execute_bundle
    result_future = worker_handler.control_conn.push(instruction_request)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 380, in push
    response = self.worker.do_instruction(request)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 597, in do_instruction
    return getattr(self, request_type)(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 635, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1003, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 1021, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/worker/operations.py", line 1030, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/common.py", line 1432, in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
  File "apache_beam/runners/common.py", line 815, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 877, in apache_beam.runners.common.PerWindowInvoker._should_process_window_for_sdf
  File "apache_beam/runners/common.py", line 884, in apache_beam.runners.common.PerWindowInvoker._should_process_window_for_sdf
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/sdf_utils.py", line 62, in __init__
    raise ValueError(
ValueError: Initialize ThreadsafeRestrictionTracker requiresRestrictionTracker. [while running 'Run[FileBasedExampleGen]']

Design a path for work items to report progress

The Beam API implements several RPCs for processing of data.

The main RPC is the ProcessBundle RPC, which we've implemented.

During the execution of a ProcessBundle rpc, the Beam worker consumes data to process, and outputs data. While this request is being executed, the runner can request progress reports from the worker. These progress reports are requested via the ProcessBundleProgress rpc.

This work item consists on:

  • Adding a thread/task that performs periodic ProcessBundleProgress rpcs for the SDK worker
  • Passing the results of these progress reports back to the coordinator thread somehow

Some ideas:

  • We could spawn a thread in ray_execute_bundle that polls the worker
  • This thread could then push updates to an actor that is shared globally and the coordinator thread can consult?

This is just one idea, but we can do things in different ways.

[batch] Create Work Item Scheduler

The Ray Work Item Scheduler takes work items from the batch FnApiRunner's topological scheduler as input, and submits them for execution as Ray worker tasks. Its design should be based on the FnApiRunner's WorkerHandlerManager.

Ray worker tasks should be based on Beam's EmbeddedPythonWorker and other worker handlers. Each Ray worker task takes a ProcessBundleRequest proto to execute, and stores/retrieves its state as object references in the central Pipeline State Manager.

Prototype expansion of SQL transforms for single-node execution

One of the main targets for the Ray Beam Runner is to support SQL (and streaming SQL).

Beam's SQL support is implemented in Java. There are two parts for the execution of SQL transforms in Beam:

  • Expansion: The way Beam implements expansion of multi-language transforms is by implementing an ExpansionService interface (sample of the GRPC implementation - this seems way too complicated to be honest)

My idea:

  • Implement a class "RayJavaExpansionService" - that receives the expansion request that can be a relatively simple thing. It must contain:
    • Schema of the Input PCollection (what are schemas)
    • Identifier of the transform to apply (these ideantifiers are provided by SchemaTransformProvider implementations (see a few examples)
      • Note: I will implement a Sql one: SqlSchemaTransformProvider with id "beam:schematransform:org.apache.beam:sql:v1" this week.
    • Parameters for the transform (in this case, just the SQL statement)

The RayJavaExpansionService should then return the schema of the resulting PCollection, as well as the expanded graph of operations in protobuf format (the proto format).

  • Java dependencies:
    • "org.apache.beam:beam-sdks-java-core"
    • "org.apache.beam:beam-sdks-java-extensions-sql"

The expansion is not enough to execute SQL, but it's the first step. The next step is to recognize Java Stages, and execute them in a Java process rather than a Python process (basically, a Java implementation of this code, where we return some kind of JavaWorkerHandler

[batch] Create Pipeline State Manager

The Ray Pipeline State Manager is a central service that consolidates the execution state of all scheduled pipeline work items in Ray's object store. This should be based on the single-process current implementation used in Beam's FnApiRunner.

At a high-level, this should be a Ray Actor that worker tasks use for (1) durable persistence of any ObjectRef that they have persisted in Ray's object store via ref = ray.put(obj) and (2) on-demand retrieval of any persisted ObjectRef which they can materialize via obj = ray.get(ref).

The state manager should also support efficient, atomic checkpointing and restoration of all state persisted in Ray's in-memory object store to durable storage (e.g. on-disk or to a durable cloud storage service etc.).

Add support for timers in Portable Ray Runner

The prototype outlined in #10 supports batch pipelines that read / write data.

For streaming workflows, the state and timer primitives exist, which are used to manage long-term persisted state, and managing event time / processing time in a streaming pipeline (see this blog post for more information).

In fact, windows and triggers (higher-level concepts) for Beam can be built using state and timers.

Timers are passed in a similar way that data is passed in Beam (serialized and sent to the runner by the worker), so we only need to process them and schedule them for execution.

Code pounters

[batch] Create Python Ray Runner based on Beam Portability API

To support Beam pipeline graph construction, optimization, and submission for execution, we need to build a Ray Runner class based on Beam Portability API's FnApiRunner that supports both local (i.e. single-node) and remote (i.e. multi-node) pipeline execution.

At a high-level, its run_pipeline method should take a Beam pipeline DAG to run as input, and produce a PipelineResult that can describe/manage pipeline execution state as output. The Ray Runner Beam Components Doc provides additional details about the FnApiRunner's end-to-end workflow, and how it relates to Ray.

This runner will also depend on successful implementation of the following components:

  1. Ray Work Item Scheduler: The Ray Work Item Scheduler takes work items from the batch FnApiRunner's topological scheduler as input, and submits them for execution as Ray tasks.
  2. Ray Pipeline State Manager: The Ray Pipeline State Manager is a central service that consolidates the execution state of all scheduled pipeline work items in Ray's object store.

Add persistence of side inputs into state after they are calculated / executed

Side inputs are not supported at the moment. To test support, we only need to enable this test:

https://github.com/ray-project/ray_beam_runner/blob/master/ray_beam_runner/portability/ray_runner_test.py#L151-L163

The code exists to support this (https://github.com/ray-project/ray_beam_runner/blob/master/ray_beam_runner/portability/ray_fn_runner.py#L455-L459), but we need to figure out how the protocol to write and access side input state.

We need first to support state (issue #12 ) and then implement this.

Standardize how we fix serialization problems for Protobufs

From review:

Nit / general remark: It's a bit unfortunate that we keep running into the serialization issue, and sometimes solve it by using a custom reduce, sometimes by registering a custom serializer (ray.util.register_serializer), and sometimes manually (SerializeToString / FromString).
It would be good if we could enable serialization for all the protobuf components in one central place - I'm not sure how that could be done though, as ray.util.register_serializer would have to be called on every ray worker that transmits protobuf objects. Maybe something to discuss?

Parallelization: 'Reshuffle' data shared between stages

When a stage is executed (in ray_execute_bundle), its output can be immediately reshuffled so that its downstream processing can be parallelized.

When the upstream stage performs a write to GroupByKey, then we must group before reshuffling data (data belonging to the same key must be processed in the same worker).

If the upstream stage is not performing a GBK, then we can simply reshard everything without worrying about individual keys.

Refactor how we manage pcollections

Currently all the data (PCollections) is managed by PcollectionBufferManager actor and ray.put() into object store in ray_execute_bundle worker process. This approach has several implications:

  1. The owner of the data is whoever calls ray.put(), in our case, the worker processes that execute ray_execute_bundle. This is not ideal for fault tolerance since the data is fate share with the owner. In our cases, the data fate share with all the worker processes. Instead, we want the driver program to be the owner of all the data. This means instead of doing ray.put() we should return data as part of ray_execute_bundle return values so the driver program becomes the owner. More on the ownership design: https://docs.google.com/document/d/1lAy0Owi-vPz2jEqBSaHNQcy2IBSDEHyXNOQZlGuj93c/preview#heading=h.vjc9egi2q5aa
  2. Currently ray_execute_bundle gets the input data references from PcollectionBufferManager and call ray.get() to fetch the actual data. This way ray scheduler cannot do locality aware scheduling of ray_execute_bundle remote function since by examining the function arguments, it doesn't know what data it needs. Instead, we should pass data ObjRefs directly as arguments to ray_execute_bundle.

At high level, it should look like:

output_data_obj_ref1, output_data_obj_ref2, ... = ray_execute_bundle.remote(input_data_obj_ref1, input_data_obj_ref2...)

ray_execute_bundle(input_data_obj_1, input_data_obj_2...):
   return output_data_obj_1, output_data_obj_2, ...

General questions on worker design

As we discussed previously, I want to discuss a little bit about how we implement worker on ray. In FnAPIRunner implementation, workers have a few design detail questions.

  • input/output using a centralized manager buffer. How do we implement them in ray? Do we need to use actors? because ray saved objects are immutable. To be able to update data, we may need to use actors.
  • state management is done through a StateServicer. What changes do we need to implement it on Ray? Will actors pattern be good enough for a state management in beam on ray?
  • SDKHarness is a wrapper around SDKWorker, I am thinking probably we only need to implement SDKWorker and no need to implement SDKHarness.
  • To improve the performance, we may need to change the design after a certain point. Since current implementation of FnAPIRunner has a bottleneck in the central manager which serially run and communicate with each of the stage bundles.

If needed, we can later hold meetings to talk some of the details.

Resource leak in tests

When running the test suite, the following message appears:

2022-08-12 20:12:41,753	WARNING worker.py:1404 -- WARNING: 46 PYTHON worker processes have been started on node: 711ff7ca6e04ebd43f741dc793cac210f317e2cb1a237efd6ed17789 with address: 10.1.0.30. This could be a result of using a large number of actors, or due to tasks blocked in ray.get() calls (see https://github.com/ray-project/ray/issues/3644 for some discussion of workarounds).

As the number of worker processes keeps increasing as the test suite progresses, it looks like we have a resource leak somewhere in our code.

[batch] Create Foundational CI/CD Workflows

We need to create an initial set of basic CI/CD workflows based on GitHub Actions to vet the correctness of critical components of the Ray Beam Runner and guard them against regressions.

The initial deliverable should support running lightweight unit and single-node integration tests within GitHub's free plan usage limits, and minimally requires the creation of a workflow configuration file to determine when and how CI/CD workflows will be initiated for each commit.

Recommended GitHub-hosted runner environments for these workflows (specified in the workflow config's runs on parameter for each job) are, in priority-order:

  1. ubuntu-18.04
  2. macos-10.15
  3. ubuntu-latest
  4. macos-latest
  5. windows-latest (Ray on Windows is currently in Beta)

Our initial workflows should be run against the latest supported Ray release version (currently 1.11.0) and Apache Beam release version (currently 2.37.0), but we should also create dedicated Ray/Beam version compatibility branches to hold changes meant to ensure ongoing CI/CD workflow stability against:

  1. The latest nightly Ray wheels and latest Apache Beam SDK release.
  2. The latest Ray official release and latest Apache Beam SDK release.

Ideally, changes passing CI/CD from branch [1] should be periodically merged into branch [2] (e.g. at commits corresponding to release branch version cuts for Ray), and changes passing CI/CD from branch [2] should be periodically merged into master (e.g. for a new release branch version cut of the Ray Beam Runner).

Workflows that rely on connecting to multi-node clusters for integration testing will be considered as part of a separate issue, and are expected to require dedicated cloud service provider (e.g. AWS, GCP, Azure, etc.) funding to host.

Aggregate metrics within the main thread

When a Beam bundle is executed, a list of metrics is returned with the result. These metrics measure the occurrence of certain events. For example:

class MyDoFn(DoFn):
    def process(self, element):
        Metrics.counter('sample_dofn', 'sum_of_events').inc(element)
        Metrics.counter('sample_dofn', 'count_of_events').inc(1)
with beam.Pipeline() as p:
    (p 
     | beam.Create([1, 2, 3])
     | beam.ParDo(MyDoFn())

p.result.metrics()  # => Returns {('sample_dofn', 'sum_of_events'): 6, ('sample_dofn', 'count_of_events'): 3}] 

The metrics are reported in the InstructionResult object that we recover after each bundle execution: https://github.com/ray-project/ray_beam_runner/blob/master/ray_beam_runner/portability/ray_fn_runner.py#L332

Here's the definition of the InstructionResult:

And specifically, this contains a ProcessBundleResponse, which has monitoring_data:

So - what we want to do is take those metrics and aggregate them to have a unified view of them.

Here's an example of code doing that in the local runner. We may basically copy that code fully and run it on our runner as well:

Probably around the spot where we recover the bundle result

Finally, another code sample that is less important, but worth knowing about is the part of the SDK worker that actually fills up these metrics: https://github.com/apache/beam/blob/0e61b026ea7accd666fc443f3aeec7f93147a3b6/sdks/python/apache_beam/runners/worker/sdk_worker.py#L635-L646

Add watermark-based scheduling to the Ray Runner

The Ray Runner currently works by topologically sorting the pipeline graph, and executing stage by stage until the whole pipeline has been executed. This means that it only supports batch mode, and it can't execute multiple stages in parallel.

By implementing watermark-based scheduling, and by executing any bundle that is ready for execution, we can start gaining parallelism, and move towards streaming support.

This work is somewhat involved, because it requires changing the whole execution logic for the pipeline, however it should increase our parallelism, which will be great (https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L420-L487)

Add support for State in ParDo / DoFn operations

The Ray Runner does not support accessing state by ParDo / DoFns. This test verifies this behavior:

https://github.com/ray-project/ray_beam_runner/blob/master/ray_beam_runner/portability/ray_runner_test.py#L328-L352

We have an implementation of a state manager (https://github.com/ray-project/ray_beam_runner/blob/master/ray_beam_runner/portability/execution.py#L205-L243), as an Actor that implements the state servicer interface (https://github.com/apache/beam/blob/044313637c9eea2e3c2b0baa60bc853a948c12ee/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py#L944). We need to verify that the state is plumbed through and working.

Start designing shuffling algorithm

When a stage sends its output, we want to start using that to shuffle data to downstream stages.

for output in worker_handler.data_conn.input_elements(
process_bundle_id,
expect_reads,
abort_callback=lambda:
(result_future.is_done() and bool(result_future.get().error))):
if isinstance(output, beam_fn_api_pb2.Elements.Timers) and not dry_run:
output_buffers[expected_outputs[(output.transform_id, output.timer_family_id)]].append(output.data)
if isinstance(output, beam_fn_api_pb2.Elements.Data) and not dry_run:
output_buffers[expected_outputs[output.transform_id]].append(output.data)
for pcoll, buffer in output_buffers.items():
objrefs = [ray.put(buffer)]
runner_context.pcollection_buffers.put.remote(pcoll, objrefs)
output_buffers[pcoll] = objrefs

Example of shuffle implementation for Ray Datasets 1.13: ray-project/ray#23758

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.