
Commit

merge new kwargs
bdpedigo committed Oct 14, 2024
2 parents 522caf3 + 5986634 commit c6a0d02
Showing 10 changed files with 416 additions and 89 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 5.29.1
current_version = 5.30.2
commit = True
tag = True

4 changes: 2 additions & 2 deletions .github/workflows/release.yml
@@ -93,7 +93,7 @@ jobs:
- name: build sdist
run: python setup.py sdist
- name: Store the distribution packages
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: python-package-distributions
path: dist/
@@ -110,7 +110,7 @@ jobs:

steps:
- name: Download all the dists
uses: actions/download-artifact@v3
uses: actions/download-artifact@v4
with:
name: python-package-distributions
path: dist/
2 changes: 1 addition & 1 deletion caveclient/__init__.py
@@ -1,4 +1,4 @@
__version__ = "5.29.1"
__version__ = "5.30.2"

from .frameworkclient import CAVEclient
from .session_config import get_session_defaults, set_session_defaults
2 changes: 2 additions & 0 deletions caveclient/endpoints.py
@@ -299,6 +299,8 @@
skeletonservice_endpoints_v1 = {
"get_version": skeleton_common + "/version",
"skeleton_info": skeleton_v1 + "/{datastack_name}/precomputed/skeleton/info",
"skeleton_info_versioned": skeleton_v1
+ "/{datastack_name}/precomputed/skeleton/info/{skvn}",
"get_skeleton_via_rid": skeleton_v1
+ "/{datastack_name}/precomputed/skeleton/{root_id}",
"get_skeleton_via_skvn_rid": skeleton_v1
248 changes: 240 additions & 8 deletions caveclient/materializationengine.py

Large diffs are not rendered by default.
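The full diff for this file is collapsed, but the commit title ("merge new kwargs") and the tutorial update further down point to new comparison filter keyword arguments on the materialization queries. A minimal usage sketch, assuming a configured `CAVEclient` named `client`, the kwarg names listed in the tutorial, and placeholder table and column names:

```python
# Hedged sketch: table name, column names, and thresholds are placeholders.
df = client.materialize.query_table(
    "synapses_nt_v1",
    filter_greater_dict={"size": 100},      # only rows with size > 100
    filter_less_equal_dict={"score": 0.9},  # and score <= 0.9
    limit=10,
)
```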

44 changes: 36 additions & 8 deletions caveclient/session_config.py
@@ -1,10 +1,18 @@
import logging
from typing import Collection, Optional, Union

import requests
from packaging.version import Version
from urllib3 import __version__ as urllib3_version
from urllib3.util.retry import Retry

SESSION_DEFAULTS = {}

if Version(urllib3_version) < Version("2.0.0"):
HAS_URLLIB3_V2 = False
else:
HAS_URLLIB3_V2 = True


def set_session_defaults(
max_retries: int = 3,
@@ -78,6 +86,17 @@ def set_session_defaults(
SESSION_DEFAULTS["backoff_max"] = backoff_max
SESSION_DEFAULTS["status_forcelist"] = status_forcelist

if not HAS_URLLIB3_V2 and backoff_max != 120:
logging.warning(
(
"`backoff_max` is only supported in urllib3 v2.0.0 and above "
"and will be ignored. "
"Please upgrade urllib3 to take advantage of this feature. "
"Note that this upgrade may conflict with other packages that depend on "
"urllib3, including `cloud-volume`."
)
)


set_session_defaults()

@@ -119,14 +138,23 @@ def _patch_session(
if pool_maxsize is None:
pool_maxsize = SESSION_DEFAULTS["pool_maxsize"]

retries = Retry(
total=max_retries,
backoff_factor=SESSION_DEFAULTS["backoff_factor"],
status_forcelist=SESSION_DEFAULTS["status_forcelist"],
allowed_methods=frozenset(["GET", "POST"]),
backoff_max=SESSION_DEFAULTS["backoff_max"],
raise_on_status=False,
)
if HAS_URLLIB3_V2:
retries = Retry(
total=max_retries,
backoff_factor=SESSION_DEFAULTS["backoff_factor"],
status_forcelist=SESSION_DEFAULTS["status_forcelist"],
allowed_methods=frozenset(["GET", "POST"]),
backoff_max=SESSION_DEFAULTS["backoff_max"],
raise_on_status=False,
)
else:
retries = Retry(
total=max_retries,
backoff_factor=SESSION_DEFAULTS["backoff_factor"],
status_forcelist=SESSION_DEFAULTS["status_forcelist"],
allowed_methods=frozenset(["GET", "POST"]),
raise_on_status=False,
)

http = requests.adapters.HTTPAdapter(
pool_maxsize=pool_maxsize,
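As a usage sketch of the defaults machinery above (the import path matches `caveclient/__init__.py`; the values are illustrative):

```python
from caveclient import get_session_defaults, set_session_defaults

# Raise the retry ceiling; on urllib3 < 2.0 the backoff_max value is ignored
# and the warning defined above is logged instead.
set_session_defaults(max_retries=5, backoff_max=240)
print(get_session_defaults())
```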
187 changes: 128 additions & 59 deletions caveclient/skeletonservice.py
@@ -1,10 +1,13 @@
from __future__ import annotations

import gzip
import json
import logging
from io import BytesIO
from io import BytesIO, StringIO
from typing import Literal, Optional

import pandas as pd
from cachetools import TTLCache, cached
from packaging.version import Version

try:
@@ -18,38 +21,13 @@

CLOUDVOLUME_AVAILABLE = False

try:
import h5py

H5PY_AVAILABLE = True
except ImportError:
logging.warning("h5py not installed. Some output formats will not be available.")

H5PY_AVAILABLE = False

try:
from cloudfiles import CloudFiles

CLOUDFILES_AVAILABLE = True
except ImportError:
logging.warning(
"cloudfiles not installed. Some output formats will not be available."
)

CLOUDFILES_AVAILABLE = False

from .auth import AuthClient
from .base import ClientBase, _api_endpoints
from .endpoints import skeletonservice_api_versions, skeletonservice_common

SERVER_KEY = "skeleton_server_address"


"""
Usage
"""


class NoL2CacheException(Exception):
def __init__(self, value=""):
"""
@@ -165,6 +143,64 @@ def parse(url):
url = parse(self.build_endpoint(rid, ds, 1, "json"))
assert url == f"{ds}{innards}1/{rid}/json"

@staticmethod
def compressStringToBytes(inputString):
"""
Shamelessly copied from SkeletonService to avoid importing the entire repo. Consider pushing these utilities to a separate module.
REF: https://stackoverflow.com/questions/15525837/which-is-the-best-way-to-compress-json-to-store-in-a-memory-based-store-like-red
Read the given string, encode it in utf-8, compress the data, and return it as a byte array.
"""
bio = BytesIO()
bio.write(inputString.encode("utf-8"))
bio.seek(0)
stream = BytesIO()
compressor = gzip.GzipFile(fileobj=stream, mode="w")
while True: # until EOF
chunk = bio.read(8192)
if not chunk: # EOF?
compressor.close()
return stream.getvalue()
compressor.write(chunk)

@staticmethod
def compressDictToBytes(inputDict, remove_spaces=True):
"""
Shamelessly copied from SkeletonService to avoid importing the entire repo. Consider pushing these utilities to a separate module.
"""
inputDictStr = json.dumps(inputDict)
if remove_spaces:
inputDictStr = inputDictStr.replace(" ", "")
inputDictStrBytes = SkeletonClient.compressStringToBytes(inputDictStr)
return inputDictStrBytes

@staticmethod
def decompressBytesToString(inputBytes):
"""
Shamelessly copied from SkeletonService to avoid importing the entire repo. Consider pushing these utilities to a separate module.
REF: https://stackoverflow.com/questions/15525837/which-is-the-best-way-to-compress-json-to-store-in-a-memory-based-store-like-red
Decompress the given byte array (which must be valid compressed gzip data) and return the decoded text (utf-8).
"""
bio = BytesIO()
stream = BytesIO(inputBytes)
decompressor = gzip.GzipFile(fileobj=stream, mode="r")
while True: # until EOF
chunk = decompressor.read(8192)
if not chunk:
decompressor.close()
bio.seek(0)
return bio.read().decode("utf-8")
bio.write(chunk)
return None

@staticmethod
def decompressBytesToDict(inputBytes):
"""
Shamelessly copied from SkeletonService to avoid importing the entire repo. Consider pushing these utilities to a separate module.
"""
inputBytesStr = SkeletonClient.decompressBytesToString(inputBytes)
inputBytesStrDict = json.loads(inputBytesStr)
return inputBytesStrDict
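A quick round-trip sketch of the gzip helpers above (the payload is a placeholder):

```python
payload = {"vertices": [0, 1, 2], "edges": [[0, 1], [1, 2]]}
blob = SkeletonClient.compressDictToBytes(payload)
assert SkeletonClient.decompressBytesToDict(blob) == payload
```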

def build_endpoint(
self,
root_id: int,
@@ -207,14 +243,47 @@ def build_endpoint(
url = self._endpoints[endpoint].format_map(endpoint_mapping)
return url

@cached(TTLCache(maxsize=32, ttl=3600))
def get_precomputed_skeleton_info(
self,
skvn: int = 0,
datastack_name: Optional[str] = None,
):
"""get's the precomputed skeleton information
Args:
datastack_name (Optional[str], optional): _description_. Defaults to None.
"""
if not self.fc.l2cache.has_cache():
raise NoL2CacheException("SkeletonClient requires an L2Cache.")
if datastack_name is None:
datastack_name = self._datastack_name
assert datastack_name is not None

endpoint_mapping = self.default_url_mapping
endpoint_mapping["datastack_name"] = datastack_name
endpoint_mapping["skvn"] = skvn
url = self._endpoints["skeleton_info_versioned"].format_map(endpoint_mapping)

response = self.session.get(url)
self.raise_for_status(response)
return response.json()
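A hypothetical call, assuming the skeleton client is reached through a `CAVEclient` instance's `skeleton` attribute and using a placeholder datastack name:

```python
from caveclient import CAVEclient

client = CAVEclient("my_datastack")  # placeholder datastack name
info = client.skeleton.get_precomputed_skeleton_info(skvn=0)
print(info["vertex_attributes"])  # consumed below when parsing precomputed skeletons
```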

def get_skeleton(
self,
root_id: int,
datastack_name: Optional[str] = None,
skeleton_version: Optional[int] = None,
skeleton_version: Optional[int] = 0,
output_format: Literal[
"none", "h5", "swc", "json", "arrays", "precomputed"
"none",
"h5",
"swc",
"json",
"jsoncompressed",
"arrays",
"arrayscompressed",
"precomputed",
] = "none",
log_warning: bool = True,
):
"""Gets basic skeleton information for a datastack
@@ -235,9 +304,11 @@ def get_skeleton(
- 'none': No return value (this can be used to generate a skeleton without retrieving it)
- 'precomputed': A cloudvolume.Skeleton object
- 'json': A dictionary
- 'jsoncompressed': A dictionary using compression for transmission (generally faster than 'json')
- 'arrays': A dictionary (literally a subset of the json response)
- 'arrayscompressed': A dictionary using compression for transmission (generally faster than 'arrays')
- 'swc': A pandas DataFrame
- 'h5': An h5py file object
- 'h5': A BytesIO object containing the bytes of an h5 file
"""
if not self.fc.l2cache.has_cache():
raise NoL2CacheException("SkeletonClient requires an L2Cache.")
@@ -247,6 +318,7 @@
)

response = self.session.get(url)
self.raise_for_status(response, log_warning=log_warning)

if output_format == "none":
return
@@ -255,44 +327,41 @@
raise ImportError(
"'precomputed' output format requires cloudvolume, which is not available."
)
return cloudvolume.Skeleton.from_precomputed(response.content)
metadata = self.get_precomputed_skeleton_info(
skeleton_version, datastack_name
)
vertex_attributes = metadata["vertex_attributes"]
return cloudvolume.Skeleton.from_precomputed(
response.content, vertex_attributes=vertex_attributes
)
if output_format == "json":
return response.json()
if output_format == "jsoncompressed":
return SkeletonClient.decompressBytesToDict(response.content)
if output_format == "arrays":
return response.json()
if output_format == "arrayscompressed":
return SkeletonClient.decompressBytesToDict(response.content)
if output_format == "swc":
if not CLOUDFILES_AVAILABLE:
raise ImportError(
"'swc' output format requires cloudvolume, which is not available."
)
# Curiously, the response is quoted and contains a terminal endline. Sigh.
parts = response.text.strip()[1:-1].split("/")
dir_, filename = "/".join(parts[0:-1]), parts[-1]
cf = CloudFiles(dir_)
skeleton_bytes = cf.get(filename)
arr = [
[float(v) for v in row.split()]
for row in skeleton_bytes.decode().split("\n")
]
# I got the SWC column header from skeleton_plot.skel_io.py
df = pd.DataFrame(
arr, columns=["id", "type", "x", "y", "z", "radius", "parent"]
df = pd.read_csv(
StringIO(response.content.decode()),
sep=" ",
names=["id", "type", "x", "y", "z", "radius", "parent"],
)

# Reduce 'id' and 'parent' columns from int64 to int16, and 'type' column from int64 to int8
df = df.apply(pd.to_numeric, downcast="integer")
# Convert 'type' column from int8 to uint8
df["type"] = df["type"].astype("uint8")

# Reduce float columns from float64 to float32. This sacrifices precision and therefore is perhaps undesirable.
# It is left here, commented out, for demonstration purposes, should it be deemed desirable in the future.
# df = df.apply(pd.to_numeric, downcast='float')

return df
if output_format == "h5":
if not CLOUDFILES_AVAILABLE:
raise ImportError(
"'h5' output format requires cloudvolume, which is not available."
)
if not H5PY_AVAILABLE:
raise ImportError(
"'h5' output format requires h5py, which is not available."
)
parts = response.text.strip()[1:-1].split("/")
dir_, filename = "/".join(parts[0:-1]), parts[-1]
cf = CloudFiles(dir_)
skeleton_bytes = cf.get(filename)
skeleton_bytesio = BytesIO(skeleton_bytes)
return h5py.File(skeleton_bytesio, "r")
skeleton_bytesio = BytesIO(response.content)
return skeleton_bytesio

raise ValueError(f"Unknown output format: {output_format}")
9 changes: 5 additions & 4 deletions docs/tutorials/materialization.md
@@ -116,10 +116,11 @@ df=client.materialize.query_table('my_table', limit=10)

For most applications, you will want to filter the query in some way.

We offer three kinds of filters you can apply: `filter_equal_dict`, `filter_in_dict`
and `filter_out_dict`. For query_table each is specified as a dictionary
where the keys are column names, and the values are a list of values (or
single value in the case of filter_equal).
We offer seven kinds of filters you can apply: `filter_in_dict`,
`filter_out_dict`, `filter_equal_dict`, `filter_greater_dict`, `filter_less_dict`,
`filter_greater_equal_dict`, and `filter_less_equal_dict`. For `query_table`, each is
specified as a dictionary where the keys are column names and the values are a
list of values (or a single value in the case of `filter_equal_dict`).

So for example to query a synapse table for all synapses onto a neuron
in flywire you would use
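a minimal sketch of such a query (the table name and root id are placeholders):

```python
df = client.materialize.query_table(
    "synapses_nt_v1",
    filter_equal_dict={"post_pt_root_id": 720575940621039145},
)
```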
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,7 +1,7 @@
numpy<2.0.0
pyarrow>=3
requests
urllib3>=2.0.2
urllib3
pandas<3.0.0
cachetools>=4.2.1
ipython