Skip to content

Commit

Permalink
use State STRUCT
Browse files Browse the repository at this point in the history
  • Loading branch information
5amCurfew committed May 15, 2023
1 parent 99a19ca commit 0c1a674
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 47 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Install targets (Python) in `_targets/` in virtual environments:

I have been using [jq](https://github.com/stedolan/jq) to view `stdout` messages in development. For example:
```bash
$ xtkt config_github.json | jq .
$ xtkt config_github.json 2>&1 | jq .
```

When there is not an appropriate bookmark but you want to only write updates to your target you can use new-record-detection (not advisable for large data sets) by setting the `records.primary_bookmark_path: ["*"]` in your `config.json`. See examples below
Expand Down
8 changes: 4 additions & 4 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var rootCmd = &cobra.Command{
if readConfigError != nil {
log.WithFields(
log.Fields{
"Error": fmt.Sprintf("%e", readConfigError),
"Error": fmt.Sprintf("%v", readConfigError),
},
).Fatalln("Failed to READ CONFIG.JSON")
}
Expand All @@ -36,15 +36,15 @@ var rootCmd = &cobra.Command{
if configError != nil {
log.WithFields(
log.Fields{
"Error": fmt.Sprintf("%e", configError),
"Error": fmt.Sprintf("%v", configError),
},
).Fatalln("Failed to VALIDATE CONFIG.JSON")
} else {
jsonError := json.Unmarshal(config, &cfg)
if jsonError != nil {
log.WithFields(
log.Fields{
"Error": fmt.Sprintf("%e", jsonError),
"Error": fmt.Sprintf("%v", jsonError),
},
).Fatalln("Failed to PARSE CONFIG.JSON")
}
Expand All @@ -54,7 +54,7 @@ var rootCmd = &cobra.Command{
if parseError != nil {
log.WithFields(
log.Fields{
"Error": fmt.Sprintf("%e", parseError),
"Error": fmt.Sprintf("%v", parseError),
},
).Fatalln("Failed to EXTRACT RECORDS")
}
Expand Down
70 changes: 37 additions & 33 deletions lib/bookmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ import (
"time"
)

type State struct {
Type string
Value struct {
Bookmarks map[string]map[string]interface{} `json:"bookmarks"`
}
}

func UsingBookmark(config Config) bool {
return *config.Records.Bookmark && config.Records.PrimaryBookmarkPath != nil
}
Expand All @@ -20,34 +27,33 @@ func detectionSetContains(s []interface{}, str interface{}) bool {
return false
}

func writeStateJSON(state map[string]interface{}) {
result, _ := json.Marshal(map[string]interface{}{
"type": "STATE",
"value": state["value"],
})
func writeStateJSON(state *State) {
result, _ := json.Marshal(state)
os.WriteFile("state.json", result, 0644)
}

// ///////////////////////////////////////////////////////////
// CREATE
// ///////////////////////////////////////////////////////////
func CreateStateJSON(config Config) error {
stream := make(map[string]interface{})
data := make(map[string]interface{})

data["bookmark_updated_at"] = time.Now().Format(time.RFC3339)
data["detection_bookmark"] = []string{}
data["primary_bookmark"] = ""

stream[*config.StreamName] = data

values := make(map[string]interface{})
values["bookmarks"] = stream
now := time.Now().Format(time.RFC3339)

state := State{
Type: "STATE",
Value: struct {
Bookmarks map[string]map[string]interface{} `json:"bookmarks"`
}{
Bookmarks: map[string]map[string]interface{}{
*config.StreamName: {
"bookmark_updated_at": now,
"detection_bookmark": []string{},
"primary_bookmark": "",
},
},
},
}

result, err := json.Marshal(map[string]interface{}{
"type": "STATE",
"value": values,
})
result, err := json.Marshal(state)
if err != nil {
return fmt.Errorf("error MARSHALLING STATE into JSON: %w", err)
}
Expand All @@ -62,22 +68,22 @@ func CreateStateJSON(config Config) error {
// ///////////////////////////////////////////////////////////
// PARSE STATE.JSON
// ///////////////////////////////////////////////////////////
func parseStateJSON(config Config) (map[string]interface{}, error) {
func parseStateJSON(config Config) (*State, error) {
stateFile, err := os.ReadFile("state.json")
if err != nil {
return nil, fmt.Errorf("error reading state file: %w", err)
}

state := make(map[string]interface{})
var state State
if err := json.Unmarshal(stateFile, &state); err != nil {
return nil, fmt.Errorf("error unmarshaling state JSON: %w", err)
}

if state["value"].(map[string]interface{})["bookmarks"].(map[string]interface{})[*config.StreamName] == nil {
return nil, fmt.Errorf("error stream %s DOES NOT EXIST in this STATE.JSON", *config.StreamName)
if _, ok := state.Value.Bookmarks[*config.StreamName]; !ok {
return nil, fmt.Errorf("stream %s does not exist in this state", *config.StreamName)
}

return state, nil
return &state, nil
}

// ///////////////////////////////////////////////////////////
Expand All @@ -90,7 +96,7 @@ func UpdateBookmarkPrimary(records []interface{}, config Config) error {
}

// CURRENT
latestBookmark := state["value"].(map[string]interface{})["bookmarks"].(map[string]interface{})[*config.StreamName].(map[string]interface{})["primary_bookmark"].(string)
latestBookmark := state.Value.Bookmarks[*config.StreamName]["primary_bookmark"].(string)

// FIND LATEST
for _, record := range records {
Expand All @@ -103,9 +109,8 @@ func UpdateBookmarkPrimary(records []interface{}, config Config) error {
}

// UPDATE PRIMARY BOOKMARK
state["value"].(map[string]interface{})["bookmarks"].(map[string]interface{})[*config.StreamName].(map[string]interface{})["primary_bookmark"] = latestBookmark

state["value"].(map[string]interface{})["bookmarks"].(map[string]interface{})[*config.StreamName].(map[string]interface{})["bookmark_updated_at"] = time.Now().Format(time.RFC3339)
state.Value.Bookmarks[*config.StreamName]["primary_bookmark"] = latestBookmark
state.Value.Bookmarks[*config.StreamName]["bookmark_updated_at"] = time.Now().Format(time.RFC3339)

writeStateJSON(state)
return nil
Expand All @@ -118,7 +123,7 @@ func UpdateBookmarkDetection(records []interface{}, config Config) error {
}

// CURRENT
latestDetectionSet := state["value"].(map[string]interface{})["bookmarks"].(map[string]interface{})[*config.StreamName].(map[string]interface{})["detection_bookmark"].([]interface{})
latestDetectionSet := state.Value.Bookmarks[*config.StreamName]["detection_bookmark"].([]interface{})

// UPDATE DETECTION SET
for _, record := range records {
Expand All @@ -132,9 +137,8 @@ func UpdateBookmarkDetection(records []interface{}, config Config) error {
}

// UPDATE
state["value"].(map[string]interface{})["bookmarks"].(map[string]interface{})[*config.StreamName].(map[string]interface{})["detection_bookmark"] = latestDetectionSet

state["value"].(map[string]interface{})["bookmarks"].(map[string]interface{})[*config.StreamName].(map[string]interface{})["bookmark_updated_at"] = time.Now().Format(time.RFC3339)
state.Value.Bookmarks[*config.StreamName]["detection_bookmark"] = latestDetectionSet
state.Value.Bookmarks[*config.StreamName]["bookmark_updated_at"] = time.Now().Format(time.RFC3339)

writeStateJSON(state)
return nil
Expand Down
8 changes: 4 additions & 4 deletions lib/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func generateQuery(config Config) (string, error) {
return "", fmt.Errorf("error parsing STATE for bookmark value: %w", err)
}

value := state["value"].(map[string]interface{})["bookmarks"].(map[string]interface{})[*config.StreamName].(map[string]interface{})
value := state.Value.Bookmarks[*config.StreamName]

var query strings.Builder
query.WriteString(fmt.Sprintf("SELECT * FROM %s", *config.Database.Table))
Expand All @@ -55,11 +55,11 @@ func generateQuery(config Config) (string, error) {
field := *config.Records.PrimaryBookmarkPath
switch dbType {
case "postgres", "postgresql", "sqlite":
query.WriteString(fmt.Sprintf(` WHERE CAST("%s" AS text) > '%s'`, field[0], value["primary_bookmark"]))
query.WriteString(fmt.Sprintf(` WHERE CAST("%s" AS text) > '%s'`, field[0], state.Value.Bookmarks[*config.StreamName]["primary_bookmark"]))
case "mysql":
query.WriteString(fmt.Sprintf(` WHERE CAST("%s" AS char) > '%s'`, field[0], value["primary_bookmark"]))
query.WriteString(fmt.Sprintf(` WHERE CAST("%s" AS char) > '%s'`, field[0], state.Value.Bookmarks[*config.StreamName]["primary_bookmark"]))
case "sqlserver":
query.WriteString(fmt.Sprintf(` WHERE CAST("%s" AS varchar) > '%s'`, field[0], value["primary_bookmark"]))
query.WriteString(fmt.Sprintf(` WHERE CAST("%s" AS varchar) > '%s'`, field[0], state.Value.Bookmarks[*config.StreamName]["primary_bookmark"]))
default:
return "", fmt.Errorf("unsupported database type: %s", dbType)
}
Expand Down
7 changes: 3 additions & 4 deletions lib/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func GenerateSchemaMessage(schema map[string]interface{}, config Config) error {
}

func GenerateRecordMessage(record map[string]interface{}, config Config) error {

bookmarkCondition := false

if UsingBookmark(config) {
Expand All @@ -47,10 +46,10 @@ func GenerateRecordMessage(record map[string]interface{}, config Config) error {

switch path := *config.Records.PrimaryBookmarkPath; {
case reflect.DeepEqual(path, []string{"*"}):
bookmarkCondition = !detectionSetContains(state["value"].(map[string]interface{})["bookmarks"].(map[string]interface{})[*config.StreamName].(map[string]interface{})["detection_bookmark"].([]interface{}), record["_sdc_surrogate_key"])
bookmarkCondition = !detectionSetContains(state.Value.Bookmarks[*config.StreamName]["detection_bookmark"].([]interface{}), record["_sdc_surrogate_key"])
default:
primaryBookmarkValue := getValueAtPath(*config.Records.PrimaryBookmarkPath, record)
bookmarkCondition = toString(primaryBookmarkValue) > state["value"].(map[string]interface{})["bookmarks"].(map[string]interface{})[*config.StreamName].(map[string]interface{})["primary_bookmark"].(string)
bookmarkCondition = toString(primaryBookmarkValue) > state.Value.Bookmarks[*config.StreamName]["primary_bookmark"].(string)
}

} else {
Expand Down Expand Up @@ -83,7 +82,7 @@ func GenerateStateMessage() error {

message := Message{
Type: "STATE",
Value: state["value"],
Value: state["Value"],
}

messageJson, err := json.Marshal(message)
Expand Down
2 changes: 1 addition & 1 deletion lib/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func GenerateRestRecords(config Config) ([]interface{}, error) {

records, ok := getValueAtPath(responseMapRecordsPath, responseMap).([]interface{})
if !ok {
return nil, fmt.Errorf("error RESPONSE RECORDS PATH: %w", err)
return nil, fmt.Errorf("error RESPONSE RECORDS PATH")
}

if *config.Rest.Response.Pagination {
Expand Down

0 comments on commit 0c1a674

Please sign in to comment.