Skip to content

Commit

Permalink
Sunbird Obsrv opensource release 2.0.0-GA (#13)
Browse files Browse the repository at this point in the history
* feat: add connector config and connector stats update functions

* Issue #33 feat: add documentation for Dataset, Datasources, Data In and Query APIs

* feat: added descriptions for default configurations

* feat: added descriptions for default configurations

* feat: modified kafka connector input topic

* feat: obsrv setup instructions

* feat: revisiting open source features

* feat: masterdata processor job config

* Build deploy v2 (#19)

* #0 - Refactor Dockerfile and Github actions workflow
---------

Co-authored-by: Santhosh Vasabhaktula <[email protected]>
Co-authored-by: ManojCKrishna <[email protected]>

* testing new images

* testing new images

* testing new images

* testing new images

* testing new images

* build new image with bug fixes

* update dockerfile

* update dockerfile

* #0 fix: upgrade packages

* #0 feat: add flink dockerfiles

* feat: update all failed, invalid and duplicate topic names

* feat: update kafka topic names in test cases

* #0 fix: add individual extraction

* feat: update failed event

* Update ErrorConstants.scala

* feat: update failed event

* Issue #0 fix: upgrade ubuntu packages for vulnerabilities

* feat: add exception handling for json deserialization

* Update BaseProcessFunction.scala

* Update BaseProcessFunction.scala

* feat: update batch failed event generation

* Update ExtractionFunction.scala

* feat: update invalid json exception handling

* Issue #46 feat: update batch failed event

* Issue #46 feat: update batch failed event

* Issue #46 feat: update batch failed event

* Issue #46 feat: update batch failed event

* Issue #46 fix: remove cloning object

* Issue #46 feat: update batch failed event

* #0 fix: update github actions release condition

* Update DatasetModels.scala

* Issue #46 feat: add error reasons

* Release 1.3.0 into Main branch (#34)

* testing new images

* testing new images

* testing new images

* testing new images

* testing new images

* build new image with bug fixes

* update dockerfile

* update dockerfile

* #0 fix: upgrade packages

* #0 feat: add flink dockerfiles

* #0 fix: add individual extraction

* Issue #0 fix: upgrade ubuntu packages for vulnerabilities

* #0 fix: update github actions release condition

---------

Co-authored-by: ManojKrishnaChintaluri <[email protected]>
Co-authored-by: Praveen <[email protected]>
Co-authored-by: Sowmya N Dixit <[email protected]>

* Update DatasetModels.scala

* Issue #2 feat: Remove kafka connector code

* Issue #46 feat: add exception stack trace

* Issue #46 feat: add exception stack trace

* feat: add function to get all datasets

* Release 1.3.1 Changes (#42)

* Dataset enhancements (#38)

* feat: add connector config and connector stats update functions
* Issue #33 feat: add documentation for Dataset, Datasources, Data In and Query APIs
* Update DatasetModels.scala
* #0 fix: upgrade packages
* #0 feat: add flink dockerfiles
* #0 fix: add individual extraction

---------

Co-authored-by: ManojKrishnaChintaluri <[email protected]>
Co-authored-by: Praveen <[email protected]>
Co-authored-by: Sowmya N Dixit <[email protected]>

* #0000 [SV] - Fallback to local redis instance if embedded redis is not starting

* Update DatasetModels.scala

* #0000 - refactor the denormalization logic
1. Do not fail the denormalization if the denorm key is missing
2. Add clear message whether the denorm is sucessful or failed or partially successful
3. Handle denorm for both text and number fields

* #0000 - refactor:
1. Created a enum for dataset status and ignore events if the dataset is not in Live status
2. Created a outputtag for denorm failed stats
3. Parse event validation failed messages into a case class

* #0000 - refactor:
1. Updated the DruidRouter job to publish data to router topics dynamically
2. Updated framework to created dynamicKafkaSink object

* #0000 - mega refactoring:
1. Made calls to getAllDatasets and getAllDatasetSources to always query postgres
2. Created BaseDatasetProcessFunction for all flink functions to extend that would dynamically resolve dataset config, initialize metrics and handle common failures
3. Refactored serde - merged map and string serialization into one function and parameterized the function
4. Moved failed events sinking into a common base class
5. Master dataset processor can now do denormalization with another master dataset as well

* #0000 - mega refactoring:
1. Made calls to getAllDatasets and getAllDatasetSources to always query postgres
2. Created BaseDatasetProcessFunction for all flink functions to extend that would dynamically resolve dataset config, initialize metrics and handle common failures
3. Refactored serde - merged map and string serialization into one function and parameterized the function
4. Moved failed events sinking into a common base class
5. Master dataset processor can now do denormalization with another master dataset as well

* #0000 - mega refactoring:
1. Added validation to check if the event has a timestamp key and it is not blank nor invalid
2. Added timezone handling to store the data in druid in the TZ specified by the dataset


* #0000 - minor refactoring: Updated DatasetRegistry.getDatasetSourceConfig to getAllDatasetSourceConfig

* #0000 - mega refactoring: Refactored logs, error messages and metrics

* #0000 - mega refactoring: Fix unit tests

* #0000 - refactoring:
1. Introduced transformation mode to enable lenient transformations
2. Proper exception handling for transformer job

* #0000 - refactoring: Fix test cases and code

* #0000 - refactoring: upgrade embedded redis to work with macos sonoma m2

* #0000 - refactoring: Denormalizer test cases and bug fixes. Code coverage is 100% now

* #0000 - refactoring: Router test cases and bug fixes. Code coverage is 100% now

* #0000 - refactoring: Validator test cases and bug fixes. Code coverage is 100% now

* #0000 - refactoring: Framework test cases and bug fixes

* #0000 - refactoring: kafka connector test cases and bug fixes. Code coverage is 100% now

* #0000 - refactoring: improve code coverage and fix bugs

* #0000 - refactoring: improve code coverage and fix bugs --- Now the code coverage is 100%

* #0000 - refactoring: organize imports

* #0000 - refactoring:
1. transformer test cases and bug fixes - code coverage is 100%

* #0000 - refactoring: test cases and bug fixes

---------

Co-authored-by: shiva-rakshith <[email protected]>
Co-authored-by: Aniket Sakinala <[email protected]>
Co-authored-by: Manjunath Davanam <[email protected]>
Co-authored-by: ManojKrishnaChintaluri <[email protected]>
Co-authored-by: Praveen <[email protected]>
Co-authored-by: Sowmya N Dixit <[email protected]>
Co-authored-by: Anand Parthasarathy <[email protected]>

* #000:feat: Removed the provided scope of the kafka-client in the framework (#40)

* #0000 - feat: Add dataset-type to system events (#41)

* #0000 - feat: Add dataset-type to system events

* #0000 - feat: Modify tests for dataset-type in system events

* #0000 - feat: Remove unused getDatasetType function

* #0000 - feat: Remove unused pom test dependencies

* #0000 - feat: Remove unused pom test dependencies

---------

Co-authored-by: Santhosh <[email protected]>
Co-authored-by: shiva-rakshith <[email protected]>
Co-authored-by: Aniket Sakinala <[email protected]>
Co-authored-by: ManojKrishnaChintaluri <[email protected]>
Co-authored-by: Praveen <[email protected]>
Co-authored-by: Sowmya N Dixit <[email protected]>
Co-authored-by: Anand Parthasarathy <[email protected]>

* Main conflicts fixes (#44)

* feat: add connector config and connector stats update functions

* Issue #33 feat: add documentation for Dataset, Datasources, Data In and Query APIs

* Update DatasetModels.scala

* Release 1.3.0 into Main branch (#34)

* testing new images

* testing new images

* testing new images

* testing new images

* testing new images

* build new image with bug fixes

* update dockerfile

* update dockerfile

* #0 fix: upgrade packages

* #0 feat: add flink dockerfiles

* #0 fix: add individual extraction

* Issue #0 fix: upgrade ubuntu packages for vulnerabilities

* #0 fix: update github actions release condition

---------

Co-authored-by: ManojKrishnaChintaluri <[email protected]>
Co-authored-by: Praveen <[email protected]>
Co-authored-by: Sowmya N Dixit <[email protected]>

* Update DatasetModels.scala

* Issue #2 feat: Remove kafka connector code

* feat: add function to get all datasets

* #000:feat: Resolve conflicts

---------

Co-authored-by: shiva-rakshith <[email protected]>
Co-authored-by: Aniket Sakinala <[email protected]>
Co-authored-by: ManojKrishnaChintaluri <[email protected]>
Co-authored-by: Praveen <[email protected]>
Co-authored-by: Sowmya N Dixit <[email protected]>
Co-authored-by: Santhosh <[email protected]>
Co-authored-by: Anand Parthasarathy <[email protected]>
Co-authored-by: Ravi Mula <[email protected]>

* Release 1.3.1 into Main (#43)

* testing new images

* testing new images

* testing new images

* testing new images

* testing new images

* build new image with bug fixes

* update dockerfile

* update dockerfile

* #0 fix: upgrade packages

* #0 feat: add flink dockerfiles

* feat: update all failed, invalid and duplicate topic names

* feat: update kafka topic names in test cases

* #0 fix: add individual extraction

* feat: update failed event

* Update ErrorConstants.scala

* feat: update failed event

* Issue #0 fix: upgrade ubuntu packages for vulnerabilities

* feat: add exception handling for json deserialization

* Update BaseProcessFunction.scala

* Update BaseProcessFunction.scala

* feat: update batch failed event generation

* Update ExtractionFunction.scala

* feat: update invalid json exception handling

* Issue #46 feat: update batch failed event

* Issue #46 feat: update batch failed event

* Issue #46 feat: update batch failed event

* Issue #46 feat: update batch failed event

* Issue #46 fix: remove cloning object

* Issue #46 feat: update batch failed event

* #0 fix: update github actions release condition

* Issue #46 feat: add error reasons

* Issue #46 feat: add exception stack trace

* Issue #46 feat: add exception stack trace

* Release 1.3.1 Changes (#42)

* Dataset enhancements (#38)

* feat: add connector config and connector stats update functions
* Issue #33 feat: add documentation for Dataset, Datasources, Data In and Query APIs
* Update DatasetModels.scala
* #0 fix: upgrade packages
* #0 feat: add flink dockerfiles
* #0 fix: add individual extraction

---------

Co-authored-by: ManojKrishnaChintaluri <[email protected]>
Co-authored-by: Praveen <[email protected]>
Co-authored-by: Sowmya N Dixit <[email protected]>

* #0000 [SV] - Fallback to local redis instance if embedded redis is not starting

* Update DatasetModels.scala

* #0000 - refactor the denormalization logic
1. Do not fail the denormalization if the denorm key is missing
2. Add clear message whether the denorm is sucessful or failed or partially successful
3. Handle denorm for both text and number fields

* #0000 - refactor:
1. Created a enum for dataset status and ignore events if the dataset is not in Live status
2. Created a outputtag for denorm failed stats
3. Parse event validation failed messages into a case class

* #0000 - refactor:
1. Updated the DruidRouter job to publish data to router topics dynamically
2. Updated framework to created dynamicKafkaSink object

* #0000 - mega refactoring:
1. Made calls to getAllDatasets and getAllDatasetSources to always query postgres
2. Created BaseDatasetProcessFunction for all flink functions to extend that would dynamically resolve dataset config, initialize metrics and handle common failures
3. Refactored serde - merged map and string serialization into one function and parameterized the function
4. Moved failed events sinking into a common base class
5. Master dataset processor can now do denormalization with another master dataset as well

* #0000 - mega refactoring:
1. Made calls to getAllDatasets and getAllDatasetSources to always query postgres
2. Created BaseDatasetProcessFunction for all flink functions to extend that would dynamically resolve dataset config, initialize metrics and handle common failures
3. Refactored serde - merged map and string serialization into one function and parameterized the function
4. Moved failed events sinking into a common base class
5. Master dataset processor can now do denormalization with another master dataset as well

* #0000 - mega refactoring:
1. Added validation to check if the event has a timestamp key and it is not blank nor invalid
2. Added timezone handling to store the data in druid in the TZ specified by the dataset


* #0000 - minor refactoring: Updated DatasetRegistry.getDatasetSourceConfig to getAllDatasetSourceConfig

* #0000 - mega refactoring: Refactored logs, error messages and metrics

* #0000 - mega refactoring: Fix unit tests

* #0000 - refactoring:
1. Introduced transformation mode to enable lenient transformations
2. Proper exception handling for transformer job

* #0000 - refactoring: Fix test cases and code

* #0000 - refactoring: upgrade embedded redis to work with macos sonoma m2

* #0000 - refactoring: Denormalizer test cases and bug fixes. Code coverage is 100% now

* #0000 - refactoring: Router test cases and bug fixes. Code coverage is 100% now

* #0000 - refactoring: Validator test cases and bug fixes. Code coverage is 100% now

* #0000 - refactoring: Framework test cases and bug fixes

* #0000 - refactoring: kafka connector test cases and bug fixes. Code coverage is 100% now

* #0000 - refactoring: improve code coverage and fix bugs

* #0000 - refactoring: improve code coverage and fix bugs --- Now the code coverage is 100%

* #0000 - refactoring: organize imports

* #0000 - refactoring:
1. transformer test cases and bug fixes - code coverage is 100%

* #0000 - refactoring: test cases and bug fixes

---------

Co-authored-by: shiva-rakshith <[email protected]>
Co-authored-by: Aniket Sakinala <[email protected]>
Co-authored-by: Manjunath Davanam <[email protected]>
Co-authored-by: ManojKrishnaChintaluri <[email protected]>
Co-authored-by: Praveen <[email protected]>
Co-authored-by: Sowmya N Dixit <[email protected]>
Co-authored-by: Anand Parthasarathy <[email protected]>

* #000:feat: Removed the provided scope of the kafka-client in the framework (#40)

* #0000 - feat: Add dataset-type to system events (#41)

* #0000 - feat: Add dataset-type to system events

* #0000 - feat: Modify tests for dataset-type in system events

* #0000 - feat: Remove unused getDatasetType function

* #0000 - feat: Remove unused pom test dependencies

* #0000 - feat: Remove unused pom test dependencies

---------

Co-authored-by: Santhosh <[email protected]>
Co-authored-by: shiva-rakshith <[email protected]>
Co-authored-by: Aniket Sakinala <[email protected]>
Co-authored-by: ManojKrishnaChintaluri <[email protected]>
Co-authored-by: Praveen <[email protected]>
Co-authored-by: Sowmya N Dixit <[email protected]>
Co-authored-by: Anand Parthasarathy <[email protected]>

* Main conflicts fixes (#44)

* feat: add connector config and connector stats update functions

* Issue #33 feat: add documentation for Dataset, Datasources, Data In and Query APIs

* Update DatasetModels.scala

* Release 1.3.0 into Main branch (#34)

* testing new images

* testing new images

* testing new images

* testing new images

* testing new images

* build new image with bug fixes

* update dockerfile

* update dockerfile

* #0 fix: upgrade packages

* #0 feat: add flink dockerfiles

* #0 fix: add individual extraction

* Issue #0 fix: upgrade ubuntu packages for vulnerabilities

* #0 fix: update github actions release condition

---------

Co-authored-by: ManojKrishnaChintaluri <[email protected]>
Co-authored-by: Praveen <[email protected]>
Co-authored-by: Sowmya N Dixit <[email protected]>

* Update DatasetModels.scala

* Issue #2 feat: Remove kafka connector code

* feat: add function to get all datasets

* #000:feat: Resolve conflicts

---------

Co-authored-by: shiva-rakshith <[email protected]>
Co-authored-by: Aniket Sakinala <[email protected]>
Co-authored-by: ManojKrishnaChintaluri <[email protected]>
Co-authored-by: Praveen <[email protected]>
Co-authored-by: Sowmya N Dixit <[email protected]>
Co-authored-by: Santhosh <[email protected]>
Co-authored-by: Anand Parthasarathy <[email protected]>
Co-authored-by: Ravi Mula <[email protected]>

---------

Co-authored-by: ManojKrishnaChintaluri <[email protected]>
Co-authored-by: shiva-rakshith <[email protected]>
Co-authored-by: Manjunath Davanam <[email protected]>
Co-authored-by: Sowmya N Dixit <[email protected]>
Co-authored-by: Santhosh <[email protected]>
Co-authored-by: Aniket Sakinala <[email protected]>
Co-authored-by: Anand Parthasarathy <[email protected]>
Co-authored-by: Ravi Mula <[email protected]>

* update workflow file to skip tests (#45)

* Release 1.3.1 into Main (#49)

* testing new images

* testing new images

* testing new images

* testing new images

* testing new images

* build new image with bug fixes

* update dockerfile

* update dockerfile

* #0 fix: upgrade packages

* #0 feat: add flink dockerfiles

* feat: update all failed, invalid and duplicate topic names

* feat: update kafka topic names in test cases

* #0 fix: add individual extraction

* feat: update failed event

* Update ErrorConstants.scala

* feat: update failed event

* Issue #0 fix: upgrade ubuntu packages for vulnerabilities

* feat: add exception handling for json deserialization

* Update BaseProcessFunction.scala

* Update BaseProcessFunction.scala

* feat: update batch failed event generation

* Update ExtractionFunction.scala

* feat: update invalid json exception handling

* Issue #46 feat: update batch failed event

* Issue #46 feat: update batch failed event

* Issue #46 feat: update batch failed event

* Issue #46 feat: update batch failed event

* Issue #46 fix: remove cloning object

* Issue #46 feat: update batch failed event

* #0 fix: update github actions release condition

* Issue #46 feat: add error reasons

* Issue #46 feat: add exception stack trace

* Issue #46 feat: add exception stack trace

* Release 1.3.1 Changes (#42)

* Dataset enhancements (#38)

* feat: add connector config and connector stats update functions
* Issue #33 feat: add documentation for Dataset, Datasources, Data In and Query APIs
* Update DatasetModels.scala
* #0 fix: upgrade packages
* #0 feat: add flink dockerfiles
* #0 fix: add individual extraction

---------

Co-authored-by: ManojKrishnaChintaluri <[email protected]>
Co-authored-by: Praveen <[email protected]>
Co-authored-by: Sowmya N Dixit <[email protected]>

* #0000 [SV] - Fallback to local redis instance if embedded redis is not starting

* Update DatasetModels.scala

* #0000 - refactor the denormalization logic
1. Do not fail the denormalization if the denorm key is missing
2. Add clear message whether the denorm is sucessful or failed or partially successful
3. Handle denorm for both text and number fields

* #0000 - refactor:
1. Created a enum for dataset status and ignore events if the dataset is not in Live status
2. Created a outputtag for denorm failed stats
3. Parse event validation failed messages into a case class

* #0000 - refactor:
1. Updated the DruidRouter job to publish data to router topics dynamically
2. Updated framework to created dynamicKafkaSink object

* #0000 - mega refactoring:
1. Made calls to getAllDatasets and getAllDatasetSources to always query postgres
2. Created BaseDatasetProcessFunction for all flink functions to extend that would dynamically resolve dataset config, initialize metrics and handle common failures
3. Refactored serde - merged map and string serialization into one function and parameterized the function
4. Moved failed events sinking into a common base class
5. Master dataset processor can now do denormalization with another master dataset as well

* #0000 - mega refactoring:
1. Made calls to getAllDatasets and getAllDatasetSources to always query postgres
2. Created BaseDatasetProcessFunction for all flink functions to extend that would dynamically resolve dataset config, initialize metrics and handle common failures
3. Refactored serde - merged map and string serialization into one function and parameterized the function
4. Moved failed events sinking into a common base class
5. Master dataset processor can now do denormalization with another master dataset as well

* #0000 - mega refactoring:
1. Added validation to check if the event has a timestamp key and it is not blank nor invalid
2. Added timezone handling to store the data in druid in the TZ specified by the dataset


* #0000 - minor refactoring: Updated DatasetRegistry.getDatasetSourceConfig to getAllDatasetSourceConfig

* #0000 - mega refactoring: Refactored logs, error messages and metrics

* #0000 - mega refactoring: Fix unit tests

* #0000 - refactoring:
1. Introduced transformation mode to enable lenient transformations
2. Proper exception handling for transformer job

* #0000 - refactoring: Fix test cases and code

* #0000 - refactoring: upgrade embedded redis to work with macos sonoma m2

* #0000 - refactoring: Denormalizer test cases and bug fixes. Code coverage is 100% now

* #0000 - refactoring: Router test cases and bug fixes. Code coverage is 100% now

* #0000 - refactoring: Validator test cases and bug fixes. Code coverage is 100% now

* #0000 - refactoring: Framework test cases and bug fixes

* #0000 - refactoring: kafka connector test cases and bug fixes. Code coverage is 100% now

* #0000 - refactoring: improve code coverage and fix bugs

* #0000 - refactoring: improve code coverage and fix bugs --- Now the code coverage is 100%

* #0000 - refactoring: organize imports

* #0000 - refactoring:
1. transformer test cases and bug fixes - code coverage is 100%

* #0000 - refactoring: test cases and bug fixes

---------

Co-authored-by: shiva-rakshith <[email protected]>
Co-authored-by: Aniket Sakinala <[email protected]>
Co-authored-by: Manjunath Davanam <[email protected]>
Co-authored-by: ManojKrishnaChintaluri <[email protected]>
Co-authored-by: Praveen <[email protected]>
Co-authored-by: Sowmya N Dixit <[email protected]>
Co-authored-by: Anand Parthasarathy <[email protected]>

* #000:feat: Removed the provided scope of the kafka-client in the framework (#40)

* #0000 - feat: Add dataset-type to system events (#41)

* #0000 - feat: Add dataset-type to system events

* #0000 - feat: Modify tests for dataset-type in system events

* #0000 - feat: Remove unused getDatasetType function

* #0000 - feat: Remove unused pom test dependencies

* #0000 - feat: Remove unused pom test dependencies

---------

Co-authored-by: Santhosh <[email protected]>
Co-authored-by: shiva-rakshith <[email protected]>
Co-authored-by: Aniket Sakinala <[email protected]>
Co-authored-by: ManojKrishnaChintaluri <[email protected]>
Co-authored-by: Praveen <[email protected]>
Co-authored-by: Sowmya N Dixit <[email protected]>
Co-authored-by: Anand Parthasarathy <[email protected]>

* Main conflicts fixes (#44)

* feat: add connector config and connector stats update functions

* Issue #33 feat: add documentation for Dataset, Datasources, Data In and Query APIs

* Update DatasetModels.scala

* Release 1.3.0 into Main branch (#34)

* testing new images

* testing new images

* testing new images

* testing new images

* testing new images

* build new image with bug fixes

* update dockerfile

* update dockerfile

* #0 fix: upgrade packages

* #0 feat: add flink dockerfiles

* #0 fix: add individual extraction

* Issue #0 fix: upgrade ubuntu packages for vulnerabilities

* #0 fix: update github actions release condition

---------

Co-authored-by: ManojKrishnaChintaluri <[email protected]>
Co-authored-by: Praveen <[email protected]>
Co-authored-by: Sowmya N Dixit <[email protected]>

* Update DatasetModels.scala

* Issue #2 feat: Remove kafka connector code

* feat: add function to get all datasets

* #000:feat: Resolve conflicts

---------

Co-authored-by: shiva-rakshith <[email protected]>
Co-authored-by: Aniket Sakinala <[email protected]>
Co-authored-by: ManojKrishnaChintaluri <[email protected]>
Co-authored-by: Praveen <[email protected]>
Co-authored-by: Sowmya N Dixit <[email protected]>
Co-authored-by: Santhosh <[email protected]>
Co-authored-by: Anand Parthasarathy <[email protected]>
Co-authored-by: Ravi Mula <[email protected]>

* #0000 - fix: Fix null dataset_type in DruidRouterFunction (#48)

---------

Co-authored-by: ManojKrishnaChintaluri <[email protected]>
Co-authored-by: Praveen <[email protected]>
Co-authored-by: shiva-rakshith <[email protected]>
Co-authored-by: Sowmya N Dixit <[email protected]>
Co-authored-by: Santhosh <[email protected]>
Co-authored-by: Aniket Sakinala <[email protected]>
Co-authored-by: Anand Parthasarathy <[email protected]>
Co-authored-by: Ravi Mula <[email protected]>

* Develop to Release-1.0.0-GA (#52) (#53)

* testing new images

* testing new images

* testing new images

* testing new images

* testing new images

* build new image with bug fixes

* update dockerfile

* update dockerfile

* #0 fix: upgrade packages

* #0 feat: add flink dockerfiles

* feat: update all failed, invalid and duplicate topic names

* feat: update kafka topic names in test cases

* #0 fix: add individual extraction

* feat: update failed event

* Update ErrorConstants.scala

* feat: update failed event

* Issue #0 fix: upgrade ubuntu packages for vulnerabilities

* feat: add exception handling for json deserialization

* Update BaseProcessFunction.scala

* Update BaseProcessFunction.scala

* feat: update batch failed event generation

* Update ExtractionFunction.scala

* feat: update invalid json exception handling

* Issue #46 feat: update batch failed event

* Issue #46 feat: update batch failed event

* Issue #46 feat: update batch failed event

* Issue #46 feat: update batch failed event

* Issue #46 fix: remove cloning object

* Issue #46 feat: update batch failed event

* #0 fix: update github actions release condition

* Issue #46 feat: add error reasons

* Issue #46 feat: add exception stack trace

* Issue #46 feat: add exception stack trace

* Dataset enhancements (#38)

* feat: add connector config and connector stats update functions
* Issue #33 feat: add documentation for Dataset, Datasources, Data In and Query APIs
* Update DatasetModels.scala
* #0 fix: upgrade packages
* #0 feat: add flink dockerfiles
* #0 fix: add individual extraction

---------





* #0000 [SV] - Fallback to local redis instance if embedded redis is not starting

* Update DatasetModels.scala

* #0000 - refactor the denormalization logic
1. Do not fail the denormalization if the denorm key is missing
2. Add clear message whether the denorm is sucessful or failed or partially successful
3. Handle denorm for both text and number fields

* #0000 - refactor:
1. Created a enum for dataset status and ignore events if the dataset is not in Live status
2. Created a outputtag for denorm failed stats
3. Parse event validation failed messages into a case class

* #0000 - refactor:
1. Updated the DruidRouter job to publish data to router topics dynamically
2. Updated framework to created dynamicKafkaSink object

* #0000 - mega refactoring:
1. Made calls to getAllDatasets and getAllDatasetSources to always query postgres
2. Created BaseDatasetProcessFunction for all flink functions to extend that would dynamically resolve dataset config, initialize metrics and handle common failures
3. Refactored serde - merged map and string serialization into one function and parameterized the function
4. Moved failed events sinking into a common base class
5. Master dataset processor can now do denormalization with another master dataset as well

* #0000 - mega refactoring:
1. Made calls to getAllDatasets and getAllDatasetSources to always query postgres
2. Created BaseDatasetProcessFunction for all flink functions to extend that would dynamically resolve dataset config, initialize metrics and handle common failures
3. Refactored serde - merged map and string serialization into one function and parameterized the function
4. Moved failed events sinking into a common base class
5. Master dataset processor can now do denormalization with another master dataset as well

* #0000 - mega refactoring:
1. Added validation to check if the event has a timestamp key and it is not blank nor invalid
2. Added timezone handling to store the data in druid in the TZ specified by the dataset


* #0000 - minor refactoring: Updated DatasetRegistry.getDatasetSourceConfig to getAllDatasetSourceConfig

* #0000 - mega refactoring: Refactored logs, error messages and metrics

* #0000 - mega refactoring: Fix unit tests

* #0000 - refactoring:
1. Introduced transformation mode to enable lenient transformations
2. Proper exception handling for transformer job

* #0000 - refactoring: Fix test cases and code

* #0000 - refactoring: upgrade embedded redis to work with macos sonoma m2

* #0000 - refactoring: Denormalizer test cases and bug fixes. Code coverage is 100% now

* #0000 - refactoring: Router test cases and bug fixes. Code coverage is 100% now

* #0000 - refactoring: Validator test cases and bug fixes. Code coverage is 100% now

* #0000 - refactoring: Framework test cases and bug fixes

* #0000 - refactoring: kafka connector test cases and bug fixes. Code coverage is 100% now

* #0000 - refactoring: improve code coverage and fix bugs

* #0000 - refactoring: improve code coverage and fix bugs --- Now the code coverage is 100%

* #0000 - refactoring: organize imports

* #0000 - refactoring:
1. transformer test cases and bug fixes - code coverage is 100%

* #0000 - refactoring: test cases and bug fixes

---------









* #000:feat: Removed the provided scope of the kafka-client in the framework (#40)

* #0000 - feat: Add dataset-type to system events (#41)

* #0000 - feat: Add dataset-type to system events

* #0000 - feat: Modify tests for dataset-type in system events

* #0000 - feat: Remove unused getDatasetType function

* #0000 - feat: Remove unused pom test dependencies

* #0000 - feat: Remove unused pom test dependencies

* #67 feat: query system configurations from meta store

* #67 fix: Refactor system configuration retrieval and update dynamic router function

* #67 fix: update system config according to review

* #67 fix: update test cases for system config

* #67 fix: update default values in test cases

* #67 fix: add get all system settings method and update test cases

* #67 fix: add test case for covering exception case

* #67 fix: fix data types in test cases

* #67 fix: Refactor event indexing in DynamicRouterFunction

* Issue #67 refactor: SystemConfig read from DB implementation

* #226 fix: update test cases according to the refactor

---------

Co-authored-by: Manjunath Davanam <[email protected]>
Co-authored-by: ManojKrishnaChintaluri <[email protected]>
Co-authored-by: shiva-rakshith <[email protected]>
Co-authored-by: Sowmya N Dixit <[email protected]>
Co-authored-by: Santhosh <[email protected]>
Co-authored-by: Aniket Sakinala <[email protected]>
Co-authored-by: Anand Parthasarathy <[email protected]>

* #0 fix: Flink base image updates

---------

Co-authored-by: shiva-rakshith <[email protected]>
Co-authored-by: Aniket Sakinala <[email protected]>
Co-authored-by: GayathriSrividya <[email protected]>
Co-authored-by: Manjunath Davanam <[email protected]>
Co-authored-by: Manoj Krishna <[email protected]>
Co-authored-by: Santhosh Vasabhaktula <[email protected]>
Co-authored-by: ManojCKrishna <[email protected]>
Co-authored-by: ManojKrishnaChintaluri <[email protected]>
Co-authored-by: Praveen <[email protected]>
Co-authored-by: Anand Parthasarathy <[email protected]>
Co-authored-by: Ravi Mula <[email protected]>
Co-authored-by: Manoj Krishna <[email protected]>
  • Loading branch information
13 people authored Dec 27, 2023
1 parent 454b0e1 commit 620a3fc
Show file tree
Hide file tree
Showing 115 changed files with 3,759 additions and 994 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_and_deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
fetch-depth: 0
- name: Maven Build
run: |
mvn clean install
mvn clean install -DskipTests
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ COPY --from=build-pipeline /app/pipeline/master-data-processor/target/master-dat

FROM --platform=linux/x86_64 sunbird/flink:1.15.2-scala_2.12-jdk-11 as kafka-connector-image
USER flink
COPY --from=build-pipeline /app/pipeline/kafka-connector/target/kafka-connector-1.0.0.jar $FLINK_HOME/lib
COPY --from=build-pipeline /app/pipeline/kafka-connector/target/kafka-connector-1.0.0.jar $FLINK_HOME/lib
13 changes: 12 additions & 1 deletion data-products/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<properties>
<spark.version>3.1.0</spark.version>
<scala.version>2.12.10</scala.version>
<scala.version>2.12.11</scala.version>
<scala.maj.version>2.12</scala.maj.version>
<scoverage.plugin.version>1.1.1</scoverage.plugin.version>
</properties>
Expand Down Expand Up @@ -225,6 +225,17 @@
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.scoverage</groupId>
<artifactId>scoverage-maven-plugin</artifactId>
<version>${scoverage.plugin.version}</version>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<aggregate>true</aggregate>
<highlighting>true</highlighting>
</configuration>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ object MasterDataProcessorIndexer {
val response = Unirest.post(config.getString("druid.indexer.url"))
.header("Content-Type", "application/json")
.body(ingestionSpec).asJson()
response.ifFailure(response => throw new Exception("Exception while submitting ingestion task"))
response.ifFailure(_ => throw new Exception("Exception while submitting ingestion task"))
}

private def updateDataSourceRef(datasource: DataSource, datasourceRef: String): Unit = {
Expand All @@ -100,7 +100,7 @@ object MasterDataProcessorIndexer {
val response = Unirest.delete(config.getString("druid.datasource.delete.url") + datasourceRef)
.header("Content-Type", "application/json")
.asJson()
response.ifFailure(response => throw new Exception("Exception while deleting datasource" + datasourceRef))
response.ifFailure(_ => throw new Exception("Exception while deleting datasource" + datasourceRef))
}

private def createDataFile(dataset: Dataset, timestamp: Long, outputFilePath: String, objectKey: String): String = {
Expand All @@ -115,7 +115,7 @@ object MasterDataProcessorIndexer {
val sc = new SparkContext(conf)

val readWriteConf = ReadWriteConfig(scanCount = 1000, maxPipelineSize = 1000)
val rdd = sc.fromRedisKV("*")(readWriteConfig = readWriteConf)
sc.fromRedisKV("*")(readWriteConfig = readWriteConf)
.map(f => JSONUtil.deserialize[mutable.Map[String, AnyRef]](f._2))
.map(f => f.put("syncts", timestamp.asInstanceOf[AnyRef]))
.map(f => JSONUtil.serialize(f))
Expand Down
4 changes: 2 additions & 2 deletions dataset-registry/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>it.ozimov</groupId>
<groupId>com.github.codemonstur</groupId>
<artifactId>embedded-redis</artifactId>
<version>0.7.1</version>
<version>1.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
9 changes: 5 additions & 4 deletions dataset-registry/src/main/resources/dataset-registry.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ CREATE TABLE IF NOT EXISTS dataset_transformations (
id text PRIMARY KEY,
dataset_id text REFERENCES datasets (id),
field_key text NOT NULL,
transformation_function text NOT NULL,
transformation_function json NOT NULL,
status text NOT NULL,
mode text,
created_by text NOT NULL,
updated_by text NOT NULL,
created_date timestamp NOT NULL,
Expand All @@ -53,17 +54,17 @@ CREATE INDEX IF NOT EXISTS dataset_transformations_status ON dataset_transformat
CREATE INDEX IF NOT EXISTS dataset_transformations_dataset ON dataset_transformations(dataset_id);

CREATE TABLE IF NOT EXISTS dataset_source_config (
id SERIAL PRIMARY KEY,
id text PRIMARY KEY,
dataset_id text NOT NULL REFERENCES datasets (id),
connector_type text NOT NULL,
connector_config json NOT NULL,
connector_stats json NOT NULL,
connector_stats json,
status text NOT NULL,
created_by text NOT NULL,
updated_by text NOT NULL,
created_date timestamp NOT NULL,
updated_date timestamp NOT NULL,
UNIQUE(dataset_id)
UNIQUE(connector_type, dataset_id)
);
CREATE INDEX IF NOT EXISTS dataset_source_config_status ON dataset_source_config(status);
CREATE INDEX IF NOT EXISTS dataset_source_config_dataset ON dataset_source_config(dataset_id);
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.module.scala.JsonScalaEnumeration
import org.sunbird.obsrv.core.model.SystemConfig
import org.sunbird.obsrv.model.DatasetStatus.DatasetStatus
import org.sunbird.obsrv.model.TransformMode.TransformMode
import org.sunbird.obsrv.model.ValidationMode.ValidationMode

import java.sql.Timestamp
import scala.beans.BeanProperty

object DatasetModels {
Expand All @@ -17,7 +20,7 @@ object DatasetModels {

case class DedupConfig(@JsonProperty("drop_duplicates") dropDuplicates: Option[Boolean] = Some(false),
@JsonProperty("dedup_key") dedupKey: Option[String],
@JsonProperty("dedup_period") dedupPeriod: Option[Integer] = Some(SystemConfig.defaultDedupPeriodInSeconds))
@JsonProperty("dedup_period") dedupPeriod: Option[Integer] = Some(SystemConfig.getInt("defaultDedupPeriodInSeconds", 604800)))

case class ValidationConfig(@JsonProperty("validate") validate: Option[Boolean] = Some(true),
@JsonProperty("mode") @JsonScalaEnumeration(classOf[ValidationModeType]) mode: Option[ValidationMode])
Expand All @@ -30,15 +33,16 @@ object DatasetModels {

case class RouterConfig(@JsonProperty("topic") topic: String)

case class DatasetConfig(@JsonProperty("data_key") key: String, @JsonProperty("timestamp_key") tsKey: String,
@JsonProperty("entry_topic") entryTopic: String, @JsonProperty("exclude_fields") excludeFields: Option[List[String]] = None,
@JsonProperty("redis_db_host") redisDBHost: Option[String] = None, @JsonProperty("redis_db_port") redisDBPort: Option[Int] = None,
@JsonProperty("redis_db") redisDB: Option[Int] = None, @JsonProperty("index_data") indexData: Option[Boolean] = None)
case class DatasetConfig(@JsonProperty("data_key") key: String, @JsonProperty("timestamp_key") tsKey: String, @JsonProperty("entry_topic") entryTopic: String,
@JsonProperty("exclude_fields") excludeFields: Option[List[String]] = None, @JsonProperty("redis_db_host") redisDBHost: Option[String] = None,
@JsonProperty("redis_db_port") redisDBPort: Option[Int] = None, @JsonProperty("redis_db") redisDB: Option[Int] = None,
@JsonProperty("index_data") indexData: Option[Boolean] = None, @JsonProperty("timestamp_format") tsFormat: Option[String] = None,
@JsonProperty("dataset_tz") datasetTimezone: Option[String] = None)

case class Dataset(@JsonProperty("id") id: String, @JsonProperty("type") datasetType: String , @JsonProperty("extraction_config") extractionConfig: Option[ExtractionConfig],
case class Dataset(@JsonProperty("id") id: String, @JsonProperty("type") datasetType: String, @JsonProperty("extraction_config") extractionConfig: Option[ExtractionConfig],
@JsonProperty("dedup_config") dedupConfig: Option[DedupConfig], @JsonProperty("validation_config") validationConfig: Option[ValidationConfig],
@JsonProperty("data_schema") jsonSchema: Option[String], @JsonProperty("denorm_config") denormConfig: Option[DenormConfig],
@JsonProperty("router_config") routerConfig: RouterConfig, datasetConfig: DatasetConfig, @JsonProperty("status") status: String,
@JsonProperty("router_config") routerConfig: RouterConfig, datasetConfig: DatasetConfig, @JsonProperty("status") @JsonScalaEnumeration(classOf[DatasetStatusType]) status: DatasetStatus,
@JsonProperty("tags") tags: Option[Array[String]] = None, @JsonProperty("data_version") dataVersion: Option[Int] = None)

case class Condition(@JsonProperty("type") `type`: String, @JsonProperty("expr") expr: String)
Expand All @@ -47,9 +51,9 @@ object DatasetModels {

case class DatasetTransformation(@JsonProperty("id") id: String, @JsonProperty("dataset_id") datasetId: String,
@JsonProperty("field_key") fieldKey: String, @JsonProperty("transformation_function") transformationFunction: TransformationFunction,
@JsonProperty("status") status: String)
@JsonProperty("status") status: String, @JsonProperty("mode") @JsonScalaEnumeration(classOf[TransformModeType]) mode: Option[TransformMode] = Some(TransformMode.Strict))

case class ConnectorConfig(@JsonProperty("kafkaBrokers") kafkaBrokers: String, @JsonProperty("topic") topic: String, @JsonProperty("type")databaseType: String,
case class ConnectorConfig(@JsonProperty("kafkaBrokers") kafkaBrokers: String, @JsonProperty("topic") topic: String, @JsonProperty("type") databaseType: String,
@JsonProperty("connection") connection: Connection, @JsonProperty("tableName") tableName: String, @JsonProperty("databaseName") databaseName: String,
@JsonProperty("pollingInterval") pollingInterval: PollingInterval, @JsonProperty("authenticationMechanism") authenticationMechanism: AuthenticationMechanism,
@JsonProperty("batchSize") batchSize: Int, @JsonProperty("timestampColumn") timestampColumn: String)
Expand All @@ -60,19 +64,34 @@ object DatasetModels {

case class AuthenticationMechanism(@JsonProperty("encrypted") encrypted: Boolean, @JsonProperty("encryptedValues") encryptedValues: String)

case class ConnectorStats(@JsonProperty("last_fetch_timestamp") lastFetchTimestamp: String, @JsonProperty("records") records: Long, @JsonProperty("avg_batch_read_time") avgBatchReadTime: Long, @JsonProperty("disconnections") disconnections: Int)
case class ConnectorStats(@JsonProperty("last_fetch_timestamp") lastFetchTimestamp: Timestamp, @JsonProperty("records") records: Long, @JsonProperty("avg_batch_read_time") avgBatchReadTime: Long, @JsonProperty("disconnections") disconnections: Int)

case class DatasetSourceConfig(@JsonProperty("id") id: String, @JsonProperty("dataset_id") datasetId: String,
@JsonProperty("connector_type") connectorType: String, @JsonProperty("connector_config") connectorConfig: ConnectorConfig,
@JsonProperty("connector_stats") connectorStats: ConnectorStats, @JsonProperty("status") status: String)
case class DataSource(@JsonProperty("datasource") datasource: String, @JsonProperty("dataset_id") datasetId: String,
@JsonProperty("ingestion_spec") ingestionSpec: String, @JsonProperty("datasource_ref") datasourceRef: String)
@JsonProperty("status") status: String, @JsonProperty("connector_stats") connectorStats: Option[ConnectorStats] = None)

case class DataSource(@JsonProperty("id") id: String, @JsonProperty("datasource") datasource: String, @JsonProperty("dataset_id") datasetId: String,
@JsonProperty("ingestion_spec") ingestionSpec: String, @JsonProperty("datasource_ref") datasourceRef: String)

}

class ValidationModeType extends TypeReference[ValidationMode.type]

object ValidationMode extends Enumeration {
type ValidationMode = Value
val Strict, IgnoreNewFields, DiscardNewFields = Value
}

class TransformModeType extends TypeReference[TransformMode.type]

object TransformMode extends Enumeration {
type TransformMode = Value
val Strict, Lenient = Value
}

class DatasetStatusType extends TypeReference[DatasetStatus.type]

object DatasetStatus extends Enumeration {
type DatasetStatus = Value
val Draft, Publish, Live, Retired, Purged = Value
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,56 +4,62 @@ import org.sunbird.obsrv.model.DatasetModels.{DataSource, Dataset, DatasetSource
import org.sunbird.obsrv.service.DatasetRegistryService

import java.sql.Timestamp
import scala.collection.mutable

object DatasetRegistry {

private val datasets: Map[String, Dataset] = DatasetRegistryService.readAllDatasets()
private val datasets: mutable.Map[String, Dataset] = mutable.Map[String, Dataset]()
datasets ++= DatasetRegistryService.readAllDatasets()
private val datasetTransformations: Map[String, List[DatasetTransformation]] = DatasetRegistryService.readAllDatasetTransformations()
private val datasetSourceConfig: Option[List[DatasetSourceConfig]] = DatasetRegistryService.readAllDatasetSourceConfig()
private val datasources: Map[String, List[DataSource]] = DatasetRegistryService.readAllDatasources()

def getAllDatasets(datasetType: String): List[Dataset] = {
datasets.filter(f => f._2.datasetType.equals(datasetType)).values.toList
val datasetList = DatasetRegistryService.readAllDatasets()
datasetList.filter(f => f._2.datasetType.equals(datasetType)).values.toList
}

def getDataset(id: String): Option[Dataset] = {
datasets.get(id)
val datasetFromCache = datasets.get(id)
if (datasetFromCache.isDefined) datasetFromCache else {
val dataset = DatasetRegistryService.readDataset(id)
if (dataset.isDefined) datasets.put(dataset.get.id, dataset.get)
dataset
}
}

def getDatasetSourceConfig(): Option[List[DatasetSourceConfig]] = {
datasetSourceConfig
def getAllDatasetSourceConfig(): Option[List[DatasetSourceConfig]] = {
DatasetRegistryService.readAllDatasetSourceConfig()
}

def getDatasetSourceConfigById(datasetId: String): DatasetSourceConfig = {
datasetSourceConfig.map(configList => configList.filter(_.datasetId.equalsIgnoreCase(datasetId))).get.head
def getDatasetSourceConfigById(datasetId: String): Option[List[DatasetSourceConfig]] = {
DatasetRegistryService.readDatasetSourceConfig(datasetId)
}

def getDatasetTransformations(id: String): Option[List[DatasetTransformation]] = {
datasetTransformations.get(id)
def getDatasetTransformations(datasetId: String): Option[List[DatasetTransformation]] = {
datasetTransformations.get(datasetId)
}

def getDatasources(datasetId: String): Option[List[DataSource]] = {
datasources.get(datasetId)
DatasetRegistryService.readDatasources(datasetId)
}

def getDataSetIds(datasetType: String): List[String] = {
datasets.filter(f => f._2.datasetType.equals(datasetType)).keySet.toList
}

def updateDatasourceRef(datasource: DataSource, datasourceRef: String): Unit = {
def updateDatasourceRef(datasource: DataSource, datasourceRef: String): Int = {
DatasetRegistryService.updateDatasourceRef(datasource, datasourceRef)
}

def updateConnectorStats(datasetId: String, lastFetchTimestamp: Timestamp, records: Long): Unit = {
DatasetRegistryService.updateConnectorStats(datasetId, lastFetchTimestamp, records)
def updateConnectorStats(id: String, lastFetchTimestamp: Timestamp, records: Long): Int = {
DatasetRegistryService.updateConnectorStats(id, lastFetchTimestamp, records)
}

def updateConnectorDisconnections(datasetId: String, disconnections: Int): Unit = {
DatasetRegistryService.updateConnectorDisconnections(datasetId, disconnections)
def updateConnectorDisconnections(id: String, disconnections: Int): Int = {
DatasetRegistryService.updateConnectorDisconnections(id, disconnections)
}

def updateConnectorAvgBatchReadTime(datasetId: String, avgReadTime: Long): Unit = {
DatasetRegistryService.updateConnectorAvgBatchReadTime(datasetId, avgReadTime)
def updateConnectorAvgBatchReadTime(id: String, avgReadTime: Long): Int = {
DatasetRegistryService.updateConnectorAvgBatchReadTime(id, avgReadTime)
}

}
Loading

0 comments on commit 620a3fc

Please sign in to comment.