Skip to content

Commit

Permalink
Handle network errors during file transfers (#2223)
Browse files Browse the repository at this point in the history
Network errors thrown while a file is being transferred to or from the
Cozy should be handled as `FetchError` errors so our error handling
mechanisms will work.
This includes showing proper error messages, retrying transfers and
avoiding blocking the client.

This PR solves multiple problems at once:
- `net::*` errors are handled as `FetchError` errors
- file transfers are retried as quickly as possible for a limited amount
  of network errors
- the remote watcher does not try to make calls to the remote Cozy when
  stopped
- the synchronization process does not get blocked for more than one
  reason at any given time (since it can only get unblocked for one
  reason)
  • Loading branch information
taratatach authored Apr 22, 2022
2 parents 0f4450a + e4ce588 commit df852aa
Show file tree
Hide file tree
Showing 11 changed files with 254 additions and 106 deletions.
68 changes: 36 additions & 32 deletions core/local/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const bluebird = require('bluebird')

const { TMP_DIR_NAME } = require('./constants')
const { NOTE_MIME_TYPE } = require('../remote/constants')
const { isRetryableNetworkError } = require('../remote/errors')
const stater = require('./stater')
const metadata = require('../metadata')
const { hideOnWindows } = require('../utils/fs')
Expand Down Expand Up @@ -251,46 +252,49 @@ class Local /*:: implements Reader, Writer */ {
}
},

