From 300624201b3a87e6876a9756b83c896ae08ddd04 Mon Sep 17 00:00:00 2001 From: Lu Weizheng Date: Mon, 12 Aug 2024 23:48:01 +0800 Subject: [PATCH 1/9] fix start_server error --- python/xoscar/backends/communication/socket.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/python/xoscar/backends/communication/socket.py b/python/xoscar/backends/communication/socket.py index 246956e..4390657 100644 --- a/python/xoscar/backends/communication/socket.py +++ b/python/xoscar/backends/communication/socket.py @@ -201,10 +201,6 @@ def client_type(self) -> Type["Client"]: def channel_type(self) -> int: return ChannelType.remote - @classmethod - def parse_config(cls, config: dict) -> dict: - return config - @staticmethod @implements(Server.create) async def create(config: Dict) -> "Server": From 379e78d6bda0df7b943af3d03bc49217381c5b91 Mon Sep 17 00:00:00 2001 From: Lu Weizheng Date: Mon, 12 Aug 2024 23:49:13 +0800 Subject: [PATCH 2/9] fix start_server error --- .github/workflows/python.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/python.yaml b/.github/workflows/python.yaml index bfefb87..12e7dab 100644 --- a/.github/workflows/python.yaml +++ b/.github/workflows/python.yaml @@ -65,7 +65,6 @@ jobs: find . -name "CMakeLists.txt" -not -path "*third_party/*" | xargs cmake-format -c .cmake-format.yaml --check build_test_job: - if: github.repository == 'xorbitsai/xoscar' runs-on: ${{ matrix.os }} needs: lint env: From 784f50bb0d02d16fb725bcc7a0289b9f80da96b4 Mon Sep 17 00:00:00 2001 From: Lu Weizheng Date: Tue, 13 Aug 2024 00:10:40 +0800 Subject: [PATCH 3/9] fix start_server error --- .github/workflows/python.yaml | 5 +++-- python/xoscar/backends/communication/socket.py | 4 ++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/.github/workflows/python.yaml b/.github/workflows/python.yaml index 12e7dab..b64a231 100644 --- a/.github/workflows/python.yaml +++ b/.github/workflows/python.yaml @@ -4,8 +4,8 @@ on: push: branches: - '*' - pull_request: - types: ['opened', 'reopened', 'synchronize'] + # pull_request: + # types: ['opened', 'reopened', 'synchronize'] concurrency: group: ${{ github.workflow }}-${{ github.ref }} @@ -65,6 +65,7 @@ jobs: find . -name "CMakeLists.txt" -not -path "*third_party/*" | xargs cmake-format -c .cmake-format.yaml --check build_test_job: + # if: github.repository == 'xorbitsai/xoscar' runs-on: ${{ matrix.os }} needs: lint env: diff --git a/python/xoscar/backends/communication/socket.py b/python/xoscar/backends/communication/socket.py index 4390657..246956e 100644 --- a/python/xoscar/backends/communication/socket.py +++ b/python/xoscar/backends/communication/socket.py @@ -201,6 +201,10 @@ def client_type(self) -> Type["Client"]: def channel_type(self) -> int: return ChannelType.remote + @classmethod + def parse_config(cls, config: dict) -> dict: + return config + @staticmethod @implements(Server.create) async def create(config: Dict) -> "Server": From c97ea310d3ec2e6646cca68aa99a42086fbbe98e Mon Sep 17 00:00:00 2001 From: Lu Weizheng Date: Tue, 13 Aug 2024 06:49:15 +0800 Subject: [PATCH 4/9] sockets don't need ucx config --- python/xoscar/backends/communication/socket.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/xoscar/backends/communication/socket.py b/python/xoscar/backends/communication/socket.py index 246956e..d80aeef 100644 --- a/python/xoscar/backends/communication/socket.py +++ b/python/xoscar/backends/communication/socket.py @@ -209,6 +209,8 @@ def parse_config(cls, config: dict) -> dict: @implements(Server.create) async def create(config: Dict) -> "Server": config = config.copy() + if "ucx" in config: + config.pop("ucx") if "address" in config: address = config.pop("address") host, port = address.rsplit(":", 1) From 3dd0802bdfa73ccee7a700bae73b7ad3170fb38d Mon Sep 17 00:00:00 2001 From: Lu Weizheng Date: Tue, 13 Aug 2024 09:15:14 +0800 Subject: [PATCH 5/9] sockets don't need ucx config --- .github/workflows/python.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/python.yaml b/.github/workflows/python.yaml index b64a231..bfefb87 100644 --- a/.github/workflows/python.yaml +++ b/.github/workflows/python.yaml @@ -4,8 +4,8 @@ on: push: branches: - '*' - # pull_request: - # types: ['opened', 'reopened', 'synchronize'] + pull_request: + types: ['opened', 'reopened', 'synchronize'] concurrency: group: ${{ github.workflow }}-${{ github.ref }} @@ -65,7 +65,7 @@ jobs: find . -name "CMakeLists.txt" -not -path "*third_party/*" | xargs cmake-format -c .cmake-format.yaml --check build_test_job: - # if: github.repository == 'xorbitsai/xoscar' + if: github.repository == 'xorbitsai/xoscar' runs-on: ${{ matrix.os }} needs: lint env: From c1a6c14d6b01c30d610849861bf9404990de08fc Mon Sep 17 00:00:00 2001 From: Lu Weizheng Date: Tue, 13 Aug 2024 12:37:22 +0800 Subject: [PATCH 6/9] filter config --- python/xoscar/backends/communication/socket.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/python/xoscar/backends/communication/socket.py b/python/xoscar/backends/communication/socket.py index d80aeef..37ff103 100644 --- a/python/xoscar/backends/communication/socket.py +++ b/python/xoscar/backends/communication/socket.py @@ -203,14 +203,16 @@ def channel_type(self) -> int: @classmethod def parse_config(cls, config: dict) -> dict: - return config + # we only need the following config + keys_of_interest = ['listen_elastic_ip', 'address', 'host', 'port'] + parsed_config = {key: config[key] for key in keys_of_interest if key in config} + + return parsed_config @staticmethod @implements(Server.create) async def create(config: Dict) -> "Server": config = config.copy() - if "ucx" in config: - config.pop("ucx") if "address" in config: address = config.pop("address") host, port = address.rsplit(":", 1) From b27fccff2108092e87d006990fa12f6015741ee1 Mon Sep 17 00:00:00 2001 From: Lu Weizheng Date: Tue, 13 Aug 2024 12:38:14 +0800 Subject: [PATCH 7/9] filter config --- python/xoscar/backends/communication/socket.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/xoscar/backends/communication/socket.py b/python/xoscar/backends/communication/socket.py index 37ff103..c20e04c 100644 --- a/python/xoscar/backends/communication/socket.py +++ b/python/xoscar/backends/communication/socket.py @@ -204,9 +204,9 @@ def channel_type(self) -> int: @classmethod def parse_config(cls, config: dict) -> dict: # we only need the following config - keys_of_interest = ['listen_elastic_ip', 'address', 'host', 'port'] + keys_of_interest = ["listen_elastic_ip", "address", "host", "port"] parsed_config = {key: config[key] for key in keys_of_interest if key in config} - + return parsed_config @staticmethod From f5d3e7ce5381729ac30901370af8ef5a34fadf96 Mon Sep 17 00:00:00 2001 From: Lu Weizheng Date: Tue, 13 Aug 2024 17:52:35 +0800 Subject: [PATCH 8/9] filter config --- python/xoscar/backends/communication/socket.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/xoscar/backends/communication/socket.py b/python/xoscar/backends/communication/socket.py index c20e04c..816859f 100644 --- a/python/xoscar/backends/communication/socket.py +++ b/python/xoscar/backends/communication/socket.py @@ -203,9 +203,11 @@ def channel_type(self) -> int: @classmethod def parse_config(cls, config: dict) -> dict: + if config is None or not config: + return dict() # we only need the following config - keys_of_interest = ["listen_elastic_ip", "address", "host", "port"] - parsed_config = {key: config[key] for key in keys_of_interest if key in config} + keys = ["listen_elastic_ip"] + parsed_config = {key: config[key] for key in keys if key in config} return parsed_config From a93e1e51656a212fad198f9ec026749db4f019da Mon Sep 17 00:00:00 2001 From: Lu Weizheng Date: Tue, 13 Aug 2024 17:55:13 +0800 Subject: [PATCH 9/9] filter config --- .../backends/indigen/tests/test_pool.py | 98 +++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/python/xoscar/backends/indigen/tests/test_pool.py b/python/xoscar/backends/indigen/tests/test_pool.py index 04d5c51..6b60481 100644 --- a/python/xoscar/backends/indigen/tests/test_pool.py +++ b/python/xoscar/backends/indigen/tests/test_pool.py @@ -542,6 +542,104 @@ async def test_create_actor_pool(): assert len(global_router._mapping) == 0 +@pytest.mark.asyncio +async def test_create_actor_pool_extra_config(): + start_method = ( + os.environ.get("POOL_START_METHOD", "forkserver") + if sys.platform != "win32" + else None + ) + # create a actor pool based on socket rather than ucx + # pass `extra_conf` to check if we can filter out ucx config + pool = await create_actor_pool( + "127.0.0.1", + pool_cls=MainActorPool, + n_process=2, + subprocess_start_method=start_method, + extra_conf={ + "ucx": { + "tcp": None, + "nvlink": None, + "infiniband": None, + "rdmacm": None, + "cuda-copy": None, + "create-cuda-contex": None, + } + }, + ) + + async with pool: + # test global router + global_router = Router.get_instance() + # global router should not be the identical one with pool's router + assert global_router is not pool.router + assert pool.external_address in global_router._curr_external_addresses + assert pool.external_address in global_router._mapping + + ctx = get_context() + + # actor on main pool + actor_ref = await ctx.create_actor( + TestActor, uid="test-1", address=pool.external_address + ) + assert await actor_ref.add(3) == 3 + assert await actor_ref.add(1) == 4 + assert (await ctx.has_actor(actor_ref)) is True + assert (await ctx.actor_ref(actor_ref)) == actor_ref + # test cancel + task = asyncio.create_task(actor_ref.sleep(20)) + await asyncio.sleep(0) + task.cancel() + assert await task == 5 + await ctx.destroy_actor(actor_ref) + assert (await ctx.has_actor(actor_ref)) is False + for f in actor_ref.add, ctx.actor_ref, ctx.destroy_actor: + with pytest.raises(ActorNotExist): + await f(actor_ref) + + # actor on sub pool + actor_ref1 = await ctx.create_actor( + TestActor, uid="test-main", address=pool.external_address + ) + actor_ref2 = await ctx.create_actor( + TestActor, + uid="test-2", + address=pool.external_address, + allocate_strategy=RandomSubPool(), + ) + assert ( + await ctx.actor_ref(uid="test-2", address=actor_ref2.address) + ) == actor_ref2 + main_ref = await ctx.actor_ref(uid="test-main", address=actor_ref2.address) + assert main_ref.address == pool.external_address + main_ref = await ctx.actor_ref(actor_ref1) + assert main_ref.address == pool.external_address + assert actor_ref2.address != actor_ref.address + assert await actor_ref2.add(3) == 3 + assert await actor_ref2.add(1) == 4 + with pytest.raises(RuntimeError): + await actor_ref2.return_cannot_unpickle() + with pytest.raises(SendMessageFailed): + await actor_ref2.raise_cannot_pickle() + assert (await ctx.has_actor(actor_ref2)) is True + assert (await ctx.actor_ref(actor_ref2)) == actor_ref2 + # test cancel + task = asyncio.create_task(actor_ref2.sleep(20)) + start = time.time() + await asyncio.sleep(0) + task.cancel() + assert await task == 5 + assert time.time() - start < 3 + await ctx.destroy_actor(actor_ref2) + assert (await ctx.has_actor(actor_ref2)) is False + + assert pool.stopped + # after pool shutdown, global router must has been cleaned + global_router = Router.get_instance() + assert len(global_router._curr_external_addresses) == 0 + assert len(global_router._mapping) == 0 + + @pytest.mark.asyncio @require_unix async def test_create_actor_pool_elastic_ip():