trw.train.job_executor2
Module Contents

Classes

- JobExecutor2 – Execute jobs on multiple processes.
- JobMetadata

Functions

- flush_queue
- worker – Worker that will execute a transform on a process.
- collect_results_to_main_process

Attributes
- trw.train.job_executor2.multiprocessing
- trw.train.job_executor2.default_queue_timeout = 0.001
- trw.train.job_executor2.flush_queue(queue)
- class trw.train.job_executor2.JobMetadata(job_session_id)
- trw.train.job_executor2.worker(input_queue: multiprocessing.Queue, output_queue: multiprocessing.Queue, transform: Callable[[trw.basic_typing.Batch], trw.basic_typing.Batch], global_abort_event: multiprocessing.Event, local_abort_event: multiprocessing.Event, synchronized_stop: multiprocessing.Event, wait_time: float, seed: int) → None
Worker that will execute a transform on a process.
- Parameters
input_queue – the queue to listen to
output_queue – the queue where results are pushed
transform – the transform applied to each batch pulled from the input queue
global_abort_event – signals all workers, across all job executors, to shut down
local_abort_event – signals the workers of a single job executor to shut down
synchronized_stop – the worker will NOT exit the process until this event is set, to ensure the correct destruction order of workers/threads/queues
wait_time – the time the process sleeps when the input queue is empty
seed – an int used to seed the random generators
- Returns
None
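The loop this worker runs can be sketched as follows. For brevity the sketch uses a thread and `queue.Queue` in place of a process and `multiprocessing.Queue`, and the names (`worker_loop`, `abort_event`) are illustrative, not the module's actual implementation:

```python
import queue
import threading
import time

def worker_loop(input_queue, output_queue, transform, abort_event, wait_time):
    # Poll the input queue; sleep `wait_time` when it is empty so the loop
    # does not spin, and stop once the abort event is set.
    while not abort_event.is_set():
        try:
            batch = input_queue.get_nowait()
        except queue.Empty:
            time.sleep(wait_time)
            continue
        output_queue.put(transform(batch))

# Example: double every value of each batch.
in_q, out_q = queue.Queue(), queue.Queue()
abort = threading.Event()
t = threading.Thread(
    target=worker_loop,
    args=(in_q, out_q, lambda b: {k: v * 2 for k, v in b.items()}, abort, 0.001),
)
t.start()
in_q.put({"x": 21})
result = out_q.get(timeout=5)  # {'x': 42}
abort.set()
t.join()
```

In the real worker, `synchronized_stop` additionally delays process exit until the queues can be destroyed in the correct order.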
- trw.train.job_executor2.collect_results_to_main_process(job_session_id: multiprocessing.Value, jobs_queued: multiprocessing.Value, worker_output_queue: multiprocessing.Queue, output_queue: queue.Queue, global_abort_event: multiprocessing.Event, synchronized_stop: multiprocessing.Event, local_abort_event: multiprocessing.Event, stop_event: multiprocessing.Event, wait_time: float) → None
- class trw.train.job_executor2.JobExecutor2(nb_workers: int, function_to_run: Callable[[trw.basic_typing.Batch], trw.basic_typing.Batch], max_queue_size_per_worker: int = 2, max_queue_size_pin_thread_per_worker: int = 3, max_queue_size_pin: Optional[int] = None, wait_time: float = default_queue_timeout, wait_until_processes_start: bool = True, restart_crashed_worker: bool = True)
Execute jobs on multiple processes.
At a high level, workers execute on multiple processes. Each worker is fed by an input queue and pushes its results to an output queue.
Pushing data on a queue is very fast, BUT retrieving it from a different process takes time. Even though PyTorch claims to have shared-memory arrays, retrieving a large array from a queue has linear runtime complexity (still true with PyTorch 1.11). To limit this copy penalty, we use threads that copy results from the worker processes to the main process ("pinning" threads; sharing data between threads is almost free).
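The pinning-thread idea amounts to a small copy loop: a thread living in the main process drains the cross-process output queue and re-queues each result on a plain `queue.Queue`, so consumers pay only the cheap thread-to-thread handoff. A minimal sketch (names are illustrative, and `queue.Queue` stands in for the cross-process queue; this is not the executor's actual code):

```python
import queue
import threading

def pinning_thread(worker_output_queue, main_output_queue, stop_event, wait_time=0.001):
    # Copy results out of the (expensive to read) cross-process queue into a
    # same-process queue; subsequent reads between threads are almost free.
    while not stop_event.is_set():
        try:
            item = worker_output_queue.get(timeout=wait_time)
        except queue.Empty:
            continue
        main_output_queue.put(item)

src, dst = queue.Queue(), queue.Queue()
stop = threading.Event()
t = threading.Thread(target=pinning_thread, args=(src, dst, stop))
t.start()
src.put("batch-0")
copied = dst.get(timeout=5)  # "batch-0"
stop.set()
t.join()
```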
Notes
- This class was designed with maximum speed in mind, not reproducibility: processed jobs will not keep their ordering.
- The proper destruction of the job executor is the most difficult part, with risks of process hangs or memory leaks:
  1. threads and processes are signaled to stop their processing and to stop pushing results to queues
  2. queues are emptied to avoid memory leaks (in case of abrupt termination)
  3. queues are joined
  4. processes are joined
  5. threads are joined
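The destruction sequence above can be sketched as follows, using threads as a stand-in for the worker processes (the helper name and structure are assumptions, not the executor's actual code):

```python
import queue
import threading

def shutdown(stop_event, queues, workers, timeout=10.0):
    # 1. Signal the workers to stop processing and stop pushing results.
    stop_event.set()
    # 2. Empty the queues so no result is left holding (shared) memory.
    for q in queues:
        while True:
            try:
                q.get_nowait()
            except queue.Empty:
                break
    # 3. Join the workers last, once the queues are drained.
    for w in workers:
        w.join(timeout)

stop = threading.Event()
q = queue.Queue()
q.put("leftover result")            # a result nobody will ever consume
w = threading.Thread(target=stop.wait)  # worker idling until told to stop
w.start()
shutdown(stop, [q], [w])            # q is drained, w has exited
```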
- start(self, timeout: float = 10.0) → None
Start the processes and queues.
- Parameters
timeout – time allowed for the processes and queues to start
- close(self, timeout: float = 10) → None
Stops the processes and threads.
- Parameters
timeout – time allowed for the threads and processes to shut down cleanly before terminate() is used
- is_full(self) → bool
Check if the worker input queues are full.
- Returns
True if full, False otherwise
- put(self, data: trw.basic_typing.Batch) → bool
Queue a batch of data to be processed.
Warning
If the queues are full, the batch will NOT be appended.
- Parameters
data – a batch of data to process
- Returns
True if the batch was successfully appended, False otherwise.
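This non-blocking behaviour matches the standard try-put pattern on a bounded queue (a sketch of the contract, not the executor's actual code):

```python
import queue

def try_put(q: "queue.Queue", item) -> bool:
    # Return False instead of blocking when the queue is full,
    # mirroring the contract of JobExecutor2.put.
    try:
        q.put_nowait(item)
        return True
    except queue.Full:
        return False

q = queue.Queue(maxsize=1)
first = try_put(q, "batch-0")   # True: the queue had room
second = try_put(q, "batch-1")  # False: the queue is full, batch dropped
```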
- is_idle(self) → bool
- Returns
True if the executor is not currently processing jobs
- _check_process_killed_and_restart(self)
Verify the workers are alive. If a worker has died, start a new process to replace it.
- job_report(self, f=sys.stdout)
Summary of the executor state. Useful for debugging.
- reset(self)
Reset the input and output queues as well as job session IDs.
The results of the jobs that have not yet been calculated will be discarded.
- __del__(self)