Skip to content

Commit

Permalink
core: Quickly retry file transfers on net errors
Browse files Browse the repository at this point in the history
  When encountering some `net` system errors like a network change, we
  don't want to enter the offline state and wait 10 seconds before
  retrying the transfer.

  To speed up things, we add a retry logic around the transfer itself
  which will retry on specific `net` system errors without entering the
  offline mode and with a much smaller delay (2 s here).

  If after a few retries (5 here) we still encounter `net` errors or if
  at any time we encouter some other kind of error, the retry logic will
  throw the error and our error handling mechanisms will pick up from
  here.
  • Loading branch information
taratatach committed Apr 22, 2022
1 parent cdb7b3e commit e4ce588
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 95 deletions.
68 changes: 36 additions & 32 deletions core/local/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const bluebird = require('bluebird')

const { TMP_DIR_NAME } = require('./constants')
const { NOTE_MIME_TYPE } = require('../remote/constants')
const { isRetryableNetworkError } = require('../remote/errors')
const stater = require('./stater')
const metadata = require('../metadata')
const { hideOnWindows } = require('../utils/fs')
Expand Down Expand Up @@ -251,46 +252,49 @@ class Local /*:: implements Reader, Writer */ {
}
},

async existingFilePath => {
return new Promise((resolve, reject) => {
fse.ensureDir(this.tmpPath, async () => {
hideOnWindows(this.tmpPath)
if (existingFilePath) {
log.info(
{ path: filePath },
`Recopy ${existingFilePath} -> ${filePath}`
)
this.events.emit('transfer-copy', doc)
fse.copy(existingFilePath, tmpFile, err => {
if (err) {
reject(err)
} else {
resolve()
}
})
} else {
try {
const reader = await this.other.createReadStreamAsync(doc)
const source = onProgress
? streamUtils.withProgress(reader, onProgress)
: reader

const destination = fse.createWriteStream(tmpFile)

stream.pipeline(source, destination, err => {
async.retryable(
{ times: 5, interval: 2000, errorFilter: isRetryableNetworkError },
async existingFilePath => {
return new Promise((resolve, reject) => {
fse.ensureDir(this.tmpPath, async () => {
hideOnWindows(this.tmpPath)
if (existingFilePath) {
log.info(
{ path: filePath },
`Recopy ${existingFilePath} -> ${filePath}`
)
this.events.emit('transfer-copy', doc)
fse.copy(existingFilePath, tmpFile, err => {
if (err) {
reject(err)
} else {
resolve()
}
})
} catch (err) {
reject(err)
} else {
try {
const reader = await this.other.createReadStreamAsync(doc)
const source = onProgress
? streamUtils.withProgress(reader, onProgress)
: reader

const destination = fse.createWriteStream(tmpFile)

stream.pipeline(source, destination, err => {
if (err) {
reject(err)
} else {
resolve()
}
})
} catch (err) {
reject(err)
}
}
}
})
})
})
},
}
),

async () => {
if (doc.md5sum != null) {
Expand Down
10 changes: 10 additions & 0 deletions core/remote/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,15 @@ function isNetworkError(err /*: Error */) {
)
}

function isRetryableNetworkError(err /*: Error */) {
return (
typeof err.message === 'string' &&
err.message.includes('net::') &&
!err.message.includes('net::ERR_INTERNET_DISCONNECTED') &&
!err.message.includes('net::ERR_PROXY_CONNECTION_FAILED')
)
}

module.exports = {
CozyDocumentMissingError,
DirectoryNotFound,
Expand Down Expand Up @@ -413,5 +422,6 @@ module.exports = {
UNREACHABLE_COZY_CODE,
USER_ACTION_REQUIRED_CODE,
isNetworkError,
isRetryableNetworkError,
wrapError
}
141 changes: 78 additions & 63 deletions core/remote/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@
const autoBind = require('auto-bind')
const Promise = require('bluebird')
const path = require('path')
const async = require('async')

const logger = require('../utils/logger')
const measureTime = require('../utils/perfs')
const pathUtils = require('../utils/path')
const metadata = require('../metadata')
const { ROOT_DIR_ID, DIR_TYPE } = require('./constants')
const { RemoteCozy } = require('./cozy')
const { DirectoryNotFound, ExcludedDirError } = require('./errors')
const {
DirectoryNotFound,
ExcludedDirError,
isRetryableNetworkError
} = require('./errors')
const { RemoteWarningPoller } = require('./warning_poller')
const { RemoteWatcher } = require('./watcher')
const timestamp = require('../utils/timestamp')
Expand Down Expand Up @@ -182,32 +187,37 @@ class Remote /*:: implements Reader, Writer */ {
const [parentPath, name] = dirAndName(path)
const parent = await this.findDirectoryByPath(parentPath)

let stream
try {
stream = await this.other.createReadStreamAsync(doc)
} catch (err) {
if (err.code === 'ENOENT') {
log.warn({ path }, 'Local file does not exist anymore.')
// FIXME: with this deletion marker, the record will be erased from
// PouchDB while the remote document will remain.
doc.trashed = true
return doc
}
throw err
}

const source = onProgress
? streamUtils.withProgress(stream, onProgress)
: stream
await async.retry(
{ times: 5, interval: 2000, errorFilter: isRetryableNetworkError },
async () => {
let stream
try {
stream = await this.other.createReadStreamAsync(doc)
} catch (err) {
if (err.code === 'ENOENT') {
log.warn({ path }, 'Local file does not exist anymore.')
// FIXME: with this deletion marker, the record will be erased from
// PouchDB while the remote document will remain.
doc.trashed = true
return doc
}
throw err
}

const created = await this.remoteCozy.createFile(source, {
...newDocumentAttributes(name, parent._id, doc.updated_at),
checksum: doc.md5sum,
executable: doc.executable || false,
contentLength: doc.size,
contentType: doc.mime
})
metadata.updateRemote(doc, created)
const source = onProgress
? streamUtils.withProgress(stream, onProgress)
: stream

const created = await this.remoteCozy.createFile(source, {
...newDocumentAttributes(name, parent._id, doc.updated_at),
checksum: doc.md5sum,
executable: doc.executable || false,
contentLength: doc.size,
contentType: doc.mime
})
metadata.updateRemote(doc, created)
}
)

stopMeasure()
}
Expand All @@ -219,46 +229,51 @@ class Remote /*:: implements Reader, Writer */ {
const { path } = doc
log.info({ path }, 'Uploading new file version...')

let stream
try {
stream = await this.other.createReadStreamAsync(doc)
} catch (err) {
if (err.code === 'ENOENT') {
log.warn({ path }, 'Local file does not exist anymore.')
// FIXME: with this deletion marker, the record will be erased from
// PouchDB while the remote document will remain.
doc.trashed = true
return doc
}
throw err
}
await async.retry(
{ times: 5, interval: 2000, errorFilter: isRetryableNetworkError },
async () => {
let stream
try {
stream = await this.other.createReadStreamAsync(doc)
} catch (err) {
if (err.code === 'ENOENT') {
log.warn({ path }, 'Local file does not exist anymore.')
// FIXME: with this deletion marker, the record will be erased from
// PouchDB while the remote document will remain.
doc.trashed = true
return doc
}
throw err
}

// Object.assign gives us the opportunity to enforce required options with
// Flow while they're only optional in the Metadata type. For example,
// `md5sum` and `mime` are optional in Metadata because they only apply to
// files. But we're sure we have files at this point and that they do have
// those attributes.
const options = Object.assign(
{},
{
checksum: doc.md5sum,
executable: doc.executable || false,
contentLength: doc.size,
contentType: doc.mime,
updatedAt: mostRecentUpdatedAt(doc),
ifMatch: doc.remote._rev
// Object.assign gives us the opportunity to enforce required options with
// Flow while they're only optional in the Metadata type. For example,
// `md5sum` and `mime` are optional in Metadata because they only apply to
// files. But we're sure we have files at this point and that they do have
// those attributes.
const options = Object.assign(
{},
{
checksum: doc.md5sum,
executable: doc.executable || false,
contentLength: doc.size,
contentType: doc.mime,
updatedAt: mostRecentUpdatedAt(doc),
ifMatch: doc.remote._rev
}
)
const source = onProgress
? streamUtils.withProgress(stream, onProgress)
: stream

const updated = await this.remoteCozy.updateFileById(
doc.remote._id,
source,
options
)
metadata.updateRemote(doc, updated)
}
)
const source = onProgress
? streamUtils.withProgress(stream, onProgress)
: stream

const updated = await this.remoteCozy.updateFileById(
doc.remote._id,
source,
options
)
metadata.updateRemote(doc, updated)
}

async updateFileMetadataAsync(doc /*: SavedMetadata */) /*: Promise<void> */ {
Expand Down

0 comments on commit e4ce588

Please sign in to comment.