From 5267e4ae8afb358295f3abd63491f08a0f8d364e Mon Sep 17 00:00:00 2001 From: Ivan Shumkov Date: Tue, 22 Oct 2024 20:47:19 +0700 Subject: [PATCH 1/6] fix(dapi): invalid state transition failed with already in chain error --- .../fetchCachedStateTransitionResult.js | 72 +++++++++++++++++++ .../broadcastStateTransitionHandlerFactory.js | 19 +++-- 2 files changed, 85 insertions(+), 6 deletions(-) create mode 100644 packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransition/fetchCachedStateTransitionResult.js diff --git a/packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransition/fetchCachedStateTransitionResult.js b/packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransition/fetchCachedStateTransitionResult.js new file mode 100644 index 00000000000..7a78e135385 --- /dev/null +++ b/packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransition/fetchCachedStateTransitionResult.js @@ -0,0 +1,72 @@ +const UnavailableGrpcError = require('@dashevo/grpc-common/lib/server/error/UnavailableGrpcError'); +const ResourceExhaustedGrpcError = require('@dashevo/grpc-common/lib/server/error/ResourceExhaustedGrpcError'); +const logger = require('../../../../logger'); + +/** + * @param {jaysonClient} rpcClient + * @param {string} uri + * @param {Object} params + * @return {Promise} + */ +async function request(rpcClient, uri, params = {}) { + let response; + try { + response = await rpcClient.request(uri, params); + } catch (e) { + if (e.message === 'socket hang up') { + throw new UnavailableGrpcError('Tenderdash is not available'); + } + + e.message = `Failed to fetch cached transaction: ${e.message}`; + + throw e; + } + + const { result, error: jsonRpcError } = response; + + if (jsonRpcError) { + if (typeof jsonRpcError.data === 'string') { + if (jsonRpcError.data.includes('too_many_resets')) { + throw new ResourceExhaustedGrpcError('tenderdash is not responding: too many requests'); + } + } + + const error = new Error(); + Object.assign(error, jsonRpcError); + + logger.error(error, `Unexpected JSON RPC error during broadcasting state transition: ${JSON.stringify(jsonRpcError)}`); + + throw error; + } + + return result; +} + +/** + * + * @param {jaysonClient} rpcClient + * @return {fetchCachedStateTransitionResult} + */ +function fetchCachedStateTransitionResultFactory(rpcClient) { + /** + * @typedef fetchCachedStateTransitionResult + * @param {Buffer} stBytes + * @return {Promise} + */ + return async function fetchCachedStateTransitionResult(stBytes) { + // Subscribing to future result + const stHash = crypto.createHash('sha256') + .update(stBytes) + .digest(); + + // Search cached state transition in mempool + // rpcClient.request('/unconfirmed_txs'); + + // Search in blockchain data + const result = await request(rpcClient, '/tx', { hash: `0x${stHash.toString('hex')}` }); + + + }; +} + +module.exports = fetchCachedStateTransitionResultFactory; diff --git a/packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.js b/packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.js index ad772760427..c7a5cf89903 100644 --- a/packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.js +++ b/packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.js @@ -19,10 +19,14 @@ const logger = require('../../../logger'); /** * @param {jaysonClient} rpcClient * @param {createGrpcErrorFromDriveResponse} createGrpcErrorFromDriveResponse + * @param {fetchCachedStateTransitionResult} fetchCachedStateTransitionResult * * @returns {broadcastStateTransitionHandler} */ -function broadcastStateTransitionHandlerFactory(rpcClient, createGrpcErrorFromDriveResponse) { +function broadcastStateTransitionHandlerFactory( + rpcClient, + createGrpcErrorFromDriveResponse, + fetchCachedStateTransitionResult) { /** * @typedef broadcastStateTransitionHandler * @@ -38,7 +42,9 @@ function broadcastStateTransitionHandlerFactory(rpcClient, createGrpcErrorFromDr throw new InvalidArgumentGrpcError('State Transition is not specified'); } - const tx = Buffer.from(stByteArray) + const stBytes = Buffer.from(stByteArray); + + const tx = stBytes .toString('base64'); let response; @@ -55,14 +61,15 @@ function broadcastStateTransitionHandlerFactory(rpcClient, createGrpcErrorFromDr throw e; } - const { - result, - error: jsonRpcError, - } = response; + let { result } = response; + const { error: jsonRpcError } = response; if (jsonRpcError) { if (typeof jsonRpcError.data === 'string') { if (jsonRpcError.data === 'tx already exists in cache') { + result = fetchCachedStateTransitionResult(stBytes); + + throw new AlreadyExistsGrpcError('state transition already in chain'); } From d5b669838e488341624f65767f1e2e08087fadb4 Mon Sep 17 00:00:00 2001 From: Ivan Shumkov Date: Wed, 23 Oct 2024 21:16:58 +0700 Subject: [PATCH 2/6] chore: accomplished implementation --- .../tenderdash/requestTenderRpc.js | 50 +++++++++++++ .../fetchCachedStateTransitionResult.js | 72 ------------------- .../broadcastStateTransitionHandlerFactory.js | 72 ++++++++++++++++--- .../platform/platformHandlersFactory.js | 4 ++ 4 files changed, 117 insertions(+), 81 deletions(-) create mode 100644 packages/dapi/lib/externalApis/tenderdash/requestTenderRpc.js delete mode 100644 packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransition/fetchCachedStateTransitionResult.js diff --git a/packages/dapi/lib/externalApis/tenderdash/requestTenderRpc.js b/packages/dapi/lib/externalApis/tenderdash/requestTenderRpc.js new file mode 100644 index 00000000000..f30506822e1 --- /dev/null +++ b/packages/dapi/lib/externalApis/tenderdash/requestTenderRpc.js @@ -0,0 +1,50 @@ +const UnavailableGrpcError = require('@dashevo/grpc-common/lib/server/error/UnavailableGrpcError'); +const ResourceExhaustedGrpcError = require('@dashevo/grpc-common/lib/server/error/ResourceExhaustedGrpcError'); +const RPCError = require('../../rpcServer/RPCError'); + +/** + * @param {jaysonClient} rpcClient + * @return {requestTenderRpc} + */ +function requestTenderRpcFactory(rpcClient) { + /** + * @typedef requestTenderRpc + * @param {string} uri + * @param {Object} [params] + * @return {Promise} + */ + return async function requestTenderRpc(uri, params = {}) { + let response; + try { + response = await rpcClient.request(uri, params); + } catch (e) { + if (e.message === 'socket hang up') { + throw new UnavailableGrpcError('Tenderdash is not available'); + } + + e.message = `Failed to request ${uri}: ${e.message}`; + + throw e; + } + + const { result, error: jsonRpcError } = response; + + if (jsonRpcError) { + if (typeof jsonRpcError.data === 'string') { + if (jsonRpcError.data.includes('too_many_resets')) { + throw new ResourceExhaustedGrpcError('tenderdash is not responding: too many requests'); + } + } + + throw new RPCError( + jsonRpcError.code || -32602, + jsonRpcError.message || 'Internal error', + jsonRpcError.data, + ); + } + + return result; + }; +} + +module.exports = requestTenderRpcFactory; diff --git a/packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransition/fetchCachedStateTransitionResult.js b/packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransition/fetchCachedStateTransitionResult.js deleted file mode 100644 index 7a78e135385..00000000000 --- a/packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransition/fetchCachedStateTransitionResult.js +++ /dev/null @@ -1,72 +0,0 @@ -const UnavailableGrpcError = require('@dashevo/grpc-common/lib/server/error/UnavailableGrpcError'); -const ResourceExhaustedGrpcError = require('@dashevo/grpc-common/lib/server/error/ResourceExhaustedGrpcError'); -const logger = require('../../../../logger'); - -/** - * @param {jaysonClient} rpcClient - * @param {string} uri - * @param {Object} params - * @return {Promise} - */ -async function request(rpcClient, uri, params = {}) { - let response; - try { - response = await rpcClient.request(uri, params); - } catch (e) { - if (e.message === 'socket hang up') { - throw new UnavailableGrpcError('Tenderdash is not available'); - } - - e.message = `Failed to fetch cached transaction: ${e.message}`; - - throw e; - } - - const { result, error: jsonRpcError } = response; - - if (jsonRpcError) { - if (typeof jsonRpcError.data === 'string') { - if (jsonRpcError.data.includes('too_many_resets')) { - throw new ResourceExhaustedGrpcError('tenderdash is not responding: too many requests'); - } - } - - const error = new Error(); - Object.assign(error, jsonRpcError); - - logger.error(error, `Unexpected JSON RPC error during broadcasting state transition: ${JSON.stringify(jsonRpcError)}`); - - throw error; - } - - return result; -} - -/** - * - * @param {jaysonClient} rpcClient - * @return {fetchCachedStateTransitionResult} - */ -function fetchCachedStateTransitionResultFactory(rpcClient) { - /** - * @typedef fetchCachedStateTransitionResult - * @param {Buffer} stBytes - * @return {Promise} - */ - return async function fetchCachedStateTransitionResult(stBytes) { - // Subscribing to future result - const stHash = crypto.createHash('sha256') - .update(stBytes) - .digest(); - - // Search cached state transition in mempool - // rpcClient.request('/unconfirmed_txs'); - - // Search in blockchain data - const result = await request(rpcClient, '/tx', { hash: `0x${stHash.toString('hex')}` }); - - - }; -} - -module.exports = fetchCachedStateTransitionResultFactory; diff --git a/packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.js b/packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.js index c7a5cf89903..6742512b726 100644 --- a/packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.js +++ b/packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.js @@ -2,9 +2,10 @@ const { server: { error: { InvalidArgumentGrpcError, - AlreadyExistsGrpcError, ResourceExhaustedGrpcError, UnavailableGrpcError, + AlreadyExistsGrpcError, + InternalGrpcError, }, }, } = require('@dashevo/grpc-common'); @@ -14,19 +15,23 @@ const { BroadcastStateTransitionResponse, }, } = require('@dashevo/dapi-grpc'); + +const crypto = require('crypto'); + const logger = require('../../../logger'); /** * @param {jaysonClient} rpcClient * @param {createGrpcErrorFromDriveResponse} createGrpcErrorFromDriveResponse - * @param {fetchCachedStateTransitionResult} fetchCachedStateTransitionResult + * @param {requestTenderRpc} requestTenderRpc * * @returns {broadcastStateTransitionHandler} */ function broadcastStateTransitionHandlerFactory( rpcClient, createGrpcErrorFromDriveResponse, - fetchCachedStateTransitionResult) { + requestTenderRpc, +) { /** * @typedef broadcastStateTransitionHandler * @@ -61,16 +66,65 @@ function broadcastStateTransitionHandlerFactory( throw e; } - let { result } = response; - const { error: jsonRpcError } = response; + const { result, error: jsonRpcError } = response; if (jsonRpcError) { if (typeof jsonRpcError.data === 'string') { if (jsonRpcError.data === 'tx already exists in cache') { - result = fetchCachedStateTransitionResult(stBytes); - - - throw new AlreadyExistsGrpcError('state transition already in chain'); + // We need to figure out and report to user why the ST cached + const stHash = crypto.createHash('sha256') + .update(stBytes) + .digest(); + + // TODO: Apply search filter to fetch specific state transition + // Throw an already exist in mempool error if the ST in mempool + const unconfirmedTxsResponse = await requestTenderRpc('unconfirmed_txs', { limit: 100 }); + + if (unconfirmedTxsResponse?.txs?.includes(stBytes.toString('base64'))) { + throw new AlreadyExistsGrpcError('state transition already in mempool'); + } + + // Throw an already exist in chain error if the ST is committed + let txResponse; + try { + txResponse = await requestTenderRpc('tx', { hash: stHash.toString('base64') }); + } catch (e) { + if (typeof e.data !== 'string' || !e.data.includes('not found')) { + throw e; + } + } + + if (txResponse?.tx_result) { + throw new AlreadyExistsGrpcError('state transition already in chain'); + } + + // If the ST not in mempool and not in the state but still in the cache + // it means it was invalidated by CheckTx so we run CheckTx again to provide + // the validation error + const checkTxResponse = await requestTenderRpc('check_tx', { tx }); + + if (checkTxResponse?.code !== 0) { + // Return validation error + throw await createGrpcErrorFromDriveResponse( + checkTxResponse.code, + checkTxResponse.info, + ); + } else { + // CheckTx passes for the ST, it means we have a bug in Drive so ST is passing check + // tx and then removed from the block. The removal from the block doesn't remove ST + // from the cache because it's happening only one proposer and other nodes do not know + // that this ST was processed and keep it in the cache + // The best what we can do is to return an internal error and and log the transaction + logger.warn({ + tx, + }, `State transition ${stHash.toString('hex')} is passing CheckTx but removed from the block by proposal`); + + const error = new Error('State Transition processing error. Please report' + + ' faulty state transition and try to create a new state transition with different' + + ' hash as a workaround.'); + + throw new InternalGrpcError(error); + } } if (jsonRpcError.data.startsWith('Tx too large.')) { diff --git a/packages/dapi/lib/grpcServer/handlers/platform/platformHandlersFactory.js b/packages/dapi/lib/grpcServer/handlers/platform/platformHandlersFactory.js index 641071d3572..8288583ab7b 100644 --- a/packages/dapi/lib/grpcServer/handlers/platform/platformHandlersFactory.js +++ b/packages/dapi/lib/grpcServer/handlers/platform/platformHandlersFactory.js @@ -55,6 +55,7 @@ const waitForTransactionToBeProvableFactory = require('../../../externalApis/ten const waitForTransactionResult = require('../../../externalApis/tenderdash/waitForTransactionToBeProvable/waitForTransactionResult'); const getExistingTransactionResultFactory = require('../../../externalApis/tenderdash/waitForTransactionToBeProvable/getExistingTransactionResult'); const getConsensusParamsFactory = require('../../../externalApis/tenderdash/getConsensusParamsFactory'); +const requestTenderRpcFactory = require('../../../externalApis/tenderdash/requestTenderRpc'); /** * @param {jaysonClient} rpcClient @@ -73,10 +74,13 @@ function platformHandlersFactory( ) { const wrapInErrorHandler = wrapInErrorHandlerFactory(logger, isProductionEnvironment); + const requestTenderRpc = requestTenderRpcFactory(rpcClient); + // broadcastStateTransition const broadcastStateTransitionHandler = broadcastStateTransitionHandlerFactory( rpcClient, createGrpcErrorFromDriveResponse, + requestTenderRpc, ); const wrappedBroadcastStateTransition = jsonToProtobufHandlerWrapper( From 4ca4cb31fafddedb085eb264f20b84023371c263 Mon Sep 17 00:00:00 2001 From: Ivan Shumkov Date: Wed, 23 Oct 2024 21:17:19 +0700 Subject: [PATCH 3/6] test: update broadcastStateTransitionHandler tests --- ...dcastStateTransitionHandlerFactory.spec.js | 78 ++++++++++++++++++- 1 file changed, 77 insertions(+), 1 deletion(-) diff --git a/packages/dapi/test/unit/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.spec.js b/packages/dapi/test/unit/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.spec.js index 3dc61b2eb40..de1152d015d 100644 --- a/packages/dapi/test/unit/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.spec.js +++ b/packages/dapi/test/unit/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.spec.js @@ -5,6 +5,7 @@ const { AlreadyExistsGrpcError, UnavailableGrpcError, ResourceExhaustedGrpcError, + InternalGrpcError, }, }, } = require('@dashevo/grpc-common'); @@ -36,6 +37,7 @@ describe('broadcastStateTransitionHandlerFactory', () => { let log; let code; let createGrpcErrorFromDriveResponseMock; + let requestTenderRpcMock; before(async () => { await loadWasmDpp(); @@ -82,11 +84,14 @@ describe('broadcastStateTransitionHandlerFactory', () => { request: this.sinon.stub().resolves(response), }; + requestTenderRpcMock = this.sinon.stub(); + createGrpcErrorFromDriveResponseMock = this.sinon.stub(); broadcastStateTransitionHandler = broadcastStateTransitionHandlerFactory( rpcClientMock, createGrpcErrorFromDriveResponseMock, + requestTenderRpcMock, ); }); @@ -182,13 +187,38 @@ describe('broadcastStateTransitionHandlerFactory', () => { } }); - it('should throw AlreadyExistsGrpcError if transaction was broadcasted twice', async () => { + it('should throw AlreadyExistsGrpcError if transaction in mempool', async () => { + response.error = { + code: -32603, + message: 'Internal error', + data: 'tx already exists in cache', + }; + + requestTenderRpcMock.withArgs('unconfirmed_txs').resolves({ + txs: [stateTransitionFixture.toBuffer().toString('base64')], + }); + + try { + await broadcastStateTransitionHandler(call); + + expect.fail('should throw AlreadyExistsGrpcError'); + } catch (e) { + expect(e).to.be.an.instanceOf(AlreadyExistsGrpcError); + expect(e.getMessage()).to.equal('state transition already in mempool'); + } + }); + + it('should throw AlreadyExistsGrpcError if transaction in chain', async () => { response.error = { code: -32603, message: 'Internal error', data: 'tx already exists in cache', }; + requestTenderRpcMock.withArgs('tx').resolves({ + tx_result: { }, + }); + try { await broadcastStateTransitionHandler(call); @@ -199,6 +229,52 @@ describe('broadcastStateTransitionHandlerFactory', () => { } }); + it('should throw consensus result for invalid transition in cache', async () => { + response.error = { + code: -32603, + message: 'Internal error', + data: 'tx already exists in cache', + }; + + requestTenderRpcMock.withArgs('check_tx').resolves({ + code: 1, + info: 'some info', + }); + + const error = new Error('some error'); + + createGrpcErrorFromDriveResponseMock.resolves(error); + + try { + await broadcastStateTransitionHandler(call); + + expect.fail('should throw consensus error'); + } catch (e) { + expect(e).to.equal(error); + } + }); + + it('should throw internal error for transition in cache that passing check tx', async () => { + response.error = { + code: -32603, + message: 'Internal error', + data: 'tx already exists in cache', + }; + + requestTenderRpcMock.withArgs('check_tx').resolves({ + code: 0, + }); + + try { + await broadcastStateTransitionHandler(call); + + expect.fail('should throw InternalError'); + } catch (e) { + expect(e).to.be.an.instanceOf(InternalGrpcError); + expect(e.getMessage()).to.equal('Internal error'); + } + }); + it('should throw a gRPC error based on drive\'s response', async () => { const message = 'not found'; const metadata = { From 10f88c1ec7e21c0d9453a01410383fab95a912ae Mon Sep 17 00:00:00 2001 From: Ivan Shumkov Date: Thu, 24 Oct 2024 11:55:10 +0700 Subject: [PATCH 4/6] chore: handle ECONNRESET error code as well --- packages/dapi/lib/externalApis/tenderdash/requestTenderRpc.js | 2 +- .../handlers/platform/broadcastStateTransitionHandlerFactory.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/dapi/lib/externalApis/tenderdash/requestTenderRpc.js b/packages/dapi/lib/externalApis/tenderdash/requestTenderRpc.js index f30506822e1..9b6fd3edaf2 100644 --- a/packages/dapi/lib/externalApis/tenderdash/requestTenderRpc.js +++ b/packages/dapi/lib/externalApis/tenderdash/requestTenderRpc.js @@ -18,7 +18,7 @@ function requestTenderRpcFactory(rpcClient) { try { response = await rpcClient.request(uri, params); } catch (e) { - if (e.message === 'socket hang up') { + if (e.code === 'ECONNRESET' || e.message === 'socket hang up') { throw new UnavailableGrpcError('Tenderdash is not available'); } diff --git a/packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.js b/packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.js index 6742512b726..a17d4969182 100644 --- a/packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.js +++ b/packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.js @@ -57,7 +57,7 @@ function broadcastStateTransitionHandlerFactory( try { response = await rpcClient.request('broadcast_tx', { tx }); } catch (e) { - if (e.message === 'socket hang up') { + if (e.code === 'ECONNRESET' || e.message === 'socket hang up') { throw new UnavailableGrpcError('Tenderdash is not available'); } From 6a5f37837a5e95da6ef8133f05c30e71b4b239bc Mon Sep 17 00:00:00 2001 From: Ivan Shumkov Date: Thu, 24 Oct 2024 11:55:28 +0700 Subject: [PATCH 5/6] docs: improve JSDocs --- .../lib/externalApis/tenderdash/requestTenderRpc.js | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/packages/dapi/lib/externalApis/tenderdash/requestTenderRpc.js b/packages/dapi/lib/externalApis/tenderdash/requestTenderRpc.js index 9b6fd3edaf2..15997ee95c5 100644 --- a/packages/dapi/lib/externalApis/tenderdash/requestTenderRpc.js +++ b/packages/dapi/lib/externalApis/tenderdash/requestTenderRpc.js @@ -4,16 +4,17 @@ const RPCError = require('../../rpcServer/RPCError'); /** * @param {jaysonClient} rpcClient - * @return {requestTenderRpc} + * @return {requestTenderRpc} A function to make RPC requests to Tenderdash. */ function requestTenderRpcFactory(rpcClient) { /** + * @function * @typedef requestTenderRpc * @param {string} uri - * @param {Object} [params] + * @param {Object} [params={}] * @return {Promise} */ - return async function requestTenderRpc(uri, params = {}) { + async function requestTenderRpc(uri, params = {}) { let response; try { response = await rpcClient.request(uri, params); @@ -44,7 +45,9 @@ function requestTenderRpcFactory(rpcClient) { } return result; - }; + } + + return requestTenderRpc; } module.exports = requestTenderRpcFactory; From 168032d442ec2e79a425077d0b97bac4e7e00b61 Mon Sep 17 00:00:00 2001 From: Ivan Shumkov Date: Thu, 24 Oct 2024 11:55:52 +0700 Subject: [PATCH 6/6] refactor: throw RPCError with original error --- .../dapi/lib/externalApis/tenderdash/requestTenderRpc.js | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/packages/dapi/lib/externalApis/tenderdash/requestTenderRpc.js b/packages/dapi/lib/externalApis/tenderdash/requestTenderRpc.js index 15997ee95c5..efdec5962fc 100644 --- a/packages/dapi/lib/externalApis/tenderdash/requestTenderRpc.js +++ b/packages/dapi/lib/externalApis/tenderdash/requestTenderRpc.js @@ -23,9 +23,11 @@ function requestTenderRpcFactory(rpcClient) { throw new UnavailableGrpcError('Tenderdash is not available'); } - e.message = `Failed to request ${uri}: ${e.message}`; - - throw e; + throw new RPCError( + e.code || -32602, + `Failed to request ${uri}: ${e.message}`, + e, + ); } const { result, error: jsonRpcError } = response;