Commit
add: kafka sender category
hoangtien2k3 committed May 26, 2024
1 parent b8d5ea8 commit 453be61
Showing 9 changed files with 10,531 additions and 83 deletions.
10,348 changes: 10,348 additions & 0 deletions backup.sql

Large diffs are not rendered by default.

128 changes: 58 additions & 70 deletions docker-compose.yml
@@ -1,6 +1,7 @@
version: '3.7'

services:
# mysql
mysql8-container:
container_name: mysql8-container
image: mysql:8.2.0
@@ -15,6 +16,8 @@ services:
networks:
- shopapp-network


# redis
redis-container:
container_name: redis-container
image: redis:7.2.3
@@ -26,33 +29,8 @@ services:
networks:
- shopapp-network

shopapp-spring-container:
container_name: shopapp-spring-container
build:
context: .
dockerfile: Dockerfile
ports:
- "8099:8088"
environment:
SPRING_DATASOURCE_URL: jdbc:mysql://mysql8-container:3307/shopapp?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true
MYSQL_ROOT_PASSWORD: 12042003
REDIS_HOST: redis-container
REDIS_PORT: 6379
# KAFKA_BROKER_SERVER: kafka-broker-01
# KAFKA_BROKER_PORT: 19092
depends_on:
- mysql8-container
networks:
- shopapp-network
healthcheck:
# test: [ "CMD-SHELL", "curl --fail http://localhost:8080/health_check/health || exit 1" ]
test: [ "CMD-SHELL", "curl --fail http://localhost:8080/api/v1/actuator/health || exit 1" ]
interval: 30s
timeout: 10s
retries: 3
start_period: 20s


# zookeeper-01
zookeeper-01:
image: confluentinc/cp-zookeeper:7.5.3
hostname: zookeeper-01
@@ -66,6 +44,8 @@ services:
networks:
- shopapp-network


# zookeeper-02
zookeeper-02:
image: confluentinc/cp-zookeeper:7.5.3
hostname: zookeeper-02
@@ -79,6 +59,8 @@ services:
networks:
- shopapp-network


# zookeeper-03
zookeeper-03:
image: confluentinc/cp-zookeeper:7.5.3
hostname: zookeeper-03
@@ -92,81 +74,87 @@
networks:
- shopapp-network


# kafka-broker-01
kafka-broker-01:
image: confluentinc/cp-kafka:7.5.3
hostname: kafka-broker-01
container_name: kafka-broker-01
networks:
- shopapp-network
depends_on:
- zookeeper-01
- zookeeper-02
- zookeeper-03
ports:
- "9092:9092"
- "29092:29092"
- "9999:9999"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "zookeeper-01:2181,zookeeper-02:2182,zookeeper-03:2183"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-broker-01:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://kafka-broker-01:29092
KAFKA_LISTENERS: INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092,DOCKER://0.0.0.0:29092
KAFKA_LISTENERS_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper-01:2181,zookeeper-02:2181,zookeeper-03:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka-broker-01:29092
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
# Add security configuration for the INTERNAL listener
KAFKA_SECURITY_INTER_BROKER_PROTOCOL: PLAINTEXT
networks:
- shopapp-network
depends_on:
- zookeeper-01
- zookeeper-02
- zookeeper-03


kafka-broker-02:
image: confluentinc/cp-kafka:7.5.3
hostname: kafka-broker-02
container_name: kafka-broker-02
# shopapp connect
shopapp-spring-container:
container_name: shopapp-spring-container
build:
context: .
dockerfile: Dockerfile
ports:
- "9093:9093"
- "29093:29093"
- "8099:8080"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-broker-02:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://kafka-broker-02:29093
KAFKA_LISTENERS: INTERNAL://0.0.0.0:19093,EXTERNAL://0.0.0.0:9093,DOCKER://0.0.0.0:29093
KAFKA_LISTENERS_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper-01:2181,zookeeper-02:2181,zookeeper-03:2181"
KAFKA_BROKER_ID: 2
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9998
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
# Add security configuration for the INTERNAL listener
KAFKA_SECURITY_INTER_BROKER_PROTOCOL: PLAINTEXT
SPRING_DATASOURCE_URL: jdbc:mysql://mysql8-container:3307/shopapp?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true
MYSQL_ROOT_PASSWORD: 12042003
REDIS_HOST: redis-container
REDIS_PORT: 6379
KAFKA_BROKER_SERVER: kafka-broker-01
KAFKA_BROKER_PORT: 19092
depends_on:
- mysql8-container
- redis-container
- kafka-broker-01
networks:
- shopapp-network
depends_on:
- zookeeper-01
- zookeeper-02
- zookeeper-03
healthcheck:
# test: [ "CMD-SHELL", "curl --fail http://localhost:8080/health_check/health || exit 1" ]
test: [ "CMD-SHELL", "curl --fail http://localhost:8080/api/v1/actuator/health || exit 1" ]
interval: 30s
timeout: 10s
retries: 3
start_period: 20s

# network
networks:
shopapp-network:
name: shopapp-network
driver: bridge


# docker-compose -f ./kafka-deployment.yml rm -s -f zookeeper-01

# docker-compose -f ./kafka-deployment.yml up -d zookeeper-01
# docker-compose -f ./kafka-deployment.yml up -d zookeeper-02
# docker-compose -f ./kafka-deployment.yml up -d zookeeper-03
# RUN ZOOKEEPER
# docker-compose up -d zookeeper-01
# docker-compose up -d zookeeper-02
# docker-compose up -d zookeeper-03

# docker-compose -f ./kafka-deployment.yml up -d kafka-broker-01
# docker-compose -f ./kafka-deployment.yml up -d kafka-broker-02
# RUN KAFKA-BROKER
# docker-compose up -d kafka-broker-01
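
The KAFKA_BROKER_SERVER and KAFKA_BROKER_PORT variables passed to shopapp-spring-container only take effect if the application builds its bootstrap address from them. A minimal sketch of how that wiring could look; the placeholder names and the producer-factory bean are assumptions, since the actual application configuration is not part of this diff:

package com.hoangtien2k3.shopappbackend.configurations;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfigSketch {

    // Resolves the environment variables declared in docker-compose.yml;
    // the defaults only matter for local runs outside Docker.
    @Bean
    public ProducerFactory<String, Object> producerFactory(
            @Value("${KAFKA_BROKER_SERVER:localhost}") String brokerHost,
            @Value("${KAFKA_BROKER_PORT:9092}") String brokerPort) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHost + ":" + brokerPort);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }
}
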
4 changes: 4 additions & 0 deletions pom.xml
@@ -140,6 +140,10 @@
<artifactId>flyway-mysql</artifactId>
<version>10.13.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
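
No <version> is declared for spring-kafka above; assuming the project inherits Spring Boot's dependency management (the parent section is not visible in this excerpt), the matching version is resolved from the Boot BOM.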

<build>
src/main/java/com/hoangtien2k3/shopappbackend/components/MyKafkaListener.java
@@ -0,0 +1,29 @@
package com.hoangtien2k3.shopappbackend.components;

import com.hoangtien2k3.shopappbackend.models.Category;
import com.hoangtien2k3.shopappbackend.utils.Const;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
@KafkaListener(id = Const.KAFKA_ID, topics = {Const.KAFKA_TOPIC_INSERT_CATEGORY, Const.KAFKA_TOPIC_GET_ALL_CATEGORY})
public class MyKafkaListener {

@KafkaListener(topics = Const.KAFKA_TOPIC_INSERT_CATEGORY)
public void listenerCategory(Category category) {
System.out.println("Received Category: " + category);
}

@KafkaListener(topics = Const.KAFKA_TOPIC_GET_ALL_CATEGORY)
public void listenListOfCategory(List<Category> categories) {
System.out.println("Received list of Category: " + categories);
}

@KafkaHandler(isDefault = true)
public void unknown(Category category) {
System.out.println("Received unknown: " + category);
}
}
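
For the listener above to receive Category objects rather than raw JSON strings, the consumer side needs a deserializer that trusts the project package. A minimal sketch of such a consumer factory, assuming standard Spring Kafka consumer properties; the real consumer configuration is not part of this commit:

package com.hoangtien2k3.shopappbackend.configurations;

import java.util.HashMap;
import java.util.Map;

import com.hoangtien2k3.shopappbackend.utils.Const;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaConsumerConfigSketch {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Matches the INTERNAL listener exposed by kafka-broker-01 in docker-compose.yml.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:19092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, Const.KAFKA_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // JsonDeserializer refuses to instantiate classes outside trusted packages.
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.hoangtien2k3.shopappbackend");
        return new DefaultKafkaConsumerFactory<>(props);
    }
}
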
src/main/java/com/hoangtien2k3/shopappbackend/components/converters/CategoryMessageConverter.java
@@ -0,0 +1,19 @@
package com.hoangtien2k3.shopappbackend.components.converters;

import com.hoangtien2k3.shopappbackend.models.Category;
import org.springframework.kafka.support.converter.JsonMessageConverter;
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper;

import java.util.Collections;

public class CategoryMessageConverter extends JsonMessageConverter {
public CategoryMessageConverter() {
super();
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
typeMapper.addTrustedPackages("com.hoangtien2k3.shopappbackend");
typeMapper.setIdClassMapping(Collections.singletonMap("category", Category.class));
this.setTypeMapper(typeMapper);
}
}
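
CategoryMessageConverter maps the type id "category" to the Category class, and TypePrecedence.TYPE_ID means the __TypeId__ header wins over any inferred type. In this commit the converter is only attached to the producer-side KafkaTemplate in CategoryController; a sketch of the listener-side counterpart, assuming a container factory bean that is not shown here:

package com.hoangtien2k3.shopappbackend.configurations;

import com.hoangtien2k3.shopappbackend.components.converters.CategoryMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaListenerFactorySketch {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // JsonMessageConverter subclasses are RecordMessageConverters, so the custom
        // converter can be attached directly to the listener container factory.
        factory.setRecordMessageConverter(new CategoryMessageConverter());
        return factory;
    }
}
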
src/main/java/com/hoangtien2k3/shopappbackend/configurations/KafkaCofiguration.java
@@ -0,0 +1,34 @@
package com.hoangtien2k3.shopappbackend.configurations;

import com.hoangtien2k3.shopappbackend.utils.Const;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
@EnableKafka
public class KafkaCofiguration {

@Bean
public CommonErrorHandler errorHandler(KafkaOperations<Object, Object> kafkaOperations) {
return new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaOperations), new FixedBackOff(1000L, 2)
);
}

@Bean
public NewTopic insertACategoryTopic() {
return new NewTopic(Const.KAFKA_TOPIC_INSERT_CATEGORY, 1, (short) 1);
}

@Bean
public NewTopic getAllCategoryTopic() {
return new NewTopic(Const.KAFKA_TOPIC_GET_ALL_CATEGORY, 1, (short) 1);
}
}
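
With this error handler, a failing record is retried twice at one-second intervals (FixedBackOff(1000L, 2)) and then handed to DeadLetterPublishingRecoverer, which by default publishes it to a topic named after the original topic with a ".DLT" suffix. If those dead-letter topics should exist with explicit settings, they can be declared like the business topics above — a sketch, assuming the default suffix is kept:

package com.hoangtien2k3.shopappbackend.configurations;

import com.hoangtien2k3.shopappbackend.utils.Const;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaDeadLetterTopicsSketch {

    @Bean
    public NewTopic insertCategoryDeadLetterTopic() {
        // ".DLT" is the default suffix used by DeadLetterPublishingRecoverer.
        return new NewTopic(Const.KAFKA_TOPIC_INSERT_CATEGORY + ".DLT", 1, (short) 1);
    }

    @Bean
    public NewTopic getAllCategoriesDeadLetterTopic() {
        return new NewTopic(Const.KAFKA_TOPIC_GET_ALL_CATEGORY + ".DLT", 1, (short) 1);
    }
}
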
src/main/java/com/hoangtien2k3/shopappbackend/controllers/CategoryController.java
@@ -1,29 +1,31 @@
package com.hoangtien2k3.shopappbackend.controllers;

import com.hoangtien2k3.shopappbackend.components.TranslateMessages;
import com.hoangtien2k3.shopappbackend.components.converters.CategoryMessageConverter;
import com.hoangtien2k3.shopappbackend.dtos.CategoryDTO;
import com.hoangtien2k3.shopappbackend.models.Category;
import com.hoangtien2k3.shopappbackend.responses.ApiResponse;
import com.hoangtien2k3.shopappbackend.services.CategoryService;
import com.hoangtien2k3.shopappbackend.utils.Const;
import com.hoangtien2k3.shopappbackend.utils.MessageKeys;
import jakarta.validation.Valid;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.validation.BindingResult;
import org.springframework.validation.FieldError;
import org.springframework.web.bind.annotation.*;

import java.util.List;

//@Validated
// dependency injection
@RestController
@RequestMapping("${api.prefix}/categories")
@RequiredArgsConstructor
public class CategoryController extends TranslateMessages {

private final CategoryService categoryService;
private final KafkaTemplate<String, Object> kafkaTemplate;

@PreAuthorize("hasRole('ROLE_ADMIN')")
@PostMapping("")
@@ -43,35 +45,43 @@ public ResponseEntity<?> createCategory(@RequestBody @Valid CategoryDTO category
.toList()).build()
);
}
Category newCategory = categoryService.createCategory(categoryDTO);

// send kafka category
this.kafkaTemplate.send(Const.KAFKA_TOPIC_INSERT_CATEGORY, newCategory); // producer
this.kafkaTemplate.setMessageConverter(new CategoryMessageConverter());

categoryService.createCategory(categoryDTO);
return ResponseEntity.ok(ApiResponse.builder().success(true)
.message(translate(MessageKeys.CREATE_CATEGORIES_SUCCESS)).build());
.message(translate(MessageKeys.CREATE_CATEGORIES_SUCCESS))
.payload(newCategory)
.build());
} catch (Exception e) {
return ResponseEntity.ok(ApiResponse.builder()
.error(e.getMessage())
.message(translate(MessageKeys.CREATE_CATEGORIES_FAILED)).build());
}
}

@PreAuthorize("hasRole('ROLE_ADMIN') OR hasRole('ROLE_USER')")
// anyone can fetch the list of product categories
@GetMapping("")
public ResponseEntity<?> getAllCategories() {
public ResponseEntity<ApiResponse<?>> getAllCategories() {
List<Category> categories = categoryService.getAllCategories();
/*kafka get all category*/
this.kafkaTemplate.send(Const.KAFKA_TOPIC_GET_ALL_CATEGORY, categories);
return ResponseEntity.ok(ApiResponse.builder()
.success(true)
.payload(categories)
.success(true)
.payload(categories)
.build());
}

@PreAuthorize("hasRole('ROLE_ADMIN')")
@PutMapping("/{id}")
public ResponseEntity<?> updateCategories(@PathVariable("id") Long id,
@RequestBody CategoryDTO categoryDTO) {
@RequestBody CategoryDTO categoryDTO) {
Category category = categoryService.updateCategory(id, categoryDTO);
return ResponseEntity.ok(ApiResponse.builder().success(true)
.message(translate(MessageKeys.UPDATE_CATEGORIES_SUCCESS))
.payload(category)
.message(translate(MessageKeys.UPDATE_CATEGORIES_SUCCESS))
.payload(category)
.build());
}

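In createCategory above, kafkaTemplate.setMessageConverter(new CategoryMessageConverter()) is called on every request, and only after the send. Since the converter only needs to be registered once (it is used by the template's Message-based send overloads), an alternative — a sketch, not what this commit does — is to configure it on the KafkaTemplate bean:

package com.hoangtien2k3.shopappbackend.configurations;

import com.hoangtien2k3.shopappbackend.components.converters.CategoryMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaTemplateSketch {

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
        KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory);
        // Registered once at startup instead of inside the request handler.
        template.setMessageConverter(new CategoryMessageConverter());
        return template;
    }
}
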
src/main/java/com/hoangtien2k3/shopappbackend/utils/Const.java
@@ -7,4 +7,9 @@ public class Const {
public static final String MINIMUN_AMOUNT = "minimum_amount";
public static final String APPLICATION_DATE = "applicable_date";
public static final String BETWEEN = "BETWEEN";

// kafka
public static final String KAFKA_ID = "groupA";
public static final String KAFKA_TOPIC_INSERT_CATEGORY = "insert-a-category";
public static final String KAFKA_TOPIC_GET_ALL_CATEGORY = "get-all-categories";
}
