Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dapi): invalid state transition failed with already in chain error #2270

Merged
merged 6 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions packages/dapi/lib/externalApis/tenderdash/requestTenderRpc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
const UnavailableGrpcError = require('@dashevo/grpc-common/lib/server/error/UnavailableGrpcError');
const ResourceExhaustedGrpcError = require('@dashevo/grpc-common/lib/server/error/ResourceExhaustedGrpcError');
const RPCError = require('../../rpcServer/RPCError');

/**
* @param {jaysonClient} rpcClient
* @return {requestTenderRpc} A function to make RPC requests to Tenderdash.
*/
function requestTenderRpcFactory(rpcClient) {
/**
* @function
* @typedef requestTenderRpc
* @param {string} uri
* @param {Object} [params={}]
* @return {Promise<Object>}
*/
async function requestTenderRpc(uri, params = {}) {
let response;
try {
response = await rpcClient.request(uri, params);
} catch (e) {
if (e.code === 'ECONNRESET' || e.message === 'socket hang up') {
throw new UnavailableGrpcError('Tenderdash is not available');
}

throw new RPCError(
e.code || -32602,
`Failed to request ${uri}: ${e.message}`,
e,
);
}

const { result, error: jsonRpcError } = response;

if (jsonRpcError) {
if (typeof jsonRpcError.data === 'string') {
if (jsonRpcError.data.includes('too_many_resets')) {
throw new ResourceExhaustedGrpcError('tenderdash is not responding: too many requests');
}
}
shumkov marked this conversation as resolved.
Show resolved Hide resolved

throw new RPCError(
jsonRpcError.code || -32602,
jsonRpcError.message || 'Internal error',
jsonRpcError.data,
);
}

return result;
}

return requestTenderRpc;
}

module.exports = requestTenderRpcFactory;
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ const {
server: {
error: {
InvalidArgumentGrpcError,
AlreadyExistsGrpcError,
ResourceExhaustedGrpcError,
UnavailableGrpcError,
AlreadyExistsGrpcError,
InternalGrpcError,
},
},
} = require('@dashevo/grpc-common');
Expand All @@ -14,15 +15,23 @@ const {
BroadcastStateTransitionResponse,
},
} = require('@dashevo/dapi-grpc');

const crypto = require('crypto');
shumkov marked this conversation as resolved.
Show resolved Hide resolved

const logger = require('../../../logger');

