From 92ce923ccbb6c48b5ae5e937de18108921205c71 Mon Sep 17 00:00:00 2001 From: Markus Schirp Date: Fri, 1 Mar 2024 14:07:21 +0000 Subject: [PATCH] Add parallel output process capture --- lib/mutant.rb | 3 +- lib/mutant/mutation/runner.rb | 3 +- lib/mutant/mutation/runner/sink.rb | 10 +- lib/mutant/parallel.rb | 20 +- lib/mutant/parallel/connection.rb | 177 +++++++++++ lib/mutant/parallel/pipe.rb | 39 +++ lib/mutant/parallel/worker.rb | 55 +++- lib/mutant/pipe.rb | 96 ------ spec/integration/mutant/parallel_spec.rb | 243 +++++++++++++++ spec/unit/mutant/mutation/runner/sink_spec.rb | 58 +++- spec/unit/mutant/mutation/runner_spec.rb | 7 +- .../mutant/parallel/connection/reader_spec.rb | 255 ++++++++++++++++ .../{pipe => parallel}/connection_spec.rb | 25 +- spec/unit/mutant/{ => parallel}/pipe_spec.rb | 2 +- spec/unit/mutant/parallel/worker_spec.rb | 284 ++++++++++++++---- spec/unit/mutant/parallel_spec.rb | 6 +- 16 files changed, 1062 insertions(+), 221 deletions(-) create mode 100644 lib/mutant/parallel/connection.rb create mode 100644 lib/mutant/parallel/pipe.rb delete mode 100644 lib/mutant/pipe.rb create mode 100644 spec/integration/mutant/parallel_spec.rb create mode 100644 spec/unit/mutant/parallel/connection/reader_spec.rb rename spec/unit/mutant/{pipe => parallel}/connection_spec.rb (92%) rename spec/unit/mutant/{ => parallel}/pipe_spec.rb (97%) diff --git a/lib/mutant.rb b/lib/mutant.rb index 138252509..4c6523f33 100644 --- a/lib/mutant.rb +++ b/lib/mutant.rb @@ -74,7 +74,6 @@ module Mutant require 'mutant/bootstrap' require 'mutant/version' require 'mutant/env' - require 'mutant/pipe' require 'mutant/util' require 'mutant/registry' require 'mutant/ast' @@ -102,6 +101,8 @@ module Mutant require 'mutant/isolation/fork' require 'mutant/isolation/none' require 'mutant/parallel' + require 'mutant/parallel/connection' + require 'mutant/parallel/pipe' require 'mutant/parallel/driver' require 'mutant/parallel/source' require 'mutant/parallel/worker' diff --git a/lib/mutant/mutation/runner.rb b/lib/mutant/mutation/runner.rb index 701274f22..fae126f73 100644 --- a/lib/mutant/mutation/runner.rb +++ b/lib/mutant/mutation/runner.rb @@ -22,7 +22,7 @@ def self.run_mutation_analysis(env) private_class_method :run_mutation_analysis def self.async_driver(env) - Parallel.async(env.world, mutation_test_config(env)) + Parallel.async(world: env.world, config: mutation_test_config(env)) end private_class_method :async_driver @@ -47,6 +47,7 @@ def self.mutation_test_config(env) process_name: 'mutant-worker-process', sink: Sink.new(env: env), source: Parallel::Source::Array.new(jobs: env.mutations.each_index.to_a), + timeout: nil, thread_name: 'mutant-worker-thread' ) end diff --git a/lib/mutant/mutation/runner/sink.rb b/lib/mutant/mutation/runner/sink.rb index 4d5d2557a..1201d6d5b 100644 --- a/lib/mutant/mutation/runner/sink.rb +++ b/lib/mutant/mutation/runner/sink.rb @@ -4,6 +4,8 @@ module Mutant class Mutation module Runner class Sink + include Parallel::Sink + include Anima.new(:env) # Initialize object @@ -35,11 +37,13 @@ def stop? 
         # Handle mutation finish
         #
-        # @param [Result::MutationIndex] mutation_index_result
+        # @param [Parallel::Response] response
         #
         # @return [self]
-        def result(mutation_index_result)
-          mutation_result = mutation_result(mutation_index_result)
+        def response(response)
+          fail response.error if response.error
+
+          mutation_result = mutation_result(response.result)
 
           subject = mutation_result.mutation.subject
 
diff --git a/lib/mutant/parallel.rb b/lib/mutant/parallel.rb
index 60761bf38..2a524643c 100644
--- a/lib/mutant/parallel.rb
+++ b/lib/mutant/parallel.rb
@@ -9,7 +9,7 @@ module Parallel
     # @param [Config] config
     #
     # @return [Driver]
-    def self.async(world, config)
+    def self.async(config:, world:)
       shared = shared_state(world, config)
 
       world.process_warmup
@@ -23,6 +23,7 @@ def self.async(world, config)
       )
     end
 
+    # rubocop:disable Metrics/MethodLength
     def self.workers(world, config, shared)
       Array.new(config.jobs) do |index|
         Worker.start(
@@ -30,12 +31,14 @@ def self.workers(world, config, shared)
           index: index,
           on_process_start: config.on_process_start,
           process_name: "#{config.process_name}-#{index}",
+          timeout: config.timeout,
           world: world,
           **shared
         )
       end
     end
     private_class_method :workers
