Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parallel process log capture #1423

Merged
merged 1 commit into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/mutant.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ module Mutant
require 'mutant/bootstrap'
require 'mutant/version'
require 'mutant/env'
require 'mutant/pipe'
require 'mutant/util'
require 'mutant/registry'
require 'mutant/ast'
Expand Down Expand Up @@ -102,6 +101,8 @@ module Mutant
require 'mutant/isolation/fork'
require 'mutant/isolation/none'
require 'mutant/parallel'
require 'mutant/parallel/connection'
require 'mutant/parallel/pipe'
require 'mutant/parallel/driver'
require 'mutant/parallel/source'
require 'mutant/parallel/worker'
Expand Down
3 changes: 2 additions & 1 deletion lib/mutant/mutation/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def self.run_mutation_analysis(env)
private_class_method :run_mutation_analysis

def self.async_driver(env)
Parallel.async(env.world, mutation_test_config(env))
Parallel.async(world: env.world, config: mutation_test_config(env))
end
private_class_method :async_driver

Expand All @@ -47,6 +47,7 @@ def self.mutation_test_config(env)
process_name: 'mutant-worker-process',
sink: Sink.new(env: env),
source: Parallel::Source::Array.new(jobs: env.mutations.each_index.to_a),
timeout: nil,
thread_name: 'mutant-worker-thread'
)
end
Expand Down
10 changes: 7 additions & 3 deletions lib/mutant/mutation/runner/sink.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ module Mutant
class Mutation
module Runner
class Sink
include Parallel::Sink

include Anima.new(:env)

# Initialize object
Expand Down Expand Up @@ -35,11 +37,13 @@ def stop?

# Handle mutation finish
#
# @param [Result::MutationIndex] mutation_index_result
# @param [Parallel::Response] response
#
# @return [self]
def result(mutation_index_result)
mutation_result = mutation_result(mutation_index_result)
def response(response)
fail response.error if response.error

mutation_result = mutation_result(response.result)

subject = mutation_result.mutation.subject

Expand Down
20 changes: 14 additions & 6 deletions lib/mutant/parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Parallel
# @param [Config] config
#
# @return [Driver]
def self.async(world, config)
def self.async(config:, world:)
shared = shared_state(world, config)

world.process_warmup
Expand All @@ -23,19 +23,22 @@ def self.async(world, config)
)
end

# rubocop:disable Metric/MethodLength
def self.workers(world, config, shared)
Array.new(config.jobs) do |index|
Worker.start(
block: config.block,
index: index,
on_process_start: config.on_process_start,
process_name: "#{config.process_name}-#{index}",
timeout: config.timeout,
world: world,
**shared
)
end
end
private_class_method :workers
# rubocop:enable Metric/MethodLength

def self.shared_state(world, config)
{
Expand Down Expand Up @@ -69,16 +72,16 @@ def self.shared(klass, world, **attributes)
end
private_class_method :shared

# Job result sink
class Sink
# Job result sink signature
module Sink
include AbstractType

# Process job result
#
# @param [Object]
# @param [Response]
#
# @return [self]
abstract_method :result
abstract_method :response

# The sink status
#
Expand All @@ -100,10 +103,15 @@ class Config
:process_name,
:sink,
:source,
:thread_name
:thread_name,
:timeout
)
end # Config

class Response
include Anima.new(:error, :log, :result)
end

# Parallel execution status
class Status
include Adamantium, Anima.new(
Expand Down
177 changes: 177 additions & 0 deletions lib/mutant/parallel/connection.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
# frozen_string_literal: true

module Mutant
module Parallel
class Connection
include Anima.new(:marshal, :reader, :writer)

Error = Class.new(RuntimeError)

HEADER_FORMAT = 'N'
HEADER_SIZE = 4
MAX_BYTES = (2**32).pred

class Reader
include Anima.new(:deadline, :io, :marshal, :response_reader, :log_reader)

private(*anima.attribute_names)

private_class_method :new

attr_reader :log

def error
@errors.first
end

def result
@results.first
end

def initialize(*)
super

@buffer = +''
@log = +''

# Array of size max 1 as surrogate for
# terrible default nil ivars.
@errors = []
@lengths = []
@results = []
end

def self.read_response(**attributes)
reader = new(**attributes).read_till_final

Response.new(
log: reader.log,
error: reader.error,
result: reader.result
)
end

# rubocop:disable Metrics/MethodLength
def read_till_final
readers = [response_reader, log_reader]

until result || error
status = deadline.status

break timeout unless status.ok?

reads, _others = io.select(readers, nil, nil, status.time_left)

break timeout unless reads

reads.each do |ready|
if ready.equal?(response_reader)
advance_result
else
advance_log
end
end
end

self
end
# rubocop:enable Metrics/MethodLength

private

def timeout
@errors << Timeout
end

def advance_result
if length
if read_buffer(length)
@results << marshal.load(@buffer)
end
elsif read_buffer(HEADER_SIZE)
@lengths << Util.one(@buffer.unpack(HEADER_FORMAT))
@buffer = +''
end
end

def length
@lengths.first
end

def advance_log
with_nonblock_read(io: log_reader, max_bytes: 4096, &log.public_method(:<<))
end

def read_buffer(max_bytes)
with_nonblock_read(
io: response_reader,
max_bytes: max_bytes - @buffer.bytesize
) do |chunk|
@buffer << chunk
@buffer.bytesize.equal?(max_bytes)
end
end

# rubocop:disable Metrics/MethodLength
def with_nonblock_read(io:, max_bytes:)
io.binmode

chunk = io.read_nonblock(max_bytes, exception: false)

case chunk
when nil
@errors << EOFError
false
when String
yield chunk
else
fail "Unexpected nonblocking read return: #{chunk.inspect}"
end
end
# rubocop:enable Metrics/MethodLength
end

class Frame
include Anima.new(:io)

def receive_value
read(Util.one(read(HEADER_SIZE).unpack(HEADER_FORMAT)))
end

def send_value(body)
bytesize = body.bytesize

fail Error, 'message to big' if bytesize > MAX_BYTES

io.binmode
io.write([bytesize].pack(HEADER_FORMAT))
io.write(body)
end

private

def read(bytes)
io.binmode
io.read(bytes) or fail Error, 'Unexpected EOF'
end
end

def receive_value
marshal.load(reader.receive_value)
end

def send_value(value)
writer.send_value(marshal.dump(value))
self
end

def self.from_pipes(marshal:, reader:, writer:)
new(
marshal: marshal,
reader: Frame.new(io: reader.to_reader),
writer: Frame.new(io: writer.to_writer)
)
end
end # Connection
end # Parallel
end # Mutant
39 changes: 39 additions & 0 deletions lib/mutant/parallel/pipe.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# frozen_string_literal: true

module Mutant
module Parallel
class Pipe
include Adamantium, Anima.new(:reader, :writer)

# Run block with pipe in binmode
#
# @return [undefined]
def self.with(io)
io.pipe(binmode: true) do |(reader, writer)|
yield new(reader: reader, writer: writer)
end
end

def self.from_io(io)
reader, writer = io.pipe(binmode: true)
new(reader: reader, writer: writer)
end

# Writer end of the pipe
#
# @return [IO]
def to_writer
reader.close
writer
end

# Parent reader end of the pipe
#
# @return [IO]
def to_reader
writer.close
reader
end
end # Pipe
end # Parallel
end # Mutant
Loading