diff --git a/packages/cache/src/internal/cacheHttpClient.ts b/packages/cache/src/internal/cacheHttpClient.ts
index f96ca381e1..ec7fd95cf0 100644
--- a/packages/cache/src/internal/cacheHttpClient.ts
+++ b/packages/cache/src/internal/cacheHttpClient.ts
@@ -8,6 +8,7 @@ import {
 import * as crypto from 'crypto'
 import * as fs from 'fs'
 import {URL} from 'url'
+import {Readable as ReadableStream} from 'stream'
 
 import * as utils from './cacheUtils'
 import {CompressionMethod} from './constants'
@@ -21,6 +22,7 @@ import {
   ArtifactCacheList
 } from './contracts'
 import {
+  type ChunkWriteCallback,
   downloadCacheHttpClient,
   downloadCacheHttpClientConcurrent,
   downloadCacheStorageSDK
@@ -39,6 +41,8 @@ import {
 
 const versionSalt = '1.0'
 
+type GetStreamForChunk = (start: number, end: number) => NodeJS.ReadableStream
+
 function getCacheApiUrl(resource: string): string {
   const baseUrl: string = process.env['ACTIONS_CACHE_URL'] || ''
   if (!baseUrl) {
@@ -168,10 +172,82 @@ async function printCachesListForDiagnostics(
   }
 }
 
+export async function downloadCacheToBuffer(
+  archiveLocation: string,
+  options?: DownloadOptions
+): Promise<Buffer> {
+  let lastWrittenChunkLocation: number = 0
+  let writtenBytes: number = 0
+  interface IChunk {
+    chunkBuffer: Buffer
+    chunkLength: number
+    bufferPosition: number
+  }
+  const bufferChunks: IChunk[] = []
+
+  await downloadCacheInternal(
+    archiveLocation,
+    (
+      chunkBuffer,
+      chunkLength = chunkBuffer.length,
+      bufferPosition = lastWrittenChunkLocation
+    ) => {
+      bufferChunks.push({chunkBuffer, chunkLength, bufferPosition})
+      lastWrittenChunkLocation = bufferPosition + chunkLength
+      writtenBytes =
+        writtenBytes > lastWrittenChunkLocation
+          ? writtenBytes
+          : lastWrittenChunkLocation
+    },
+    () => writtenBytes,
+    options
+  )
+
+  if (bufferChunks.length === 1) {
+    const [{chunkBuffer}] = bufferChunks
+    return chunkBuffer
+  } else if (bufferChunks.length === 0) {
+    return Buffer.alloc(0)
+  } else {
+    const finalBuffer = Buffer.alloc(writtenBytes)
+    for (const {
+      chunkBuffer: buffer,
+      bufferPosition: position,
+      chunkLength: length
+    } of bufferChunks) {
+      buffer.copy(finalBuffer, position, 0, length)
+    }
+
+    return finalBuffer
+  }
+}
+
 export async function downloadCache(
   archiveLocation: string,
   archivePath: string,
   options?: DownloadOptions
+): Promise<void> {
+  const archiveDescriptor = await fs.promises.open(archivePath, 'w')
+
+  try {
+    await downloadCacheInternal(
+      archiveLocation,
+      async (buffer, length, position) => {
+        await archiveDescriptor.write(buffer, 0, length, position)
+      },
+      () => utils.getArchiveFileSizeInBytes(archivePath),
+      options
+    )
+  } finally {
+    await archiveDescriptor.close()
+  }
+}
+
+async function downloadCacheInternal(
+  archiveLocation: string,
+  onChunk: ChunkWriteCallback,
+  getWrittenLength: () => number,
+  options: DownloadOptions | undefined
 ): Promise<void> {
   const archiveUrl = new URL(archiveLocation)
   const downloadOptions = getDownloadOptions(options)
@@ -181,22 +257,23 @@ export async function downloadCache(
       // Use Azure storage SDK to download caches hosted on Azure to improve speed and reliability.
       await downloadCacheStorageSDK(
         archiveLocation,
-        archivePath,
+        onChunk,
+        getWrittenLength,
         downloadOptions
       )
     } else if (downloadOptions.concurrentBlobDownloads) {
       // Use concurrent implementation with HttpClient to work around blob SDK issue
       await downloadCacheHttpClientConcurrent(
         archiveLocation,
-        archivePath,
+        onChunk,
         downloadOptions
       )
     } else {
       // Otherwise, download using the Actions http-client.
-      await downloadCacheHttpClient(archiveLocation, archivePath)
+      await downloadCacheHttpClient(archiveLocation, onChunk, getWrittenLength)
     }
   } else {
-    await downloadCacheHttpClient(archiveLocation, archivePath)
+    await downloadCacheHttpClient(archiveLocation, onChunk, getWrittenLength)
   }
 }
 
@@ -277,13 +354,12 @@ async function uploadChunk(
 async function uploadFile(
   httpClient: HttpClient,
   cacheId: number,
-  archivePath: string,
+  fileSize: number,
+  getReadStreamForChunk: GetStreamForChunk,
   options?: UploadOptions
 ): Promise<void> {
   // Upload Chunks
-  const fileSize = utils.getArchiveFileSizeInBytes(archivePath)
   const resourceUrl = getCacheApiUrl(`caches/${cacheId.toString()}`)
-  const fd = fs.openSync(archivePath, 'r')
 
   const uploadOptions = getUploadOptions(options)
   const concurrency = utils.assertDefined(
@@ -299,41 +375,24 @@ async function uploadFile(
   core.debug('Awaiting all uploads')
   let offset = 0
 
-  try {
-    await Promise.all(
-      parallelUploads.map(async () => {
-        while (offset < fileSize) {
-          const chunkSize = Math.min(fileSize - offset, maxChunkSize)
-          const start = offset
-          const end = offset + chunkSize - 1
-          offset += maxChunkSize
-
-          await uploadChunk(
-            httpClient,
-            resourceUrl,
-            () =>
-              fs
-                .createReadStream(archivePath, {
-                  fd,
-                  start,
-                  end,
-                  autoClose: false
-                })
-                .on('error', error => {
-                  throw new Error(
-                    `Cache upload failed because file read failed with ${error.message}`
-                  )
-                }),
-            start,
-            end
-          )
-        }
-      })
-    )
-  } finally {
-    fs.closeSync(fd)
-  }
-  return
+  await Promise.all(
+    parallelUploads.map(async () => {
+      while (offset < fileSize) {
+        const chunkSize = Math.min(fileSize - offset, maxChunkSize)
+        const start = offset
+        const end = offset + chunkSize - 1
+        offset += maxChunkSize
+
+        await uploadChunk(
+          httpClient,
+          resourceUrl,
+          () => getReadStreamForChunk(start, end),
+          start,
+          end
+        )
+      }
+    })
+  )
 }
 
 async function commitCache(
@@ -354,15 +413,66 @@ export async function saveCache(
   cacheId: number,
   archivePath: string,
   options?: UploadOptions
+): Promise<void> {
+  const fileSize = utils.getArchiveFileSizeInBytes(archivePath)
+  const fd = fs.openSync(archivePath, 'r')
+
+  try {
+    await saveCacheInner(
+      cacheId,
+      (start, end) =>
+        fs
+          .createReadStream(archivePath, {
+            fd,
+            start,
+            end,
+            autoClose: false
+          })
+          .on('error', error => {
+            throw new Error(
+              `Cache upload failed because file read failed with ${error.message}`
+            )
+          }),
+      fileSize,
+      options
+    )
+  } finally {
+    fs.closeSync(fd)
+  }
+}
+
+export async function saveCacheBuffer(
+  cacheId: number,
+  buffer: Buffer,
+  options?: UploadOptions
+): Promise<void> {
+  await saveCacheInner(
+    cacheId,
+    (start, end) => ReadableStream.from(buffer.subarray(start, end + 1)),
+    buffer.length,
+    options
+  )
+}
+
+async function saveCacheInner(
+  cacheId: number,
+  getReadStreamForChunk: GetStreamForChunk,
+  cacheSize: number,
+  options?: UploadOptions
 ): Promise<void> {
   const httpClient = createHttpClient()
 
   core.debug('Upload cache')
-  await uploadFile(httpClient, cacheId, archivePath, options)
+  await uploadFile(
+    httpClient,
+    cacheId,
+    cacheSize,
+    getReadStreamForChunk,
+    options
+  )
 
   // Commit Cache
-  core.debug('Commiting cache')
-  const cacheSize = utils.getArchiveFileSizeInBytes(archivePath)
+  core.debug('Committing cache')
   core.info(
     `Cache Size: ~${Math.round(cacheSize / (1024 * 1024))} MB (${cacheSize} B)`
   )
diff --git a/packages/cache/src/internal/downloadUtils.ts b/packages/cache/src/internal/downloadUtils.ts
index de57ed78ff..144ea2009e 100644
--- a/packages/cache/src/internal/downloadUtils.ts
+++ b/packages/cache/src/internal/downloadUtils.ts
@@ -3,17 +3,21 @@ import {HttpClient, HttpClientResponse} from '@actions/http-client'
 import {BlockBlobClient} from '@azure/storage-blob'
 import {TransferProgressEvent} from '@azure/ms-rest-js'
 import * as buffer from 'buffer'
-import * as fs from 'fs'
 import * as stream from 'stream'
 import * as util from 'util'
 
-import * as utils from './cacheUtils'
 import {SocketTimeout} from './constants'
 import {DownloadOptions} from '../options'
 import {retryHttpClientResponse} from './requestUtils'
 
 import {AbortController} from '@azure/abort-controller'
 
+export type ChunkWriteCallback = (
+  chunk: Buffer,
+  count: number | undefined,
+  offset: number | undefined
+) => Promise<void> | void
+
 /**
  * Pipes the body of a HTTP response to a stream
  *
@@ -169,9 +173,9 @@
  */
 export async function downloadCacheHttpClient(
   archiveLocation: string,
-  archivePath: string
+  onChunk: ChunkWriteCallback,
+  getWrittenLength: () => number
 ): Promise<void> {
-  const writeStream = fs.createWriteStream(archivePath)
   const httpClient = new HttpClient('actions/cache')
   const downloadResponse = await retryHttpClientResponse(
     'downloadCache',
@@ -184,14 +188,16 @@
     core.debug(`Aborting download, socket timed out after ${SocketTimeout} ms`)
   })
 
-  await pipeResponseToStream(downloadResponse, writeStream)
+  // readBodyBuffer is always defined in http-client
+  const responseBuffer: Buffer = await downloadResponse.readBodyBuffer!()
+  await onChunk(responseBuffer, undefined, undefined)
 
   // Validate download size.
   const contentLengthHeader = downloadResponse.message.headers['content-length']
 
   if (contentLengthHeader) {
     const expectedLength = parseInt(contentLengthHeader)
-    const actualLength = utils.getArchiveFileSizeInBytes(archivePath)
+    const actualLength = getWrittenLength()
 
     if (actualLength !== expectedLength) {
       throw new Error(
@@ -211,10 +217,9 @@
  */
 export async function downloadCacheHttpClientConcurrent(
   archiveLocation: string,
-  archivePath: fs.PathLike,
+  onChunk: ChunkWriteCallback,
   options: DownloadOptions
 ): Promise<void> {
-  const archiveDescriptor = await fs.promises.open(archivePath, 'w')
   const httpClient = new HttpClient('actions/cache', undefined, {
     socketTimeout: options.timeoutInMs,
     keepAlive: true
@@ -270,16 +275,13 @@
       | undefined
 
     const waitAndWrite: () => Promise<void> = async () => {
-      const segment = await Promise.race(Object.values(activeDownloads))
-      await archiveDescriptor.write(
-        segment.buffer,
-        0,
-        segment.count,
-        segment.offset
+      const {buffer, count, offset} = await Promise.race(
+        Object.values(activeDownloads)
       )
+      await onChunk(buffer, count, offset)
       actives--
-      delete activeDownloads[segment.offset]
-      bytesDownloaded += segment.count
+      delete activeDownloads[offset]
+      bytesDownloaded += count
       progressFn({loadedBytes: bytesDownloaded})
     }
 
@@ -297,7 +299,6 @@
     }
   } finally {
     httpClient.dispose()
-    await archiveDescriptor.close()
   }
 }
 
@@ -373,7 +374,8 @@ declare class DownloadSegment {
  */
 export async function downloadCacheStorageSDK(
   archiveLocation: string,
-  archivePath: string,
+  onChunk: ChunkWriteCallback,
+  getWrittenLength: () => number,
   options: DownloadOptions
 ): Promise<void> {
   const client = new BlockBlobClient(archiveLocation, undefined, {
@@ -394,7 +396,7 @@
       'Unable to determine content length, downloading file with http-client...'
     )
 
-    await downloadCacheHttpClient(archiveLocation, archivePath)
+    await downloadCacheHttpClient(archiveLocation, onChunk, getWrittenLength)
   } else {
     // Use downloadToBuffer for faster downloads, since internally it splits the
     // file into 4 MB chunks which can then be parallelized and retried independently
@@ -407,8 +409,6 @@
     const maxSegmentSize = Math.min(134217728, buffer.constants.MAX_LENGTH)
     const downloadProgress = new DownloadProgress(contentLength)
 
-    const fd = fs.openSync(archivePath, 'w')
-
     try {
       downloadProgress.startDisplayTimer()
       const controller = new AbortController()
@@ -437,12 +437,11 @@
           'Aborting cache download as the download time exceeded the timeout.'
         )
       } else if (Buffer.isBuffer(result)) {
-        fs.writeFileSync(fd, result)
+        await onChunk(result, undefined, undefined)
       }
     }
   } finally {
     downloadProgress.stopDisplayTimer()
-    fs.closeSync(fd)
    }
  }
 }
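
Below is a minimal usage sketch of the two buffer-based entry points added by this diff, downloadCacheToBuffer and saveCacheBuffer. It is illustrative only: the relative import path depends on where the caller lives, and archiveLocation and cacheId would normally come from the cache service rather than being passed in directly.

// Sketch only: round-trips a cache archive entirely in memory using the
// helpers added in cacheHttpClient.ts above (hypothetical caller code).
import {
  downloadCacheToBuffer,
  saveCacheBuffer
} from './internal/cacheHttpClient'

async function roundTripInMemory(
  archiveLocation: string,
  cacheId: number
): Promise<void> {
  // Download the cache archive into a Buffer instead of writing it to disk.
  const archive: Buffer = await downloadCacheToBuffer(archiveLocation)

  // Upload the same bytes back; chunked upload and the final commit are
  // handled internally by saveCacheInner/uploadFile.
  await saveCacheBuffer(cacheId, archive)
}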