From 74bd42f5236ea70b25821a40e89d443d34576ce9 Mon Sep 17 00:00:00 2001 From: vincent-gao Date: Fri, 2 Aug 2024 10:07:33 +1000 Subject: [PATCH] Add a `multi_value_processor` plugin to the `tide_data_pipeline` module to handle multiple values. (#85) * adds mutiple_value_processor plugin for datapipe * Fixes a typo issue * test * add a custom Destination sdp_elasticsearch --- .circleci/code_coverage.sh | 7 + .github/workflows/build.yml | 2 +- .../TideElasticSearchDestination.php | 85 ++++++++++++ .../DatasetTransform/MultiValueProcessor.php | 79 +++++++++++ ...ide_data_pipelines_test.data_pipelines.yml | 57 ++++++++ .../tide_data_pipelines_test.info.yml | 10 ++ .../Transform/TideSearchTransformTest.php | 124 ++++++++++++++++++ ...test-pipeline-multiple_value_processor.csv | 3 + 8 files changed, 366 insertions(+), 1 deletion(-) create mode 100755 .circleci/code_coverage.sh create mode 100644 modules/tide_data_pipeline/src/Plugin/DatasetDestination/TideElasticSearchDestination.php create mode 100644 modules/tide_data_pipeline/src/Plugin/DatasetTransform/MultiValueProcessor.php create mode 100644 modules/tide_data_pipeline/tests/modules/tide_data_pipelines_test/tide_data_pipelines_test.data_pipelines.yml create mode 100644 modules/tide_data_pipeline/tests/modules/tide_data_pipelines_test/tide_data_pipelines_test.info.yml create mode 100644 modules/tide_data_pipeline/tests/src/Kernel/Transform/TideSearchTransformTest.php create mode 100644 modules/tide_data_pipeline/tests/src/fixtures/test-pipeline-multiple_value_processor.csv diff --git a/.circleci/code_coverage.sh b/.circleci/code_coverage.sh new file mode 100755 index 0000000..d563495 --- /dev/null +++ b/.circleci/code_coverage.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +## +# Generate coverage report. +# +set -e +echo "==> Generate code coverage report" +ahoy cli "phpdbg -qrr vendor/bin/phpunit ./dpc-sdp --coverage-html /app/coverage-report" diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 45eee5e..18d6875 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -1,7 +1,7 @@ name: build on: push - + jobs: set_status_in_progress: name: set_status_in_progress diff --git a/modules/tide_data_pipeline/src/Plugin/DatasetDestination/TideElasticSearchDestination.php b/modules/tide_data_pipeline/src/Plugin/DatasetDestination/TideElasticSearchDestination.php new file mode 100644 index 0000000..c9df569 --- /dev/null +++ b/modules/tide_data_pipeline/src/Plugin/DatasetDestination/TideElasticSearchDestination.php @@ -0,0 +1,85 @@ + '', + ] + parent::defaultConfiguration(); + } + + /** + * {@inheritdoc} + */ + public function buildConfigurationForm(array $form, FormStateInterface $form_state): array { + $form = parent::buildConfigurationForm($form, $form_state); + $form['hash_prefix'] = [ + '#type' => 'textfield', + '#title' => $this->t('Index Hash Prefix'), + '#description' => $this->t('The index hash prefix to use.'), + '#default_value' => $this->configuration['hash_prefix'], + ]; + return $form; + } + + /** + * {@inheritdoc} + */ + public function processCleanup(DatasetInterface $dataset, array $invalidDeltas): bool { + $full_index_id = $this->getFullIndexId($dataset->getMachineName()); + try { + $bulk = ['body' => []]; + foreach ($invalidDeltas as $delta) { + $bulk['body'][] = [ + 'delete' => [ + '_index' => $full_index_id, + '_id' => $dataset->getMachineName() . ':' . $delta, + ], + ]; + } + if (count($bulk['body']) > 0) { + $this->getClient()->bulk($bulk); + } + return TRUE; + } + catch (\Exception $e) { + $this->logger->error("The invalid dataset data could not be purged due to @message", [ + '@message' => $e->getMessage(), + ]); + } + return FALSE; + } + + /** + * Returns the actual index id. + */ + protected function getFullIndexId(string $machineName): string { + $hashPrefix = $this->configuration['hash_prefix'] ?? ''; + $prefix = $this->configuration['prefix'] ?? ''; + + if ($hashPrefix && $prefix) { + return "{$hashPrefix}--{$prefix}{$machineName}"; + } + + return ($prefix ?: '') . $machineName; + } + +} diff --git a/modules/tide_data_pipeline/src/Plugin/DatasetTransform/MultiValueProcessor.php b/modules/tide_data_pipeline/src/Plugin/DatasetTransform/MultiValueProcessor.php new file mode 100644 index 0000000..1bfba5e --- /dev/null +++ b/modules/tide_data_pipeline/src/Plugin/DatasetTransform/MultiValueProcessor.php @@ -0,0 +1,79 @@ + '', + 'callback' => NULL, + 'parameters' => [], + // Default to first argument. + 'value_position' => 0, + ]; + } + + /** + * {@inheritdoc} + */ + protected function doTransformField(string $field_name, DatasetData $record): DatasetData { + $record = parent::doTransformRecord($record); + if ($record->offsetExists($field_name) && !empty($record[$field_name])) { + $separator = $this->configuration['separator']; + $callback = $this->configuration['callback']; + $parameters = $this->configuration['parameters']; + $value_position = $this->configuration['value_position']; + $parts = explode($separator, $record[$field_name]); + $cleaned_parts = array_values(array_filter(array_map('trim', $parts), function ($part) { + return $part !== ''; + })); + + // Process the parts if a callback is provided. + if (is_callable($callback)) { + $cleaned_parts = array_map(function ($value) use ($callback, $parameters, $value_position) { + $typed_parameters = array_map([$this, 'convertParameter'], $parameters); + $args = $typed_parameters; + array_splice($args, $value_position, 0, [$value]); + return call_user_func_array($callback, $args); + }, $cleaned_parts); + } + $record[$field_name] = $cleaned_parts; + } + return $record; + } + + /** + * Converts a parameter to its appropriate type. + */ + private function convertParameter($parameter) { + if (is_numeric($parameter)) { + return $parameter + 0; + } + if ($parameter === 'true' || $parameter === 'false') { + return $parameter === 'true'; + } + if (is_string($parameter) && defined($parameter)) { + return constant($parameter); + } + return $parameter; + } + +} diff --git a/modules/tide_data_pipeline/tests/modules/tide_data_pipelines_test/tide_data_pipelines_test.data_pipelines.yml b/modules/tide_data_pipeline/tests/modules/tide_data_pipelines_test/tide_data_pipelines_test.data_pipelines.yml new file mode 100644 index 0000000..e6cd821 --- /dev/null +++ b/modules/tide_data_pipeline/tests/modules/tide_data_pipelines_test/tide_data_pipelines_test.data_pipelines.yml @@ -0,0 +1,57 @@ +pipeline_with_multi_value_processor_transform: + label: 'Multiple Value Processor transform' + transforms: + field: + Suburbs: + - plugin: multi_value_processor + separator: ';' + callback: str_pad + parameters: + - 10 + - '0' + - STR_PAD_LEFT + +pipeline_with_strtoupper: + label: 'Multiple Value Processor with strtoupper' + transforms: + field: + Suburbs: + - plugin: multi_value_processor + separator: ';' + callback: strtoupper + +pipeline_with_mb_convert_case: + label: 'Multiple Value Processor with mb_convert_case' + transforms: + field: + Suburbs: + - plugin: multi_value_processor + separator: ';' + callback: mb_convert_case + parameters: + - 'MB_CASE_UPPER' + +pipeline_with_substr: + label: 'Multiple Value Processor with substr' + transforms: + field: + Suburbs: + - plugin: multi_value_processor + separator: ';' + callback: substr + parameters: + - '0' + - 'true' + +pipeline_with_str_replace: + label: 'Multiple Value Processor with str_replace' + transforms: + field: + Suburbs: + - plugin: multi_value_processor + separator: ';' + callback: str_replace + parameters: + - 'a' + - 'A' + value_position: 2 diff --git a/modules/tide_data_pipeline/tests/modules/tide_data_pipelines_test/tide_data_pipelines_test.info.yml b/modules/tide_data_pipeline/tests/modules/tide_data_pipelines_test/tide_data_pipelines_test.info.yml new file mode 100644 index 0000000..f7e218b --- /dev/null +++ b/modules/tide_data_pipeline/tests/modules/tide_data_pipelines_test/tide_data_pipelines_test.info.yml @@ -0,0 +1,10 @@ +name: Data Pipelines - Test Pipelines +package: Testing +description: 'Provides tests data set plugins' +core_version_requirement: ^10.1 +php: 8.0 +type: module +dependencies: + - data_pipelines:data_pipelines + - data_pipelines_test:data_pipelines_test + - tide_data_pipeline:tide_data_pipeline diff --git a/modules/tide_data_pipeline/tests/src/Kernel/Transform/TideSearchTransformTest.php b/modules/tide_data_pipeline/tests/src/Kernel/Transform/TideSearchTransformTest.php new file mode 100644 index 0000000..165177f --- /dev/null +++ b/modules/tide_data_pipeline/tests/src/Kernel/Transform/TideSearchTransformTest.php @@ -0,0 +1,124 @@ +getTestFile(dirname(__DIR__, 2) . '/fixtures/test-pipeline-multiple_value_processor.csv'); + $dataset = Dataset::create([ + 'source' => 'csv:file', + 'name' => $this->randomMachineName(), + 'machine_name' => mb_strtolower($this->randomMachineName()), + 'pipeline' => 'pipeline_with_multi_value_processor_transform', + 'csv_file' => $file, + ]); + $data = iterator_to_array($dataset->getDataIterator()); + $this->assertCount(2, $data); + $this->assertEquals(['0Dandenong', 'Dandenong North'], $data[0]['Suburbs']); + $this->assertEquals(['00000Boneo', '000Outtrim'], $data[1]['Suburbs']); + } + + /** + * Test multiple_value_processor transform with strtoupper callback. + */ + public function testMultipleValueProcessorTransformWithStrtoupper(): void { + $file = $this->getTestFile(dirname(__DIR__, 2) . '/fixtures/test-pipeline-multiple_value_processor.csv'); + $dataset = Dataset::create([ + 'source' => 'csv:file', + 'name' => $this->randomMachineName(), + 'machine_name' => mb_strtolower($this->randomMachineName()), + 'pipeline' => 'pipeline_with_strtoupper', + 'csv_file' => $file, + ]); + $data = iterator_to_array($dataset->getDataIterator()); + $this->assertCount(2, $data); + $this->assertEquals(['DANDENONG', 'DANDENONG NORTH'], $data[0]['Suburbs']); + $this->assertEquals(['BONEO', 'OUTTRIM'], $data[1]['Suburbs']); + } + + /** + * Test multiple_value_processor transform with mb_convert_case. + */ + public function testMultipleValueProcessorTransformWithConvertCase(): void { + $file = $this->getTestFile(dirname(__DIR__, 2) . '/fixtures/test-pipeline-multiple_value_processor.csv'); + $dataset = Dataset::create([ + 'source' => 'csv:file', + 'name' => $this->randomMachineName(), + 'machine_name' => mb_strtolower($this->randomMachineName()), + 'pipeline' => 'pipeline_with_mb_convert_case', + 'csv_file' => $file, + ]); + $data = iterator_to_array($dataset->getDataIterator()); + $this->assertCount(2, $data); + $this->assertEquals(['DANDENONG', 'DANDENONG NORTH'], $data[0]['Suburbs']); + $this->assertEquals(['BONEO', 'OUTTRIM'], $data[1]['Suburbs']); + } + + /** + * Test multiple_value_processor transform with substr. + */ + public function testMultipleValueProcessorTransformWithSubstr(): void { + $file = $this->getTestFile(dirname(__DIR__, 2) . '/fixtures/test-pipeline-multiple_value_processor.csv'); + $dataset = Dataset::create([ + 'source' => 'csv:file', + 'name' => $this->randomMachineName(), + 'machine_name' => mb_strtolower($this->randomMachineName()), + 'pipeline' => 'pipeline_with_substr', + 'csv_file' => $file, + ]); + $data = iterator_to_array($dataset->getDataIterator()); + $this->assertCount(2, $data); + $this->assertEquals(['D', 'D'], $data[0]['Suburbs']); + $this->assertEquals(['B', 'O'], $data[1]['Suburbs']); + } + + /** + * Test multiple_value_processor transform with replace. + */ + public function testMultipleValueProcessorTransformWithReplace(): void { + $file = $this->getTestFile(dirname(__DIR__, 2) . '/fixtures/test-pipeline-multiple_value_processor.csv'); + $dataset = Dataset::create([ + 'source' => 'csv:file', + 'name' => $this->randomMachineName(), + 'machine_name' => mb_strtolower($this->randomMachineName()), + 'pipeline' => 'pipeline_with_str_replace', + 'csv_file' => $file, + ]); + $data = iterator_to_array($dataset->getDataIterator()); + $this->assertCount(2, $data); + $this->assertEquals(['DAndenong', 'DAndenong North'], $data[0]['Suburbs']); + $this->assertEquals(['Boneo', 'Outtrim'], $data[1]['Suburbs']); + } + +} diff --git a/modules/tide_data_pipeline/tests/src/fixtures/test-pipeline-multiple_value_processor.csv b/modules/tide_data_pipeline/tests/src/fixtures/test-pipeline-multiple_value_processor.csv new file mode 100644 index 0000000..18ed7e7 --- /dev/null +++ b/modules/tide_data_pipeline/tests/src/fixtures/test-pipeline-multiple_value_processor.csv @@ -0,0 +1,3 @@ +Suburbs +Dandenong; Dandenong North +Boneo;Outtrim