-
Notifications
You must be signed in to change notification settings - Fork 417
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#1149 - [Search][Recommendation] Introduce Kafka Stream to aggregate … #1233
base: main
Are you sure you want to change the base?
Conversation
Recommendation Coverage Report
|
9665f64
to
3145ba4
Compare
…all product data. -implement kafka stream application to capture all changes related to product
25d6d7f
to
f98e6d4
Compare
Quality Gate passed for 'yas'Issues Measures |
Hi @PhuocNguyenBa, could you help to add your kafka-ui image to docker-compose file. It supports us to interact with message better than the |
* @param <V> the type of values in the store | ||
* @return a Materialized configuration for the store | ||
*/ | ||
protected <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> createMaterializedStore( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems this one haven't enabled log compacted, could you help to confirm?
.join( | ||
nonNullProductCategoryTable, | ||
(productCategoryDetail, nonNullProductCategoryDetail) -> { | ||
if (productCategoryDetail.getId().equals(-1L)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using this approach to check for deleted cases doesn’t make sense. In the future, if there is master data with id: -1
, will it be identified as deleted?
We should define a flag in BaseMetaDataDto
to serve all cases.
recommendation/src/main/java/com/yas/recommendation/topology/ProductTopology.java
Show resolved
Hide resolved
KTable<Long, ProductCategoryDto> productCategoryTable = | ||
createProductCategoryTable(streamsBuilder); | ||
|
||
KTable<Long, ProductCategoryDto> nonNullProductCategoryTable = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you explain, why we need to create a mv to store value:null
records here?
|
||
@Override | ||
@Autowired | ||
protected void process(StreamsBuilder streamsBuilder) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems we haven't aggregated product image data into sink topic, could you help to check?
…all product data.
-implement kafka stream application to capture all changes related to product