diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/AbstractJobStreams.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/AbstractJobStreams.java index 64189e2ab1..d09d3bbc5f 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/AbstractJobStreams.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/AbstractJobStreams.java @@ -65,16 +65,12 @@ public boolean isEnabled() { public void jobStatusChange(JobDetails job) { if (isEnabled()) { try { - JobDataEvent event = JobDataEvent - .builder() - .source(url + RestApiConstants.JOBS_PATH) - .data(ScheduledJobAdapter.of(job))//this should support jobs crated with V1 and V2 - .build(); + JobDataEvent event = buildEvent(job); LOGGER.debug("emit jobStatusChange, hasRequests: {}, eventId: {}, jobDetails: {}", emitter.hasRequests(), event.getId(), job); String json = objectMapper.writeValueAsString(event); emitter.send(decorate(ContextAwareMessage.of(json) .withAck(() -> onAck(event.getId(), job)) - .withNack(reason -> onNack(reason, job)))); + .withNack(reason -> onNack(reason, job)), event)); } catch (Exception e) { String msg = String.format("An unexpected error was produced while processing a Job status change for the job: %s", job); LOGGER.error(msg, e); @@ -82,6 +78,14 @@ public void jobStatusChange(JobDetails job) { } } + protected JobDataEvent buildEvent(JobDetails job) { + return JobDataEvent + .builder() + .source(url + RestApiConstants.JOBS_PATH) + .data(ScheduledJobAdapter.of(job))//this should support jobs crated with V1 and V2 + .build(); + } + protected CompletionStage onAck(String eventId, JobDetails job) { LOGGER.debug("Job Status change emitted successfully, eventId: {}, jobDetails: {}", eventId, job); return CompletableFuture.completedFuture(null); @@ -93,7 +97,7 @@ protected CompletionStage onNack(Throwable reason, JobDetails job) { return CompletableFuture.completedFuture(null); } - protected Message decorate(Message message) { + protected Message decorate(Message message, JobDataEvent event) { return message; } } diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/AbstractJobStreamsTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/AbstractJobStreamsTest.java index f7ac6ed1ce..1f2e3fc33b 100644 --- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/AbstractJobStreamsTest.java +++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/AbstractJobStreamsTest.java @@ -58,7 +58,7 @@ public abstract class AbstractJobStreamsTest { protected static final String URL = "http://localhost:8180"; private static final String SERIALIZED_MESSAGE = "SERIALIZED_MESSAGE"; - private static final String JOB_ID = "JOB_ID"; + protected static final String JOB_ID = "JOB_ID"; private static final String CORRELATION_ID = "CORRELATION_ID"; private static final JobStatus STATUS = JobStatus.SCHEDULED; private static final ZonedDateTime LAST_UPDATE = ZonedDateTime.parse("2022-08-03T18:00:15.001+01:00"); @@ -170,7 +170,7 @@ private JobDetails mockJobDetails() { } - private void assertExpectedEvent(JobDataEvent event) { + protected void assertExpectedEvent(JobDataEvent event) { assertThat(event.getId()).isNotNull(); assertThat(event.getType()).isEqualTo(JobDataEvent.JOB_EVENT_TYPE); assertThat(event.getSource()).hasToString(URL + "/jobs"); diff --git a/jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreams.java b/jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreams.java index 52bb50d35f..61a2efebaf 100644 --- a/jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreams.java +++ b/jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreams.java @@ -26,6 +26,7 @@ import org.eclipse.microprofile.reactive.messaging.Emitter; import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.OnOverflow; +import org.kie.kogito.jobs.service.events.JobDataEvent; import org.kie.kogito.jobs.service.model.JobDetails; import org.kie.kogito.jobs.service.stream.AbstractJobStreams; import org.slf4j.Logger; @@ -45,6 +46,7 @@ public class HttpJobStreams extends AbstractJobStreams { public static final String PUBLISH_EVENTS_CONFIG_KEY = "kogito.jobs-service.http.job-status-change-events"; public static final String JOB_STATUS_CHANGE_EVENTS_HTTP = "kogito-job-service-job-status-events-http"; + public static final String PARTITION_KEY_EXTENSION = "partitionkey"; private static final Logger LOGGER = LoggerFactory.getLogger(HttpJobStreams.class); @@ -70,7 +72,16 @@ public void jobStatusChange(JobDetails job) { } @Override - protected Message decorate(Message message) { + protected JobDataEvent buildEvent(JobDetails job) { + JobDataEvent event = super.buildEvent(job); + // use the well-known extension https://github.com/cloudevents/spec/blob/main/cloudevents/extensions/partitioning.md + // to instruct potential http driven Brokers like, Knative Eventing Kafka Broker, to process accordingly. + event.addExtensionAttribute(PARTITION_KEY_EXTENSION, event.getData().getId()); + return event; + } + + @Override + protected Message decorate(Message message, JobDataEvent event) { return message.addMetadata(OUTGOING_HTTP_METADATA.get()); } } diff --git a/jobs-service/jobs-service-messaging-http/src/test/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreamsTest.java b/jobs-service/jobs-service-messaging-http/src/test/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreamsTest.java index 8449d88da6..7b951b5e39 100644 --- a/jobs-service/jobs-service-messaging-http/src/test/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreamsTest.java +++ b/jobs-service/jobs-service-messaging-http/src/test/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreamsTest.java @@ -21,6 +21,7 @@ import java.util.Optional; import org.eclipse.microprofile.reactive.messaging.Message; +import org.kie.kogito.jobs.service.events.JobDataEvent; import org.kie.kogito.jobs.service.stream.AbstractJobStreamsTest; import io.cloudevents.jackson.JsonFormat; @@ -29,6 +30,7 @@ import jakarta.ws.rs.core.HttpHeaders; import static org.assertj.core.api.Assertions.assertThat; +import static org.kie.kogito.jobs.service.messaging.http.stream.HttpJobStreams.PARTITION_KEY_EXTENSION; class HttpJobStreamsTest extends AbstractJobStreamsTest { @@ -44,4 +46,12 @@ protected void assertExpectedMetadata(Message message) { assertThat(metadata.getHeaders()).hasSize(1); assertThat(metadata.getHeaders().get(HttpHeaders.CONTENT_TYPE)).containsExactlyInAnyOrder(JsonFormat.CONTENT_TYPE); } + + @Override + protected void assertExpectedEvent(JobDataEvent event) { + super.assertExpectedEvent(event); + assertThat(event.getExtension(PARTITION_KEY_EXTENSION)) + .isNotNull() + .isEqualTo(JOB_ID); + } } diff --git a/jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/stream/KafkaJobStreams.java b/jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/stream/KafkaJobStreams.java index c9b149ede2..315d89dff4 100644 --- a/jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/stream/KafkaJobStreams.java +++ b/jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/stream/KafkaJobStreams.java @@ -23,7 +23,9 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Emitter; +import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.OnOverflow; +import org.kie.kogito.jobs.service.events.JobDataEvent; import org.kie.kogito.jobs.service.model.JobDetails; import org.kie.kogito.jobs.service.stream.AbstractJobStreams; import org.kie.kogito.jobs.service.stream.AvailableStreams; @@ -32,6 +34,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; +import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata; + import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -54,4 +58,10 @@ public void jobStatusChange(JobDetails job) { LOGGER.debug("jobStatusChange call received, enabled: {}, job: {}", enabled, job); super.jobStatusChange(job); } + + @Override + protected Message decorate(Message message, JobDataEvent event) { + // regular kafka partitioning. + return message.addMetadata(OutgoingKafkaRecordMetadata.builder().withKey(event.getData().getId()).build()); + } } diff --git a/jobs-service/jobs-service-messaging-kafka/src/test/java/org/kie/kogito/jobs/service/messaging/kafka/stream/KafkaJobStreamsTest.java b/jobs-service/jobs-service-messaging-kafka/src/test/java/org/kie/kogito/jobs/service/messaging/kafka/stream/KafkaJobStreamsTest.java index a0dbc6c95d..d5be8d1f5f 100644 --- a/jobs-service/jobs-service-messaging-kafka/src/test/java/org/kie/kogito/jobs/service/messaging/kafka/stream/KafkaJobStreamsTest.java +++ b/jobs-service/jobs-service-messaging-kafka/src/test/java/org/kie/kogito/jobs/service/messaging/kafka/stream/KafkaJobStreamsTest.java @@ -20,12 +20,24 @@ import java.util.Optional; +import org.eclipse.microprofile.reactive.messaging.Message; import org.kie.kogito.jobs.service.stream.AbstractJobStreamsTest; +import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata; + +import static org.assertj.core.api.Assertions.assertThat; + class KafkaJobStreamsTest extends AbstractJobStreamsTest { @Override protected KafkaJobStreams createJobStreams() { return new KafkaJobStreams(objectMapper, Optional.of(true), emitter, URL); } + + @Override + protected void assertExpectedMetadata(Message message) { + OutgoingKafkaRecordMetadata metadata = message.getMetadata(OutgoingKafkaRecordMetadata.class).orElse(null); + assertThat(metadata).isNotNull(); + assertThat(metadata.getKey()).isEqualTo(JOB_ID); + } }