From 70ed8d685ba2a4d5cb3f63f22423ad9048602eb9 Mon Sep 17 00:00:00 2001 From: Ben Pedigo Date: Mon, 14 Oct 2024 13:46:23 -0700 Subject: [PATCH] Remove factory syntax for subclients (#222) * refactor * remove pcg chunkedgraph (#224) * remove factory (#226) * remove factory (#225) * Remove JSONService and L2CacheClient factories (#227) * remove factory * remove factory * remove client mappings * remove more * Remove MaterializationClient factory (#229) * remove factory * possible smarter versioning * attempt tables merge * attempt views merge * refactor live_live_query --- caveclient/annotationengine.py | 122 ++----- caveclient/base.py | 5 +- caveclient/chunkedgraph.py | 86 ++--- caveclient/emannotationschemas.py | 74 ++-- caveclient/frameworkclient.py | 30 +- caveclient/infoservice.py | 84 ++--- caveclient/jsonservice.py | 85 ++--- caveclient/l2cache.py | 77 ++-- caveclient/materializationengine.py | 539 ++++++---------------------- tests/test_materialization.py | 8 +- 10 files changed, 280 insertions(+), 830 deletions(-) diff --git a/caveclient/annotationengine.py b/caveclient/annotationengine.py index 97276171..d7d62266 100644 --- a/caveclient/annotationengine.py +++ b/caveclient/annotationengine.py @@ -12,103 +12,47 @@ SERVER_KEY = "ae_server_address" -def AnnotationClient( - server_address, - dataset_name=None, - aligned_volume_name=None, - auth_client=None, - api_version="latest", - verify=True, - max_retries=None, - pool_maxsize=None, - pool_block=None, - over_client=None, -) -> "AnnotationClientV2": - """Factory for returning AnnotationClient +class AnnotationClient(ClientBase): + """ + Client for interacting with the annotation engine. Parameters ---------- server_address : str server_address to use to connect to (i.e. https://minniev1.microns-daf.com) - dataset_name : str - Name of the datastack. + aligned_volume_name : str + Name of the aligned volume to use. auth_client : AuthClient or None, optional - Authentication client to use to connect to server. If None, do not use authentication. + Authentication client to use to connect to server. If None, do not use + authentication. api_version : str or int (default: latest) What version of the api to use, 0: Legacy client (i.e www.dynamicannotationframework.com) 2: new api version, (i.e. minniev1.microns-daf.com) 'latest': default to the most recent (current 2) verify : str (default : True) - whether to verify https + Whether to verify https max_retries : Int or None, optional - Set the number of retries per request, by default None. If None, defaults to requests package default. + Set the number of retries per request, by default None. If None, defaults to + requests package default. pool_block : Bool or None, optional - If True, restricts pool of threads to max size, by default None. If None, defaults to requests package default. + If True, restricts pool of threads to max size, by default None. If None, + defaults to requests package default. pool_maxsize : Int or None, optional - Sets the max number of threads in the pool, by default None. If None, defaults to requests package default. + Sets the max number of threads in the pool, by default None. If None, defaults + to requests package default. over_client: - client to overwrite configuration with - - Returns - ------- - ClientBaseWithDatastack - List of datastack names for available datastacks on the annotation engine + Client to overwrite configuration with. + schema_client: + Client to use to get schema information. If None, uses the `over_client`'s + schema client. 
""" - if auth_client is None: - auth_client = AuthClient() - - auth_header = auth_client.request_header - endpoints, api_version = _api_endpoints( - api_version, - SERVER_KEY, - server_address, - annotation_common, - annotation_api_versions, - auth_header, - verify=verify, - ) - - AnnoClient = client_mapping[api_version] - if api_version > 1: - return AnnoClient( - server_address, - auth_header, - api_version, - endpoints, - SERVER_KEY, - aligned_volume_name, - verify=verify, - max_retries=max_retries, - pool_maxsize=pool_maxsize, - pool_block=pool_block, - over_client=over_client, - ) - else: - return AnnoClient( - server_address, - auth_header, - api_version, - endpoints, - SERVER_KEY, - dataset_name, - verify=verify, - max_retries=max_retries, - pool_maxsize=pool_maxsize, - pool_block=pool_block, - over_client=over_client, - ) - - -class AnnotationClientV2(ClientBase): def __init__( self, server_address, - auth_header, - api_version, - endpoints, - server_name, - aligned_volume_name, + aligned_volume_name=None, + auth_client=None, + api_version="latest", verify=True, max_retries=None, pool_maxsize=None, @@ -116,12 +60,26 @@ def __init__( over_client=None, schema_client=None, ): - super(AnnotationClientV2, self).__init__( + if auth_client is None: + auth_client = AuthClient() + auth_header = auth_client.request_header + + endpoints, api_version = _api_endpoints( + api_version, + SERVER_KEY, + server_address, + annotation_common, + annotation_api_versions, + auth_header, + verify=verify, + ) + + super(AnnotationClient, self).__init__( server_address, auth_header, api_version, endpoints, - server_name, + SERVER_KEY, verify=verify, max_retries=max_retries, pool_maxsize=pool_maxsize, @@ -857,9 +815,3 @@ def upload_staged_annotations( staged_annos.annotation_list, aligned_volume_name=aligned_volume_name, ) - - -client_mapping = { - 2: AnnotationClientV2, - "latest": AnnotationClientV2, -} diff --git a/caveclient/base.py b/caveclient/base.py index 32a43960..3ac9c27d 100644 --- a/caveclient/base.py +++ b/caveclient/base.py @@ -251,7 +251,10 @@ def server_version(self) -> Optional[Version]: """The version of the service running on the remote server. 
Note that this refers to the software running on the server and has nothing to do with the version of the datastack itself.""" - return self._server_version + if self._server_version is None and self._api_version is not None: + return Version(str(self._api_version)) + else: + return self._server_version @staticmethod def raise_for_status(r, log_warning=True): diff --git a/caveclient/chunkedgraph.py b/caveclient/chunkedgraph.py index 83532ed1..760934bb 100644 --- a/caveclient/chunkedgraph.py +++ b/caveclient/chunkedgraph.py @@ -108,77 +108,45 @@ def root_id_int_list_check( return root_id -def ChunkedGraphClient( - server_address=None, - table_name=None, - auth_client=None, - api_version="latest", - timestamp=None, - verify=True, - max_retries=None, - pool_maxsize=None, - pool_block=None, - over_client=None, -) -> "ChunkedGraphClientV1": - if server_address is None: - server_address = default_global_server_address - - if auth_client is None: - auth_client = AuthClient() - - auth_header = auth_client.request_header - - endpoints, api_version = _api_endpoints( - api_version, - SERVER_KEY, - server_address, - chunkedgraph_endpoints_common, - chunkedgraph_api_versions, - auth_header, - verify=verify, - ) - - ChunkedClient = client_mapping[api_version] - return ChunkedClient( - server_address, - auth_header, - api_version, - endpoints, - SERVER_KEY, - timestamp=timestamp, - table_name=table_name, - verify=verify, - max_retries=max_retries, - pool_maxsize=pool_maxsize, - pool_block=pool_block, - over_client=over_client, - ) - - -class ChunkedGraphClientV1(ClientBase): +class ChunkedGraphClient(ClientBase): """ChunkedGraph Client for the v1 API""" def __init__( self, - server_address, - auth_header, - api_version, - endpoints, - server_key=SERVER_KEY, - timestamp=None, + server_address=None, table_name=None, + auth_client=None, + api_version="latest", + timestamp=None, verify=True, max_retries=None, pool_maxsize=None, pool_block=None, over_client=None, ): - super(ChunkedGraphClientV1, self).__init__( + if server_address is None: + server_address = default_global_server_address + + if auth_client is None: + auth_client = AuthClient() + + auth_header = auth_client.request_header + + endpoints, api_version = _api_endpoints( + api_version, + SERVER_KEY, + server_address, + chunkedgraph_endpoints_common, + chunkedgraph_api_versions, + auth_header, + verify=verify, + ) + super(ChunkedGraphClient, self).__init__( server_address, auth_header, api_version, endpoints, - server_key, + SERVER_KEY, verify=verify, max_retries=max_retries, pool_maxsize=pool_maxsize, @@ -1502,9 +1470,3 @@ def base_resolution(self): 3-long list of x/y/z voxel dimensions in nm """ return self.segmentation_info["scales"][0].get("resolution") - - -client_mapping = { - 1: ChunkedGraphClientV1, - "latest": ChunkedGraphClientV1, -} diff --git a/caveclient/emannotationschemas.py b/caveclient/emannotationschemas.py index cb9e8ae1..d771b2b2 100644 --- a/caveclient/emannotationschemas.py +++ b/caveclient/emannotationschemas.py @@ -10,63 +10,38 @@ logger = logging.getLogger(__name__) -server_key = "emas_server_address" - - -def SchemaClient( - server_address=None, - auth_client=None, - api_version="latest", - max_retries=None, - pool_maxsize=None, - pool_block=None, - over_client=None, -) -> "SchemaClientLegacy": - if auth_client is None: - auth_client = AuthClient() - - auth_header = auth_client.request_header - endpoints, api_version = _api_endpoints( - api_version, - server_key, - server_address, - schema_endpoints_common, - 
schema_api_versions, - auth_header, - ) - SchemaClient = client_mapping[api_version] - return SchemaClient( - server_address=server_address, - auth_header=auth_header, - api_version=api_version, - endpoints=endpoints, - server_name=server_key, - max_retries=max_retries, - pool_maxsize=pool_maxsize, - pool_block=pool_block, - over_client=over_client, - ) - - -class SchemaClientLegacy(ClientBase): +SERVER_KEY = "emas_server_address" + + +class SchemaClient(ClientBase): def __init__( self, - server_address, - auth_header, - api_version, - endpoints, - server_name, + server_address=None, + auth_client=None, + api_version="latest", max_retries=None, pool_maxsize=None, pool_block=None, over_client=None, ): - super(SchemaClientLegacy, self).__init__( + if auth_client is None: + auth_client = AuthClient() + + auth_header = auth_client.request_header + endpoints, api_version = _api_endpoints( + api_version, + SERVER_KEY, + server_address, + schema_endpoints_common, + schema_api_versions, + auth_header, + ) + super(SchemaClient, self).__init__( server_address, auth_header, api_version, endpoints, - server_name, + SERVER_KEY, max_retries=max_retries, pool_maxsize=pool_maxsize, pool_block=pool_block, @@ -148,10 +123,3 @@ def schema_definition_all(self) -> dict[str]: 'Client requested an schema service endpoint (see "schema_definition_all") not yet available on your deployment. Please talk to your admin about updating your deployment' ) return None - - -client_mapping = { - 1: SchemaClientLegacy, - 2: SchemaClientLegacy, - "latest": SchemaClientLegacy, -} diff --git a/caveclient/frameworkclient.py b/caveclient/frameworkclient.py index db65c089..e676cdb7 100644 --- a/caveclient/frameworkclient.py +++ b/caveclient/frameworkclient.py @@ -1,16 +1,16 @@ from datetime import datetime from typing import Optional -from .annotationengine import AnnotationClient, AnnotationClientV2 +from .annotationengine import AnnotationClient from .auth import AuthClient, default_token_file -from .chunkedgraph import ChunkedGraphClient, ChunkedGraphClientV1 +from .chunkedgraph import ChunkedGraphClient from .datastack_lookup import handle_server_address -from .emannotationschemas import SchemaClient, SchemaClientLegacy +from .emannotationschemas import SchemaClient from .endpoints import default_global_server_address -from .infoservice import InfoServiceClient, InfoServiceClientV2 -from .jsonservice import JSONService, JSONServiceV1 -from .l2cache import L2CacheClient, L2CacheClientLegacy -from .materializationengine import MaterializationClient, MaterializationClientType +from .infoservice import InfoServiceClient +from .jsonservice import JSONService +from .l2cache import L2CacheClient +from .materializationengine import MaterializationClient from .skeletonservice import SkeletonClient @@ -253,7 +253,7 @@ def auth(self) -> AuthClient: return self._auth @property - def info(self) -> InfoServiceClientV2: + def info(self) -> InfoServiceClient: """ A client for the info service. See [client.info](../client_api/info.md) for more information. """ @@ -271,7 +271,7 @@ def info(self) -> InfoServiceClientV2: return self._info @property - def state(self) -> JSONServiceV1: + def state(self) -> JSONService: """ A client for the neuroglancer state service. See [client.state](../client_api/state.md) for more information. @@ -288,7 +288,7 @@ def state(self) -> JSONServiceV1: return self._state @property - def schema(self) -> SchemaClientLegacy: + def schema(self) -> SchemaClient: """ A client for the EM Annotation Schemas service. 
See [client.schema](../client_api/schema.md) for more information. @@ -476,7 +476,7 @@ def datastack_name(self) -> str: return self._datastack_name @property - def chunkedgraph(self) -> ChunkedGraphClientV1: + def chunkedgraph(self) -> ChunkedGraphClient: """ A client for the chunkedgraph service. See [client.chunkedgraph](../client_api/chunkedgraph.md) for more information. @@ -497,7 +497,7 @@ def chunkedgraph(self) -> ChunkedGraphClientV1: return self._chunkedgraph @property - def annotation(self) -> AnnotationClientV2: + def annotation(self) -> AnnotationClient: """ A client for the annotation service. See [client.annotation](../client_api/annotation.md) for more information. @@ -515,7 +515,7 @@ def annotation(self) -> AnnotationClientV2: return self._annotation @property - def materialize(self) -> MaterializationClientType: + def materialize(self) -> MaterializationClient: """ A client for the materialization service. See [client.materialize](../client_api/materialize.md) for more information. @@ -553,7 +553,7 @@ def skeleton(self) -> SkeletonClient: return self._skeleton @property - def state(self) -> JSONServiceV1: + def state(self) -> JSONService: """ A client for the neuroglancer state service. See [client.state](../client_api/state.md) for more information. @@ -571,7 +571,7 @@ def state(self) -> JSONServiceV1: return self._state @property - def l2cache(self) -> L2CacheClientLegacy: + def l2cache(self) -> L2CacheClient: """ A client for the L2 cache service. See [client.l2cache](../client_api/l2cache.md) for more information. diff --git a/caveclient/infoservice.py b/caveclient/infoservice.py index 64873b15..e053787a 100644 --- a/caveclient/infoservice.py +++ b/caveclient/infoservice.py @@ -21,61 +21,13 @@ SERVER_KEY = "i_server_address" -def InfoServiceClient( - server_address=None, - datastack_name=None, - auth_client=None, - api_version="latest", - verify=True, - max_retries=None, - pool_maxsize=None, - pool_block=None, - over_client=None, - info_cache=None, -) -> "InfoServiceClientV2": - if server_address is None: - server_address = default_global_server_address - - if auth_client is None: - auth_client = AuthClient() - - auth_header = auth_client.request_header - endpoints, api_version = _api_endpoints( - api_version, - SERVER_KEY, - server_address, - infoservice_common, - infoservice_api_versions, - auth_header, - verify=verify, - ) - - InfoClient = client_mapping[api_version] - return InfoClient( - server_address, - auth_header, - api_version, - endpoints, - SERVER_KEY, - datastack_name, - verify=verify, - max_retries=max_retries, - pool_maxsize=pool_maxsize, - pool_block=pool_block, - over_client=over_client, - info_cache=info_cache, - ) - - -class InfoServiceClientV2(ClientBaseWithDatastack): +class InfoServiceClient(ClientBaseWithDatastack): def __init__( self, - server_address, - auth_header, - api_version, - endpoints, - server_name, - datastack_name, + server_address=None, + datastack_name=None, + auth_client=None, + api_version="latest", verify=True, max_retries=None, pool_maxsize=None, @@ -83,12 +35,28 @@ def __init__( over_client=None, info_cache=None, ): - super(InfoServiceClientV2, self).__init__( + if server_address is None: + server_address = default_global_server_address + + if auth_client is None: + auth_client = AuthClient() + + auth_header = auth_client.request_header + endpoints, api_version = _api_endpoints( + api_version, + SERVER_KEY, + server_address, + infoservice_common, + infoservice_api_versions, + auth_header, + verify=verify, + ) + 
super(InfoServiceClient, self).__init__( server_address, auth_header, api_version, endpoints, - server_name, + SERVER_KEY, datastack_name, verify=verify, max_retries=max_retries, @@ -477,9 +445,3 @@ def _make_cloudvolume(self, cloudpath, use_client_secret=True, **kwargs): **kwargs, ) return cv - - -client_mapping = { - 2: InfoServiceClientV2, - "latest": InfoServiceClientV2, -} diff --git a/caveclient/jsonservice.py b/caveclient/jsonservice.py index 98e5e281..92ef0fb4 100644 --- a/caveclient/jsonservice.py +++ b/caveclient/jsonservice.py @@ -20,7 +20,7 @@ ngl_endpoints_common, ) -server_key = "json_server_address" +SERVER_KEY = "json_server_address" def neuroglancer_json_encoder(obj): @@ -39,17 +39,8 @@ def neuroglancer_json_encoder(obj): raise TypeError -def JSONService( - server_address=None, - auth_client=None, - api_version="latest", - ngl_url=None, - max_retries=None, - pool_maxsize=None, - pool_block=None, - over_client=None, -) -> "JSONServiceV1": - """Client factory to interface with the JSON state service. +class JSONService(ClientBase): + """Client to interface with the JSON state service. Parameters ---------- @@ -66,58 +57,41 @@ def JSONService( ngl_url : str or None, optional Default neuroglancer deployment URL. Only used for V1 and later. """ - if server_address is None: - server_address = default_global_server_address - - if auth_client is None: - auth_client = AuthClient() - - auth_header = auth_client.request_header - - endpoints, api_version = _api_endpoints( - api_version, - server_key, - server_address, - jsonservice_common, - jsonservice_api_versions, - auth_header, - ) - - JSONClient = client_mapping[api_version] - return JSONClient( - server_address=server_address, - auth_header=auth_header, - api_version=api_version, - endpoints=endpoints, - server_name=server_key, - ngl_url=ngl_url, - max_retries=max_retries, - pool_maxsize=pool_maxsize, - pool_block=pool_block, - over_client=over_client, - ) - - -class JSONServiceV1(ClientBase): + def __init__( self, - server_address, - auth_header, - api_version, - endpoints, - server_name, - ngl_url, + server_address=None, + auth_client=None, + api_version="latest", + ngl_url=None, max_retries=None, pool_maxsize=None, pool_block=None, over_client=None, ): - super(JSONServiceV1, self).__init__( + if server_address is None: + server_address = default_global_server_address + + if auth_client is None: + auth_client = AuthClient() + + auth_header = auth_client.request_header + + endpoints, api_version = _api_endpoints( + api_version, + SERVER_KEY, + server_address, + jsonservice_common, + jsonservice_api_versions, + auth_header, + ) + + super(JSONService, self).__init__( server_address, auth_header, api_version, endpoints, - server_name, + SERVER_KEY, max_retries=max_retries, pool_maxsize=pool_maxsize, pool_block=pool_block, @@ -401,8 +375,3 @@ def build_neuroglancer_url( get_state_url = self._endpoints["get_state"].format_map(url_mapping) url = ngl_url + parameter_text + auth_text + get_state_url return url - - -client_mapping = { - 1: JSONServiceV1, -} diff --git a/caveclient/l2cache.py b/caveclient/l2cache.py index 80245399..eb78a3e0 100644 --- a/caveclient/l2cache.py +++ b/caveclient/l2cache.py @@ -11,69 +11,40 @@ l2cache_endpoints_common, ) -server_key = "l2cache_server_address" - - -def L2CacheClient( - server_address=None, - table_name=None, - auth_client=None, - api_version="latest", - max_retries=None, - pool_maxsize=None, - pool_block=None, - over_client=None, - verify=True, -) -> "L2CacheClientLegacy": - if auth_client is 
None: - auth_client = AuthClient() - - auth_header = auth_client.request_header - endpoints, api_version = _api_endpoints( - api_version, - server_key, - server_address, - l2cache_endpoints_common, - l2cache_api_versions, - auth_header, - ) - L2client = client_mapping[api_version] - return L2client( - server_address=server_address, - auth_header=auth_header, - api_version=api_version, - endpoints=endpoints, - server_name=server_key, - table_name=table_name, - max_retries=max_retries, - pool_maxsize=pool_maxsize, - pool_block=pool_block, - over_client=over_client, - verify=verify, - ) - - -class L2CacheClientLegacy(ClientBase): +SERVER_KEY = "l2cache_server_address" + + +class L2CacheClient(ClientBase): def __init__( self, - server_address, - auth_header, - api_version, - endpoints, - server_name, + server_address=None, table_name=None, + auth_client=None, + api_version="latest", max_retries=None, pool_maxsize=None, pool_block=None, over_client=None, verify=True, ): - super(L2CacheClientLegacy, self).__init__( + if auth_client is None: + auth_client = AuthClient() + + auth_header = auth_client.request_header + endpoints, api_version = _api_endpoints( + api_version, + SERVER_KEY, + server_address, + l2cache_endpoints_common, + l2cache_api_versions, + auth_header, + ) + super(L2CacheClient, self).__init__( server_address, auth_header, api_version, endpoints, - server_name, + SERVER_KEY, max_retries=max_retries, pool_maxsize=pool_maxsize, pool_block=pool_block, @@ -183,9 +154,3 @@ def has_cache(self, datastack_name=None): else: raise e return table_name in table_mapping - - -client_mapping = { - 1: L2CacheClientLegacy, - "latest": L2CacheClientLegacy, -} diff --git a/caveclient/materializationengine.py b/caveclient/materializationengine.py index 3daf1ea4..1eb163bb 100644 --- a/caveclient/materializationengine.py +++ b/caveclient/materializationengine.py @@ -14,6 +14,7 @@ from cachetools import TTLCache, cached from cachetools.keys import hashkey from IPython.display import HTML +from packaging.version import Version from requests import HTTPError from .auth import AuthClient @@ -138,22 +139,20 @@ def string_format_timestamp(ts): return ts -def MaterializationClient( - server_address, - datastack_name=None, - auth_client=None, - cg_client=None, - synapse_table=None, - api_version="latest", - version=None, - verify=True, - max_retries=None, - pool_maxsize=None, - pool_block=None, - desired_resolution=None, - over_client=None, -) -> "MaterializationClientType": - """Factory for returning AnnotationClient +def _tables_metadata_key(matclient, *args, **kwargs): + if "version" in kwargs: + version = kwargs["version"] + else: + version = matclient.version + if "datastack_name" in kwargs: + datastack_name = kwargs["datastack_name"] + else: + datastack_name = matclient.datastack_name + return hashkey(datastack_name, version) + + +class MaterializationClient(ClientBase): + """Client for interacting with the materialization engine Parameters ---------- @@ -176,73 +175,45 @@ def MaterializationClient( desired_resolution : Iterable[float] or None, optional If given, should be a list or array of the desired resolution you want queries returned in useful for materialization queries. 
- - Returns - ------- - ClientBaseWithDatastack - List of datastack names for available datastacks on the annotation engine """ - if auth_client is None: - auth_client = AuthClient() - - auth_header = auth_client.request_header - endpoints, api_version = _api_endpoints( - api_version, - SERVER_KEY, - server_address, - materialization_common, - materialization_api_versions, - auth_header, - fallback_version=2, - verify=verify, - ) - - MatClient = client_mapping[api_version] - return MatClient( - server_address, - auth_header, - api_version, - endpoints, - SERVER_KEY, - datastack_name, - cg_client=cg_client, - synapse_table=synapse_table, - version=version, - verify=verify, - max_retries=max_retries, - pool_maxsize=pool_maxsize, - pool_block=pool_block, - over_client=over_client, - desired_resolution=desired_resolution, - ) - - -class MaterializationClientV2(ClientBase): def __init__( self, server_address, - auth_header, - api_version, - endpoints, - server_name, - datastack_name, + datastack_name=None, + auth_client=None, cg_client=None, synapse_table=None, + api_version="latest", version=None, verify=True, max_retries=None, pool_maxsize=None, pool_block=None, - over_client=None, desired_resolution=None, + over_client=None, ): - super(MaterializationClientV2, self).__init__( + if auth_client is None: + auth_client = AuthClient() + + auth_header = auth_client.request_header + endpoints, api_version = _api_endpoints( + api_version, + SERVER_KEY, + server_address, + materialization_common, + materialization_api_versions, + auth_header, + fallback_version=2, + verify=verify, + ) + + super(MaterializationClient, self).__init__( server_address, auth_header, api_version, endpoints, - server_name, + SERVER_KEY, verify=verify, max_retries=max_retries, pool_maxsize=pool_maxsize, @@ -308,26 +279,6 @@ def homepage(self) -> HTML: ) return HTML(f'Materialization Engine') - @property - def tables(self) -> TableManager: - """The table manager for the materialization engine.""" - if self._tables is None: - if self.fc is not None and self.fc._materialize is not None: - self._tables = TableManager(self.fc) - else: - raise ValueError("No full CAVEclient specified") - return self._tables - - @property - def views(self) -> ViewManager: - """The view manager for the materialization engine.""" - if self._views is None: - if self.fc is not None and self.fc._materialize is not None: - self._views = ViewManager(self.fc) - else: - raise ValueError("No full CAVEclient specified") - return self._views - def most_recent_version(self, datastack_name=None) -> int: """ Get the most recent version of materialization for this datastack name @@ -1326,278 +1277,6 @@ def lookup_supervoxel_ids( ) return handle_response(response) - @_check_version_compatibility( - kwarg_use_constraints={ - "filter_greater_dict": ">=4.34.0", - "filter_less_dict": ">=4.34.0", - "filter_greater_equal_dict": ">=4.34.0", - "filter_less_equal_dict": ">=4.34.0", - } - ) - def live_live_query( - self, - table: str, - timestamp: datetime, - joins=None, - filter_in_dict=None, - filter_out_dict=None, - filter_equal_dict=None, - filter_greater_dict=None, - filter_less_dict=None, - filter_greater_equal_dict=None, - filter_less_equal_dict=None, - filter_spatial_dict=None, - select_columns=None, - offset: int = None, - limit: int = None, - datastack_name: str = None, - split_positions: bool = False, - metadata: bool = True, - suffixes: dict = None, - desired_resolution: Iterable = None, - allow_missing_lookups: bool = False, - random_sample: int = None, - 
log_warning: bool = True, - ): - """Beta method for querying cave annotation tables with rootIDs and annotations - at a particular timestamp. Note: this method requires more explicit mapping of - filters and selection to table as its designed to test a more general endpoint - that should eventually support complex joins. - - Parameters - ---------- - table: - Principle table to query - timestamp: - Timestamp to query - joins: list of lists of str, optional - List of joins, where each join is a list of [table1,column1, table2, column2] - filter_in_dict: dict of dicts, optional - A dictionary with tables as keys, values are dicts with column keys and list - values to accept. - filter_out_dict: dict of dicts, optional - A dictionary with tables as keys, values are dicts with column keys and list - values to reject. - filter_equal_dict: dict of dicts, optional - A dictionary with tables as keys, values are dicts with column keys and values - to equate. - filter_greater_dict: dict of dicts, optional - A dictionary with tables as keys, values are dicts with column keys and values - as exclusive upper-bound. - filter_less_dict: dict of dicts, optional - A dictionary with tables as keys, values are dicts with column keys and values - as exclusive lower-bound. - filter_greater_equal_dict: dict of dicts, optional - A dictionary with tables as keys, values are dicts with column keys and values - as inclusive upper-bound. - filter_less_equal_dict: dict of dicts, optional - A dictionary with tables as keys, values are dicts with column keys and values - as inclusive lower-bound. - filter_spatial_dict: dict of dicts, optional - A dictionary with tables as keys, values are dicts with column keys and values - of 2x3 list of bounds. - select_columns: dict of lists of str, optional - A dictionary with tables as keys, values are lists of columns to select. - offset: - Value to offset query by. - limit: - Limit of query. - datastack_name: - Datastack to query. Defaults to set by client. - split_positions: - Whether to split positions into separate columns, True is faster. - metadata: - Whether to attach metadata to dataframe. - suffixes: - What suffixes to use on joins, keys are table_names, values are suffixes. - desired_resolution: - What resolution to convert position columns to. - allow_missing_lookups: - If there are annotations without supervoxels and root IDs yet, allow results. - random_sample: - If given, will do a table sample of the table to return that many annotations. - log_warning: - Whether to log warnings. 
- - Returns - ------- - : - Results of query - - Examples - -------- - >>> from caveclient import CAVEclient - >>> client = CAVEclient('minnie65_public_v117') - >>> live_live_query("table_name", datetime.datetime.now(datetime.timezone.utc), - >>> joins=[[table_name, table_column, joined_table, joined_column], - >>> [joined_table, joincol2, third_table, joincol_third]] - >>> suffixes={ - >>> "table_name":"suffix1", - >>> "joined_table":"suffix2", - >>> "third_table":"suffix3" - >>> }, - >>> select_columns= { - >>> "table_name":[ "column","names"], - >>> "joined_table":["joined_colum"] - >>> }, - >>> filter_in_dict= { - >>> "table_name":{ - >>> "column_name":[included,values] - >>> } - >>> }, - >>> filter_out_dict= { - >>> "table_name":{ - >>> "column_name":[excluded,values] - >>> } - >>> }, - >>> filter_equal_dict"={ - >>> "table_name":{ - >>> "column_name":value - >>> }, - >>> filter_greater_dict"={ - >>> "table_name":{ - >>> "column_name":value - >>> }, - >>> filter_less_dict"={ - >>> "table_name":{ - >>> "column_name":value - >>> }, - >>> filter_greater_equal_dict"={ - >>> "table_name":{ - >>> "column_name":value - >>> }, - >>> filter_less_equal_dict"={ - >>> "table_name":{ - >>> "column_name":value - >>> }, - >>> filter_spatial_dict"= { - >>> "table_name": { - >>> "column_name": [[min_x, min_y, min_z], [max_x, max_y, max_z]] - >>> } - >>> filter_regex_dict"= { - >>> "table_name": { - >>> "column_name": "regex_string" - >>> } - """ - - logging.warning( - "Deprecation: this method is to facilitate beta testing of this feature, \ - it will likely get removed in future versions. " - ) - timestamp = convert_timestamp(timestamp) - return_df = True - if datastack_name is None: - datastack_name = self.datastack_name - if desired_resolution is None: - desired_resolution = self.desired_resolution - endpoint_mapping = self.default_url_mapping - endpoint_mapping["datastack_name"] = datastack_name - data = {} - query_args = {} - query_args["return_pyarrow"] = True - query_args["arrow_format"] = True - query_args["merge_reference"] = False - query_args["allow_missing_lookups"] = allow_missing_lookups - if random_sample: - query_args["random_sample"] = random_sample - data["table"] = table - data["timestamp"] = timestamp - url = self._endpoints["live_live_query"].format_map(endpoint_mapping) - if joins is not None: - data["join_tables"] = joins - if filter_in_dict is not None: - data["filter_in_dict"] = filter_in_dict - if filter_out_dict is not None: - data["filter_notin_dict"] = filter_out_dict - if filter_equal_dict is not None: - data["filter_equal_dict"] = filter_equal_dict - if filter_greater_dict is not None: - data["filter_greater_dict"] = filter_greater_dict - if filter_less_dict is not None: - data["filter_less_dict"] = filter_less_dict - if filter_greater_equal_dict is not None: - data["filter_greater_equal_dict"] = filter_greater_equal_dict - if filter_less_equal_dict is not None: - data["filter_less_equal_dict"] = filter_less_equal_dict - if filter_spatial_dict is not None: - data["filter_spatial_dict"] = filter_spatial_dict - if select_columns is not None: - data["select_columns"] = select_columns - if offset is not None: - data["offset"] = offset - if limit is not None: - assert limit > 0 - data["limit"] = limit - if suffixes is not None: - data["suffixes"] = suffixes - encoding = DEFAULT_COMPRESSION - - response = self.session.post( - url, - data=json.dumps(data, cls=BaseEncoder), - headers={ - "Content-Type": "application/json", - "Accept-Encoding": encoding, - }, - params=query_args, 
- stream=~return_df, - verify=self.verify, - ) - self.raise_for_status(response, log_warning=log_warning) - - if desired_resolution is None: - desired_resolution = self.desired_resolution - - with MyTimeIt("deserialize"): - with warnings.catch_warnings(): - warnings.simplefilter(action="ignore", category=FutureWarning) - warnings.simplefilter(action="ignore", category=DeprecationWarning) - df = deserialize_query_response(response) - if desired_resolution is not None: - if len(desired_resolution) != 3: - raise ValueError( - "desired resolution needs to be of length 3, for xyz" - ) - vox_res = self.get_table_metadata( - table_name=table, - datastack_name=datastack_name, - log_warning=False, - )["voxel_resolution"] - df = convert_position_columns(df, vox_res, desired_resolution) - if not split_positions: - concatenate_position_columns(df, inplace=True) - - if metadata: - try: - attrs = self._assemble_attributes( - table, - join_query=False, - filters={ - "inclusive": filter_in_dict, - "exclusive": filter_out_dict, - "equal": filter_equal_dict, - "greater": filter_greater_dict, - "less": filter_less_dict, - "greater_equal": filter_greater_equal_dict, - "less_equal": filter_less_equal_dict, - "spatial": filter_spatial_dict, - }, - select_columns=select_columns, - offset=offset, - limit=limit, - live_query=timestamp is not None, - timestamp=string_format_timestamp(timestamp), - materialization_version=None, - desired_resolution=desired_resolution, - ) - df.attrs.update(attrs) - except HTTPError as e: - raise Exception( - e.message - + " Metadata could not be loaded, try with metadata=False if not needed" - ) - return df - @_check_version_compatibility( kwarg_use_constraints={ "filter_greater_dict": ">=4.34.0", @@ -2120,49 +1799,34 @@ def _assemble_attributes( attrs.update(kwargs) return json.loads(json.dumps(attrs, cls=BaseEncoder)) - -def _tables_metadata_key(matclient, *args, **kwargs): - if "version" in kwargs: - version = kwargs["version"] - else: - version = matclient.version - if "datastack_name" in kwargs: - datastack_name = kwargs["datastack_name"] - else: - datastack_name = matclient.datastack_name - return hashkey(datastack_name, version) - - -class MaterializationClientV3(MaterializationClientV2): - def __init__(self, *args, **kwargs): - super(MaterializationClientV3, self).__init__(*args, **kwargs) - @property def tables(self) -> TableManager: """The table manager for the materialization engine.""" if self._tables is None: if self.fc is not None and self.fc._materialize is not None: - metadata = [] - with ThreadPoolExecutor(max_workers=2) as executor: - metadata.append( - executor.submit( - self.get_tables_metadata, - ) - ) - metadata.append( - executor.submit(self.fc.schema.schema_definition_all) - ) - - if ( - metadata[0].result() is not None - and metadata[1].result() is not None - ): - tables = TableManager( - self.fc, metadata[0].result(), metadata[1].result() - ) + if self.server_version < Version("3"): + tables = TableManager(self.fc) else: - # TODO fix this for when the metadata is not available - tables = None + metadata = [] + with ThreadPoolExecutor(max_workers=2) as executor: + metadata.append( + executor.submit( + self.get_tables_metadata, + ) + ) + metadata.append( + executor.submit(self.fc.schema.schema_definition_all) + ) + if ( + metadata[0].result() is not None + and metadata[1].result() is not None + ): + tables = TableManager( + self.fc, metadata[0].result(), metadata[1].result() + ) + else: + logger.warning("Warning: Metadata for tables not available.") + tables 
= TableManager(self.fc) self._tables = tables else: raise ValueError("No full CAVEclient specified") @@ -2171,8 +1835,10 @@ def tables(self) -> TableManager: @property def views(self) -> ViewManager: """The view manager for the materialization engine.""" - if self._views is None: - if self.fc is not None and self.fc._materialize is not None: + if self.fc is not None and self.fc._materialize is not None: + if self.server_version < Version("3"): + views = ViewManager(self.fc) + else: metadata = [] with ThreadPoolExecutor(max_workers=2) as executor: metadata.append( @@ -2183,11 +1849,13 @@ def views(self) -> ViewManager: metadata.append(executor.submit(self.get_view_schemas)) views = ViewManager(self.fc, metadata[0].result(), metadata[1].result()) - self._views = views - else: - raise ValueError("No full CAVEclient specified") + + self._views = views + else: + raise ValueError("No full CAVEclient specified") return self._views + @_check_version_compatibility(method_constraint=">=3.0.0") @cached(cache=TTLCache(maxsize=100, ttl=60 * 60 * 12), key=_tables_metadata_key) def get_tables_metadata( self, @@ -2233,6 +1901,8 @@ def get_tables_metadata( @_check_version_compatibility( kwarg_use_constraints={ + "filter_regex_dict": ">=3.0.0", + "allow_invalid_root_ids": ">=3.0.0", "filter_greater_dict": ">=4.34.0", "filter_less_dict": ">=4.34.0", "filter_greater_equal_dict": ">=4.34.0", @@ -2392,10 +2062,9 @@ def live_live_query( >>> "column_name": "regex_string" >>> } """ - logging.warning( "Deprecation: this method is to facilitate beta testing of this feature, \ -it will likely get removed in future versions. " + it will likely get removed in future versions. " ) timestamp = convert_timestamp(timestamp) return_df = True @@ -2448,8 +2117,9 @@ def live_live_query( data["suffixes"] = suffixes if desired_resolution is None: desired_resolution = self.desired_resolution - if desired_resolution is not None: - data["desired_resolution"] = desired_resolution + if self.server_version >= Version("3"): + if desired_resolution is not None: + data["desired_resolution"] = desired_resolution encoding = DEFAULT_COMPRESSION response = self.session.post( @@ -2471,14 +2141,16 @@ def live_live_query( warnings.simplefilter(action="ignore", category=DeprecationWarning) df = deserialize_query_response(response) if desired_resolution is not None: - if not response.headers.get("dataframe_resolution", None): + if self.server_version < Version("3") or not response.headers.get( + "dataframe_resolution", None + ): if len(desired_resolution) != 3: raise ValueError( "desired resolution needs to be of length 3, for xyz" ) vox_res = self.get_table_metadata( - table, - datastack_name, + table_name=table, + datastack_name=datastack_name, log_warning=False, )["voxel_resolution"] df = convert_position_columns(df, vox_res, desired_resolution) @@ -2487,30 +2159,35 @@ def live_live_query( concatenate_position_columns(df, inplace=True) if metadata: + filters = { + "inclusive": filter_in_dict, + "exclusive": filter_out_dict, + "equal": filter_equal_dict, + "greater": filter_greater_dict, + "less": filter_less_dict, + "greater_equal": filter_greater_equal_dict, + "less_equal": filter_less_equal_dict, + "spatial": filter_spatial_dict, + } + if self.server_version < Version("3"): + _desired_resolution = desired_resolution + else: + _desired_resolution = response.headers.get( + "dataframe_resolution", desired_resolution + ) + filters["regex"] = filter_regex_dict try: attrs = self._assemble_attributes( table, join_query=False, - filters={ - 
"inclusive": filter_in_dict, - "exclusive": filter_out_dict, - "equal": filter_equal_dict, - "greater": filter_greater_dict, - "less": filter_less_dict, - "greater_equal": filter_greater_equal_dict, - "less_equal": filter_less_equal_dict, - "spatial": filter_spatial_dict, - "regex": filter_regex_dict, - }, + filters=filters, select_columns=select_columns, offset=offset, limit=limit, live_query=timestamp is not None, timestamp=string_format_timestamp(timestamp), materialization_version=None, - desired_resolution=response.headers.get( - "dataframe_resolution", desired_resolution - ), + desired_resolution=_desired_resolution, ) df.attrs.update(attrs) except HTTPError as e: @@ -2520,6 +2197,7 @@ def live_live_query( ) return df + @_check_version_compatibility(method_constraint=">=3.0.0") def get_views(self, version: Optional[int] = None, datastack_name: str = None): """ Get all available views for a version @@ -2549,6 +2227,7 @@ def get_views(self, version: Optional[int] = None, datastack_name: str = None): self.raise_for_status(response) return response.json() + @_check_version_compatibility(method_constraint=">=3.0.0") def get_view_metadata( self, view_name: str, @@ -2588,6 +2267,7 @@ def get_view_metadata( self.raise_for_status(response, log_warning=log_warning) return response.json() + @_check_version_compatibility(method_constraint=">=3.0.0") def get_view_schema( self, view_name: str, @@ -2627,6 +2307,7 @@ def get_view_schema( self.raise_for_status(response, log_warning=log_warning) return response.json() + @_check_version_compatibility(method_constraint=">=3.0.0") def get_view_schemas( self, materialization_version: Optional[int] = None, @@ -2662,12 +2343,13 @@ def get_view_schemas( return response.json() @_check_version_compatibility( + method_constraint=">=3.0.0", kwarg_use_constraints={ "filter_greater_dict": ">=4.34.0", "filter_less_dict": ">=4.34.0", "filter_greater_equal_dict": ">=4.34.0", "filter_less_equal_dict": ">=4.34.0", - } + }, ) def query_view( self, @@ -2851,6 +2533,7 @@ def query_view( else: return response.json() + @_check_version_compatibility(method_constraint=">=3.0.0") def get_unique_string_values( self, table: str, datastack_name: Optional[str] = None ): @@ -2879,17 +2562,3 @@ def get_unique_string_values( response = self.session.get(url, verify=self.verify) self.raise_for_status(response) return response.json() - - -# included for historical reasons, there was a typo in the class name -MaterializatonClientV2 = MaterializationClientV2 - -MaterializatonClientV3 = MaterializationClientV3 - -client_mapping = { - 2: MaterializationClientV2, - 3: MaterializationClientV3, - "latest": MaterializationClientV3, -} - -MaterializationClientType = Union[MaterializationClientV2, MaterializationClientV3] diff --git a/tests/test_materialization.py b/tests/test_materialization.py index 5e60d545..ed24b7fc 100644 --- a/tests/test_materialization.py +++ b/tests/test_materialization.py @@ -631,19 +631,19 @@ def mock_get_root_timestamps(self, root_ids): return np.array([timestamp_dict[root_id] for root_id in root_ids]) mocker.patch( - "caveclient.chunkedgraph.ChunkedGraphClientV1.get_roots", + "caveclient.chunkedgraph.ChunkedGraphClient.get_roots", my_get_roots, ) mocker.patch( - "caveclient.chunkedgraph.ChunkedGraphClientV1.get_past_ids", + "caveclient.chunkedgraph.ChunkedGraphClient.get_past_ids", mocked_get_past_ids, ) mocker.patch( - "caveclient.chunkedgraph.ChunkedGraphClientV1.is_latest_roots", + "caveclient.chunkedgraph.ChunkedGraphClient.is_latest_roots", mock_is_latest_roots, 
) mocker.patch( - "caveclient.chunkedgraph.ChunkedGraphClientV1.get_root_timestamps", + "caveclient.chunkedgraph.ChunkedGraphClient.get_root_timestamps", mock_get_root_timestamps, ) df = pd.read_pickle("tests/test_data/live_query_before.pkl")
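
Usage sketch: with the factory functions and their `client_mapping` dictionaries removed, each subclient is now a single class constructed directly — the constructors absorb the old factory logic (default `AuthClient()` creation, `_api_endpoints` resolution), so call sites keep the same `ClientName(...)` syntax — and version-specific behavior is handled at runtime with `packaging.version.Version` checks against `server_version` rather than by dispatching to `V1`/`V2`/`V3` subclasses. The snippet below is a minimal sketch of the new pattern, not part of the applied diff: it assumes a reachable deployment, reuses the example server address and datastack name that appear in this diff's docstrings, and the table name is hypothetical.

    from packaging.version import Version

    from caveclient.chunkedgraph import ChunkedGraphClient
    from caveclient.materializationengine import MaterializationClient

    # Direct class construction; the factory function of the same name is gone.
    # Passing auth_client=None falls back to a default AuthClient() internally.
    cg = ChunkedGraphClient(
        server_address="https://minniev1.microns-daf.com",  # example address from the docstrings
        table_name="example_table",  # hypothetical table name
        api_version="latest",
    )

    mat = MaterializationClient(
        server_address="https://minniev1.microns-daf.com",
        datastack_name="minnie65_public_v117",  # datastack from the docstring example
    )

    # Version gating now lives inside the one class: e.g. the `tables` property
    # falls back to TableManager(self.fc) when the server predates v3, instead
    # of relying on a MaterializationClientV2 subclass, and v3-only methods are
    # guarded with @_check_version_compatibility(method_constraint=">=3.0.0").
    if mat.server_version is not None and mat.server_version < Version("3"):
        print("older materialization server; v3-only endpoints are unavailable")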