Skip to content

Commit

Permalink
v0.3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
5amCurfew committed Dec 19, 2024
1 parent 888fffe commit 815a545
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 8 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* [File csv](#file-csv)
* [File jsonl](#file-jsonl)

**v0.3.0**
**v0.3.1**

`xtkt` ("extract") is a data extraction tool that follows the Singer.io specification. Supported sources include RESTful APIs, csv and jsonl.

Expand Down
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/spf13/cobra"
)

var version = "0.3.0"
var version = "0.3.1"
var discover bool = false

func Execute() {
Expand Down
1 change: 1 addition & 0 deletions lib/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func generateHashedFields(record map[string]interface{}) error {
} else {
log.WithFields(log.Fields{
"sensitive_field_path": path,
"_sdc_natural_key": util.GetValueAtPath(*ParsedConfig.Records.UniqueKeyPath, record),
}).Warn("field path not found in record for hashing (sensitive fields)")
continue
}
Expand Down
6 changes: 3 additions & 3 deletions sources/jsonl.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
func ParseJSONL() {
go func() {
defer close(parseRecordChan)
if err := streamJSONLRecords(*lib.ParsedConfig.URL, parseRecordChan); err != nil {
if err := streamJSONLRecords(*lib.ParsedConfig.URL); err != nil {
log.WithFields(log.Fields{"error": err}).Info("parseJSONL: streamJSONLRecords failed")
}
}()
Expand All @@ -26,7 +26,7 @@ func ParseJSONL() {
}
}

func streamJSONLRecords(url string, resultChan chan map[string]interface{}) error {
func streamJSONLRecords(url string) error {
var scanner *bufio.Scanner

switch {
Expand Down Expand Up @@ -61,7 +61,7 @@ func streamJSONLRecords(url string, resultChan chan map[string]interface{}) erro
continue
}

resultChan <- record
parseRecordChan <- record
}

if err := scanner.Err(); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions sources/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
func ParseREST() {
go func() {
defer close(parseRecordChan)
if err := streamRESTRecords(lib.ParsedConfig, parseRecordChan); err != nil {
if err := streamRESTRecords(lib.ParsedConfig); err != nil {
log.WithFields(log.Fields{"error": err}).Info("parseREST: streamRESTRecords failed")
}
}()
Expand All @@ -29,7 +29,7 @@ func ParseREST() {
}
}

func streamRESTRecords(config lib.Config, resultChan chan map[string]interface{}) error {
func streamRESTRecords(config lib.Config) error {
var responseMap map[string]interface{}
responseMapRecordsPath := []string{"results"}

Expand Down Expand Up @@ -78,7 +78,7 @@ func streamRESTRecords(config lib.Config, resultChan chan map[string]interface{}
// Stream records
for _, item := range recordsInterfaceSlice {
if recordMap, ok := item.(map[string]interface{}); ok {
resultChan <- recordMap
parseRecordChan <- recordMap
} else {
log.WithFields(log.Fields{"item": item}).Warn("encountered non-map element in records array")
}
Expand Down

0 comments on commit 815a545

Please sign in to comment.