Skip to content

Commit

Permalink
[Connectors API] Fix ClassCastException when creating a new sync job (e…
Browse files Browse the repository at this point in the history
  • Loading branch information
jedrazb authored Dec 18, 2023
1 parent bfe491a commit bc3f99d
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 30 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/103508.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 103508
summary: "[Connectors API] Fix `ClassCastException` when creating a new sync job"
area: Application
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,68 @@ setup:
- exists: created_at
- exists: last_seen

---
'Create connector sync job with complex connector document':

- do:
connector.update_pipeline:
connector_id: test-connector
body:
pipeline:
extract_binary_content: true
name: test-pipeline
reduce_whitespace: true
run_ml_inference: false

- match: { result: updated }

- do:
connector.update_configuration:
connector_id: test-connector
body:
configuration:
some_field:
default_value: null
depends_on:
- field: some_field
value: 31
display: numeric
label: Very important field
options: [ ]
order: 4
required: true
sensitive: false
tooltip: Wow, this tooltip is useful.
type: str
ui_restrictions: [ ]
validations:
- constraint: 0
type: greater_than
value: 456

- match: { result: updated }

- do:
connector_sync_job.post:
body:
id: test-connector
job_type: full
trigger_method: on_demand

- set: { id: id }

- match: { id: $id }

- do:
connector_sync_job.get:
connector_sync_job_id: $id

- match: { connector.id: test-connector }
- match: { connector.configuration.some_field.value: 456 }
- match: { connector.pipeline.name: test-pipeline }

---

'Create connector sync job with missing job type - expect job type full as default':
- do:
connector_sync_job.post:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.application.connector.syncjob;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -282,7 +283,7 @@ public ConnectorSyncJob(StreamInput in) throws IOException {
);
PARSER.declareField(
constructorArg(),
(p, c) -> ConnectorSyncJob.syncJobConnectorFromXContent(p),
(p, c) -> ConnectorSyncJob.syncJobConnectorFromXContent(p, null),
CONNECTOR_FIELD,
ObjectParser.ValueType.OBJECT
);
Expand Down Expand Up @@ -327,12 +328,21 @@ private static Instant parseNullableInstant(XContentParser p) throws IOException
}

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<Connector, Void> SYNC_JOB_CONNECTOR_PARSER = new ConstructingObjectParser<>(
private static final ConstructingObjectParser<Connector, String> SYNC_JOB_CONNECTOR_PARSER = new ConstructingObjectParser<>(
"sync_job_connector",
true,
(args) -> {
(args, connectorId) -> {
int i = 0;
return new Connector.Builder().setConnectorId((String) args[i++])

// Parse the connector ID from the arguments. The ID uniquely identifies the connector.
String parsedConnectorId = (String) args[i++];

// Determine the actual connector ID to use. If the context parameter `connectorId` is not null or empty,
// it takes precedence over the `parsedConnectorId` extracted from the arguments.
// This approach allows for flexibility in specifying the connector ID, either from a context or as a parsed argument.
String syncJobConnectorId = Strings.isNullOrEmpty(connectorId) ? parsedConnectorId : connectorId;

return new Connector.Builder().setConnectorId(syncJobConnectorId)
.setFiltering((List<ConnectorFiltering>) args[i++])
.setIndexName((String) args[i++])
.setLanguage((String) args[i++])
Expand All @@ -344,7 +354,7 @@ private static Instant parseNullableInstant(XContentParser p) throws IOException
);

static {
SYNC_JOB_CONNECTOR_PARSER.declareString(constructorArg(), Connector.ID_FIELD);
SYNC_JOB_CONNECTOR_PARSER.declareString(optionalConstructorArg(), Connector.ID_FIELD);
SYNC_JOB_CONNECTOR_PARSER.declareObjectArray(
optionalConstructorArg(),
(p, c) -> ConnectorFiltering.fromXContent(p),
Expand Down Expand Up @@ -378,16 +388,16 @@ public static ConnectorSyncJob fromXContent(XContentParser parser) throws IOExce
return PARSER.parse(parser, null);
}

public static Connector syncJobConnectorFromXContentBytes(BytesReference source, XContentType xContentType) {
public static Connector syncJobConnectorFromXContentBytes(BytesReference source, String connectorId, XContentType xContentType) {
try (XContentParser parser = XContentHelper.createParser(XContentParserConfiguration.EMPTY, source, xContentType)) {
return ConnectorSyncJob.syncJobConnectorFromXContent(parser);
return ConnectorSyncJob.syncJobConnectorFromXContent(parser, connectorId);
} catch (IOException e) {
throw new ElasticsearchParseException("Failed to parse a connector document.", e);
}
}

public static Connector syncJobConnectorFromXContent(XContentParser parser) throws IOException {
return SYNC_JOB_CONNECTOR_PARSER.parse(parser, null);
public static Connector syncJobConnectorFromXContent(XContentParser parser, String connectorId) throws IOException {
return SYNC_JOB_CONNECTOR_PARSER.parse(parser, connectorId);
}

public String getId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.application.connector.Connector;
import org.elasticsearch.xpack.application.connector.ConnectorConfiguration;
import org.elasticsearch.xpack.application.connector.ConnectorFiltering;
import org.elasticsearch.xpack.application.connector.ConnectorIndexService;
import org.elasticsearch.xpack.application.connector.ConnectorIngestPipeline;
import org.elasticsearch.xpack.application.connector.ConnectorSyncStatus;
import org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry;
import org.elasticsearch.xpack.application.connector.syncjob.action.PostConnectorSyncJobAction;
Expand Down Expand Up @@ -429,22 +426,16 @@ public void onResponse(GetResponse response) {
onFailure(new ResourceNotFoundException("Connector with id '" + connectorId + "' does not exist."));
return;
}

Map<String, Object> source = response.getSource();

@SuppressWarnings("unchecked")
final Connector syncJobConnectorInfo = new Connector.Builder().setConnectorId(connectorId)
.setFiltering((List<ConnectorFiltering>) source.get(Connector.FILTERING_FIELD.getPreferredName()))
.setIndexName((String) source.get(Connector.INDEX_NAME_FIELD.getPreferredName()))
.setLanguage((String) source.get(Connector.LANGUAGE_FIELD.getPreferredName()))
.setPipeline((ConnectorIngestPipeline) source.get(Connector.PIPELINE_FIELD.getPreferredName()))
.setServiceType((String) source.get(Connector.SERVICE_TYPE_FIELD.getPreferredName()))
.setConfiguration(
(Map<String, ConnectorConfiguration>) source.get(Connector.CONFIGURATION_FIELD.getPreferredName())
)
.build();

listener.onResponse(syncJobConnectorInfo);
try {
final Connector syncJobConnectorInfo = ConnectorSyncJob.syncJobConnectorFromXContentBytes(
response.getSourceAsBytesRef(),
connectorId,
XContentType.JSON
);
listener.onResponse(syncJobConnectorInfo);
} catch (Exception e) {
listener.onFailure(e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ public void testSyncJobConnectorFromXContent_WithAllFieldsSet() throws IOExcepti
}
""");

Connector connector = ConnectorSyncJob.syncJobConnectorFromXContentBytes(new BytesArray(content), XContentType.JSON);
Connector connector = ConnectorSyncJob.syncJobConnectorFromXContentBytes(new BytesArray(content), null, XContentType.JSON);

assertThat(connector.getConnectorId(), equalTo("connector-id"));
assertThat(connector.getFiltering().size(), equalTo(1));
Expand Down Expand Up @@ -474,7 +474,7 @@ public void testSyncJobConnectorFromXContent_WithAllNonOptionalFieldsSet_DoesNot
}
""");

ConnectorSyncJob.syncJobConnectorFromXContentBytes(new BytesArray(content), XContentType.JSON);
ConnectorSyncJob.syncJobConnectorFromXContentBytes(new BytesArray(content), null, XContentType.JSON);
}

private void assertTransportSerialization(ConnectorSyncJob testInstance) throws IOException {
Expand Down

0 comments on commit bc3f99d

Please sign in to comment.