Skip to content

Commit

Permalink
Merge pull request #22 from PDOK/read-csv-once
Browse files Browse the repository at this point in the history
fix: read CSV files once, not in loop for every record
  • Loading branch information
rkettelerij authored Jan 7, 2025
2 parents 7a81875 + 79bd82c commit 9cb8d1f
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 32 deletions.
7 changes: 5 additions & 2 deletions internal/etl/etl.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,21 @@ func ImportFile(collection config.GeoSpatialCollection, searchIndex string, file
if err != nil {
return fmt.Errorf("failed extracting source records: %w", err)
}
if len(sourceRecords) == 0 {
sourceRecordCount := len(sourceRecords)
if sourceRecordCount == 0 {
break // no more batches of records to extract
}
log.Printf("extracted %d source records, starting transform", sourceRecordCount)
targetRecords, err := transformer.Transform(sourceRecords, collection, substitutionsFile, synonymsFile)
if err != nil {
return fmt.Errorf("failed to transform raw records to search index records: %w", err)
}
log.Printf("transform completed, %d source records transformed into %d target records", sourceRecordCount, len(targetRecords))
loaded, err := target.Load(targetRecords, searchIndex)
if err != nil {
return fmt.Errorf("failed loading records into target: %w", err)
}
log.Printf("imported %d records into search index", loaded)
log.Printf("loaded %d records into target search index: '%s'", loaded, searchIndex)
offset += pageSize
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,53 @@ package transform

import (
"encoding/csv"
"errors"
"fmt"
"os"
"strings"

"github.com/PDOK/gomagpie/internal/engine/util"
)

func extendFieldValues(fieldValuesByName map[string]string, substitutionsFile, synonymsFile string) ([]map[string]string, error) {
substitutions, err := readCsvFile(substitutionsFile)
if err != nil {
return nil, err
}
synonyms, err := readCsvFile(synonymsFile)
if err != nil {
return nil, err
}
type SubstAndSynonyms struct {
substitutions map[string]string
synonyms map[string]string
synonymsInverse map[string]string
}

func NewSubstAndSynonyms(substitutionsFile, synonymsFile string) (*SubstAndSynonyms, error) {
substitutions, substErr := readCsvFile(substitutionsFile)
synonyms, synErr := readCsvFile(synonymsFile)
return &SubstAndSynonyms{
substitutions: substitutions,
synonyms: synonyms,
synonymsInverse: util.Inverse(synonyms),
}, errors.Join(substErr, synErr)
}

func (s SubstAndSynonyms) generate(fieldValuesByName map[string]string) []map[string]string {
var fieldValuesByNameWithAllValues = make(map[string][]string)
for key, value := range fieldValuesByName {
valueLower := strings.ToLower(value)
// Get all substitutions
substitutedValues := extendValues([]string{valueLower}, substitutions)
substitutedValues := extendValues([]string{valueLower}, s.substitutions)
// Get all synonyms for these substituted values
// -> one way
synonymsValuesOneWay := extendValues(substitutedValues, synonyms)
synonymsValuesOneWay := extendValues(substitutedValues, s.synonyms)
// <- reverse way
allValues := extendValues(synonymsValuesOneWay, util.Inverse(synonyms))
allValues := extendValues(synonymsValuesOneWay, s.synonymsInverse)
// Create map with for each key a slice of []values
fieldValuesByNameWithAllValues[key] = allValues
}
return generateAllFieldValuesByName(fieldValuesByNameWithAllValues), err
return generateAllCombinations(fieldValuesByNameWithAllValues)
}

// Transform a map[string][]string into a []map[string]string using the cartesian product, i.e.
// - both maps have the same keys
// - values exist for all possible combinations
func generateAllFieldValuesByName(input map[string][]string) []map[string]string {
keys := []string{}
values := [][]string{}
func generateAllCombinations(input map[string][]string) []map[string]string {
var keys []string
var values [][]string

for key, vals := range input {
keys = append(keys, key)
Expand Down Expand Up @@ -122,8 +130,6 @@ func uniqueSlice(s []string) []string {
}

func readCsvFile(filepath string) (map[string]string, error) {
substitutions := make(map[string]string)

file, err := os.Open(filepath)
if err != nil {
return nil, fmt.Errorf("failed to open CSV file: %w", err)
Expand All @@ -136,8 +142,9 @@ func readCsvFile(filepath string) (map[string]string, error) {
return nil, fmt.Errorf("failed to parse CSV file: %w", err)
}

result := make(map[string]string)
for _, row := range records {
substitutions[row[0]] = row[1]
result[row[0]] = row[1]
}
return substitutions, nil
return result, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@ func Test_generateAllFieldValues(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := extendFieldValues(tt.args.fieldValuesByName, tt.args.substitutionsFile, tt.args.synonymsFile)
if !tt.wantErr(t, err, fmt.Sprintf("extendFieldValues(%v, %v, %v)", tt.args.fieldValuesByName, tt.args.substitutionsFile, tt.args.synonymsFile)) {
return
}
assert.Equalf(t, tt.want, got, "extendFieldValues(%v, %v, %v)", tt.args.fieldValuesByName, tt.args.substitutionsFile, tt.args.synonymsFile)
ss, err := NewSubstAndSynonyms(tt.args.substitutionsFile, tt.args.synonymsFile)
assert.NoError(t, err)
assert.Equalf(t, tt.want, ss.generate(tt.args.fieldValuesByName), "generate(%v, %v, %v)", tt.args.fieldValuesByName, tt.args.substitutionsFile, tt.args.synonymsFile)
})
}
}
Expand All @@ -53,7 +51,8 @@ func Test_generateCombinations(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, generateCombinations(tt.args.keys, tt.args.values), "generateCombinations(%v, %v, %v, %v)", tt.args.keys, tt.args.values)
got := generateCombinations(tt.args.keys, tt.args.values)
assert.Equalf(t, tt.want, got, "generateCombinations(%v, %v)", tt.args.keys, tt.args.values)
})
}
}
Expand Down
10 changes: 6 additions & 4 deletions internal/etl/transform/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ type SearchIndexRecord struct {
type Transformer struct{}

func (t Transformer) Transform(records []RawRecord, collection config.GeoSpatialCollection, substitutionsFile string, synonymsFile string) ([]SearchIndexRecord, error) {
substAndSynonyms, err := NewSubstAndSynonyms(substitutionsFile, synonymsFile)
if err != nil {
return nil, err
}

result := make([]SearchIndexRecord, 0, len(records))
for _, r := range records {
fieldValuesByName, err := slicesToStringMap(collection.Search.Fields, r.FieldValues)
Expand All @@ -46,10 +51,7 @@ func (t Transformer) Transform(records []RawRecord, collection config.GeoSpatial
if err != nil {
return nil, err
}
allFieldValuesByName, err := extendFieldValues(fieldValuesByName, substitutionsFile, synonymsFile)
if err != nil {
return nil, err
}
allFieldValuesByName := substAndSynonyms.generate(fieldValuesByName)
suggestions := make([]string, 0, len(collection.Search.ETL.SuggestTemplates))
for i := range allFieldValuesByName {
for _, suggestTemplate := range collection.Search.ETL.SuggestTemplates {
Expand Down

0 comments on commit 9cb8d1f

Please sign in to comment.