From 414f0ab26e85b5e8ee6f9e9b0263581bc9f11c5e Mon Sep 17 00:00:00 2001 From: pschork <354473+pschork@users.noreply.github.com> Date: Thu, 25 Jul 2024 22:54:52 -0700 Subject: [PATCH] Minibatch dynamodb client (#652) --- common/aws/dynamodb/client.go | 26 + .../batcher/batchstore/minibatch_store.go | 620 ++++++++++++++++++ .../batchstore/minibatch_store_test.go | 373 +++++++++++ .../batcher/inmem/minibatch_store_test.go | 9 + disperser/batcher/minibatch_store.go | 14 +- 5 files changed, 1036 insertions(+), 6 deletions(-) create mode 100644 disperser/batcher/batchstore/minibatch_store.go create mode 100644 disperser/batcher/batchstore/minibatch_store_test.go diff --git a/common/aws/dynamodb/client.go b/common/aws/dynamodb/client.go index 04cc7d75a..2e309a86b 100644 --- a/common/aws/dynamodb/client.go +++ b/common/aws/dynamodb/client.go @@ -105,6 +105,18 @@ func (c *Client) PutItem(ctx context.Context, tableName string, item Item) (err return nil } +func (c *Client) PutItemWithCondition(ctx context.Context, tableName string, item Item, condition string) (err error) { + _, err = c.dynamoClient.PutItem(ctx, &dynamodb.PutItemInput{ + TableName: aws.String(tableName), Item: item, + ConditionExpression: aws.String(condition), + }) + if err != nil { + return err + } + + return nil +} + // PutItems puts items in batches of 25 items (which is a limit DynamoDB imposes) // It returns the items that failed to be put. func (c *Client) PutItems(ctx context.Context, tableName string, items []Item) ([]Item, error) { @@ -166,6 +178,20 @@ func (c *Client) QueryIndex(ctx context.Context, tableName string, indexName str return response.Items, nil } +// Query returns all items in the primary index that match the given expression +func (c *Client) Query(ctx context.Context, tableName string, keyCondition string, expAttributeValues ExpresseionValues) ([]Item, error) { + response, err := c.dynamoClient.Query(ctx, &dynamodb.QueryInput{ + TableName: aws.String(tableName), + KeyConditionExpression: aws.String(keyCondition), + ExpressionAttributeValues: expAttributeValues, + }) + if err != nil { + return nil, err + } + + return response.Items, nil +} + // QueryIndexCount returns the count of the items in the index that match the given key func (c *Client) QueryIndexCount(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpresseionValues) (int32, error) { response, err := c.dynamoClient.Query(ctx, &dynamodb.QueryInput{ diff --git a/disperser/batcher/batchstore/minibatch_store.go b/disperser/batcher/batchstore/minibatch_store.go new file mode 100644 index 000000000..5fe1bf3c7 --- /dev/null +++ b/disperser/batcher/batchstore/minibatch_store.go @@ -0,0 +1,620 @@ +package batchstore + +import ( + "context" + "fmt" + "strconv" + "time" + + commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/disperser/batcher" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/google/uuid" +) + +const ( + batchStatusIndexName = "BatchStatusIndex" + batchSKPrefix = "BATCH#" + minibatchSKPrefix = "MINIBATCH#" + dispersalRequestSKPrefix = "DISPERSAL_REQUEST#" + dispersalResponseSKPrefix = "DISPERSAL_RESPONSE#" +) + +type MinibatchStore struct { + dynamoDBClient *commondynamodb.Client + tableName string + logger logging.Logger + ttl time.Duration +} + +var _ batcher.MinibatchStore = (*MinibatchStore)(nil) + +func NewMinibatchStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *MinibatchStore { + logger.Debugf("creating minibatch store with table %s with TTL: %s", tableName, ttl) + return &MinibatchStore{ + dynamoDBClient: dynamoDBClient, + logger: logger.With("component", "MinibatchStore"), + tableName: tableName, + ttl: ttl, + } +} + +func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacityUnits int64) *dynamodb.CreateTableInput { + return &dynamodb.CreateTableInput{ + AttributeDefinitions: []types.AttributeDefinition{ + { + AttributeName: aws.String("BatchID"), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String("SK"), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String("BatchStatus"), + AttributeType: types.ScalarAttributeTypeN, + }, + { + AttributeName: aws.String("CreatedAt"), + AttributeType: types.ScalarAttributeTypeN, + }, + }, + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("BatchID"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("SK"), + KeyType: types.KeyTypeRange, + }, + }, + TableName: aws.String(tableName), + GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ + { + IndexName: aws.String(batchStatusIndexName), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("BatchStatus"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("CreatedAt"), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + }, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + } +} + +func MarshalBatchRecord(batch *batcher.BatchRecord) (map[string]types.AttributeValue, error) { + fields, err := attributevalue.MarshalMap(*batch) + if err != nil { + return nil, err + } + fields["BatchID"] = &types.AttributeValueMemberS{Value: batch.ID.String()} + fields["SK"] = &types.AttributeValueMemberS{Value: batchSKPrefix + batch.ID.String()} + fields["CreatedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", batch.CreatedAt.UTC().Unix())} + return fields, nil +} + +func MarshalMinibatchRecord(minibatch *batcher.MinibatchRecord) (map[string]types.AttributeValue, error) { + fields, err := attributevalue.MarshalMap(*minibatch) + if err != nil { + return nil, err + } + fields["BatchID"] = &types.AttributeValueMemberS{Value: minibatch.BatchID.String()} + fields["SK"] = &types.AttributeValueMemberS{Value: minibatchSKPrefix + fmt.Sprintf("%d", minibatch.MinibatchIndex)} + return fields, nil +} + +func MarshalDispersalRequest(request *batcher.DispersalRequest) (map[string]types.AttributeValue, error) { + fields, err := attributevalue.MarshalMap(*request) + if err != nil { + return nil, err + } + fields["BatchID"] = &types.AttributeValueMemberS{Value: request.BatchID.String()} + fields["SK"] = &types.AttributeValueMemberS{Value: dispersalRequestSKPrefix + fmt.Sprintf("%d#%s", request.MinibatchIndex, request.OperatorID.Hex())} + fields["OperatorID"] = &types.AttributeValueMemberS{Value: request.OperatorID.Hex()} + fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", request.RequestedAt.UTC().Unix())} + return fields, nil +} + +func MarshalDispersalResponse(response *batcher.DispersalResponse) (map[string]types.AttributeValue, error) { + fields, err := attributevalue.MarshalMap(*response) + if err != nil { + return nil, err + } + fields["BatchID"] = &types.AttributeValueMemberS{Value: response.BatchID.String()} + fields["SK"] = &types.AttributeValueMemberS{Value: dispersalResponseSKPrefix + fmt.Sprintf("%d#%s", response.MinibatchIndex, response.OperatorID.Hex())} + fields["OperatorID"] = &types.AttributeValueMemberS{Value: response.OperatorID.Hex()} + fields["RespondedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.RespondedAt.UTC().Unix())} + fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.DispersalRequest.RequestedAt.UTC().Unix())} + return fields, nil +} + +func UnmarshalBatchID(item commondynamodb.Item) (*uuid.UUID, error) { + type BatchID struct { + BatchID string + } + + batch := BatchID{} + err := attributevalue.UnmarshalMap(item, &batch) + if err != nil { + return nil, err + } + + batchID, err := uuid.Parse(batch.BatchID) + if err != nil { + return nil, err + } + + return &batchID, nil +} + +func UnmarshalOperatorID(item commondynamodb.Item) (*core.OperatorID, error) { + type OperatorID struct { + OperatorID string + } + + dispersal := OperatorID{} + err := attributevalue.UnmarshalMap(item, &dispersal) + if err != nil { + return nil, err + } + + operatorID, err := core.OperatorIDFromHex(dispersal.OperatorID) + if err != nil { + return nil, err + } + + return &operatorID, nil +} + +func UnmarshalBatchRecord(item commondynamodb.Item) (*batcher.BatchRecord, error) { + batch := batcher.BatchRecord{} + err := attributevalue.UnmarshalMap(item, &batch) + if err != nil { + return nil, err + } + + batchID, err := UnmarshalBatchID(item) + if err != nil { + return nil, err + } + batch.ID = *batchID + + batch.CreatedAt = batch.CreatedAt.UTC() + return &batch, nil +} + +func UnmarshalMinibatchRecord(item commondynamodb.Item) (*batcher.MinibatchRecord, error) { + minibatch := batcher.MinibatchRecord{} + err := attributevalue.UnmarshalMap(item, &minibatch) + if err != nil { + return nil, err + } + + batchID, err := UnmarshalBatchID(item) + if err != nil { + return nil, err + } + minibatch.BatchID = *batchID + + return &minibatch, nil +} + +func UnmarshalDispersalRequest(item commondynamodb.Item) (*batcher.DispersalRequest, error) { + request := batcher.DispersalRequest{} + err := attributevalue.UnmarshalMap(item, &request) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal dispersal request from DynamoDB: %v", err) + } + + batchID, err := UnmarshalBatchID(item) + if err != nil { + return nil, err + } + request.BatchID = *batchID + + operatorID, err := UnmarshalOperatorID(item) + if err != nil { + return nil, err + } + request.OperatorID = *operatorID + + request.RequestedAt = request.RequestedAt.UTC() + return &request, nil +} + +func UnmarshalDispersalResponse(item commondynamodb.Item) (*batcher.DispersalResponse, error) { + response := batcher.DispersalResponse{} + err := attributevalue.UnmarshalMap(item, &response) + if err != nil { + return nil, err + } + + batchID, err := UnmarshalBatchID(item) + if err != nil { + return nil, err + } + response.BatchID = *batchID + + operatorID, err := UnmarshalOperatorID(item) + if err != nil { + return nil, err + } + response.OperatorID = *operatorID + + response.RespondedAt = response.RespondedAt.UTC() + response.DispersalRequest.RequestedAt = response.DispersalRequest.RequestedAt.UTC() + return &response, nil +} + +func (m *MinibatchStore) PutBatch(ctx context.Context, batch *batcher.BatchRecord) error { + item, err := MarshalBatchRecord(batch) + if err != nil { + return err + } + constraint := "attribute_not_exists(BatchID) AND attribute_not_exists(SK)" + return m.dynamoDBClient.PutItemWithCondition(ctx, m.tableName, item, constraint) +} + +func (m *MinibatchStore) PutMinibatch(ctx context.Context, minibatch *batcher.MinibatchRecord) error { + item, err := MarshalMinibatchRecord(minibatch) + if err != nil { + return err + } + + return m.dynamoDBClient.PutItem(ctx, m.tableName, item) +} + +func (m *MinibatchStore) PutDispersalRequest(ctx context.Context, request *batcher.DispersalRequest) error { + item, err := MarshalDispersalRequest(request) + if err != nil { + return err + } + + return m.dynamoDBClient.PutItem(ctx, m.tableName, item) +} + +func (m *MinibatchStore) PutDispersalResponse(ctx context.Context, response *batcher.DispersalResponse) error { + item, err := MarshalDispersalResponse(response) + if err != nil { + return err + } + + return m.dynamoDBClient.PutItem(ctx, m.tableName, item) +} + +func (m *MinibatchStore) GetBatch(ctx context.Context, batchID uuid.UUID) (*batcher.BatchRecord, error) { + item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ + "BatchID": &types.AttributeValueMemberS{ + Value: batchID.String(), + }, + "SK": &types.AttributeValueMemberS{ + Value: batchSKPrefix + batchID.String(), + }, + }) + if err != nil { + m.logger.Errorf("failed to get batch from DynamoDB: %v", err) + return nil, err + } + + if item == nil { + return nil, nil + } + + batch, err := UnmarshalBatchRecord(item) + if err != nil { + m.logger.Errorf("failed to unmarshal batch record from DynamoDB: %v", err) + return nil, err + } + return batch, nil +} + +func (m *MinibatchStore) BatchDispersed(ctx context.Context, batchID uuid.UUID) (bool, error) { + dispersalRequests, err := m.GetDispersalRequests(ctx, batchID) + if err != nil { + return false, fmt.Errorf("failed to get dispersal requests for batch %s - %v", batchID.String(), err) + + } + dispersalResponses, err := m.GetDispersalResponses(ctx, batchID) + if err != nil { + return false, fmt.Errorf("failed to get dispersal responses for batch %s - %v", batchID.String(), err) + } + if len(dispersalRequests) != len(dispersalResponses) { + m.logger.Info("number of minibatch dispersal requests does not match responses", "batchID", batchID, "numRequests", len(dispersalRequests), "numResponses", len(dispersalResponses)) + return false, nil + } + if len(dispersalRequests) == 0 || len(dispersalResponses) == 0 { + m.logger.Info("no dispersal requests or responses found", "batchID", batchID) + return false, nil + } + return true, nil +} + +func (m *MinibatchStore) GetBatchesByStatus(ctx context.Context, status batcher.BatchStatus) ([]*batcher.BatchRecord, error) { + items, err := m.dynamoDBClient.QueryIndex(ctx, m.tableName, batchStatusIndexName, "BatchStatus = :status", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(status)), + }}) + if err != nil { + return nil, err + } + + batches := make([]*batcher.BatchRecord, len(items)) + for i, item := range items { + batches[i], err = UnmarshalBatchRecord(item) + if err != nil { + m.logger.Errorf("failed to unmarshal batch record at index %d: %v", i, err) + return nil, err + } + } + + return batches, nil +} + +func (m *MinibatchStore) GetLatestFormedBatch(ctx context.Context) (batch *batcher.BatchRecord, minibatches []*batcher.MinibatchRecord, err error) { + formed, err := m.GetBatchesByStatus(ctx, batcher.BatchStatusFormed) + if err != nil { + return nil, nil, err + } + if len(formed) == 0 { + return nil, nil, nil + } + batch = formed[len(formed)-1] + minibatches, err = m.GetMinibatches(ctx, batch.ID) + if err != nil { + return nil, nil, err + } + return batch, minibatches, nil +} + +func (m *MinibatchStore) UpdateBatchStatus(ctx context.Context, batchID uuid.UUID, status batcher.BatchStatus) error { + if status < batcher.BatchStatusFormed || status > batcher.BatchStatusFailed { + return fmt.Errorf("invalid batch status %v", status) + } + _, err := m.dynamoDBClient.UpdateItem(ctx, m.tableName, map[string]types.AttributeValue{ + "BatchID": &types.AttributeValueMemberS{Value: batchID.String()}, + "SK": &types.AttributeValueMemberS{Value: batchSKPrefix + batchID.String()}, + }, commondynamodb.Item{ + "BatchStatus": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(status)), + }, + }) + + if err != nil { + return fmt.Errorf("failed to update batch status: %v", err) + } + + return nil +} + +func (m *MinibatchStore) GetMinibatch(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.MinibatchRecord, error) { + item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ + "BatchID": &types.AttributeValueMemberS{ + Value: batchID.String(), + }, + "SK": &types.AttributeValueMemberS{ + Value: minibatchSKPrefix + fmt.Sprintf("%d", minibatchIndex), + }, + }) + if err != nil { + m.logger.Errorf("failed to get minibatch from DynamoDB: %v", err) + return nil, err + } + + if item == nil { + return nil, nil + } + + minibatch, err := UnmarshalMinibatchRecord(item) + if err != nil { + m.logger.Errorf("failed to unmarshal minibatch record from DynamoDB: %v", err) + return nil, err + } + return minibatch, nil +} + +func (m *MinibatchStore) GetMinibatches(ctx context.Context, batchID uuid.UUID) ([]*batcher.MinibatchRecord, error) { + items, err := m.dynamoDBClient.Query(ctx, m.tableName, "BatchID = :batchID AND begins_with(SK, :prefix)", commondynamodb.ExpresseionValues{ + ":batchID": &types.AttributeValueMemberS{ + Value: batchID.String(), + }, + ":prefix": &types.AttributeValueMemberS{ + Value: minibatchSKPrefix, + }, + }) + if err != nil { + return nil, err + } + + minibatches := make([]*batcher.MinibatchRecord, len(items)) + for i, item := range items { + minibatches[i], err = UnmarshalMinibatchRecord(item) + if err != nil { + m.logger.Errorf("failed to unmarshal minibatch record at index %d: %v", i, err) + return nil, err + } + } + + return minibatches, nil +} + +func (m *MinibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*batcher.DispersalRequest, error) { + item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ + "BatchID": &types.AttributeValueMemberS{ + Value: batchID.String(), + }, + "SK": &types.AttributeValueMemberS{ + Value: dispersalRequestSKPrefix + fmt.Sprintf("%d#%s", minibatchIndex, opID.Hex()), + }, + }) + if err != nil { + m.logger.Errorf("failed to get dispersal request from DynamoDB: %v", err) + return nil, err + } + + if item == nil { + return nil, nil + } + + request, err := UnmarshalDispersalRequest(item) + if err != nil { + m.logger.Errorf("failed to unmarshal dispersal request from DynamoDB: %v", err) + return nil, err + } + return request, nil +} + +func (m *MinibatchStore) GetDispersalRequests(ctx context.Context, batchID uuid.UUID) ([]*batcher.DispersalRequest, error) { + items, err := m.dynamoDBClient.Query(ctx, m.tableName, "BatchID = :batchID AND begins_with(SK, :prefix)", commondynamodb.ExpresseionValues{ + ":batchID": &types.AttributeValueMemberS{ + Value: batchID.String(), + }, + ":prefix": &types.AttributeValueMemberS{ + Value: dispersalRequestSKPrefix, + }, + }) + if err != nil { + return nil, err + } + + requests := make([]*batcher.DispersalRequest, len(items)) + for i, item := range items { + requests[i], err = UnmarshalDispersalRequest(item) + if err != nil { + m.logger.Errorf("failed to unmarshal dispersal requests at index %d: %v", i, err) + return nil, err + } + } + + return requests, nil +} + +func (m *MinibatchStore) GetMinibatchDispersalRequests(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalRequest, error) { + items, err := m.dynamoDBClient.Query(ctx, m.tableName, "BatchID = :batchID AND SK = :sk", commondynamodb.ExpresseionValues{ + ":batchID": &types.AttributeValueMemberS{ + Value: batchID.String(), + }, + ":sk": &types.AttributeValueMemberS{ + Value: dispersalRequestSKPrefix + fmt.Sprintf("%s#%d", batchID.String(), minibatchIndex), + }, + }) + if err != nil { + return nil, err + } + + if len(items) == 0 { + return nil, fmt.Errorf("no dispersal requests found for BatchID %s MinibatchIndex %d", batchID, minibatchIndex) + } + + requests := make([]*batcher.DispersalRequest, len(items)) + for i, item := range items { + requests[i], err = UnmarshalDispersalRequest(item) + if err != nil { + m.logger.Errorf("failed to unmarshal dispersal requests at index %d: %v", i, err) + return nil, err + } + } + + return requests, nil +} + +func (m *MinibatchStore) GetDispersalResponse(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*batcher.DispersalResponse, error) { + item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ + "BatchID": &types.AttributeValueMemberS{ + Value: batchID.String(), + }, + "SK": &types.AttributeValueMemberS{ + Value: dispersalResponseSKPrefix + fmt.Sprintf("%d#%s", minibatchIndex, opID.Hex()), + }, + }) + if err != nil { + m.logger.Errorf("failed to get dispersal response from DynamoDB: %v", err) + return nil, err + } + + if item == nil { + return nil, nil + } + + response, err := UnmarshalDispersalResponse(item) + if err != nil { + m.logger.Errorf("failed to unmarshal dispersal response from DynamoDB: %v", err) + return nil, err + } + return response, nil +} + +func (m *MinibatchStore) GetDispersalResponses(ctx context.Context, batchID uuid.UUID) ([]*batcher.DispersalResponse, error) { + items, err := m.dynamoDBClient.Query(ctx, m.tableName, "BatchID = :batchID AND begins_with(SK, :prefix)", commondynamodb.ExpresseionValues{ + ":batchID": &types.AttributeValueMemberS{ + Value: batchID.String(), + }, + ":prefix": &types.AttributeValueMemberS{ + Value: dispersalResponseSKPrefix, + }, + }) + if err != nil { + return nil, err + } + + responses := make([]*batcher.DispersalResponse, len(items)) + for i, item := range items { + responses[i], err = UnmarshalDispersalResponse(item) + if err != nil { + m.logger.Errorf("failed to unmarshal dispersal response at index %d: %v", i, err) + return nil, err + } + } + + return responses, nil +} + +func (m *MinibatchStore) GetMinibatchDispersalResponses(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalResponse, error) { + items, err := m.dynamoDBClient.Query(ctx, m.tableName, "BatchID = :batchID AND SK = :sk", commondynamodb.ExpresseionValues{ + ":batchID": &types.AttributeValueMemberS{ + Value: batchID.String(), + }, + ":sk": &types.AttributeValueMemberS{ + Value: dispersalResponseSKPrefix + fmt.Sprintf("%s#%d", batchID.String(), minibatchIndex), + }, + }) + if err != nil { + return nil, err + } + + if len(items) == 0 { + return nil, fmt.Errorf("no dispersal responses found for BatchID %s MinibatchIndex %d", batchID, minibatchIndex) + } + + responses := make([]*batcher.DispersalResponse, len(items)) + for i, item := range items { + responses[i], err = UnmarshalDispersalResponse(item) + if err != nil { + m.logger.Errorf("failed to unmarshal dispersal response at index %d: %v", i, err) + return nil, err + } + } + + return responses, nil +} diff --git a/disperser/batcher/batchstore/minibatch_store_test.go b/disperser/batcher/batchstore/minibatch_store_test.go new file mode 100644 index 000000000..5e3d4c05e --- /dev/null +++ b/disperser/batcher/batchstore/minibatch_store_test.go @@ -0,0 +1,373 @@ +package batchstore_test + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/Layr-Labs/eigenda/common/aws" + "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + test_utils "github.com/Layr-Labs/eigenda/common/aws/dynamodb/utils" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/disperser/batcher" + "github.com/Layr-Labs/eigenda/disperser/batcher/batchstore" + "github.com/Layr-Labs/eigenda/inabox/deploy" + "github.com/Layr-Labs/eigensdk-go/logging" + gcommon "github.com/ethereum/go-ethereum/common" + "github.com/google/uuid" + "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/assert" +) + +var ( + logger = logging.NewNoopLogger() + + dockertestPool *dockertest.Pool + dockertestResource *dockertest.Resource + + deployLocalStack bool + localStackPort = "4566" + + dynamoClient *dynamodb.Client + minibatchStore *batchstore.MinibatchStore + + UUID = uuid.New() + minibatchTableName = fmt.Sprintf("test-MinibatchStore-%v", UUID) +) + +func setup(m *testing.M) { + deployLocalStack = !(os.Getenv("DEPLOY_LOCALSTACK") == "false") + fmt.Printf("deployLocalStack: %v\n", deployLocalStack) + if !deployLocalStack { + localStackPort = os.Getenv("LOCALSTACK_PORT") + } + + if deployLocalStack { + var err error + dockertestPool, dockertestResource, err = deploy.StartDockertestWithLocalstackContainer(localStackPort) + if err != nil { + teardown() + panic("failed to start localstack container") + } + + } + + cfg := aws.ClientConfig{ + Region: "us-east-1", + AccessKey: "localstack", + SecretAccessKey: "localstack", + EndpointURL: fmt.Sprintf("http://0.0.0.0:%s", localStackPort), + } + + _, err := test_utils.CreateTable(context.Background(), cfg, minibatchTableName, batchstore.GenerateTableSchema(minibatchTableName, 10, 10)) + if err != nil { + teardown() + panic("failed to create dynamodb table: " + err.Error()) + } + + dynamoClient, err = dynamodb.NewClient(cfg, logger) + if err != nil { + teardown() + panic("failed to create dynamodb client: " + err.Error()) + } + + minibatchStore = batchstore.NewMinibatchStore(dynamoClient, logger, minibatchTableName, time.Hour) +} + +func teardown() { + if deployLocalStack { + deploy.PurgeDockertestResources(dockertestPool, dockertestResource) + } +} + +func TestMain(m *testing.M) { + setup(m) + code := m.Run() + teardown() + os.Exit(code) +} + +func TestPutBatch(t *testing.T) { + ctx := context.Background() + id, err := uuid.NewV7() + assert.NoError(t, err) + ts := time.Now().Truncate(time.Second).UTC() + batch := &batcher.BatchRecord{ + ID: id, + CreatedAt: ts, + ReferenceBlockNumber: 1, + Status: batcher.BatchStatusPending, + HeaderHash: [32]byte{1}, + AggregatePubKey: nil, + AggregateSignature: nil, + } + err = minibatchStore.PutBatch(ctx, batch) + assert.NoError(t, err) + err = minibatchStore.PutBatch(ctx, batch) + assert.Error(t, err) + b, err := minibatchStore.GetBatch(ctx, batch.ID) + assert.NoError(t, err) + assert.Equal(t, batch, b) + err = minibatchStore.UpdateBatchStatus(ctx, batch.ID, batcher.BatchStatusFormed) + assert.NoError(t, err) + u, err := minibatchStore.GetBatch(ctx, batch.ID) + assert.NoError(t, err) + assert.Equal(t, batcher.BatchStatusFormed, u.Status) + err = minibatchStore.UpdateBatchStatus(ctx, batch.ID, batcher.BatchStatusAttested) + assert.NoError(t, err) + u, err = minibatchStore.GetBatch(ctx, batch.ID) + assert.NoError(t, err) + assert.Equal(t, batcher.BatchStatusAttested, u.Status) + err = minibatchStore.UpdateBatchStatus(ctx, batch.ID, batcher.BatchStatusFailed) + assert.NoError(t, err) + u, err = minibatchStore.GetBatch(ctx, batch.ID) + assert.NoError(t, err) + assert.Equal(t, batcher.BatchStatusFailed, u.Status) + err = minibatchStore.UpdateBatchStatus(ctx, batch.ID, 4) + assert.Error(t, err) +} + +func TestGetBatchesByStatus(t *testing.T) { + ctx := context.Background() + id1, _ := uuid.NewV7() + id2, _ := uuid.NewV7() + id3, _ := uuid.NewV7() + ts := time.Now().Truncate(time.Second).UTC() + batch1 := &batcher.BatchRecord{ + ID: id1, + CreatedAt: ts, + ReferenceBlockNumber: 1, + Status: batcher.BatchStatusFormed, + HeaderHash: [32]byte{1}, + AggregatePubKey: nil, + AggregateSignature: nil, + } + _ = minibatchStore.PutBatch(ctx, batch1) + batch2 := &batcher.BatchRecord{ + ID: id2, + CreatedAt: ts, + ReferenceBlockNumber: 1, + Status: batcher.BatchStatusFormed, + HeaderHash: [32]byte{1}, + AggregatePubKey: nil, + AggregateSignature: nil, + } + err := minibatchStore.PutBatch(ctx, batch2) + assert.NoError(t, err) + batch3 := &batcher.BatchRecord{ + ID: id3, + CreatedAt: ts, + ReferenceBlockNumber: 1, + Status: batcher.BatchStatusFormed, + HeaderHash: [32]byte{1}, + AggregatePubKey: nil, + AggregateSignature: nil, + } + err = minibatchStore.PutBatch(ctx, batch3) + assert.NoError(t, err) + + attested, err := minibatchStore.GetBatchesByStatus(ctx, batcher.BatchStatusAttested) + assert.NoError(t, err) + assert.Equal(t, 0, len(attested)) + + formed, err := minibatchStore.GetBatchesByStatus(ctx, batcher.BatchStatusFormed) + assert.NoError(t, err) + assert.Equal(t, 3, len(formed)) + + err = minibatchStore.UpdateBatchStatus(ctx, id1, batcher.BatchStatusAttested) + assert.NoError(t, err) + + formed, err = minibatchStore.GetBatchesByStatus(ctx, batcher.BatchStatusFormed) + assert.NoError(t, err) + assert.Equal(t, 2, len(formed)) +} + +func TestPutMinibatch(t *testing.T) { + ctx := context.Background() + id, err := uuid.NewV7() + assert.NoError(t, err) + minibatch := &batcher.MinibatchRecord{ + BatchID: id, + MinibatchIndex: 12, + BlobHeaderHashes: [][32]byte{{1}}, + BatchSize: 1, + ReferenceBlockNumber: 1, + } + err = minibatchStore.PutMinibatch(ctx, minibatch) + assert.NoError(t, err) + m, err := minibatchStore.GetMinibatch(ctx, minibatch.BatchID, minibatch.MinibatchIndex) + assert.NoError(t, err) + assert.Equal(t, minibatch, m) +} + +func TestGetLatestFormedBatch(t *testing.T) { + ctx := context.Background() + id1, _ := uuid.NewV7() + id2, _ := uuid.NewV7() + ts := time.Now().Truncate(time.Second).UTC() + ts2 := ts.Add(10 * time.Second) + batch1 := &batcher.BatchRecord{ + ID: id1, + CreatedAt: ts, + ReferenceBlockNumber: 1, + Status: batcher.BatchStatusFormed, + HeaderHash: [32]byte{1}, + AggregatePubKey: nil, + AggregateSignature: nil, + } + minibatch1 := &batcher.MinibatchRecord{ + BatchID: id1, + MinibatchIndex: 1, + BlobHeaderHashes: [][32]byte{{1}}, + BatchSize: 1, + ReferenceBlockNumber: 1, + } + batch2 := &batcher.BatchRecord{ + ID: id2, + CreatedAt: ts2, + ReferenceBlockNumber: 1, + Status: batcher.BatchStatusFormed, + HeaderHash: [32]byte{1}, + AggregatePubKey: nil, + AggregateSignature: nil, + } + minibatch2 := &batcher.MinibatchRecord{ + BatchID: id2, + MinibatchIndex: 1, + BlobHeaderHashes: [][32]byte{{1}}, + BatchSize: 1, + ReferenceBlockNumber: 1, + } + minibatch3 := &batcher.MinibatchRecord{ + BatchID: id2, + MinibatchIndex: 2, + BlobHeaderHashes: [][32]byte{{1}}, + BatchSize: 1, + ReferenceBlockNumber: 1, + } + err := minibatchStore.PutBatch(ctx, batch1) + assert.NoError(t, err) + err = minibatchStore.PutMinibatch(ctx, minibatch1) + assert.NoError(t, err) + err = minibatchStore.PutBatch(ctx, batch2) + assert.NoError(t, err) + err = minibatchStore.PutMinibatch(ctx, minibatch2) + assert.NoError(t, err) + err = minibatchStore.PutMinibatch(ctx, minibatch3) + assert.NoError(t, err) + + batch, minibatches, err := minibatchStore.GetLatestFormedBatch(ctx) + assert.NoError(t, err) + assert.Equal(t, 2, len(minibatches)) + assert.Equal(t, batch.ID, batch2.ID) +} +func TestPutDispersalRequest(t *testing.T) { + ctx := context.Background() + id, err := uuid.NewV7() + assert.NoError(t, err) + ts := time.Now().Truncate(time.Second).UTC() + opID := core.OperatorID([32]byte{123}) + request := &batcher.DispersalRequest{ + BatchID: id, + MinibatchIndex: 0, + OperatorID: opID, + OperatorAddress: gcommon.HexToAddress("0x0"), + NumBlobs: 1, + RequestedAt: ts, + BlobHash: "blobHash", + MetadataHash: "metadataHash", + } + err = minibatchStore.PutDispersalRequest(ctx, request) + assert.NoError(t, err) + r, err := minibatchStore.GetDispersalRequest(ctx, request.BatchID, request.MinibatchIndex, opID) + assert.NoError(t, err) + assert.Equal(t, request, r) +} + +func TestPutDispersalResponse(t *testing.T) { + ctx := context.Background() + id, err := uuid.NewV7() + assert.NoError(t, err) + ts := time.Now().Truncate(time.Second).UTC() + opID := core.OperatorID([32]byte{123}) + blobHash := "blobHash" + metadataHash := "metadataHash" + response := &batcher.DispersalResponse{ + DispersalRequest: batcher.DispersalRequest{ + BatchID: id, + MinibatchIndex: 0, + OperatorID: opID, + OperatorAddress: gcommon.HexToAddress("0x0"), + NumBlobs: 1, + RequestedAt: ts, + BlobHash: blobHash, + MetadataHash: metadataHash, + }, + Signatures: nil, + RespondedAt: ts, + Error: nil, + } + err = minibatchStore.PutDispersalResponse(ctx, response) + assert.NoError(t, err) + r, err := minibatchStore.GetDispersalResponse(ctx, response.BatchID, response.MinibatchIndex, opID) + assert.NoError(t, err) + assert.Equal(t, response, r) +} + +func TestDispersalStatus(t *testing.T) { + ctx := context.Background() + id, err := uuid.NewV7() + assert.NoError(t, err) + ts := time.Now().Truncate(time.Second).UTC() + opID := core.OperatorID([32]byte{123}) + blobHash := "blobHash" + metadataHash := "metadataHash" + + // no dispersals + dispersed, err := minibatchStore.BatchDispersed(ctx, id) + assert.NoError(t, err) + assert.False(t, dispersed) + + request := &batcher.DispersalRequest{ + BatchID: id, + MinibatchIndex: 0, + OperatorID: opID, + OperatorAddress: gcommon.HexToAddress("0x0"), + NumBlobs: 1, + RequestedAt: ts, + BlobHash: blobHash, + MetadataHash: metadataHash, + } + err = minibatchStore.PutDispersalRequest(ctx, request) + assert.NoError(t, err) + + // dispersal request but no response + dispersed, err = minibatchStore.BatchDispersed(ctx, id) + assert.NoError(t, err) + assert.False(t, dispersed) + + response := &batcher.DispersalResponse{ + DispersalRequest: batcher.DispersalRequest{ + BatchID: id, + MinibatchIndex: 0, + OperatorID: opID, + OperatorAddress: gcommon.HexToAddress("0x0"), + NumBlobs: 1, + RequestedAt: ts, + BlobHash: blobHash, + MetadataHash: metadataHash, + }, + Signatures: nil, + RespondedAt: ts, + Error: nil, + } + err = minibatchStore.PutDispersalResponse(ctx, response) + assert.NoError(t, err) + + // dispersal request and response + dispersed, err = minibatchStore.BatchDispersed(ctx, id) + assert.NoError(t, err) + assert.True(t, dispersed) +} diff --git a/disperser/batcher/inmem/minibatch_store_test.go b/disperser/batcher/inmem/minibatch_store_test.go index b9e2f02f4..29b30294e 100644 --- a/disperser/batcher/inmem/minibatch_store_test.go +++ b/disperser/batcher/inmem/minibatch_store_test.go @@ -26,6 +26,7 @@ func TestPutBatch(t *testing.T) { ID: id, CreatedAt: time.Now().UTC(), ReferenceBlockNumber: 1, + Status: 1, HeaderHash: [32]byte{1}, AggregatePubKey: nil, AggregateSignature: nil, @@ -70,6 +71,8 @@ func TestPutDispersalRequest(t *testing.T) { OperatorAddress: gcommon.HexToAddress("0x0"), NumBlobs: 1, RequestedAt: time.Now().UTC(), + BlobHash: "1a2b", + MetadataHash: "3c4d", } err = s.PutDispersalRequest(ctx, req1) assert.NoError(t, err) @@ -80,6 +83,8 @@ func TestPutDispersalRequest(t *testing.T) { OperatorAddress: gcommon.HexToAddress("0x0"), NumBlobs: 1, RequestedAt: time.Now().UTC(), + BlobHash: "1a2b", + MetadataHash: "3c4d", } err = s.PutDispersalRequest(ctx, req2) assert.NoError(t, err) @@ -113,6 +118,8 @@ func TestPutDispersalResponse(t *testing.T) { OperatorAddress: gcommon.HexToAddress("0x0"), NumBlobs: 1, RequestedAt: time.Now().UTC(), + BlobHash: "1a2b", + MetadataHash: "3c4d", }, Signatures: nil, RespondedAt: time.Now().UTC(), @@ -126,6 +133,8 @@ func TestPutDispersalResponse(t *testing.T) { OperatorAddress: gcommon.HexToAddress("0x0"), NumBlobs: 1, RequestedAt: time.Now().UTC(), + BlobHash: "0x0", + MetadataHash: "0x0", }, Signatures: nil, RespondedAt: time.Now().UTC(), diff --git a/disperser/batcher/minibatch_store.go b/disperser/batcher/minibatch_store.go index d1e3ed24c..d02811321 100644 --- a/disperser/batcher/minibatch_store.go +++ b/disperser/batcher/minibatch_store.go @@ -29,17 +29,17 @@ const ( ) type BatchRecord struct { - ID uuid.UUID + ID uuid.UUID `dynamodbav:"-"` CreatedAt time.Time ReferenceBlockNumber uint - Status BatchStatus + Status BatchStatus `dynamodbav:"BatchStatus"` HeaderHash [32]byte AggregatePubKey *core.G2Point AggregateSignature *core.Signature } type MinibatchRecord struct { - BatchID uuid.UUID + BatchID uuid.UUID `dynamodbav:"-"` MinibatchIndex uint BlobHeaderHashes [][32]byte BatchSize uint64 // in bytes @@ -47,13 +47,15 @@ type MinibatchRecord struct { } type DispersalRequest struct { - BatchID uuid.UUID - MinibatchIndex uint - core.OperatorID + BatchID uuid.UUID `dynamodbav:"-"` + MinibatchIndex uint + core.OperatorID `dynamodbav:"-"` OperatorAddress gcommon.Address Socket string NumBlobs uint RequestedAt time.Time + BlobHash string + MetadataHash string } type DispersalResponse struct {