Skip to content

Commit

Permalink
feat: more changes on raw repository
Browse files Browse the repository at this point in the history
  • Loading branch information
sandhilt committed Jan 23, 2025
1 parent 259f3f6 commit df0a653
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 20 deletions.
74 changes: 59 additions & 15 deletions pkg/convenience/synchronizer_node/raw_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@ type RawRepository struct {
Db *sqlx.DB
}

type RawInputAppAddress struct {
Address []byte `db:"iapplication_address"`
}

type RawInput struct {
ID uint64 `db:"id"`
Index uint64 `db:"index"` // numeric(20,0)
RawData []byte `db:"raw_data"`
BlockNumber uint64 `db:"block_number"` // numeric(20,0)
Status string `db:"status"`
MachineHash []byte `db:"machine_hash,omitempty"`
OutputsHash []byte `db:"outputs_hash,omitempty"`
ApplicationAddress []byte `db:"application_address"`
EpochID uint64 `db:"epoch_id"`
TransactionId []byte `db:"transaction_id"`
EpochIndex uint64 `db:"epoch_index"`
EpochAppId uint64 `db:"epoch_application_id"`
TransactionId []byte `db:"transaction_reference"`
CreatedAt time.Time `db:"created_at"`
UpdatedAt time.Time `db:"updated_at"`
ApplicationAddress []byte
}

