Skip to content

Commit

Permalink
Created separate verticle for each type
Browse files Browse the repository at this point in the history
  • Loading branch information
azizbekxm committed Sep 24, 2024
1 parent 28e2c34 commit b917654
Show file tree
Hide file tree
Showing 9 changed files with 254 additions and 187 deletions.
5 changes: 0 additions & 5 deletions src/main/java/org/folio/event/DomainEventPayloadType.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

@RequiredArgsConstructor
@Getter
public enum KafkaEventType {
public enum EdiExportHistoryEventType {
EXPORT_HISTORY_CREATE("edi-export-history.create"),
INVENTORY_ITEM_CREATE("inventory.item"),;
INVENTORY_ITEM_CREATE("inventory.item");

private final String topicName;
}
19 changes: 19 additions & 0 deletions src/main/java/org/folio/event/InventoryEventType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.folio.event;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
 * Kafka event types originating from the inventory module.
 * Each constant carries the topic name it is published to and the
 * payload "type" discriminator expected inside the event body.
 */
@Getter
@RequiredArgsConstructor
public enum InventoryEventType {

  INVENTORY_ITEM_CREATE("inventory.item", PayloadType.CREATE);

  // Fields are final: enum constants are shared singletons and must be immutable.
  // @RequiredArgsConstructor generates the (topicName, payloadType) constructor;
  // the previous extra @AllArgsConstructor was redundant and, with non-final
  // fields, @RequiredArgsConstructor produced a useless no-arg constructor.
  private final String topicName;
  private final PayloadType payloadType;

