Skip to content

Commit

Permalink
Retry connection on ECONNRESET (#167)
Browse files Browse the repository at this point in the history
* Retry connection on ECONNRESET

* Restrict retry methods

* do not retry on req with body

* add tests

* support undici

* rename retry test

* Remove PUT & DELETE and add docs

* Update README.md

* Fix readme

Co-authored-by: Artur Koshtei <[email protected]>
Co-authored-by: Artur Koshtei <[email protected]>
Co-authored-by: Artur Koshtei <[email protected]>
Co-authored-by: Artur Koshtei <[email protected]>
  • Loading branch information
5 people authored May 3, 2021
1 parent dd28b58 commit 98c8009
Show file tree
Hide file tree
Showing 5 changed files with 278 additions and 5 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,13 @@ This only applies when a custom [`body`](#body) is not passed in. Defaults to:
]
```

#### `retryMethods`

On which methods should the connection be retried in case of socket hang up.
**Be aware** that setting here not idempotent method may lead to unexpected results on target.

By default: `['GET', 'HEAD', 'OPTIONS', 'TRACE' ]`

---

### `reply.from(source, [opts])`
Expand Down Expand Up @@ -256,6 +263,13 @@ Replaces the original request body with what is specified. Unless
through `JSON.stringify()`.
Setting this option for GET, HEAD requests will throw an error "Rewriting the body when doing a {GET|HEAD} is not allowed".

#### `retriesCount`

How many times it will try to pick another connection on socket hangup (`ECONNRESET` error).
Useful when keeping the connection open (KeepAlive).
This number should be a function of the number of connections and the number of instances of a target.

By default: 0 (disabled)

#### `contentType`

Expand Down
44 changes: 42 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,20 @@ module.exports = fp(function from (fastify, opts, next) {
'application/json',
...(opts.contentTypesToEncode || [])
])

const retryMethods = new Set(opts.retryMethods || [
'GET', 'HEAD', 'OPTIONS', 'TRACE'
])

const cache = opts.disableCache ? undefined : lru(opts.cacheURLs || 100)
const base = opts.base
const { request, close } = buildRequest({
const { request, close, retryOnError } = buildRequest({
http: opts.http,
http2: opts.http2,
base,
undici: opts.undici
})

fastify.decorateReply('from', function (source, opts) {
opts = opts || {}
const req = this.request.raw
Expand All @@ -36,6 +42,7 @@ module.exports = fp(function from (fastify, opts, next) {
const rewriteRequestHeaders = opts.rewriteRequestHeaders || requestHeadersNoOp
const getUpstream = opts.getUpstream || upstreamNoOp
const onError = opts.onError || onErrorDefault
const retriesCount = opts.retriesCount || 0

if (!source) {
source = req.url
Expand Down Expand Up @@ -109,8 +116,16 @@ module.exports = fp(function from (fastify, opts, next) {
this.request.log.info({ source }, 'fetching from remote server')

const requestHeaders = rewriteRequestHeaders(req, headers)
const contentLength = requestHeaders['content-length']
let requestImpl

request({ method: req.method, url, qs, headers: requestHeaders, body }, (err, res) => {
if (retriesCount && retryMethods.has(req.method) && !contentLength) {
requestImpl = createRequestRetry(request, this, retriesCount, retryOnError)
} else {
requestImpl = request
}

requestImpl({ method: req.method, url, qs, headers: requestHeaders, body }, (err, res) => {
if (err) {
this.request.log.warn(err, 'response errored')
if (!this.sent) {
Expand Down Expand Up @@ -197,3 +212,28 @@ function onErrorDefault (reply, { error }) {
function isFastifyMultipartRegistered (fastify) {
return fastify.hasContentTypeParser('multipart') && fastify.hasRequestDecorator('multipart')
}

function createRequestRetry (requestImpl, reply, retriesCount, retryOnError) {
function requestRetry (req, cb) {
let retries = 0

function run () {
requestImpl(req, function (err, res) {
if (err && !reply.sent && retriesCount > retries) {
if (err.code === retryOnError) {
retries += 1

run()
return
}
}

cb(err, res)
})
}

run()
}

return requestRetry
}
7 changes: 4 additions & 3 deletions lib/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ function buildRequest (opts) {
}

if (isHttp2) {
return { request: handleHttp2Req, close }
return { request: handleHttp2Req, close, retryOnError: 'ECONNRESET' }
} else if (isUndici) {
if (opts.base && opts.base.startsWith('unix+')) {
const undiciOpts = getUndiciOptions(opts.undici)
Expand All @@ -62,9 +62,9 @@ function buildRequest (opts) {
} else {
undiciAgent = new undici.Agent(getUndiciOptions(opts.undici))
}
return { request: handleUndici, close }
return { request: handleUndici, close, retryOnError: 'UND_ERR_SOCKET' }
} else {
return { request: handleHttp1Req, close }
return { request: handleHttp1Req, close, retryOnError: 'ECONNRESET' }
}

function close () {
Expand Down Expand Up @@ -98,6 +98,7 @@ function buildRequest (opts) {
req.abort()
done(err)
})

end(req, opts.body, done)
}

Expand Down
109 changes: 109 additions & 0 deletions test/http-retry.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
'use strict'

const Fastify = require('fastify')
const From = require('..')
const got = require('got')
const { test } = require('tap')

let retryNum = 1

const target = require('http').createServer(function (req, res) {
if (retryNum % 2 !== 0) {
req.socket.destroy()
} else {
res.statusCode = 200
res.setHeader('Content-Type', 'text/plain')
res.end('hello world')
}

retryNum += 1
})

test('Will retry', async function (t) {
t.teardown(() => { retryNum = 1 })

await target.listen(0)
t.teardown(target.close.bind(target))

const instance = Fastify()

instance.register(From, { http: true })

instance.get('/', (request, reply) => {
reply.from(`http://localhost:${target.address().port}/`, {
retriesCount: 1,
onError: (reply, { error }) => {
t.equal(error.code, 'ECONNRESET')
reply.send(error)
}
})
})

