Source code for bioio_conversion.cluster

import atexit
import signal
from types import FrameType

import psutil
from dask.distributed import Client, LocalCluster


[docs] class Cluster: """ A custom Dask cluster class that allows for the creation and management of a Dask cluster. """ def __init__(self, n_workers: int = 4) -> None: cpu_count = psutil.cpu_count(logical=False) or n_workers self._n_workers = max(1, cpu_count // 2) self._worker_memory = ( int(psutil.virtual_memory().available * 0.5) // self._n_workers )
[docs] def start(self) -> Client: """ Start the Dask LocalCluster and return a Client. Registers clean shutdown on exit or SIGINT/SIGTERM. """ cluster = LocalCluster( n_workers=self._n_workers, memory_limit=self._worker_memory, processes=True, threads_per_worker=2, scheduler_port=0, ) client = Client(cluster) def _shutdown(sig: int, frame: FrameType | None = None) -> None: client.shutdown() atexit.register(client.shutdown) signal.signal(signal.SIGTERM, _shutdown) signal.signal(signal.SIGINT, _shutdown) if client is not None: print(client) print(client.dashboard_link)