  /** Payload "type" values carried inside inventory domain events. */
  public enum PayloadType {
    UPDATE, DELETE, CREATE, DELETE_ALL
  }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.folio.event.handler;

import static org.folio.event.InventoryEventType.INVENTORY_ITEM_CREATE;
import static org.folio.event.util.KafkaEventUtil.extractTenantFromHeaders;

import io.vertx.core.Context;
Expand All @@ -13,7 +14,6 @@
import org.apache.commons.lang.ObjectUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.event.DomainEventPayloadType;
import org.folio.rest.jaxrs.model.Piece;
import org.folio.rest.persist.DBClient;
import org.folio.services.piece.PieceService;
Expand Down Expand Up @@ -41,7 +41,7 @@ public Future<String> handle(KafkaConsumerRecord<String, String> kafkaConsumerRe
var payload = new JsonObject(kafkaConsumerRecord.value());

String eventType = payload.getString("type");
if (!Objects.equals(eventType, DomainEventPayloadType.CREATE.name())) {
if (!Objects.equals(eventType, INVENTORY_ITEM_CREATE.getPayloadType().name())) {
log.info("handle:: unsupported event type: {}", eventType);
return Future.succeededFuture();
}
Expand Down Expand Up @@ -92,10 +92,10 @@ private Future<Void> updatePieces(List<Piece> pieces, JsonObject itemObject, Str

private List<Piece> filterPiecesToUpdate(List<Piece> pieces, String holdingId, String tenantId) {
return pieces.stream()
.filter(piece ->
.filter(piece -> // filter out pieces that already have the same tenantId and holdingId
ObjectUtils.notEqual(piece.getReceivingTenantId(), tenantId)
|| ObjectUtils.notEqual(piece.getHoldingId(), holdingId))
.filter(piece ->
.filter(piece -> // filter out pieces that already have the same tenantId and existing locationId
ObjectUtils.notEqual(piece.getReceivingTenantId(), tenantId)
|| Objects.isNull(piece.getLocationId()))
.toList();
Expand Down
32 changes: 20 additions & 12 deletions src/main/java/org/folio/rest/impl/InitAPIs.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package org.folio.rest.impl;

import java.util.Arrays;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.config.ApplicationConfig;
import org.folio.dbschema.ObjectMapperTool;
import org.folio.okapi.common.GenericCompositeFuture;
import org.folio.rest.resource.interfaces.InitAPI;
import org.folio.spring.SpringContextUtil;
import org.folio.verticles.KafkaConsumersVerticle;
import org.folio.verticles.EdiExportOrdersHistoryConsumersVerticle;
import org.folio.verticles.InventoryItemConsumersVerticle;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.support.AbstractApplicationContext;

import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.SerializationConfig;
Expand Down Expand Up @@ -64,17 +66,23 @@ public void init(Vertx vertx, Context context, Handler<AsyncResult<Boolean>> res
});
}

private Future<String> deployKafkaConsumersVerticle(Vertx vertx) {
Promise<String> promise = Promise.promise();
AbstractApplicationContext springContext = vertx.getOrCreateContext().get("springContext");
private Future<?> deployKafkaConsumersVerticle(Vertx vertx) {
Promise<String> inventoryItemConsumer = Promise.promise();
Promise<String> ediExportOrdersHistoryConsumer = Promise.promise();

DeploymentOptions deploymentOptions = new DeploymentOptions()
.setInstances(kafkaConsumersVerticleNumber)
.setWorker(true);
vertx.deployVerticle(() -> springContext.getBean(KafkaConsumersVerticle.class), deploymentOptions, promise);
vertx.deployVerticle((InventoryItemConsumersVerticle.class.getName()),
new DeploymentOptions()
.setWorkerPoolName("inventory-item-consumers")
.setInstances(kafkaConsumersVerticleNumber), inventoryItemConsumer);

return promise.future()
.onSuccess(ar -> log.info("KafkaConsumersVerticle was successfully started"))
.onFailure(e -> log.error("KafkaConsumersVerticle was not successfully started", e));
vertx.deployVerticle((EdiExportOrdersHistoryConsumersVerticle.class.getName()),
new DeploymentOptions()
.setWorkerPoolName("edi-export-orders-history-consumers")
.setInstances(kafkaConsumersVerticleNumber), ediExportOrdersHistoryConsumer);

return GenericCompositeFuture.all(Arrays.asList(
inventoryItemConsumer.future(),
ediExportOrdersHistoryConsumer.future()
));
}
}
108 changes: 108 additions & 0 deletions src/main/java/org/folio/verticles/AbstractConsumersVerticle.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package org.folio.verticles;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import java.util.ArrayList;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.kafka.GlobalLoadSensor;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaConsumerWrapper;
import org.folio.kafka.SubscriptionDefinition;
import org.folio.rest.tools.utils.ModuleName;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.support.AbstractApplicationContext;

/**
 * Base verticle for Kafka consumers. Subclasses implement {@link #createConsumers()}
 * to subscribe to their specific event types via {@link #createConsumer}; this class
 * owns the consumer lifecycle: it starts them in {@link #start} and stops every
 * registered consumer in {@link #stop}.
 *
 * @param <T> the event-type enum handled by the concrete subclass
 */
public abstract class AbstractConsumersVerticle<T> extends AbstractVerticle {

  protected static final Logger log = LogManager.getLogger();
  // Module id ("name-version") used as the consumer group id when starting consumers.
  protected static final String MODULE_ID = getModuleId();
  // Matches a tenant id segment when building multi-tenant subscription patterns.
  protected static final String TENANT_ID_PATTERN = "\\w{1,}";

  // NOTE(review): @Value is only honored when this instance is created/processed by
  // the Spring context; if the verticle is deployed outside Spring, the default (5)
  // will not be injected — verify the deployment path.
  @Value("${kafka.export.orders.loadLimit:5}")
  protected int loadLimit;

  // Consumers successfully started so far; drained by stopConsumers() on shutdown.
  protected final List<KafkaConsumerWrapper<String, String>> consumers = new ArrayList<>();
  protected final AbstractApplicationContext springContext;
  protected final KafkaConfig kafkaConfig;

  public AbstractConsumersVerticle(KafkaConfig kafkaConfig,
                                   AbstractApplicationContext springContext) {
    this.springContext = springContext;
    this.kafkaConfig = kafkaConfig;
  }

  @Override
  public void init(Vertx vertx, Context context) {
    super.init(vertx, context);
    // Expose the Spring context to handlers that look it up from the Vert.x context.
    context.put("springContext", springContext);
  }

  @Override
  public void start(Promise<Void> startPromise) {
    log.info("start:: starting kafka consumer verticle. kafka config: {}", kafkaConfig);

    // The verticle is considered started only once all subclass consumers are created.
    createConsumers()
      .onSuccess(v -> log.info("start:: kafka consumer verticle started successfully"))
      .onFailure(t -> log.error("start:: failed to start kafka consumer verticle", t))
      .onComplete(startPromise);
  }

  @Override
  public void stop(Promise<Void> stopPromise) {
    log.info("stop:: stopping verticle");

    stopConsumers()
      .onSuccess(v -> log.info("stop:: kafka consumer verticle stopped successfully"))
      .onFailure(t -> log.error("stop:: failed to stop kafka consumer verticle", t))
      .onComplete(stopPromise);
  }

  /**
   * Create consumers for the specific event types.
   *
   * @return future with the result of the operation
   */
  protected abstract Future<Void> createConsumers();

  /**
   * Builds and starts one Kafka consumer for the given event type and subscription,
   * registering it for shutdown on success.
   *
   * @param eventType              the event type (used for logging only here)
   * @param subscriptionDefinition topic/pattern subscription built by the subclass
   * @param handler                record handler invoked per consumed record
   * @return future completed with the started consumer wrapper
   */
  protected Future<KafkaConsumerWrapper<String, String>> createConsumer(T eventType,
                                                                        SubscriptionDefinition subscriptionDefinition,
                                                                        AsyncRecordHandler<String, String> handler) {
    log.info("createConsumer:: creating consumer for event type: {}", eventType);
    var consumerWrapper = KafkaConsumerWrapper.<String, String>builder()
      .context(context)
      .vertx(vertx)
      .kafkaConfig(kafkaConfig)
      .loadLimit(loadLimit)
      .globalLoadSensor(new GlobalLoadSensor())
      .subscriptionDefinition(subscriptionDefinition)
      // Failed records are logged and not retried here — verify this is the intended policy.
      .processRecordErrorHandler((t, r) -> log.error("Failed to process event: {}", r, t))
      .build();

    log.info("createConsumer:: moduleId={}", MODULE_ID);
    // MODULE_ID serves as the consumer group id; only successfully started
    // consumers are tracked for shutdown.
    return consumerWrapper.start(handler, MODULE_ID)
      .map(consumerWrapper)
      .onSuccess(consumers::add);
  }

  // Stops all registered consumers in parallel; the returned future fails if any stop fails.
  private Future<Void> stopConsumers() {
    var stopFutures = consumers.stream()
      .map(KafkaConsumerWrapper::stop)
      .toList();

    return Future.all(stopFutures)
      .onSuccess(v -> log.info("stop:: event consumers stopped successfully"))
      .onFailure(t -> log.error("stop:: failed to stop event consumers", t))
      .mapEmpty();
  }

  // Module id in "name-version" form, e.g. "mod-orders-storage-13.7.0".
  private static String getModuleId() {
    return ModuleName.getModuleName().replace("_", "-") + "-" + ModuleName.getModuleVersion();
  }
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.folio.verticles;

import io.vertx.core.Future;
import org.folio.event.EdiExportHistoryEventType;
import org.folio.event.handler.EdiExportOrdersHistoryAsyncRecordHandler;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaConsumerWrapper;
import org.folio.kafka.KafkaTopicNameHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.support.AbstractApplicationContext;

/**
 * Verticle hosting the Kafka consumer for EDI export orders history events.
 */
public class EdiExportOrdersHistoryConsumersVerticle extends AbstractConsumersVerticle<EdiExportHistoryEventType> {

  @Autowired
  public EdiExportOrdersHistoryConsumersVerticle(KafkaConfig kafkaConfig,
                                                 AbstractApplicationContext springContext) {
    super(kafkaConfig, springContext);
  }

  @Override
  protected Future<Void> createConsumers() {
    log.info("createConsumers:: creating EDI Export Order History consumers");
    var recordHandler = new EdiExportOrdersHistoryAsyncRecordHandler(context, vertx);
    return subscribe(EdiExportHistoryEventType.EXPORT_HISTORY_CREATE, recordHandler)
      .mapEmpty();
  }

  /**
   * Subscribes a consumer for the given event type.
   * Note: edifact export topics use the namespaced pattern
   * {envId}.Default.{tenant}.{eventType} -> e.g. 'folio.Default.diku.edi-export-history.create'
   *
   * @param type          - the event type to subscribe to
   * @param recordHandler - the handler invoked for each consumed record
   * @return future with the started consumer
   */
  private Future<KafkaConsumerWrapper<String, String>> subscribe(EdiExportHistoryEventType type,
                                                                 AsyncRecordHandler<String, String> recordHandler) {
    var definition = KafkaTopicNameHelper.createSubscriptionDefinition(
      kafkaConfig.getEnvId(), KafkaTopicNameHelper.getDefaultNameSpace(), type.getTopicName());

    return createConsumer(type, definition, recordHandler);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.folio.verticles;

import io.vertx.core.Future;
import java.util.List;
import org.folio.event.InventoryEventType;
import org.folio.event.handler.ItemCreateAsyncRecordHandler;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaConsumerWrapper;
import org.folio.kafka.KafkaTopicNameHelper;
import org.folio.kafka.SubscriptionDefinition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.support.AbstractApplicationContext;

/**
 * Verticle hosting Kafka consumers for inventory item events.
 */
public class InventoryItemConsumersVerticle extends AbstractConsumersVerticle<InventoryEventType> {

  @Autowired
  public InventoryItemConsumersVerticle(KafkaConfig kafkaConfig,
                                        AbstractApplicationContext springContext) {
    super(kafkaConfig, springContext);
  }

  @Override
  protected Future<Void> createConsumers() {
    log.info("createConsumers:: creating Item Event consumers");
    // Future.all over a list keeps this an easy extension point for more item event types.
    return Future.all(
      List.of(
        createInventoryConsumer(InventoryEventType.INVENTORY_ITEM_CREATE,
          new ItemCreateAsyncRecordHandler(context, vertx))
      )
    ).mapEmpty();
  }

  /**
   * This method creates a consumer for the given event type.
   * Note: The method is using specific subscription pattern for inventory topics:
   * {envId}.{tenant}.{eventType} -> e.g. 'folio.diku.inventory.item'
   * TENANT_ID_PATTERN makes one consumer match the topic across all tenants.
   *
   * @param eventType - the event type
   * @param handler   - the handler to process the records
   * @return future with the created consumer
   */
  private Future<KafkaConsumerWrapper<String, String>> createInventoryConsumer(InventoryEventType eventType,
                                                                               AsyncRecordHandler<String, String> handler) {
    var subscriptionPattern = KafkaTopicNameHelper.formatTopicName(
      kafkaConfig.getEnvId(), TENANT_ID_PATTERN, eventType.getTopicName());
    // Fixed log-message typo: "evenType" -> "eventType".
    log.info("createInventoryConsumer:: subscriptionPattern: {}, for eventType: {}", subscriptionPattern, eventType.name());

    var subscriptionDefinition = SubscriptionDefinition.builder()
      .eventType(eventType.name())
      .subscriptionPattern(subscriptionPattern)
      .build();

    return createConsumer(eventType, subscriptionDefinition, handler);
  }
}
Loading

0 comments on commit b917654

Please sign in to comment.