Skip to content

Commit

Permalink
Merge pull request #1 from pinax-network/fix/allow_multi_inserts_clic…
Browse files Browse the repository at this point in the history
…khouse

allow multiple inserts for the same pk on Clickhouse
  • Loading branch information
fschoell authored Apr 4, 2024
2 parents 8b81124 + 4bd0d67 commit 8dd82c8
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 1 deletion.
1 change: 1 addition & 0 deletions db/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type dialect interface {
Flush(tx Tx, ctx context.Context, l *Loader, outputModuleHash string, lastFinalBlock uint64) (int, error)
Revert(tx Tx, ctx context.Context, l *Loader, lastValidFinalBlock uint64) error
OnlyInserts() bool
AllowPkDuplicates() bool
CreateUser(tx Tx, ctx context.Context, l *Loader, username string, password string, database string, readOnly bool) error
}

Expand Down
3 changes: 3 additions & 0 deletions db/dialect_clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ func (d clickhouseDialect) OnlyInserts() bool {
return true
}

func (d clickhouseDialect) AllowPkDuplicates() bool { return true }

func (d clickhouseDialect) CreateUser(tx Tx, ctx context.Context, l *Loader, username string, password string, _database string, readOnly bool) error {
user, pass := EscapeIdentifier(username), escapeStringValue(password)

Expand Down Expand Up @@ -237,6 +239,7 @@ func convertToType(value string, valueType reflect.Type) (any, error) {
if err != nil {
return "", fmt.Errorf("could not convert %s to time: %w", value, err)
}

return v.Unix(), nil
}
return "", fmt.Errorf("unsupported struct type %s", valueType)
Expand Down
2 changes: 2 additions & 0 deletions db/dialect_postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ func (d postgresDialect) OnlyInserts() bool {
return false
}

func (d postgresDialect) AllowPkDuplicates() bool { return false }

func (d postgresDialect) CreateUser(tx Tx, ctx context.Context, l *Loader, username string, password string, database string, readOnly bool) error {
user, pass, db := EscapeIdentifier(username), password, EscapeIdentifier(database)
var q string
Expand Down
2 changes: 1 addition & 1 deletion db/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (l *Loader) Insert(tableName string, primaryKey map[string]string, data map
l.entries.Set(tableName, entry)
}

if _, found := entry.Get(uniqueID); found {
if _, found := entry.Get(uniqueID); found && !l.getDialect().AllowPkDuplicates() {
return fmt.Errorf("attempting to insert in table %q a primary key %q, that is already scheduled for insertion, insert should only be called once for a given primary key", tableName, primaryKey)
}

Expand Down

0 comments on commit 8dd82c8

Please sign in to comment.