/**
* @param {jaysonClient} rpcClient
* @param {createGrpcErrorFromDriveResponse} createGrpcErrorFromDriveResponse
* @param {requestTenderRpc} requestTenderRpc
*
* @returns {broadcastStateTransitionHandler}
*/
function broadcastStateTransitionHandlerFactory(rpcClient, createGrpcErrorFromDriveResponse) {
function broadcastStateTransitionHandlerFactory(
rpcClient,
createGrpcErrorFromDriveResponse,
requestTenderRpc,
) {
shumkov marked this conversation as resolved.
Show resolved Hide resolved
/**
* @typedef broadcastStateTransitionHandler
*
Expand All @@ -38,15 +47,17 @@ function broadcastStateTransitionHandlerFactory(rpcClient, createGrpcErrorFromDr
throw new InvalidArgumentGrpcError('State Transition is not specified');
}

const tx = Buffer.from(stByteArray)
const stBytes = Buffer.from(stByteArray);

const tx = stBytes
.toString('base64');

let response;

try {
response = await rpcClient.request('broadcast_tx', { tx });
} catch (e) {
if (e.message === 'socket hang up') {
if (e.code === 'ECONNRESET' || e.message === 'socket hang up') {
throw new UnavailableGrpcError('Tenderdash is not available');
}

Expand All @@ -55,15 +66,65 @@ function broadcastStateTransitionHandlerFactory(rpcClient, createGrpcErrorFromDr
throw e;
}

const {
result,
error: jsonRpcError,
} = response;
const { result, error: jsonRpcError } = response;

if (jsonRpcError) {
if (typeof jsonRpcError.data === 'string') {
if (jsonRpcError.data === 'tx already exists in cache') {
throw new AlreadyExistsGrpcError('state transition already in chain');
// We need to figure out and report to user why the ST cached
const stHash = crypto.createHash('sha256')
.update(stBytes)
.digest();
shumkov marked this conversation as resolved.
Show resolved Hide resolved

// TODO: Apply search filter to fetch specific state transition
// Throw an already exist in mempool error if the ST in mempool
const unconfirmedTxsResponse = await requestTenderRpc('unconfirmed_txs', { limit: 100 });

if (unconfirmedTxsResponse?.txs?.includes(stBytes.toString('base64'))) {
throw new AlreadyExistsGrpcError('state transition already in mempool');
}
shumkov marked this conversation as resolved.
Show resolved Hide resolved
shumkov marked this conversation as resolved.
Show resolved Hide resolved

// Throw an already exist in chain error if the ST is committed
let txResponse;
try {
txResponse = await requestTenderRpc('tx', { hash: stHash.toString('base64') });
} catch (e) {
if (typeof e.data !== 'string' || !e.data.includes('not found')) {
throw e;
}
}

shumkov marked this conversation as resolved.
Show resolved Hide resolved
if (txResponse?.tx_result) {
throw new AlreadyExistsGrpcError('state transition already in chain');
}

// If the ST not in mempool and not in the state but still in the cache
// it means it was invalidated by CheckTx so we run CheckTx again to provide
// the validation error
const checkTxResponse = await requestTenderRpc('check_tx', { tx });

if (checkTxResponse?.code !== 0) {
// Return validation error
throw await createGrpcErrorFromDriveResponse(
checkTxResponse.code,
checkTxResponse.info,
);
} else {
// CheckTx passes for the ST, it means we have a bug in Drive so ST is passing check
// tx and then removed from the block. The removal from the block doesn't remove ST
// from the cache because it's happening only one proposer and other nodes do not know
// that this ST was processed and keep it in the cache
// The best what we can do is to return an internal error and and log the transaction
logger.warn({
tx,
}, `State transition ${stHash.toString('hex')} is passing CheckTx but removed from the block by proposal`);

const error = new Error('State Transition processing error. Please report'
+ ' faulty state transition and try to create a new state transition with different'
+ ' hash as a workaround.');

shumkov marked this conversation as resolved.
Show resolved Hide resolved
throw new InternalGrpcError(error);
}
}

if (jsonRpcError.data.startsWith('Tx too large.')) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const waitForTransactionToBeProvableFactory = require('../../../externalApis/ten
const waitForTransactionResult = require('../../../externalApis/tenderdash/waitForTransactionToBeProvable/waitForTransactionResult');
const getExistingTransactionResultFactory = require('../../../externalApis/tenderdash/waitForTransactionToBeProvable/getExistingTransactionResult');
const getConsensusParamsFactory = require('../../../externalApis/tenderdash/getConsensusParamsFactory');
const requestTenderRpcFactory = require('../../../externalApis/tenderdash/requestTenderRpc');

/**
* @param {jaysonClient} rpcClient
Expand All @@ -73,10 +74,13 @@ function platformHandlersFactory(
) {
const wrapInErrorHandler = wrapInErrorHandlerFactory(logger, isProductionEnvironment);

const requestTenderRpc = requestTenderRpcFactory(rpcClient);

// broadcastStateTransition
const broadcastStateTransitionHandler = broadcastStateTransitionHandlerFactory(
rpcClient,
createGrpcErrorFromDriveResponse,
requestTenderRpc,
);

const wrappedBroadcastStateTransition = jsonToProtobufHandlerWrapper(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const {
AlreadyExistsGrpcError,
UnavailableGrpcError,
ResourceExhaustedGrpcError,
InternalGrpcError,
},
},
} = require('@dashevo/grpc-common');
Expand Down Expand Up @@ -36,6 +37,7 @@ describe('broadcastStateTransitionHandlerFactory', () => {
let log;
let code;
let createGrpcErrorFromDriveResponseMock;
let requestTenderRpcMock;

before(async () => {
await loadWasmDpp();
Expand Down Expand Up @@ -82,11 +84,14 @@ describe('broadcastStateTransitionHandlerFactory', () => {
request: this.sinon.stub().resolves(response),
};

requestTenderRpcMock = this.sinon.stub();

createGrpcErrorFromDriveResponseMock = this.sinon.stub();

broadcastStateTransitionHandler = broadcastStateTransitionHandlerFactory(
rpcClientMock,
createGrpcErrorFromDriveResponseMock,
requestTenderRpcMock,
);
});

Expand Down Expand Up @@ -182,13 +187,38 @@ describe('broadcastStateTransitionHandlerFactory', () => {
}
});

it('should throw AlreadyExistsGrpcError if transaction was broadcasted twice', async () => {
it('should throw AlreadyExistsGrpcError if transaction in mempool', async () => {
response.error = {
code: -32603,
message: 'Internal error',
data: 'tx already exists in cache',
};

requestTenderRpcMock.withArgs('unconfirmed_txs').resolves({
txs: [stateTransitionFixture.toBuffer().toString('base64')],
});

try {
await broadcastStateTransitionHandler(call);

expect.fail('should throw AlreadyExistsGrpcError');
} catch (e) {
expect(e).to.be.an.instanceOf(AlreadyExistsGrpcError);
expect(e.getMessage()).to.equal('state transition already in mempool');
}
});

it('should throw AlreadyExistsGrpcError if transaction in chain', async () => {
response.error = {
code: -32603,
message: 'Internal error',
data: 'tx already exists in cache',
};

requestTenderRpcMock.withArgs('tx').resolves({
tx_result: { },
});

try {
await broadcastStateTransitionHandler(call);

Expand All @@ -199,6 +229,52 @@ describe('broadcastStateTransitionHandlerFactory', () => {
}
});

it('should throw consensus result for invalid transition in cache', async () => {
response.error = {
code: -32603,
message: 'Internal error',
data: 'tx already exists in cache',
};

requestTenderRpcMock.withArgs('check_tx').resolves({
code: 1,
info: 'some info',
});

const error = new Error('some error');

createGrpcErrorFromDriveResponseMock.resolves(error);

try {
await broadcastStateTransitionHandler(call);

expect.fail('should throw consensus error');
} catch (e) {
expect(e).to.equal(error);
}
});

it('should throw internal error for transition in cache that passing check tx', async () => {
response.error = {
code: -32603,
message: 'Internal error',
data: 'tx already exists in cache',
};

requestTenderRpcMock.withArgs('check_tx').resolves({
code: 0,
});

try {
await broadcastStateTransitionHandler(call);

expect.fail('should throw InternalError');
} catch (e) {
expect(e).to.be.an.instanceOf(InternalGrpcError);
expect(e.getMessage()).to.equal('Internal error');
}
});

it('should throw a gRPC error based on drive\'s response', async () => {
const message = 'not found';
const metadata = {
Expand Down
Loading