diff --git a/docs/notes/key-concepts/data-accessing.rst b/docs/notes/key-concepts/data-accessing.rst index a1ffbecafb..a1e7cd21ee 100644 --- a/docs/notes/key-concepts/data-accessing.rst +++ b/docs/notes/key-concepts/data-accessing.rst @@ -170,6 +170,34 @@ Vineyard offers low-level APIs for creating and accessing local blobs with enhan Remote Objects -------------- +Creating and accessing remote objects in vineyard can be easily achieved using :code:`put` and :code:`get` methods (see +:meth:`vineyard.RPCClient.put` and :meth:`vineyard.RPCClient.get`). + +.. code:: python + :caption: Effortlessly create and access remote objects using :code:`put` and :code:`get` + + >>> import pandas as pd + >>> import vineyard + >>> import numpy as np + >>> + >>> vineyard_rpc_client = vineyard.connect("localhost", 9600) + >>> + >>> df = pd.DataFrame(np.random.rand(10, 2)) + >>> + >>> # put object into vineyard + >>> r = vineyard_rpc_client.put(df) + >>> r, type(r) + (o000a45730a85f8fe, vineyard._C.ObjectID) + >>> + >>> # get object from vineyard using object id + >>> data = vineyard_rpc_client.get(r) + >>> data + 0 1 + 0 0.884227 0.576031 + 1 0.863040 0.069815 + 2 0.297906 0.911874 + ... + The RPC client enables inspection of remote object metadata and facilitates operations on blobs within the remote cluster, while taking into account the associated network transfer costs. diff --git a/python/client.cc b/python/client.cc index ecbd8277f0..891bd7d41c 100644 --- a/python/client.cc +++ b/python/client.cc @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ +#include #include #include @@ -580,7 +581,7 @@ void bind_client(py::module& mod) { .def_property_readonly("is_rpc", &ClientBase::IsRPC, doc::ClientBase_is_rpc); - // Client + // IPCClient py::class_, ClientBase>(mod, "IPCClient", doc::IPCClient) .def( @@ -644,20 +645,13 @@ void bind_client(py::module& mod) { .def( "get_meta", [](Client* self, ObjectIDWrapper const& object_id, - bool const sync_remote, bool const fetch) -> ObjectMeta { + bool const sync_remote) -> ObjectMeta { ObjectMeta meta; - // FIXME: do we really not need to sync from etcd? We assume the - // object is a local object - if (fetch) { - throw_on_error( - self->FetchAndGetMetaData(object_id, meta, sync_remote)); - } else { - throw_on_error(self->GetMetaData(object_id, meta, sync_remote)); - } + throw_on_error(self->GetMetaData(object_id, meta, sync_remote)); return meta; }, "object_id"_a, py::arg("sync_remote") = false, - py::arg("fetch") = false, doc::IPCClient_get_meta) + doc::IPCClient_get_meta) .def( "get_metas", [](Client* self, std::vector const& object_ids, @@ -886,6 +880,12 @@ void bind_client(py::module& mod) { throw_on_error(self->Fork(*rpc_client)); return rpc_client; }) + .def( + "is_fetchable", + [](RPCClient* self, ObjectMeta& metadata) -> bool { + return self->IsFetchable(metadata); + }, + doc::RPCClient_is_fetchable) .def_property_readonly("remote_instance_id", &RPCClient::remote_instance_id, doc::RPCClient_remote_instance_id) @@ -896,33 +896,14 @@ void bind_client(py::module& mod) { mod.def( "_connect", - [](std::nullptr_t, const std::string& username, + [](std::string const& socket, const SessionID session_id, + const std::string& username, const std::string& password) -> py::object { - if (!read_env("VINEYARD_IPC_SOCKET").empty()) { - return py::cast(ClientManager::GetManager()->Connect( - username, password)); - } - if (!read_env("VINEYARD_RPC_ENDPOINT").empty()) { - return py::cast(ClientManager::GetManager()->Connect( - username, password)); - } - 
throw_on_error(Status::ConnectionFailed( - "Failed to resolve IPC socket or RPC endpoint of vineyard " - "server from environment variables VINEYARD_IPC_SOCKET or " - "VINEYARD_RPC_ENDPOINT.")); - return py::none(); + return py::cast(ClientManager::GetManager()->Connect( + socket, session_id, username, password)); }, - py::arg("target") = py::none(), py::kw_only(), - py::arg("username") = "", py::arg("password") = "", doc::connect) - .def( - "_connect", - [](std::string const& socket, const std::string& username, - const std::string& password) -> py::object { - return py::cast(ClientManager::GetManager()->Connect( - socket, username, password)); - }, - "socket"_a, py::kw_only(), py::arg("username") = "", - py::arg("password") = "") + "socket"_a, py::kw_only(), py::arg("session") = RootSessionID(), + py::arg("username") = "", py::arg("password") = "") .def( "_connect", [](std::string const& host, const uint32_t port, @@ -932,8 +913,9 @@ void bind_client(py::module& mod) { return py::cast(ClientManager::GetManager()->Connect( rpc_endpoint, session_id, username, password)); }, - "host"_a, "port"_a, py::arg("session") = RootSessionID(), - py::kw_only(), py::arg("username") = "", py::arg("password") = "") + "host"_a, "port"_a, py::kw_only(), + py::arg("session") = RootSessionID(), py::arg("username") = "", + py::arg("password") = "") .def( "_connect", [](std::string const& host, std::string const& port, @@ -943,8 +925,9 @@ void bind_client(py::module& mod) { return ClientManager::GetManager()->Connect( rpc_endpoint, session_id, username, password); }, - "host"_a, "port"_a, py::arg("session") = RootSessionID(), - py::kw_only(), py::arg("username") = "", py::arg("password") = "") + "host"_a, "port"_a, py::kw_only(), + py::arg("session") = RootSessionID(), py::arg("username") = "", + py::arg("password") = "") .def( "_connect", [](std::pair const& endpoint, @@ -955,7 +938,7 @@ void bind_client(py::module& mod) { return ClientManager::GetManager()->Connect( rpc_endpoint, 
session_id, username, password); }, - "endpoint"_a, py::arg("session") = RootSessionID(), py::kw_only(), + "endpoint"_a, py::kw_only(), py::arg("session") = RootSessionID(), py::arg("username") = "", py::arg("password") = "") .def( "_connect", @@ -966,7 +949,7 @@ void bind_client(py::module& mod) { return ClientManager::GetManager()->Connect( rpc_endpoint, session_id, username, password); }, - "endpoint"_a, py::arg("session") = RootSessionID(), py::kw_only(), + "endpoint"_a, py::kw_only(), py::arg("session") = RootSessionID(), py::arg("username") = "", py::arg("password") = ""); } // NOLINT(readability/fn_size) diff --git a/python/core.cc b/python/core.cc index 1c1ad3b415..56b452ac94 100644 --- a/python/core.cc +++ b/python/core.cc @@ -445,9 +445,10 @@ void bind_core(py::module& mod) { // NB: don't expose the "Build" method to python. .def( "seal", - [](ObjectBuilder* self, Client* client) { + [](ObjectBuilder* self, py::object client) { std::shared_ptr object; - throw_on_error(self->Seal(*client, object)); + Client* ipc_client = py::cast(client.attr("ipc_client")); + throw_on_error(self->Seal(*ipc_client, object)); return object; }, "client"_a) diff --git a/python/pybind11_docs.cc b/python/pybind11_docs.cc index 6d20325277..e51530057a 100644 --- a/python/pybind11_docs.cc +++ b/python/pybind11_docs.cc @@ -1053,9 +1053,6 @@ Close the client. const char* RPCClient = R"doc( RPC client that connects to vineyard instance's RPC endpoints. - -The RPC client can only access the metadata of objects, any access to the blob payload -will trigger a :code:`RuntimeError` exception. )doc"; const char* RPCClient_get_object = R"doc( @@ -1241,6 +1238,12 @@ const char* RPCClient_close = R"doc( Close the client. )doc"; +const char* RPCClient_is_fetchable = R"doc( +Whether the rpc client is able to fetch objects from the connected vineyard server. +When the instance connected by the rpc client is not the same as the instance +of metadata, the rpc client is not able to fetch the object. 
+)doc"; + const char* RPCClient_remote_instance_id = R"doc( The instance id of the connected remote vineyard server. )doc"; diff --git a/python/pybind11_docs.h b/python/pybind11_docs.h index 82d08db19b..823a44bf9a 100644 --- a/python/pybind11_docs.h +++ b/python/pybind11_docs.h @@ -140,6 +140,7 @@ extern const char* RPCClient_get_metas; extern const char* RPCClient_list_objects; extern const char* RPCClient_list_metadatas; extern const char* RPCClient_close; +extern const char* RPCClient_is_fetchable; extern const char* RPCClient_remote_instance_id; extern const char* connect; diff --git a/python/vineyard/_C.pyi b/python/vineyard/_C.pyi index ef5b35c660..e05c3732aa 100644 --- a/python/vineyard/_C.pyi +++ b/python/vineyard/_C.pyi @@ -24,6 +24,8 @@ from typing import Tuple from typing import Union from typing import overload +from vineyard.core.client import Client + class ObjectID: @overload def __init__(self) -> None: ... @@ -119,7 +121,7 @@ class Object: def isglobal(self) -> bool: ... class ObjectBuilder: - def seal(self, client: "IPCClient") -> "Object": ... + def seal(self, client: Client) -> "Object": ... @property def issealed(self) -> bool: ... @@ -235,7 +237,7 @@ class ClientBase: def next_chunk_id(self, stream_id: ObjectID) -> ObjectID: ... def next_chunk_meta(self, stream_id: ObjectID) -> ObjectMeta: ... def next_chunk(self, stream_id: ObjectID) -> Object: ... - def stop_stream(self, stream_id: ObjectID) -> None: ... + def stop_stream(self, stream_id: ObjectID, failed: bool) -> None: ... def drop_stream(self, stream_id: ObjectID) -> None: ... def persist(self, object: Union[ObjectID, Object, ObjectMeta]) -> None: ... def exists(self, object: ObjectID) -> bool: ... @@ -371,10 +373,6 @@ class RPCClient(ClientBase): def __enter__(self) -> "RPCClient": ... def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: ... -@overload -def connect( - *, username: str = "", password: str = "" -) -> Union[IPCClient, RPCClient]: ... 
@overload def connect( target: None = None, *, username: str = "", password: str = "" diff --git a/python/vineyard/__init__.py b/python/vineyard/__init__.py index 4d5ae0ab34..5b89f5476d 100644 --- a/python/vineyard/__init__.py +++ b/python/vineyard/__init__.py @@ -186,6 +186,7 @@ def _init_global_context(): from ._C import VineyardServerNotReadyException from ._C import _connect from ._C import memory_copy +from .core import Client from .core import builder_context from .core import default_builder_context from .core import default_driver_context @@ -398,4 +399,4 @@ def connect(*args, **kwargs): 'try to launch a standalone one.' ) try_init() - return _connect(*args, **kwargs) + return Client(*args, **kwargs) diff --git a/python/vineyard/core/__init__.py b/python/vineyard/core/__init__.py index db62617b61..49346cae87 100644 --- a/python/vineyard/core/__init__.py +++ b/python/vineyard/core/__init__.py @@ -29,6 +29,7 @@ from .builder import builder_context from .builder import default_builder_context +from .client import Client from .driver import default_driver_context from .driver import driver_context from .resolver import default_resolver_context diff --git a/python/vineyard/core/builder.py b/python/vineyard/core/builder.py index d2cb6fbfeb..36cc46bb1c 100644 --- a/python/vineyard/core/builder.py +++ b/python/vineyard/core/builder.py @@ -168,7 +168,7 @@ def put( 00002ec13bc81226 Parameters: - client: IPCClient + client: IPCClient or RPCClient The vineyard client to use. value: The python value that will be put to vineyard. Supported python value diff --git a/python/vineyard/core/client.py b/python/vineyard/core/client.py new file mode 100644 index 0000000000..cdd1611c9d --- /dev/null +++ b/python/vineyard/core/client.py @@ -0,0 +1,498 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 2020-2023 Alibaba Group Holding Limited. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import warnings +from typing import Any +from typing import Dict +from typing import List +from typing import Optional +from typing import Tuple +from typing import Union + +from vineyard._C import Blob +from vineyard._C import BlobBuilder +from vineyard._C import IPCClient +from vineyard._C import Object +from vineyard._C import ObjectID +from vineyard._C import ObjectMeta +from vineyard._C import RemoteBlob +from vineyard._C import RemoteBlobBuilder +from vineyard._C import RPCClient +from vineyard._C import _connect +from vineyard.core.builder import BuilderContext +from vineyard.core.builder import put +from vineyard.core.resolver import ResolverContext +from vineyard.core.resolver import get + + +def _apply_docstring(func): + def _apply(fn): + fn.__doc__ = func.__doc__ + return fn + + return _apply + + +class Client: + """Client is responsible for managing IPC and RPC clients for Vineyard + and provides a high-level interface to fetch objects from the Vineyard cluster. + """ + + def __init__( + self, + socket: str = None, + port: Union[int, str] = None, + # move host after port to make sure unnamed (host, port) works + host: str = None, + endpoint: Tuple[str, Union[str, int]] = None, + session: int = None, + username: str = None, + password: str = None, + ): + """Connects to the vineyard IPC socket and RPC socket. 
+ + - The argument `socket` takes precedence over environment variable + `VINEYARD_IPC_SOCKET` for IPC client. + - The argument `endpoint` takes precedence over arguments (`host`, `port`), + which in turn take precedence over the environment variable + `VINEYARD_RPC_ENDPOINT` for RPC client. + """ + self._ipc_client: IPCClient = None + self._rpc_client: RPCClient = None + + kwargs = {} + if session is not None: + kwargs['session'] = session + if username is not None: + kwargs['username'] = username + if password is not None: + kwargs['password'] = password + + if socket is not None and port is not None and host is None: + socket, host = None, socket + + if not socket: + socket = os.getenv('VINEYARD_IPC_SOCKET', None) + if not endpoint and not (host and port): + endpoint = os.getenv('VINEYARD_RPC_ENDPOINT', None) + if endpoint: + if not isinstance(endpoint, (tuple, list)): + endpoint = endpoint.split(':') + host, port = endpoint + + if socket: + self._ipc_client = _connect(socket, **kwargs) + if host and port: + self._rpc_client = _connect(host, port, **kwargs) + + if self._ipc_client is None and self._rpc_client is None: + raise ConnectionError( + "Failed to connect to vineyard via both IPC and RPC connection. " + "Arguments are not and environment variables VINEYARD_IPC_SOCKET " + "and VINEYARD_RPC_ENDPOINT are not available." + ) + + @property + def ipc_client(self) -> IPCClient: + assert self._ipc_client is not None, "IPC client is not available." + return self._ipc_client + + @property + def rpc_client(self) -> RPCClient: + assert self._rpc_client is not None, "RPC client is not available." 
+ return self._rpc_client + + def has_ipc_client(self): + return self._ipc_client is not None + + def has_rpc_client(self): + return self._rpc_client is not None + + def default_client(self) -> Union[IPCClient, RPCClient]: + return self._ipc_client if self._ipc_client else self._rpc_client + + # The following functions are wrappers of the corresponding functions in the + # ClientBase class. + + @_apply_docstring(IPCClient.create_metadata) + def create_metadata( + self, metadata: ObjectMeta, instance_id: int = None + ) -> ObjectMeta: + if instance_id is not None: + return self.default_client().create_metadata(metadata, instance_id) + return self.default_client().create_metadata(metadata) + + @_apply_docstring(IPCClient.delete) + def delete( + self, + object: Union[ObjectID, Object, ObjectMeta, List[ObjectID]], + force: bool = False, + deep: bool = True, + ) -> None: + return self.default_client().delete(object, force, deep) + + @_apply_docstring(IPCClient.create_stream) + def create_stream(self, id: ObjectID) -> None: + return self.default_client().create_stream(id) + + @_apply_docstring(IPCClient.open_stream) + def open_stream(self, id: ObjectID, mode: str) -> None: + return self.default_client().open_stream(id, mode) + + @_apply_docstring(IPCClient.push_chunk) + def push_chunk(self, stream_id: ObjectID, chunk: ObjectID) -> None: + return self.default_client().push_chunk(stream_id, chunk) + + @_apply_docstring(IPCClient.next_chunk_id) + def next_chunk_id(self, stream_id: ObjectID) -> ObjectID: + return self.default_client().next_chunk_id(stream_id) + + @_apply_docstring(IPCClient.next_chunk_meta) + def next_chunk_meta(self, stream_id: ObjectID) -> ObjectMeta: + return self.default_client().next_chunk_meta(stream_id) + + @_apply_docstring(IPCClient.next_chunk) + def next_chunk(self, stream_id: ObjectID) -> Object: + return self.default_client().next_chunk(stream_id) + + @_apply_docstring(IPCClient.stop_stream) + def stop_stream(self, stream_id: ObjectID, failed: 
bool) -> None: + return self.default_client().stop_stream(stream_id, failed) + + @_apply_docstring(IPCClient.drop_stream) + def drop_stream(self, stream_id: ObjectID) -> None: + return self.default_client().drop_stream(stream_id) + + @_apply_docstring(IPCClient.persist) + def persist(self, object: Union[ObjectID, Object, ObjectMeta]) -> None: + return self.default_client().persist(object) + + @_apply_docstring(IPCClient.exists) + def exists(self, object: ObjectID) -> bool: + return self.default_client().exists(object) + + @_apply_docstring(IPCClient.shallow_copy) + def shallow_copy( + self, object_id: ObjectID, extra_metadata: dict = None + ) -> ObjectID: + if extra_metadata: + return self.default_client().shallow_copy(object_id, extra_metadata) + return self.default_client().shallow_copy(object_id) + + @_apply_docstring(IPCClient.list_names) + def list_names( + self, pattern: str, regex: bool = False, limit: int = 5 + ) -> List[str]: + return self.default_client().list_names(pattern, regex, limit) + + @_apply_docstring(IPCClient.put_name) + def put_name(self, object: Union[Object, ObjectMeta, ObjectID], name: str) -> None: + return self.default_client().put_name(object, name) + + @_apply_docstring(IPCClient.get_name) + def get_name(self, name: str, wait: bool = False) -> ObjectID: + return self.default_client().get_name(name, wait) + + @_apply_docstring(IPCClient.drop_name) + def drop_name(self, name: str) -> None: + return self.default_client().drop_name(name) + + @_apply_docstring(IPCClient.sync_meta) + def sync_meta(self) -> None: + return self.default_client().sync_meta() + + @_apply_docstring(IPCClient.migrate) + def migrate(self, object_id: ObjectID) -> ObjectID: + return self.default_client().migrate(object_id) + + @_apply_docstring(IPCClient.clear) + def clear(self) -> None: + return self.default_client().clear() + + @_apply_docstring(IPCClient.memory_trim) + def memory_trim(self) -> bool: + return self.default_client().memory_trim() + + 
@_apply_docstring(IPCClient.label) + def label( + self, + object_id: ObjectID, + key_or_labels: Union[str, Dict[str, str]], + value: str = None, + ) -> None: + if isinstance(key_or_labels, dict) and value is None: + return self.default_client().label(object_id, key_or_labels) + else: + return self.default_client().label(object_id, key_or_labels, value) + + @_apply_docstring(IPCClient.evict) + def evict(self, objects: List[ObjectID]) -> None: + return self.default_client().evict(objects) + + @_apply_docstring(IPCClient.load) + def load(self, objects: List[ObjectID], pin: bool = False) -> None: + return self.default_client().load(objects, pin) + + @_apply_docstring(IPCClient.unpin) + def unpin(self, objects: List[ObjectID]) -> None: + return self.default_client().unpin(objects) + + @_apply_docstring(IPCClient.reset) + def reset(self) -> None: + if self._ipc_client: + self._ipc_client.reset() + if self._rpc_client: + self._rpc_client.reset() + + @property + @_apply_docstring(IPCClient.connected) + def connected(self): + return self.default_client().connected + + @property + @_apply_docstring(IPCClient.instance_id) + def instance_id(self): + return self.default_client().instance_id + + @property + @_apply_docstring(IPCClient.meta) + def meta(self): + return self.default_client().meta + + @property + @_apply_docstring(IPCClient.status) + def status(self): + return self.default_client().status + + @_apply_docstring(IPCClient.debug) + def debug(self, debug: dict): + return self.default_client().debug(debug) + + @property + @_apply_docstring(IPCClient.ipc_socket) + def ipc_socket(self): + return self.default_client().ipc_socket + + @property + @_apply_docstring(IPCClient.rpc_endpoint) + def rpc_endpoint(self): + if self._rpc_client: + return self._rpc_client.rpc_endpoint + return self.default_client().rpc_endpoint + + @property + @_apply_docstring(IPCClient.is_ipc) + def is_ipc(self): + return self.default_client().is_ipc + + @property + @_apply_docstring(IPCClient.is_rpc) 
+ def is_rpc(self): + return self.default_client().is_rpc + + @property + @_apply_docstring(IPCClient.version) + def version(self): + return self.default_client().version + + # The following functions are wrappers of the corresponding functions in the + # IPCClient and RPCClient classes. + + @_apply_docstring(IPCClient.create_blob) + def create_blob(self, size: int) -> BlobBuilder: + return self.ipc_client.create_blob(size) + + @_apply_docstring(IPCClient.create_empty_blob) + def create_empty_blob(self) -> BlobBuilder: + return self.ipc_client.create_empty_blob() + + @_apply_docstring(IPCClient.get_blob) + def get_blob(self, object_id: ObjectID, unsafe: bool = False) -> Blob: + return self.ipc_client.get_blob(object_id, unsafe) + + @_apply_docstring(IPCClient.get_blobs) + def get_blobs(self, object_ids: List[ObjectID], unsafe: bool = False) -> List[Blob]: + return self.ipc_client.get_blobs(object_ids, unsafe) + + @_apply_docstring(RPCClient.create_remote_blob) + def create_remote_blob(self, blob_builder: RemoteBlobBuilder) -> ObjectID: + return self.rpc_client.create_remote_blob(blob_builder) + + @_apply_docstring(RPCClient.get_remote_blob) + def get_remote_blob(self, object_id: ObjectID, unsafe: bool = False) -> RemoteBlob: + return self.rpc_client.get_remote_blob(object_id, unsafe) + + @_apply_docstring(RPCClient.get_remote_blobs) + def get_remote_blobs( + self, object_ids: List[ObjectID], unsafe: bool = False + ) -> List[RemoteBlob]: + return self.rpc_client.get_remote_blobs(object_ids, unsafe) + + @_apply_docstring(IPCClient.get_object) + def get_object(self, object_id: ObjectID) -> Object: + """ + Fetches the object associated with the given object_id from Vineyard. 
+ The IPC client is preferred if it's available, otherwise the RPC client is used. + """ + return self._fetch_object(object_id) + + @_apply_docstring(IPCClient.get_objects) + def get_objects(self, object_ids: List[ObjectID]) -> List[Object]: + objects = [] + for object_id in object_ids: + objects.append(self.get_object(object_id)) + return objects + + @_apply_docstring(IPCClient.get_meta) + def get_meta( + self, + object_id: ObjectID, + sync_remote: bool = False, + ) -> ObjectMeta: + return self.default_client().get_meta(object_id, sync_remote) + + @_apply_docstring(IPCClient.get_metas) + def get_metas( + self, object_ids: List[ObjectID], sync_remote: bool = False + ) -> List[ObjectMeta]: + metas = [] + for object_id in object_ids: + metas.append(self.get_meta(object_id, sync_remote)) + return metas + + @_apply_docstring(IPCClient.list_objects) + def list_objects( + self, pattern: str, regex: bool = False, limit: int = 5 + ) -> List[ObjectID]: + return self.default_client().list_objects(pattern, regex, limit) + + @_apply_docstring(IPCClient.list_metadatas) + def list_metadatas( + self, pattern: str, regex: bool = False, limit: int = 5, nobuffer: bool = False + ) -> List[ObjectMeta]: + return self.default_client().list_metadatas(pattern, regex, limit, nobuffer) + + @_apply_docstring(IPCClient.new_buffer_chunk) + def new_buffer_chunk(self, stream: ObjectID, size: int) -> memoryview: + return self.ipc_client.new_buffer_chunk(stream, size) + + @_apply_docstring(IPCClient.next_buffer_chunk) + def next_buffer_chunk(self, stream: ObjectID) -> memoryview: + return self.ipc_client.next_buffer_chunk(stream) + + @_apply_docstring(IPCClient.allocated_size) + def allocated_size(self, object_id: Union[Object, ObjectID]) -> int: + return self.ipc_client.allocated_size(object_id) + + @_apply_docstring(IPCClient.is_shared_memory) + def is_shared_memory(self, pointer: int) -> bool: + return self.ipc_client.is_shared_memory(pointer) + + @_apply_docstring(IPCClient.find_shared_memory) + def 
find_shared_memory(self, pointer: int) -> ObjectID: + return self.ipc_client.find_shared_memory(pointer) + + @property + @_apply_docstring(RPCClient.remote_instance_id) + def remote_instance_id(self) -> int: + return self.rpc_client.remote_instance_id + + @_apply_docstring(IPCClient.close) + def close(self) -> None: + if self._ipc_client: + self._ipc_client.close() + if self._rpc_client: + self._rpc_client.close() + + @_apply_docstring(IPCClient.fork) + def fork(self) -> 'Client': + if self._ipc_client: + self._ipc_client = self._ipc_client.fork() + if self._rpc_client: + self._rpc_client = self._rpc_client.fork() + return self + + def _fetch_object(self, object_id: ObjectID) -> Object: + meta = self.get_meta(object_id) + + if self.has_ipc_client(): + if meta.instance_id == self._ipc_client.instance_id: + return self._ipc_client.get_object(object_id, fetch=False) + else: + warnings.warn( + f"Migrating object {object_id} from another vineyard instance " + f"{meta.instance_id}" + ) + return self._ipc_client.get_object(object_id, fetch=True) + if self.has_rpc_client(): + if self._rpc_client.is_fetchable(meta): + return self._rpc_client.get_object(object_id) + else: + return self._locate_and_fetch(meta) + + def _locate_and_fetch(self, meta) -> Object: + """ + Fetches an object from another instance in the Vineyard cluster based on + the meta information. + + It's triggered when the RPC client is not able to fetch the object from the + current instance. + """ + cluster_info = self._rpc_client.meta + instance_status = cluster_info.get(meta.instance_id) + + if instance_status is None or instance_status['rpc_endpoint'] is None: + raise RuntimeError( + "The rpc endpoint of the vineyard instance " + f"{meta.instance_id} is not available." + ) + + host, port = instance_status['rpc_endpoint'].split(':') + remote_client = _connect(host, port) + + warnings.warn( + f"Fetching remote object {meta.id} from the remote vineyard instance " + f"{meta.instance_id} at {host}:{port}." 
+ ) + return remote_client.get_object(meta.id) + + @_apply_docstring(get) + def get( + self, + object_id: Optional[ObjectID] = None, + name: Optional[str] = None, + resolver: Optional[ResolverContext] = None, + fetch: bool = False, + **kwargs, + ): + return get(self, object_id, name, resolver, fetch, **kwargs) + + @_apply_docstring(put) + def put( + self, + value: Any, + builder: Optional[BuilderContext] = None, + persist: bool = False, + name: Optional[str] = None, + **kwargs, + ): + return put(self, value, builder, persist, name, **kwargs) + + +__all__ = ['Client'] diff --git a/python/vineyard/core/resolver.py b/python/vineyard/core/resolver.py index f5240f6bc4..5f2d1c6e9f 100644 --- a/python/vineyard/core/resolver.py +++ b/python/vineyard/core/resolver.py @@ -191,7 +191,7 @@ def get( name: Optional[str] = None, resolver: Optional[ResolverContext] = None, fetch: bool = False, - **kw + **kwargs ): """Get vineyard object as python value. @@ -229,17 +229,11 @@ def get( elif name is not None: object_id = client.get_name(name) - # run resolver - if client.is_rpc: - obj = client.get_object(object_id) - elif client.is_ipc: - obj = client.get_object(object_id, fetch=fetch) - else: - raise RuntimeError('Unknown vineyard client type: %s' % type(client)) + obj = client.get_object(object_id) if resolver is None: resolver = get_current_resolvers() - return resolver(obj, __vineyard_client=client, **kw) + return resolver(obj, __vineyard_client=client, **kwargs) setattr(IPCClient, 'get', get) diff --git a/python/vineyard/data/tests/test_pickle.py b/python/vineyard/data/tests/test_pickle.py index 82e51e2d75..0cda2ad8e6 100644 --- a/python/vineyard/data/tests/test_pickle.py +++ b/python/vineyard/data/tests/test_pickle.py @@ -266,9 +266,9 @@ def test_data_consistency_between_ipc_and_rpc( object_id = vineyard_client.put(value) v1 = vineyard_client.get(object_id) v2 = vineyard_rpc_client.get(object_id) - assert np.array_equal(v1, v2) + np.testing.assert_equal(v1, v2) object_id = 
vineyard_rpc_client.put(value) v1 = vineyard_client.get(object_id) v2 = vineyard_rpc_client.get(object_id) - assert np.array_equal(v1, v2) + np.testing.assert_equal(v1, v2) diff --git a/python/vineyard/data/tests/test_tensor.py b/python/vineyard/data/tests/test_tensor.py index 4093a9f9b4..f6949b1c01 100644 --- a/python/vineyard/data/tests/test_tensor.py +++ b/python/vineyard/data/tests/test_tensor.py @@ -151,9 +151,9 @@ def test_data_consistency_between_ipc_and_rpc(vineyard_client, vineyard_rpc_clie object_id = vineyard_client.put(value) v1 = vineyard_client.get(object_id) v2 = vineyard_rpc_client.get(object_id) - assert np.array_equal(v1.todense(), v2.todense()) + np.testing.assert_equal(v1.todense(), v2.todense()) object_id = vineyard_rpc_client.put(value) v1 = vineyard_client.get(object_id) v2 = vineyard_rpc_client.get(object_id) - assert np.array_equal(v1.todense(), v2.todense()) + np.testing.assert_equal(v1.todense(), v2.todense()) diff --git a/python/vineyard/deploy/tests/test_distributed.py b/python/vineyard/deploy/tests/test_distributed.py index c5f5881ab3..aa581cd1d0 100644 --- a/python/vineyard/deploy/tests/test_distributed.py +++ b/python/vineyard/deploy/tests/test_distributed.py @@ -26,6 +26,8 @@ from concurrent.futures import ThreadPoolExecutor import numpy as np +import pandas as pd +import pyarrow as pa import pytest @@ -595,3 +597,50 @@ def start_requests(rs, state, job_per_proc, vineyard_ipc_sockets): r, message = rs.get(block=True) if not r: pytest.fail(message) + + +@pytest.mark.parametrize( + "value", + [ + 1, + 'abcde', + True, + (1, "2", pytest.approx(3.456), 4444, "5.5.5.5.5.5.5"), + {1: 2, 3: 4, 5: None, None: 6}, + np.asfortranarray(np.random.rand(10, 7)), + np.zeros((0, 1, 2, 3), dtype='int'), + pa.array([1, 2, None, 3]), + pd.DataFrame({'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8]}), + pd.Series([1, 3, 5, np.nan, 6, 8], name='foo'), + ], +) +def test_get_and_put_with_different_vineyard_instances( + value, vineyard_rpc_client, vineyard_ipc_sockets 
+): + ipc_clients = generate_vineyard_ipc_clients(vineyard_ipc_sockets, 4) + objects = [] + + if isinstance(value, pd.arrays.SparseArray): + value = pd.DataFrame(value) + + for client in ipc_clients: + o = client.put(value, persist=True) + objects.append(o) + o = vineyard_rpc_client.put(value, persist=True) + objects.append(o) + + values = [] + for o in objects: + for client in ipc_clients: + values.append(client.get(vineyard.ObjectID(o))) + values.append(vineyard_rpc_client.get(vineyard.ObjectID(o))) + + for v in values: + if isinstance(value, np.ndarray): + np.testing.assert_equal(value, v) + elif isinstance(value, pd.DataFrame): + pd.testing.assert_frame_equal(value, v) + elif isinstance(value, pd.Series): + pd.testing.assert_series_equal(value, v) + else: + assert value == v