await instance.listen(0)
t.teardown(instance.close.bind(instance))

const { statusCode } = await got.get(`http://localhost:${instance.server.address().port}/`, { retry: 0 })
t.equal(statusCode, 200)
})

test('will not retry', async function (t) {
t.teardown(() => { retryNum = 1 })

await target.listen(0)
t.teardown(target.close.bind(target))

const instance = Fastify()

instance.register(From, { http: true })

instance.get('/', (request, reply) => {
reply.from(`http://localhost:${target.address().port}/`, {
retriesCount: 0,
onError: (reply, { error }) => {
t.equal(error.code, 'ECONNRESET')
reply.send(error)
}
})
})

await instance.listen(0)
t.teardown(instance.close.bind(instance))

try {
await got.get(`http://localhost:${instance.server.address().port}/`, { retry: 0 })
t.fail()
} catch (err) {
t.equal(err.response.statusCode, 500)
}
})

test('will not retry unsupported method', async function (t) {
t.teardown(() => { retryNum = 1 })

await target.listen(0)
t.teardown(target.close.bind(target))

const instance = Fastify()

instance.register(From, { http: true, retryMethods: ['DELETE'] })

instance.get('/', (request, reply) => {
reply.from(`http://localhost:${target.address().port}/`, {
retriesCount: 1,
onError: (reply, { error }) => {
t.equal(error.code, 'ECONNRESET')
reply.send(error)
}
})
})

await instance.listen(0)
t.teardown(instance.close.bind(instance))

try {
await got.get(`http://localhost:${instance.server.address().port}/`, { retry: 0 })
t.fail()
} catch (err) {
t.equal(err.response.statusCode, 500)
}
})
109 changes: 109 additions & 0 deletions test/undici-retry.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
'use strict'

const Fastify = require('fastify')
const From = require('..')
const got = require('got')
const { test } = require('tap')

let retryNum = 1

const target = require('http').createServer(function (req, res) {
if (retryNum % 2 !== 0) {
req.socket.destroy()
} else {
res.statusCode = 200
res.setHeader('Content-Type', 'text/plain')
res.end('hello world')
}

retryNum += 1
})

test('Will retry', async function (t) {
t.teardown(() => { retryNum = 1 })

await target.listen(0)
t.teardown(target.close.bind(target))

const instance = Fastify()

instance.register(From, { undici: true })

instance.get('/', (request, reply) => {
reply.from(`http://localhost:${target.address().port}/`, {
retriesCount: 1,
onError: (reply, { error }) => {
t.equal(error.code, 'UND_ERR_SOCKET')
reply.send(error)
}
})
})

await instance.listen(0)
t.teardown(instance.close.bind(instance))

const { statusCode } = await got.get(`http://localhost:${instance.server.address().port}/`, { retry: 0 })
t.equal(statusCode, 200)
})

test('will not retry', async function (t) {
t.teardown(() => { retryNum = 1 })

await target.listen(0)
t.teardown(target.close.bind(target))

const instance = Fastify()

instance.register(From, { undici: true })

instance.get('/', (request, reply) => {
reply.from(`http://localhost:${target.address().port}/`, {
retriesCount: 0,
onError: (reply, { error }) => {
t.equal(error.code, 'UND_ERR_SOCKET')
reply.send(error)
}
})
})

await instance.listen(0)
t.teardown(instance.close.bind(instance))

try {
await got.get(`http://localhost:${instance.server.address().port}/`, { retry: 0 })
t.fail()
} catch (err) {
t.equal(err.response.statusCode, 500)
}
})

test('will not retry unsupported method', async function (t) {
t.teardown(() => { retryNum = 1 })

await target.listen(0)
t.teardown(target.close.bind(target))

const instance = Fastify()

instance.register(From, { undici: true, retryMethods: ['DELETE'] })

instance.get('/', (request, reply) => {
reply.from(`http://localhost:${target.address().port}/`, {
retriesCount: 1,
onError: (reply, { error }) => {
t.equal(error.code, 'UND_ERR_SOCKET')
reply.send(error)
}
})
})

await instance.listen(0)
t.teardown(instance.close.bind(instance))

try {
await got.get(`http://localhost:${instance.server.address().port}/`, { retry: 0 })
t.fail()
} catch (err) {
t.equal(err.response.statusCode, 500)
}
})

0 comments on commit 98c8009

Please sign in to comment.