
Parallel

Parallel Computation Module using Ray

This module provides tools to efficiently parallelize and manage computational tasks using Ray, a high-performance distributed execution framework. It is designed to facilitate the distribution of workloads across multiple CPU cores or machines, thereby accelerating processing times for large-scale data or compute-intensive operations.

Usage

  1. Define a Task Class

    Subclass RayTaskGeneral and implement the process_item method with the desired computation.

    from your_module import RayTaskGeneral

    class MyTask(RayTaskGeneral):
        def process_item(self, item):
            # Implement the computation for a single item
            return item * 2  # Example operation
    
  2. Initialize Ray and RayManager

    import ray
    from your_module import RayManager

    ray.init()  # Initialize Ray

    manager = RayManager(task_class=MyTask, n_cores=4, use_ray=True)  # Run with Ray on 4 cores
    
  3. Submit Tasks

    items_to_process = [1, 2, 3, 4, 5]
    
    def save_results(results, destination='output.txt', **kwargs):
        # Implement saving logic, e.g., write to a file or database
        print(f"Saving {len(results)} results to {destination}")
    
    manager.submit_tasks(
        items=items_to_process,
        chunk_size=2,
        save_func=save_results,
        save_kwargs={'destination': 'output.txt'},
        save_interval=2
    )
    
  4. Retrieve Results

    all_results = manager.get_results()
    print("All Results:", all_results)
    

Notes

The RayManager handles task submission and result collection efficiently by maintaining a pool of worker futures up to the specified number of cores. The save_func allows for intermediate saving of results, which is useful for long-running tasks to prevent data loss and manage memory usage. Exception handling is incorporated to ensure that individual task failures do not halt the entire processing pipeline. Errors are logged and returned as part of the results for further inspection.
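
As an illustration of this pooling behaviour, below is a minimal sketch (not the module's actual code) of how a bounded set of in-flight futures can be maintained with ray.wait. The helper run_pool is hypothetical; it assumes the ray_worker remote function documented at the end of this page and the MyTask class from the Usage section, and it elides error-tuple handling.

    import ray

    def run_pool(items, n_cores, save_func=None, save_interval=1, **save_kwargs):
        futures, results, unsaved = [], [], []

        def collect(refs):
            # Gather finished worker output and trigger the optional batched save.
            for batch in ray.get(refs):
                results.extend(batch)
                unsaved.extend(batch)
            if save_func and len(unsaved) >= save_interval:
                save_func(list(unsaved), **save_kwargs)
                unsaved.clear()

        for item in items:
            if len(futures) >= n_cores:
                # Block until at least one worker finishes before submitting more.
                ready, futures = ray.wait(futures, num_returns=1)
                collect(ready)
            futures.append(ray_worker.remote(MyTask, [item]))

        collect(futures)  # drain whatever is still running
        if save_func and unsaved:
            save_func(list(unsaved), **save_kwargs)
        return results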

RayManager(task_class, n_cores=-1, use_ray=False)

A manager class for handling task submissions and result collection using Ray's Task Parallelism.

Initializes the RayManager.

PARAMETER DESCRIPTION

task_class

A callable that returns an instance with a run method for processing each item.

TYPE: Callable[[], Any]

n_cores

Number of parallel tasks to run. If <= 0, uses all available CPUs.

TYPE: int DEFAULT: -1

use_ray

Flag to determine if Ray should be used. If False, runs tasks sequentially.

TYPE: bool DEFAULT: False

futures: list[ray.ObjectRef] = []

A list of Ray object references representing the currently submitted but not yet completed tasks. This manages the pool of active workers.

n_cores = n_cores if n_cores > 0 else int(ray.available_resources().get('CPU', 1))

The number of parallel tasks to run. If set to -1 or any value less than or equal to 0, all available CPU cores are utilized.

results: list[Any] = []

A list that stores the results of all completed tasks. It aggregates the output returned by each worker.

save_func: Callable[[Any, Any], None] | None = None

An optional callable function that takes a batch of results and performs a save operation. This can be used to persist intermediate results to disk, a database, or any other storage medium. If set to None, results are not saved automatically.

save_interval: int = 1

The number of results to accumulate before invoking save_func. When the number of collected results reaches this interval, save_func is called to handle the batch of results.

save_kwargs: dict[str, Any] = dict()

A dictionary of additional keyword arguments to pass to save_func when it is called. This allows for flexible configuration of the save operation, such as specifying file paths, database connections, or other parameters required by save_func.

task_class = task_class

A callable that returns an instance with a run method for processing each item.

use_ray = use_ray

Flag to determine if Ray should be used. If False, runs tasks sequentially.
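
For illustration, the two modes can be combined as follows. The Doubler class is a placeholder task, and your_module stands for wherever this module is installed, as in the Usage section.

    import ray
    from your_module import RayManager, RayTaskGeneral

    class Doubler(RayTaskGeneral):
        def process_item(self, item):
            return item * 2

    ray.init()  # n_cores=-1 is resolved against ray.available_resources()

    # Sequential mode (use_ray=False, the default): handy for debugging.
    debug_manager = RayManager(task_class=Doubler, n_cores=1, use_ray=False)

    # Ray-backed mode: n_cores=-1 uses every CPU Ray can see.
    parallel_manager = RayManager(task_class=Doubler, n_cores=-1, use_ray=True)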

get_results()

Retrieves all collected results.

RETURNS DESCRIPTION
list[Any]

A list of results from all completed tasks.

submit_tasks(items, chunk_size=100, save_func=None, save_kwargs=dict(), save_interval=100)

Submits tasks using a generator and manages workers up to n_cores.

PARAMETER DESCRIPTION

items

A list of items to process.

TYPE: list[Any]

chunk_size

Number of items per chunk.

TYPE: int DEFAULT: 100

save_func

A callable that takes a list of results and saves them.

TYPE: Callable[[Any, Any], None] | None DEFAULT: None

save_kwargs

A dictionary of additional keyword arguments to pass to save_func when it is called.

TYPE: dict[str, Any] DEFAULT: dict()

save_interval

The number of results after which to invoke save_func.

TYPE: int DEFAULT: 100
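
For illustration only, and continuing the Usage example above (manager built with MyTask), a save_func that appends each batch of results to a CSV file might look like this; append_rows and the file path are illustrative, not part of the module.

    import csv

    def append_rows(results, path="results.csv"):
        # Invoked by the manager once `save_interval` results have accumulated.
        with open(path, "a", newline="") as fh:
            writer = csv.writer(fh)
            for value in results:
                writer.writerow([value])

    manager.submit_tasks(
        items=list(range(1_000)),
        chunk_size=50,            # 50 items handed to each worker at a time
        save_func=append_rows,
        save_kwargs={"path": "results.csv"},
        save_interval=200,        # flush to disk every 200 results
    )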

task_generator(items, chunk_size)

Generator that yields individual items from chunks.

PARAMETER DESCRIPTION

items

A list of items to process.

TYPE: list[Any]

chunk_size

Number of items per chunk.

TYPE: int

YIELDS DESCRIPTION
Any

Individual items to be processed.
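
A rough sketch consistent with this description (the actual method may differ): the input is cut into chunk_size-sized slices and the items of each slice are yielded one at a time.

    from typing import Any, Iterator

    def task_generator(items: list[Any], chunk_size: int) -> Iterator[Any]:
        # Walk the input in chunk_size-sized slices, then yield items one by one.
        for start in range(0, len(items), chunk_size):
            for item in items[start:start + chunk_size]:
                yield item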

RayTaskGeneral()

Bases: ABC

An abstract base class that defines the computation performed on each item; subclasses implement process_item with the desired logic.

results: list[Any] = []

A list used to accumulate the results produced while processing items.

process_item(item)

Computes or processes a single item.

This method should be implemented by subclasses to define the specific processing logic for each item.

PARAMETER DESCRIPTION

item

The input data required for the calculation.

TYPE: Any

RETURNS DESCRIPTION
Any

The result of processing the item.

TYPE: Any

run(items)

Processes multiple items.

This method calls process_item on each item, wrapping each call in a try-except block so that a single failure does not abort the whole batch.

PARAMETER DESCRIPTION

items

The input data required for the calculation.

TYPE: list[Any]

RETURNS DESCRIPTION
list[Any]

The result of the calculations or an error tuple.
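
A hedged sketch of how such a run loop could be written; the error-tuple format shown is illustrative, not necessarily the module's.

    from abc import ABC, abstractmethod
    from typing import Any

    class RayTaskGeneral(ABC):
        """Sketch of the abstract task base class; the real class may differ."""

        @abstractmethod
        def process_item(self, item: Any) -> Any:
            ...

        def run(self, items: list[Any]) -> list[Any]:
            results: list[Any] = []
            for item in items:
                try:
                    results.append(self.process_item(item))
                except Exception as exc:
                    # Record the failure instead of stopping the whole batch.
                    results.append((item, "error", str(exc)))
            return results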

ray_worker(task_class, chunk)

Remote function to process a chunk of items using the provided task class.

PARAMETER DESCRIPTION

task_class

A callable that returns an instance with a run method.

TYPE: Callable[[], Any]

chunk

The chunk of items to process.

TYPE: list[Any]

RETURNS DESCRIPTION
Any

The result of the run method or an error tuple.

TYPE: Any
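
For reference, a hedged sketch of such a remote worker function; the exact error handling in the module may differ.

    import ray

    @ray.remote
    def ray_worker(task_class, chunk):
        # Instantiate the task and process the whole chunk, returning either
        # the list of results or an error tuple describing the failure.
        try:
            task = task_class()
            return task.run(chunk)
        except Exception as exc:
            return ("error", str(exc))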