From d76bd7f34f97fe92c067a909411ca91f891a9a1b Mon Sep 17 00:00:00 2001 From: Richard Kettelerij Date: Tue, 7 Jan 2025 16:14:46 +0100 Subject: [PATCH 1/2] fix: read CSV files once, not in loop for every record --- ...extend_values.go => subst_and_synonyms.go} | 47 +++++++++++-------- ...ues_test.go => subst_and_synonyms_test.go} | 11 ++--- internal/etl/transform/transform.go | 10 ++-- 3 files changed, 38 insertions(+), 30 deletions(-) rename internal/etl/transform/{extend_values.go => subst_and_synonyms.go} (73%) rename internal/etl/transform/{extend_values_test.go => subst_and_synonyms_test.go} (92%) diff --git a/internal/etl/transform/extend_values.go b/internal/etl/transform/subst_and_synonyms.go similarity index 73% rename from internal/etl/transform/extend_values.go rename to internal/etl/transform/subst_and_synonyms.go index 9ad0a86..9d44b29 100644 --- a/internal/etl/transform/extend_values.go +++ b/internal/etl/transform/subst_and_synonyms.go @@ -2,6 +2,7 @@ package transform import ( "encoding/csv" + "errors" "fmt" "os" "strings" @@ -9,38 +10,45 @@ import ( "github.com/PDOK/gomagpie/internal/engine/util" ) -func extendFieldValues(fieldValuesByName map[string]string, substitutionsFile, synonymsFile string) ([]map[string]string, error) { - substitutions, err := readCsvFile(substitutionsFile) - if err != nil { - return nil, err - } - synonyms, err := readCsvFile(synonymsFile) - if err != nil { - return nil, err - } +type SubstAndSynonyms struct { + substitutions map[string]string + synonyms map[string]string + synonymsInverse map[string]string +} + +func NewSubstAndSynonyms(substitutionsFile, synonymsFile string) (*SubstAndSynonyms, error) { + substitutions, substErr := readCsvFile(substitutionsFile) + synonyms, synErr := readCsvFile(synonymsFile) + return &SubstAndSynonyms{ + substitutions: substitutions, + synonyms: synonyms, + synonymsInverse: util.Inverse(synonyms), + }, errors.Join(substErr, synErr) +} +func (s SubstAndSynonyms) generate(fieldValuesByName map[string]string) []map[string]string { var fieldValuesByNameWithAllValues = make(map[string][]string) for key, value := range fieldValuesByName { valueLower := strings.ToLower(value) // Get all substitutions - substitutedValues := extendValues([]string{valueLower}, substitutions) + substitutedValues := extendValues([]string{valueLower}, s.substitutions) // Get all synonyms for these substituted values // -> one way - synonymsValuesOneWay := extendValues(substitutedValues, synonyms) + synonymsValuesOneWay := extendValues(substitutedValues, s.synonyms) // <- reverse way - allValues := extendValues(synonymsValuesOneWay, util.Inverse(synonyms)) + allValues := extendValues(synonymsValuesOneWay, s.synonymsInverse) // Create map with for each key a slice of []values fieldValuesByNameWithAllValues[key] = allValues } - return generateAllFieldValuesByName(fieldValuesByNameWithAllValues), err + return generateAllCombinations(fieldValuesByNameWithAllValues) } // Transform a map[string][]string into a []map[string]string using the cartesian product, i.e. // - both maps have the same keys // - values exist for all possible combinations -func generateAllFieldValuesByName(input map[string][]string) []map[string]string { - keys := []string{} - values := [][]string{} +func generateAllCombinations(input map[string][]string) []map[string]string { + var keys []string + var values [][]string for key, vals := range input { keys = append(keys, key) @@ -122,8 +130,6 @@ func uniqueSlice(s []string) []string { } func readCsvFile(filepath string) (map[string]string, error) { - substitutions := make(map[string]string) - file, err := os.Open(filepath) if err != nil { return nil, fmt.Errorf("failed to open CSV file: %w", err) @@ -136,8 +142,9 @@ func readCsvFile(filepath string) (map[string]string, error) { return nil, fmt.Errorf("failed to parse CSV file: %w", err) } + result := make(map[string]string) for _, row := range records { - substitutions[row[0]] = row[1] + result[row[0]] = row[1] } - return substitutions, nil + return result, nil } diff --git a/internal/etl/transform/extend_values_test.go b/internal/etl/transform/subst_and_synonyms_test.go similarity index 92% rename from internal/etl/transform/extend_values_test.go rename to internal/etl/transform/subst_and_synonyms_test.go index fa6c8df..fbe244b 100644 --- a/internal/etl/transform/extend_values_test.go +++ b/internal/etl/transform/subst_and_synonyms_test.go @@ -26,11 +26,9 @@ func Test_generateAllFieldValues(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := extendFieldValues(tt.args.fieldValuesByName, tt.args.substitutionsFile, tt.args.synonymsFile) - if !tt.wantErr(t, err, fmt.Sprintf("extendFieldValues(%v, %v, %v)", tt.args.fieldValuesByName, tt.args.substitutionsFile, tt.args.synonymsFile)) { - return - } - assert.Equalf(t, tt.want, got, "extendFieldValues(%v, %v, %v)", tt.args.fieldValuesByName, tt.args.substitutionsFile, tt.args.synonymsFile) + ss, err := NewSubstAndSynonyms(tt.args.substitutionsFile, tt.args.synonymsFile) + assert.NoError(t, err) + assert.Equalf(t, tt.want, ss.generate(tt.args.fieldValuesByName), "generate(%v, %v, %v)", tt.args.fieldValuesByName, tt.args.substitutionsFile, tt.args.synonymsFile) }) } } @@ -53,7 +51,8 @@ func Test_generateCombinations(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.Equalf(t, tt.want, generateCombinations(tt.args.keys, tt.args.values), "generateCombinations(%v, %v, %v, %v)", tt.args.keys, tt.args.values) + got := generateCombinations(tt.args.keys, tt.args.values) + assert.Equalf(t, tt.want, got, "generateCombinations(%v, %v)", tt.args.keys, tt.args.values) }) } } diff --git a/internal/etl/transform/transform.go b/internal/etl/transform/transform.go index 3013f05..ea0e7b6 100644 --- a/internal/etl/transform/transform.go +++ b/internal/etl/transform/transform.go @@ -36,6 +36,11 @@ type SearchIndexRecord struct { type Transformer struct{} func (t Transformer) Transform(records []RawRecord, collection config.GeoSpatialCollection, substitutionsFile string, synonymsFile string) ([]SearchIndexRecord, error) { + substAndSynonyms, err := NewSubstAndSynonyms(substitutionsFile, synonymsFile) + if err != nil { + return nil, err + } + result := make([]SearchIndexRecord, 0, len(records)) for _, r := range records { fieldValuesByName, err := slicesToStringMap(collection.Search.Fields, r.FieldValues) @@ -46,10 +51,7 @@ func (t Transformer) Transform(records []RawRecord, collection config.GeoSpatial if err != nil { return nil, err } - allFieldValuesByName, err := extendFieldValues(fieldValuesByName, substitutionsFile, synonymsFile) - if err != nil { - return nil, err - } + allFieldValuesByName := substAndSynonyms.generate(fieldValuesByName) suggestions := make([]string, 0, len(collection.Search.ETL.SuggestTemplates)) for i := range allFieldValuesByName { for _, suggestTemplate := range collection.Search.ETL.SuggestTemplates { From 79bd82c781471bda1e62e41ec3143dac71b7ad50 Mon Sep 17 00:00:00 2001 From: Richard Kettelerij Date: Tue, 7 Jan 2025 16:26:33 +0100 Subject: [PATCH 2/2] chore: add extra logging --- internal/etl/etl.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/internal/etl/etl.go b/internal/etl/etl.go index e308b46..bd61834 100644 --- a/internal/etl/etl.go +++ b/internal/etl/etl.go @@ -81,18 +81,21 @@ func ImportFile(collection config.GeoSpatialCollection, searchIndex string, file if err != nil { return fmt.Errorf("failed extracting source records: %w", err) } - if len(sourceRecords) == 0 { + sourceRecordCount := len(sourceRecords) + if sourceRecordCount == 0 { break // no more batches of records to extract } + log.Printf("extracted %d source records, starting transform", sourceRecordCount) targetRecords, err := transformer.Transform(sourceRecords, collection, substitutionsFile, synonymsFile) if err != nil { return fmt.Errorf("failed to transform raw records to search index records: %w", err) } + log.Printf("transform completed, %d source records transformed into %d target records", sourceRecordCount, len(targetRecords)) loaded, err := target.Load(targetRecords, searchIndex) if err != nil { return fmt.Errorf("failed loading records into target: %w", err) } - log.Printf("imported %d records into search index", loaded) + log.Printf("loaded %d records into target search index: '%s'", loaded, searchIndex) offset += pageSize }