diff --git a/db/dialect.go b/db/dialect.go index 4e3ea5d..6a89e89 100644 --- a/db/dialect.go +++ b/db/dialect.go @@ -26,6 +26,7 @@ type dialect interface { Flush(tx Tx, ctx context.Context, l *Loader, outputModuleHash string, lastFinalBlock uint64) (int, error) Revert(tx Tx, ctx context.Context, l *Loader, lastValidFinalBlock uint64) error OnlyInserts() bool + AllowPkDuplicates() bool CreateUser(tx Tx, ctx context.Context, l *Loader, username string, password string, database string, readOnly bool) error } diff --git a/db/dialect_clickhouse.go b/db/dialect_clickhouse.go index 6387c1a..ea19ff5 100644 --- a/db/dialect_clickhouse.go +++ b/db/dialect_clickhouse.go @@ -131,6 +131,8 @@ func (d clickhouseDialect) OnlyInserts() bool { return true } +func (d clickhouseDialect) AllowPkDuplicates() bool { return true } + func (d clickhouseDialect) CreateUser(tx Tx, ctx context.Context, l *Loader, username string, password string, _database string, readOnly bool) error { user, pass := EscapeIdentifier(username), escapeStringValue(password) @@ -237,6 +239,7 @@ func convertToType(value string, valueType reflect.Type) (any, error) { if err != nil { return "", fmt.Errorf("could not convert %s to time: %w", value, err) } + return v.Unix(), nil } return "", fmt.Errorf("unsupported struct type %s", valueType) diff --git a/db/dialect_postgres.go b/db/dialect_postgres.go index 43152df..cd24696 100644 --- a/db/dialect_postgres.go +++ b/db/dialect_postgres.go @@ -246,6 +246,8 @@ func (d postgresDialect) OnlyInserts() bool { return false } +func (d postgresDialect) AllowPkDuplicates() bool { return false } + func (d postgresDialect) CreateUser(tx Tx, ctx context.Context, l *Loader, username string, password string, database string, readOnly bool) error { user, pass, db := EscapeIdentifier(username), password, EscapeIdentifier(database) var q string diff --git a/db/ops.go b/db/ops.go index 2922ef0..b2cdb12 100644 --- a/db/ops.go +++ b/db/ops.go @@ -32,7 +32,7 @@ func (l *Loader) Insert(tableName string, primaryKey map[string]string, data map l.entries.Set(tableName, entry) } - if _, found := entry.Get(uniqueID); found { + if _, found := entry.Get(uniqueID); found && !l.getDialect().AllowPkDuplicates() { return fmt.Errorf("attempting to insert in table %q a primary key %q, that is already scheduled for insertion, insert should only be called once for a given primary key", tableName, primaryKey) }