diff --git a/README.md b/README.md
index 1e4ef3047..a343f72e5 100644
--- a/README.md
+++ b/README.md
@@ -56,7 +56,7 @@ Or via docker build directly:
```bash
docker build \
- --build-arg NODE_VERSION="$(cat .nvmrc)-alpine" \
+ --build-arg NODE_VERSION="$(cat .nvmrc)-alpine3.19" \
-t mojaloop/ml-api-adapter:local \
.
```
@@ -403,4 +403,4 @@ push a release triggering another subsequent build that also publishes a docker
is a boon.
- It is unknown if a race condition might occur with multiple merges with main in
- quick succession, but this is a suspected edge case.
+ quick succession, but this is a suspected edge case.
diff --git a/documentation/state-diagrams/transfer-internal-states-diagram.png b/documentation/state-diagrams/transfer-internal-states-diagram.png
index f960c9bf3..d5a334788 100644
Binary files a/documentation/state-diagrams/transfer-internal-states-diagram.png and b/documentation/state-diagrams/transfer-internal-states-diagram.png differ
diff --git a/documentation/state-diagrams/transfer-internal-states.plantuml b/documentation/state-diagrams/transfer-internal-states.plantuml
index 7441e983b..8d8902e45 100644
--- a/documentation/state-diagrams/transfer-internal-states.plantuml
+++ b/documentation/state-diagrams/transfer-internal-states.plantuml
@@ -8,6 +8,8 @@ state RECEIVED {
state RESERVED_ {
state RESERVED {
}
+ state RESERVED_FORWARDED {
+ }
state RECEIVED_FULFIL {
}
state RECEIVED_FULFIL_DEPENDENT {
@@ -48,20 +50,25 @@ RECEIVED_PREPARE --> RECEIVED_REJECT : Reject callback from Payee with status "A
RECEIVED_PREPARE --> RECEIVED_ERROR : Transfer Error callback from Payee
RECEIVED_FULFIL --> COMMITTED : Transfer committed [Position handler] \n (commit funds, assign T. to settlement window)
-
RECEIVED_REJECT --> ABORTED_REJECTED : Transfer Aborted by Payee
RECEIVED_ERROR --> ABORTED_ERROR : Hub aborts T.
RECEIVED_PREPARE --> EXPIRED_PREPARED : Timeout handler \n detects T. being EXPIRED
RESERVED --> RECEIVED_FULFIL : Fulfil callback from Payee \n with status "COMMITTED" \n [Fulfil handler]: \n fulfilment check passed
RESERVED --> RECEIVED_ERROR : Fulfil callback from Payee fails validation\n [Fulfil handler]
+RESERVED --> RECEIVED_FULFIL_DEPENDENT : Recieved FX transfer fulfilment
+RESERVED --> RESERVED_FORWARDED : A Proxy participant has acknowledged the transfer to be forwarded
RESERVED --> RESERVED_TIMEOUT : Timeout handler
-RESERVED_TIMEOUT --> EXPIRED_RESERVED : Hub aborts T. due to being EXPIRED
-RESERVED --> RECEIVED_FULFIL_DEPENDENT : Recieved FX transfer fulfilment
+RESERVED_FORWARDED --> RECEIVED_FULFIL : Fulfil callback from Payee \n with status "COMMITTED" \n [Fulfil handler]: \n fulfilment check passed
+RESERVED_FORWARDED --> RECEIVED_ERROR : Fulfil callback from Payee fails validation\n [Fulfil handler]
+RESERVED_FORWARDED --> RECEIVED_FULFIL_DEPENDENT : Recieved FX transfer fulfilment
+
RECEIVED_FULFIL_DEPENDENT --> COMMITTED : Dependant transfer committed [Position handler] \n (commit funds, assign T. to settlement window)
RECEIVED_FULFIL_DEPENDENT --> RESERVED_TIMEOUT : Dependant transfer is timed out
+RESERVED_TIMEOUT --> EXPIRED_RESERVED : Hub aborts T. due to being EXPIRED
+
COMMITTED --> [*]
ABORTED --> [*]
diff --git a/package-lock.json b/package-lock.json
index 00dc20e1d..3a4e65dd8 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -1,12 +1,12 @@
{
"name": "@mojaloop/central-ledger",
- "version": "17.7.8",
+ "version": "17.8.0-snapshot.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@mojaloop/central-ledger",
- "version": "17.7.8",
+ "version": "17.8.0-snapshot.0",
"license": "Apache-2.0",
"dependencies": {
"@hapi/basic": "7.0.2",
@@ -20,7 +20,7 @@
"@mojaloop/central-services-health": "15.0.0",
"@mojaloop/central-services-logger": "11.3.1",
"@mojaloop/central-services-metrics": "12.0.8",
- "@mojaloop/central-services-shared": "18.5.2",
+ "@mojaloop/central-services-shared": "18.6.3",
"@mojaloop/central-services-stream": "11.3.1",
"@mojaloop/database-lib": "11.0.6",
"@mojaloop/event-sdk": "14.1.1",
@@ -37,7 +37,7 @@
"docdash": "2.0.2",
"event-stream": "4.0.1",
"five-bells-condition": "5.0.1",
- "glob": "10.4.2",
+ "glob": "10.4.3",
"hapi-auth-basic": "5.0.0",
"hapi-auth-bearer-token": "8.0.0",
"hapi-swagger": "17.2.1",
@@ -1296,6 +1296,11 @@
"node": ">=6.9.0"
}
},
+ "node_modules/@ioredis/commands": {
+ "version": "1.2.0",
+ "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.2.0.tgz",
+ "integrity": "sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg=="
+ },
"node_modules/@isaacs/cliui": {
"version": "8.0.2",
"resolved": "https://registry.npmjs.org/@isaacs/cliui/-/cliui-8.0.2.tgz",
@@ -1592,12 +1597,13 @@
}
},
"node_modules/@mojaloop/central-services-shared": {
- "version": "18.5.2",
- "resolved": "https://registry.npmjs.org/@mojaloop/central-services-shared/-/central-services-shared-18.5.2.tgz",
- "integrity": "sha512-qHCmmOMwjcNq6OkNqFznNCyX1lwgJfgu+tULbjqGxMtVMANf+LU01gFtJnD//M9wHcXDgP0VRu1waC+WqmAmOg==",
+ "version": "18.6.3",
+ "resolved": "https://registry.npmjs.org/@mojaloop/central-services-shared/-/central-services-shared-18.6.3.tgz",
+ "integrity": "sha512-GTMNxBB4lhjrW7V52OmZvuWKKx7IywmyihAfmcmSJ1zCtb+yL1CzF/pM4slOx2d6taE9Pn+q3S2Ucf/ZV2QzuA==",
"dependencies": {
"@hapi/catbox": "12.1.1",
"@hapi/catbox-memory": "5.0.1",
+ "@mojaloop/inter-scheme-proxy-cache-lib": "1.4.0",
"axios": "1.7.2",
"clone": "2.1.2",
"dotenv": "16.4.5",
@@ -1743,6 +1749,21 @@
"resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.3.tgz",
"integrity": "sha512-xNvxJEOUiWPGhUuUdQgAJPKOOJfGnIyKySOc09XkKsgdUV/3E2zvwZYdejjmRgPCgcym1juLH3226yA7sEFJKQ=="
},
+ "node_modules/@mojaloop/inter-scheme-proxy-cache-lib": {
+ "version": "1.4.0",
+ "resolved": "https://registry.npmjs.org/@mojaloop/inter-scheme-proxy-cache-lib/-/inter-scheme-proxy-cache-lib-1.4.0.tgz",
+ "integrity": "sha512-jmAWWdjZxjxlSQ+wt8aUcMYOneVo1GNbIIs7yK/R2K9DBtKb0aYle2mWwdjm9ovk6zSWL2a9lH+n3hq7kb08Wg==",
+ "dependencies": {
+ "@mojaloop/central-services-logger": "^11.3.1",
+ "ajv": "^8.16.0",
+ "convict": "^6.2.4",
+ "fast-safe-stringify": "^2.1.1",
+ "ioredis": "^5.4.1"
+ },
+ "engines": {
+ "node": ">=18.x"
+ }
+ },
"node_modules/@mojaloop/ml-number": {
"version": "11.2.4",
"resolved": "https://registry.npmjs.org/@mojaloop/ml-number/-/ml-number-11.2.4.tgz",
@@ -3345,6 +3366,26 @@
"node": "^14.17.0 || ^16.13.0 || >=18.0.0"
}
},
+ "node_modules/cacache/node_modules/glob": {
+ "version": "10.4.5",
+ "resolved": "https://registry.npmjs.org/glob/-/glob-10.4.5.tgz",
+ "integrity": "sha512-7Bv8RF0k6xjo7d4A/PxYLbUCfb6c+Vpd2/mB2yRDlew7Jb5hEXiCD9ibfO7wpk8i4sevK6DFny9h7EYbM3/sHg==",
+ "dev": true,
+ "dependencies": {
+ "foreground-child": "^3.1.0",
+ "jackspeak": "^3.1.2",
+ "minimatch": "^9.0.4",
+ "minipass": "^7.1.2",
+ "package-json-from-dist": "^1.0.0",
+ "path-scurry": "^1.11.1"
+ },
+ "bin": {
+ "glob": "dist/esm/bin.mjs"
+ },
+ "funding": {
+ "url": "https://github.com/sponsors/isaacs"
+ }
+ },
"node_modules/cacheable-lookup": {
"version": "7.0.0",
"resolved": "https://registry.npmjs.org/cacheable-lookup/-/cacheable-lookup-7.0.0.tgz",
@@ -3722,6 +3763,14 @@
"node": ">=0.8"
}
},
+ "node_modules/cluster-key-slot": {
+ "version": "1.1.2",
+ "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz",
+ "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==",
+ "engines": {
+ "node": ">=0.10.0"
+ }
+ },
"node_modules/code-point-at": {
"version": "1.1.0",
"resolved": "https://registry.npmjs.org/code-point-at/-/code-point-at-1.1.0.tgz",
@@ -4331,6 +4380,18 @@
"integrity": "sha512-ASFBup0Mz1uyiIjANan1jzLQami9z1PoYSZCiiYW2FczPbenXc45FZdBZLzOT+r6+iciuEModtmCti+hjaAk0A==",
"dev": true
},
+ "node_modules/convict": {
+ "version": "6.2.4",
+ "resolved": "https://registry.npmjs.org/convict/-/convict-6.2.4.tgz",
+ "integrity": "sha512-qN60BAwdMVdofckX7AlohVJ2x9UvjTNoKVXCL2LxFk1l7757EJqf1nySdMkPQer0bt8kQ5lQiyZ9/2NvrFBuwQ==",
+ "dependencies": {
+ "lodash.clonedeep": "^4.5.0",
+ "yargs-parser": "^20.2.7"
+ },
+ "engines": {
+ "node": ">=6"
+ }
+ },
"node_modules/cookie": {
"version": "0.5.0",
"resolved": "https://registry.npmjs.org/cookie/-/cookie-0.5.0.tgz",
@@ -4671,6 +4732,14 @@
"integrity": "sha512-bd2L678uiWATM6m5Z1VzNCErI3jiGzt6HGY8OVICs40JQq/HALfbyNJmp0UDakEY4pMMaN0Ly5om/B1VI/+xfQ==",
"dev": true
},
+ "node_modules/denque": {
+ "version": "2.1.0",
+ "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz",
+ "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==",
+ "engines": {
+ "node": ">=0.10"
+ }
+ },
"node_modules/depd": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/depd/-/depd-2.0.0.tgz",
@@ -7198,9 +7267,9 @@
"dev": true
},
"node_modules/glob": {
- "version": "10.4.2",
- "resolved": "https://registry.npmjs.org/glob/-/glob-10.4.2.tgz",
- "integrity": "sha512-GwMlUF6PkPo3Gk21UxkCohOv0PLcIXVtKyLlpEI28R/cO/4eNOdmLk3CMW1wROV/WR/EsZOWAfBbBOqYvs88/w==",
+ "version": "10.4.3",
+ "resolved": "https://registry.npmjs.org/glob/-/glob-10.4.3.tgz",
+ "integrity": "sha512-Q38SGlYRpVtDBPSWEylRyctn7uDeTp4NQERTLiCT1FqA9JXPYWqAVmQU6qh4r/zMM5ehxTcbaO8EjhWnvEhmyg==",
"dependencies": {
"foreground-child": "^3.1.0",
"jackspeak": "^3.1.2",
@@ -7213,7 +7282,7 @@
"glob": "dist/esm/bin.mjs"
},
"engines": {
- "node": ">=16 || 14 >=14.18"
+ "node": ">=18"
},
"funding": {
"url": "https://github.com/sponsors/isaacs"
@@ -8316,6 +8385,29 @@
"node": ">=4"
}
},
+ "node_modules/ioredis": {
+ "version": "5.4.1",
+ "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.4.1.tgz",
+ "integrity": "sha512-2YZsvl7jopIa1gaePkeMtd9rAcSjOOjPtpcLlOeusyO+XH2SK5ZcT+UCrElPP+WVIInh2TzeI4XW9ENaSLVVHA==",
+ "dependencies": {
+ "@ioredis/commands": "^1.1.1",
+ "cluster-key-slot": "^1.1.0",
+ "debug": "^4.3.4",
+ "denque": "^2.1.0",
+ "lodash.defaults": "^4.2.0",
+ "lodash.isarguments": "^3.1.0",
+ "redis-errors": "^1.2.0",
+ "redis-parser": "^3.0.0",
+ "standard-as-callback": "^2.1.0"
+ },
+ "engines": {
+ "node": ">=12.22.0"
+ },
+ "funding": {
+ "type": "opencollective",
+ "url": "https://opencollective.com/ioredis"
+ }
+ },
"node_modules/ip-address": {
"version": "9.0.5",
"resolved": "https://registry.npmjs.org/ip-address/-/ip-address-9.0.5.tgz",
@@ -9056,14 +9148,14 @@
}
},
"node_modules/jackspeak": {
- "version": "3.1.2",
- "resolved": "https://registry.npmjs.org/jackspeak/-/jackspeak-3.1.2.tgz",
- "integrity": "sha512-kWmLKn2tRtfYMF/BakihVVRzBKOxz4gJMiL2Rj91WnAB5TPZumSH99R/Yf1qE1u4uRimvCSJfm6hnxohXeEXjQ==",
+ "version": "3.4.2",
+ "resolved": "https://registry.npmjs.org/jackspeak/-/jackspeak-3.4.2.tgz",
+ "integrity": "sha512-qH3nOSj8q/8+Eg8LUPOq3C+6HWkpUioIjDsq1+D4zY91oZvpPttw8GwtF1nReRYKXl+1AORyFqtm2f5Q1SB6/Q==",
"dependencies": {
"@isaacs/cliui": "^8.0.2"
},
"engines": {
- "node": ">=14"
+ "node": "14 >=14.21 || 16 >=16.20 || >=18"
},
"funding": {
"url": "https://github.com/sponsors/isaacs"
@@ -9642,6 +9734,16 @@
"resolved": "https://registry.npmjs.org/lodash.camelcase/-/lodash.camelcase-4.3.0.tgz",
"integrity": "sha512-TwuEnCnxbc3rAvhf/LbG7tJUDzhqXyFnv3dtzLOPgCG/hODL7WFnsbwktkD7yUV0RrreP/l1PALq/YSg6VvjlA=="
},
+ "node_modules/lodash.clonedeep": {
+ "version": "4.5.0",
+ "resolved": "https://registry.npmjs.org/lodash.clonedeep/-/lodash.clonedeep-4.5.0.tgz",
+ "integrity": "sha512-H5ZhCF25riFd9uB5UCkVKo61m3S/xZk1x4wA6yp/L3RFP6Z/eHH1ymQcGLo7J3GMPfm0V/7m1tryHuGVxpqEBQ=="
+ },
+ "node_modules/lodash.defaults": {
+ "version": "4.2.0",
+ "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz",
+ "integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ=="
+ },
"node_modules/lodash.flattendeep": {
"version": "4.4.0",
"resolved": "https://registry.npmjs.org/lodash.flattendeep/-/lodash.flattendeep-4.4.0.tgz",
@@ -9653,6 +9755,11 @@
"resolved": "https://registry.npmjs.org/lodash.get/-/lodash.get-4.4.2.tgz",
"integrity": "sha512-z+Uw/vLuy6gQe8cfaFWD7p0wVv8fJl3mbzXh33RS+0oW2wvUqiRXiQ69gLWSLpgB5/6sU+r6BlQR0MBILadqTQ=="
},
+ "node_modules/lodash.isarguments": {
+ "version": "3.1.0",
+ "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz",
+ "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg=="
+ },
"node_modules/lodash.isequal": {
"version": "4.5.0",
"resolved": "https://registry.npmjs.org/lodash.isequal/-/lodash.isequal-4.5.0.tgz",
@@ -12452,12 +12559,9 @@
}
},
"node_modules/path-scurry/node_modules/lru-cache": {
- "version": "10.2.2",
- "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-10.2.2.tgz",
- "integrity": "sha512-9hp3Vp2/hFQUiIwKo8XCeFVnrg8Pk3TYNPIR7tJADKi5YfcF7vEaK7avFHTlSy3kOKYaJQaalfEo6YuXdceBOQ==",
- "engines": {
- "node": "14 || >=16.14"
- }
+ "version": "10.4.3",
+ "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-10.4.3.tgz",
+ "integrity": "sha512-JNAzZcXrCt42VGLuYz0zfAzDfAvJWW6AfYlDBQyDV5DClI2m5sAmK+OIO7s59XfsRsWHp02jAJrRadPRGTt6SQ=="
},
"node_modules/path-to-regexp": {
"version": "0.1.7",
@@ -13179,6 +13283,26 @@
"node": "^14.17.0 || ^16.13.0 || >=18.0.0"
}
},
+ "node_modules/read-package-json/node_modules/glob": {
+ "version": "10.4.5",
+ "resolved": "https://registry.npmjs.org/glob/-/glob-10.4.5.tgz",
+ "integrity": "sha512-7Bv8RF0k6xjo7d4A/PxYLbUCfb6c+Vpd2/mB2yRDlew7Jb5hEXiCD9ibfO7wpk8i4sevK6DFny9h7EYbM3/sHg==",
+ "dev": true,
+ "dependencies": {
+ "foreground-child": "^3.1.0",
+ "jackspeak": "^3.1.2",
+ "minimatch": "^9.0.4",
+ "minipass": "^7.1.2",
+ "package-json-from-dist": "^1.0.0",
+ "path-scurry": "^1.11.1"
+ },
+ "bin": {
+ "glob": "dist/esm/bin.mjs"
+ },
+ "funding": {
+ "url": "https://github.com/sponsors/isaacs"
+ }
+ },
"node_modules/read-pkg": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/read-pkg/-/read-pkg-3.0.0.tgz",
@@ -13408,6 +13532,25 @@
"node": ">=8"
}
},
+ "node_modules/redis-errors": {
+ "version": "1.2.0",
+ "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz",
+ "integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==",
+ "engines": {
+ "node": ">=4"
+ }
+ },
+ "node_modules/redis-parser": {
+ "version": "3.0.0",
+ "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz",
+ "integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==",
+ "dependencies": {
+ "redis-errors": "^1.0.0"
+ },
+ "engines": {
+ "node": ">=4"
+ }
+ },
"node_modules/reflect.getprototypeof": {
"version": "1.0.4",
"resolved": "https://registry.npmjs.org/reflect.getprototypeof/-/reflect.getprototypeof-1.0.4.tgz",
@@ -13971,6 +14114,26 @@
"url": "https://github.com/sponsors/isaacs"
}
},
+ "node_modules/rimraf/node_modules/glob": {
+ "version": "10.4.5",
+ "resolved": "https://registry.npmjs.org/glob/-/glob-10.4.5.tgz",
+ "integrity": "sha512-7Bv8RF0k6xjo7d4A/PxYLbUCfb6c+Vpd2/mB2yRDlew7Jb5hEXiCD9ibfO7wpk8i4sevK6DFny9h7EYbM3/sHg==",
+ "dev": true,
+ "dependencies": {
+ "foreground-child": "^3.1.0",
+ "jackspeak": "^3.1.2",
+ "minimatch": "^9.0.4",
+ "minipass": "^7.1.2",
+ "package-json-from-dist": "^1.0.0",
+ "path-scurry": "^1.11.1"
+ },
+ "bin": {
+ "glob": "dist/esm/bin.mjs"
+ },
+ "funding": {
+ "url": "https://github.com/sponsors/isaacs"
+ }
+ },
"node_modules/run-parallel": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/run-parallel/-/run-parallel-1.2.0.tgz",
@@ -14857,6 +15020,11 @@
"node": "^12.22.0 || ^14.17.0 || >=16.0.0"
}
},
+ "node_modules/standard-as-callback": {
+ "version": "2.1.0",
+ "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz",
+ "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A=="
+ },
"node_modules/standard-engine": {
"version": "15.1.0",
"resolved": "https://registry.npmjs.org/standard-engine/-/standard-engine-15.1.0.tgz",
@@ -17270,7 +17438,6 @@
"version": "20.2.9",
"resolved": "https://registry.npmjs.org/yargs-parser/-/yargs-parser-20.2.9.tgz",
"integrity": "sha512-y11nGElTIV+CT3Zv9t7VKl+Q3hTQoT9a1Qzezhhl6Rp21gJ/IVTW7Z3y9EWXhuUBC2Shnf+DX0antecpAwSP8w==",
- "dev": true,
"engines": {
"node": ">=10"
}
diff --git a/package.json b/package.json
index 21585c8ab..a7d9315e8 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
{
"name": "@mojaloop/central-ledger",
- "version": "17.7.8",
+ "version": "17.8.0-snapshot.0",
"description": "Central ledger hosted by a scheme to record and settle transfers",
"license": "Apache-2.0",
"author": "ModusBox",
@@ -92,7 +92,7 @@
"@mojaloop/central-services-health": "15.0.0",
"@mojaloop/central-services-logger": "11.3.1",
"@mojaloop/central-services-metrics": "12.0.8",
- "@mojaloop/central-services-shared": "18.5.2",
+ "@mojaloop/central-services-shared": "18.6.3",
"@mojaloop/central-services-stream": "11.3.1",
"@mojaloop/database-lib": "11.0.6",
"@mojaloop/event-sdk": "14.1.1",
@@ -109,7 +109,7 @@
"docdash": "2.0.2",
"event-stream": "4.0.1",
"five-bells-condition": "5.0.1",
- "glob": "10.4.2",
+ "glob": "10.4.3",
"hapi-auth-basic": "5.0.0",
"hapi-auth-bearer-token": "8.0.0",
"hapi-swagger": "17.2.1",
diff --git a/seeds/transferState.js b/seeds/transferState.js
index 9fd134628..4135ae33b 100644
--- a/seeds/transferState.js
+++ b/seeds/transferState.js
@@ -100,6 +100,11 @@ const transferStates = [
transferStateId: 'SETTLED',
enumeration: 'SETTLED',
description: 'The switch has settled the transfer.'
+ },
+ {
+ transferStateId: 'RESERVED_FORWARDED',
+ enumeration: 'RESERVED',
+ description: 'The switch has forwarded the transfer to a proxy participant'
}
]
diff --git a/src/domain/transfer/index.js b/src/domain/transfer/index.js
index fb5ae70d9..5de1f17c8 100644
--- a/src/domain/transfer/index.js
+++ b/src/domain/transfer/index.js
@@ -57,6 +57,22 @@ const prepare = async (payload, stateReason = null, hasPassedValidation = true,
}
}
+const forwardedPrepare = async (transferId) => {
+ const histTimerTransferServicePrepareEnd = Metrics.getHistogram(
+ 'domain_transfer',
+ 'prepare - Metrics for transfer domain',
+ ['success', 'funcName']
+ ).startTimer()
+ try {
+ const result = await TransferFacade.updatePrepareReservedForwarded(transferId)
+ histTimerTransferServicePrepareEnd({ success: true, funcName: 'forwardedPrepare' })
+ return result
+ } catch (err) {
+ histTimerTransferServicePrepareEnd({ success: false, funcName: 'forwardedPrepare' })
+ throw ErrorHandler.Factory.reformatFSPIOPError(err)
+ }
+}
+
const handlePayeeResponse = async (transferId, payload, action, fspiopError) => {
const histTimerTransferServiceHandlePayeeResponseEnd = Metrics.getHistogram(
'domain_transfer',
@@ -104,6 +120,7 @@ const TransferService = {
prepare,
handlePayeeResponse,
logTransferError,
+ forwardedPrepare,
getTransferErrorByTransferId: TransferErrorModel.getByTransferId,
getTransferById: TransferModel.getById,
getById: TransferFacade.getById,
diff --git a/src/handlers/transfers/dto.js b/src/handlers/transfers/dto.js
index 8a4a6aaae..2ee5433bf 100644
--- a/src/handlers/transfers/dto.js
+++ b/src/handlers/transfers/dto.js
@@ -16,10 +16,11 @@ const prepareInputDto = (error, messages) => {
if (!message) throw new Error('No input kafka message')
const payload = decodePayload(message.value.content.payload)
- const isFx = !payload.transferId
+ const isForwarded = message.value.metadata.event.action === Action.FORWARDED
+ const isFx = !payload.transferId && !isForwarded
const { action } = message.value.metadata.event
- const isPrepare = [Action.PREPARE, Action.FX_PREPARE].includes(action)
+ const isPrepare = [Action.PREPARE, Action.FX_PREPARE, Action.FORWARDED].includes(action)
const actionLetter = isPrepare
? Enum.Events.ActionLetter.prepare
@@ -39,9 +40,10 @@ const prepareInputDto = (error, messages) => {
action,
functionality,
isFx,
- ID: payload.transferId || payload.commitRequestId,
+ isForwarded,
+ ID: payload.transferId || payload.commitRequestId || message.value.id,
headers: message.value.content.headers,
- metric: PROM_METRICS.transferPrepare(isFx),
+ metric: PROM_METRICS.transferPrepare(isFx, isForwarded),
actionLetter // just for logging
}
}
diff --git a/src/handlers/transfers/handler.js b/src/handlers/transfers/handler.js
index a31440e48..4d2fde6ed 100644
--- a/src/handlers/transfers/handler.js
+++ b/src/handlers/transfers/handler.js
@@ -446,7 +446,9 @@ const processFulfilMessage = async (message, functionality, span) => {
throw fspiopError
}
- if (transfer.transferState !== TransferState.RESERVED) {
+ if (transfer.transferState !== Enum.Transfers.TransferInternalState.RESERVED &&
+ transfer.transferState !== Enum.Transfers.TransferInternalState.RESERVED_FORWARDED
+ ) {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorNonReservedState--${actionLetter}10`))
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.VALIDATION_ERROR, 'non-RESERVED transfer state')
const eventDetail = { functionality, action: TransferEventAction.COMMIT }
diff --git a/src/handlers/transfers/prepare.js b/src/handlers/transfers/prepare.js
index 1436af280..1ea3b80f2 100644
--- a/src/handlers/transfers/prepare.js
+++ b/src/handlers/transfers/prepare.js
@@ -36,6 +36,7 @@ const Participant = require('../../domain/participant')
const createRemittanceEntity = require('./createRemittanceEntity')
const Validator = require('./validator')
const dto = require('./dto')
+const TransferService = require('#src/domain/transfer/index')
const { Kafka, Comparators } = Util
const { TransferState } = Enum.Transfers
@@ -99,7 +100,7 @@ const processDuplication = async ({
.getByIdLight(ID)
const isFinalized = [TransferState.COMMITTED, TransferState.ABORTED].includes(transfer?.transferStateEnumeration)
- const isPrepare = [Action.PREPARE, Action.FX_PREPARE].includes(action)
+ const isPrepare = [Action.PREPARE, Action.FX_PREPARE, Action.FORWARDED].includes(action)
if (isFinalized && isPrepare) {
logger.info(Util.breadcrumb(location, `finalized callback--${actionLetter}1`))
@@ -219,7 +220,7 @@ const prepare = async (error, messages) => {
}
const {
- message, payload, isFx, ID, headers, action, actionLetter, functionality
+ message, payload, isFx, ID, headers, action, actionLetter, functionality, isForwarded
} = input
const contextFromMessage = EventSdk.Tracer.extractContextFromMessage(message.value)
@@ -239,6 +240,60 @@ const prepare = async (error, messages) => {
producer: Producer
}
+ if (isForwarded) {
+ const transfer = await TransferService.getById(ID)
+ if (!transfer) {
+ const eventDetail = {
+ functionality: Enum.Events.Event.Type.NOTIFICATION,
+ action: Enum.Events.Event.Action.FORWARDED
+ }
+ const fspiopError = ErrorHandler.Factory.createFSPIOPError(
+ ErrorHandler.Enums.FSPIOPErrorCodes.ID_NOT_FOUND,
+ 'Forwarded transfer could not be found.'
+ ).toApiErrorObject(Config.ERROR_HANDLING)
+ // IMPORTANT: This singular message is taken by the ml-api-adapter and used to
+ // notify the payerFsp and proxy of the error.
+ // As long as the `to` and `from` message values are the payer and payee,
+ // and the action is `forwarded`, the ml-api-adapter will notify both.
+ await Kafka.proceed(
+ Config.KAFKA_CONFIG,
+ params,
+ {
+ consumerCommit,
+ fspiopError,
+ eventDetail
+ }
+ )
+ return true
+ }
+
+ if (transfer.transferState === Enum.Transfers.TransferInternalState.RESERVED) {
+ await TransferService.forwardedPrepare(ID)
+ } else {
+ const eventDetail = {
+ functionality: Enum.Events.Event.Type.NOTIFICATION,
+ action: Enum.Events.Event.Action.FORWARDED
+ }
+ const fspiopError = ErrorHandler.Factory.createInternalServerFSPIOPError(
+ `Invalid State: ${transfer.transferState} - expected: ${Enum.Transfers.TransferInternalState.RESERVED}`
+ ).toApiErrorObject(Config.ERROR_HANDLING)
+ // IMPORTANT: This singular message is taken by the ml-api-adapter and used to
+ // notify the payerFsp and proxy of the error.
+ // As long as the `to` and `from` message values are the payer and payee,
+ // and the action is `forwarded`, the ml-api-adapter will notify both.
+ await Kafka.proceed(
+ Config.KAFKA_CONFIG,
+ params,
+ {
+ consumerCommit,
+ fspiopError,
+ eventDetail
+ }
+ )
+ }
+ return true
+ }
+
const duplication = await checkDuplication({ payload, isFx, ID, location })
if (duplication.hasDuplicateId) {
const success = await processDuplication({
diff --git a/src/models/transfer/facade.js b/src/models/transfer/facade.js
index 0ae904ad0..3d7be944e 100644
--- a/src/models/transfer/facade.js
+++ b/src/models/transfer/facade.js
@@ -988,7 +988,9 @@ const transferStateAndPositionUpdate = async function (param1, enums, trx = null
.first()
.transacting(trx)
- if (param1.transferStateId === enums.transferState.COMMITTED) {
+ if (param1.transferStateId === enums.transferState.COMMITTED ||
+ param1.transferStateId === TransferInternalState.RESERVED_FORWARDED
+ ) {
await knex('transferStateChange')
.insert({
transferId: param1.transferId,
@@ -1088,6 +1090,21 @@ const transferStateAndPositionUpdate = async function (param1, enums, trx = null
}
}
+const updatePrepareReservedForwarded = async function (transferId) {
+ try {
+ const knex = await Db.getKnex()
+ return await knex('transferStateChange')
+ .insert({
+ transferId,
+ transferStateId: TransferInternalState.RESERVED_FORWARDED,
+ reason: null,
+ createdDate: Time.getUTCString(new Date())
+ })
+ } catch (err) {
+ throw ErrorHandler.Factory.reformatFSPIOPError(err)
+ }
+}
+
const reconciliationTransferPrepare = async function (payload, transactionTimestamp, enums, trx = null) {
try {
const knex = await Db.getKnex()
@@ -1436,7 +1453,8 @@ const TransferFacade = {
reconciliationTransferCommit,
reconciliationTransferAbort,
getTransferParticipant,
- recordFundsIn
+ recordFundsIn,
+ updatePrepareReservedForwarded
}
module.exports = TransferFacade
diff --git a/src/shared/constants.js b/src/shared/constants.js
index 79967880e..ac1f6c7cd 100644
--- a/src/shared/constants.js
+++ b/src/shared/constants.js
@@ -12,10 +12,11 @@ const TABLE_NAMES = Object.freeze({
})
const FX_METRIC_PREFIX = 'fx_'
+const FORWARDED_METRIC_PREFIX = 'fwd_'
const PROM_METRICS = Object.freeze({
transferGet: (isFx) => `${isFx ? FX_METRIC_PREFIX : ''}transfer_get`,
- transferPrepare: (isFx) => `${isFx ? FX_METRIC_PREFIX : ''}transfer_prepare`,
+ transferPrepare: (isFx, isForwarded) => `${isFx ? FX_METRIC_PREFIX : ''}${isForwarded ? FORWARDED_METRIC_PREFIX : ''}transfer_prepare`,
transferFulfil: (isFx) => `${isFx ? FX_METRIC_PREFIX : ''}transfer_fulfil`,
transferFulfilError: (isFx) => `${isFx ? FX_METRIC_PREFIX : ''}transfer_fulfil_error`
})
diff --git a/test/integration-override/handlers/transfers/handlers.test.js b/test/integration-override/handlers/transfers/handlers.test.js
index 4869c82b0..303968ee6 100644
--- a/test/integration-override/handlers/transfers/handlers.test.js
+++ b/test/integration-override/handlers/transfers/handlers.test.js
@@ -50,6 +50,7 @@ const ParticipantCached = require('#src/models/participant/participantCached')
const ParticipantCurrencyCached = require('#src/models/participant/participantCurrencyCached')
const ParticipantLimitCached = require('#src/models/participant/participantLimitCached')
const SettlementModelCached = require('#src/models/settlement/settlementModelCached')
+const TransferService = require('#src/domain/transfer/index')
const Handlers = {
index: require('#src/handlers/register'),
@@ -239,6 +240,30 @@ const prepareTestData = async (dataObj) => {
}
}
+ const messageProtocolPrepareForwarded = {
+ id: transferPayload.transferId,
+ from: 'payerFsp',
+ to: 'proxyFsp',
+ type: 'application/json',
+ content: {
+ payload: {
+ proxyId: 'test'
+ }
+ },
+ metadata: {
+ event: {
+ id: transferPayload.transferId,
+ type: TransferEventType.PREPARE,
+ action: TransferEventAction.FORWARDED,
+ createdAt: dataObj.now,
+ state: {
+ status: 'success',
+ code: 0
+ }
+ }
+ }
+ }
+
const messageProtocolFulfil = Util.clone(messageProtocolPrepare)
messageProtocolFulfil.id = randomUUID()
messageProtocolFulfil.from = transferPayload.payeeFsp
@@ -271,6 +296,7 @@ const prepareTestData = async (dataObj) => {
rejectPayload,
errorPayload,
messageProtocolPrepare,
+ messageProtocolPrepareForwarded,
messageProtocolFulfil,
messageProtocolReject,
messageProtocolError,
@@ -312,6 +338,19 @@ Test('Handlers test', async handlersTest => {
Enum.Events.Event.Type.TRANSFER.toUpperCase(),
Enum.Events.Event.Action.POSITION.toUpperCase()
)
+ },
+ {
+ topicName: Utility.transformGeneralTopicName(
+ Config.KAFKA_CONFIG.TOPIC_TEMPLATES.GENERAL_TOPIC_TEMPLATE.TEMPLATE,
+ Enum.Events.Event.Type.NOTIFICATION,
+ Enum.Events.Event.Action.EVENT
+ ),
+ config: Utility.getKafkaConfig(
+ Config.KAFKA_CONFIG,
+ Enum.Kafka.Config.CONSUMER,
+ Enum.Events.Event.Type.NOTIFICATION.toUpperCase(),
+ Enum.Events.Event.Action.EVENT.toUpperCase()
+ )
}
])
@@ -366,6 +405,295 @@ Test('Handlers test', async handlersTest => {
transferPrepare.end()
})
+ await handlersTest.test('transferForwarded should', async transferForwarded => {
+ await transferForwarded.test('should update transfer internal state on prepare event forwarded action', async (test) => {
+ const td = await prepareTestData(testData)
+ const prepareConfig = Utility.getKafkaConfig(
+ Config.KAFKA_CONFIG,
+ Enum.Kafka.Config.PRODUCER,
+ TransferEventType.TRANSFER.toUpperCase(),
+ TransferEventType.PREPARE.toUpperCase())
+ prepareConfig.logger = Logger
+ await Producer.produceMessage(td.messageProtocolPrepare, td.topicConfTransferPrepare, prepareConfig)
+
+ try {
+ const positionPrepare = await wrapWithRetries(() => testConsumer.getEventsForFilter({
+ topicFilter: 'topic-transfer-position-batch',
+ action: 'prepare',
+ keyFilter: td.payer.participantCurrencyId.toString()
+ }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
+ test.ok(positionPrepare[0], 'Position prepare message with key found')
+ } catch (err) {
+ test.notOk('Error should not be thrown')
+ console.error(err)
+ }
+
+ await Producer.produceMessage(td.messageProtocolPrepareForwarded, td.topicConfTransferPrepare, prepareConfig)
+
+ await new Promise(resolve => setTimeout(resolve, 5000))
+
+ try {
+ const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {}
+ test.equal(transfer?.transferState, TransferInternalState.RESERVED_FORWARDED, 'Transfer state updated to RESERVED_FORWARDED')
+ } catch (err) {
+ Logger.error(err)
+ test.fail(err.message)
+ }
+ testConsumer.clearEvents()
+ test.end()
+ })
+
+ await transferForwarded.test('not timeout transfer in RESERVED_FORWARDED internal transfer state', async (test) => {
+ const td = await prepareTestData(testData)
+ const prepareConfig = Utility.getKafkaConfig(
+ Config.KAFKA_CONFIG,
+ Enum.Kafka.Config.PRODUCER,
+ TransferEventType.TRANSFER.toUpperCase(),
+ TransferEventType.PREPARE.toUpperCase())
+ prepareConfig.logger = Logger
+ await Producer.produceMessage(td.messageProtocolPrepare, td.topicConfTransferPrepare, prepareConfig)
+
+ try {
+ const positionPrepare = await wrapWithRetries(() => testConsumer.getEventsForFilter({
+ topicFilter: 'topic-transfer-position-batch',
+ action: 'prepare',
+ keyFilter: td.payer.participantCurrencyId.toString()
+ }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
+ test.ok(positionPrepare[0], 'Position prepare message with key found')
+ } catch (err) {
+ test.notOk('Error should not be thrown')
+ console.error(err)
+ }
+
+ await Producer.produceMessage(td.messageProtocolPrepareForwarded, td.topicConfTransferPrepare, prepareConfig)
+
+ await new Promise(resolve => setTimeout(resolve, 5000))
+
+ try {
+ const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {}
+ test.equal(transfer?.transferState, TransferInternalState.RESERVED_FORWARDED, 'Transfer state updated to RESERVED_FORWARDED')
+ } catch (err) {
+ Logger.error(err)
+ test.fail(err.message)
+ }
+
+ await new Promise(resolve => setTimeout(resolve, 5000))
+ try {
+ const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {}
+ test.equal(transfer?.transferState, TransferInternalState.RESERVED_FORWARDED, 'Transfer state is still RESERVED_FORWARDED')
+ } catch (err) {
+ Logger.error(err)
+ test.fail(err.message)
+ }
+
+ testConsumer.clearEvents()
+ test.end()
+ })
+
+ await transferForwarded.test('should be able to transition from RESERVED_FORWARDED to RECEIVED_FULFIL and COMMITED on fulfil', async (test) => {
+ const td = await prepareTestData(testData)
+ const prepareConfig = Utility.getKafkaConfig(
+ Config.KAFKA_CONFIG,
+ Enum.Kafka.Config.PRODUCER,
+ TransferEventType.TRANSFER.toUpperCase(),
+ TransferEventType.PREPARE.toUpperCase())
+ prepareConfig.logger = Logger
+ const fulfilConfig = Utility.getKafkaConfig(
+ Config.KAFKA_CONFIG,
+ Enum.Kafka.Config.PRODUCER,
+ TransferEventType.TRANSFER.toUpperCase(),
+ TransferEventType.FULFIL.toUpperCase())
+ fulfilConfig.logger = Logger
+ await Producer.produceMessage(td.messageProtocolPrepare, td.topicConfTransferPrepare, prepareConfig)
+
+ try {
+ const positionPrepare = await wrapWithRetries(() => testConsumer.getEventsForFilter({
+ topicFilter: 'topic-transfer-position-batch',
+ action: 'prepare',
+ keyFilter: td.payer.participantCurrencyId.toString()
+ }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
+ test.ok(positionPrepare[0], 'Position prepare message with key found')
+ } catch (err) {
+ test.notOk('Error should not be thrown')
+ console.error(err)
+ }
+
+ await Producer.produceMessage(td.messageProtocolPrepareForwarded, td.topicConfTransferPrepare, prepareConfig)
+
+ await new Promise(resolve => setTimeout(resolve, 5000))
+
+ try {
+ const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {}
+ test.equal(transfer?.transferState, TransferInternalState.RESERVED_FORWARDED, 'Transfer state updated to RESERVED_FORWARDED')
+ } catch (err) {
+ Logger.error(err)
+ test.fail(err.message)
+ }
+ await Producer.produceMessage(td.messageProtocolFulfil, td.topicConfTransferFulfil, fulfilConfig)
+
+ try {
+ const positionFulfil = await wrapWithRetries(() => testConsumer.getEventsForFilter({
+ topicFilter: 'topic-transfer-position-batch',
+ action: 'commit',
+ keyFilter: td.payee.participantCurrencyId.toString()
+ }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
+ test.ok(positionFulfil[0], 'Position fulfil message with key found')
+ } catch (err) {
+ test.notOk('Error should not be thrown')
+ console.error(err)
+ }
+
+ try {
+ const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {}
+ test.equal(transfer?.transferState, TransferInternalState.COMMITTED, 'Transfer state updated to COMMITTED')
+ } catch (err) {
+ Logger.error(err)
+ test.fail(err.message)
+ }
+
+ testConsumer.clearEvents()
+ test.end()
+ })
+
+ await transferForwarded.test('should be able to transition from RESERVED_FORWARDED to RECEIVED_ERROR and ABORTED_ERROR on fulfil error', async (test) => {
+ const td = await prepareTestData(testData)
+ const prepareConfig = Utility.getKafkaConfig(
+ Config.KAFKA_CONFIG,
+ Enum.Kafka.Config.PRODUCER,
+ TransferEventType.TRANSFER.toUpperCase(),
+ TransferEventType.PREPARE.toUpperCase())
+ prepareConfig.logger = Logger
+ const fulfilConfig = Utility.getKafkaConfig(
+ Config.KAFKA_CONFIG,
+ Enum.Kafka.Config.PRODUCER,
+ TransferEventType.TRANSFER.toUpperCase(),
+ TransferEventType.FULFIL.toUpperCase())
+ fulfilConfig.logger = Logger
+ await Producer.produceMessage(td.messageProtocolPrepare, td.topicConfTransferPrepare, prepareConfig)
+
+ try {
+ const positionPrepare = await wrapWithRetries(() => testConsumer.getEventsForFilter({
+ topicFilter: 'topic-transfer-position-batch',
+ action: 'prepare',
+ keyFilter: td.payer.participantCurrencyId.toString()
+ }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
+ test.ok(positionPrepare[0], 'Position prepare message with key found')
+ } catch (err) {
+ test.notOk('Error should not be thrown')
+ console.error(err)
+ }
+
+ await Producer.produceMessage(td.messageProtocolPrepareForwarded, td.topicConfTransferPrepare, prepareConfig)
+
+ await new Promise(resolve => setTimeout(resolve, 5000))
+
+ try {
+ const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {}
+ test.equal(transfer?.transferState, TransferInternalState.RESERVED_FORWARDED, 'Transfer state updated to RESERVED_FORWARDED')
+ } catch (err) {
+ Logger.error(err)
+ test.fail(err.message)
+ }
+ await Producer.produceMessage(td.messageProtocolError, td.topicConfTransferFulfil, fulfilConfig)
+
+ await new Promise(resolve => setTimeout(resolve, 5000))
+
+ try {
+ const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {}
+ test.equal(transfer?.transferState, TransferInternalState.ABORTED_ERROR, 'Transfer state updated to ABORTED_ERROR')
+ } catch (err) {
+ Logger.error(err)
+ test.fail(err.message)
+ }
+
+ testConsumer.clearEvents()
+ test.end()
+ })
+
+ await transferForwarded.test('should create notification message if transfer is not found', async (test) => {
+ const td = await prepareTestData(testData)
+ const prepareConfig = Utility.getKafkaConfig(
+ Config.KAFKA_CONFIG,
+ Enum.Kafka.Config.PRODUCER,
+ TransferEventType.TRANSFER.toUpperCase(),
+ TransferEventType.PREPARE.toUpperCase())
+ prepareConfig.logger = Logger
+
+ await Producer.produceMessage(td.messageProtocolPrepareForwarded, td.topicConfTransferPrepare, prepareConfig)
+
+ try {
+ const notificationMessages = await wrapWithRetries(() => testConsumer.getEventsForFilter({
+ topicFilter: 'topic-notification-event',
+ action: 'forwarded'
+ }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
+ test.ok(notificationMessages[0], 'notification message found')
+ test.equal(notificationMessages[0].value.to, 'proxyFsp')
+ test.equal(notificationMessages[0].value.from, 'payerFsp')
+ test.equal(
+ notificationMessages[0].value.content.payload.errorInformation.errorDescription,
+ 'Generic ID not found - Forwarded transfer could not be found.'
+ )
+ } catch (err) {
+ test.notOk('Error should not be thrown')
+ console.error(err)
+ }
+
+ testConsumer.clearEvents()
+ test.end()
+ })
+
+ await transferForwarded.test('should create notification message if transfer is found in incorrect state', async (test) => {
+ const expiredTestData = Util.clone(testData)
+ expiredTestData.expiration = new Date((new Date()).getTime() + 1000)
+ const td = await prepareTestData(expiredTestData)
+ const prepareConfig = Utility.getKafkaConfig(
+ Config.KAFKA_CONFIG,
+ Enum.Kafka.Config.PRODUCER,
+ TransferEventType.TRANSFER.toUpperCase(),
+ TransferEventType.PREPARE.toUpperCase())
+ prepareConfig.logger = Logger
+ await Producer.produceMessage(td.messageProtocolPrepare, td.topicConfTransferPrepare, prepareConfig)
+
+ try {
+ await wrapWithRetries(async () => {
+ const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {}
+ if (transfer?.transferState !== TransferInternalState.EXPIRED_RESERVED) {
+ if (debug) console.log(`retrying in ${retryDelay / 1000}s..`)
+ return null
+ }
+ return transfer
+ }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
+ } catch (err) {
+ Logger.error(err)
+ test.fail(err.message)
+ }
+
+ // Send the prepare forwarded message after the prepare message has timed out
+ await Producer.produceMessage(td.messageProtocolPrepareForwarded, td.topicConfTransferPrepare, prepareConfig)
+
+ try {
+ const notificationMessages = await wrapWithRetries(() => testConsumer.getEventsForFilter({
+ topicFilter: 'topic-notification-event',
+ action: 'forwarded'
+ }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
+ test.ok(notificationMessages[0], 'notification message found')
+ test.equal(notificationMessages[0].value.to, 'proxyFsp')
+ test.equal(notificationMessages[0].value.from, 'payerFsp')
+ test.equal(
+ notificationMessages[0].value.content.payload.errorInformation.errorDescription,
+ 'Internal server error - Invalid State: EXPIRED_RESERVED - expected: RESERVED'
+ )
+ } catch (err) {
+ test.notOk('Error should not be thrown')
+ console.error(err)
+ }
+
+ testConsumer.clearEvents()
+ test.end()
+ })
+ transferForwarded.end()
+ })
+
await handlersTest.test('transferFulfil should', async transferFulfil => {
await transferFulfil.test('should create position fulfil message to override topic name in config', async (test) => {
const td = await prepareTestData(testData)
diff --git a/test/unit/domain/position/fx-timeout-reserved.test.js b/test/unit/domain/position/fx-timeout-reserved.test.js
index 8993d77b0..5cf119b3a 100644
--- a/test/unit/domain/position/fx-timeout-reserved.test.js
+++ b/test/unit/domain/position/fx-timeout-reserved.test.js
@@ -202,7 +202,7 @@ Test('timeout reserved domain', positionIndexTest => {
t.end()
})
- positionIndexTest.skip('processPositionFxTimeoutReservedBin should', changeParticipantPositionTest => {
+ positionIndexTest.test('processPositionFxTimeoutReservedBin should', changeParticipantPositionTest => {
changeParticipantPositionTest.test('produce abort message for transfers not in the right transfer state', async (test) => {
try {
await processPositionFxTimeoutReservedBin(
@@ -233,10 +233,14 @@ Test('timeout reserved domain', positionIndexTest => {
},
{
'd6a036a5-65a3-48af-a0c7-ee089c412ada': {
- amount: -10
+ 51: {
+ value: 10
+ }
},
'7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5': {
- amount: -5
+ 51: {
+ value: 5
+ }
}
}
)
diff --git a/test/unit/domain/transfer/index.test.js b/test/unit/domain/transfer/index.test.js
index 730c527a0..93287e9aa 100644
--- a/test/unit/domain/transfer/index.test.js
+++ b/test/unit/domain/transfer/index.test.js
@@ -209,5 +209,35 @@ Test('Transfer Service', transferIndexTest => {
logTransferErrorTest.end()
})
+ transferIndexTest.test('forwardedPrepare should', handlePayeeResponseTest => {
+ handlePayeeResponseTest.test('commit transfer', async (test) => {
+ try {
+ TransferFacade.updatePrepareReservedForwarded.returns(Promise.resolve())
+ await TransferService.forwardedPrepare(payload.transferId)
+ test.pass()
+ test.end()
+ } catch (err) {
+ Logger.error(`handlePayeeResponse failed with error - ${err}`)
+ test.fail()
+ test.end()
+ }
+ })
+
+ handlePayeeResponseTest.test('throw error', async (test) => {
+ try {
+ TransferFacade.updatePrepareReservedForwarded.throws(new Error())
+ await TransferService.forwardedPrepare(payload.transferId)
+ test.fail('Error not thrown')
+ test.end()
+ } catch (err) {
+ Logger.error(`handlePayeeResponse failed with error - ${err}`)
+ test.pass('Error thrown')
+ test.end()
+ }
+ })
+
+ handlePayeeResponseTest.end()
+ })
+
transferIndexTest.end()
})
diff --git a/test/unit/handlers/transfers/handler.test.js b/test/unit/handlers/transfers/handler.test.js
index c69729212..9d228e3ed 100644
--- a/test/unit/handlers/transfers/handler.test.js
+++ b/test/unit/handlers/transfers/handler.test.js
@@ -998,6 +998,41 @@ Test('Transfer handler', transferHandlerTest => {
test.end()
})
+ fulfilTest.test('produce message to position topic when validations pass with RESERVED_FORWARDED state', async (test) => {
+ const localfulfilMessages = MainUtil.clone(fulfilMessages)
+ await Consumer.createHandler(topicName, config, command)
+ Kafka.transformGeneralTopicName.returns(topicName)
+
+ TransferService.getById.returns(Promise.resolve({
+ condition: 'condition',
+ payeeFsp: 'dfsp2',
+ payerFsp: 'proxyFsp',
+ transferState: TransferInternalState.RESERVED_FORWARDED
+ }))
+ ilp.update.returns(Promise.resolve())
+ Validator.validateFulfilCondition.returns(true)
+ localfulfilMessages[0].value.content.headers['fspiop-source'] = 'dfsp2'
+ localfulfilMessages[0].value.content.headers['fspiop-destination'] = 'proxyFsp'
+ localfulfilMessages[0].value.content.payload.fulfilment = 'condition'
+ Kafka.proceed.returns(true)
+
+ TransferService.getTransferDuplicateCheck.returns(Promise.resolve(null))
+ TransferService.saveTransferDuplicateCheck.returns(Promise.resolve(null))
+ Comparators.duplicateCheckComparator.withArgs(transfer.transferId, localfulfilMessages[0].value.content.payload).returns(Promise.resolve({
+ hasDuplicateId: false,
+ hasDuplicateHash: false
+ }))
+
+ const result = await allTransferHandlers.fulfil(null, localfulfilMessages)
+ const kafkaCallOne = Kafka.proceed.getCall(0)
+
+ test.equal(kafkaCallOne.args[2].eventDetail.functionality, Enum.Events.Event.Type.POSITION)
+ test.equal(kafkaCallOne.args[2].eventDetail.action, Enum.Events.Event.Action.COMMIT)
+ test.equal(kafkaCallOne.args[2].messageKey, '1')
+ test.equal(result, true)
+ test.end()
+ })
+
fulfilTest.test('fail if event type is not fulfil', async (test) => {
const localfulfilMessages = MainUtil.clone(fulfilMessages)
await Consumer.createHandler(topicName, config, command)
@@ -1071,6 +1106,47 @@ Test('Transfer handler', transferHandlerTest => {
test.end()
})
+ fulfilTest.test('produce message to position topic when validations pass if Cyril result is fx enabled on RESERVED_FORWARDED transfer state', async (test) => {
+ const localfulfilMessages = MainUtil.clone(fulfilMessages)
+ await Consumer.createHandler(topicName, config, command)
+ Kafka.transformGeneralTopicName.returns(topicName)
+ Cyril.processFulfilMessage.returns({
+ isFx: true,
+ positionChanges: [{
+ participantCurrencyId: 1
+ }]
+ })
+
+ TransferService.getById.returns(Promise.resolve({
+ condition: 'condition',
+ payeeFsp: 'dfsp2',
+ payerFsp: 'dfsp1',
+ transferState: TransferInternalState.RESERVED_FORWARDED
+ }))
+ ilp.update.returns(Promise.resolve())
+ Validator.validateFulfilCondition.returns(true)
+ localfulfilMessages[0].value.content.headers['fspiop-source'] = 'dfsp2'
+ localfulfilMessages[0].value.content.headers['fspiop-destination'] = 'dfsp1'
+ localfulfilMessages[0].value.content.payload.fulfilment = 'condition'
+ Kafka.proceed.returns(true)
+
+ TransferService.getTransferDuplicateCheck.returns(Promise.resolve(null))
+ TransferService.saveTransferDuplicateCheck.returns(Promise.resolve(null))
+ Comparators.duplicateCheckComparator.withArgs(transfer.transferId, localfulfilMessages[0].value.content.payload).returns(Promise.resolve({
+ hasDuplicateId: false,
+ hasDuplicateHash: false
+ }))
+
+ const result = await allTransferHandlers.fulfil(null, localfulfilMessages)
+ const kafkaCallOne = Kafka.proceed.getCall(0)
+
+ test.equal(kafkaCallOne.args[2].eventDetail.functionality, Enum.Events.Event.Type.POSITION)
+ test.equal(kafkaCallOne.args[2].eventDetail.action, Enum.Events.Event.Action.COMMIT)
+ test.equal(kafkaCallOne.args[2].messageKey, '1')
+ test.equal(result, true)
+ test.end()
+ })
+
fulfilTest.test('fail when Cyril result contains no positionChanges', async (test) => {
const localfulfilMessages = MainUtil.clone(fulfilMessages)
await Consumer.createHandler(topicName, config, command)
@@ -1815,6 +1891,36 @@ Test('Transfer handler', transferHandlerTest => {
test.end()
})
+ fulfilTest.test('set transfer ABORTED when valid errorInformation is provided from RESERVED_FORWARDED state', async (test) => {
+ const invalidEventMessage = MainUtil.clone(fulfilMessages)[0]
+ await Consumer.createHandler(topicName, config, command)
+ Kafka.transformGeneralTopicName.returns(topicName)
+ Validator.validateFulfilCondition.returns(true)
+ TransferService.getById.returns(Promise.resolve({
+ condition: 'condition',
+ payeeFsp: 'dfsp2',
+ payerFsp: 'dfsp1',
+ transferState: TransferInternalState.RESERVED_FORWARDED
+ }))
+ TransferService.handlePayeeResponse.returns(Promise.resolve({ transferErrorRecord: { errorCode: '5000', errorDescription: 'error text' } }))
+ invalidEventMessage.value.metadata.event.action = 'abort'
+ invalidEventMessage.value.content.payload = errInfo
+ invalidEventMessage.value.content.headers['fspiop-source'] = 'dfsp2'
+ invalidEventMessage.value.content.headers['fspiop-destination'] = 'dfsp1'
+ Kafka.proceed.returns(true)
+
+ TransferService.getTransferDuplicateCheck.returns(Promise.resolve(null))
+ TransferService.saveTransferDuplicateCheck.returns(Promise.resolve(null))
+ Comparators.duplicateCheckComparator.withArgs(transfer.transferId, invalidEventMessage.value.content.payload).returns(Promise.resolve({
+ hasDuplicateId: false,
+ hasDuplicateHash: false
+ }))
+
+ const result = await allTransferHandlers.fulfil(null, invalidEventMessage)
+ test.equal(result, true)
+ test.end()
+ })
+
fulfilTest.test('log error', async (test) => { // TODO: extend and enable unit test
const invalidEventMessage = MainUtil.clone(fulfilMessages)[0]
await Consumer.createHandler(topicName, config, command)
diff --git a/test/unit/handlers/transfers/prepare.test.js b/test/unit/handlers/transfers/prepare.test.js
index 726277b69..651ea7dd0 100644
--- a/test/unit/handlers/transfers/prepare.test.js
+++ b/test/unit/handlers/transfers/prepare.test.js
@@ -35,6 +35,7 @@ optionally within square brackets .
const Sinon = require('sinon')
const Test = require('tapes')(require('tape'))
const Kafka = require('@mojaloop/central-services-shared').Util.Kafka
+const ErrorHandler = require('@mojaloop/central-services-error-handling')
const Validator = require('../../../../src/handlers/transfers/validator')
const TransferService = require('../../../../src/domain/transfer')
const Cyril = require('../../../../src/domain/fx/cyril')
@@ -187,6 +188,32 @@ const fxMessageProtocol = {
pp: ''
}
+const messageForwardedProtocol = {
+ id: randomUUID(),
+ from: '',
+ to: '',
+ type: 'application/json',
+ content: {
+ uriParams: { id: transfer.transferId },
+ payload: {
+ proxyId: ''
+ }
+ },
+ metadata: {
+ event: {
+ id: randomUUID(),
+ type: 'prepare',
+ action: 'forwarded',
+ createdAt: new Date(),
+ state: {
+ status: 'success',
+ code: 0
+ }
+ }
+ },
+ pp: ''
+}
+
const messageProtocolBulkPrepare = MainUtil.clone(messageProtocol)
messageProtocolBulkPrepare.metadata.event.action = 'bulk-prepare'
const messageProtocolBulkCommit = MainUtil.clone(messageProtocol)
@@ -212,6 +239,13 @@ const fxMessages = [
}
]
+const forwardedMessages = [
+ {
+ topic: topicName,
+ value: messageForwardedProtocol
+ }
+]
+
const config = {
options: {
mode: 2,
@@ -889,6 +923,51 @@ Test('Transfer handler', transferHandlerTest => {
}
})
+ prepareTest.test('update reserved transfer on forwarded prepare message', async (test) => {
+ await Consumer.createHandler(topicName, config, command)
+ Kafka.transformAccountToTopicName.returns(topicName)
+ Kafka.proceed.returns(true)
+ TransferService.getById.returns(Promise.resolve({ transferState: Enum.Transfers.TransferInternalState.RESERVED }))
+ Comparators.duplicateCheckComparator.withArgs(transfer.transferId, transfer).returns(Promise.resolve({
+ hasDuplicateId: false,
+ hasDuplicateHash: false
+ }))
+ const result = await allTransferHandlers.prepare(null, forwardedMessages[0])
+ test.ok(TransferService.forwardedPrepare.called)
+ test.equal(result, true)
+ test.end()
+ })
+
+ prepareTest.test('produce error for unexpected state', async (test) => {
+ await Consumer.createHandler(topicName, config, command)
+ Kafka.transformAccountToTopicName.returns(topicName)
+ Kafka.proceed.returns(true)
+ TransferService.getById.returns(Promise.resolve({ transferState: Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT }))
+ Comparators.duplicateCheckComparator.withArgs(transfer.transferId, transfer).returns(Promise.resolve({
+ hasDuplicateId: false,
+ hasDuplicateHash: false
+ }))
+ const result = await allTransferHandlers.prepare(null, forwardedMessages[0])
+ test.equal(Kafka.proceed.getCall(0).args[2].fspiopError.errorInformation.errorCode, ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR.code)
+ test.equal(result, true)
+ test.end()
+ })
+
+ prepareTest.test('produce error on transfer not found', async (test) => {
+ await Consumer.createHandler(topicName, config, command)
+ Kafka.transformAccountToTopicName.returns(topicName)
+ Kafka.proceed.returns(true)
+ TransferService.getById.returns(Promise.resolve(null))
+ Comparators.duplicateCheckComparator.withArgs(transfer.transferId, transfer).returns(Promise.resolve({
+ hasDuplicateId: false,
+ hasDuplicateHash: false
+ }))
+ const result = await allTransferHandlers.prepare(null, forwardedMessages[0])
+ test.equal(result, true)
+ test.equal(Kafka.proceed.getCall(0).args[2].fspiopError.errorInformation.errorCode, ErrorHandler.Enums.FSPIOPErrorCodes.ID_NOT_FOUND.code)
+ test.end()
+ })
+
prepareTest.end()
})