Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: s3 e2e tests race condition #534

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 84 additions & 29 deletions test/s3.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import http from 'node:http'
import {describe} from 'node:test'
import {strict as assert} from 'node:assert'
import {S3, S3ServiceException} from '@aws-sdk/client-s3'
import {randomUUID} from 'crypto'

const STORE_PATH = '/upload'

Expand All @@ -16,7 +17,6 @@ interface S3Options {

const expireTime = 5000
const s3Credentials = {
bucket: process.env.AWS_BUCKET as string,
region: process.env.AWS_REGION,
credentials: {
accessKeyId: process.env.AWS_ACCESS_KEY_ID as string,
Expand All @@ -26,10 +26,72 @@ const s3Credentials = {

const s3Client = new S3(s3Credentials)

const createStore = (options: S3Options = {}) =>
function createBucket(bucketName: string) {
return s3Client.createBucket({
Bucket: bucketName,
})
}

function deleteBucket(bucketName: string) {
return s3Client.deleteBucket({
Bucket: bucketName,
})
}

async function cleanObjects(bucketName: string) {
while (true) {
const listObjects = await s3Client.listObjects({
Bucket: bucketName,
})

const objects = listObjects.Contents
if (!objects || (objects && objects.length === 0)) {
break
}
await s3Client.deleteObjects({
Bucket: bucketName,
Delete: {
Objects: objects.map((o) => ({
Key: o.Key as string,
})),
},
})
}
}

async function cleanMultiparts(bucketName: string) {
while (true) {
const multiParts = await s3Client.listMultipartUploads({
Bucket: bucketName,
})

if (!multiParts.Uploads || (multiParts.Uploads && multiParts.Uploads.length === 0)) {
break
}

await Promise.all(
multiParts.Uploads?.map(async (upload) => {
return s3Client.abortMultipartUpload({
Bucket: bucketName,
Key: upload.Key,
UploadId: upload.UploadId,
})
})
)
}
}

async function cleanBucket(bucketName: string) {
await Promise.all([cleanObjects(bucketName), cleanMultiparts(bucketName)])
}

const createStore = (bucketName: string, options: S3Options = {}) =>
new S3Store({
...options,
s3ClientConfig: s3Credentials,
s3ClientConfig: {
bucket: bucketName,
...s3Credentials,
},
})

const createUpload = async (agent: SuperAgentTest, uploadLength: number) => {
Expand Down Expand Up @@ -70,9 +132,14 @@ describe('S3 Store E2E', () => {
let listener: http.Server
let agent: SuperAgentTest
let store: S3Store
let bucketName: string

before(async () => {
bucketName = `tus-s3-e2e-${randomUUID()}`
await createBucket(bucketName)
console.log(`Execution tests on bucket: ${bucketName}`)

before((done) => {
store = createStore({
store = createStore(bucketName, {
expirationPeriodInMilliseconds: expireTime,
partSize: 5_242_880,
})
Expand All @@ -82,27 +149,15 @@ describe('S3 Store E2E', () => {
})
listener = server.listen()
agent = request.agent(listener)
done()
})

let startTime: number
beforeEach(() => {
startTime = Date.now()
beforeEach(async () => {
await cleanBucket(bucketName)
})

afterEach(async () => {
const endTime = Date.now()
const elapsedMs = (endTime - startTime) / 10

if (elapsedMs < expireTime) {
await wait(expireTime - elapsedMs + 100)
}

await store.deleteExpired()
})

after((done) => {
listener.close(done)
after(async () => {
await deleteBucket(bucketName)
await new Promise((resolve) => listener.close(resolve))
})

it('should set Tus-Completed=false when the upload is not completed', async () => {
Expand All @@ -111,7 +166,7 @@ describe('S3 Store E2E', () => {
await patchUpload(agent, uploadId, data.subarray(0, 1024 * 1024 * 6))

const {TagSet} = await s3Client.getObjectTagging({
Bucket: s3Credentials.bucket,
Bucket: bucketName,
Key: uploadId + '.info',
})

Expand All @@ -133,7 +188,7 @@ describe('S3 Store E2E', () => {
await patchUpload(agent, uploadId, data.subarray(offset), offset)

const {TagSet} = await s3Client.getObjectTagging({
Bucket: s3Credentials.bucket,
Bucket: bucketName,
Key: uploadId + '.info',
})

Expand All @@ -150,11 +205,11 @@ describe('S3 Store E2E', () => {

const [infoFile, partFile] = await Promise.all([
s3Client.getObject({
Bucket: s3Credentials.bucket,
Bucket: bucketName,
Key: uploadId + '.info',
}),
s3Client.getObject({
Bucket: s3Credentials.bucket,
Bucket: bucketName,
Key: uploadId + '.part',
}),
])
Expand All @@ -165,19 +220,19 @@ describe('S3 Store E2E', () => {
assert(infoFile.$metadata.httpStatusCode === 200)
assert(partFile.$metadata.httpStatusCode === 200)

await wait(expireTime + 100)
await wait(expireTime + 500)

// .info file and .part should be deleted since now they should be expired
const deletedFiles = await store.deleteExpired()
assert(deletedFiles === 2, `not all parts were deleted, deleted ${deletedFiles}`)

const files = await Promise.allSettled([
s3Client.getObject({
Bucket: s3Credentials.bucket,
Bucket: bucketName,
Key: uploadId + '.info',
}),
s3Client.getObject({
Bucket: s3Credentials.bucket,
Bucket: bucketName,
Key: uploadId + '.part',
}),
])
Expand Down
Loading