From 5b7325b44393246e9852b49f41e08485ae4678cd Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Mon, 4 Dec 2023 09:07:42 -0500 Subject: [PATCH 1/5] Fix test failure #102868 (#102889) closes https://github.com/elastic/elasticsearch/issues/102868 --- .../ExceptionSerializationTests.java | 34 ++++++------------- 1 file changed, 10 insertions(+), 24 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index 2263bfe78f218..f7362c7001c36 100644 --- a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -39,7 +39,6 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CancellableThreadsTests; @@ -129,9 +128,8 @@ public class ExceptionSerializationTests extends ESTestCase { - public void testExceptionRegistration() throws ClassNotFoundException, IOException, URISyntaxException { + public void testExceptionRegistration() throws IOException, URISyntaxException { final Set> notRegistered = new HashSet<>(); - final Set> hasDedicatedWrite = new HashSet<>(); final Set> registered = new HashSet<>(); final String path = "/org/elasticsearch"; final Path startPath = PathUtils.get(ElasticsearchException.class.getProtectionDomain().getCodeSource().getLocation().toURI()) @@ -146,13 +144,13 @@ public void testExceptionRegistration() throws ClassNotFoundException, IOExcepti private Path pkgPrefix = PathUtils.get(path).getParent(); @Override - public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) { pkgPrefix = pkgPrefix.resolve(dir.getFileName()); return FileVisitResult.CONTINUE; } @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { checkFile(file.getFileName().toString()); return FileVisitResult.CONTINUE; } @@ -180,13 +178,6 @@ private void checkClass(Class clazz) { notRegistered.add(clazz); } else if (ElasticsearchException.isRegistered(clazz.asSubclass(Throwable.class), TransportVersion.current())) { registered.add(clazz); - try { - if (clazz.getMethod("writeTo", StreamOutput.class) != null) { - hasDedicatedWrite.add(clazz); - } - } catch (Exception e) { - // fair enough - } } } @@ -199,7 +190,7 @@ private Class loadClass(String filename) throws ClassNotFoundException { for (Path p : pkgPrefix) { pkg.append(p.getFileName().toString()).append("."); } - pkg.append(filename.substring(0, filename.length() - 6)); + pkg.append(filename, 0, filename.length() - 6); return getClass().getClassLoader().loadClass(pkg.toString()); } @@ -209,7 +200,7 @@ public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOExce } @Override - public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { + public FileVisitResult postVisitDirectory(Path dir, IOException exc) { pkgPrefix = pkgPrefix.getParent(); return FileVisitResult.CONTINUE; } @@ -220,7 +211,7 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOEx Files.walkFileTree(testStartPath, visitor); assertTrue(notRegistered.remove(TestException.class)); assertTrue(notRegistered.remove(UnknownHeaderException.class)); - assertTrue("Classes subclassing ElasticsearchException must be registered \n" + notRegistered.toString(), notRegistered.isEmpty()); + assertTrue("Classes subclassing ElasticsearchException must be registered \n" + notRegistered, notRegistered.isEmpty()); assertTrue(registered.removeAll(ElasticsearchException.getRegisteredKeys())); // check assertEquals(registered.toString(), 0, registered.size()); } @@ -344,7 +335,7 @@ public void testInvalidIndexTemplateException() throws IOException { assertEquals(ex.name(), "foo"); ex = serialize(new InvalidIndexTemplateException(null, "bar")); assertEquals(ex.getMessage(), "index_template [null] invalid, cause [bar]"); - assertEquals(ex.name(), null); + assertNull(ex.name()); } public void testActionTransportException() throws IOException { @@ -353,17 +344,12 @@ public void testActionTransportException() throws IOException { assertEquals("[name?][" + transportAddress + "][ACTION BABY!] message?", ex.getMessage()); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/102868") public void testSearchContextMissingException() throws IOException { ShardSearchContextId contextId = new ShardSearchContextId(UUIDs.randomBase64UUID(), randomLong()); - TransportVersion version = TransportVersionUtils.randomVersion(random()); + TransportVersion version = TransportVersionUtils.randomCompatibleVersion(random()); SearchContextMissingException ex = serialize(new SearchContextMissingException(contextId), version); assertThat(ex.contextId().getId(), equalTo(contextId.getId())); - if (version.onOrAfter(TransportVersions.V_7_7_0)) { - assertThat(ex.contextId().getSessionId(), equalTo(contextId.getSessionId())); - } else { - assertThat(ex.contextId().getSessionId(), equalTo("")); - } + assertThat(ex.contextId().getSessionId(), equalTo(contextId.getSessionId())); } public void testCircuitBreakingException() throws IOException { @@ -422,7 +408,7 @@ public void testConnectTransportException() throws IOException { } public void testSearchPhaseExecutionException() throws IOException { - ShardSearchFailure[] empty = new ShardSearchFailure[0]; + ShardSearchFailure[] empty = ShardSearchFailure.EMPTY_ARRAY; SearchPhaseExecutionException ex = serialize(new SearchPhaseExecutionException("boom", "baam", new NullPointerException(), empty)); assertEquals("boom", ex.getPhaseName()); assertEquals("baam", ex.getMessage()); From bba08fc97c2c7b783263b5cd6de2e75a8bf42871 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20FOUCRET?= Date: Mon, 4 Dec 2023 15:31:02 +0100 Subject: [PATCH 2/5] Renaming inference rescorer feature flag to learn to rank. (#102883) --- .../org/elasticsearch/test/cluster/FeatureFlag.java | 2 +- x-pack/plugin/ml/qa/basic-multi-node/build.gradle | 2 +- x-pack/plugin/ml/qa/ml-with-security/build.gradle | 4 ++-- .../org/elasticsearch/xpack/ml/MachineLearning.java | 9 ++++----- ...rerFeature.java => LearnToRankRescorerFeature.java} | 10 +++++----- .../org/elasticsearch/xpack/test/rest/XPackRestIT.java | 2 +- 6 files changed, 14 insertions(+), 15 deletions(-) rename x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/ltr/{InferenceRescorerFeature.java => LearnToRankRescorerFeature.java} (61%) diff --git a/test/test-clusters/src/main/java/org/elasticsearch/test/cluster/FeatureFlag.java b/test/test-clusters/src/main/java/org/elasticsearch/test/cluster/FeatureFlag.java index b83cc7bba06e5..ff7195f9f5f37 100644 --- a/test/test-clusters/src/main/java/org/elasticsearch/test/cluster/FeatureFlag.java +++ b/test/test-clusters/src/main/java/org/elasticsearch/test/cluster/FeatureFlag.java @@ -16,7 +16,7 @@ */ public enum FeatureFlag { TIME_SERIES_MODE("es.index_mode_feature_flag_registered=true", Version.fromString("8.0.0"), null), - INFERENCE_RESCORER("es.inference_rescorer_feature_flag_enabled=true", Version.fromString("8.10.0"), null), + LEARN_TO_RANK("es.learn_to_rank_feature_flag_enabled=true", Version.fromString("8.10.0"), null), FAILURE_STORE_ENABLED("es.failure_store_feature_flag_enabled=true", Version.fromString("8.12.0"), null); public final String systemProperty; diff --git a/x-pack/plugin/ml/qa/basic-multi-node/build.gradle b/x-pack/plugin/ml/qa/basic-multi-node/build.gradle index fca019a6fc689..bf6ab9ed7d77e 100644 --- a/x-pack/plugin/ml/qa/basic-multi-node/build.gradle +++ b/x-pack/plugin/ml/qa/basic-multi-node/build.gradle @@ -17,7 +17,7 @@ testClusters.configureEach { setting 'xpack.license.self_generated.type', 'trial' setting 'indices.lifecycle.history_index_enabled', 'false' setting 'slm.history_index_enabled', 'false' - requiresFeature 'es.inference_rescorer_feature_flag_enabled', Version.fromString("8.10.0") + requiresFeature 'es.learn_to_rank_feature_flag_enabled', Version.fromString("8.10.0") } if (BuildParams.inFipsJvm){ diff --git a/x-pack/plugin/ml/qa/ml-with-security/build.gradle b/x-pack/plugin/ml/qa/ml-with-security/build.gradle index b28e6bec462b9..b8b706353d624 100644 --- a/x-pack/plugin/ml/qa/ml-with-security/build.gradle +++ b/x-pack/plugin/ml/qa/ml-with-security/build.gradle @@ -181,7 +181,7 @@ tasks.named("yamlRestTest").configure { 'ml/inference_crud/Test put nlp model config with vocabulary set', 'ml/inference_crud/Test put model model aliases with nlp model', 'ml/inference_processor/Test create processor with missing mandatory fields', - 'ml/inference_rescore/Test rescore with missing model', + 'ml/learn_to_rank_rescorer/Test rescore with missing model', 'ml/inference_stats_crud/Test get stats given missing trained model', 'ml/inference_stats_crud/Test get stats given expression without matches and allow_no_match is false', 'ml/jobs_crud/Test cannot create job with model snapshot id set', @@ -258,5 +258,5 @@ testClusters.configureEach { user username: "no_ml", password: "x-pack-test-password", role: "minimal" setting 'xpack.license.self_generated.type', 'trial' setting 'xpack.security.enabled', 'true' - requiresFeature 'es.inference_rescorer_feature_flag_enabled', Version.fromString("8.10.0") + requiresFeature 'es.learn_to_rank_feature_flag_enabled', Version.fromString("8.10.0") } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index db23e7796f862..d0f7302105768 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -324,8 +324,8 @@ import org.elasticsearch.xpack.ml.inference.deployment.DeploymentManager; import org.elasticsearch.xpack.ml.inference.ingest.InferenceProcessor; import org.elasticsearch.xpack.ml.inference.loadingservice.ModelLoadingService; -import org.elasticsearch.xpack.ml.inference.ltr.InferenceRescorerFeature; import org.elasticsearch.xpack.ml.inference.ltr.LearnToRankRescorerBuilder; +import org.elasticsearch.xpack.ml.inference.ltr.LearnToRankRescorerFeature; import org.elasticsearch.xpack.ml.inference.ltr.LearnToRankService; import org.elasticsearch.xpack.ml.inference.modelsize.MlModelSizeNamedXContentProvider; import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider; @@ -886,8 +886,7 @@ private static void reportClashingNodeAttribute(String attrName) { @Override public List> getRescorers() { - if (enabled && InferenceRescorerFeature.isEnabled()) { - // Inference rescorer requires access to the model loading service + if (enabled && LearnToRankRescorerFeature.isEnabled()) { return List.of( new RescorerSpec<>( LearnToRankRescorerBuilder.NAME, @@ -1798,7 +1797,7 @@ public List getNamedXContent() { ); namedXContent.addAll(new CorrelationNamedContentProvider().getNamedXContentParsers()); // LTR Combine with Inference named content provider when feature flag is removed - if (InferenceRescorerFeature.isEnabled()) { + if (LearnToRankRescorerFeature.isEnabled()) { namedXContent.addAll(new MlLTRNamedXContentProvider().getNamedXContentParsers()); } return namedXContent; @@ -1886,7 +1885,7 @@ public List getNamedWriteables() { namedWriteables.addAll(new CorrelationNamedContentProvider().getNamedWriteables()); namedWriteables.addAll(new ChangePointNamedContentProvider().getNamedWriteables()); // LTR Combine with Inference named content provider when feature flag is removed - if (InferenceRescorerFeature.isEnabled()) { + if (LearnToRankRescorerFeature.isEnabled()) { namedWriteables.addAll(new MlLTRNamedXContentProvider().getNamedWriteables()); } return namedWriteables; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/ltr/InferenceRescorerFeature.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/ltr/LearnToRankRescorerFeature.java similarity index 61% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/ltr/InferenceRescorerFeature.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/ltr/LearnToRankRescorerFeature.java index 8a26714c7c06b..18b2c6fe5ff3f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/ltr/InferenceRescorerFeature.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/ltr/LearnToRankRescorerFeature.java @@ -10,19 +10,19 @@ import org.elasticsearch.common.util.FeatureFlag; /** - * Inference rescorer feature flag. When the feature is complete, this flag will be removed. + * Learn to rank feature flag. When the feature is complete, this flag will be removed. * * Upon removal, ensure transport serialization is all corrected for future BWC. * * See {@link LearnToRankRescorerBuilder} */ -public class InferenceRescorerFeature { +public class LearnToRankRescorerFeature { - private InferenceRescorerFeature() {} + private LearnToRankRescorerFeature() {} - private static final FeatureFlag INFERENCE_RESCORE_FEATURE_FLAG = new FeatureFlag("inference_rescorer"); + private static final FeatureFlag LEARN_TO_RANK = new FeatureFlag("learn_to_rank"); public static boolean isEnabled() { - return INFERENCE_RESCORE_FEATURE_FLAG.isEnabled(); + return LEARN_TO_RANK.isEnabled(); } } diff --git a/x-pack/plugin/src/yamlRestTest/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java b/x-pack/plugin/src/yamlRestTest/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java index a0e0fd621ba46..3fd8e952d626e 100644 --- a/x-pack/plugin/src/yamlRestTest/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java +++ b/x-pack/plugin/src/yamlRestTest/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java @@ -43,7 +43,7 @@ public class XPackRestIT extends AbstractXPackRestTest { .setting("xpack.searchable.snapshot.shared_cache.region_size", "256KB") .user("x_pack_rest_user", "x-pack-test-password") .feature(FeatureFlag.TIME_SERIES_MODE) - .feature(FeatureFlag.INFERENCE_RESCORER) + .feature(FeatureFlag.LEARN_TO_RANK) .configFile("testnode.pem", Resource.fromClasspath("org/elasticsearch/xpack/security/transport/ssl/certs/simple/testnode.pem")) .configFile("testnode.crt", Resource.fromClasspath("org/elasticsearch/xpack/security/transport/ssl/certs/simple/testnode.crt")) .configFile("service_tokens", Resource.fromClasspath("service_tokens")) From 3493ce4ebe75d1c44bd0eb01cf68ca568bae8674 Mon Sep 17 00:00:00 2001 From: Jedr Blaszyk Date: Mon, 4 Dec 2023 15:31:42 +0100 Subject: [PATCH 3/5] [Connector API] Implement update error action (#102841) --- .../api/connector.update_error.json | 39 ++++ .../entsearch/335_connector_update_error.yml | 60 ++++++ .../xpack/application/EnterpriseSearch.java | 5 + .../application/connector/Connector.java | 52 ++++- .../connector/ConnectorIndexService.java | 31 +++ .../RestUpdateConnectorErrorAction.java | 45 +++++ .../TransportUpdateConnectorErrorAction.java | 52 +++++ .../action/UpdateConnectorErrorAction.java | 186 ++++++++++++++++++ .../connector/ConnectorIndexServiceTests.java | 43 ++++ ...ErrorActionRequestBWCSerializingTests.java | 50 +++++ ...rrorActionResponseBWCSerializingTests.java | 42 ++++ .../xpack/security/operator/Constants.java | 1 + 12 files changed, 600 insertions(+), 6 deletions(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/connector.update_error.json create mode 100644 x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/335_connector_update_error.yml create mode 100644 x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/RestUpdateConnectorErrorAction.java create mode 100644 x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/TransportUpdateConnectorErrorAction.java create mode 100644 x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/UpdateConnectorErrorAction.java create mode 100644 x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/action/UpdateConnectorErrorActionRequestBWCSerializingTests.java create mode 100644 x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/action/UpdateConnectorErrorActionResponseBWCSerializingTests.java diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/connector.update_error.json b/rest-api-spec/src/main/resources/rest-api-spec/api/connector.update_error.json new file mode 100644 index 0000000000000..5d82a3729b501 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/connector.update_error.json @@ -0,0 +1,39 @@ +{ + "connector.update_error": { + "documentation": { + "url": "https://www.elastic.co/guide/en/enterprise-search/current/connectors.html", + "description": "Updates the error field in the connector document." + }, + "stability": "experimental", + "visibility": "feature_flag", + "feature_flag": "es.connector_api_feature_flag_enabled", + "headers": { + "accept": [ + "application/json" + ], + "content_type": [ + "application/json" + ] + }, + "url": { + "paths": [ + { + "path": "/_connector/{connector_id}/_error", + "methods": [ + "PUT" + ], + "parts": { + "connector_id": { + "type": "string", + "description": "The unique identifier of the connector to be updated." + } + } + } + ] + }, + "body": { + "description": "An object containing the connector's error.", + "required": true + } + } +} diff --git a/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/335_connector_update_error.yml b/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/335_connector_update_error.yml new file mode 100644 index 0000000000000..70021e3899525 --- /dev/null +++ b/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/335_connector_update_error.yml @@ -0,0 +1,60 @@ +setup: + - skip: + version: " - 8.11.99" + reason: Introduced in 8.12.0 + + - do: + connector.put: + connector_id: test-connector + body: + index_name: search-1-test + name: my-connector + language: pl + is_native: false + service_type: super-connector + +--- +"Update Connector Error": + - do: + connector.update_error: + connector_id: test-connector + body: + error: "some error" + + + - match: { result: updated } + + - do: + connector.get: + connector_id: test-connector + + - match: { error: "some error" } + +--- +"Update Connector Error - 404 when connector doesn't exist": + - do: + catch: "missing" + connector.update_error: + connector_id: test-non-existent-connector + body: + error: "some error" + +--- +"Update Connector Error - 400 status code when connector_id is empty": + - do: + catch: "bad_request" + connector.update_error: + connector_id: "" + body: + error: "some error" + +--- +"Update Connector Error - 400 status code when payload is not string": + - do: + catch: "bad_request" + connector.update_error: + connector_id: test-connector + body: + error: + field_1: test + field_2: something diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearch.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearch.java index 2a53a46760868..09b86988ffe81 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearch.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearch.java @@ -50,6 +50,7 @@ import org.elasticsearch.xpack.application.connector.action.RestGetConnectorAction; import org.elasticsearch.xpack.application.connector.action.RestListConnectorAction; import org.elasticsearch.xpack.application.connector.action.RestPutConnectorAction; +import org.elasticsearch.xpack.application.connector.action.RestUpdateConnectorErrorAction; import org.elasticsearch.xpack.application.connector.action.RestUpdateConnectorFilteringAction; import org.elasticsearch.xpack.application.connector.action.RestUpdateConnectorLastSeenAction; import org.elasticsearch.xpack.application.connector.action.RestUpdateConnectorLastSyncStatsAction; @@ -59,11 +60,13 @@ import org.elasticsearch.xpack.application.connector.action.TransportGetConnectorAction; import org.elasticsearch.xpack.application.connector.action.TransportListConnectorAction; import org.elasticsearch.xpack.application.connector.action.TransportPutConnectorAction; +import org.elasticsearch.xpack.application.connector.action.TransportUpdateConnectorErrorAction; import org.elasticsearch.xpack.application.connector.action.TransportUpdateConnectorFilteringAction; import org.elasticsearch.xpack.application.connector.action.TransportUpdateConnectorLastSeenAction; import org.elasticsearch.xpack.application.connector.action.TransportUpdateConnectorLastSyncStatsAction; import org.elasticsearch.xpack.application.connector.action.TransportUpdateConnectorPipelineAction; import org.elasticsearch.xpack.application.connector.action.TransportUpdateConnectorSchedulingAction; +import org.elasticsearch.xpack.application.connector.action.UpdateConnectorErrorAction; import org.elasticsearch.xpack.application.connector.action.UpdateConnectorFilteringAction; import org.elasticsearch.xpack.application.connector.action.UpdateConnectorLastSeenAction; import org.elasticsearch.xpack.application.connector.action.UpdateConnectorLastSyncStatsAction; @@ -201,6 +204,7 @@ protected XPackLicenseState getLicenseState() { new ActionHandler<>(GetConnectorAction.INSTANCE, TransportGetConnectorAction.class), new ActionHandler<>(ListConnectorAction.INSTANCE, TransportListConnectorAction.class), new ActionHandler<>(PutConnectorAction.INSTANCE, TransportPutConnectorAction.class), + new ActionHandler<>(UpdateConnectorErrorAction.INSTANCE, TransportUpdateConnectorErrorAction.class), new ActionHandler<>(UpdateConnectorFilteringAction.INSTANCE, TransportUpdateConnectorFilteringAction.class), new ActionHandler<>(UpdateConnectorLastSeenAction.INSTANCE, TransportUpdateConnectorLastSeenAction.class), new ActionHandler<>(UpdateConnectorLastSyncStatsAction.INSTANCE, TransportUpdateConnectorLastSyncStatsAction.class), @@ -267,6 +271,7 @@ public List getRestHandlers( new RestGetConnectorAction(), new RestListConnectorAction(), new RestPutConnectorAction(), + new RestUpdateConnectorErrorAction(), new RestUpdateConnectorFilteringAction(), new RestUpdateConnectorLastSeenAction(), new RestUpdateConnectorLastSyncStatsAction(), diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/Connector.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/Connector.java index 45b906d815aee..d68cc9f7227bc 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/Connector.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/Connector.java @@ -200,14 +200,14 @@ public Connector(StreamInput in) throws IOException { public static final ParseField CONFIGURATION_FIELD = new ParseField("configuration"); static final ParseField CUSTOM_SCHEDULING_FIELD = new ParseField("custom_scheduling"); static final ParseField DESCRIPTION_FIELD = new ParseField("description"); - static final ParseField ERROR_FIELD = new ParseField("error"); + public static final ParseField ERROR_FIELD = new ParseField("error"); static final ParseField FEATURES_FIELD = new ParseField("features"); public static final ParseField FILTERING_FIELD = new ParseField("filtering"); public static final ParseField INDEX_NAME_FIELD = new ParseField("index_name"); static final ParseField IS_NATIVE_FIELD = new ParseField("is_native"); public static final ParseField LANGUAGE_FIELD = new ParseField("language"); public static final ParseField LAST_SEEN_FIELD = new ParseField("last_seen"); - static final ParseField NAME_FIELD = new ParseField("name"); + public static final ParseField NAME_FIELD = new ParseField("name"); public static final ParseField PIPELINE_FIELD = new ParseField("pipeline"); public static final ParseField SCHEDULING_FIELD = new ParseField("scheduling"); public static final ParseField SERVICE_TYPE_FIELD = new ParseField("service_type"); @@ -457,8 +457,28 @@ public String getConnectorId() { return connectorId; } - public ConnectorScheduling getScheduling() { - return scheduling; + public String getApiKeyId() { + return apiKeyId; + } + + public Map getConfiguration() { + return configuration; + } + + public Map getCustomScheduling() { + return customScheduling; + } + + public String getDescription() { + return description; + } + + public String getError() { + return error; + } + + public ConnectorFeatures getFeatures() { + return features; } public List getFiltering() { @@ -469,20 +489,40 @@ public String getIndexName() { return indexName; } + public boolean isNative() { + return isNative; + } + public String getLanguage() { return language; } + public String getName() { + return name; + } + public ConnectorIngestPipeline getPipeline() { return pipeline; } + public ConnectorScheduling getScheduling() { + return scheduling; + } + public String getServiceType() { return serviceType; } - public Map getConfiguration() { - return configuration; + public ConnectorStatus getStatus() { + return status; + } + + public Object getSyncCursor() { + return syncCursor; + } + + public boolean isSyncNow() { + return syncNow; } public ConnectorSyncInfo getSyncInfo() { diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorIndexService.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorIndexService.java index d99ad28dc3970..744a4d2028990 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorIndexService.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorIndexService.java @@ -31,6 +31,7 @@ import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.application.connector.action.UpdateConnectorErrorAction; import org.elasticsearch.xpack.application.connector.action.UpdateConnectorFilteringAction; import org.elasticsearch.xpack.application.connector.action.UpdateConnectorLastSeenAction; import org.elasticsearch.xpack.application.connector.action.UpdateConnectorLastSyncStatsAction; @@ -323,6 +324,36 @@ public void updateConnectorScheduling(UpdateConnectorSchedulingAction.Request re } } + /** + * Updates the error property of a {@link Connector}. + * + * @param request The request for updating the connector's error. + * @param listener The listener for handling responses, including successful updates or errors. + */ + public void updateConnectorError(UpdateConnectorErrorAction.Request request, ActionListener listener) { + try { + String connectorId = request.getConnectorId(); + final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc( + new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX) + .id(connectorId) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)) + ); + clientWithOrigin.update( + updateRequest, + new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> { + if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { + l.onFailure(new ResourceNotFoundException(connectorId)); + return; + } + l.onResponse(updateResponse); + }) + ); + } catch (Exception e) { + listener.onFailure(e); + } + } + private static ConnectorIndexService.ConnectorResult mapSearchResponseToConnectorList(SearchResponse response) { final List connectorResults = Arrays.stream(response.getHits().getHits()) .map(ConnectorIndexService::hitToConnector) diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/RestUpdateConnectorErrorAction.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/RestUpdateConnectorErrorAction.java new file mode 100644 index 0000000000000..ea8bd1b4ee50f --- /dev/null +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/RestUpdateConnectorErrorAction.java @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.application.connector.action; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.application.EnterpriseSearch; + +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.PUT; + +public class RestUpdateConnectorErrorAction extends BaseRestHandler { + + @Override + public String getName() { + return "connector_update_error_action"; + } + + @Override + public List routes() { + return List.of(new Route(PUT, "/" + EnterpriseSearch.CONNECTOR_API_ENDPOINT + "/{connector_id}/_error")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) { + UpdateConnectorErrorAction.Request request = UpdateConnectorErrorAction.Request.fromXContentBytes( + restRequest.param("connector_id"), + restRequest.content(), + restRequest.getXContentType() + ); + return channel -> client.execute( + UpdateConnectorErrorAction.INSTANCE, + request, + new RestToXContentListener<>(channel, UpdateConnectorErrorAction.Response::status, r -> null) + ); + } +} diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/TransportUpdateConnectorErrorAction.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/TransportUpdateConnectorErrorAction.java new file mode 100644 index 0000000000000..629fd14861cf6 --- /dev/null +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/TransportUpdateConnectorErrorAction.java @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.application.connector.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.application.connector.ConnectorIndexService; + +public class TransportUpdateConnectorErrorAction extends HandledTransportAction< + UpdateConnectorErrorAction.Request, + UpdateConnectorErrorAction.Response> { + + protected final ConnectorIndexService connectorIndexService; + + @Inject + public TransportUpdateConnectorErrorAction( + TransportService transportService, + ClusterService clusterService, + ActionFilters actionFilters, + Client client + ) { + super( + UpdateConnectorErrorAction.NAME, + transportService, + actionFilters, + UpdateConnectorErrorAction.Request::new, + EsExecutors.DIRECT_EXECUTOR_SERVICE + ); + this.connectorIndexService = new ConnectorIndexService(client); + } + + @Override + protected void doExecute( + Task task, + UpdateConnectorErrorAction.Request request, + ActionListener listener + ) { + connectorIndexService.updateConnectorError(request, listener.map(r -> new UpdateConnectorErrorAction.Response(r.getResult()))); + } +} diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/UpdateConnectorErrorAction.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/UpdateConnectorErrorAction.java new file mode 100644 index 0000000000000..c9e48dac08cd5 --- /dev/null +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/UpdateConnectorErrorAction.java @@ -0,0 +1,186 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.application.connector.action; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.application.connector.Connector; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; +import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg; + +public class UpdateConnectorErrorAction extends ActionType { + + public static final UpdateConnectorErrorAction INSTANCE = new UpdateConnectorErrorAction(); + public static final String NAME = "cluster:admin/xpack/connector/update_error"; + + public UpdateConnectorErrorAction() { + super(NAME, UpdateConnectorErrorAction.Response::new); + } + + public static class Request extends ActionRequest implements ToXContentObject { + + private final String connectorId; + + @Nullable + private final String error; + + public Request(String connectorId, String error) { + this.connectorId = connectorId; + this.error = error; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.connectorId = in.readString(); + this.error = in.readOptionalString(); + } + + public String getConnectorId() { + return connectorId; + } + + public String getError() { + return error; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + + if (Strings.isNullOrEmpty(connectorId)) { + validationException = addValidationError("[connector_id] cannot be null or empty.", validationException); + } + + return validationException; + } + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "connector_update_error_request", + false, + ((args, connectorId) -> new UpdateConnectorErrorAction.Request(connectorId, (String) args[0])) + ); + + static { + PARSER.declareStringOrNull(optionalConstructorArg(), Connector.ERROR_FIELD); + } + + public static UpdateConnectorErrorAction.Request fromXContentBytes( + String connectorId, + BytesReference source, + XContentType xContentType + ) { + try (XContentParser parser = XContentHelper.createParser(XContentParserConfiguration.EMPTY, source, xContentType)) { + return UpdateConnectorErrorAction.Request.fromXContent(parser, connectorId); + } catch (IOException e) { + throw new ElasticsearchParseException("Failed to parse: " + source.utf8ToString(), e); + } + } + + public static UpdateConnectorErrorAction.Request fromXContent(XContentParser parser, String connectorId) throws IOException { + return PARSER.parse(parser, connectorId); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + builder.field(Connector.ERROR_FIELD.getPreferredName(), error); + } + builder.endObject(); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(connectorId); + out.writeOptionalString(error); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Objects.equals(connectorId, request.connectorId) && Objects.equals(error, request.error); + } + + @Override + public int hashCode() { + return Objects.hash(connectorId, error); + } + } + + public static class Response extends ActionResponse implements ToXContentObject { + + final DocWriteResponse.Result result; + + public Response(StreamInput in) throws IOException { + super(in); + result = DocWriteResponse.Result.readFrom(in); + } + + public Response(DocWriteResponse.Result result) { + this.result = result; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + this.result.writeTo(out); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("result", this.result.getLowercase()); + builder.endObject(); + return builder; + } + + public RestStatus status() { + return switch (result) { + case NOT_FOUND -> RestStatus.NOT_FOUND; + default -> RestStatus.OK; + }; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response that = (Response) o; + return Objects.equals(result, that.result); + } + + @Override + public int hashCode() { + return Objects.hash(result); + } + } +} diff --git a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/ConnectorIndexServiceTests.java b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/ConnectorIndexServiceTests.java index e155cdfefbfa1..0f2c6c3fa3e8e 100644 --- a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/ConnectorIndexServiceTests.java +++ b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/ConnectorIndexServiceTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.xpack.application.connector.action.UpdateConnectorErrorAction; import org.elasticsearch.xpack.application.connector.action.UpdateConnectorFilteringAction; import org.elasticsearch.xpack.application.connector.action.UpdateConnectorLastSeenAction; import org.elasticsearch.xpack.application.connector.action.UpdateConnectorLastSyncStatsAction; @@ -172,6 +173,23 @@ public void testUpdateConnectorScheduling() throws Exception { assertThat(updatedScheduling, equalTo(indexedConnector.getScheduling())); } + public void testUpdateConnectorError() throws Exception { + Connector connector = ConnectorTestUtils.getRandomConnector(); + DocWriteResponse resp = awaitPutConnector(connector); + assertThat(resp.status(), anyOf(equalTo(RestStatus.CREATED), equalTo(RestStatus.OK))); + + UpdateConnectorErrorAction.Request updateErrorRequest = new UpdateConnectorErrorAction.Request( + connector.getConnectorId(), + randomAlphaOfLengthBetween(5, 15) + ); + + DocWriteResponse updateResponse = awaitUpdateConnectorError(updateErrorRequest); + assertThat(updateResponse.status(), equalTo(RestStatus.OK)); + + Connector indexedConnector = awaitGetConnector(connector.getConnectorId()); + assertThat(updateErrorRequest.getError(), equalTo(indexedConnector.getError())); + } + private DeleteResponse awaitDeleteConnector(String connectorId) throws Exception { CountDownLatch latch = new CountDownLatch(1); final AtomicReference resp = new AtomicReference<>(null); @@ -399,4 +417,29 @@ public void onFailure(Exception e) { assertNotNull("Received null response from update scheduling request", resp.get()); return resp.get(); } + + private UpdateResponse awaitUpdateConnectorError(UpdateConnectorErrorAction.Request updatedError) throws Exception { + CountDownLatch latch = new CountDownLatch(1); + final AtomicReference resp = new AtomicReference<>(null); + final AtomicReference exc = new AtomicReference<>(null); + connectorIndexService.updateConnectorError(updatedError, new ActionListener<>() { + @Override + public void onResponse(UpdateResponse indexResponse) { + resp.set(indexResponse); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + exc.set(e); + latch.countDown(); + } + }); + assertTrue("Timeout waiting for update error request", latch.await(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)); + if (exc.get() != null) { + throw exc.get(); + } + assertNotNull("Received null response from update error request", resp.get()); + return resp.get(); + } } diff --git a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/action/UpdateConnectorErrorActionRequestBWCSerializingTests.java b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/action/UpdateConnectorErrorActionRequestBWCSerializingTests.java new file mode 100644 index 0000000000000..94092cee61b40 --- /dev/null +++ b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/action/UpdateConnectorErrorActionRequestBWCSerializingTests.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.application.connector.action; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xpack.core.ml.AbstractBWCSerializationTestCase; + +import java.io.IOException; + +public class UpdateConnectorErrorActionRequestBWCSerializingTests extends AbstractBWCSerializationTestCase< + UpdateConnectorErrorAction.Request> { + + private String connectorId; + + @Override + protected Writeable.Reader instanceReader() { + return UpdateConnectorErrorAction.Request::new; + } + + @Override + protected UpdateConnectorErrorAction.Request createTestInstance() { + this.connectorId = randomUUID(); + return new UpdateConnectorErrorAction.Request(connectorId, randomAlphaOfLengthBetween(5, 15)); + } + + @Override + protected UpdateConnectorErrorAction.Request mutateInstance(UpdateConnectorErrorAction.Request instance) throws IOException { + return randomValueOtherThan(instance, this::createTestInstance); + } + + @Override + protected UpdateConnectorErrorAction.Request doParseInstance(XContentParser parser) throws IOException { + return UpdateConnectorErrorAction.Request.fromXContent(parser, this.connectorId); + } + + @Override + protected UpdateConnectorErrorAction.Request mutateInstanceForVersion( + UpdateConnectorErrorAction.Request instance, + TransportVersion version + ) { + return instance; + } +} diff --git a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/action/UpdateConnectorErrorActionResponseBWCSerializingTests.java b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/action/UpdateConnectorErrorActionResponseBWCSerializingTests.java new file mode 100644 index 0000000000000..a39fcac3d2f04 --- /dev/null +++ b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/action/UpdateConnectorErrorActionResponseBWCSerializingTests.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.application.connector.action; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase; + +import java.io.IOException; + +public class UpdateConnectorErrorActionResponseBWCSerializingTests extends AbstractBWCWireSerializationTestCase< + UpdateConnectorErrorAction.Response> { + + @Override + protected Writeable.Reader instanceReader() { + return UpdateConnectorErrorAction.Response::new; + } + + @Override + protected UpdateConnectorErrorAction.Response createTestInstance() { + return new UpdateConnectorErrorAction.Response(randomFrom(DocWriteResponse.Result.values())); + } + + @Override + protected UpdateConnectorErrorAction.Response mutateInstance(UpdateConnectorErrorAction.Response instance) throws IOException { + return randomValueOtherThan(instance, this::createTestInstance); + } + + @Override + protected UpdateConnectorErrorAction.Response mutateInstanceForVersion( + UpdateConnectorErrorAction.Response instance, + TransportVersion version + ) { + return instance; + } +} diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 86640e2e1a784..ffc894af423cf 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -127,6 +127,7 @@ public class Constants { "cluster:admin/xpack/connector/get", "cluster:admin/xpack/connector/list", "cluster:admin/xpack/connector/put", + "cluster:admin/xpack/connector/update_error", "cluster:admin/xpack/connector/update_filtering", "cluster:admin/xpack/connector/update_last_seen", "cluster:admin/xpack/connector/update_last_sync_stats", From fd1e26a4bb3ba4a466fa614f200aa5a57b32b1d4 Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Mon, 4 Dec 2023 15:37:52 +0100 Subject: [PATCH 4/5] [Enterprise Search] Add GET connector sync job by id (#102908) Add GET connector sync job by id. --- .../api/connector_sync_job.get.json | 32 +++ .../entsearch/440_connector_sync_job_get.yml | 36 +++ .../xpack/application/EnterpriseSearch.java | 5 + .../connector/syncjob/ConnectorSyncJob.java | 271 ++++++++++++++++-- .../syncjob/ConnectorSyncJobIndexService.java | 36 ++- .../action/GetConnectorSyncJobAction.java | 153 ++++++++++ .../action/RestGetConnectorSyncJobAction.java | 42 +++ .../TransportGetConnectorSyncJobAction.java | 55 ++++ .../ConnectorSyncJobIndexServiceTests.java | 112 +++++--- .../syncjob/ConnectorSyncJobTestUtils.java | 9 + .../syncjob/ConnectorSyncJobTests.java | 207 +++++++++++++ ...ncJobActionRequestBWCSerializingTests.java | 47 +++ ...cJobActionResponseBWCSerializingTests.java | 50 ++++ .../GetConnectorSyncJobActionTests.java | 36 +++ ...ansportGetConnectorSyncJobActionTests.java | 75 +++++ .../xpack/security/operator/Constants.java | 1 + 16 files changed, 1103 insertions(+), 64 deletions(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/connector_sync_job.get.json create mode 100644 x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/440_connector_sync_job_get.yml create mode 100644 x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/action/GetConnectorSyncJobAction.java create mode 100644 x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/action/RestGetConnectorSyncJobAction.java create mode 100644 x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/action/TransportGetConnectorSyncJobAction.java create mode 100644 x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/GetConnectorSyncJobActionRequestBWCSerializingTests.java create mode 100644 x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/GetConnectorSyncJobActionResponseBWCSerializingTests.java create mode 100644 x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/GetConnectorSyncJobActionTests.java create mode 100644 x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/TransportGetConnectorSyncJobActionTests.java diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/connector_sync_job.get.json b/rest-api-spec/src/main/resources/rest-api-spec/api/connector_sync_job.get.json new file mode 100644 index 0000000000000..6eb461ad62128 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/connector_sync_job.get.json @@ -0,0 +1,32 @@ +{ + "connector_sync_job.get": { + "documentation": { + "url": "https://www.elastic.co/guide/en/enterprise-search/current/connectors.html", + "description": "Returns the details about a connector sync job." + }, + "stability": "experimental", + "visibility": "feature_flag", + "feature_flag": "es.connector_api_feature_flag_enabled", + "headers": { + "accept": [ + "application/json" + ] + }, + "url": { + "paths": [ + { + "path": "/_connector/_sync_job/{connector_sync_job_id}", + "methods": [ + "GET" + ], + "parts": { + "connector_sync_job_id": { + "type": "string", + "description": "The unique identifier of the connector sync job to be returned." + } + } + } + ] + } + } +} diff --git a/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/440_connector_sync_job_get.yml b/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/440_connector_sync_job_get.yml new file mode 100644 index 0000000000000..ade0736436e87 --- /dev/null +++ b/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/440_connector_sync_job_get.yml @@ -0,0 +1,36 @@ +setup: + - skip: + version: " - 8.11.99" + reason: Introduced in 8.12.0 + - do: + connector.put: + connector_id: test-connector + body: + index_name: search-test + name: my-connector + language: de + is_native: false + service_type: super-connector + +--- +'Get connector sync job': + - do: + connector_sync_job.post: + body: + id: test-connector + job_type: access_control + trigger_method: scheduled + - set: { id: id } + - match: { id: $id } + - do: + connector_sync_job.get: + connector_sync_job_id: $id + - match: { job_type: access_control } + - match: { trigger_method: scheduled } + +--- +'Get connector sync job - Missing sync job id': + - do: + connector_sync_job.get: + connector_sync_job_id: non-existing-sync-job-id + catch: missing diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearch.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearch.java index 09b86988ffe81..f93177666f3d8 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearch.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearch.java @@ -75,14 +75,17 @@ import org.elasticsearch.xpack.application.connector.syncjob.action.CancelConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.CheckInConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.DeleteConnectorSyncJobAction; +import org.elasticsearch.xpack.application.connector.syncjob.action.GetConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.PostConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.RestCancelConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.RestCheckInConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.RestDeleteConnectorSyncJobAction; +import org.elasticsearch.xpack.application.connector.syncjob.action.RestGetConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.RestPostConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.TransportCancelConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.TransportCheckInConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.TransportDeleteConnectorSyncJobAction; +import org.elasticsearch.xpack.application.connector.syncjob.action.TransportGetConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.TransportPostConnectorSyncJobAction; import org.elasticsearch.xpack.application.rules.QueryRulesConfig; import org.elasticsearch.xpack.application.rules.QueryRulesIndexService; @@ -212,6 +215,7 @@ protected XPackLicenseState getLicenseState() { new ActionHandler<>(UpdateConnectorSchedulingAction.INSTANCE, TransportUpdateConnectorSchedulingAction.class), // SyncJob API + new ActionHandler<>(GetConnectorSyncJobAction.INSTANCE, TransportGetConnectorSyncJobAction.class), new ActionHandler<>(PostConnectorSyncJobAction.INSTANCE, TransportPostConnectorSyncJobAction.class), new ActionHandler<>(DeleteConnectorSyncJobAction.INSTANCE, TransportDeleteConnectorSyncJobAction.class), new ActionHandler<>(CheckInConnectorSyncJobAction.INSTANCE, TransportCheckInConnectorSyncJobAction.class), @@ -279,6 +283,7 @@ public List getRestHandlers( new RestUpdateConnectorSchedulingAction(), // SyncJob API + new RestGetConnectorSyncJobAction(), new RestPostConnectorSyncJobAction(), new RestDeleteConnectorSyncJobAction(), new RestCancelConnectorSyncJobAction(), diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJob.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJob.java index 6c0e9635d986d..2a302ddb68199 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJob.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJob.java @@ -7,22 +7,36 @@ package org.elasticsearch.xpack.application.connector.syncjob; +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.Nullable; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; +import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.application.connector.Connector; +import org.elasticsearch.xpack.application.connector.ConnectorFiltering; +import org.elasticsearch.xpack.application.connector.ConnectorIngestPipeline; import org.elasticsearch.xpack.application.connector.ConnectorSyncStatus; import java.io.IOException; import java.time.Instant; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Objects; +import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg; + /** * Represents a sync job in the Elasticsearch ecosystem. Sync jobs refer to a unit of work, which syncs data from a 3rd party * data source into an Elasticsearch index using the Connectors service. A ConnectorSyncJob always refers @@ -60,7 +74,7 @@ public class ConnectorSyncJob implements Writeable, ToXContentObject { static final ParseField CREATED_AT_FIELD = new ParseField("created_at"); - static final ParseField DELETED_DOCUMENT_COUNT = new ParseField("deleted_document_count"); + static final ParseField DELETED_DOCUMENT_COUNT_FIELD = new ParseField("deleted_document_count"); static final ParseField ERROR_FIELD = new ParseField("error"); @@ -92,6 +106,7 @@ public class ConnectorSyncJob implements Writeable, ToXContentObject { static final ConnectorSyncJobTriggerMethod DEFAULT_TRIGGER_METHOD = ConnectorSyncJobTriggerMethod.ON_DEMAND; + @Nullable private final Instant cancelationRequestedAt; @Nullable @@ -127,7 +142,6 @@ public class ConnectorSyncJob implements Writeable, ToXContentObject { private final ConnectorSyncStatus status; - @Nullable private final long totalDocumentCount; private final ConnectorSyncJobTriggerMethod triggerMethod; @@ -217,44 +231,269 @@ public ConnectorSyncJob(StreamInput in) throws IOException { this.workerHostname = in.readOptionalString(); } + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "connector_sync_job", + true, + (args) -> { + int i = 0; + return new Builder().setCancellationRequestedAt((Instant) args[i++]) + .setCanceledAt((Instant) args[i++]) + .setCompletedAt((Instant) args[i++]) + .setConnector((Connector) args[i++]) + .setCreatedAt((Instant) args[i++]) + .setDeletedDocumentCount((Long) args[i++]) + .setError((String) args[i++]) + .setId((String) args[i++]) + .setIndexedDocumentCount((Long) args[i++]) + .setIndexedDocumentVolume((Long) args[i++]) + .setJobType((ConnectorSyncJobType) args[i++]) + .setLastSeen((Instant) args[i++]) + .setMetadata((Map) args[i++]) + .setStartedAt((Instant) args[i++]) + .setStatus((ConnectorSyncStatus) args[i++]) + .setTotalDocumentCount((Long) args[i++]) + .setTriggerMethod((ConnectorSyncJobTriggerMethod) args[i++]) + .setWorkerHostname((String) args[i]) + .build(); + } + ); + + static { + PARSER.declareField( + optionalConstructorArg(), + (p, c) -> Instant.parse(p.text()), + CANCELATION_REQUESTED_AT_FIELD, + ObjectParser.ValueType.STRING_OR_NULL + ); + PARSER.declareField(optionalConstructorArg(), (p, c) -> Instant.parse(p.text()), CANCELED_AT_FIELD, ObjectParser.ValueType.STRING); + PARSER.declareField(optionalConstructorArg(), (p, c) -> Instant.parse(p.text()), COMPLETED_AT_FIELD, ObjectParser.ValueType.STRING); + PARSER.declareField( + constructorArg(), + (p, c) -> ConnectorSyncJob.syncJobConnectorFromXContent(p), + CONNECTOR_FIELD, + ObjectParser.ValueType.OBJECT + ); + PARSER.declareField(constructorArg(), (p, c) -> Instant.parse(p.text()), CREATED_AT_FIELD, ObjectParser.ValueType.STRING); + PARSER.declareLong(constructorArg(), DELETED_DOCUMENT_COUNT_FIELD); + PARSER.declareStringOrNull(optionalConstructorArg(), ERROR_FIELD); + PARSER.declareString(constructorArg(), ID_FIELD); + PARSER.declareLong(constructorArg(), INDEXED_DOCUMENT_COUNT_FIELD); + PARSER.declareLong(constructorArg(), INDEXED_DOCUMENT_VOLUME_FIELD); + PARSER.declareField( + constructorArg(), + (p, c) -> ConnectorSyncJobType.fromString(p.text()), + JOB_TYPE_FIELD, + ObjectParser.ValueType.STRING + ); + PARSER.declareField(constructorArg(), (p, c) -> Instant.parse(p.text()), LAST_SEEN_FIELD, ObjectParser.ValueType.STRING); + PARSER.declareField(constructorArg(), (p, c) -> p.map(), METADATA_FIELD, ObjectParser.ValueType.OBJECT); + PARSER.declareField(optionalConstructorArg(), (p, c) -> Instant.parse(p.text()), STARTED_AT_FIELD, ObjectParser.ValueType.STRING); + PARSER.declareField( + constructorArg(), + (p, c) -> ConnectorSyncStatus.fromString(p.text()), + STATUS_FIELD, + ObjectParser.ValueType.STRING + ); + PARSER.declareLong(constructorArg(), TOTAL_DOCUMENT_COUNT_FIELD); + PARSER.declareField( + constructorArg(), + (p, c) -> ConnectorSyncJobTriggerMethod.fromString(p.text()), + TRIGGER_METHOD_FIELD, + ObjectParser.ValueType.STRING + ); + PARSER.declareString(optionalConstructorArg(), WORKER_HOSTNAME_FIELD); + } + + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser SYNC_JOB_CONNECTOR_PARSER = new ConstructingObjectParser<>( + "sync_job_connector", + true, + (args) -> { + int i = 0; + return new Connector.Builder().setConnectorId((String) args[i++]) + .setFiltering((List) args[i++]) + .setIndexName((String) args[i++]) + .setLanguage((String) args[i++]) + .setPipeline((ConnectorIngestPipeline) args[i++]) + .setServiceType((String) args[i++]) + .setConfiguration((Map) args[i++]) + .build(); + } + ); + + static { + SYNC_JOB_CONNECTOR_PARSER.declareString(constructorArg(), Connector.ID_FIELD); + SYNC_JOB_CONNECTOR_PARSER.declareObjectArray( + optionalConstructorArg(), + (p, c) -> ConnectorFiltering.fromXContent(p), + Connector.FILTERING_FIELD + ); + SYNC_JOB_CONNECTOR_PARSER.declareString(optionalConstructorArg(), Connector.INDEX_NAME_FIELD); + SYNC_JOB_CONNECTOR_PARSER.declareString(optionalConstructorArg(), Connector.LANGUAGE_FIELD); + SYNC_JOB_CONNECTOR_PARSER.declareField( + optionalConstructorArg(), + (p, c) -> ConnectorIngestPipeline.fromXContent(p), + Connector.PIPELINE_FIELD, + ObjectParser.ValueType.OBJECT + ); + SYNC_JOB_CONNECTOR_PARSER.declareString(optionalConstructorArg(), Connector.SERVICE_TYPE_FIELD); + SYNC_JOB_CONNECTOR_PARSER.declareField( + optionalConstructorArg(), + (parser, context) -> parser.map(), + Connector.CONFIGURATION_FIELD, + ObjectParser.ValueType.OBJECT + ); + } + + public static ConnectorSyncJob fromXContentBytes(BytesReference source, XContentType xContentType) { + try (XContentParser parser = XContentHelper.createParser(XContentParserConfiguration.EMPTY, source, xContentType)) { + return ConnectorSyncJob.fromXContent(parser); + } catch (IOException e) { + throw new ElasticsearchParseException("Failed to parse a connector sync job document.", e); + } + } + + public static ConnectorSyncJob fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + public static Connector syncJobConnectorFromXContent(XContentParser parser) throws IOException { + return SYNC_JOB_CONNECTOR_PARSER.parse(parser, null); + } + public String getId() { return id; } + public Instant getCancelationRequestedAt() { + return cancelationRequestedAt; + } + + public Instant getCanceledAt() { + return canceledAt; + } + + public Instant getCompletedAt() { + return completedAt; + } + + public Connector getConnector() { + return connector; + } + + public Instant getCreatedAt() { + return createdAt; + } + + public long getDeletedDocumentCount() { + return deletedDocumentCount; + } + + public String getError() { + return error; + } + + public long getIndexedDocumentCount() { + return indexedDocumentCount; + } + + public long getIndexedDocumentVolume() { + return indexedDocumentVolume; + } + + public ConnectorSyncJobType getJobType() { + return jobType; + } + + public Instant getLastSeen() { + return lastSeen; + } + + public Map getMetadata() { + return metadata; + } + + public Instant getStartedAt() { + return startedAt; + } + + public ConnectorSyncStatus getStatus() { + return status; + } + + public long getTotalDocumentCount() { + return totalDocumentCount; + } + + public ConnectorSyncJobTriggerMethod getTriggerMethod() { + return triggerMethod; + } + + public String getWorkerHostname() { + return workerHostname; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); { - builder.field(CANCELATION_REQUESTED_AT_FIELD.getPreferredName(), cancelationRequestedAt); - builder.field(CANCELED_AT_FIELD.getPreferredName(), canceledAt); - builder.field(COMPLETED_AT_FIELD.getPreferredName(), completedAt); + if (cancelationRequestedAt != null) { + builder.field(CANCELATION_REQUESTED_AT_FIELD.getPreferredName(), cancelationRequestedAt); + } + if (canceledAt != null) { + builder.field(CANCELED_AT_FIELD.getPreferredName(), canceledAt); + } + if (completedAt != null) { + builder.field(COMPLETED_AT_FIELD.getPreferredName(), completedAt); + } builder.startObject(CONNECTOR_FIELD.getPreferredName()); { builder.field(Connector.ID_FIELD.getPreferredName(), connector.getConnectorId()); - builder.field(Connector.FILTERING_FIELD.getPreferredName(), connector.getFiltering()); - builder.field(Connector.INDEX_NAME_FIELD.getPreferredName(), connector.getIndexName()); - builder.field(Connector.LANGUAGE_FIELD.getPreferredName(), connector.getLanguage()); - builder.field(Connector.PIPELINE_FIELD.getPreferredName(), connector.getPipeline()); - builder.field(Connector.SERVICE_TYPE_FIELD.getPreferredName(), connector.getServiceType()); - builder.field(Connector.CONFIGURATION_FIELD.getPreferredName(), connector.getConfiguration()); + if (connector.getFiltering() != null) { + builder.field(Connector.FILTERING_FIELD.getPreferredName(), connector.getFiltering()); + } + if (connector.getIndexName() != null) { + builder.field(Connector.INDEX_NAME_FIELD.getPreferredName(), connector.getIndexName()); + } + if (connector.getLanguage() != null) { + builder.field(Connector.LANGUAGE_FIELD.getPreferredName(), connector.getLanguage()); + } + if (connector.getPipeline() != null) { + builder.field(Connector.PIPELINE_FIELD.getPreferredName(), connector.getPipeline()); + } + if (connector.getServiceType() != null) { + builder.field(Connector.SERVICE_TYPE_FIELD.getPreferredName(), connector.getServiceType()); + } + if (connector.getConfiguration() != null) { + builder.field(Connector.CONFIGURATION_FIELD.getPreferredName(), connector.getConfiguration()); + } } builder.endObject(); builder.field(CREATED_AT_FIELD.getPreferredName(), createdAt); - builder.field(DELETED_DOCUMENT_COUNT.getPreferredName(), deletedDocumentCount); - builder.field(ERROR_FIELD.getPreferredName(), error); + builder.field(DELETED_DOCUMENT_COUNT_FIELD.getPreferredName(), deletedDocumentCount); + if (error != null) { + builder.field(ERROR_FIELD.getPreferredName(), error); + } builder.field(ID_FIELD.getPreferredName(), id); builder.field(INDEXED_DOCUMENT_COUNT_FIELD.getPreferredName(), indexedDocumentCount); builder.field(INDEXED_DOCUMENT_VOLUME_FIELD.getPreferredName(), indexedDocumentVolume); builder.field(JOB_TYPE_FIELD.getPreferredName(), jobType); - builder.field(LAST_SEEN_FIELD.getPreferredName(), lastSeen); + if (lastSeen != null) { + builder.field(LAST_SEEN_FIELD.getPreferredName(), lastSeen); + } builder.field(METADATA_FIELD.getPreferredName(), metadata); - builder.field(STARTED_AT_FIELD.getPreferredName(), startedAt); + if (startedAt != null) { + builder.field(STARTED_AT_FIELD.getPreferredName(), startedAt); + } builder.field(STATUS_FIELD.getPreferredName(), status); builder.field(TOTAL_DOCUMENT_COUNT_FIELD.getPreferredName(), totalDocumentCount); builder.field(TRIGGER_METHOD_FIELD.getPreferredName(), triggerMethod); - builder.field(WORKER_HOSTNAME_FIELD.getPreferredName(), workerHostname); + if (workerHostname != null) { + builder.field(WORKER_HOSTNAME_FIELD.getPreferredName(), workerHostname); + } } builder.endObject(); return builder; diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexService.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexService.java index ab593fe99fcee..5e1686dde80f2 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexService.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexService.java @@ -27,6 +27,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.application.connector.Connector; import org.elasticsearch.xpack.application.connector.ConnectorFiltering; import org.elasticsearch.xpack.application.connector.ConnectorIndexService; @@ -174,6 +175,40 @@ public void checkInConnectorSyncJob(String connectorSyncJobId, ActionListener listener) { + final GetRequest getRequest = new GetRequest(CONNECTOR_SYNC_JOB_INDEX_NAME).id(connectorSyncJobId).realtime(true); + + try { + clientWithOrigin.get( + getRequest, + new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(connectorSyncJobId, listener, (l, getResponse) -> { + if (getResponse.isExists() == false) { + l.onFailure(new ResourceNotFoundException(connectorSyncJobId)); + return; + } + + try { + final ConnectorSyncJob syncJob = ConnectorSyncJob.fromXContentBytes( + getResponse.getSourceAsBytesRef(), + XContentType.JSON + ); + l.onResponse(syncJob); + } catch (Exception e) { + listener.onFailure(e); + } + }) + ); + } catch (Exception e) { + listener.onFailure(e); + } + } + /** * Cancels the {@link ConnectorSyncJob} in the underlying index. * Canceling means to set the {@link ConnectorSyncStatus} to "canceling" and not "canceled" as this is an async operation. @@ -211,7 +246,6 @@ public void cancelConnectorSyncJob(String connectorSyncJobId, ActionListener { + + public static final GetConnectorSyncJobAction INSTANCE = new GetConnectorSyncJobAction(); + public static final String NAME = "cluster:admin/xpack/connector/sync_job/get"; + + private GetConnectorSyncJobAction() { + super(NAME, GetConnectorSyncJobAction.Response::new); + } + + public static class Request extends ActionRequest implements ToXContentObject { + private final String connectorSyncJobId; + + private static final ParseField CONNECTOR_ID_FIELD = new ParseField("connector_id"); + + public Request(StreamInput in) throws IOException { + super(in); + this.connectorSyncJobId = in.readString(); + } + + public Request(String connectorSyncJobId) { + this.connectorSyncJobId = connectorSyncJobId; + } + + public String getConnectorSyncJobId() { + return connectorSyncJobId; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + + if (Strings.isNullOrEmpty(connectorSyncJobId)) { + validationException = addValidationError( + ConnectorSyncJobConstants.EMPTY_CONNECTOR_SYNC_JOB_ID_ERROR_MESSAGE, + validationException + ); + } + + return validationException; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(connectorSyncJobId); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Objects.equals(connectorSyncJobId, request.connectorSyncJobId); + } + + @Override + public int hashCode() { + return Objects.hash(connectorSyncJobId); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(CONNECTOR_ID_FIELD.getPreferredName(), connectorSyncJobId); + builder.endObject(); + return builder; + } + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "get_connector_sync_job_request", + false, + (args) -> new Request((String) args[0]) + ); + + static { + PARSER.declareString(constructorArg(), CONNECTOR_ID_FIELD); + } + + public static Request parse(XContentParser parser) { + return PARSER.apply(parser, null); + } + } + + public static class Response extends ActionResponse implements ToXContentObject { + private final ConnectorSyncJob connectorSyncJob; + + public Response(ConnectorSyncJob connectorSyncJob) { + this.connectorSyncJob = connectorSyncJob; + } + + public Response(StreamInput in) throws IOException { + super(in); + this.connectorSyncJob = new ConnectorSyncJob(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + connectorSyncJob.writeTo(out); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return connectorSyncJob.toXContent(builder, params); + } + + public static GetConnectorSyncJobAction.Response fromXContent(XContentParser parser) throws IOException { + return new GetConnectorSyncJobAction.Response(ConnectorSyncJob.fromXContent(parser)); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response response = (Response) o; + return Objects.equals(connectorSyncJob, response.connectorSyncJob); + } + + @Override + public int hashCode() { + return Objects.hash(connectorSyncJob); + } + } +} diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/action/RestGetConnectorSyncJobAction.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/action/RestGetConnectorSyncJobAction.java new file mode 100644 index 0000000000000..1f5606810757e --- /dev/null +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/action/RestGetConnectorSyncJobAction.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.application.connector.syncjob.action; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.application.EnterpriseSearch; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobConstants.CONNECTOR_SYNC_JOB_ID_PARAM; + +public class RestGetConnectorSyncJobAction extends BaseRestHandler { + @Override + public String getName() { + return "connector_sync_job_get_action"; + } + + @Override + public List routes() { + return List.of( + new Route( + RestRequest.Method.GET, + "/" + EnterpriseSearch.CONNECTOR_SYNC_JOB_API_ENDPOINT + "/{" + CONNECTOR_SYNC_JOB_ID_PARAM + "}" + ) + ); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + GetConnectorSyncJobAction.Request request = new GetConnectorSyncJobAction.Request(restRequest.param(CONNECTOR_SYNC_JOB_ID_PARAM)); + return restChannel -> client.execute(GetConnectorSyncJobAction.INSTANCE, request, new RestToXContentListener<>(restChannel)); + } +} diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/action/TransportGetConnectorSyncJobAction.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/action/TransportGetConnectorSyncJobAction.java new file mode 100644 index 0000000000000..1024b9953fd09 --- /dev/null +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/action/TransportGetConnectorSyncJobAction.java @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.application.connector.syncjob.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobIndexService; + +public class TransportGetConnectorSyncJobAction extends HandledTransportAction< + GetConnectorSyncJobAction.Request, + GetConnectorSyncJobAction.Response> { + + protected final ConnectorSyncJobIndexService connectorSyncJobIndexService; + + @Inject + public TransportGetConnectorSyncJobAction( + TransportService transportService, + ClusterService clusterService, + ActionFilters actionFilters, + Client client + ) { + super( + GetConnectorSyncJobAction.NAME, + transportService, + actionFilters, + GetConnectorSyncJobAction.Request::new, + EsExecutors.DIRECT_EXECUTOR_SERVICE + ); + this.connectorSyncJobIndexService = new ConnectorSyncJobIndexService(client); + } + + @Override + protected void doExecute( + Task task, + GetConnectorSyncJobAction.Request request, + ActionListener listener + ) { + connectorSyncJobIndexService.getConnectorSyncJob( + request.getConnectorSyncJobId(), + listener.map(GetConnectorSyncJobAction.Response::new) + ); + } +} diff --git a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexServiceTests.java b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexServiceTests.java index cadc8b761cbe3..8613078e3074e 100644 --- a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexServiceTests.java +++ b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexServiceTests.java @@ -80,46 +80,21 @@ public void testCreateConnectorSyncJob() throws Exception { PostConnectorSyncJobAction.Request syncJobRequest = ConnectorSyncJobTestUtils.getRandomPostConnectorSyncJobActionRequest( connector.getConnectorId() ); - PostConnectorSyncJobAction.Response response = awaitPutConnectorSyncJob(syncJobRequest); - Map connectorSyncJobSource = getConnectorSyncJobSourceById(response.getId()); - - String id = (String) connectorSyncJobSource.get(ConnectorSyncJob.ID_FIELD.getPreferredName()); - ConnectorSyncJobType requestJobType = syncJobRequest.getJobType(); - ConnectorSyncJobType jobType = ConnectorSyncJobType.fromString( - (String) connectorSyncJobSource.get(ConnectorSyncJob.JOB_TYPE_FIELD.getPreferredName()) - ); - ConnectorSyncJobTriggerMethod requestTriggerMethod = syncJobRequest.getTriggerMethod(); - ConnectorSyncJobTriggerMethod triggerMethod = ConnectorSyncJobTriggerMethod.fromString( - (String) connectorSyncJobSource.get(ConnectorSyncJob.TRIGGER_METHOD_FIELD.getPreferredName()) - ); - - ConnectorSyncStatus initialStatus = ConnectorSyncStatus.fromString( - (String) connectorSyncJobSource.get(ConnectorSyncJob.STATUS_FIELD.getPreferredName()) - ); - - Instant createdNow = Instant.parse((String) connectorSyncJobSource.get(ConnectorSyncJob.CREATED_AT_FIELD.getPreferredName())); - Instant lastSeen = Instant.parse((String) connectorSyncJobSource.get(ConnectorSyncJob.LAST_SEEN_FIELD.getPreferredName())); + PostConnectorSyncJobAction.Response response = awaitPutConnectorSyncJob(syncJobRequest); - Integer totalDocumentCount = (Integer) connectorSyncJobSource.get(ConnectorSyncJob.TOTAL_DOCUMENT_COUNT_FIELD.getPreferredName()); - Integer indexedDocumentCount = (Integer) connectorSyncJobSource.get( - ConnectorSyncJob.INDEXED_DOCUMENT_COUNT_FIELD.getPreferredName() - ); - Integer indexedDocumentVolume = (Integer) connectorSyncJobSource.get( - ConnectorSyncJob.INDEXED_DOCUMENT_VOLUME_FIELD.getPreferredName() - ); - Integer deletedDocumentCount = (Integer) connectorSyncJobSource.get(ConnectorSyncJob.DELETED_DOCUMENT_COUNT.getPreferredName()); - - assertThat(id, notNullValue()); - assertThat(jobType, equalTo(requestJobType)); - assertThat(triggerMethod, equalTo(requestTriggerMethod)); - assertThat(initialStatus, equalTo(ConnectorSyncJob.DEFAULT_INITIAL_STATUS)); - assertThat(createdNow, equalTo(lastSeen)); - assertThat(totalDocumentCount, equalTo(0)); - assertThat(indexedDocumentCount, equalTo(0)); - assertThat(indexedDocumentVolume, equalTo(0)); - assertThat(deletedDocumentCount, equalTo(0)); + ConnectorSyncJob connectorSyncJob = awaitGetConnectorSyncJob(response.getId()); + + assertThat(connectorSyncJob.getId(), notNullValue()); + assertThat(connectorSyncJob.getJobType(), equalTo(requestJobType)); + assertThat(connectorSyncJob.getTriggerMethod(), equalTo(requestTriggerMethod)); + assertThat(connectorSyncJob.getStatus(), equalTo(ConnectorSyncJob.DEFAULT_INITIAL_STATUS)); + assertThat(connectorSyncJob.getCreatedAt(), equalTo(connectorSyncJob.getLastSeen())); + assertThat(connectorSyncJob.getTotalDocumentCount(), equalTo(0L)); + assertThat(connectorSyncJob.getIndexedDocumentCount(), equalTo(0L)); + assertThat(connectorSyncJob.getIndexedDocumentVolume(), equalTo(0L)); + assertThat(connectorSyncJob.getDeletedDocumentCount(), equalTo(0L)); } public void testCreateConnectorSyncJob_WithMissingJobType_ExpectDefaultJobTypeToBeSet() throws Exception { @@ -130,12 +105,9 @@ public void testCreateConnectorSyncJob_WithMissingJobType_ExpectDefaultJobTypeTo ); PostConnectorSyncJobAction.Response response = awaitPutConnectorSyncJob(syncJobRequest); - Map connectorSyncJobSource = getConnectorSyncJobSourceById(response.getId()); - ConnectorSyncJobType jobType = ConnectorSyncJobType.fromString( - (String) connectorSyncJobSource.get(ConnectorSyncJob.JOB_TYPE_FIELD.getPreferredName()) - ); + ConnectorSyncJob connectorSyncJob = awaitGetConnectorSyncJob(response.getId()); - assertThat(jobType, equalTo(ConnectorSyncJob.DEFAULT_JOB_TYPE)); + assertThat(connectorSyncJob.getJobType(), equalTo(ConnectorSyncJob.DEFAULT_JOB_TYPE)); } public void testCreateConnectorSyncJob_WithMissingTriggerMethod_ExpectDefaultTriggerMethodToBeSet() throws Exception { @@ -146,12 +118,9 @@ public void testCreateConnectorSyncJob_WithMissingTriggerMethod_ExpectDefaultTri ); PostConnectorSyncJobAction.Response response = awaitPutConnectorSyncJob(syncJobRequest); - Map connectorSyncJobSource = getConnectorSyncJobSourceById(response.getId()); - ConnectorSyncJobTriggerMethod triggerMethod = ConnectorSyncJobTriggerMethod.fromString( - (String) connectorSyncJobSource.get(ConnectorSyncJob.TRIGGER_METHOD_FIELD.getPreferredName()) - ); + ConnectorSyncJob connectorSyncJob = awaitGetConnectorSyncJob(response.getId()); - assertThat(triggerMethod, equalTo(ConnectorSyncJob.DEFAULT_TRIGGER_METHOD)); + assertThat(connectorSyncJob.getTriggerMethod(), equalTo(ConnectorSyncJob.DEFAULT_TRIGGER_METHOD)); } public void testCreateConnectorSyncJob_WithMissingConnectorId_ExpectException() throws Exception { @@ -184,6 +153,28 @@ public void testDeleteConnectorSyncJob_WithMissingSyncJobId_ExpectException() { expectThrows(ResourceNotFoundException.class, () -> awaitDeleteConnectorSyncJob(NON_EXISTING_SYNC_JOB_ID)); } + public void testGetConnectorSyncJob() throws Exception { + PostConnectorSyncJobAction.Request syncJobRequest = ConnectorSyncJobTestUtils.getRandomPostConnectorSyncJobActionRequest( + connector.getConnectorId() + ); + ConnectorSyncJobType jobType = syncJobRequest.getJobType(); + ConnectorSyncJobTriggerMethod triggerMethod = syncJobRequest.getTriggerMethod(); + + PostConnectorSyncJobAction.Response response = awaitPutConnectorSyncJob(syncJobRequest); + String syncJobId = response.getId(); + + ConnectorSyncJob syncJob = awaitGetConnectorSyncJob(syncJobId); + + assertThat(syncJob.getId(), equalTo(syncJobId)); + assertThat(syncJob.getJobType(), equalTo(jobType)); + assertThat(syncJob.getTriggerMethod(), equalTo(triggerMethod)); + assertThat(syncJob.getConnector().getConnectorId(), equalTo(connector.getConnectorId())); + } + + public void testGetConnectorSyncJob_WithMissingSyncJobId_ExpectException() { + expectThrows(ResourceNotFoundException.class, () -> awaitGetConnectorSyncJob(NON_EXISTING_SYNC_JOB_ID)); + } + public void testCheckInConnectorSyncJob() throws Exception { PostConnectorSyncJobAction.Request syncJobRequest = ConnectorSyncJobTestUtils.getRandomPostConnectorSyncJobActionRequest( connector.getConnectorId() @@ -346,6 +337,33 @@ private Map getConnectorSyncJobSourceById(String syncJobId) thro return getResponseActionFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).getSource(); } + private ConnectorSyncJob awaitGetConnectorSyncJob(String connectorSyncJobId) throws Exception { + CountDownLatch latch = new CountDownLatch(1); + final AtomicReference resp = new AtomicReference<>(null); + final AtomicReference exc = new AtomicReference<>(null); + + connectorSyncJobIndexService.getConnectorSyncJob(connectorSyncJobId, new ActionListener() { + @Override + public void onResponse(ConnectorSyncJob connectorSyncJob) { + resp.set(connectorSyncJob); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + exc.set(e); + latch.countDown(); + } + }); + + assertTrue("Timeout waiting for get request", latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)); + if (exc.get() != null) { + throw exc.get(); + } + assertNotNull("Received null response from get request", resp.get()); + return resp.get(); + } + private UpdateResponse awaitCheckInConnectorSyncJob(String connectorSyncJobId) throws Exception { CountDownLatch latch = new CountDownLatch(1); final AtomicReference resp = new AtomicReference<>(null); diff --git a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobTestUtils.java b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobTestUtils.java index 4fa1b9122284d..9ec404e109496 100644 --- a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobTestUtils.java +++ b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobTestUtils.java @@ -12,6 +12,7 @@ import org.elasticsearch.xpack.application.connector.syncjob.action.CancelConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.CheckInConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.DeleteConnectorSyncJobAction; +import org.elasticsearch.xpack.application.connector.syncjob.action.GetConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.PostConnectorSyncJobAction; import java.time.Instant; @@ -100,4 +101,12 @@ public static CancelConnectorSyncJobAction.Request getRandomCancelConnectorSyncJ public static CheckInConnectorSyncJobAction.Request getRandomCheckInConnectorSyncJobActionRequest() { return new CheckInConnectorSyncJobAction.Request(randomAlphaOfLength(10)); } + + public static GetConnectorSyncJobAction.Request getRandomGetConnectorSyncJobRequest() { + return new GetConnectorSyncJobAction.Request(randomAlphaOfLength(10)); + } + + public static GetConnectorSyncJobAction.Response getRandomGetConnectorSyncJobResponse() { + return new GetConnectorSyncJobAction.Response(getRandomConnectorSyncJob()); + } } diff --git a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobTests.java b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobTests.java index aeecf582c9ec7..ace1138b8e987 100644 --- a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobTests.java +++ b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobTests.java @@ -7,15 +7,23 @@ package org.elasticsearch.xpack.application.connector.syncjob; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.application.connector.Connector; +import org.elasticsearch.xpack.application.connector.ConnectorSyncStatus; import org.junit.Before; import java.io.IOException; +import java.time.Instant; import java.util.List; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.notNullValue; public class ConnectorSyncJobTests extends ESTestCase { @@ -35,6 +43,205 @@ public final void testRandomSerialization() throws IOException { } } + public void testFromXContent_WithAllFields_AllSet() throws IOException { + String content = XContentHelper.stripWhitespace(""" + { + "cancelation_requested_at": "2023-12-01T14:19:39.394194Z", + "canceled_at": "2023-12-01T14:19:39.394194Z", + "completed_at": "2023-12-01T14:19:39.394194Z", + "connector": { + "connector_id": "connector-id", + "filtering": [ + { + "active": { + "advanced_snippet": { + "created_at": "2023-12-01T14:18:37.397819Z", + "updated_at": "2023-12-01T14:18:37.397819Z", + "value": {} + }, + "rules": [ + { + "created_at": "2023-12-01T14:18:37.397819Z", + "field": "_", + "id": "DEFAULT", + "order": 0, + "policy": "include", + "rule": "regex", + "updated_at": "2023-12-01T14:18:37.397819Z", + "value": ".*" + } + ], + "validation": { + "errors": [], + "state": "valid" + } + }, + "domain": "DEFAULT", + "draft": { + "advanced_snippet": { + "created_at": "2023-12-01T14:18:37.397819Z", + "updated_at": "2023-12-01T14:18:37.397819Z", + "value": {} + }, + "rules": [ + { + "created_at": "2023-12-01T14:18:37.397819Z", + "field": "_", + "id": "DEFAULT", + "order": 0, + "policy": "include", + "rule": "regex", + "updated_at": "2023-12-01T14:18:37.397819Z", + "value": ".*" + } + ], + "validation": { + "errors": [], + "state": "valid" + } + } + } + ], + "index_name": "search-connector", + "language": "english", + "pipeline": { + "extract_binary_content": true, + "name": "ent-search-generic-ingestion", + "reduce_whitespace": true, + "run_ml_inference": false + }, + "service_type": "service type", + "configuration": {} + }, + "created_at": "2023-12-01T14:18:43.07693Z", + "deleted_document_count": 10, + "error": "some-error", + "id": "HIC-JYwB9RqKhB7x_hIE", + "indexed_document_count": 10, + "indexed_document_volume": 10, + "job_type": "full", + "last_seen": "2023-12-01T14:18:43.07693Z", + "metadata": {}, + "started_at": "2023-12-01T14:18:43.07693Z", + "status": "canceling", + "total_document_count": 0, + "trigger_method": "scheduled", + "worker_hostname": "worker-hostname" + } + """); + + ConnectorSyncJob syncJob = ConnectorSyncJob.fromXContentBytes(new BytesArray(content), XContentType.JSON); + + assertThat(syncJob.getCancelationRequestedAt(), equalTo(Instant.parse("2023-12-01T14:19:39.394194Z"))); + assertThat(syncJob.getCanceledAt(), equalTo(Instant.parse("2023-12-01T14:19:39.394194Z"))); + assertThat(syncJob.getCompletedAt(), equalTo(Instant.parse("2023-12-01T14:19:39.394194Z"))); + + assertThat(syncJob.getConnector().getConnectorId(), equalTo("connector-id")); + assertThat(syncJob.getConnector().getFiltering(), hasSize(greaterThan(0))); + assertThat(syncJob.getConnector().getIndexName(), equalTo("search-connector")); + assertThat(syncJob.getConnector().getLanguage(), equalTo("english")); + assertThat(syncJob.getConnector().getPipeline(), notNullValue()); + + assertThat(syncJob.getCreatedAt(), equalTo(Instant.parse("2023-12-01T14:18:43.07693Z"))); + assertThat(syncJob.getDeletedDocumentCount(), equalTo(10L)); + assertThat(syncJob.getError(), equalTo("some-error")); + assertThat(syncJob.getId(), equalTo("HIC-JYwB9RqKhB7x_hIE")); + assertThat(syncJob.getIndexedDocumentCount(), equalTo(10L)); + assertThat(syncJob.getIndexedDocumentVolume(), equalTo(10L)); + assertThat(syncJob.getJobType(), equalTo(ConnectorSyncJobType.FULL)); + assertThat(syncJob.getLastSeen(), equalTo(Instant.parse("2023-12-01T14:18:43.07693Z"))); + assertThat(syncJob.getMetadata(), notNullValue()); + assertThat(syncJob.getStartedAt(), equalTo(Instant.parse("2023-12-01T14:18:43.07693Z"))); + assertThat(syncJob.getStatus(), equalTo(ConnectorSyncStatus.CANCELING)); + assertThat(syncJob.getTotalDocumentCount(), equalTo(0L)); + assertThat(syncJob.getTriggerMethod(), equalTo(ConnectorSyncJobTriggerMethod.SCHEDULED)); + assertThat(syncJob.getWorkerHostname(), equalTo("worker-hostname")); + } + + public void testFromXContent_WithAllNonOptionalFieldsSet_DoesNotThrow() throws IOException { + String content = XContentHelper.stripWhitespace(""" + { + "connector": { + "connector_id": "connector-id", + "filtering": [ + { + "active": { + "advanced_snippet": { + "created_at": "2023-12-01T14:18:37.397819Z", + "updated_at": "2023-12-01T14:18:37.397819Z", + "value": {} + }, + "rules": [ + { + "created_at": "2023-12-01T14:18:37.397819Z", + "field": "_", + "id": "DEFAULT", + "order": 0, + "policy": "include", + "rule": "regex", + "updated_at": "2023-12-01T14:18:37.397819Z", + "value": ".*" + } + ], + "validation": { + "errors": [], + "state": "valid" + } + }, + "domain": "DEFAULT", + "draft": { + "advanced_snippet": { + "created_at": "2023-12-01T14:18:37.397819Z", + "updated_at": "2023-12-01T14:18:37.397819Z", + "value": {} + }, + "rules": [ + { + "created_at": "2023-12-01T14:18:37.397819Z", + "field": "_", + "id": "DEFAULT", + "order": 0, + "policy": "include", + "rule": "regex", + "updated_at": "2023-12-01T14:18:37.397819Z", + "value": ".*" + } + ], + "validation": { + "errors": [], + "state": "valid" + } + } + } + ], + "index_name": "search-connector", + "language": "english", + "pipeline": { + "extract_binary_content": true, + "name": "ent-search-generic-ingestion", + "reduce_whitespace": true, + "run_ml_inference": false + }, + "service_type": "service type", + "configuration": {} + }, + "created_at": "2023-12-01T14:18:43.07693Z", + "deleted_document_count": 10, + "id": "HIC-JYwB9RqKhB7x_hIE", + "indexed_document_count": 10, + "indexed_document_volume": 10, + "job_type": "full", + "last_seen": "2023-12-01T14:18:43.07693Z", + "metadata": {}, + "status": "canceling", + "total_document_count": 0, + "trigger_method": "scheduled" + } + """); + + ConnectorSyncJob.fromXContentBytes(new BytesArray(content), XContentType.JSON); + } + private void assertTransportSerialization(ConnectorSyncJob testInstance) throws IOException { ConnectorSyncJob deserializedInstance = copyInstance(testInstance); assertNotSame(testInstance, deserializedInstance); diff --git a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/GetConnectorSyncJobActionRequestBWCSerializingTests.java b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/GetConnectorSyncJobActionRequestBWCSerializingTests.java new file mode 100644 index 0000000000000..c0b7711474a0b --- /dev/null +++ b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/GetConnectorSyncJobActionRequestBWCSerializingTests.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.application.connector.syncjob.action; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobTestUtils; +import org.elasticsearch.xpack.core.ml.AbstractBWCSerializationTestCase; + +import java.io.IOException; + +public class GetConnectorSyncJobActionRequestBWCSerializingTests extends AbstractBWCSerializationTestCase< + GetConnectorSyncJobAction.Request> { + @Override + protected Writeable.Reader instanceReader() { + return GetConnectorSyncJobAction.Request::new; + } + + @Override + protected GetConnectorSyncJobAction.Request createTestInstance() { + return ConnectorSyncJobTestUtils.getRandomGetConnectorSyncJobRequest(); + } + + @Override + protected GetConnectorSyncJobAction.Request mutateInstance(GetConnectorSyncJobAction.Request instance) throws IOException { + return randomValueOtherThan(instance, this::createTestInstance); + } + + @Override + protected GetConnectorSyncJobAction.Request doParseInstance(XContentParser parser) throws IOException { + return GetConnectorSyncJobAction.Request.parse(parser); + } + + @Override + protected GetConnectorSyncJobAction.Request mutateInstanceForVersion( + GetConnectorSyncJobAction.Request instance, + TransportVersion version + ) { + return new GetConnectorSyncJobAction.Request(instance.getConnectorSyncJobId()); + } +} diff --git a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/GetConnectorSyncJobActionResponseBWCSerializingTests.java b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/GetConnectorSyncJobActionResponseBWCSerializingTests.java new file mode 100644 index 0000000000000..00f6e7cf57fc1 --- /dev/null +++ b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/GetConnectorSyncJobActionResponseBWCSerializingTests.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.application.connector.syncjob.action; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.xpack.application.connector.Connector; +import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobTestUtils; +import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase; + +import java.io.IOException; +import java.util.List; + +public class GetConnectorSyncJobActionResponseBWCSerializingTests extends AbstractBWCWireSerializationTestCase< + GetConnectorSyncJobAction.Response> { + + @Override + public NamedWriteableRegistry getNamedWriteableRegistry() { + return new NamedWriteableRegistry(List.of(new NamedWriteableRegistry.Entry(Connector.class, Connector.NAME, Connector::new))); + } + + @Override + protected Writeable.Reader instanceReader() { + return GetConnectorSyncJobAction.Response::new; + } + + @Override + protected GetConnectorSyncJobAction.Response createTestInstance() { + return ConnectorSyncJobTestUtils.getRandomGetConnectorSyncJobResponse(); + } + + @Override + protected GetConnectorSyncJobAction.Response mutateInstance(GetConnectorSyncJobAction.Response instance) throws IOException { + return randomValueOtherThan(instance, this::createTestInstance); + } + + @Override + protected GetConnectorSyncJobAction.Response mutateInstanceForVersion( + GetConnectorSyncJobAction.Response instance, + TransportVersion version + ) { + return instance; + } +} diff --git a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/GetConnectorSyncJobActionTests.java b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/GetConnectorSyncJobActionTests.java new file mode 100644 index 0000000000000..807f02124f32a --- /dev/null +++ b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/GetConnectorSyncJobActionTests.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.application.connector.syncjob.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobConstants; +import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobTestUtils; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class GetConnectorSyncJobActionTests extends ESTestCase { + + public void testValidate_WhenConnectorSyncJobIdIsPresent_ExpectNoValidationError() { + GetConnectorSyncJobAction.Request request = ConnectorSyncJobTestUtils.getRandomGetConnectorSyncJobRequest(); + ActionRequestValidationException exception = request.validate(); + + assertThat(exception, nullValue()); + } + + public void testValidate_WhenConnectorSyncJobIdIsEmpty_ExpectValidationError() { + GetConnectorSyncJobAction.Request requestWithMissingConnectorId = new GetConnectorSyncJobAction.Request(""); + ActionRequestValidationException exception = requestWithMissingConnectorId.validate(); + + assertThat(exception, notNullValue()); + assertThat(exception.getMessage(), containsString(ConnectorSyncJobConstants.EMPTY_CONNECTOR_SYNC_JOB_ID_ERROR_MESSAGE)); + } + +} diff --git a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/TransportGetConnectorSyncJobActionTests.java b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/TransportGetConnectorSyncJobActionTests.java new file mode 100644 index 0000000000000..7b83d008d92bc --- /dev/null +++ b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/TransportGetConnectorSyncJobActionTests.java @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.application.connector.syncjob.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobTestUtils; +import org.junit.Before; + +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; + +public class TransportGetConnectorSyncJobActionTests extends ESSingleNodeTestCase { + + private static final Long TIMEOUT_SECONDS = 10L; + + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + private TransportGetConnectorSyncJobAction action; + + @Before + public void setup() { + ClusterService clusterService = getInstanceFromNode(ClusterService.class); + + TransportService transportService = new TransportService( + Settings.EMPTY, + mock(Transport.class), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + x -> null, + null, + Collections.emptySet() + ); + + action = new TransportGetConnectorSyncJobAction(transportService, clusterService, mock(ActionFilters.class), client()); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + + public void testGetConnectorSyncJob_ExpectNoWarnings() throws InterruptedException { + GetConnectorSyncJobAction.Request request = ConnectorSyncJobTestUtils.getRandomGetConnectorSyncJobRequest(); + + executeRequest(request); + + ensureNoWarnings(); + } + + private void executeRequest(GetConnectorSyncJobAction.Request request) throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + action.doExecute(mock(Task.class), request, ActionListener.wrap(response -> latch.countDown(), exception -> latch.countDown())); + + boolean requestTimedOut = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS); + + assertTrue("Timeout waiting for get request", requestTimedOut); + } +} diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index ffc894af423cf..3409f549cb579 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -136,6 +136,7 @@ public class Constants { "cluster:admin/xpack/connector/sync_job/post", "cluster:admin/xpack/connector/sync_job/delete", "cluster:admin/xpack/connector/sync_job/check_in", + "cluster:admin/xpack/connector/sync_job/get", "cluster:admin/xpack/connector/sync_job/cancel", "cluster:admin/xpack/deprecation/info", "cluster:admin/xpack/deprecation/nodes/info", From af30fe437ba6ba2f3540aa12249220c9d43cbdfb Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 4 Dec 2023 14:39:19 +0000 Subject: [PATCH 5/5] Check for null before overriding task settings (#102918) --- .../embeddings/OpenAiEmbeddingsModel.java | 5 ++++- .../OpenAiEmbeddingsModelTests.java | 19 +++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/openai/embeddings/OpenAiEmbeddingsModel.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/openai/embeddings/OpenAiEmbeddingsModel.java index 5e2c352d88a01..02c1e41e0374a 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/openai/embeddings/OpenAiEmbeddingsModel.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/openai/embeddings/OpenAiEmbeddingsModel.java @@ -84,8 +84,11 @@ public ExecutableAction accept(OpenAiActionVisitor creator, Map } public OpenAiEmbeddingsModel overrideWith(Map taskSettings) { - var requestTaskSettings = OpenAiEmbeddingsRequestTaskSettings.fromMap(taskSettings); + if (taskSettings == null || taskSettings.isEmpty()) { + return this; + } + var requestTaskSettings = OpenAiEmbeddingsRequestTaskSettings.fromMap(taskSettings); return new OpenAiEmbeddingsModel(this, getTaskSettings().overrideWith(requestTaskSettings)); } } diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/openai/embeddings/OpenAiEmbeddingsModelTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/openai/embeddings/OpenAiEmbeddingsModelTests.java index 96ced66723f04..62cb609a59d2a 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/openai/embeddings/OpenAiEmbeddingsModelTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/openai/embeddings/OpenAiEmbeddingsModelTests.java @@ -14,8 +14,11 @@ import org.elasticsearch.xpack.inference.services.openai.OpenAiServiceSettings; import org.elasticsearch.xpack.inference.services.settings.DefaultSecretSettings; +import java.util.Map; + import static org.elasticsearch.xpack.inference.services.openai.embeddings.OpenAiEmbeddingsRequestTaskSettingsTests.getRequestTaskSettingsMap; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.sameInstance; public class OpenAiEmbeddingsModelTests extends ESTestCase { @@ -28,6 +31,22 @@ public void testOverrideWith_OverridesUser() { assertThat(overriddenModel, is(createModel("url", "org", "api_key", "model_name", "user_override"))); } + public void testOverrideWith_EmptyMap() { + var model = createModel("url", "org", "api_key", "model_name", null); + + var requestTaskSettingsMap = Map.of(); + + var overriddenModel = model.overrideWith(requestTaskSettingsMap); + assertThat(overriddenModel, sameInstance(model)); + } + + public void testOverrideWith_NullMap() { + var model = createModel("url", "org", "api_key", "model_name", null); + + var overriddenModel = model.overrideWith(null); + assertThat(overriddenModel, sameInstance(model)); + } + public static OpenAiEmbeddingsModel createModel( String url, @Nullable String org,