Skip to content

Commit

Permalink
Fixed creation of ConsumersVerticle
Browse files Browse the repository at this point in the history
  • Loading branch information
azizbekxm committed Sep 24, 2024
1 parent b917654 commit 33f22a9
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 14 deletions.
3 changes: 1 addition & 2 deletions src/main/java/org/folio/event/EdiExportHistoryEventType.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
@RequiredArgsConstructor
@Getter
public enum EdiExportHistoryEventType {
EXPORT_HISTORY_CREATE("edi-export-history.create"),
INVENTORY_ITEM_CREATE("inventory.item");
EXPORT_HISTORY_CREATE("edi-export-history.create");

private final String topicName;
}
26 changes: 14 additions & 12 deletions src/main/java/org/folio/rest/impl/InitAPIs.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.jackson.DatabindCodec;
import org.springframework.context.support.AbstractApplicationContext;

public class InitAPIs implements InitAPI {
private static final Logger log = LogManager.getLogger();
Expand All @@ -48,7 +49,7 @@ public void init(Vertx vertx, Context context, Handler<AsyncResult<Boolean>> res
SpringContextUtil.init(vertx, context, ApplicationConfig.class);
SpringContextUtil.autowireDependencies(this, context);

deployKafkaConsumersVerticle(vertx).onComplete(ar -> {
deployKafkaConsumersVerticles(vertx).onComplete(ar -> {
if (ar.failed() && isConsumersVerticleMandatory) {
handler.fail(ar.cause());
} else {
Expand All @@ -66,23 +67,24 @@ public void init(Vertx vertx, Context context, Handler<AsyncResult<Boolean>> res
});
}

private Future<?> deployKafkaConsumersVerticle(Vertx vertx) {
Promise<String> inventoryItemConsumer = Promise.promise();
Promise<String> ediExportOrdersHistoryConsumer = Promise.promise();
private Future<?> deployKafkaConsumersVerticles(Vertx vertx) {
Promise<String> inventoryItemConsumerPromise = Promise.promise();
Promise<String> ediExportOrdersHistoryConsumerPromise = Promise.promise();
AbstractApplicationContext springContext = vertx.getOrCreateContext().get("springContext");

vertx.deployVerticle((InventoryItemConsumersVerticle.class.getName()),
vertx.deployVerticle(() -> springContext.getBean(InventoryItemConsumersVerticle.class),
new DeploymentOptions()
.setWorkerPoolName("inventory-item-consumers")
.setInstances(kafkaConsumersVerticleNumber), inventoryItemConsumer);
.setInstances(kafkaConsumersVerticleNumber), inventoryItemConsumerPromise);

vertx.deployVerticle((EdiExportOrdersHistoryConsumersVerticle.class.getName()),
vertx.deployVerticle(() -> springContext.getBean(EdiExportOrdersHistoryConsumersVerticle.class),
new DeploymentOptions()
.setWorkerPoolName("edi-export-orders-history-consumers")
.setInstances(kafkaConsumersVerticleNumber), ediExportOrdersHistoryConsumer);
.setInstances(kafkaConsumersVerticleNumber), ediExportOrdersHistoryConsumerPromise);

return GenericCompositeFuture.all(Arrays.asList(
inventoryItemConsumer.future(),
ediExportOrdersHistoryConsumer.future()
));
return GenericCompositeFuture.all(
Arrays.asList(inventoryItemConsumerPromise.future(), ediExportOrdersHistoryConsumerPromise.future()))
.onSuccess(ar -> log.info("All consumers was successfully started"))
.onFailure(e -> log.error("Failed to start consumers", e));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@
import org.folio.kafka.KafkaConsumerWrapper;
import org.folio.kafka.KafkaTopicNameHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.stereotype.Component;

@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class EdiExportOrdersHistoryConsumersVerticle extends AbstractConsumersVerticle<EdiExportHistoryEventType> {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@
import org.folio.kafka.KafkaTopicNameHelper;
import org.folio.kafka.SubscriptionDefinition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.stereotype.Component;

@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class InventoryItemConsumersVerticle extends AbstractConsumersVerticle<InventoryEventType> {

@Autowired
Expand Down

0 comments on commit 33f22a9

Please sign in to comment.