trw.train.job_executor2

Module Contents

Classes

JobMetadata

JobExecutor2

Execute jobs on multiple processes.

Functions

flush_queue(queue)

worker(input_queue: multiprocessing.Queue, output_queue: multiprocessing.Queue, transform: Callable[[trw.basic_typing.Batch], trw.basic_typing.Batch], global_abort_event: multiprocessing.Event, local_abort_event: multiprocessing.Event, synchronized_stop: multiprocessing.Event, wait_time: float, seed: int) → None

Worker that will execute a transform on a process.

collect_results_to_main_process(job_session_id: multiprocessing.Value, jobs_queued: multiprocessing.Value, worker_output_queue: multiprocessing.Queue, output_queue: queue.Queue, global_abort_event: multiprocessing.Event, synchronized_stop: multiprocessing.Event, local_abort_event: multiprocessing.Event, stop_event: multiprocessing.Event, wait_time: float) → None

Attributes

multiprocessing

default_queue_timeout

trw.train.job_executor2.multiprocessing
trw.train.job_executor2.default_queue_timeout = 0.001
trw.train.job_executor2.flush_queue(queue)
class trw.train.job_executor2.JobMetadata(job_session_id)
trw.train.job_executor2.worker(input_queue: multiprocessing.Queue, output_queue: multiprocessing.Queue, transform: Callable[[trw.basic_typing.Batch], trw.basic_typing.Batch], global_abort_event: multiprocessing.Event, local_abort_event: multiprocessing.Event, synchronized_stop: multiprocessing.Event, wait_time: float, seed: int) → None

Worker that will execute a transform on a process.

Parameters
  • input_queue – the queue to listen to

  • output_queue – the queue to output the results

  • transform – the transform to be applied on each data queued

  • global_abort_event – specify when the jobs need to shutdown

  • local_abort_event – specify when the jobs need to shutdown but only for a given job executor

  • wait_time – process will sleep this amount of time when input queue is empty

  • seed – an int to seed random generators

  • synchronized_stop – the workers will NOT exit the process until this event is set to ensure the correct order of destruction of workers/threads/queues

Returns

None
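
The transform has the same signature as the function_to_run passed to JobExecutor2: a callable that receives a batch and returns a batch. Below is a minimal sketch of such a transform, assuming a Batch is a dict-like mapping of feature name to array (the exact type is defined in trw.basic_typing); the function name and the 'images' key are illustrative only:

    from typing import Dict

    import numpy as np

    def normalize_images(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        # hypothetical transform executed inside a worker process on each queued batch
        images = batch['images'].astype(np.float32)
        batch['images'] = (images - images.mean()) / (images.std() + 1e-8)
        return batch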

trw.train.job_executor2.collect_results_to_main_process(job_session_id: multiprocessing.Value, jobs_queued: multiprocessing.Value, worker_output_queue: multiprocessing.Queue, output_queue: queue.Queue, global_abort_event: multiprocessing.Event, synchronized_stop: multiprocessing.Event, local_abort_event: multiprocessing.Event, stop_event: multiprocessing.Event, wait_time: float) → None
class trw.train.job_executor2.JobExecutor2(nb_workers: int, function_to_run: Callable[[trw.basic_typing.Batch], trw.basic_typing.Batch], max_queue_size_per_worker: int = 2, max_queue_size_pin_thread_per_worker: int = 3, max_queue_size_pin: Optional[int] = None, wait_time: float = default_queue_timeout, wait_until_processes_start: bool = True, restart_crashed_worker: bool = True)

Execute jobs on multiple processes.

At a high level, we have workers executing on multiple processes. Each worker is fed by an input queue and pushes the results of its processing to an output queue.

Pushing data on a queue is very fast, BUT retrieving it from a different process takes time. Even though PyTorch claims to have shared-memory arrays, retrieving a large array from a queue has linear runtime complexity (still true with PyTorch 1.11). To limit this copy penalty, we use threads that copy results from the worker processes to the main process (pinning threads; sharing data between threads is almost free).
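
A minimal usage sketch based only on the methods documented below; normalize_images is the hypothetical Batch-to-Batch transform sketched earlier, and retrieval of the processed batches from the executor's output queue is omitted:

    import time

    import numpy as np

    from trw.train.job_executor2 import JobExecutor2

    if __name__ == '__main__':
        # hypothetical data: a few small batches of random images
        batches = [{'images': np.random.rand(8, 16, 16)} for _ in range(4)]

        executor = JobExecutor2(nb_workers=2, function_to_run=normalize_images)
        executor.start()

        for batch in batches:
            while not executor.put(batch):  # put() returns False while the worker queues are full
                time.sleep(0.001)

        while not executor.is_idle():  # wait until the executor reports no jobs being processed
            time.sleep(0.001)

        executor.close()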

Notes

  • This class was designed with maximum speed in mind, not reproducibility.

    Processed jobs will not keep their original ordering.

  • The proper destruction of the job executor is the most difficult part, with risks of process hangs or memory leaks:

    • first, threads and processes are signaled to stop their processing and to stop pushing results to queues

    • queues are emptied to avoid memory leaks (in case of abrupt termination)

    • queues are joined

    • processes are joined

    • threads are joined

start(self, timeout: float = 10.0) → None

Start the processes and queues.

Parameters

timeout

Returns

None

close(self, timeout: float = 10) → None

Stops the processes and threads.

Parameters

timeout – time allowed for the threads and processes to shutdown cleanly before using terminate()

is_full(self) → bool

Check if the worker input queues are full.

Returns

True if full, False otherwise

put(self, data: trw.basic_typing.Batch) → bool

Queue a batch of data to be processed.

Warning

if the queues are full, the batch will NOT be appended

Parameters

data – a batch of data to process

Returns

True if the batch was successfully appended, False otherwise.
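
Because put() never blocks, a caller that must not drop batches has to retry until the batch is accepted. A small sketch of such a retry helper (the helper name and default wait time are illustrative, not part of the API):

    import time

    def put_blocking(executor, batch, wait_time: float = 0.001) -> None:
        # retry until the executor accepts the batch; put() returns False
        # while the worker input queues are full
        while not executor.put(batch):
            time.sleep(wait_time)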

is_idle(self) → bool

Returns

True if the executor is not currently processing jobs

_check_process_killed_and_restart(self)

Verify that the workers are alive. If a worker has died, start a new process to replace it.

job_report(self, f=sys.stdout)

Summary of the executor state. Useful for debugging.

reset(self)

Reset the input and output queues as well as job session IDs.

The results of jobs that have not yet been calculated will be discarded.
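
For example, continuing the usage sketch above, pending work can be discarded before queuing a new session of jobs (new_batches is a hypothetical iterable of batches):

    # drop results that have not been collected yet and start a fresh job session
    executor.reset()
    for batch in new_batches:
        put_blocking(executor, batch)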

__del__(self)