From 815a545997bce0f4ff876f78c2e430313baf9842 Mon Sep 17 00:00:00 2001 From: 5amCurfew Date: Thu, 19 Dec 2024 21:36:43 +0000 Subject: [PATCH] v0.3.1 --- README.md | 2 +- cmd/root.go | 2 +- lib/record.go | 1 + sources/jsonl.go | 6 +++--- sources/rest.go | 6 +++--- 5 files changed, 9 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 72e6c56..d51945e 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ * [File csv](#file-csv) * [File jsonl](#file-jsonl) -**v0.3.0** +**v0.3.1** `xtkt` ("extract") is a data extraction tool that follows the Singer.io specification. Supported sources include RESTful APIs, csv and jsonl. diff --git a/cmd/root.go b/cmd/root.go index 0d65df0..99d0508 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -10,7 +10,7 @@ import ( "github.com/spf13/cobra" ) -var version = "0.3.0" +var version = "0.3.1" var discover bool = false func Execute() { diff --git a/lib/record.go b/lib/record.go index e862f50..2287993 100644 --- a/lib/record.go +++ b/lib/record.go @@ -61,6 +61,7 @@ func generateHashedFields(record map[string]interface{}) error { } else { log.WithFields(log.Fields{ "sensitive_field_path": path, + "_sdc_natural_key": util.GetValueAtPath(*ParsedConfig.Records.UniqueKeyPath, record), }).Warn("field path not found in record for hashing (sensitive fields)") continue } diff --git a/sources/jsonl.go b/sources/jsonl.go index 850ec50..ecbf3f5 100644 --- a/sources/jsonl.go +++ b/sources/jsonl.go @@ -15,7 +15,7 @@ import ( func ParseJSONL() { go func() { defer close(parseRecordChan) - if err := streamJSONLRecords(*lib.ParsedConfig.URL, parseRecordChan); err != nil { + if err := streamJSONLRecords(*lib.ParsedConfig.URL); err != nil { log.WithFields(log.Fields{"error": err}).Info("parseJSONL: streamJSONLRecords failed") } }() @@ -26,7 +26,7 @@ func ParseJSONL() { } } -func streamJSONLRecords(url string, resultChan chan map[string]interface{}) error { +func streamJSONLRecords(url string) error { var scanner *bufio.Scanner switch { @@ -61,7 +61,7 @@ func streamJSONLRecords(url string, resultChan chan map[string]interface{}) erro continue } - resultChan <- record + parseRecordChan <- record } if err := scanner.Err(); err != nil { diff --git a/sources/rest.go b/sources/rest.go index 5883f3d..6f56449 100644 --- a/sources/rest.go +++ b/sources/rest.go @@ -18,7 +18,7 @@ import ( func ParseREST() { go func() { defer close(parseRecordChan) - if err := streamRESTRecords(lib.ParsedConfig, parseRecordChan); err != nil { + if err := streamRESTRecords(lib.ParsedConfig); err != nil { log.WithFields(log.Fields{"error": err}).Info("parseREST: streamRESTRecords failed") } }() @@ -29,7 +29,7 @@ func ParseREST() { } } -func streamRESTRecords(config lib.Config, resultChan chan map[string]interface{}) error { +func streamRESTRecords(config lib.Config) error { var responseMap map[string]interface{} responseMapRecordsPath := []string{"results"} @@ -78,7 +78,7 @@ func streamRESTRecords(config lib.Config, resultChan chan map[string]interface{} // Stream records for _, item := range recordsInterfaceSlice { if recordMap, ok := item.(map[string]interface{}); ok { - resultChan <- recordMap + parseRecordChan <- recordMap } else { log.WithFields(log.Fields{"item": item}).Warn("encountered non-map element in records array") }