Skip to content

Commit

Permalink
using partitions from config
Browse files Browse the repository at this point in the history
  • Loading branch information
rajadilipkolli committed Sep 2, 2023
1 parent 7ae9f31 commit f76309d
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ KafkaAdmin.NewTopics topics() {
ORDERS_TOPIC,
PAYMENT_ORDERS_TOPIC,
STOCK_ORDERS_TOPIC);
// streams needs topics to be created before hand, so instead of delegating to kafkaAdmin to
// create, manually creating
return new KafkaAdmin.NewTopics(
TopicBuilder.name(ORDERS_TOPIC).partitions(3).replicas(1).build(),
TopicBuilder.name(PAYMENT_ORDERS_TOPIC).partitions(3).replicas(1).build(),
TopicBuilder.name(STOCK_ORDERS_TOPIC).partitions(3).replicas(1).build());
TopicBuilder.name(ORDERS_TOPIC).build(),
TopicBuilder.name(PAYMENT_ORDERS_TOPIC).build(),
TopicBuilder.name(STOCK_ORDERS_TOPIC).build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ default void addOrderItemToOrderEntity(OrderDto orderDTO, @MappingTarget Order o

@Mapping(target = "id", ignore = true)
@Mapping(target = "items", ignore = true)
@Mapping(target = "status", ignore = true)
@Mapping(target = "source", ignore = true)
Order orderRequestToEntity(OrderRequest orderRequest);

@Mapping(target = "id", ignore = true)
Expand All @@ -61,5 +63,8 @@ default void addOrderItemRequestToOrderEntity(
order.addOrderItem(orderItemRequestToOrderItem(orderItemRequest)));
}

void updateOrderFromOrderRequest(OrderRequest orderRequest, @MappingTarget Order orderObj);
@Mapping(target = "status", ignore = true)
@Mapping(target = "source", ignore = true)
@Mapping(target = "id", ignore = true)
void updateOrderFromOrderRequest(OrderRequest orderRequest, @MappingTarget Order order);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ Licensed under MIT License Copyright (c) 2021-2023 Raja Kolli.
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

import com.example.common.dtos.OrderDto;
import com.example.common.dtos.OrderItemDto;
import com.example.orderservice.common.AbstractIntegrationTest;
import java.math.BigDecimal;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
Expand All @@ -34,7 +36,8 @@ class OrderServiceApplicationIntegrationTest extends AbstractIntegrationTest {
@Order(1)
void shouldFetchAllOrdersFromStream() {
// waiting till is kafka stream is changed from PARTITIONS_ASSIGNED to RUNNING
await().atMost(10, SECONDS)
await().pollDelay(5, SECONDS)
.atMost(10, SECONDS)
.pollInterval(Duration.ofSeconds(1))
.untilAsserted(
() ->
Expand All @@ -46,28 +49,55 @@ void shouldFetchAllOrdersFromStream() {

@Test
@Order(2)
@Disabled
@Disabled("until infra for streams is set up")
void shouldFetchAllOrdersFromStreamWhenDataIsPresent() {

// Sending events to both payment-orders and stock-orders
// Sending event to OrderTopic for joining
OrderDto orderDto = new OrderDto();
orderDto.setOrderId(151L);
orderDto.setCustomerId(1001L);
orderDto.setStatus("ACCEPT");
orderDto.setSource("PAYMENT");
OrderItemDto orderItemDto = new OrderItemDto();
orderItemDto.setItemId(1L);
orderItemDto.setProductId("P1");
orderItemDto.setProductPrice(BigDecimal.TEN);
orderItemDto.setQuantity(1);
orderDto.setItems(List.of(orderItemDto));

this.kafkaTemplate.send("orders", orderDto.getOrderId(), orderDto);

// Sending events to both payment-orders, stock-orders for streaming to process and confirm
OrderDto paymentOrderDto = new OrderDto();
paymentOrderDto.setOrderId(151L);
paymentOrderDto.setCustomerId(1001L);
paymentOrderDto.setStatus("ACCEPT");
paymentOrderDto.setSource("PAYMENT");
paymentOrderDto.setItems(new ArrayList<>());
OrderItemDto paymentOrderItemDto = new OrderItemDto();
paymentOrderItemDto.setItemId(1L);
paymentOrderItemDto.setProductId("P1");
paymentOrderItemDto.setProductPrice(BigDecimal.TEN);
paymentOrderItemDto.setQuantity(1);
paymentOrderDto.setItems(List.of(paymentOrderItemDto));

this.kafkaTemplate.send("payment-orders", paymentOrderDto.getOrderId(), paymentOrderDto);

this.kafkaTemplate.send("payment-orders", paymentOrderDto);
OrderDto stockOrderDto = new OrderDto();
stockOrderDto.setOrderId(151L);
stockOrderDto.setCustomerId(1001L);
stockOrderDto.setStatus("ACCEPT");
stockOrderDto.setSource("STOCK");
stockOrderDto.setItems(new ArrayList<>());
OrderItemDto stockOrderItemDto = new OrderItemDto();
stockOrderItemDto.setItemId(1L);
stockOrderItemDto.setProductId("P1");
stockOrderItemDto.setProductPrice(BigDecimal.TEN);
stockOrderItemDto.setQuantity(1);
stockOrderDto.setItems(List.of(stockOrderItemDto));

this.kafkaTemplate.send("stock-orders", stockOrderDto);
this.kafkaTemplate.send("stock-orders", stockOrderDto.getOrderId(), stockOrderDto);

await().atMost(60, SECONDS)
await().atMost(30, SECONDS)
.pollDelay(10, SECONDS)
.pollInterval(Duration.ofSeconds(5))
.untilAsserted(
() ->
Expand Down

0 comments on commit f76309d

Please sign in to comment.