+    # rubocop:enable Metrics/MethodLength
 
     def self.shared_state(world, config)
       {
@@ -69,16 +72,16 @@ def self.shared(klass, world, **attributes)
     end
     private_class_method :shared
 
-    # Job result sink
-    class Sink
+    # Job result sink signature
+    module Sink
       include AbstractType
 
       # Process job result
       #
-      # @param [Object]
+      # @param [Response]
       #
       # @return [self]
-      abstract_method :result
+      abstract_method :response
 
       # The sink status
       #
@@ -100,10 +103,15 @@ class Config
         :process_name,
         :sink,
         :source,
-        :thread_name
+        :thread_name,
+        :timeout
       )
     end # Config
 
+    class Response
+      include Anima.new(:error, :log, :result)
+    end
+
     # Parallel execution status
     class Status
       include Adamantium, Anima.new(
diff --git a/lib/mutant/parallel/connection.rb b/lib/mutant/parallel/connection.rb
new file mode 100644
index 000000000..cf1b62584
--- /dev/null
+++ b/lib/mutant/parallel/connection.rb
@@ -0,0 +1,177 @@
+# frozen_string_literal: true
+
+module Mutant
+  module Parallel
+    class Connection
+      include Anima.new(:marshal, :reader, :writer)
+
+      Error = Class.new(RuntimeError)
+
+      HEADER_FORMAT = 'N'
+      HEADER_SIZE = 4
+      MAX_BYTES = (2**32).pred
+
+      class Reader
+        include Anima.new(:deadline, :io, :marshal, :response_reader, :log_reader)
+
+        private(*anima.attribute_names)
+
+        private_class_method :new
+
+        attr_reader :log
+
+        def error
+          @errors.first
+        end
+
+        def result
+          @results.first
+        end
+
+        def initialize(*)
+          super
+
+          @buffer = +''
+          @log = +''
+
+          # Array of size max 1 as surrogate for
+          # terrible default nil ivars.
+          @errors = []
+          @lengths = []
+          @results = []
+        end
+
+        def self.read_response(**attributes)
+          reader = new(**attributes).read_till_final
+
+          Response.new(
+            log: reader.log,
+            error: reader.error,
+            result: reader.result
+          )
+        end
+
+        # rubocop:disable Metrics/MethodLength
+        def read_till_final
+          readers = [response_reader, log_reader]
+
+          until result || error
+            status = deadline.status
+
+            break timeout unless status.ok?
+ + reads, _others = io.select(readers, nil, nil, status.time_left) + + break timeout unless reads + + reads.each do |ready| + if ready.equal?(response_reader) + advance_result + else + advance_log + end + end + end + + self + end + # rubocop:enable Metrics/MethodLength + + private + + def timeout + @errors << Timeout + end + + def advance_result + if length + if read_buffer(length) + @results << marshal.load(@buffer) + end + elsif read_buffer(HEADER_SIZE) + @lengths << Util.one(@buffer.unpack(HEADER_FORMAT)) + @buffer = +'' + end + end + + def length + @lengths.first + end + + def advance_log + with_nonblock_read(io: log_reader, max_bytes: 4096, &log.public_method(:<<)) + end + + def read_buffer(max_bytes) + with_nonblock_read( + io: response_reader, + max_bytes: max_bytes - @buffer.bytesize + ) do |chunk| + @buffer << chunk + @buffer.bytesize.equal?(max_bytes) + end + end + + # rubocop:disable Metrics/MethodLength + def with_nonblock_read(io:, max_bytes:) + io.binmode + + chunk = io.read_nonblock(max_bytes, exception: false) + + case chunk + when nil + @errors << EOFError + false + when String + yield chunk + else + fail "Unexpected nonblocking read return: #{chunk.inspect}" + end + end + # rubocop:enable Metrics/MethodLength + end + + class Frame + include Anima.new(:io) + + def receive_value + read(Util.one(read(HEADER_SIZE).unpack(HEADER_FORMAT))) + end + + def send_value(body) + bytesize = body.bytesize + + fail Error, 'message to big' if bytesize > MAX_BYTES + + io.binmode + io.write([bytesize].pack(HEADER_FORMAT)) + io.write(body) + end + + private + + def read(bytes) + io.binmode + io.read(bytes) or fail Error, 'Unexpected EOF' + end + end + + def receive_value + marshal.load(reader.receive_value) + end + + def send_value(value) + writer.send_value(marshal.dump(value)) + self + end + + def self.from_pipes(marshal:, reader:, writer:) + new( + marshal: marshal, + reader: Frame.new(io: reader.to_reader), + writer: Frame.new(io: writer.to_writer) + ) + end + end # Connection + end # Parallel +end # Mutant diff --git a/lib/mutant/parallel/pipe.rb b/lib/mutant/parallel/pipe.rb new file mode 100644 index 000000000..d4010a158 --- /dev/null +++ b/lib/mutant/parallel/pipe.rb @@ -0,0 +1,39 @@ +# frozen_string_literal: true + +module Mutant + module Parallel + class Pipe + include Adamantium, Anima.new(:reader, :writer) + + # Run block with pipe in binmode + # + # @return [undefined] + def self.with(io) + io.pipe(binmode: true) do |(reader, writer)| + yield new(reader: reader, writer: writer) + end + end + + def self.from_io(io) + reader, writer = io.pipe(binmode: true) + new(reader: reader, writer: writer) + end + + # Writer end of the pipe + # + # @return [IO] + def to_writer + reader.close + writer + end + + # Parent reader end of the pipe + # + # @return [IO] + def to_reader + writer.close + reader + end + end # Pipe + end # Parallel +end # Mutant diff --git a/lib/mutant/parallel/worker.rb b/lib/mutant/parallel/worker.rb index 691245e5d..60ba2e2e7 100644 --- a/lib/mutant/parallel/worker.rb +++ b/lib/mutant/parallel/worker.rb @@ -9,6 +9,7 @@ class Config :index, :on_process_start, :process_name, + :timeout, :var_active_jobs, :var_final, :var_running, @@ -18,38 +19,49 @@ class Config ) end - include Adamantium, Anima.new(:connection, :config, :pid) + include Adamantium, Anima.new(:config, :connection, :log_reader, :pid, :response_reader) def self.start(**attributes) start_config(Config.new(**attributes)) end # rubocop:disable Metrics/MethodLength + # rubocop:disable Metrics/AbcSize def 
self.start_config(config) world = config.world io = world.io marshal = world.marshal - request = Pipe.from_io(io) - response = Pipe.from_io(io) + log, request, response = Pipe.from_io(io), Pipe.from_io(io), Pipe.from_io(io) pid = world.process.fork do + log_writer = log.to_writer + + world.stderr.reopen(log_writer) + world.stdout.reopen(log_writer) + run_child( config: config, - connection: Pipe::Connection.from_pipes(marshal: marshal, reader: request, writer: response) + connection: Connection.from_pipes(marshal: marshal, reader: request, writer: response), + log_writer: log_writer ) end + connection = Connection.from_pipes(marshal: marshal, reader: response, writer: request) + new( - pid: pid, - config: config, - connection: Pipe::Connection.from_pipes(marshal: marshal, reader: response, writer: request) + config: config, + connection: connection, + log_reader: log.to_reader, + response_reader: connection.reader.io, + pid: pid ) end private_class_method :start_config + # rubocop:enable Metrics/AbcSize # rubocop:enable Metrics/MethodLength - def self.run_child(config:, connection:) + def self.run_child(config:, connection:, log_writer:) world = config.world world.thread.current.name = config.process_name @@ -58,7 +70,9 @@ def self.run_child(config:, connection:) config.on_process_start.call(index: config.index) loop do - connection.send_value(config.block.call(connection.receive_value)) + value = config.block.call(connection.receive_value) + log_writer.flush + connection.send_value(value) end end private_class_method :run_child @@ -67,26 +81,39 @@ def index config.index end - # Run worker payload + # Run worker loop # # @return [self] + # + # rubocop:disable Metrics/MethodLength + # rubocop:disable Metrics/AbcSize def call loop do job = next_job or break job_start(job) - result = connection.call(job.payload) + connection.send_value(job.payload) + + response = Connection::Reader.read_response( + deadline: config.world.deadline(config.timeout), + io: config.world.io, + log_reader: log_reader, + marshal: config.world.marshal, + response_reader: response_reader + ) job_done(job) - break if add_result(result) + break if add_response(response) || response.error end finalize self end + # rubocop:enable Metrics/AbcSize + # rubocop:enable Metrics/MethodLength def signal process.kill('TERM', pid) @@ -110,9 +137,9 @@ def next_job end end - def add_result(result) + def add_response(response) config.var_sink.with do |sink| - sink.result(result) + sink.response(response) sink.stop? 
end end diff --git a/lib/mutant/pipe.rb b/lib/mutant/pipe.rb deleted file mode 100644 index 89823643d..000000000 --- a/lib/mutant/pipe.rb +++ /dev/null @@ -1,96 +0,0 @@ -# frozen_string_literal: true - -module Mutant - # Pipe abstraction - class Pipe - include Adamantium, Anima.new(:reader, :writer) - - # Run block with pipe in binmode - # - # @return [undefined] - def self.with(io) - io.pipe(binmode: true) do |(reader, writer)| - yield new(reader: reader, writer: writer) - end - end - - def self.from_io(io) - reader, writer = io.pipe(binmode: true) - new(reader: reader, writer: writer) - end - - # Writer end of the pipe - # - # @return [IO] - def to_writer - reader.close - writer - end - - # Parent reader end of the pipe - # - # @return [IO] - def to_reader - writer.close - reader - end - - class Connection - include Anima.new(:marshal, :reader, :writer) - - Error = Class.new(RuntimeError) - - class Frame - include Anima.new(:io) - - HEADER_FORMAT = 'N' - MAX_BYTES = (2**32).pred - HEADER_SIZE = 4 - - def receive_value - header = read(HEADER_SIZE) - read(Util.one(header.unpack(HEADER_FORMAT))) - end - - def send_value(body) - bytesize = body.bytesize - - fail Error, 'message to big' if bytesize > MAX_BYTES - - io.binmode - io.write([bytesize].pack(HEADER_FORMAT)) - io.write(body) - end - - private - - def read(bytes) - io.binmode - io.read(bytes) or fail Error, 'Unexpected EOF' - end - end - - def call(payload) - send_value(payload) - receive_value - end - - def receive_value - marshal.load(reader.receive_value) - end - - def send_value(value) - writer.send_value(marshal.dump(value)) - self - end - - def self.from_pipes(marshal:, reader:, writer:) - new( - marshal: marshal, - reader: Frame.new(io: reader.to_reader), - writer: Frame.new(io: writer.to_writer) - ) - end - end - end # Pipe -end # Mutant diff --git a/spec/integration/mutant/parallel_spec.rb b/spec/integration/mutant/parallel_spec.rb new file mode 100644 index 000000000..5ea540936 --- /dev/null +++ b/spec/integration/mutant/parallel_spec.rb @@ -0,0 +1,243 @@ +# frozen_string_literal: true + +RSpec.describe 'parallel', mutant: false do + let(:sink_class) do + Class.new do + include Mutant::Parallel::Sink + + def initialize + @responses = [] + end + + def response(response) + @responses << response + end + + def stop? + false + end + + def status + @responses + end + end + end + + specify 'trivial' do + sink = sink_class.new + + config = Mutant::Parallel::Config.new( + block: ->(value) { puts("Payload: #{value}"); value * 2 }, + jobs: 1, + on_process_start: ->(index:) { puts("Booting: #{index}") }, + process_name: 'test-parallel-process', + sink: sink, + source: Mutant::Parallel::Source::Array.new(jobs: [1, 2, 3]), + thread_name: 'test-parallel-thread', + timeout: 1.0 + ) + + driver = Mutant::Parallel.async( + config: config, + world: Mutant::WORLD + ) + + loop do + status = driver.wait_timeout(0.1) + break if status.done? 
+    end
+
+    expect(sink.status).to eql(
+      [
+        Mutant::Parallel::Response.new(
+          error: nil,
+          log: "Booting: 0\nPayload: 1\n",
+          result: 2
+        ),
+        Mutant::Parallel::Response.new(
+          error: nil,
+          log: "Payload: 2\n",
+          result: 4
+        ),
+        Mutant::Parallel::Response.new(
+          error: nil,
+          log: "Payload: 3\n",
+          result: 6
+        )
+      ]
+    )
+  end
+
+  specify 'crashing' do
+    sink = sink_class.new
+
+    config = Mutant::Parallel::Config.new(
+      block: ->(value) { fail if value.equal?(2) },
+      jobs: 1,
+      on_process_start: ->(index:) {},
+      process_name: 'test-parallel-process',
+      sink: sink,
+      source: Mutant::Parallel::Source::Array.new(jobs: [1, 2, 3]),
+      thread_name: 'test-parallel-thread',
+      timeout: 1.0
+    )
+
+    driver = Mutant::Parallel.async(
+      config: config,
+      world: Mutant::WORLD
+    )
+
+    loop do
+      status = driver.wait_timeout(0.1)
+      break if status.done?
+    end
+
+    responses = sink.status
+
+    expect(responses.length).to be(2)
+
+    response_a, response_b = responses
+
+    expect(response_a).to eql(Mutant::Parallel::Response.new(log: '', result: nil, error: nil))
+    expect(response_b.error).to be(EOFError)
+    expect(response_b.result).to be(nil)
+    expect(response_b.log.match?('
')).to be(true)
+  end
+
+  specify 'massive' do
+    b = '#' * (1024**2) * 10
+
+    sink = sink_class.new
+
+    config = Mutant::Parallel::Config.new(
+      block: ->(value) { (b * value).tap(&method(:puts)) },
+      jobs: 1,
+      on_process_start: ->(_) { puts b },
+      process_name: 'test-parallel-process',
+      sink: sink,
+      source: Mutant::Parallel::Source::Array.new(jobs: [1, 2]),
+      thread_name: 'test-parallel-thread',
+      timeout: 1.0
+    )
+
+    driver = Mutant::Parallel.async(
+      config: config,
+      world: Mutant::WORLD
+    )
+
+    loop do
+      status = driver.wait_timeout(0.1)
+      break if status.done?
+    end
+
+    expect(sink.status).to eql(
+      [
+        Mutant::Parallel::Response.new(
+          error: nil,
+          log: "#{b}\n#{b}\n",
+          result: b
+        ),
+        Mutant::Parallel::Response.new(
+          error: nil,
+          log: "#{b}#{b}\n",
+          result: b * 2
+        )
+      ]
+    )
+  end
+
+  specify 'chatty' do
+    sink = sink_class.new
+
+    config = Mutant::Parallel::Config.new(
+      block: ->(value) { value },
+      jobs: 1,
+      on_process_start: ->(_) { Thread.start { i = 0; loop { puts(""); } } },
+      process_name: 'test-parallel-process',
+      sink: sink,
+      source: Mutant::Parallel::Source::Array.new(jobs: [1, 2, 3]),
+      thread_name: 'test-parallel-thread',
+      timeout: 1.0
+    )
+
+    driver = Mutant::Parallel.async(
+      config: config,
+      world: Mutant::WORLD
+    )
+
+    loop do
+      status = driver.wait_timeout(0.1)
+      break if status.done?
+    end
+
+    responses = sink.status
+
+    expect(responses.length).to be(3)
+
+    responses.each do |response|
+      expect(response.log.match?(//)).to be(true)
+    end
+  end
+
+  specify 'many' do
+    sink = sink_class.new
+
+    config = Mutant::Parallel::Config.new(
+      block: ->(value) { value },
+      jobs: Etc.nprocessors,
+      on_process_start: ->(index:) {},
+      process_name: 'test-parallel-process',
+      sink: sink,
+      source: Mutant::Parallel::Source::Array.new(jobs: Array.new(1000) { |value| value }),
+      thread_name: 'test-parallel-thread',
+      timeout: 1.0
+    )
+
+    driver = Mutant::Parallel.async(
+      config: config,
+      world: Mutant::WORLD
+    )
+
+    loop do
+      status = driver.wait_timeout(0.1)
+      break if status.done?
+    end
+
+    expect(sink.status.length).to be(1000)
+  end
+
+  specify 'stuck' do
+    sink = sink_class.new
+
+    config = Mutant::Parallel::Config.new(
+      block: ->(_value) { sleep },
+      jobs: 1,
+      on_process_start: ->(index:) {},
+      process_name: 'test-parallel-process',
+      sink: sink,
+      source: Mutant::Parallel::Source::Array.new(jobs: [1]),
+      thread_name: 'test-parallel-thread',
+      timeout: 1.0
+    )
+
+    driver = Mutant::Parallel.async(
+      config: config,
+      world: Mutant::WORLD
+    )
+
+    loop do
+      status = driver.wait_timeout(0.1)
+      break if status.done?
+    end
+
+    expect(sink.status).to eql(
+      [
+        Mutant::Parallel::Response.new(
+          log: '',
+          result: nil,
+          error: Timeout
+        )
+      ]
+    )
+  end
+end
diff --git a/spec/unit/mutant/mutation/runner/sink_spec.rb b/spec/unit/mutant/mutation/runner/sink_spec.rb
index ca868b125..62587a779 100644
--- a/spec/unit/mutant/mutation/runner/sink_spec.rb
+++ b/spec/unit/mutant/mutation/runner/sink_spec.rb
@@ -3,37 +3,63 @@
 describe Mutant::Mutation::Runner::Sink do
   setup_shared_context
 
+  let(:mutation_a_index_response) do
+    Mutant::Parallel::Response.new(
+      result: mutation_a_index_result,
+      log: '',
+      error: nil
+    )
+  end
+
+  let(:mutation_b_index_response) do
+    Mutant::Parallel::Response.new(
+      result: mutation_b_index_result,
+      log: '',
+      error: nil
+    )
+  end
+
   shared_context 'one result' do
     before do
-      object.result(mutation_a_index_result)
+      object.response(mutation_a_index_response)
     end
   end
 
   shared_context 'two results' do
     before do
-      object.result(mutation_a_index_result)
-      object.result(mutation_b_index_result)
+      object.response(mutation_a_index_response)
+      object.response(mutation_b_index_response)
     end
   end
 
   let(:object) { described_class.new(env: env) }
 
-  describe '#result' do
-    subject { object.result(mutation_a_index_result) }
-
-    it 'aggregates results in #status' do
-      subject
-      object.result(mutation_b_index_result)
-      expect(object.status).to eql(
-        Mutant::Result::Env.new(
-          env: env,
-          runtime: 0.0,
-          subject_results: [subject_a_result]
+  describe '#response' do
+    subject { object.response(mutation_a_index_response) }
+
+    context 'on success' do
+      it 'aggregates results in #status' do
+        subject
+        object.response(mutation_b_index_response)
+        expect(object.status).to eql(
+          Mutant::Result::Env.new(
+            env: env,
+            runtime: 0.0,
+            subject_results: [subject_a_result]
+          )
         )
-      )
+      end
+
+      it_should_behave_like 'a command method'
     end
 
-    it_should_behave_like 'a command method'
+    context 'on error' do
+      let(:mutation_a_index_response) { super().with(error: EOFError) }
+
+      it 're-raises the error' do
+        expect { subject }.to raise_error(EOFError)
+      end
+    end
   end
 
   describe '#status' do
diff --git a/spec/unit/mutant/mutation/runner_spec.rb b/spec/unit/mutant/mutation/runner_spec.rb
index 40345a74c..610aec366 100644
--- a/spec/unit/mutant/mutation/runner_spec.rb
+++ b/spec/unit/mutant/mutation/runner_spec.rb
@@ -51,7 +51,8 @@
         process_name: 'mutant-worker-process',
         sink: described_class::Sink.new(env: env),
         source: Mutant::Parallel::Source::Array.new(jobs: env.mutations.each_index.to_a),
-        thread_name: 'mutant-worker-thread'
+        thread_name: 'mutant-worker-thread',
+        timeout: nil
       )
     end
 
@@ -93,7 +94,7 @@ def apply
        {
          receiver: Mutant::Parallel,
          selector: :async,
-         arguments: [world, parallel_config],
+         arguments: [{ world: world, config: parallel_config }],
          reaction: { return: driver }
        },
        {
@@ -170,7 +171,7 @@ def apply
        {
          receiver: Mutant::Parallel,
          selector: :async,
-         arguments: [world, parallel_config],
+         arguments: [{ world: world, config: parallel_config }],
          reaction: { return: driver }
        },
        {
diff --git a/spec/unit/mutant/parallel/connection/reader_spec.rb b/spec/unit/mutant/parallel/connection/reader_spec.rb
new file mode 100644
index 000000000..89844ccd5
--- /dev/null
+++ b/spec/unit/mutant/parallel/connection/reader_spec.rb
@@ -0,0 +1,255 @@
+# frozen_string_literal: true
+
+RSpec.describe Mutant::Parallel::Connection::Reader do
+  let(:deadline) { instance_double(Mutant::Timer::Deadline) }
+  let(:header_segment) { [result_segment.bytesize].pack('N') }
+  let(:io) { class_double(IO) }
+  let(:marshal) {
class_double(Marshal) } + let(:log_reader) { instance_double(IO) } + let(:response_reader) { instance_double(IO) } + let(:result) { double('reader-result') } + let(:result_segment) { '' } + + describe '.read_response' do + def apply + described_class.read_response( + deadline: deadline, + io: io, + log_reader: log_reader, + marshal: marshal, + response_reader: response_reader + ) + end + + def binmode(io) + { + receiver: io, + selector: :binmode + } + end + + def read(io:, bytes:, chunk:) + { + receiver: io, + selector: :read_nonblock, + arguments: [bytes, { exception: false }], + reaction: { return: chunk } + } + end + + def select(ready) + { + receiver: io, + selector: :select, + arguments: [[response_reader, log_reader], nil, nil, 1.0], + reaction: { return: [ready] } + } + end + + def deadline_status(time_left: 1.0) + { + receiver: deadline, + selector: :status, + reaction: { return: Mutant::Timer::Deadline::Status.new(time_left: time_left) } + } + end + + def marshal_load + { + receiver: marshal, + selector: :load, + arguments: [result_segment], + reaction: { return: result } + } + end + + context 'on result' do + context 'with full reads' do + let(:raw_expectations) do + [ + deadline_status, + select([response_reader]), + binmode(response_reader), + read(io: response_reader, bytes: 4, chunk: header_segment), + deadline_status, + select([response_reader]), + binmode(response_reader), + read( + bytes: result_segment.bytesize, + chunk: result_segment, + io: response_reader + ), + marshal_load + ] + end + + it 'returns parallel result' do + verify_events do + expect(apply).to eql( + Mutant::Parallel::Response.new( + error: nil, + log: '', + result: result + ) + ) + end + end + end + + context 'with partial reads' do + let(:raw_expectations) do + [ + deadline_status, + select([response_reader]), + binmode(response_reader), + read(io: response_reader, bytes: 4, chunk: header_segment[0..1]), + deadline_status, + select([response_reader]), + binmode(response_reader), + read(io: response_reader, bytes: 2, chunk: header_segment[2..]), + deadline_status, + select([response_reader]), + binmode(response_reader), + read( + bytes: result_segment.bytesize, + chunk: result_segment, + io: response_reader + ), + marshal_load + ] + end + + it 'returns parallel result' do + verify_events do + expect(apply).to eql( + Mutant::Parallel::Response.new( + error: nil, + log: '', + result: result + ) + ) + end + end + end + end + + context 'on IO timeout with log' do + let(:raw_expectations) do + [ + deadline_status, + select([log_reader]), + binmode(log_reader), + read( + io: log_reader, + bytes: 4096, + chunk: '' + ), + deadline_status, + select(nil) + ] + end + + it 'returns parallel result' do + verify_events do + expect(apply).to eql( + Mutant::Parallel::Response.new( + error: Timeout, + log: '', + result: nil + ) + ) + end + end + end + + context 'on worker crash (eof)' do + let(:raw_expectations) do + [ + deadline_status, + select([response_reader]), + binmode(response_reader), + read( + bytes: 4, + io: response_reader, + chunk: nil + ) + ] + end + + it 'returns parallel result' do + verify_events do + expect(apply).to eql( + Mutant::Parallel::Response.new( + error: EOFError, + log: '', + result: nil + ) + ) + end + end + end + + context 'on IO timeout' do + let(:raw_expectations) do + [ + deadline_status, + select(nil) + ] + end + + it 'returns parallel result' do + verify_events do + expect(apply).to eql( + Mutant::Parallel::Response.new( + error: Timeout, + log: '', + result: nil + ) + ) + end + end + 
end + + context 'on CPU timeout' do + let(:raw_expectations) do + [ + deadline_status(time_left: 0) + ] + end + + it 'returns parallel result' do + verify_events do + expect(apply).to eql( + Mutant::Parallel::Response.new( + error: Timeout, + log: '', + result: nil + ) + ) + end + end + end + + context 'with future partial IO API changes' do + let(:raw_expectations) do + [ + deadline_status, + select([response_reader]), + binmode(response_reader), + { + receiver: response_reader, + selector: :read_nonblock, + arguments: [4, { exception: false }], + reaction: { return: :wait_readable } + } + ] + end + + it 'returns parallel result' do + verify_events do + expect { apply }.to raise_error(RuntimeError, 'Unexpected nonblocking read return: :wait_readable') + end + end + end + end +end diff --git a/spec/unit/mutant/pipe/connection_spec.rb b/spec/unit/mutant/parallel/connection_spec.rb similarity index 92% rename from spec/unit/mutant/pipe/connection_spec.rb rename to spec/unit/mutant/parallel/connection_spec.rb index d57e33fdb..2993331b2 100644 --- a/spec/unit/mutant/pipe/connection_spec.rb +++ b/spec/unit/mutant/parallel/connection_spec.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -RSpec.describe Mutant::Pipe::Connection do +RSpec.describe Mutant::Parallel::Connection do let(:marshal) { class_double(Marshal) } let(:request) { 1 } let(:request_bytes) { 'request-bytes' } @@ -9,7 +9,7 @@ let(:pipe_a) do instance_double( - Mutant::Pipe, + Mutant::Parallel::Pipe, to_reader: instance_double(IO, :reader_a), to_writer: instance_double(IO, :writer_a) ) @@ -17,7 +17,7 @@ let(:pipe_b) do instance_double( - Mutant::Pipe, + Mutant::Parallel::Pipe, to_reader: instance_double(IO, :reader_b), to_writer: instance_double(IO, :writer_b) ) @@ -191,25 +191,6 @@ def apply end end - describe '#call' do - def apply - object.call(request) - end - - let(:raw_expectations) do - [ - *send_request, - *receive_response - ] - end - - it 'performs expected events' do - verify_events do - expect(apply).to eql(response) - end - end - end - describe '.from_pipes' do it 'returns expected connection' do expect(object).to eql( diff --git a/spec/unit/mutant/pipe_spec.rb b/spec/unit/mutant/parallel/pipe_spec.rb similarity index 97% rename from spec/unit/mutant/pipe_spec.rb rename to spec/unit/mutant/parallel/pipe_spec.rb index 13bb8c1fb..4a31fa9c4 100644 --- a/spec/unit/mutant/pipe_spec.rb +++ b/spec/unit/mutant/parallel/pipe_spec.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -RSpec.describe Mutant::Pipe do +RSpec.describe Mutant::Parallel::Pipe do let(:io) { class_double(IO) } let(:reader) { instance_double(IO, :a) } let(:writer) { instance_double(IO, :a) } diff --git a/spec/unit/mutant/parallel/worker_spec.rb b/spec/unit/mutant/parallel/worker_spec.rb index 97ae1193d..e7177c7e8 100644 --- a/spec/unit/mutant/parallel/worker_spec.rb +++ b/spec/unit/mutant/parallel/worker_spec.rb @@ -1,24 +1,62 @@ # frozen_string_literal: true RSpec.describe Mutant::Parallel::Worker do - let(:active_jobs) { instance_double(Set) } - let(:block) { ->(value) { value * 2 } } - let(:connection) { instance_double(Mutant::Pipe::Connection) } - let(:index) { 0 } - let(:on_process_start) { instance_double(Proc) } - let(:payload_a) { instance_double(Object) } - let(:pid) { instance_double(Integer) } - let(:process_name) { 'worker-process' } - let(:result_a) { instance_double(Object) } - let(:running) { 1 } - let(:sink) { instance_double(Mutant::Parallel::Sink) } - let(:source) { instance_double(Mutant::Parallel::Source) } - let(:var_active_jobs) { 
instance_double(Mutant::Variable::IVar, :active_jobs) } - let(:var_final) { instance_double(Mutant::Variable::IVar, :final) } - let(:var_running) { instance_double(Mutant::Variable::MVar, :running) } - let(:var_sink) { instance_double(Mutant::Variable::IVar, :sink) } - let(:var_source) { instance_double(Mutant::Variable::IVar, :source) } - let(:world) { fake_world } + def io(name) + instance_double(IO, name) + end + + def pipe(name) + reader, writer = io("#{name}_reader"), io("#{name}_writer") + + instance_double( + Mutant::Parallel::Pipe, + name, + to_reader: reader, + to_writer: writer + ) + end + + let(:active_jobs) { instance_double(Set) } + let(:block) { ->(value) { value * 2 } } + let(:deadline) { instance_double(Mutant::Timer::Deadline) } + let(:index) { 0 } + let(:log_pipe) { pipe(:log) } + let(:on_process_start) { instance_double(Proc) } + let(:payload_a) { instance_double(Object) } + let(:payload_b) { instance_double(Object) } + let(:pid) { instance_double(Integer) } + let(:process_name) { 'worker-process' } + let(:request_pipe) { pipe(:request) } + let(:response_pipe) { pipe(:response) } + let(:result_a) { instance_double(Object) } + let(:result_b) { instance_double(Object) } + let(:running) { 1 } + let(:sink) { instance_double(Mutant::Parallel::Sink) } + let(:source) { instance_double(Mutant::Parallel::Source) } + let(:var_active_jobs) { instance_double(Mutant::Variable::IVar, :active_jobs) } + let(:var_final) { instance_double(Mutant::Variable::IVar, :final) } + let(:var_running) { instance_double(Mutant::Variable::MVar, :running) } + let(:var_sink) { instance_double(Mutant::Variable::IVar, :sink) } + let(:var_source) { instance_double(Mutant::Variable::IVar, :source) } + + let(:parent_connection) do + double( + Mutant::Parallel::Connection, + reader: instance_double(Mutant::Parallel::Connection::Frame, io: response_pipe.to_reader) + ) + end + + let(:world) do + instance_double( + Mutant::World, + io: class_double(IO), + marshal: class_double(Marshal), + process: class_double(Process), + stderr: instance_double(IO), + stdout: instance_double(IO), + thread: class_double(Thread) + ) + end let(:shared) do { @@ -32,13 +70,16 @@ subject do described_class.new( - connection: connection, - pid: pid, - config: described_class::Config.new( + connection: parent_connection, + log_reader: log_pipe.to_reader, + response_reader: response_pipe.to_reader, + pid: pid, + config: described_class::Config.new( block: block, index: index, - process_name: process_name, on_process_start: on_process_start, + process_name: process_name, + timeout: 1.0, world: world, **shared ) @@ -59,15 +100,22 @@ ) end + let(:job_b) do + instance_double( + Mutant::Parallel::Source::Job, + payload: payload_b + ) + end + def apply subject.call end - def sink_result(result) + def sink_response(response) { receiver: sink, - selector: :result, - arguments: [result] + selector: :response, + arguments: [response] } end @@ -103,12 +151,19 @@ def with(var, value) } end - def process(payload, result) + def send_value(payload) + { + receiver: parent_connection, + selector: :send_value, + arguments: [payload] + } + end + + def receive_value(result) { - receiver: connection, - selector: :call, - arguments: [payload], - reaction: { return: result } + receiver: connection, + selector: :receive_value, + reaction: { return: result } } end @@ -153,7 +208,51 @@ def finalize ] end - context 'when processing jobs till sink stops accepting' do + # rubocop:disable Metrics/MethodLength + def read_response(response) + { + receiver: 
Mutant::Parallel::Connection::Reader, + arguments: [ + { + deadline: deadline, + io: world.io, + marshal: world.marshal, + log_reader: log_pipe.to_reader, + response_reader: response_pipe.to_reader + } + ], + selector: :read_response, + reaction: { return: response } + } + end + # rubocop:enable Metrics/MethodLength + + def new_deadline + { + receiver: world, + selector: :deadline, + arguments: [1.0], + reaction: { return: deadline } + } + end + + let(:response_a) do + Mutant::Parallel::Response.new( + error: nil, + log: 'log-a', + result: result_a + ) + end + + let(:response_b) do + Mutant::Parallel::Response.new( + error: nil, + log: 'log-b', + result: result_b + ) + end + + context 'when processing single job till sink stops accepting' do let(:raw_expectations) do [ with(var_source, source), @@ -161,11 +260,13 @@ def finalize source_next(job_a), with(var_active_jobs, active_jobs), add_job(job_a), - process(payload_a, result_a), + send_value(payload_a), + new_deadline, + read_response(response_a), with(var_active_jobs, active_jobs), remove_job(job_a), with(var_sink, sink), - sink_result(result_a), + sink_response(response_a), sink_stop?(true), *finalize ] @@ -174,6 +275,73 @@ def finalize include_examples 'worker execution' end + context 'when processing multiple jobs till sink stops accepting' do + let(:raw_expectations) do + [ + with(var_source, source), + source_next?(true), + source_next(job_a), + with(var_active_jobs, active_jobs), + add_job(job_a), + send_value(payload_a), + new_deadline, + read_response(response_a), + with(var_active_jobs, active_jobs), + remove_job(job_a), + with(var_sink, sink), + sink_response(response_a), + sink_stop?(false), + with(var_source, source), + source_next?(true), + source_next(job_b), + with(var_active_jobs, active_jobs), + add_job(job_b), + send_value(payload_b), + new_deadline, + read_response(response_b), + with(var_active_jobs, active_jobs), + remove_job(job_b), + with(var_sink, sink), + sink_response(response_b), + sink_stop?(true), + *finalize + ] + end + + include_examples 'worker execution' + end + + context 'when processing jobs till error' do + let(:response_a) do + Mutant::Parallel::Response.new( + error: Timeout, + log: 'log', + result: nil + ) + end + + let(:raw_expectations) do + [ + with(var_source, source), + source_next?(true), + source_next(job_a), + with(var_active_jobs, active_jobs), + add_job(job_a), + send_value(payload_a), + new_deadline, + read_response(response_a), + with(var_active_jobs, active_jobs), + remove_job(job_a), + with(var_sink, sink), + sink_response(response_a), + sink_stop?(false), + *finalize + ] + end + + include_examples 'worker execution' + end + context 'when processing jobs till source is empty' do let(:raw_expectations) do [ @@ -243,25 +411,8 @@ def apply end describe '.start' do - let(:child_connection) { instance_double(Mutant::Pipe::Connection) } - let(:parent_connection) { instance_double(Mutant::Pipe::Connection) } - let(:forked_main_thread) { instance_double(Thread) } - - def io(name) - instance_double(IO, name) - end - - def pipe(name) - instance_double( - Mutant::Pipe, - name, - to_reader: io("#{name}_reader"), - to_writer: io("#{name}_writer") - ) - end - - let(:request_pipe) { pipe(:request) } - let(:response_pipe) { pipe(:response) } + let(:child_connection) { instance_double(Mutant::Parallel::Connection) } + let(:forked_main_thread) { instance_double(Thread) } # rubocop:disable Metrics/MethodLength def apply @@ -270,6 +421,7 @@ def apply index: index, on_process_start: on_process_start, 
process_name: process_name, + timeout: 1.0, var_active_jobs: var_active_jobs, var_final: var_final, var_running: var_running, @@ -283,13 +435,19 @@ def apply let(:raw_expectations) do [ { - receiver: Mutant::Pipe, + receiver: Mutant::Parallel::Pipe, + selector: :from_io, + arguments: [world.io], + reaction: { return: log_pipe } + }, + { + receiver: Mutant::Parallel::Pipe, selector: :from_io, arguments: [world.io], reaction: { return: request_pipe } }, { - receiver: Mutant::Pipe, + receiver: Mutant::Parallel::Pipe, selector: :from_io, arguments: [world.io], reaction: { return: response_pipe } @@ -300,7 +458,17 @@ def apply reaction: { yields: [], return: pid } }, { - receiver: Mutant::Pipe::Connection, + receiver: world.stderr, + selector: :reopen, + arguments: [log_pipe.to_writer] + }, + { + receiver: world.stdout, + selector: :reopen, + arguments: [log_pipe.to_writer] + }, + { + receiver: Mutant::Parallel::Connection, selector: :from_pipes, arguments: [{ marshal: world.marshal, reader: request_pipe, writer: response_pipe }], reaction: { return: child_connection } @@ -330,6 +498,10 @@ def apply selector: :receive_value, reaction: { return: 1 } }, + { + receiver: log_pipe.to_writer, + selector: :flush + }, { receiver: child_connection, selector: :send_value, @@ -341,7 +513,7 @@ def apply reaction: { exception: StopIteration } }, { - receiver: Mutant::Pipe::Connection, + receiver: Mutant::Parallel::Connection, selector: :from_pipes, arguments: [{ marshal: world.marshal, reader: response_pipe, writer: request_pipe }], reaction: { return: parent_connection } diff --git a/spec/unit/mutant/parallel_spec.rb b/spec/unit/mutant/parallel_spec.rb index ba6284ece..f1771f132 100644 --- a/spec/unit/mutant/parallel_spec.rb +++ b/spec/unit/mutant/parallel_spec.rb @@ -3,7 +3,7 @@ RSpec.describe Mutant::Parallel do describe '.async' do def apply - described_class.async(world, config) + described_class.async(config: config, world: world) end let(:block) { instance_double(Proc) } @@ -44,7 +44,8 @@ def apply process_name: 'parallel-process', sink: sink, source: source, - thread_name: 'parallel-thread' + thread_name: 'parallel-thread', + timeout: 1.0 ) end @@ -75,6 +76,7 @@ def worker(index, name, worker) index: index, on_process_start: config.on_process_start, process_name: name, + timeout: 1.0, var_active_jobs: var_active_jobs, var_final: var_final, var_running: var_running,