Skip to content

Commit

Permalink
refactor tar reading to use minitar
Browse files Browse the repository at this point in the history
  • Loading branch information
ezekg committed Nov 1, 2024
1 parent d13ba53 commit e7ad646
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 44 deletions.
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ gem 'lograge'
gem 'aws-sdk-s3', '~> 1'
gem 'semverse'
gem 'compact_index'
gem 'minitar'

# Misc
gem 'null_association'
Expand Down
2 changes: 2 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ GEM
mime-types-data (3.2022.0105)
mini_mime (1.1.5)
mini_portile2 (2.8.7)
minitar (1.0.2)
minitest (5.25.1)
msgpack (1.7.2)
multi_json (1.15.0)
Expand Down Expand Up @@ -551,6 +552,7 @@ DEPENDENCIES
kaminari (~> 1.2.0)
listen (>= 3.8.0)
lograge
minitar
msgpack (~> 1.7)
nokogiri (~> 1.16.5)
null_association
Expand Down
16 changes: 8 additions & 8 deletions app/workers/process_docker_image_worker.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require 'rubygems/package'
require 'minitar'
require 'zlib'

class ProcessDockerImageWorker < BaseWorker
MIN_TARBALL_SIZE = 5.bytes # to avoid processing empty or invalid tarballs
Expand All @@ -24,12 +25,12 @@ def perform(artifact_id)
tgz = client.get_object(bucket: artifact.bucket, key: artifact.key)
.body

# gunzip and untar the image tarball
# unpack the image tarball
tar = gunzip(tgz)

untar tar do |archive|
unpack tar do |archive|
archive.each do |entry|
case entry.full_name
case entry.name
in 'manifest.json'
raise ImageNotAcceptableError, 'manifest must be a manifest.json file' unless
entry.file?
Expand All @@ -48,7 +49,7 @@ def perform(artifact_id)
content: json,
)
in %r{^blobs/sha256/} if entry.file?
key = artifact.key_for(entry.full_name)
key = artifact.key_for(entry.name)

# skip if already uploaded
next if
Expand All @@ -73,9 +74,8 @@ def perform(artifact_id)
resource: artifact,
)
rescue ImageNotAcceptableError,
Gem::Package::FormatError,
Zlib::GzipFile::Error,
Zlib::Error,
Minitar::Error,
IOError => e
Keygen.logger.warn { "[workers.process-docker-image-worker] Error: #{e.class.name} - #{e.message}" }

Expand All @@ -91,7 +91,7 @@ def perform(artifact_id)
private

# Wraps a gzip-compressed IO in a streaming decompressor.
#
# @param io [IO] readable stream of gzip-compressed bytes
# @return [Zlib::GzipReader] reader yielding the decompressed bytes
def gunzip(io)
  Zlib::GzipReader.new(io)
end
def untar(io, &) = Gem::Package::TarReader.new(io, &)
# Opens a tar stream with Minitar, handing the given block to the reader.
#
# @param io [IO] readable stream of (already decompressed) tar data
# @return whatever Minitar::Reader.open returns for the given block
def unpack(io, &block)
  Minitar::Reader.open(io, &block)
end

class ImageNotAcceptableError < StandardError
def backtrace = nil # silence backtrace
Expand Down
58 changes: 34 additions & 24 deletions app/workers/process_npm_package_worker.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require 'rubygems/package'
require 'minitar'
require 'zlib'

class ProcessNpmPackageWorker < BaseWorker
MIN_TARBALL_SIZE = 5.bytes # to avoid processing empty or invalid tarballs
Expand All @@ -24,31 +25,41 @@ def perform(artifact_id)
tgz = client.get_object(bucket: artifact.bucket, key: artifact.key)
.body

# gunzip and untar the package tarball
# unpack the package tarball
tar = gunzip(tgz)

untar tar do |archive|
# NOTE(ezekg) npm prefixes everything in the archive with package/
archive.seek('package/package.json') do |entry|
raise PackageNotAcceptableError, 'manifest must be a package.json file' unless
entry.file?

raise PackageNotAcceptableError, 'manifest is too big' if
entry.size > MAX_MANIFEST_SIZE

# the manifest is already in json format
json = entry.read

ReleaseManifest.create!(
account_id: artifact.account_id,
environment_id: artifact.environment_id,
release_id: artifact.release_id,
release_artifact_id: artifact.id,
content: json,
)
unpack tar do |archive|
archive.each do |entry|
# NOTE(ezekg) npm prefixes everything in the archive with package/
case entry.name
in 'package/package.json'
raise PackageNotAcceptableError, 'manifest must be a package.json file' unless
entry.file?

raise PackageNotAcceptableError, 'manifest is too big' if
entry.size > MAX_MANIFEST_SIZE

# the manifest is already in json format
json = entry.read

ReleaseManifest.create!(
account_id: artifact.account_id,
environment_id: artifact.environment_id,
release_id: artifact.release_id,
release_artifact_id: artifact.id,
content: json,
)

# all we need
break
else
end
end
end

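# NOTE: Zlib::GzipReader.open only accepts a filename, not an io (GzipReader.wrap is
#       the io-based block form), so the reader has to be closed manually here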
tar.close

artifact.update!(status: 'UPLOADED')

BroadcastEventService.call(
Expand All @@ -57,9 +68,8 @@ def perform(artifact_id)
resource: artifact,
)
rescue PackageNotAcceptableError,
Gem::Package::FormatError,
Zlib::GzipFile::Error,
Zlib::Error,
Minitar::Error,
IOError => e
Keygen.logger.warn { "[workers.process-npm-package-worker] Error: #{e.class.name} - #{e.message}" }

Expand All @@ -75,7 +85,7 @@ def perform(artifact_id)
private

# Returns a streaming gzip decompressor over the given IO.
#
# @param io [IO] readable stream of gzip-compressed bytes
# @return [Zlib::GzipReader] reader over the decompressed stream; the caller closes it
def gunzip(io)
  Zlib::GzipReader.new(io)
end
def untar(io, &) = Gem::Package::TarReader.new(io, &)
# Delegates to Minitar::Reader.open, forwarding the caller's block.
#
# @param io [IO] readable stream of (already decompressed) tar data
# @return whatever Minitar::Reader.open returns for the given block
def unpack(io, &block)
  Minitar::Reader.open(io, &block)
end

class PackageNotAcceptableError < StandardError
def backtrace = nil # silence backtrace
Expand Down
11 changes: 5 additions & 6 deletions spec/factories/release_manifest.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# frozen_string_literal: true

require 'rubygems/package'
require 'minitar'
require 'zlib'

FactoryBot.define do
factory :release_manifest, aliases: %i[manifest] do
Expand All @@ -27,12 +29,9 @@
content {
tgz = file_fixture('hello-2.0.0.tgz').read
tar = Zlib::GzipReader.new(tgz)
pkg = nil

Gem::Package::TarReader.new(tar) do |archive|
archive.seek('package/package.json') do |entry|
pkg = entry.read
end
pkg = Minitar::Reader.open tar do |archive|
archive.find { _1.name in 'package/package.json' }
.read
end

pkg
Expand Down
12 changes: 6 additions & 6 deletions spec/workers/process_npm_package_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
require 'rails_helper'
require 'spec_helper'

require 'minitar'
require 'zlib'

describe ProcessNpmPackageWorker do
let(:account) { create(:account) }

Expand All @@ -15,12 +18,9 @@
let(:package) { file_fixture('hello-2.0.0.tgz').open }
let(:package_json) {
tar = Zlib::GzipReader.new(package)
pkg = nil

Gem::Package::TarReader.new(tar) do |archive|
archive.seek('package/package.json') do |entry|
pkg = entry.read
end
pkg = Minitar::Reader.open tar do |archive|
archive.find { _1.name in 'package/package.json' }
.read
end

pkg
Expand Down
146 changes: 146 additions & 0 deletions spec/workers/wait_for_artifact_upload_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,152 @@
end
end
end

# An uploaded npm package tarball is handed off to ProcessNpmPackageWorker. A valid
# tarball ends up UPLOADED with a manifest; an invalid, too-small or too-large one
# ends up FAILED without a manifest.
context 'when artifact is an npm package' do
  # shared by every nested context below, so they are not re-declared there
  let(:artifact)  { create(:artifact, :npm_package, :waiting, account:) }
  let(:processor) { ProcessNpmPackageWorker }

  before do
    Aws.config[:s3][:stub_responses][:get_object] = [{ body: file_fixture('hello-2.0.0.tgz').open }]
  end

  # also runs after every nested example, so queued processor jobs never leak between examples
  after do
    processor.clear
  end

  it 'should emit processing and succeeded events' do
    expect(BroadcastEventService).to receive(:call) { expect(_1).to include(event: 'artifact.upload.processing') }.exactly(1).time
    expect(BroadcastEventService).to receive(:call) { expect(_1).to include(event: 'artifact.upload.succeeded') }.exactly(1).time

    subject.perform_async(artifact.id)
    subject.drain

    processor.drain
  end

  it 'should process package' do
    subject.perform_async(artifact.id)

    expect { subject.drain }.to change { processor.jobs.size }.from(0).to(1)
      .and change { artifact.reload.status }.from('WAITING').to('PROCESSING')

    expect { processor.drain }.to change { artifact.reload.status }.from('PROCESSING').to('UPLOADED')

    expect(artifact.reload.manifest).to_not be nil
  end

  context 'when package is invalid' do
    before do
      Aws.config[:s3][:stub_responses][:get_object] = [{ body: file_fixture('invalid-2.0.0.tgz').open }]
    end

    it 'should emit processing and failed events' do
      expect(BroadcastEventService).to receive(:call) { expect(_1).to include(event: 'artifact.upload.processing') }.exactly(1).time
      expect(BroadcastEventService).to receive(:call) { expect(_1).to include(event: 'artifact.upload.failed') }.exactly(1).time

      subject.perform_async(artifact.id)
      subject.drain

      processor.drain
    end

    it 'should fail processing package' do
      subject.perform_async(artifact.id)

      expect { subject.drain }.to change { processor.jobs.size }.from(0).to(1)
        .and change { artifact.reload.status }.from('WAITING').to('PROCESSING')

      expect { processor.drain }.to change { artifact.reload.status }.from('PROCESSING').to('FAILED')

      expect(artifact.reload.manifest).to be nil
    end
  end

  context 'when package is too small' do
    before do
      # the fixture itself is valid; only the reported content length is too small
      Aws.config[:s3] = {
        stub_responses: {
          head_object: [{ content_length: 2.bytes.to_i, content_type: 'application/octet-stream', etag: '"14bfa6bb14875e45bba028a21ed38046"' }],
          get_object: [{ body: file_fixture('hello-2.0.0.tgz').open }],
        },
      }
    end

    it 'should emit processing and failed events' do
      expect(BroadcastEventService).to receive(:call) { expect(_1).to include(event: 'artifact.upload.processing') }.exactly(1).time
      expect(BroadcastEventService).to receive(:call) { expect(_1).to include(event: 'artifact.upload.failed') }.exactly(1).time

      subject.perform_async(artifact.id)
      subject.drain

      processor.drain
    end

    it 'should fail processing package' do
      subject.perform_async(artifact.id)

      expect { subject.drain }.to change { processor.jobs.size }.from(0).to(1)
        .and change { artifact.reload.status }.from('WAITING').to('PROCESSING')

      expect { processor.drain }.to change { artifact.reload.status }.from('PROCESSING').to('FAILED')

      expect(artifact.reload.manifest).to be nil
    end
  end

  context 'when package is too large' do
    before do
      # the fixture itself is valid; only the reported content length is too large
      Aws.config[:s3] = {
        stub_responses: {
          head_object: [{ content_length: 1.gigabyte.to_i, content_type: 'application/octet-stream', etag: '"14bfa6bb14875e45bba028a21ed38046"' }],
          get_object: [{ body: file_fixture('hello-2.0.0.tgz').open }],
        },
      }
    end

    it 'should emit processing and failed events' do
      expect(BroadcastEventService).to receive(:call) { expect(_1).to include(event: 'artifact.upload.processing') }.exactly(1).time
      expect(BroadcastEventService).to receive(:call) { expect(_1).to include(event: 'artifact.upload.failed') }.exactly(1).time

      subject.perform_async(artifact.id)
      subject.drain

      processor.drain
    end

    it 'should fail processing package' do
      subject.perform_async(artifact.id)

      expect { subject.drain }.to change { processor.jobs.size }.from(0).to(1)
        .and change { artifact.reload.status }.from('WAITING').to('PROCESSING')

      expect { processor.drain }.to change { artifact.reload.status }.from('PROCESSING').to('FAILED')

      expect(artifact.reload.manifest).to be nil
    end
  end
end
end

context 'when an artifact is processing' do
Expand Down

0 comments on commit e7ad646

Please sign in to comment.