From 96c02de50b9563bc602f9c4c4393d59dec5a15b3 Mon Sep 17 00:00:00 2001 From: Samuel Laferriere Date: Wed, 30 Oct 2024 17:58:17 +0000 Subject: [PATCH] feat: eigenda client returns 503 errors (for failover purpose) (#828) --- api/clients/codecs/default_blob_codec.go | 2 + api/clients/codecs/ifft_codec.go | 1 + api/clients/config.go | 23 ++++- api/clients/disperser_client.go | 53 ++++++----- api/clients/disperser_client_test.go | 27 ++++++ api/clients/eigenda_client.go | 114 ++++++++++++++++++----- api/clients/eigenda_client_test.go | 12 --- api/errors.go | 57 +++++++++++- api/grpc/disperser/disperser.pb.go | 13 ++- api/proto/disperser/disperser.proto | 14 ++- disperser/disperser.go | 30 ++++++ 11 files changed, 276 insertions(+), 70 deletions(-) create mode 100644 api/clients/disperser_client_test.go diff --git a/api/clients/codecs/default_blob_codec.go b/api/clients/codecs/default_blob_codec.go index 7cc33fc42..6d3ec2994 100644 --- a/api/clients/codecs/default_blob_codec.go +++ b/api/clients/codecs/default_blob_codec.go @@ -16,6 +16,8 @@ func NewDefaultBlobCodec() DefaultBlobCodec { return DefaultBlobCodec{} } +// EncodeBlob can never return an error, but to maintain the interface it is included +// so that it can be swapped for the IFFTCodec without changing the interface func (v DefaultBlobCodec) EncodeBlob(rawData []byte) ([]byte, error) { codecBlobHeader := make([]byte, 32) // first byte is always 0 to ensure the codecBlobHeader is a valid bn254 element diff --git a/api/clients/codecs/ifft_codec.go b/api/clients/codecs/ifft_codec.go index edd5040d4..41f756e80 100644 --- a/api/clients/codecs/ifft_codec.go +++ b/api/clients/codecs/ifft_codec.go @@ -18,6 +18,7 @@ func (v IFFTCodec) EncodeBlob(data []byte) ([]byte, error) { var err error data, err = v.writeCodec.EncodeBlob(data) if err != nil { + // this cannot happen, because EncodeBlob never returns an error return nil, fmt.Errorf("error encoding data: %w", err) } diff --git a/api/clients/config.go b/api/clients/config.go index 0f23ccb1c..f4d9caa9f 100644 --- a/api/clients/config.go +++ b/api/clients/config.go @@ -16,6 +16,15 @@ type EigenDAClientConfig struct { // TODO: we should change this param as its name is quite confusing ResponseTimeout time.Duration + // The total amount of time that the client will spend waiting for EigenDA + // to "confirm" (include onchain) a blob after it has been dispersed. Note that + // we stick to "confirm" here but this really means InclusionTimeout, + // not confirmation in the sense of confirmation depth. + // + // If ConfirmationTimeout time passes and the blob is not yet confirmed, + // the client will return an api.ErrorFailover to let the caller failover to EthDA. + ConfirmationTimeout time.Duration + // The total amount of time that the client will spend waiting for EigenDA // to confirm a blob after it has been dispersed // Note that reasonable values for this field will depend on the value of WaitForFinalization. @@ -81,15 +90,25 @@ func (c *EigenDAClientConfig) CheckAndSetDefaults() error { return fmt.Errorf("EigenDAClientConfig.EthRpcUrl not set. 
Needed to verify blob confirmed on-chain.") } + if c.ResponseTimeout == 0 { + c.ResponseTimeout = 30 * time.Second + } + if c.ConfirmationTimeout == 0 { + // batching interval on mainnet is 10 minutes, + // so we set the confirmation timeout to 15 minutes to give some buffer + c.ConfirmationTimeout = 15 * time.Minute + } if c.StatusQueryRetryInterval == 0 { c.StatusQueryRetryInterval = 5 * time.Second } if c.StatusQueryTimeout == 0 { c.StatusQueryTimeout = 25 * time.Minute } - if c.ResponseTimeout == 0 { - c.ResponseTimeout = 30 * time.Second + if c.ConfirmationTimeout > c.StatusQueryTimeout { + // doesn't make sense... confirmation is about onchain inclusion, whereas status query is about reaching finality (after inclusion) + return fmt.Errorf("EigenDAClientConfig.ConfirmationTimeout (%v) > EigenDAClientConfig.StatusQueryTimeout (%v)", c.ConfirmationTimeout, c.StatusQueryTimeout) } + if len(c.SignerPrivateKeyHex) > 0 && len(c.SignerPrivateKeyHex) != 64 { return fmt.Errorf("a valid length SignerPrivateKeyHex needs to have 64 bytes") } diff --git a/api/clients/disperser_client.go b/api/clients/disperser_client.go index e677b9683..dc937db33 100644 --- a/api/clients/disperser_client.go +++ b/api/clients/disperser_client.go @@ -3,7 +3,6 @@ package clients import ( "context" "crypto/tls" - "errors" "fmt" "sync" "time" @@ -42,6 +41,8 @@ func NewConfig(hostname, port string, timeout time.Duration, useSecureGrpcFlag b type DisperserClient interface { Close() error DisperseBlob(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error) + // DisperseBlobAuthenticated disperses a blob with an authenticated request. + // The BlobStatus returned will always be PROCESSSING if error is nil. DisperseBlobAuthenticated(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error) DispersePaidBlob(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error) GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error) @@ -112,7 +113,7 @@ func (c *disperserClient) Close() error { func (c *disperserClient) DisperseBlob(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) { err := c.initOnceGrpcConnection() if err != nil { - return nil, nil, fmt.Errorf("error initializing connection: %w", err) + return nil, nil, api.NewErrorFailover(err) } ctxTimeout, cancel := context.WithTimeout(ctx, c.config.Timeout) @@ -154,17 +155,17 @@ func (c *disperserClient) DispersePaidBlob(ctx context.Context, data []byte, quo func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) { err := c.initOnceGrpcConnection() if err != nil { - return nil, nil, fmt.Errorf("error initializing connection: %w", err) + return nil, nil, api.NewErrorFailover(err) } if c.signer == nil { - return nil, nil, fmt.Errorf("uninitialized signer for authenticated dispersal") + return nil, nil, api.NewErrorInternal("uninitialized signer for authenticated dispersal") } // first check if signer is valid accountId, err := c.signer.GetAccountID() if err != nil { - return nil, nil, fmt.Errorf("please configure signer key if you want to use authenticated endpoint %w", err) + return nil, nil, api.NewErrorInvalidArg(fmt.Sprintf("please configure signer key if you want to use authenticated endpoint %v", err)) } quorumNumbers := make([]uint32, len(quorums)) @@ -175,7 +176,10 @@ func (c *disperserClient) 
DisperseBlobAuthenticated(ctx context.Context, data [] // check every 32 bytes of data are within the valid range for a bn254 field element _, err = rs.ToFrArray(data) if err != nil { - return nil, nil, fmt.Errorf("encountered an error to convert a 32-bytes into a valid field element, please use the correct format where every 32bytes(big-endian) is less than 21888242871839275222246405745257275088548364400416034343698204186575808495617, %w", err) + return nil, nil, api.NewErrorInvalidArg( + fmt.Sprintf("encountered an error to convert a 32-bytes into a valid field element, "+ + "please use the correct format where every 32bytes(big-endian) is less than "+ + "21888242871839275222246405745257275088548364400416034343698204186575808495617, %v", err)) } request := &disperser_rpc.DisperseBlobRequest{ @@ -185,11 +189,12 @@ func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data [] } ctxTimeout, cancel := context.WithTimeout(ctx, c.config.Timeout) - defer cancel() stream, err := c.client.DisperseBlobAuthenticated(ctxTimeout) if err != nil { + // grpc client errors return grpc errors, so we can just wrap the error in a normal wrapError, + // no need to wrap in another grpc error as we do with other errors above. return nil, nil, fmt.Errorf("error while calling DisperseBlobAuthenticated: %w", err) } @@ -197,7 +202,6 @@ func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data [] err = stream.Send(&disperser_rpc.AuthenticatedRequest{Payload: &disperser_rpc.AuthenticatedRequest_DisperseRequest{ DisperseRequest: request, }}) - if err != nil { return nil, nil, fmt.Errorf("failed to send request: %w", err) } @@ -209,7 +213,7 @@ func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data [] } authHeaderReply, ok := reply.Payload.(*disperser_rpc.AuthenticatedReply_BlobAuthHeader) if !ok { - return nil, nil, errors.New("expected challenge") + return nil, nil, api.NewErrorInternal(fmt.Sprintf("client expected challenge from disperser, instead received: %v", reply)) } authHeader := core.BlobAuthHeader{ @@ -217,10 +221,9 @@ func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data [] AccountID: "", Nonce: authHeaderReply.BlobAuthHeader.ChallengeParameter, } - authData, err := c.signer.SignBlobRequest(authHeader) if err != nil { - return nil, nil, errors.New("error signing blob request") + return nil, nil, api.NewErrorInternal(fmt.Sprintf("error signing blob request: %v", err)) } // Process challenge and send back challenge_reply @@ -239,12 +242,17 @@ func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data [] } disperseReply, ok := reply.Payload.(*disperser_rpc.AuthenticatedReply_DisperseReply) // Process the final disperse_reply if !ok { - return nil, nil, errors.New("expected DisperseReply") + return nil, nil, api.NewErrorInternal(fmt.Sprintf("client expected DisperseReply from disperser, instead received: %v", reply)) } blobStatus, err := disperser.FromBlobStatusProto(disperseReply.DisperseReply.GetResult()) if err != nil { - return nil, nil, err + return nil, nil, api.NewErrorInternal(fmt.Sprintf("parsing blob status: %v", err)) + } + + // Assert: only status that makes sense is processing. Anything else is a bug on disperser side. 
+ if *blobStatus != disperser.Processing { + return nil, nil, api.NewErrorInternal(fmt.Sprintf("expected status to be Processing, got %v", *blobStatus)) } return blobStatus, disperseReply.DisperseReply.GetRequestId(), nil @@ -253,7 +261,7 @@ func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data [] func (c *disperserClient) GetBlobStatus(ctx context.Context, requestID []byte) (*disperser_rpc.BlobStatusReply, error) { err := c.initOnceGrpcConnection() if err != nil { - return nil, fmt.Errorf("error initializing connection: %w", err) + return nil, api.NewErrorInternal(err.Error()) } ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*60) @@ -262,19 +270,13 @@ func (c *disperserClient) GetBlobStatus(ctx context.Context, requestID []byte) ( request := &disperser_rpc.BlobStatusRequest{ RequestId: requestID, } - - reply, err := c.client.GetBlobStatus(ctxTimeout, request) - if err != nil { - return nil, err - } - - return reply, nil + return c.client.GetBlobStatus(ctxTimeout, request) } func (c *disperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error) { err := c.initOnceGrpcConnection() if err != nil { - return nil, fmt.Errorf("error initializing connection: %w", err) + return nil, api.NewErrorInternal(err.Error()) } ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*60) @@ -289,6 +291,8 @@ func (c *disperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []by return reply.Data, nil } +// initOnceGrpcConnection initializes the grpc connection and client if they are not already initialized. +// If initialization fails, it caches the error and will return it on every subsequent call. func (c *disperserClient) initOnceGrpcConnection() error { var initErr error c.initOnce.Do(func() { @@ -302,7 +306,10 @@ func (c *disperserClient) initOnceGrpcConnection() error { c.conn = conn c.client = disperser_rpc.NewDisperserClient(conn) }) - return initErr + if initErr != nil { + return fmt.Errorf("initializing grpc connection: %w", initErr) + } + return nil } func getGrpcDialOptions(useSecureGrpcFlag bool) []grpc.DialOption { diff --git a/api/clients/disperser_client_test.go b/api/clients/disperser_client_test.go new file mode 100644 index 000000000..28d98dea7 --- /dev/null +++ b/api/clients/disperser_client_test.go @@ -0,0 +1,27 @@ +package clients_test + +import ( + "context" + "testing" + "time" + + "github.com/Layr-Labs/eigenda/api/clients" + "github.com/Layr-Labs/eigenda/core/auth" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func TestPutBlobNoopSigner(t *testing.T) { + config := clients.NewConfig("nohost", "noport", time.Second, false) + disperserClient := clients.NewDisperserClient(config, auth.NewLocalNoopSigner()) + + test := []byte("test") + test[0] = 0x00 // make sure the first byte of the requst is always 0 + quorums := []uint8{0} + _, _, err := disperserClient.DisperseBlobAuthenticated(context.Background(), test, quorums) + st, isGRPCError := status.FromError(err) + assert.True(t, isGRPCError) + assert.Equal(t, codes.InvalidArgument.String(), st.Code().String()) + assert.Equal(t, "please configure signer key if you want to use authenticated endpoint noop signer cannot get accountID", st.Message()) +} diff --git a/api/clients/eigenda_client.go b/api/clients/eigenda_client.go index 20b45d1da..5d12a3f54 100644 --- a/api/clients/eigenda_client.go +++ b/api/clients/eigenda_client.go @@ -15,12 +15,12 @@ import ( 
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" + "github.com/Layr-Labs/eigenda/api" "github.com/Layr-Labs/eigenda/api/clients/codecs" grpcdisperser "github.com/Layr-Labs/eigenda/api/grpc/disperser" edasm "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDAServiceManager" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/core/auth" - "github.com/Layr-Labs/eigenda/disperser" ) // IEigenDAClient is a wrapper around the DisperserClient interface which @@ -162,11 +162,37 @@ func (m *EigenDAClient) GetBlob(ctx context.Context, batchHeaderHash []byte, blo // PutBlob encodes and writes a blob to EigenDA, waiting for a desired blob status // to be reached (guarded by WaitForFinalization config param) before returning. -// This function is resilient to transient failures and timeouts. +// +// TODO: describe retry/timeout behavior // // Upon return the blob is guaranteed to be: // - finalized onchain (if Config.WaitForFinalization is true), or -// - confirmed at a certain depth (if Config.WaitForFinalization is false, in which case Config.WaitForConfirmationDepth specifies the depth). +// - confirmed at a certain depth (if Config.WaitForFinalization is false, +// in which case Config.WaitForConfirmationDepth specifies the depth). +// +// Errors returned all either grpc errors, or api.ErrorFailover, for eg: +// +// blobInfo, err := client.PutBlob(ctx, blobData) +// if err != nil { +// if errors.Is(err, api.ErrorFailover) { +// // failover to ethda +// } +// st, isGRPCError := status.FromError(err) +// if isGRPCError { +// // use st.Code() and st.Message() +// } else { +// // assert this shouldn't happen +// } +// } +// +// An api.ErrorFailover error returned is used to signify that eigenda is temporarily unavailable, +// and suggest to the caller (most likely some rollup batcher via the eigenda-proxy) +// to fallback to ethda for some amount of time. Three reasons for returning api.ErrorFailover: +// 1. Failed to put the blob in the disperser's queue (disperser is down) +// 2. Timed out before getting confirmed onchain (batcher is down) +// 3. Insufficient signatures (eigenda network is down) +// +// See https://github.com/ethereum-optimism/specs/issues/434 for more details. 
func (m *EigenDAClient) PutBlob(ctx context.Context, data []byte) (*grpcdisperser.BlobInfo, error) { resultChan, errorChan := m.PutBlobAsync(ctx, data) select { // no timeout here because we depend on the configured timeout in PutBlobAsync @@ -184,18 +210,19 @@ func (m *EigenDAClient) PutBlobAsync(ctx context.Context, data []byte) (resultCh return } -func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan chan *grpcdisperser.BlobInfo, errChan chan error) { +func (m *EigenDAClient) putBlob(ctxFinality context.Context, rawData []byte, resultChan chan *grpcdisperser.BlobInfo, errChan chan error) { m.Log.Info("Attempting to disperse blob to EigenDA") // encode blob if m.Codec == nil { - errChan <- fmt.Errorf("Codec cannot be nil") + errChan <- api.NewErrorInternal("codec not initialized") return } data, err := m.Codec.EncodeBlob(rawData) if err != nil { - errChan <- fmt.Errorf("error encoding blob: %w", err) + // Encode can only fail if there is something wrong with the data, so we return a 400 error + errChan <- api.NewErrorInvalidArg(fmt.Sprintf("error encoding blob: %v", err)) return } @@ -205,15 +232,11 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan } // disperse blob // TODO: would be nice to add a trace-id key to the context, to be able to follow requests from batcher->proxy->eigenda - blobStatus, requestID, err := m.Client.DisperseBlobAuthenticated(ctx, data, customQuorumNumbers) + _, requestID, err := m.Client.DisperseBlobAuthenticated(ctxFinality, data, customQuorumNumbers) if err != nil { - errChan <- fmt.Errorf("error initializing DisperseBlobAuthenticated() client: %w", err) - return - } - - // process response - if *blobStatus == disperser.Failed { - errChan <- fmt.Errorf("unable to disperse blob to eigenda (reply status %d): %w", blobStatus, err) + // DisperserClient returned error is already a grpc error which can be a 400 (eg rate limited) or 500, + // so we wrap the error such that clients can still use grpc's status.FromError() function to get the status code. + errChan <- fmt.Errorf("error submitting authenticated blob to disperser: %w", err) return } @@ -223,24 +246,54 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan ticker := time.NewTicker(m.Config.StatusQueryRetryInterval) defer ticker.Stop() + confirmationCh := time.NewTimer(m.Config.ConfirmationTimeout).C var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, m.Config.StatusQueryTimeout) + // finality here can either mean reaching some confirmationDepth or reaching actual finality + // depending on the WaitForFinalization config param. + ctxFinality, cancel = context.WithTimeout(ctxFinality, m.Config.StatusQueryTimeout) defer cancel() alreadyWaitingForDispersal := false alreadyWaitingForConfirmationOrFinality := false + var latestBlobStatus grpcdisperser.BlobStatus for { select { - case <-ctx.Done(): - errChan <- fmt.Errorf("timed out waiting for EigenDA blob to confirm blob with request id=%s: %w", base64RequestID, ctx.Err()) + // The two first timeout cases can only happen while blob is still in + // 1. processing or dispersing status: waiting to land onchain + // 2. or confirmed status: landed onchain, waiting for finalization + // because all other statuses return immediately once reached (see below). 
+		case <-confirmationCh:
+			if latestBlobStatus == grpcdisperser.BlobStatus_PROCESSING || latestBlobStatus == grpcdisperser.BlobStatus_DISPERSING {
+				errChan <- api.NewErrorFailover(fmt.Errorf("eigenda might be down. timed out waiting for blob to land onchain (request id=%s): %w", base64RequestID, ctxFinality.Err()))
+				return
+			}
+			// set to nil so this case doesn't get triggered again
+			confirmationCh = nil
+		case <-ctxFinality.Done():
+			// this should have been triggered above because confirmationTimeout < ctxFinality timeout,
+			// but we leave this assert here as a safety net.
+			if latestBlobStatus == grpcdisperser.BlobStatus_PROCESSING || latestBlobStatus == grpcdisperser.BlobStatus_DISPERSING {
+				errChan <- api.NewErrorFailover(fmt.Errorf("eigenda might be down. timed out waiting for blob to land onchain (request id=%s): %w", base64RequestID, ctxFinality.Err()))
+			} else if latestBlobStatus == grpcdisperser.BlobStatus_CONFIRMED {
+				// Assuming that the ctxFinality timeout is correctly set (long enough for batch to land onchain + finalize),
+				// still being in confirmed state here means that there is a problem with Ethereum, so we return DeadlineExceeded (504).
+				// batcher would most likely resubmit another blob, which is not ideal but there isn't much to be done...
+				// eigenDA v2 will have idempotency so one can just resubmit the same blob safely.
+				// TODO: (if timeout was not long enough to finalize in normal conditions): eigenda-client is badly configured, should be a 400 (INVALID_ARGUMENT)
+				errChan <- api.NewErrorDeadlineExceeded(
+					fmt.Sprintf("timed out waiting for blob that landed onchain to finalize (request id=%s). "+
+						"Either timeout not long enough, or ethereum might be experiencing difficulties: %v. ", base64RequestID, ctxFinality.Err()))
+			} else {
+				// this should not be reachable... indicates something wrong with either this client or eigenda, so we failover to ethda
+				errChan <- api.NewErrorFailover(fmt.Errorf("timed out in a state that shouldn't be possible (request id=%s): %w", base64RequestID, ctxFinality.Err()))
+			}
 			return
 		case <-ticker.C:
-			statusRes, err := m.Client.GetBlobStatus(ctx, requestID)
+			statusRes, err := m.Client.GetBlobStatus(ctxFinality, requestID)
 			if err != nil {
 				m.Log.Warn("Unable to retrieve blob dispersal status, will retry", "requestID", base64RequestID, "err", err)
 				continue
 			}
-
+			latestBlobStatus = statusRes.Status
 			switch statusRes.Status {
 			case grpcdisperser.BlobStatus_PROCESSING, grpcdisperser.BlobStatus_DISPERSING:
 				// to prevent log clutter, we only log at info level once
@@ -251,10 +304,21 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan
 					alreadyWaitingForDispersal = true
 				}
 			case grpcdisperser.BlobStatus_FAILED:
-				errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing, requestID=%s: %w", base64RequestID, err)
+				// This can happen for a few reasons:
+				// 1. blob has expired: clients can only retrieve blobs for 14 days. Sounds like a 400 error, but not sure this can happen during dispersal...
+				// 2. internal logic error while requesting encoding (shouldn't happen), but should probably return api.ErrorFailover
+				// 3. blob retry has exceeded its limit while waiting for blob finalization after confirmation.
+				//    Probably from a chain re-org. See https://github.com/Layr-Labs/eigenda/blob/master/disperser/batcher/finalizer.go#L179-L189.
+ // So we should be returning 500 to force a blob resubmission (not eigenda's fault but until + // we have idempotency this is unfortunately the only solution) + // TODO: we should create new BlobStatus categories to separate these cases out. For now returning 500 is fine. + errChan <- api.NewErrorInternal(fmt.Sprintf("blob dispersal (requestID=%s) reached failed status. please resubmit the blob.", base64RequestID)) return case grpcdisperser.BlobStatus_INSUFFICIENT_SIGNATURES: - errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing with insufficient signatures, requestID=%s: %w", base64RequestID, err) + // Some quorum failed to sign the blob, indicating that the whole network is having issues. + // We hence return api.ErrorFailover to let the batcher failover to ethda. This could however be a very unlucky + // temporary issue, so the caller should retry at least one more time before failing over. + errChan <- api.NewErrorFailover(fmt.Errorf("blob dispersal (requestID=%s) failed with insufficient signatures. eigenda nodes are probably down.", base64RequestID)) return case grpcdisperser.BlobStatus_CONFIRMED: if m.Config.WaitForFinalization { @@ -267,7 +331,7 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan } } else { batchId := statusRes.Info.BlobVerificationProof.GetBatchId() - batchConfirmed, err := m.batchIdConfirmedAtDepth(ctx, batchId, m.Config.WaitForConfirmationDepth) + batchConfirmed, err := m.batchIdConfirmedAtDepth(ctxFinality, batchId, m.Config.WaitForConfirmationDepth) if err != nil { m.Log.Warn("Error checking if batch ID is confirmed at depth. Will retry...", "requestID", base64RequestID, "err", err) } @@ -290,8 +354,10 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan resultChan <- statusRes.Info return default: - // this should never happen. If it does, the blob is in a heisenberg state... it could either eventually get confirmed or fail - errChan <- fmt.Errorf("unknown reply status %d. ask for assistance from EigenDA team, using requestID %s", statusRes.Status, base64RequestID) + // This should never happen. If it does, the blob is in a heisenberg state... it could either eventually get confirmed or fail. + // However, this doesn't mean there's a major outage with EigenDA, so we return a 500 error to let the caller redisperse the blob, + // rather than an api.ErrorFailover to failover to EthDA. + errChan <- api.NewErrorInternal(fmt.Sprintf("unknown reply status %d. 
ask for assistance from EigenDA team, using requestID %s", statusRes.Status, base64RequestID))
 			return
 		}
 	}
diff --git a/api/clients/eigenda_client_test.go b/api/clients/eigenda_client_test.go
index 79e8556df..29435472b 100644
--- a/api/clients/eigenda_client_test.go
+++ b/api/clients/eigenda_client_test.go
@@ -11,7 +11,6 @@ import (
 	clientsmock "github.com/Layr-Labs/eigenda/api/clients/mock"
 	"github.com/Layr-Labs/eigenda/api/grpc/common"
 	grpcdisperser "github.com/Layr-Labs/eigenda/api/grpc/disperser"
-	"github.com/Layr-Labs/eigenda/core/auth"
 	"github.com/Layr-Labs/eigenda/disperser"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/stretchr/testify/assert"
@@ -507,14 +506,3 @@ func TestPutBlobTotalTimeout(t *testing.T) {
 	require.Error(t, err)
 	require.Nil(t, blobInfo)
 }
-
-func TestPutBlobNoopSigner(t *testing.T) {
-	config := clients.NewConfig("nohost", "noport", time.Second, false)
-	disperserClient := clients.NewDisperserClient(config, auth.NewLocalNoopSigner())
-
-	test := []byte("test")
-	test[0] = 0x00 // make sure the first byte of the requst is always 0
-	quorums := []uint8{0}
-	_, _, err := disperserClient.DisperseBlobAuthenticated(context.Background(), test, quorums)
-	assert.EqualError(t, err, "please configure signer key if you want to use authenticated endpoint noop signer cannot get accountID")
-}
diff --git a/api/errors.go b/api/errors.go
index d31620b29..93f69cf5d 100644
--- a/api/errors.go
+++ b/api/errors.go
@@ -1,6 +1,8 @@
 package api
 
 import (
+	"fmt"
+
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -8,12 +10,14 @@ import (
 // The canonical errors from the EigenDA gRPC API endpoints.
 //
 // Notes:
-// - We start with a small (but sufficient) subset of google's error code convention,
+// - We start with a small (but sufficient) subset of grpc's error codes,
 //   and expand when there is an important failure case to separate out. See:
-//   https://cloud.google.com/apis/design/errors#handling_errors
+//   https://grpc.io/docs/guides/status-codes/
 // - Make sure that internally propagated errors are eventually wrapped in one of the
 //   user-facing errors defined here, since grpc otherwise returns an UNKNOWN error code,
 //   which is harder to debug and understand for users.
+// - See https://github.com/googleapis/googleapis/blob/ba8ea80f25d19bde8501cd51f314391f8d39bde8/google/rpc/code.proto
+//   for the mapping of grpc error codes to HTTP status codes.
 
 func newErrorGRPC(code codes.Code, msg string) error {
 	return status.Error(code, msg)
@@ -39,7 +43,56 @@ func NewErrorInternal(msg string) error {
 	return newErrorGRPC(codes.Internal, msg)
 }
 
+// HTTP Mapping: 500 Internal Server Error
+func NewErrorUnknown(msg string) error {
+	return newErrorGRPC(codes.Unknown, msg)
+}
+
 // HTTP Mapping: 501 Not Implemented
 func NewErrorUnimplemented() error {
 	return newErrorGRPC(codes.Unimplemented, "not implemented")
 }
+
+// HTTP Mapping: 504 Gateway Timeout
+func NewErrorDeadlineExceeded(msg string) error {
+	return newErrorGRPC(codes.DeadlineExceeded, msg)
+}
+
+// ErrorFailover is returned by the disperser-client and eigenda-client to signify
+// that eigenda is temporarily unavailable, and suggests to the caller
+// (most likely some rollup batcher via the eigenda-proxy) to failover
+// to ethda for some amount of time.
+// See https://github.com/ethereum-optimism/specs/issues/434 for more details.
+//
+// Given that both clients already return grpc errors, we could potentially use
+// a grpc UNAVAILABLE error instead, but we don't because:
+// 1. UNAVAILABLE is typically used to tell the client to retry the request, not failover
+// 2. the grpc framework itself also returns UNAVAILABLE errors in some cases, see:
+//    https://github.com/grpc/grpc-go/blob/192ee33f6fc0f07070eeaaa1d34e41746740e64c/codes/codes.go#L184.
+//    We could differentiate from those generated by the grpc framework by using error details, like
+//    https://github.com/grpc/grpc-go/tree/master/examples/features/error_details, but that would complicate things
+//    and it feels much simpler to just use a custom error type for this specific purpose.
+//
+// 3 reasons for returning api.ErrorFailover:
+// 1. Failed to put the blob in the disperser's queue (disperser is down)
+// 2. Timed out before getting confirmed onchain (batcher is down)
+// 3. Insufficient signatures (eigenda network is down)
+type ErrorFailover struct {
+	Err error
+}
+
+// NewErrorFailover creates a new ErrorFailover with the given underlying error.
+// See ErrorFailover for more details.
+func NewErrorFailover(err error) *ErrorFailover {
+	return &ErrorFailover{
+		Err: err,
+	}
+}
+
+func (e *ErrorFailover) Error() string {
+	return fmt.Sprintf("Failover: %s", e.Err.Error())
+}
+
+func (e *ErrorFailover) Unwrap() error {
+	return e.Err
+}
diff --git a/api/grpc/disperser/disperser.pb.go b/api/grpc/disperser/disperser.pb.go
index ba949323f..3b29214a8 100644
--- a/api/grpc/disperser/disperser.pb.go
+++ b/api/grpc/disperser/disperser.pb.go
@@ -42,14 +42,21 @@ const (
 	// batch containing the blob has been confirmed onchain
 	BlobStatus_CONFIRMED BlobStatus = 2
 	// FAILED means that the blob has failed permanently (for reasons other than insufficient
-	// signatures, which is a separate state)
+	// signatures, which is a separate state). This status is somewhat of a catch-all category,
+	// containing (but not necessarily exclusively, as errors can be added in the future):
+	// - blob has expired
+	// - internal logic error while requesting encoding
+	// - blob retry has exceeded its limit while waiting for blob finalization after confirmation.
+	//   Most likely triggered by a chain reorg: see https://github.com/Layr-Labs/eigenda/blob/master/disperser/batcher/finalizer.go#L179-L189.
 	BlobStatus_FAILED BlobStatus = 3
 	// FINALIZED means that the block containing the blob's confirmation transaction has been finalized on Ethereum
 	BlobStatus_FINALIZED BlobStatus = 4
 	// INSUFFICIENT_SIGNATURES means that the confirmation threshold for the blob was not met
 	// for at least one quorum.
 	BlobStatus_INSUFFICIENT_SIGNATURES BlobStatus = 5
-	// DISPERSING means that the blob is currently being dispersed to DA Nodes and being confirmed onchain
+	// The DISPERSING state is comprised of two separate phases:
+	// - Dispersing to DA nodes and collecting signatures
+	// - Submitting the transaction on chain and waiting for tx receipt
 	BlobStatus_DISPERSING BlobStatus = 6
 )
@@ -522,7 +529,7 @@ type DisperseBlobReply struct {
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	// The status of the blob associated with the request_id.
+	// The status of the blob associated with the request_id. Will always be PROCESSING.
 	Result BlobStatus `protobuf:"varint,1,opt,name=result,proto3,enum=disperser.BlobStatus" json:"result,omitempty"`
 	// The request ID generated by the disperser.
 	// Once a request is accepted (although not processed), a unique request ID will be
diff --git a/api/proto/disperser/disperser.proto b/api/proto/disperser/disperser.proto
index 61ddfe638..7d21ac3f1 100644
--- a/api/proto/disperser/disperser.proto
+++ b/api/proto/disperser/disperser.proto
@@ -113,7 +113,7 @@ message DispersePaidBlobRequest {
 }
 
 message DisperseBlobReply {
-  // The status of the blob associated with the request_id.
+  // The status of the blob associated with the request_id. Will always be PROCESSING.
   BlobStatus result = 1;
   // The request ID generated by the disperser.
   // Once a request is accepted (although not processed), a unique request ID will be
@@ -172,15 +172,21 @@ enum BlobStatus {
   CONFIRMED = 2;
 
   // FAILED means that the blob has failed permanently (for reasons other than insufficient
-  // signatures, which is a separate state)
+  // signatures, which is a separate state). This status is somewhat of a catch-all category,
+  // containing (but not necessarily exclusively, as errors can be added in the future):
+  // - blob has expired
+  // - internal logic error while requesting encoding
+  // - blob retry has exceeded its limit while waiting for blob finalization after confirmation.
+  //   Most likely triggered by a chain reorg: see https://github.com/Layr-Labs/eigenda/blob/master/disperser/batcher/finalizer.go#L179-L189.
   FAILED = 3;
 
   // FINALIZED means that the block containing the blob's confirmation transaction has been finalized on Ethereum
   FINALIZED = 4;
 
   // INSUFFICIENT_SIGNATURES means that the confirmation threshold for the blob was not met
   // for at least one quorum.
   INSUFFICIENT_SIGNATURES = 5;
-
-  // DISPERSING means that the blob is currently being dispersed to DA Nodes and being confirmed onchain
+  // The DISPERSING state is comprised of two separate phases:
+  // - Dispersing to DA nodes and collecting signatures
+  // - Submitting the transaction on chain and waiting for tx receipt
   DISPERSING = 6;
 }
diff --git a/disperser/disperser.go b/disperser/disperser.go
index a4f1099e0..0d939e764 100644
--- a/disperser/disperser.go
+++ b/disperser/disperser.go
@@ -17,16 +17,46 @@ import (
 	gcommon "github.com/ethereum/go-ethereum/common"
 )
 
+// BlobStatus represents the status of a blob.
+// The status of a blob is updated as the blob is processed by the disperser.
+// The status of a blob can be queried by the client using the GetBlobStatus API.
+// Intermediate states are states that the blob can be in while being processed, and it can be updated to a different state:
+// - PROCESSING
+// - DISPERSING
+// - CONFIRMED
+// Terminal states are states that will not be updated to a different state:
+// - FAILED
+// - FINALIZED
+// - INSUFFICIENT_SIGNATURES
+//
+// Note: this docstring and the enum ones below are copied from the disperser.proto,
+// which is the source of truth for BlobStatus.
 type BlobStatus uint
 
 // WARNING: THESE VALUES BECOME PART OF PERSISTENT SYSTEM STATE;
 // ALWAYS INSERT NEW ENUM VALUES AS THE LAST ELEMENT TO MAINTAIN COMPATIBILITY
const (
+	// PROCESSING means that the blob is currently being processed by the disperser
 	Processing BlobStatus = iota
+	// CONFIRMED means that the blob has been dispersed to DA Nodes and the dispersed
+	// batch containing the blob has been confirmed onchain
 	Confirmed
+	// FAILED means that the blob has failed permanently (for reasons other than insufficient
+	// signatures, which is a separate state). This status is somewhat of a catch-all category,
+	// containing (but not necessarily exclusively, as errors can be added in the future):
+	// - blob has expired
+	// - internal logic error while requesting encoding
+	// - blob retry has exceeded its limit while waiting for blob finalization after confirmation.
+	//   Most likely triggered by a chain reorg: see https://github.com/Layr-Labs/eigenda/blob/master/disperser/batcher/finalizer.go#L179-L189.
 	Failed
+	// FINALIZED means that the block containing the blob's confirmation transaction has been finalized on Ethereum
 	Finalized
+	// INSUFFICIENT_SIGNATURES means that the confirmation threshold for the blob was not met
+	// for at least one quorum.
 	InsufficientSignatures
+	// The DISPERSING state is comprised of two separate phases:
+	// - Dispersing to DA nodes and collecting signatures
+	// - Submitting the transaction on chain and waiting for tx receipt
 	Dispersing
 )
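
Usage sketch (not part of the diff above): a minimal illustration of how a caller such as the eigenda-proxy might consume the errors introduced by this patch, based on the PutBlob docstring and the api.ErrorFailover type. The package name, the function disperseWithFailover, and the failover policy shown here are illustrative assumptions, not part of this change.

	package example

	import (
		"context"
		"errors"
		"log"

		"github.com/Layr-Labs/eigenda/api"
		"github.com/Layr-Labs/eigenda/api/clients"
		grpcdisperser "github.com/Layr-Labs/eigenda/api/grpc/disperser"
		"google.golang.org/grpc/codes"
		"google.golang.org/grpc/status"
	)

	// disperseWithFailover reports, alongside the error, whether the caller should
	// temporarily fall back to EthDA (hypothetical policy, per the failover spec issue).
	func disperseWithFailover(ctx context.Context, client *clients.EigenDAClient, data []byte) (*grpcdisperser.BlobInfo, bool, error) {
		blobInfo, err := client.PutBlob(ctx, data)
		if err == nil {
			return blobInfo, false, nil
		}

		// api.ErrorFailover: disperser unreachable, confirmation timeout, or insufficient signatures.
		var errFailover *api.ErrorFailover
		if errors.As(err, &errFailover) {
			log.Printf("eigenda looks unavailable, failing over: %v", errFailover)
			return nil, true, err
		}

		// Everything else is expected to be a gRPC status error.
		if st, ok := status.FromError(err); ok {
			switch st.Code() {
			case codes.InvalidArgument:
				// The blob itself was rejected (e.g. encoding error); resubmitting the same data won't help.
				return nil, false, err
			case codes.Internal, codes.DeadlineExceeded:
				// FAILED status or finalization timeout; safe to resubmit the blob.
				return nil, false, err
			}
		}
		return nil, false, err
	}

errors.As is used (rather than a sentinel comparison) because ErrorFailover is a wrapper type that only implements Error and Unwrap; anything that is not an ErrorFailover is expected to be a gRPC status error per the client docstrings above.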