Skip to content

Commit

Permalink
update record processing logging
Browse files Browse the repository at this point in the history
  • Loading branch information
5amCurfew committed Dec 7, 2024
1 parent bc1f467 commit 42f4d4a
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 8 deletions.
8 changes: 5 additions & 3 deletions cmd/extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,11 @@ func extract(discover bool) error {
for record := range sources.ResultChan {
r := *record
rMap, _ := r.(map[string]interface{})
if valid := lib.ValidateRecordSchema(rMap, schema); !valid {
log.Warn(fmt.Sprintf("record %s breaks schema in catalog - skipping...", rMap["_sdc_natural_key"]))
continue
if valid, validateRecordSchemaError := lib.ValidateRecordSchema(rMap, schema); !valid {
log.WithFields(log.Fields{
"_sdc_natural_key": rMap["_sdc_natural_key"],
"error": validateRecordSchemaError,
}).Warn("record breaks schema in catalog")
}

if generateRecordMessageError := lib.GenerateRecordMessage(r); generateRecordMessageError != nil {
Expand Down
7 changes: 6 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@ module github.com/5amCurfew/xtkt

go 1.20

require github.com/spf13/cobra v1.6.1
require (
github.com/spf13/cobra v1.6.1
github.com/xeipuuv/gojsonschema v1.2.0
)

require (
github.com/stretchr/testify v1.8.2 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
golang.org/x/sys v0.15.0 // indirect
)

Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,18 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f h1:J9EGpcZtP0E/raorCMxlFGSTBrsSlaDGf3jU/qvAE2c=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
Expand Down
26 changes: 22 additions & 4 deletions lib/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"time"

"github.com/xeipuuv/gojsonschema"

util "github.com/5amCurfew/xtkt/util"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -57,7 +59,9 @@ func generateHashedFields(record map[string]interface{}) error {
hash := sha256.Sum256([]byte(fmt.Sprintf("%v", fieldValue)))
util.SetValueAtPath(path, record, hex.EncodeToString(hash[:]))
} else {
log.Warn(fmt.Sprintf("field path %s not found in record for hashing (sensitive fields)", path))
log.WithFields(log.Fields{
"sensitive_field_path": path,
}).Warn("field path not found in record for hashing (sensitive fields)")
continue
}
}
Expand All @@ -71,7 +75,9 @@ func generateSurrogateKeyFields(record map[string]interface{}) error {
if util.GetValueAtPath(*ParsedConfig.Records.UniqueKeyPath, record) != nil {
record["_sdc_natural_key"] = util.GetValueAtPath(*ParsedConfig.Records.UniqueKeyPath, record)
} else {
log.Warn(fmt.Sprintf("unique_key field path %s not found in record", *ParsedConfig.Records.UniqueKeyPath))
log.WithFields(log.Fields{
"unique_key_path": *ParsedConfig.Records.UniqueKeyPath,
}).Warn("unique_key field path not found in record")
}
record["_sdc_surrogate_key"] = hex.EncodeToString(h.Sum(nil))
record["_sdc_time_extracted"] = time.Now().UTC().Format(time.RFC3339)
Expand All @@ -91,6 +97,18 @@ func recordVersusBookmark(record map[string]interface{}) bool {
}

// Validate record against Catalog
func ValidateRecordSchema(record map[string]interface{}, schema map[string]interface{}) bool {
return true
func ValidateRecordSchema(record map[string]interface{}, schema map[string]interface{}) (bool, error) {
// Convert schema map to a JSON string
schemaLoader := gojsonschema.NewGoLoader(schema)
recordLoader := gojsonschema.NewGoLoader(record)

// Validate the record against the schema
result, _ := gojsonschema.Validate(schemaLoader, recordLoader)

// Check if validation was successful
if result.Valid() {
return true, nil
}

return false, fmt.Errorf("%s", result.Errors())
}

0 comments on commit 42f4d4a

Please sign in to comment.