Skip to content

Commit

Permalink
Merge branch 'develop' into feature/support_nullable_types
Browse files Browse the repository at this point in the history
  • Loading branch information
maoueh authored Nov 15, 2024
2 parents 3816eb3 + 3cdf12c commit a419805
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 13 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

* Added support for `Nullable` types in Clickhouse.

## v4.2.2

* Fix major bug when receiving empty `MapOutput`

## v4.2.1

* Bump substreams to v1.10.3 to support new manifest data like `protobuf:excludePaths`
Expand Down
29 changes: 16 additions & 13 deletions sinker/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,22 +103,25 @@ func (s *SQLSinker) HandleBlockScopedData(ctx context.Context, data *pbsubstream

dbChanges := &pbdatabase.DatabaseChanges{}
mapOutput := output.GetMapOutput()
if !mapOutput.MessageIs(dbChanges) && mapOutput.TypeUrl != "type.googleapis.com/sf.substreams.database.v1.DatabaseChanges" {
return fmt.Errorf("mismatched message type: trying to unmarshal unknown type %q", mapOutput.MessageName())
}

// We do not use UnmarshalTo here because we need to parse an older proto type and
// UnmarshalTo enforces the type check. So we check manually the `TypeUrl` above and we use
// `Unmarshal` instead which only deals with the bytes value.
if err := proto.Unmarshal(mapOutput.Value, dbChanges); err != nil {
return fmt.Errorf("unmarshal database changes: %w", err)
}
if mapOutput.String() != "" {
if !mapOutput.MessageIs(dbChanges) && mapOutput.TypeUrl != "type.googleapis.com/sf.substreams.database.v1.DatabaseChanges" {
return fmt.Errorf("mismatched message type: trying to unmarshal unknown type %q", mapOutput.MessageName())
}

if err := s.applyDatabaseChanges(dbChanges, data.Clock.Number, data.FinalBlockHeight); err != nil {
return fmt.Errorf("apply database changes: %w", err)
// We do not use UnmarshalTo here because we need to parse an older proto type and
// UnmarshalTo enforces the type check. So we check manually the `TypeUrl` above and we use
// `Unmarshal` instead which only deals with the bytes value.
if err := proto.Unmarshal(mapOutput.Value, dbChanges); err != nil {
return fmt.Errorf("unmarshal database changes: %w", err)
}

if err := s.applyDatabaseChanges(dbChanges, data.Clock.Number, data.FinalBlockHeight); err != nil {
return fmt.Errorf("apply database changes: %w", err)
}
}

if data.Clock.Number%s.batchBlockModulo(data, isLive) == 0 {
if data.Clock.Number%s.batchBlockModulo(isLive) == 0 {
s.logger.Debug("flushing to database", zap.Stringer("block", cursor.Block()), zap.Bool("is_live", *isLive))

flushStart := time.Now()
Expand Down Expand Up @@ -210,7 +213,7 @@ func (s *SQLSinker) HandleBlockUndoSignal(ctx context.Context, data *pbsubstream
return s.loader.Revert(ctx, s.OutputModuleHash(), cursor, data.LastValidBlock.Number)
}

func (s *SQLSinker) batchBlockModulo(blockData *pbsubstreamsrpc.BlockScopedData, isLive *bool) uint64 {
func (s *SQLSinker) batchBlockModulo(isLive *bool) uint64 {
if isLive == nil {
panic(fmt.Errorf("liveness checker has been disabled on the Sinker instance, this is invalid in the context of 'substreams-sink-sql'"))
}
Expand Down

0 comments on commit a419805

Please sign in to comment.