From e31c95efc2e4b4d4dbe82f33e669156f16ed51b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Mon, 29 Jul 2024 10:56:32 +0200 Subject: [PATCH] handle pointer and dates in clickhouse dialect --- .gitignore | 1 + cmd/substreams-sink-sql/main.go | 4 --- db/dialect_clickhouse.go | 57 ++++++++++++++++++++++++++++++++- db/operations.go | 1 + 4 files changed, 58 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index 76ce6c7..43f32f1 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ dist/ devel/data* build/ *.spkg +/substreams-sink-sql diff --git a/cmd/substreams-sink-sql/main.go b/cmd/substreams-sink-sql/main.go index 59026a2..50c2fa2 100644 --- a/cmd/substreams-sink-sql/main.go +++ b/cmd/substreams-sink-sql/main.go @@ -1,7 +1,6 @@ package main import ( - "github.com/spf13/viper" "net/http" _ "net/http/pprof" "time" @@ -19,9 +18,6 @@ import ( var version = "dev" func main() { - - viper.AutomaticEnv() - Run("substreams-sink-sql", "Substreams SQL Sink", sinkRunCmd, sinkSetupCmd, diff --git a/db/dialect_clickhouse.go b/db/dialect_clickhouse.go index fe2fd8d..31b3877 100644 --- a/db/dialect_clickhouse.go +++ b/db/dialect_clickhouse.go @@ -233,6 +233,8 @@ func convertToType(value string, valueType reflect.Type) (any, error) { var err error if strings.Contains(value, "T") && strings.HasSuffix(value, "Z") { v, err = time.Parse("2006-01-02T15:04:05Z", value) + } else if dateRegex.MatchString(value) { + v, err = time.Parse("2006-01-02", value) } else { v, err = time.Parse("2006-01-02 15:04:05", value) } @@ -250,7 +252,60 @@ func convertToType(value string, valueType reflect.Type) (any, error) { newInt.SetString(value, 10) return newInt, nil } - return "", fmt.Errorf("unsupported pointer type %s", valueType) + + switch valueType.Elem().Kind() { + case reflect.String: + return &value, nil + case reflect.Bool: + res, err := strconv.ParseBool(value) + if err != nil { + return nil, err + } + return &res, err + case reflect.Int: + v, err := strconv.ParseInt(value, 10, 0) + res := int(v) + return &res, err + case reflect.Int8: + v, err := strconv.ParseInt(value, 10, 8) + res := int8(v) + return &res, err + case reflect.Int16: + v, err := strconv.ParseInt(value, 10, 16) + res := int16(v) + return &res, err + case reflect.Int32: + v, err := strconv.ParseInt(value, 10, 32) + res := int32(v) + return &res, err + case reflect.Int64: + v, err := strconv.ParseInt(value, 10, 64) + return &v, err + case reflect.Uint: + v, err := strconv.ParseUint(value, 10, 0) + res := uint(v) + return &res, err + case reflect.Uint8: + v, err := strconv.ParseUint(value, 10, 8) + res := uint8(v) + return &res, err + case reflect.Uint16: + v, err := strconv.ParseUint(value, 10, 16) + res := uint16(v) + return &res, err + case reflect.Uint32: + v, err := strconv.ParseUint(value, 10, 32) + res := uint32(v) + return &res, err + case reflect.Uint64: + v, err := strconv.ParseUint(value, 10, 0) + return &v, err + case reflect.Float32, reflect.Float64: + v, err := strconv.ParseFloat(value, 10) + return &v, err + default: + return "", fmt.Errorf("unsupported pointer type %s", valueType) + } default: return value, nil } diff --git a/db/operations.go b/db/operations.go index 34302c7..cdb503b 100644 --- a/db/operations.go +++ b/db/operations.go @@ -76,6 +76,7 @@ func (o *Operation) mergeData(newData map[string]string) error { } var integerRegex = regexp.MustCompile(`^\d+$`) +var dateRegex = regexp.MustCompile(`^\d{4}-\d{2}-\d{2}$`) var reflectTypeTime = reflect.TypeOf(time.Time{}) func EscapeIdentifier(valueToEscape string) string {