async existingFilePath => {
return new Promise((resolve, reject) => {
fse.ensureDir(this.tmpPath, async () => {
hideOnWindows(this.tmpPath)
if (existingFilePath) {
log.info(
{ path: filePath },
`Recopy ${existingFilePath} -> ${filePath}`
)
this.events.emit('transfer-copy', doc)
fse.copy(existingFilePath, tmpFile, err => {
if (err) {
reject(err)
} else {
resolve()
}
})
} else {
try {
const reader = await this.other.createReadStreamAsync(doc)
const source = onProgress
? streamUtils.withProgress(reader, onProgress)
: reader

const destination = fse.createWriteStream(tmpFile)

stream.pipeline(source, destination, err => {
async.retryable(
{ times: 5, interval: 2000, errorFilter: isRetryableNetworkError },
async existingFilePath => {
return new Promise((resolve, reject) => {
fse.ensureDir(this.tmpPath, async () => {
hideOnWindows(this.tmpPath)
if (existingFilePath) {
log.info(
{ path: filePath },
`Recopy ${existingFilePath} -> ${filePath}`
)
this.events.emit('transfer-copy', doc)
fse.copy(existingFilePath, tmpFile, err => {
if (err) {
reject(err)
} else {
resolve()
}
})
} catch (err) {
reject(err)
} else {
try {
const reader = await this.other.createReadStreamAsync(doc)
const source = onProgress
? streamUtils.withProgress(reader, onProgress)
: reader

const destination = fse.createWriteStream(tmpFile)

stream.pipeline(source, destination, err => {
if (err) {
reject(err)
} else {
resolve()
}
})
} catch (err) {
reject(err)
}
}
}
})
})
})
},
}
),

async () => {
if (doc.md5sum != null) {
Expand Down
20 changes: 19 additions & 1 deletion core/remote/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ const wrapError = (
err /*: FetchError | Error */,
doc /*: ?SavedMetadata */
) /*: RemoteError */ => {
if (err.name === 'FetchError') {
if (isNetworkError(err)) {
// $FlowFixMe FetchErrors missing status will fallback to the default case
const { status } = err

Expand Down Expand Up @@ -379,6 +379,22 @@ function detail(err /*: FetchError */) /*: ?string */ {
return detail
}

function isNetworkError(err /*: Error */) {
return (
err.name === 'FetchError' ||
(typeof err.message === 'string' && err.message.includes('net::'))
)
}

function isRetryableNetworkError(err /*: Error */) {
return (
typeof err.message === 'string' &&
err.message.includes('net::') &&
!err.message.includes('net::ERR_INTERNET_DISCONNECTED') &&
!err.message.includes('net::ERR_PROXY_CONNECTION_FAILED')
)
}

module.exports = {
CozyDocumentMissingError,
DirectoryNotFound,
Expand All @@ -405,5 +421,7 @@ module.exports = {
UNKNOWN_REMOTE_ERROR_CODE,
UNREACHABLE_COZY_CODE,
USER_ACTION_REQUIRED_CODE,
isNetworkError,
isRetryableNetworkError,
wrapError
}
141 changes: 78 additions & 63 deletions core/remote/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@
const autoBind = require('auto-bind')
const Promise = require('bluebird')
const path = require('path')
const async = require('async')

const logger = require('../utils/logger')
const measureTime = require('../utils/perfs')
const pathUtils = require('../utils/path')
const metadata = require('../metadata')
const { ROOT_DIR_ID, DIR_TYPE } = require('./constants')
const { RemoteCozy } = require('./cozy')
const { DirectoryNotFound, ExcludedDirError } = require('./errors')
const {
DirectoryNotFound,
ExcludedDirError,
isRetryableNetworkError
} = require('./errors')
const { RemoteWarningPoller } = require('./warning_poller')
const { RemoteWatcher } = require('./watcher')
const timestamp = require('../utils/timestamp')
Expand Down Expand Up @@ -182,32 +187,37 @@ class Remote /*:: implements Reader, Writer */ {
const [parentPath, name] = dirAndName(path)
const parent = await this.findDirectoryByPath(parentPath)

let stream
try {
stream = await this.other.createReadStreamAsync(doc)
} catch (err) {
if (err.code === 'ENOENT') {
log.warn({ path }, 'Local file does not exist anymore.')
// FIXME: with this deletion marker, the record will be erased from
// PouchDB while the remote document will remain.
doc.trashed = true
return doc
}
throw err
}

const source = onProgress
? streamUtils.withProgress(stream, onProgress)
: stream
await async.retry(
{ times: 5, interval: 2000, errorFilter: isRetryableNetworkError },
async () => {
let stream
try {
stream = await this.other.createReadStreamAsync(doc)
} catch (err) {
if (err.code === 'ENOENT') {
log.warn({ path }, 'Local file does not exist anymore.')
// FIXME: with this deletion marker, the record will be erased from
// PouchDB while the remote document will remain.
doc.trashed = true
return doc
}
throw err
}

const created = await this.remoteCozy.createFile(source, {
...newDocumentAttributes(name, parent._id, doc.updated_at),
checksum: doc.md5sum,
executable: doc.executable || false,
contentLength: doc.size,
contentType: doc.mime
})
metadata.updateRemote(doc, created)
const source = onProgress
? streamUtils.withProgress(stream, onProgress)
: stream

const created = await this.remoteCozy.createFile(source, {
...newDocumentAttributes(name, parent._id, doc.updated_at),
checksum: doc.md5sum,
executable: doc.executable || false,
contentLength: doc.size,
contentType: doc.mime
})
metadata.updateRemote(doc, created)
}
)

stopMeasure()
}
Expand All @@ -219,46 +229,51 @@ class Remote /*:: implements Reader, Writer */ {
const { path } = doc
log.info({ path }, 'Uploading new file version...')

let stream
try {
stream = await this.other.createReadStreamAsync(doc)
} catch (err) {
if (err.code === 'ENOENT') {
log.warn({ path }, 'Local file does not exist anymore.')
// FIXME: with this deletion marker, the record will be erased from
// PouchDB while the remote document will remain.
doc.trashed = true
return doc
}
throw err
}
await async.retry(
{ times: 5, interval: 2000, errorFilter: isRetryableNetworkError },
async () => {
let stream
try {
stream = await this.other.createReadStreamAsync(doc)
} catch (err) {
if (err.code === 'ENOENT') {
log.warn({ path }, 'Local file does not exist anymore.')
// FIXME: with this deletion marker, the record will be erased from
// PouchDB while the remote document will remain.
doc.trashed = true
return doc
}
throw err
}

// Object.assign gives us the opportunity to enforce required options with
// Flow while they're only optional in the Metadata type. For example,
// `md5sum` and `mime` are optional in Metadata because they only apply to
// files. But we're sure we have files at this point and that they do have
// those attributes.
const options = Object.assign(
{},
{
checksum: doc.md5sum,
executable: doc.executable || false,
contentLength: doc.size,
contentType: doc.mime,
updatedAt: mostRecentUpdatedAt(doc),
ifMatch: doc.remote._rev
// Object.assign gives us the opportunity to enforce required options with
// Flow while they're only optional in the Metadata type. For example,
// `md5sum` and `mime` are optional in Metadata because they only apply to
// files. But we're sure we have files at this point and that they do have
// those attributes.
const options = Object.assign(
{},
{
checksum: doc.md5sum,
executable: doc.executable || false,
contentLength: doc.size,
contentType: doc.mime,
updatedAt: mostRecentUpdatedAt(doc),
ifMatch: doc.remote._rev
}
)
const source = onProgress
? streamUtils.withProgress(stream, onProgress)
: stream

const updated = await this.remoteCozy.updateFileById(
doc.remote._id,
source,
options
)
metadata.updateRemote(doc, updated)
}
)
const source = onProgress
? streamUtils.withProgress(stream, onProgress)
: stream

const updated = await this.remoteCozy.updateFileById(
doc.remote._id,
source,
options
)
metadata.updateRemote(doc, updated)
}

async updateFileMetadataAsync(doc /*: SavedMetadata */) /*: Promise<void> */ {
Expand Down
5 changes: 5 additions & 0 deletions core/remote/watcher/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ class RemoteWatcher {
async watch() /*: Promise<?RemoteError> */ {
const release = await this.pouch.lock(this)
try {
if (!this.running) {
log.debug('Watcher stopped: skipping remote watch')
return
}

this.events.emit('buffering-start')

const seq = await this.pouch.getRemoteSeq()
Expand Down
6 changes: 3 additions & 3 deletions core/sync/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,10 @@ const wrapError = (
return new SyncError({ sideName, err, code: INCOMPATIBLE_DOC_CODE, doc })
} else if (err instanceof remoteErrors.ExcludedDirError) {
return new SyncError({ sideName, err, code: EXCLUDED_DIR_CODE, doc })
} else if (sideName === 'remote' || err.name === 'FetchError') {
} else if (remoteErrors.isNetworkError(err)) {
// FetchErrors can be raised from the LocalWriter when failing to download a
// file for example. In this case the sideName will be "local" but the error
// name will still be "FetchError".
// file for example. In this case the error name won't be "FetchError" but
// its message will still contain `net::`.
// If err is a RemoteError, its code will be reused.
return new SyncError({
sideName,
Expand Down
17 changes: 10 additions & 7 deletions core/utils/lifecycle.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ type State = 'done-stop' | 'will-start' | 'done-start' | 'will-stop'
class LifeCycle extends EventEmitter {
/*::
currentState: State
blockedFor: Set<string>
blockedFor: ?string
log: Logger
*/

constructor(logger /*: Logger */) {
super()
this.currentState = 'done-stop'
this.blockedFor = new Set()
this.blockedFor = null
this.log = logger
}

Expand Down Expand Up @@ -89,18 +89,21 @@ class LifeCycle extends EventEmitter {
}

blockFor(reason /*: string */) {
this.blockedFor.add(reason)
this.log.debug(`blocking for ${reason}`)
this.blockedFor = reason
}

unblockFor(reason /*: string */) {
if (reason === 'all') this.blockedFor.clear()
else this.blockedFor.delete(reason)
if (this.blockedFor.size === 0) this.emit('ready')
this.log.debug(`unblocking for ${reason}`)
if (reason === 'all' || reason === this.blockedFor) {
this.blockedFor = null
this.emit('ready')
}
}

async ready() /*: Promise<void> */ {
return new Promise(resolve => {
if (this.blockedFor.size === 0) resolve()
if (this.blockedFor == null) resolve()
else this.once('ready', resolve)
})
}
Expand Down
2 changes: 2 additions & 0 deletions test/support/helpers/remote.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ class RemoteTestHelpers {
}

async pullChanges() {
this.side.watcher.running = true
await this.side.watcher.watch()
this.side.watcher.running = false
}

async createDirectory(
Expand Down
Loading

0 comments on commit df852aa

Please sign in to comment.