diff --git a/python/distributed-ucxx/distributed_ucxx/ucxx.py b/python/distributed-ucxx/distributed_ucxx/ucxx.py index 8627dec7..f1424955 100644 --- a/python/distributed-ucxx/distributed_ucxx/ucxx.py +++ b/python/distributed-ucxx/distributed_ucxx/ucxx.py @@ -92,24 +92,23 @@ def synchronize_stream(stream=0): def make_register(): - """Register a Dask resource with the UCXX context. - - Register a Dask resource with the UCXX context and keep track of it with the - use of a unique ID for the resource. The resource ID is later used to - deregister the resource from the UCXX context calling - `_deregister_dask_resource(resource_id)`, which stops the notifier thread - and progress tasks when no more UCXX resources are alive. - - Returns - ------- - resource_id: int - The ID of the registered resource that should be used with - `_deregister_dask_resource` during stop/destruction of the resource. - """ - count = itertools.count() def register() -> int: + """Register a Dask resource with the UCXX context. + + Register a Dask resource with the UCXX context and keep track of it with the + use of a unique ID for the resource. The resource ID is later used to + deregister the resource from the UCXX context calling + `_deregister_dask_resource(resource_id)`, which stops the notifier thread + and progress tasks when no more UCXX resources are alive. + + Returns + ------- + resource_id: int + The ID of the registered resource that should be used with + `_deregister_dask_resource` during stop/destruction of the resource. + """ ctx = ucxx.core._get_ctx() with ctx._dask_resources_lock: resource_id = next(count) @@ -123,6 +122,8 @@ def register() -> int: _register_dask_resource = make_register() +del make_register + def _deregister_dask_resource(resource_id): """Deregister a Dask resource with the UCXX context.