
How to create_listener/create_endpoint through PCIe or shared memory when two kube pod in one machine? #1084

Open
MoFHeka opened this issue Oct 17, 2024 · 5 comments

@MoFHeka commented Oct 17, 2024

There are only examples for transferring data through a network card.

@pentschev (Member) commented

There's no such thing as a listener through shared memory, and I'm not sure what PICE means; perhaps you mean PCIe? A listener is necessarily bound to some sort of networking interface with an IP address and a port, just like a socket. However, that is only the means of establishing a connection between two processes: UCX will nevertheless use shared memory if it identifies that as providing better performance than the other transports available between those two processes. You can confirm that by setting UCX_TLS to limit the transports UCX may use. For example, running the internal UCX-Py benchmark with TCP only, I see the following result:

$ UCX_TLS=tcp python -m ucp.benchmarks.send_recv --no-detailed-report --n-iter 10000 --n-bytes 100000 --backend ucp-async -c 0 -b 1 --no-error-handling
Server Running at 10.33.225.163:57742
Client connecting to server at 10.33.225.163:57742
Roundtrip benchmark
================================================================================
Iterations                | 10000
Bytes                     | 97.66 kiB
Object type               | numpy
Reuse allocation          | False
Transfer API              | TAG
UCX_TLS                   | tcp
UCX_NET_DEVICES           | all
================================================================================
Device(s)                 | CPU-only
Server CPU                | 1
Client CPU                | 0
================================================================================
Bandwidth (average)       | 470.28 MiB/s
Bandwidth (median)        | 482.63 MiB/s
Latency (average)         | 202786 ns
Latency (median)          | 197598 ns

If I now enable shared memory as well with UCX_TLS=tcp,sm (note that TCP is still necessary to open a listener and establish communication between endpoints), I see the following result:

$ UCX_TLS=tcp,sm python -m ucp.benchmarks.send_recv --no-detailed-report --n-iter 10000 --n-bytes 100000 --backend ucp-async -c 0 -b 1 --no-error-handling
Server Running at 10.33.225.163:58855
Client connecting to server at 10.33.225.163:58855
Roundtrip benchmark
================================================================================
Iterations                | 10000
Bytes                     | 97.66 kiB
Object type               | numpy
Reuse allocation          | False
Transfer API              | TAG
UCX_TLS                   | tcp,sm
UCX_NET_DEVICES           | all
================================================================================
Device(s)                 | CPU-only
Server CPU                | 1
Client CPU                | 0
================================================================================
Bandwidth (average)       | 1.03 GiB/s
Bandwidth (median)        | 1.06 GiB/s
Latency (average)         | 90168 ns
Latency (median)          | 87888 ns

This is now more than twice as fast as the TCP-only case seen before, because UCX automatically switches to shared memory.
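
The same restriction can be applied from inside a Python application rather than the shell. UCX reads UCX_TLS from the environment when its context is created, so a minimal sketch is to set the variable before importing/initializing:

import os

# Must be set before UCX creates its context (safest: before importing ucp)
os.environ["UCX_TLS"] = "tcp,sm"

import ucp  # noqa: E402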

If you nevertheless need to communicate without a network interface, it's still possible to do so by creating an endpoint directly to a worker (without a listener); this test is a good example of how that can be done. Essentially, you get the worker's address, transfer it through some communication channel (like a queue in a multiprocess application), and then create an endpoint to the remote worker. After that, everything works pretty much the same way, except you'll also need to specify a tag yourself and force it, as in the sketch below.
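
Here is a minimal sketch of that listener-less flow, assuming the UCX-Py API (ucp.get_worker_address, ucp.get_ucx_address_from_buffer, create_endpoint_from_worker_address, force_tag); the queue wiring and the tag value are placeholders of mine:

import asyncio
import multiprocessing

import numpy as np

import ucp

TAG = 42  # arbitrary; both sides must agree on it and force it


def sender(q_out, q_in):
    async def run():
        # Publish this worker's address; no listener is ever created
        q_out.put(bytes(ucp.get_worker_address()))
        peer = ucp.get_ucx_address_from_buffer(q_in.get())

        ep = await ucp.create_endpoint_from_worker_address(peer)
        # No listener means no tag exchange, so force our own tag
        await ep.send(np.arange(10, dtype=np.int64), tag=TAG, force_tag=True)
        await ep.close()

    ucp.init()
    asyncio.run(run())


def receiver(q_out, q_in):
    async def run():
        q_out.put(bytes(ucp.get_worker_address()))
        peer = ucp.get_ucx_address_from_buffer(q_in.get())

        ep = await ucp.create_endpoint_from_worker_address(peer)
        msg = np.empty(10, dtype=np.int64)
        await ep.recv(msg, tag=TAG, force_tag=True)
        assert (msg == np.arange(10)).all()
        await ep.close()

    ucp.init()
    asyncio.run(run())


if __name__ == "__main__":
    ctx = multiprocessing.get_context("spawn")
    q1, q2 = ctx.Queue(), ctx.Queue()
    p1 = ctx.Process(target=sender, args=(q1, q2))
    p2 = ctx.Process(target=receiver, args=(q2, q1))
    p1.start()
    p2.start()
    p1.join()
    p2.join()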

Also, a note specifically on shared memory: endpoint error handling is not currently supported by UCX for that transport, so you must disable it by specifying endpoint_error_handling=False in create_listener/create_endpoint/create_endpoint_from_worker_address (and --no-error-handling in the benchmark above); otherwise UCX will just quietly disable shared memory and fall back to another transport that supports error handling (like TCP). A short sketch follows below.
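
For instance, the keyword is accepted by all three creation functions (a sketch with placeholder names cb, addr, port, and remote_addr; UCX-Py shown):

import ucp


async def open_without_error_handling(addr, port, remote_addr):
    async def cb(ep):  # placeholder listener callback
        pass

    # Disabling error handling everywhere lets UCX keep using shared memory
    # instead of silently falling back to a transport like TCP
    lf = ucp.create_listener(cb, port=port, endpoint_error_handling=False)
    ep1 = await ucp.create_endpoint(addr, port, endpoint_error_handling=False)
    ep2 = await ucp.create_endpoint_from_worker_address(
        remote_addr, endpoint_error_handling=False
    )
    return lf, ep1, ep2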

@MoFHeka MoFHeka changed the title How to create_listener/create_endpoint through PICE or shared memory when two kube pod in one machine? How to create_listener/create_endpoint through PCIe or shared memory when two kube pod in one machine? Oct 22, 2024
@MoFHeka (Author) commented Oct 25, 2024

@pentschev Thank you for your reply. Furthermore, is it possible to transfer a CPU tensor on machine A to GPU device memory on machine B?

@pentschev (Member) commented

Yes, that should work without problems: you can send a message using CPU memory (e.g., a NumPy array) and receive it into device memory on the remote process (e.g., a CuPy array). Below is an example based on test_send_recv_two_workers.py. Note that I've used UCXX, which will soon replace UCX-Py, so you should switch; but if you need UCX-Py for the time being, changing import ucxx to import ucp should work.

# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: BSD-3-Clause

import asyncio
import multiprocessing
import random

import cupy
import numpy as np

import ucxx
from ucxx._lib_async.utils import get_event_loop

SIZE = 10**8


def client(port):
    async def read():
        # Connect to the server's listener on this host's address
        addr = ucxx.get_address()
        ep = await ucxx.create_endpoint(addr, port)

        recv_msg = cupy.empty(SIZE, dtype=np.uint64)  # Receive as CuPy (CUDA) object

        await ep.recv(recv_msg)

        # Ask the server to shut down its listener
        close_msg = b"shutdown listener"
        close_msg_size = np.array([len(close_msg)], dtype=np.uint64)

        await ep.send(close_msg_size)
        await ep.send(close_msg)
        return recv_msg

    recv_msg = get_event_loop().run_until_complete(read())

    cupy.testing.assert_allclose(recv_msg, cupy.arange(SIZE))


def server(port):
    async def f(listener_port):
        async def write(ep):
            send_msg = np.arange(SIZE, dtype=np.uint64)  # Send as NumPy (host) object

            await ep.send(send_msg)

            close_msg = b"shutdown listener"

            msg_size = np.empty(1, dtype=np.uint64)
            await ep.recv(msg_size)

            msg = np.empty(msg_size[0], dtype=np.uint8)
            await ep.recv(msg)
            assert msg.tobytes() == close_msg
            await ep.close()
            lf.close()

        # Serve until the client's shutdown message closes the listener
        lf = ucxx.create_listener(write, port=listener_port)
        try:
            while not lf.closed:
                await asyncio.sleep(0.1)
        except Exception as e:
            print(f"Exception: {e=}")

    loop = get_event_loop()
    loop.run_until_complete(f(port))


if __name__ == "__main__":
    port = random.randint(13000, 15500)
    ctx = multiprocessing.get_context("spawn")
    server_process = ctx.Process(
        name="server", target=server, args=[port]
    )
    client_process = ctx.Process(
        name="client", target=client, args=[port]
    )

    server_process.start()
    client_process.start()

    client_process.join()
    server_process.join()
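
Note that both processes in this example run on the same machine, which is why the client connects to ucxx.get_address() (the local interface); across two machines the client would instead pass the server's reachable IP address to create_endpoint. The CPU-to-GPU part works the same either way: the receiving side simply posts a device buffer (the CuPy array) and UCX takes care of moving the host data into it.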

@MoFHeka (Author) commented Oct 28, 2024

Many thanks. By the way, does UCXX plan to support UCC, i.e., the collective operators?

@pentschev (Member) commented

At the moment there are no plans to support UCC.
