diff --git a/cf/src/connection.js b/cf/src/connection.js index ab977ca8..c9231dc6 100644 --- a/cf/src/connection.js +++ b/cf/src/connection.js @@ -431,10 +431,8 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose lifeTimer.cancel() connectTimer.cancel() - if (socket.encrypted) { - socket.removeAllListeners() - socket = null - } + socket.removeAllListeners() + socket = null if (initial) return reconnect() @@ -792,7 +790,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose const error = Errors.postgres(parseError(x)) query && query.retried ? errored(query.retried) - : query && retryRoutines.has(error.routine) + : query && query.prepared && retryRoutines.has(error.routine) ? retry(query, error) : errored(error) } diff --git a/cf/src/subscribe.js b/cf/src/subscribe.js index 1ab8b0be..35a98d61 100644 --- a/cf/src/subscribe.js +++ b/cf/src/subscribe.js @@ -48,7 +48,7 @@ export default function Subscribe(postgres, options) { return subscribe - async function subscribe(event, fn, onsubscribe = noop) { + async function subscribe(event, fn, onsubscribe = noop, onerror = noop) { event = parseEvent(event) if (!connection) @@ -67,6 +67,7 @@ export default function Subscribe(postgres, options) { return connection.then(x => { connected(x) onsubscribe() + stream && stream.on('error', onerror) return { unsubscribe, state, sql } }) } @@ -110,8 +111,10 @@ export default function Subscribe(postgres, options) { function data(x) { if (x[0] === 0x77) parse(x.subarray(25), state, sql.options.parsers, handle, options.transform) - else if (x[0] === 0x6b && x[17]) + else if (x[0] === 0x6b && x[17]) { + state.lsn = x.subarray(1, 9) pong() + } } function handle(a, b) {