From 98c80098828e46d99b7c69c36c1fa522d2172932 Mon Sep 17 00:00:00 2001 From: artur-ma <57314588+artur-ma@users.noreply.github.com> Date: Mon, 3 May 2021 10:14:50 +0300 Subject: [PATCH] Retry connection on ECONNRESET (#167) * Retry connection on ECONNRESET * Restrict retry methods * do not retry on req with body * add tests * support undici * rename retry test * Remove PUT & DELETE and add docs * Update README.md * Fix readme Co-authored-by: Artur Koshtei Co-authored-by: Artur Koshtei Co-authored-by: Artur Koshtei Co-authored-by: Artur Koshtei --- README.md | 14 ++++++ index.js | 44 ++++++++++++++++- lib/request.js | 7 +-- test/http-retry.js | 109 +++++++++++++++++++++++++++++++++++++++++++ test/undici-retry.js | 109 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 278 insertions(+), 5 deletions(-) create mode 100644 test/http-retry.js create mode 100644 test/undici-retry.js diff --git a/README.md b/README.md index f9b9e68a..6c248be9 100644 --- a/README.md +++ b/README.md @@ -189,6 +189,13 @@ This only applies when a custom [`body`](#body) is not passed in. Defaults to: ] ``` +#### `retryMethods` + +On which methods should the connection be retried in case of socket hang up. +**Be aware** that setting here not idempotent method may lead to unexpected results on target. + +By default: `['GET', 'HEAD', 'OPTIONS', 'TRACE' ]` + --- ### `reply.from(source, [opts])` @@ -256,6 +263,13 @@ Replaces the original request body with what is specified. Unless through `JSON.stringify()`. Setting this option for GET, HEAD requests will throw an error "Rewriting the body when doing a {GET|HEAD} is not allowed". +#### `retriesCount` + +How many times it will try to pick another connection on socket hangup (`ECONNRESET` error). +Useful when keeping the connection open (KeepAlive). +This number should be a function of the number of connections and the number of instances of a target. + +By default: 0 (disabled) #### `contentType` diff --git a/index.js b/index.js index aea8a61a..4723b55d 100644 --- a/index.js +++ b/index.js @@ -20,14 +20,20 @@ module.exports = fp(function from (fastify, opts, next) { 'application/json', ...(opts.contentTypesToEncode || []) ]) + + const retryMethods = new Set(opts.retryMethods || [ + 'GET', 'HEAD', 'OPTIONS', 'TRACE' + ]) + const cache = opts.disableCache ? undefined : lru(opts.cacheURLs || 100) const base = opts.base - const { request, close } = buildRequest({ + const { request, close, retryOnError } = buildRequest({ http: opts.http, http2: opts.http2, base, undici: opts.undici }) + fastify.decorateReply('from', function (source, opts) { opts = opts || {} const req = this.request.raw @@ -36,6 +42,7 @@ module.exports = fp(function from (fastify, opts, next) { const rewriteRequestHeaders = opts.rewriteRequestHeaders || requestHeadersNoOp const getUpstream = opts.getUpstream || upstreamNoOp const onError = opts.onError || onErrorDefault + const retriesCount = opts.retriesCount || 0 if (!source) { source = req.url @@ -109,8 +116,16 @@ module.exports = fp(function from (fastify, opts, next) { this.request.log.info({ source }, 'fetching from remote server') const requestHeaders = rewriteRequestHeaders(req, headers) + const contentLength = requestHeaders['content-length'] + let requestImpl - request({ method: req.method, url, qs, headers: requestHeaders, body }, (err, res) => { + if (retriesCount && retryMethods.has(req.method) && !contentLength) { + requestImpl = createRequestRetry(request, this, retriesCount, retryOnError) + } else { + requestImpl = request + } + + requestImpl({ method: req.method, url, qs, headers: requestHeaders, body }, (err, res) => { if (err) { this.request.log.warn(err, 'response errored') if (!this.sent) { @@ -197,3 +212,28 @@ function onErrorDefault (reply, { error }) { function isFastifyMultipartRegistered (fastify) { return fastify.hasContentTypeParser('multipart') && fastify.hasRequestDecorator('multipart') } + +function createRequestRetry (requestImpl, reply, retriesCount, retryOnError) { + function requestRetry (req, cb) { + let retries = 0 + + function run () { + requestImpl(req, function (err, res) { + if (err && !reply.sent && retriesCount > retries) { + if (err.code === retryOnError) { + retries += 1 + + run() + return + } + } + + cb(err, res) + }) + } + + run() + } + + return requestRetry +} diff --git a/lib/request.js b/lib/request.js index 7ea7412e..297fbc48 100644 --- a/lib/request.js +++ b/lib/request.js @@ -52,7 +52,7 @@ function buildRequest (opts) { } if (isHttp2) { - return { request: handleHttp2Req, close } + return { request: handleHttp2Req, close, retryOnError: 'ECONNRESET' } } else if (isUndici) { if (opts.base && opts.base.startsWith('unix+')) { const undiciOpts = getUndiciOptions(opts.undici) @@ -62,9 +62,9 @@ function buildRequest (opts) { } else { undiciAgent = new undici.Agent(getUndiciOptions(opts.undici)) } - return { request: handleUndici, close } + return { request: handleUndici, close, retryOnError: 'UND_ERR_SOCKET' } } else { - return { request: handleHttp1Req, close } + return { request: handleHttp1Req, close, retryOnError: 'ECONNRESET' } } function close () { @@ -98,6 +98,7 @@ function buildRequest (opts) { req.abort() done(err) }) + end(req, opts.body, done) } diff --git a/test/http-retry.js b/test/http-retry.js new file mode 100644 index 00000000..92547bba --- /dev/null +++ b/test/http-retry.js @@ -0,0 +1,109 @@ +'use strict' + +const Fastify = require('fastify') +const From = require('..') +const got = require('got') +const { test } = require('tap') + +let retryNum = 1 + +const target = require('http').createServer(function (req, res) { + if (retryNum % 2 !== 0) { + req.socket.destroy() + } else { + res.statusCode = 200 + res.setHeader('Content-Type', 'text/plain') + res.end('hello world') + } + + retryNum += 1 +}) + +test('Will retry', async function (t) { + t.teardown(() => { retryNum = 1 }) + + await target.listen(0) + t.teardown(target.close.bind(target)) + + const instance = Fastify() + + instance.register(From, { http: true }) + + instance.get('/', (request, reply) => { + reply.from(`http://localhost:${target.address().port}/`, { + retriesCount: 1, + onError: (reply, { error }) => { + t.equal(error.code, 'ECONNRESET') + reply.send(error) + } + }) + }) + + await instance.listen(0) + t.teardown(instance.close.bind(instance)) + + const { statusCode } = await got.get(`http://localhost:${instance.server.address().port}/`, { retry: 0 }) + t.equal(statusCode, 200) +}) + +test('will not retry', async function (t) { + t.teardown(() => { retryNum = 1 }) + + await target.listen(0) + t.teardown(target.close.bind(target)) + + const instance = Fastify() + + instance.register(From, { http: true }) + + instance.get('/', (request, reply) => { + reply.from(`http://localhost:${target.address().port}/`, { + retriesCount: 0, + onError: (reply, { error }) => { + t.equal(error.code, 'ECONNRESET') + reply.send(error) + } + }) + }) + + await instance.listen(0) + t.teardown(instance.close.bind(instance)) + + try { + await got.get(`http://localhost:${instance.server.address().port}/`, { retry: 0 }) + t.fail() + } catch (err) { + t.equal(err.response.statusCode, 500) + } +}) + +test('will not retry unsupported method', async function (t) { + t.teardown(() => { retryNum = 1 }) + + await target.listen(0) + t.teardown(target.close.bind(target)) + + const instance = Fastify() + + instance.register(From, { http: true, retryMethods: ['DELETE'] }) + + instance.get('/', (request, reply) => { + reply.from(`http://localhost:${target.address().port}/`, { + retriesCount: 1, + onError: (reply, { error }) => { + t.equal(error.code, 'ECONNRESET') + reply.send(error) + } + }) + }) + + await instance.listen(0) + t.teardown(instance.close.bind(instance)) + + try { + await got.get(`http://localhost:${instance.server.address().port}/`, { retry: 0 }) + t.fail() + } catch (err) { + t.equal(err.response.statusCode, 500) + } +}) diff --git a/test/undici-retry.js b/test/undici-retry.js new file mode 100644 index 00000000..8d964254 --- /dev/null +++ b/test/undici-retry.js @@ -0,0 +1,109 @@ +'use strict' + +const Fastify = require('fastify') +const From = require('..') +const got = require('got') +const { test } = require('tap') + +let retryNum = 1 + +const target = require('http').createServer(function (req, res) { + if (retryNum % 2 !== 0) { + req.socket.destroy() + } else { + res.statusCode = 200 + res.setHeader('Content-Type', 'text/plain') + res.end('hello world') + } + + retryNum += 1 +}) + +test('Will retry', async function (t) { + t.teardown(() => { retryNum = 1 }) + + await target.listen(0) + t.teardown(target.close.bind(target)) + + const instance = Fastify() + + instance.register(From, { undici: true }) + + instance.get('/', (request, reply) => { + reply.from(`http://localhost:${target.address().port}/`, { + retriesCount: 1, + onError: (reply, { error }) => { + t.equal(error.code, 'UND_ERR_SOCKET') + reply.send(error) + } + }) + }) + + await instance.listen(0) + t.teardown(instance.close.bind(instance)) + + const { statusCode } = await got.get(`http://localhost:${instance.server.address().port}/`, { retry: 0 }) + t.equal(statusCode, 200) +}) + +test('will not retry', async function (t) { + t.teardown(() => { retryNum = 1 }) + + await target.listen(0) + t.teardown(target.close.bind(target)) + + const instance = Fastify() + + instance.register(From, { undici: true }) + + instance.get('/', (request, reply) => { + reply.from(`http://localhost:${target.address().port}/`, { + retriesCount: 0, + onError: (reply, { error }) => { + t.equal(error.code, 'UND_ERR_SOCKET') + reply.send(error) + } + }) + }) + + await instance.listen(0) + t.teardown(instance.close.bind(instance)) + + try { + await got.get(`http://localhost:${instance.server.address().port}/`, { retry: 0 }) + t.fail() + } catch (err) { + t.equal(err.response.statusCode, 500) + } +}) + +test('will not retry unsupported method', async function (t) { + t.teardown(() => { retryNum = 1 }) + + await target.listen(0) + t.teardown(target.close.bind(target)) + + const instance = Fastify() + + instance.register(From, { undici: true, retryMethods: ['DELETE'] }) + + instance.get('/', (request, reply) => { + reply.from(`http://localhost:${target.address().port}/`, { + retriesCount: 1, + onError: (reply, { error }) => { + t.equal(error.code, 'UND_ERR_SOCKET') + reply.send(error) + } + }) + }) + + await instance.listen(0) + t.teardown(instance.close.bind(instance)) + + try { + await got.get(`http://localhost:${instance.server.address().port}/`, { retry: 0 }) + t.fail() + } catch (err) { + t.equal(err.response.statusCode, 500) + } +})