Add support for caching and restoring data as Buffers. #1762

Open · wants to merge 2 commits into base: main
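In practice, the new exports let callers keep cache payloads in memory end to end. A minimal sketch of how they could be consumed (hypothetical values throughout: a real `cacheId` would come from reserving a cache entry, and `archiveLocation` from the cache service's lookup response; the import path assumes the package-internal module shown in this diff):

```typescript
import {
  downloadCacheToBuffer,
  saveCacheBuffer
} from './internal/cacheHttpClient'

async function roundTrip(): Promise<void> {
  // Hypothetical IDs/URLs for illustration only.
  const cacheId = 42
  const archiveLocation = 'https://example.blob.core.windows.net/cache/archive'

  // Save an in-memory payload without staging it in a temp file first.
  const payload = Buffer.from(JSON.stringify({hello: 'cache'}))
  await saveCacheBuffer(cacheId, payload)

  // Restore the archive straight into memory as a single Buffer.
  const restored: Buffer = await downloadCacheToBuffer(archiveLocation)
  console.log(restored.toString())
}
```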
packages/cache/src/internal/cacheHttpClient.ts — 200 changes: 155 additions & 45 deletions
@@ -8,6 +8,7 @@ import {
import * as crypto from 'crypto'
import * as fs from 'fs'
import {URL} from 'url'
+import {Readable as ReadableStream} from 'stream'

import * as utils from './cacheUtils'
import {CompressionMethod} from './constants'
@@ -21,6 +22,7 @@ import {
ArtifactCacheList
} from './contracts'
import {
+type ChunkWriteCallback,
downloadCacheHttpClient,
downloadCacheHttpClientConcurrent,
downloadCacheStorageSDK
@@ -39,6 +41,8 @@ import {

const versionSalt = '1.0'

+type GetStreamForChunk = (start: number, end: number) => NodeJS.ReadableStream
+
function getCacheApiUrl(resource: string): string {
const baseUrl: string = process.env['ACTIONS_CACHE_URL'] || ''
if (!baseUrl) {
@@ -168,10 +172,82 @@ async function printCachesListForDiagnostics(
}
}

+export async function downloadCacheToBuffer(
+archiveLocation: string,
+options?: DownloadOptions
+): Promise<Buffer> {
+let lastWrittenChunkLocation: number = 0
+let writtenBytes: number = 0
+interface IChunk {
+chunkBuffer: Buffer
+chunkLength: number
+bufferPosition: number
+}
+const bufferChunks: IChunk[] = []
+
+await downloadCacheInternal(
+archiveLocation,
+(
+chunkBuffer,
+chunkLength = chunkBuffer.length,
+bufferPosition = lastWrittenChunkLocation
+) => {
+bufferChunks.push({chunkBuffer, chunkLength, bufferPosition})
+lastWrittenChunkLocation = bufferPosition + chunkLength
+writtenBytes =
+writtenBytes > lastWrittenChunkLocation
+? writtenBytes
+: lastWrittenChunkLocation
+},
+() => writtenBytes,
+options
+)
+
+if (bufferChunks.length === 1) {
+const [{chunkBuffer}] = bufferChunks
+return chunkBuffer
+} else if (bufferChunks.length === 0) {
+return Buffer.alloc(0)
+} else {
+const finalBuffer = Buffer.alloc(writtenBytes)
+for (const {
+chunkBuffer: buffer,
+bufferPosition: position,
+chunkLength: length
+} of bufferChunks) {
+buffer.copy(finalBuffer, position, 0, length)
+}
+
+return finalBuffer
+}
+}
+
export async function downloadCache(
archiveLocation: string,
archivePath: string,
options?: DownloadOptions
): Promise<void> {
+const archiveDescriptor = await fs.promises.open(archivePath, 'w')
+
+try {
+await downloadCacheInternal(
+archiveLocation,
+async (buffer, length, position) => {
+await archiveDescriptor.write(buffer, 0, length, position)
+},
+() => utils.getArchiveFileSizeInBytes(archivePath),
+options
+)
+} finally {
+await archiveDescriptor.close()
+}
+}
+
+async function downloadCacheInternal(
+archiveLocation: string,
+onChunk: ChunkWriteCallback,
+getWrittenLength: () => number,
+options: DownloadOptions | undefined
+): Promise<void> {
const archiveUrl = new URL(archiveLocation)
const downloadOptions = getDownloadOptions(options)
@@ -181,22 +257,23 @@ export async function downloadCache(
// Use Azure storage SDK to download caches hosted on Azure to improve speed and reliability.
await downloadCacheStorageSDK(
archiveLocation,
-archivePath,
+onChunk,
+getWrittenLength,
downloadOptions
)
} else if (downloadOptions.concurrentBlobDownloads) {
// Use concurrent implementation with HttpClient to work around blob SDK issue
await downloadCacheHttpClientConcurrent(
archiveLocation,
-archivePath,
+onChunk,
downloadOptions
)
} else {
// Otherwise, download using the Actions http-client.
-await downloadCacheHttpClient(archiveLocation, archivePath)
+await downloadCacheHttpClient(archiveLocation, onChunk, getWrittenLength)
}
} else {
-await downloadCacheHttpClient(archiveLocation, archivePath)
+await downloadCacheHttpClient(archiveLocation, onChunk, getWrittenLength)
}
}

@@ -277,13 +354,12 @@ async function uploadChunk(
async function uploadFile(
httpClient: HttpClient,
cacheId: number,
-archivePath: string,
+fileSize: number,
+getReadStreamForChunk: GetStreamForChunk,
options?: UploadOptions
): Promise<void> {
// Upload Chunks
-const fileSize = utils.getArchiveFileSizeInBytes(archivePath)
const resourceUrl = getCacheApiUrl(`caches/${cacheId.toString()}`)
-const fd = fs.openSync(archivePath, 'r')
const uploadOptions = getUploadOptions(options)

const concurrency = utils.assertDefined(
@@ -299,41 +375,24 @@ async function uploadFile(
core.debug('Awaiting all uploads')
let offset = 0

-try {
-await Promise.all(
-parallelUploads.map(async () => {
-while (offset < fileSize) {
-const chunkSize = Math.min(fileSize - offset, maxChunkSize)
-const start = offset
-const end = offset + chunkSize - 1
-offset += maxChunkSize
-
-await uploadChunk(
-httpClient,
-resourceUrl,
-() =>
-fs
-.createReadStream(archivePath, {
-fd,
-start,
-end,
-autoClose: false
-})
-.on('error', error => {
-throw new Error(
-`Cache upload failed because file read failed with ${error.message}`
-)
-}),
-start,
-end
-)
-}
-})
-)
-} finally {
-fs.closeSync(fd)
-}
-return
+await Promise.all(
+parallelUploads.map(async () => {
+while (offset < fileSize) {
+const chunkSize = Math.min(fileSize - offset, maxChunkSize)
+const start = offset
+const end = offset + chunkSize - 1
+offset += maxChunkSize
+
+await uploadChunk(
+httpClient,
+resourceUrl,
+() => getReadStreamForChunk(start, end),
+start,
+end
+)
+}
+})
+)
}

async function commitCache(
@@ -354,15 +413,66 @@ export async function saveCache(
cacheId: number,
archivePath: string,
options?: UploadOptions
): Promise<void> {
+const fileSize = utils.getArchiveFileSizeInBytes(archivePath)
+const fd = fs.openSync(archivePath, 'r')
+
+try {
+await saveCacheInner(
+cacheId,
+(start, end) =>
+fs
+.createReadStream(archivePath, {
+fd,
+start,
+end,
+autoClose: false
+})
+.on('error', error => {
+throw new Error(
+`Cache upload failed because file read failed with ${error.message}`
+)
+}),
+fileSize,
+options
+)
+} finally {
+fs.closeSync(fd)
+}
+}
+
+export async function saveCacheBuffer(
+cacheId: number,
+buffer: Buffer,
+options?: UploadOptions
+): Promise<void> {
+await saveCacheInner(
+cacheId,
+(start, end) => ReadableStream.from(buffer.subarray(start, end + 1)),
+buffer.length,
+options
+)
+}
+
+async function saveCacheInner(
+cacheId: number,
+getReadStreamForChunk: GetStreamForChunk,
+cacheSize: number,
+options?: UploadOptions
+): Promise<void> {
const httpClient = createHttpClient()

core.debug('Upload cache')
-await uploadFile(httpClient, cacheId, archivePath, options)
+await uploadFile(
+httpClient,
+cacheId,
+cacheSize,
+getReadStreamForChunk,
+options
+)

// Commit Cache
-core.debug('Commiting cache')
-const cacheSize = utils.getArchiveFileSizeInBytes(archivePath)
+core.debug('Committing cache')
core.info(
`Cache Size: ~${Math.round(cacheSize / (1024 * 1024))} MB (${cacheSize} B)`
)
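For reference, a standalone sketch of the two techniques this diff relies on: reassembling possibly out-of-order download chunks into one Buffer (the `IChunk` shape mirrors the diff; the `assemble` helper and sample data are illustrative), and serving upload chunks from a Buffer via `Readable.from`, which emits a Buffer as a single chunk rather than iterating it byte by byte:

```typescript
import {Readable} from 'stream'

interface IChunk {
  chunkBuffer: Buffer
  chunkLength: number
  bufferPosition: number
}

// Size the target to the highest byte written, then copy each chunk
// into place: the same strategy downloadCacheToBuffer uses above.
function assemble(chunks: IChunk[]): Buffer {
  const total = chunks.reduce(
    (max, c) => Math.max(max, c.bufferPosition + c.chunkLength),
    0
  )
  const out = Buffer.alloc(total)
  for (const {chunkBuffer, bufferPosition, chunkLength} of chunks) {
    chunkBuffer.copy(out, bufferPosition, 0, chunkLength)
  }
  return out
}

// Upload side: Readable.from(buffer) pushes the whole Buffer as one
// chunk, so subarray slices map cleanly onto [start, end] chunk ranges.
const chunkStream = Readable.from(Buffer.from('example').subarray(0, 4))
chunkStream.on('data', d => console.log(d.toString())) // prints "exam"
```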