From f0f43b9c5f2cb75879f0d4050b115754e2fa0709 Mon Sep 17 00:00:00 2001 From: Ye Cao Date: Tue, 3 Sep 2024 11:10:08 +0800 Subject: [PATCH] Fix the e2e test in vineyard operator. Signed-off-by: Ye Cao --- k8s/config/scheduler/config.yaml | 6 ++++ k8s/test/e2e/Dockerfile | 2 +- k8s/test/e2e/assembly-demo/assembly-job1.py | 2 +- k8s/test/e2e/assembly-demo/assembly-local.py | 2 +- .../e2e/assembly-demo/distributed-job1.py | 2 +- .../e2e/assembly-demo/distributed-job2.py | 2 +- .../deploy-raw-backup-and-recover/e2e.yaml | 4 +-- k8s/test/e2e/serialize/e2e.yaml | 1 + python/vineyard/io/tests/test_stream.py | 4 +-- test/runner.py | 33 ++++++++++++++++++- 10 files changed, 48 insertions(+), 10 deletions(-) diff --git a/k8s/config/scheduler/config.yaml b/k8s/config/scheduler/config.yaml index babba5c0f1..9b547b1db1 100644 --- a/k8s/config/scheduler/config.yaml +++ b/k8s/config/scheduler/config.yaml @@ -9,6 +9,12 @@ leaderElection: profiles: - schedulerName: vineyard-scheduler plugins: + filter: + disabled: + - name: "*" + preScore: + disabled: + - name: "*" score: enabled: - name: Vineyard diff --git a/k8s/test/e2e/Dockerfile b/k8s/test/e2e/Dockerfile index fb8e83ded5..35731dd948 100644 --- a/k8s/test/e2e/Dockerfile +++ b/k8s/test/e2e/Dockerfile @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -FROM ghcr.io/v6d-io/v6d/vineyard-python-dev:latest +FROM ghcr.io/v6d-io/v6d/vineyard-python-dev:latest_x86_64 WORKDIR / diff --git a/k8s/test/e2e/assembly-demo/assembly-job1.py b/k8s/test/e2e/assembly-demo/assembly-job1.py index 0d508757ea..6cd6385ba0 100644 --- a/k8s/test/e2e/assembly-demo/assembly-job1.py +++ b/k8s/test/e2e/assembly-demo/assembly-job1.py @@ -31,7 +31,7 @@ def generate_df(index): stream = RecordBatchStream.new(vineyard_client) vineyard_client.persist(stream.id) print(stream.id) -writer = stream.writer +writer = stream.open_writer(vineyard_client) total_chunks = 10 for idx in range(total_chunks): time.sleep(idx) diff --git a/k8s/test/e2e/assembly-demo/assembly-local.py b/k8s/test/e2e/assembly-demo/assembly-local.py index 1800a88561..2fafbf15f6 100644 --- a/k8s/test/e2e/assembly-demo/assembly-local.py +++ b/k8s/test/e2e/assembly-demo/assembly-local.py @@ -24,7 +24,7 @@ stream_id = env_dist['STREAM_ID'] stream = vineyard_client.get(stream_id) -reader = stream.reader +reader = stream.open_reader(vineyard_client) index = 0 global_meta = vineyard.ObjectMeta() diff --git a/k8s/test/e2e/assembly-demo/distributed-job1.py b/k8s/test/e2e/assembly-demo/distributed-job1.py index c4a3eb3445..c27d6597bc 100644 --- a/k8s/test/e2e/assembly-demo/distributed-job1.py +++ b/k8s/test/e2e/assembly-demo/distributed-job1.py @@ -39,7 +39,7 @@ def generate_df(index): sys.stdout = sys.__stdout__ print(meta.id, flush=True) -writer = stream.writer +writer = stream.open_writer(vineyard_client) total_chunks = 10 for idx in range(total_chunks): time.sleep(idx) diff --git a/k8s/test/e2e/assembly-demo/distributed-job2.py b/k8s/test/e2e/assembly-demo/distributed-job2.py index 16945dd9a2..e25fbe071b 100644 --- a/k8s/test/e2e/assembly-demo/distributed-job2.py +++ b/k8s/test/e2e/assembly-demo/distributed-job2.py @@ -46,7 +46,7 @@ def generate_df(index): tup = vineyard_client.create_metadata(meta) vineyard_client.persist(tup) -writer = stream.writer +writer = stream.open_writer(vineyard_client) total_chunks = 10 for idx in range(total_chunks): time.sleep(idx) diff --git a/k8s/test/e2e/deploy-raw-backup-and-recover/e2e.yaml b/k8s/test/e2e/deploy-raw-backup-and-recover/e2e.yaml index 3a40bfbb6f..4928c99656 100644 --- a/k8s/test/e2e/deploy-raw-backup-and-recover/e2e.yaml +++ b/k8s/test/e2e/deploy-raw-backup-and-recover/e2e.yaml @@ -185,7 +185,7 @@ verify: - query: | kubectl get pod -l app=get-local-object -n vineyard-job -oname | \ awk -F '/' '{print $2}' | \ - head -n 1 | \ + tail -n 1 | \ xargs kubectl logs -n vineyard-job | \ yq e '{"sum": .}' - | \ yq e 'to_entries' - @@ -193,7 +193,7 @@ verify: - query: | kubectl get pod -l app=get-distributed-object -n vineyard-job -oname | \ awk -F '/' '{print $2}' | \ - head -n 1 | \ + tail -n 1 | \ xargs kubectl logs -n vineyard-job | \ yq e '{"sum": .}' - | \ yq e 'to_entries' - diff --git a/k8s/test/e2e/serialize/e2e.yaml b/k8s/test/e2e/serialize/e2e.yaml index 52a15fe353..866f6db863 100644 --- a/k8s/test/e2e/serialize/e2e.yaml +++ b/k8s/test/e2e/serialize/e2e.yaml @@ -50,6 +50,7 @@ verify: awk -F '/' '{print $2}' | \ head -n 1 | \ xargs kubectl logs -n vineyard-system | \ + grep "test passed" | \ yq e '{"result": .}' - | \ yq e 'to_entries' - expected: ../verify/serialize.yaml diff --git a/python/vineyard/io/tests/test_stream.py b/python/vineyard/io/tests/test_stream.py index 308796caf3..fcb5f1673e 100644 --- a/python/vineyard/io/tests/test_stream.py +++ b/python/vineyard/io/tests/test_stream.py @@ -47,7 +47,7 @@ def test_recordbatch_stream(vineyard_client): total_chunks = 10 def producer(stream: RecordBatchStream, dtypes, produced: List): - writer = stream.writer + writer = stream.open_writer(vineyard_client) for idx in range(total_chunks): time.sleep(idx) chunk = generate_random_dataframe(dtypes, 2) # np.random.randint(10, 100)) @@ -57,7 +57,7 @@ def producer(stream: RecordBatchStream, dtypes, produced: List): writer.finish() def consumer(stream: RecordBatchStream, produced: List): - reader = stream.reader + reader = stream.open_reader(vineyard_client) index = 0 while True: try: diff --git a/test/runner.py b/test/runner.py index a02d4223b5..470d7ea689 100755 --- a/test/runner.py +++ b/test/runner.py @@ -1010,7 +1010,6 @@ def run_python_deploy_tests(meta, allocator, endpoints, test_args, with_migratio flush=True, ) - def run_io_adaptor_tests(meta, allocator, endpoints, test_args): meta_prefix = 'vineyard_test_%s' % time.time() metadata_settings = make_metadata_settings(meta, endpoints, meta_prefix) @@ -1043,6 +1042,37 @@ def run_io_adaptor_tests(meta, allocator, endpoints, test_args): flush=True, ) +def run_stream_test(meta, allocator, endpoints, test_args): + meta_prefix = 'vineyard_test_%s' % time.time() + metadata_settings = make_metadata_settings(meta, endpoints, meta_prefix) + + with start_vineyardd( + metadata_settings, + ['--allocator', allocator], + default_ipc_socket=VINEYARD_CI_IPC_SOCKET, + ) as (_, rpc_socket_port): + start_time = time.time() + subprocess.check_call( + [ + 'pytest', + '-s', + '-vvv', + '--exitfirst', + '--durations=0', + '--log-cli-level', + 'DEBUG', + 'python/vineyard/io/tests', + *test_args, + '--vineyard-ipc-socket=%s' % VINEYARD_CI_IPC_SOCKET, + '--vineyard-endpoint=localhost:%s' % rpc_socket_port, + ], + cwd=os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'), + ) + print( + 'running python stream %s tests use %s seconds' + % (time.time() - start_time), + flush=True, + ) def run_fuse_test(meta, allocator, endpoints, test_args): meta_prefix = 'vineyard_test_%s' % time.time() @@ -1257,6 +1287,7 @@ def execute_tests(args): if args.with_io: run_io_adaptor_tests(args.meta, args.allocator, endpoints, python_test_args) + run_stream_test(args.meta, args.allocator, endpoints, python_test_args) if args.with_fuse: run_fuse_test(args.meta, args.allocator, endpoints, python_test_args)