Skip to content

Commit

Permalink
Add a wrapper to manage IPC and RPC clients and revise the statement …
Browse files Browse the repository at this point in the history
…about RPC client in documentation (#1629)

- Add the client wrapper to handle the IPCClient and RPCClient.
- Update the doc for rpc client.

Signed-off-by: Ye Cao <[email protected]>
Signed-off-by: Tao He <[email protected]>
Co-authored-by: Tao He <[email protected]>
  • Loading branch information
dashanji and sighingnow authored Dec 13, 2023
1 parent 89530bf commit 76c823a
Show file tree
Hide file tree
Showing 14 changed files with 625 additions and 68 deletions.
28 changes: 28 additions & 0 deletions docs/notes/key-concepts/data-accessing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,34 @@ Vineyard offers low-level APIs for creating and accessing local blobs with enhan
Remote Objects
--------------
Creating and accessing remote objects in vineyard can be easily achieved using :code:`put` and :code:`get` methods (see
:meth:`vineyard.RPCClient.put` and :meth:`vineyard.RPCClient.get`).
.. code:: python
:caption: Effortlessly create and access remote objects using :code:`put` and :code:`get`
>>> import pandas as pd
>>> import vineyard
>>> import numpy as np
>>>
>>> vineyard_rpc_client = vineyard.connect("localhost", 9600)
>>>
>>> df = pd.DataFrame(np.random.rand(10, 2))
>>>
>>> # put object into vineyard
>>> r = vineyard_rpc_client.put(df)
>>> r, type(r)
(o000a45730a85f8fe, vineyard._C.ObjectID)
>>>
>>> # get object from vineyard using object id
>>> data = vineyard_rpc_client.get(r)
>>> data
0 1
0 0.884227 0.576031
1 0.863040 0.069815
2 0.297906 0.911874
...
The RPC client enables inspection of remote object metadata and facilitates operations on blobs
within the remote cluster, while taking into account the associated network transfer costs.
Expand Down
67 changes: 25 additions & 42 deletions python/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

#include <sys/stat.h>
#include <memory>
#include <sstream>

Expand Down Expand Up @@ -580,7 +581,7 @@ void bind_client(py::module& mod) {
.def_property_readonly("is_rpc", &ClientBase::IsRPC,
doc::ClientBase_is_rpc);

// Client
// IPCClient
py::class_<Client, std::shared_ptr<Client>, ClientBase>(mod, "IPCClient",
doc::IPCClient)
.def(
Expand Down Expand Up @@ -644,20 +645,13 @@ void bind_client(py::module& mod) {
.def(
"get_meta",
[](Client* self, ObjectIDWrapper const& object_id,
bool const sync_remote, bool const fetch) -> ObjectMeta {
bool const sync_remote) -> ObjectMeta {
ObjectMeta meta;
// FIXME: do we really not need to sync from etcd? We assume the
// object is a local object
if (fetch) {
throw_on_error(
self->FetchAndGetMetaData(object_id, meta, sync_remote));
} else {
throw_on_error(self->GetMetaData(object_id, meta, sync_remote));
}
throw_on_error(self->GetMetaData(object_id, meta, sync_remote));
return meta;
},
"object_id"_a, py::arg("sync_remote") = false,
py::arg("fetch") = false, doc::IPCClient_get_meta)
doc::IPCClient_get_meta)
.def(
"get_metas",
[](Client* self, std::vector<ObjectIDWrapper> const& object_ids,
Expand Down Expand Up @@ -886,6 +880,12 @@ void bind_client(py::module& mod) {
throw_on_error(self->Fork(*rpc_client));
return rpc_client;
})
.def(
"is_fetchable",
[](RPCClient* self, ObjectMeta& metadata) -> bool {
return self->IsFetchable(metadata);
},
doc::RPCClient_is_fetchable)
.def_property_readonly("remote_instance_id",
&RPCClient::remote_instance_id,
doc::RPCClient_remote_instance_id)
Expand All @@ -896,33 +896,14 @@ void bind_client(py::module& mod) {

mod.def(
"_connect",
[](std::nullptr_t, const std::string& username,
[](std::string const& socket, const SessionID session_id,
const std::string& username,
const std::string& password) -> py::object {
if (!read_env("VINEYARD_IPC_SOCKET").empty()) {
return py::cast(ClientManager<Client>::GetManager()->Connect(
username, password));
}
if (!read_env("VINEYARD_RPC_ENDPOINT").empty()) {
return py::cast(ClientManager<RPCClient>::GetManager()->Connect(
username, password));
}
throw_on_error(Status::ConnectionFailed(
"Failed to resolve IPC socket or RPC endpoint of vineyard "
"server from environment variables VINEYARD_IPC_SOCKET or "
"VINEYARD_RPC_ENDPOINT."));
return py::none();
return py::cast(ClientManager<Client>::GetManager()->Connect(
socket, session_id, username, password));
},
py::arg("target") = py::none(), py::kw_only(),
py::arg("username") = "", py::arg("password") = "", doc::connect)
.def(
"_connect",
[](std::string const& socket, const std::string& username,
const std::string& password) -> py::object {
return py::cast(ClientManager<Client>::GetManager()->Connect(
socket, username, password));
},
"socket"_a, py::kw_only(), py::arg("username") = "",
py::arg("password") = "")
"socket"_a, py::kw_only(), py::arg("session") = RootSessionID(),
py::arg("username") = "", py::arg("password") = "")
.def(
"_connect",
[](std::string const& host, const uint32_t port,
Expand All @@ -932,8 +913,9 @@ void bind_client(py::module& mod) {
return py::cast(ClientManager<RPCClient>::GetManager()->Connect(
rpc_endpoint, session_id, username, password));
},
"host"_a, "port"_a, py::arg("session") = RootSessionID(),
py::kw_only(), py::arg("username") = "", py::arg("password") = "")
"host"_a, "port"_a, py::kw_only(),
py::arg("session") = RootSessionID(), py::arg("username") = "",
py::arg("password") = "")
.def(
"_connect",
[](std::string const& host, std::string const& port,
Expand All @@ -943,8 +925,9 @@ void bind_client(py::module& mod) {
return ClientManager<RPCClient>::GetManager()->Connect(
rpc_endpoint, session_id, username, password);
},
"host"_a, "port"_a, py::arg("session") = RootSessionID(),
py::kw_only(), py::arg("username") = "", py::arg("password") = "")
"host"_a, "port"_a, py::kw_only(),
py::arg("session") = RootSessionID(), py::arg("username") = "",
py::arg("password") = "")
.def(
"_connect",
[](std::pair<std::string, uint32_t> const& endpoint,
Expand All @@ -955,7 +938,7 @@ void bind_client(py::module& mod) {
return ClientManager<RPCClient>::GetManager()->Connect(
rpc_endpoint, session_id, username, password);
},
"endpoint"_a, py::arg("session") = RootSessionID(), py::kw_only(),
"endpoint"_a, py::kw_only(), py::arg("session") = RootSessionID(),
py::arg("username") = "", py::arg("password") = "")
.def(
"_connect",
Expand All @@ -966,7 +949,7 @@ void bind_client(py::module& mod) {
return ClientManager<RPCClient>::GetManager()->Connect(
rpc_endpoint, session_id, username, password);
},
"endpoint"_a, py::arg("session") = RootSessionID(), py::kw_only(),
"endpoint"_a, py::kw_only(), py::arg("session") = RootSessionID(),
py::arg("username") = "", py::arg("password") = "");
} // NOLINT(readability/fn_size)

Expand Down
5 changes: 3 additions & 2 deletions python/core.cc
Original file line number Diff line number Diff line change
Expand Up @@ -445,9 +445,10 @@ void bind_core(py::module& mod) {
// NB: don't expose the "Build" method to python.
.def(
"seal",
[](ObjectBuilder* self, Client* client) {
[](ObjectBuilder* self, py::object client) {
std::shared_ptr<Object> object;
throw_on_error(self->Seal(*client, object));
Client* ipc_client = py::cast<Client*>(client.attr("ipc_client"));
throw_on_error(self->Seal(*ipc_client, object));
return object;
},
"client"_a)
Expand Down
9 changes: 6 additions & 3 deletions python/pybind11_docs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1053,9 +1053,6 @@ Close the client.

const char* RPCClient = R"doc(
RPC client that connects to vineyard instance's RPC endpoints.
The RPC client can only access the metadata of objects, any access to the blob payload
will trigger a :code:`RuntimeError` exception.
)doc";

const char* RPCClient_get_object = R"doc(
Expand Down Expand Up @@ -1241,6 +1238,12 @@ const char* RPCClient_close = R"doc(
Close the client.
)doc";

const char* RPCClient_is_fetchable = R"doc(
Whether the rpc client is able to fetch objects from the connected vineyard server.
When the instance connected by the rpc client is not the same as the instance
of metadata, the rpc client is not able to fetch the object.
)doc";

const char* RPCClient_remote_instance_id = R"doc(
The instance id of the connected remote vineyard server.
)doc";
Expand Down
1 change: 1 addition & 0 deletions python/pybind11_docs.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ extern const char* RPCClient_get_metas;
extern const char* RPCClient_list_objects;
extern const char* RPCClient_list_metadatas;
extern const char* RPCClient_close;
extern const char* RPCClient_is_fetchable;
extern const char* RPCClient_remote_instance_id;

extern const char* connect;
Expand Down
10 changes: 4 additions & 6 deletions python/vineyard/_C.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ from typing import Tuple
from typing import Union
from typing import overload

from vineyard.core.client import Client

class ObjectID:
@overload
def __init__(self) -> None: ...
Expand Down Expand Up @@ -119,7 +121,7 @@ class Object:
def isglobal(self) -> bool: ...

class ObjectBuilder:
def seal(self, client: "IPCClient") -> "Object": ...
def seal(self, client: Client) -> "Object": ...
@property
def issealed(self) -> bool: ...

Expand Down Expand Up @@ -235,7 +237,7 @@ class ClientBase:
def next_chunk_id(self, stream_id: ObjectID) -> ObjectID: ...
def next_chunk_meta(self, stream_id: ObjectID) -> ObjectMeta: ...
def next_chunk(self, stream_id: ObjectID) -> Object: ...
def stop_stream(self, stream_id: ObjectID) -> None: ...
def stop_stream(self, stream_id: ObjectID, failed: bool) -> None: ...
def drop_stream(self, stream_id: ObjectID) -> None: ...
def persist(self, object: Union[ObjectID, Object, ObjectMeta]) -> None: ...
def exists(self, object: ObjectID) -> bool: ...
Expand Down Expand Up @@ -371,10 +373,6 @@ class RPCClient(ClientBase):
def __enter__(self) -> "RPCClient": ...
def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: ...

@overload
def connect(
*, username: str = "", password: str = ""
) -> Union[IPCClient, RPCClient]: ...
@overload
def connect(
target: None = None, *, username: str = "", password: str = ""
Expand Down
3 changes: 2 additions & 1 deletion python/vineyard/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ def _init_global_context():
from ._C import VineyardServerNotReadyException
from ._C import _connect
from ._C import memory_copy
from .core import Client
from .core import builder_context
from .core import default_builder_context
from .core import default_driver_context
Expand Down Expand Up @@ -398,4 +399,4 @@ def connect(*args, **kwargs):
'try to launch a standalone one.'
)
try_init()
return _connect(*args, **kwargs)
return Client(*args, **kwargs)
1 change: 1 addition & 0 deletions python/vineyard/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

from .builder import builder_context
from .builder import default_builder_context
from .client import Client
from .driver import default_driver_context
from .driver import driver_context
from .resolver import default_resolver_context
Expand Down
2 changes: 1 addition & 1 deletion python/vineyard/core/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def put(
00002ec13bc81226
Parameters:
client: IPCClient
client: IPCClient or RPCClient
The vineyard client to use.
value:
The python value that will be put to vineyard. Supported python value
Expand Down
Loading

0 comments on commit 76c823a

Please sign in to comment.