Skip to content

Commit

Permalink
fix major bug when receiving empty ouput
Browse files Browse the repository at this point in the history
  • Loading branch information
ArnaudBger committed Nov 7, 2024
1 parent 1ff3a62 commit 3c9bcd7
Showing 1 changed file with 14 additions and 11 deletions.
25 changes: 14 additions & 11 deletions sinker/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,22 @@ func (s *SQLSinker) HandleBlockScopedData(ctx context.Context, data *pbsubstream

dbChanges := &pbdatabase.DatabaseChanges{}
mapOutput := output.GetMapOutput()
if !mapOutput.MessageIs(dbChanges) && mapOutput.TypeUrl != "type.googleapis.com/sf.substreams.database.v1.DatabaseChanges" {
return fmt.Errorf("mismatched message type: trying to unmarshal unknown type %q", mapOutput.MessageName())
}

// We do not use UnmarshalTo here because we need to parse an older proto type and
// UnmarshalTo enforces the type check. So we check manually the `TypeUrl` above and we use
// `Unmarshal` instead which only deals with the bytes value.
if err := proto.Unmarshal(mapOutput.Value, dbChanges); err != nil {
return fmt.Errorf("unmarshal database changes: %w", err)
}
if mapOutput.String() != "" {
if !mapOutput.MessageIs(dbChanges) && mapOutput.TypeUrl != "type.googleapis.com/sf.substreams.database.v1.DatabaseChanges" {
return fmt.Errorf("mismatched message type: trying to unmarshal unknown type %q", mapOutput.MessageName())
}

if err := s.applyDatabaseChanges(dbChanges, data.Clock.Number, data.FinalBlockHeight); err != nil {
return fmt.Errorf("apply database changes: %w", err)
// We do not use UnmarshalTo here because we need to parse an older proto type and
// UnmarshalTo enforces the type check. So we check manually the `TypeUrl` above and we use
// `Unmarshal` instead which only deals with the bytes value.
if err := proto.Unmarshal(mapOutput.Value, dbChanges); err != nil {
return fmt.Errorf("unmarshal database changes: %w", err)
}

if err := s.applyDatabaseChanges(dbChanges, data.Clock.Number, data.FinalBlockHeight); err != nil {
return fmt.Errorf("apply database changes: %w", err)
}
}

if data.Clock.Number%s.batchBlockModulo(data, isLive) == 0 {
Expand Down

0 comments on commit 3c9bcd7

Please sign in to comment.