type Report struct {
Expand Down Expand Up @@ -79,6 +84,38 @@ func NewRawRepository(connectionURL string, db *sqlx.DB) *RawRepository {
return &RawRepository{connectionURL, db}
}

func (s *RawRepository) GetAppAddress(ctx context.Context, rawInputIdx uint64) ([]byte, error) {
bindVarIdx := 1
args := []interface{}{rawInputIdx}
baseQuery := fmt.Sprintf(`
select
a.iapplication_address
from
input i
inner join application a on
i.epoch_application_id = a.id
where
i.index = %d`, bindVarIdx)
// bindVarIdx++

result, err := s.Db.QueryxContext(ctx, baseQuery, args...)
if err != nil {
slog.Error("Failed to execute query in GetAppAddress", "error", err)
return nil, err
}
defer result.Close()

var appAddress RawInputAppAddress
for result.Next() {
err := result.StructScan(&appAddress)
if err != nil {
slog.Error("Failed to scan row into RawInputAppAddress struct", "error", err)
return nil, err
}
}
return appAddress.Address, nil
}

func (s *RawRepository) FindAllInputsByFilter(ctx context.Context, filter FilterInput, pag *Pagination) ([]RawInput, error) {
inputs := []RawInput{}

Expand All @@ -88,7 +125,7 @@ func (s *RawRepository) FindAllInputsByFilter(ctx context.Context, filter Filter
}

bindVarIdx := 1
baseQuery := fmt.Sprintf("SELECT * FROM input WHERE ID >= $%d", bindVarIdx)
baseQuery := fmt.Sprintf("SELECT * FROM input WHERE index >= $%d", bindVarIdx)
bindVarIdx++
args := []any{filter.IDgt}

Expand All @@ -109,7 +146,7 @@ func (s *RawRepository) FindAllInputsByFilter(ctx context.Context, filter Filter
pagination := fmt.Sprintf(" LIMIT $%d", bindVarIdx)
args = append(args, limit)

orderBy := " ORDER BY ID ASC "
orderBy := " ORDER BY index ASC "
query := baseQuery + additionalFilter + orderBy + pagination

result, err := s.Db.QueryxContext(ctx, query, args...)
Expand All @@ -127,6 +164,13 @@ func (s *RawRepository) FindAllInputsByFilter(ctx context.Context, filter Filter
slog.Error("Failed to scan row into RawInput struct", "error", err)
return nil, err
}
appAddress, err := s.GetAppAddress(ctx, input.Index)
if err != nil {
slog.Error("Failed to get app address", "error", err)
return nil, err
}
input.ApplicationAddress = appAddress

inputs = append(inputs, input)
}

Expand All @@ -138,10 +182,10 @@ func (s *RawRepository) FindAllReportsByFilter(ctx context.Context, filter Filte

result, err := s.Db.QueryxContext(ctx, `
SELECT
r.id, r.index, r.raw_data, r.input_id,
r.id, r.index, r.raw_data, r.input_id,
inp.application_address as app_contract,
inp.index as input_index
FROM
FROM
report as r
INNER JOIN
input as inp
Expand Down Expand Up @@ -197,16 +241,16 @@ func (s *RawRepository) FindAllOutputsByFilter(ctx context.Context, filter Filte
outputs := []Output{}

result, err := s.Db.QueryxContext(ctx, `
SELECT o.id, o.index, o.raw_data, o.hash,
SELECT o.id, o.index, o.raw_data, o.hash,
o.output_hashes_siblings,
o.input_id, o.transaction_hash, o.updated_at,
o.input_id, o.transaction_hash, o.updated_at,
i.application_address app_contract,
i.index input_index
FROM output o
INNER JOIN
input i
ON i.id = o.input_id
WHERE o.id > $1
WHERE o.id > $1
ORDER BY o.id ASC
LIMIT $2`, filter.IDgt, LIMIT)
if err != nil {
Expand All @@ -231,8 +275,8 @@ func (s *RawRepository) FindAllOutputsByFilter(ctx context.Context, filter Filte
func (s *RawRepository) FindAllOutputsWithProof(ctx context.Context, filter FilterID) ([]Output, error) {
outputs := []Output{}
result, err := s.Db.QueryxContext(ctx, `
SELECT *
FROM output
SELECT *
FROM output
WHERE ID >= $1 and output_hashes_siblings IS NOT NULL
ORDER BY ID ASC
LIMIT $2
Expand All @@ -259,8 +303,8 @@ func (s *RawRepository) FindAllOutputsWithProof(ctx context.Context, filter Filt
func (s *RawRepository) FindAllOutputsExecutedAfter(ctx context.Context, afterUpdatedAt time.Time, rawId uint64) ([]Output, error) {
outputs := []Output{}
result, err := s.Db.QueryxContext(ctx, `
SELECT *
FROM output
SELECT *
FROM output
WHERE ((updated_at > $1) or (updated_at = $1 and id > $2)) and transaction_hash IS NOT NULL
ORDER BY updated_at ASC, id ASC
LIMIT $3
Expand Down
6 changes: 3 additions & 3 deletions pkg/convenience/synchronizer_node/raw_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (s *RawNodeSuite) TestSynchronizerNodeListInputs() {
}

firstInput := inputs[0]
s.Equal(firstInput.ID, uint64(1))
s.Equal(firstInput.Index, uint64(1))

b := inputs[0].BlockNumber

Expand All @@ -109,7 +109,7 @@ func (s *RawNodeSuite) TestSynchronizerNodeInputByID() {
inputs, err := s.rawRepository.FindAllInputsByFilter(ctx, FilterInput{IDgt: 2, IsStatusNone: false}, nil)
s.NoError(err)
firstInput := inputs[0]
s.Equal(firstInput.ID, uint64(2))
s.Equal(firstInput.Index, uint64(2))

b := inputs[0].BlockNumber

Expand Down Expand Up @@ -189,5 +189,5 @@ func (s *RawNodeSuite) TestSynchronizerNodeFindInputByOutput() {

input, err := s.rawRepository.FindInputByOutput(ctx, FilterID{IDgt: 1})
s.NoError(err)
s.Equal(uint64(1), input.ID)
s.Equal(uint64(1), input.Index)
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (s *SynchronizerInputCreator) CreateInput(ctx context.Context, rawInput Raw

rawInputRef := repository.RawInputRef{
ID: inputBox.ID,
RawID: uint64(rawInput.ID),
RawID: uint64(rawInput.Index),
InputIndex: rawInput.Index,
AppContract: common.BytesToAddress(rawInput.ApplicationAddress).Hex(),
Status: rawInput.Status,
Expand Down
2 changes: 1 addition & 1 deletion pkg/convenience/synchronizer_node/synchronizer_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *SynchronizerUpdate) rollbackTransaction(ctx context.Context) {
func (s *SynchronizerUpdate) mapIds(rawInputs []RawInput) []string {
ids := make([]string, len(rawInputs))
for i, input := range rawInputs {
ids[i] = strconv.FormatUint(input.ID, 10)
ids[i] = strconv.FormatUint(input.Index, 10)
}
return ids
}
Expand Down

0 comments on commit df0a653

Please sign in to comment.