Parallel
Parallel Computation Module using Ray
This module provides tools for parallelizing and managing computational tasks with Ray, a high-performance distributed execution framework. It distributes workloads across multiple CPU cores or machines to reduce processing time for large datasets and compute-intensive operations.
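Usage

1. Define a Task Class. Subclass RayTaskGeneral and implement the process_item method with the desired computation.
2. Initialize Ray and RayManager. Start Ray with ray.init(), then create a RayManager for your task class.
3. Submit Tasks.

```python
items_to_process = [1, 2, 3, 4, 5]


def save_results(results, **kwargs):
    # Implement saving logic, e.g., write to a file or database.
    # Entries from save_kwargs (here, destination) arrive as keyword arguments.
    print("Saving results:", results)


# manager is the RayManager created in step 2.
manager.submit_tasks(
    items=items_to_process,
    chunk_size=2,
    save_func=save_results,
    save_kwargs={'destination': 'output.txt'},
    save_interval=2,
)
```

4. Retrieve Results. Call manager.get_results() to obtain the list of results from all completed tasks.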
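The code for step 1 is not included here, so the following is a minimal sketch of a task class. Because the module's import path is not given, the sketch defines a hypothetical stand-in, RayTaskGeneralStandIn, that models only the documented abstract process_item method; in real use you would subclass RayTaskGeneral itself. SquareTask is likewise a hypothetical example name.

```python
from abc import ABC, abstractmethod
from typing import Any


class RayTaskGeneralStandIn(ABC):
    """Hypothetical stand-in for RayTaskGeneral; only process_item is modeled."""

    @abstractmethod
    def process_item(self, item: Any) -> Any:
        """Compute the result for a single item."""


class SquareTask(RayTaskGeneralStandIn):
    """Hypothetical example task: squares each numeric item."""

    def process_item(self, item: Any) -> Any:
        return item * item


task = SquareTask()
print([task.process_item(x) for x in [1, 2, 3, 4, 5]])  # [1, 4, 9, 16, 25]
```

Because process_item is abstract, a subclass that forgets to implement it cannot be instantiated, which surfaces the mistake before any work is submitted.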
Notes

- The RayManager handles task submission and result collection efficiently by keeping a pool of at most n_cores worker futures in flight.
- The save_func allows intermediate saving of results, which is useful for long-running tasks to prevent data loss and manage memory usage.
- Exception handling ensures that individual task failures do not halt the entire processing pipeline. Errors are logged and returned as part of the results for further inspection.
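The bounded worker pool and per-task error capture described in these notes can be illustrated without a Ray cluster. The sketch below swaps in the standard library's concurrent.futures as an analogue of Ray futures: concurrent.futures.wait with return_when=FIRST_COMPLETED plays the role that ray.wait plays for object references. It keeps at most n_workers tasks in flight, collects whichever finishes first before submitting more, and records a failed task's exception in the results instead of stopping. The names run_bounded, _collect, and total are hypothetical; this is not the module's implementation.

```python
import logging
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Any, Callable, Iterable

logger = logging.getLogger(__name__)


def _collect(done: Iterable[Future], results: list[Any]) -> None:
    """Append each finished future's value, or its exception if it failed."""
    for fut in done:
        try:
            results.append(fut.result())
        except Exception as exc:  # one failed task must not stop the others
            logger.error("Task failed: %r", exc)
            results.append(exc)


def run_bounded(
    func: Callable[[Any], Any], chunks: Iterable[Any], n_workers: int
) -> list[Any]:
    """Run func on each chunk with at most n_workers tasks in flight."""
    results: list[Any] = []
    pending: set = set()
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        for chunk in chunks:
            if len(pending) >= n_workers:
                # Block until at least one task finishes, freeing a slot.
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                _collect(done, results)
            pending.add(pool.submit(func, chunk))
        # Drain whatever is still running.
        done, _ = wait(pending)
        _collect(done, results)
    return results


def total(chunk: list[int]) -> int:
    """Hypothetical task that fails on any chunk containing 3."""
    if 3 in chunk:
        raise ValueError("bad chunk")
    return sum(chunk)


# Results arrive in completion order, so the order may vary between runs.
print(run_bounded(total, [[1, 2], [3, 4], [5]], n_workers=2))
```

Collecting results in completion order rather than submission order keeps every slot busy, at the cost of returning results unordered.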
RayManager(task_class, n_cores=-1, use_ray=False)
A manager class for submitting tasks and collecting their results using Ray task parallelism.
Initializes the RayManager.
PARAMETER | DESCRIPTION |
---|---|
task_class | A callable that returns an instance with a run method for processing each item. |
n_cores | Number of parallel tasks to run. If <= 0, uses all available CPUs. Default: -1. |
use_ray | Flag to determine if Ray should be used. If False, runs tasks sequentially. Default: False. |
futures: list[ray.ObjectRef] = []
A list of Ray object references for tasks that have been submitted but have not yet completed. It tracks the pool of active workers.
n_cores = n_cores if n_cores > 0 else int(ray.available_resources().get('CPU', 1))
The number of parallel tasks to run. If set to -1, or any value less than or equal to 0, all available CPU cores are used, as reported by ray.available_resources() (falling back to 1 if no CPU entry is present).
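The n_cores expression above can be isolated into a pure function for illustration. In this sketch, resolve_n_cores is a hypothetical name, and the resources dictionary is passed in explicitly so the logic runs without a Ray cluster; in the module itself the dictionary comes from ray.available_resources(), which requires Ray to be initialized. Note that ray.available_resources() reports resources that are currently free, so on a busy cluster it can be smaller than the total reported by ray.cluster_resources().

```python
def resolve_n_cores(n_cores: int, available_resources: dict[str, float]) -> int:
    """Return n_cores if positive, else the CPU count from a Ray-style resources dict."""
    if n_cores > 0:
        return n_cores
    # Ray reports resource quantities as floats (e.g. {"CPU": 8.0}); default to 1 if absent.
    return int(available_resources.get("CPU", 1))


print(resolve_n_cores(4, {"CPU": 8.0}))   # 4
print(resolve_n_cores(-1, {"CPU": 8.0}))  # 8
print(resolve_n_cores(0, {}))             # 1
```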
results: list[Any] = []
A list that stores the results of all completed tasks. It aggregates the output returned by each worker.
save_func: Callable[[Any, Any], None] | None = None
An optional callable that takes a batch of results and performs a save operation. This can be used to persist intermediate results to disk, a database, or any other storage medium. If set to None, results are not saved automatically.
save_interval: int = 1
The number of results to accumulate before invoking save_func. When the number of collected results reaches this interval, save_func is called to handle the batch of results.
save_kwargs: dict[str, Any] = dict()
A dictionary of additional keyword arguments to pass to save_func when it is called. This allows flexible configuration of the save operation, such as specifying file paths, database connections, or other parameters required by save_func.
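The interplay of save_func, save_interval, and save_kwargs can be sketched as a small buffer. IntervalSaver is a hypothetical helper, not part of this module: it keeps every result in results, hands each full batch to save_func together with save_kwargs, and exposes flush() for a final partial batch. Whether the real manager flushes a trailing partial batch, or clears results after saving, is not documented here and is an assumption of the sketch. It also copies save_kwargs and defaults it to None rather than dict(), so no mutable default is shared between instances.

```python
from typing import Any, Callable, Optional


class IntervalSaver:
    """Hypothetical helper: keeps all results and saves them in batches of save_interval."""

    def __init__(
        self,
        save_func: Optional[Callable[..., None]] = None,
        save_interval: int = 1,
        save_kwargs: Optional[dict[str, Any]] = None,
    ) -> None:
        self.save_func = save_func
        self.save_interval = max(1, save_interval)
        self.save_kwargs = dict(save_kwargs or {})  # private copy of the caller's dict
        self.results: list[Any] = []    # every result collected so far
        self._unsaved: list[Any] = []   # results not yet passed to save_func

    def add(self, result: Any) -> None:
        self.results.append(result)
        self._unsaved.append(result)
        if len(self._unsaved) >= self.save_interval:
            self.flush()

    def flush(self) -> None:
        """Save the pending batch (if any); call once more at the end for the remainder."""
        if self.save_func is not None and self._unsaved:
            self.save_func(self._unsaved, **self.save_kwargs)
        self._unsaved = []  # start a fresh list so the saved batch is never mutated


batches = []
saver = IntervalSaver(
    lambda batch, **kw: batches.append((list(batch), kw)),
    save_interval=2,
    save_kwargs={"destination": "output.txt"},
)
for r in [1, 4, 9, 16, 25]:
    saver.add(r)
saver.flush()
print(batches)
# [([1, 4], {'destination': 'output.txt'}), ([9, 16], {'destination': 'output.txt'}), ([25], {'destination': 'output.txt'})]
```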
task_class = task_class
A callable that returns an instance with a run method for processing each item.
use_ray = use_ray
Flag to determine if Ray should be used. If False, runs tasks sequentially.
get_results()
Retrieves all collected results.
RETURNS | DESCRIPTION |
---|---|
list[Any] | A list of results from all completed tasks. |
submit_tasks(items, chunk_size=100, save_func=None, save_kwargs=dict(), save_interval=100)
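Submits tasks using a generator and keeps at most n_cores workers running.
PARAMETER | DESCRIPTION |
---|---|
items | A list of items to process. |
chunk_size | Number of items per chunk. Default: 100. |
save_func | A callable that takes a list of results and saves them. Default: None. |
save_kwargs | A dictionary of additional keyword arguments to pass to save_func. Default: an empty dict. |
save_interval | The number of results after which to invoke save_func. Default: 100. |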
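Since submit_tasks is described as using a generator, the following is a hedged sketch of how items might be split into chunks of chunk_size before each chunk is handed to a worker. chunked is a hypothetical helper, not the module's code. With the usage example's chunk_size=2, five items become three chunks. On Python 3.12 and later, itertools.batched does the same job but yields tuples instead of lists.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], chunk_size: int) -> Iterator[list[T]]:
    """Lazily yield successive lists of at most chunk_size items."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be >= 1")
    it = iter(items)
    # islice pulls up to chunk_size items; an empty list means the input is exhausted.
    while chunk := list(islice(it, chunk_size)):
        yield chunk


print(list(chunked([1, 2, 3, 4, 5], 2)))  # [[1, 2], [3, 4], [5]]
```

Because the helper is a generator, only the chunks currently being submitted need to be materialized, which matters when items is large.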
RayTaskGeneral()
Bases: ABC
Abstract parent class for task classes. Subclasses define the calculation run on each item by implementing process_item.
results: list[Any] = []
process_item(item)
Computes or processes a single item.
This method should be implemented by subclasses to define the specific processing logic for each item.
PARAMETER | DESCRIPTION |
---|---|
item | The input data required for the calculation. |
RETURNS | DESCRIPTION |
---|---|
Any | The result of processing the item. |
ray_worker(task_class, chunk)
Remote function to process a single item using the provided task class.
PARAMETER | DESCRIPTION |
---|---|
task_class | A callable that returns an instance with a run method for processing each item. |
chunk | The input data required for the calculation. |
RETURNS | DESCRIPTION |
---|---|
Any | The result of the |
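A hedged sketch of what a worker with this signature might do, written as a plain function so it runs without Ray. It assumes, without confirmation from this page, that the worker instantiates task_class once per chunk and calls process_item on each item in the chunk. In Ray, a function becomes remote when decorated with @ray.remote and is launched with ray_worker.remote(task_class, chunk), which returns an ObjectRef of the kind held in RayManager.futures. The names process_chunk and Doubler are hypothetical.

```python
from typing import Any, Callable, Sequence


def process_chunk(task_class: Callable[[], Any], chunk: Sequence[Any]) -> list[Any]:
    """Build one task instance and apply its process_item to every item in chunk."""
    task = task_class()  # one instance per chunk, reused for each item
    return [task.process_item(item) for item in chunk]


class Doubler:
    """Hypothetical task class exposing process_item."""

    def process_item(self, item: Any) -> Any:
        return 2 * item


print(process_chunk(Doubler, [1, 2, 3]))  # [2, 4, 6]
```

Creating the task instance once per chunk, rather than once per item, amortizes any setup cost in the task's constructor across the whole chunk.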