From 53dc33e9df43b49de63644f27a1429335f5b2d49 Mon Sep 17 00:00:00 2001 From: wuqida <819387826@qq.com> Date: Mon, 8 Jul 2024 21:45:49 +0800 Subject: [PATCH] add a test case to verify that data has been completely deleted --- .../elasticsearch/ElasticsearchIT.java | 45 +++++++++++++++++-- 1 file changed, 41 insertions(+), 4 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java index 3180f386b27..587953bb7e9 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java @@ -447,7 +447,7 @@ public void tearDown() { } @Test - public void testCatalog() { + public void testCatalog() throws IOException, InterruptedException { Map configMap = new HashMap<>(); configMap.put("username", "elastic"); configMap.put("password", "elasticsearch"); @@ -467,15 +467,52 @@ public void testCatalog() { elasticSearchCatalog.createTable(tablePath, null, false); final boolean existsAfter = elasticSearchCatalog.tableExists(tablePath); Assertions.assertTrue(existsAfter); - // data exists? - final boolean existsData = elasticSearchCatalog.isExistsData(tablePath); - Assertions.assertFalse(existsData); + + // Add multiple records + List data = generateTestData(); + StringBuilder requestBody = new StringBuilder(); + String indexHeader = "{\"index\":{\"_index\":\"st_index3\"}}\n"; + for (String record : data) { + requestBody.append(indexHeader); + requestBody.append(record); + requestBody.append("\n"); + } + esRestClient.bulk(requestBody.toString()); + Thread.sleep(2000); // Wait for data to be indexed + + // Verify data exists + List sourceFields = Arrays.asList("field1", "field2"); + Map query = new HashMap<>(); + query.put("match_all", new HashMap<>()); + ScrollResult scrollResult = + esRestClient.searchByScroll("st_index3", sourceFields, query, "1m", 100); + Assertions.assertFalse(scrollResult.getDocs().isEmpty(), "Data should exist in the index"); + // truncate elasticSearchCatalog.truncateTable(tablePath, false); Assertions.assertTrue(elasticSearchCatalog.tableExists(tablePath)); + + // Verify data is deleted + scrollResult = esRestClient.searchByScroll("st_index3", sourceFields, query, "1m", 100); + Assertions.assertTrue( + scrollResult.getDocs().isEmpty(), + "Data was not successfully deleted from the index"); + // drop elasticSearchCatalog.dropTable(tablePath, false); Assertions.assertFalse(elasticSearchCatalog.tableExists(tablePath)); elasticSearchCatalog.close(); } + + private List generateTestData() throws JsonProcessingException { + List data = new ArrayList<>(); + ObjectMapper objectMapper = new ObjectMapper(); + for (int i = 0; i < 10; i++) { + Map record = new HashMap<>(); + record.put("field1", "value" + i); + record.put("field2", i); + data.add(objectMapper.writeValueAsString(record)); + } + return data; + } }