Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add child_spans for Sidekiq Queue instrumentation #2403

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
### Features

- Add `include_sentry_event` matcher for RSpec [#2424](https://github.com/getsentry/sentry-ruby/pull/2424)
- Add support for Sentry Cache instrumentation, when using Rails.cache [#2380](https://github.com/getsentry/sentry-ruby/pull/2380)
- Add support for Sentry Cache instrumentation, when using Rails.cache ([#2380](https://github.com/getsentry/sentry-ruby/pull/2380))
- Add support for Queue Instrumentation for Sidekiq. [#2403](https://github.com/getsentry/sentry-ruby/pull/2403)

Note: MemoryStore and FileStore require Rails 8.0+

Expand Down
2 changes: 2 additions & 0 deletions sentry-sidekiq/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ end

gem "rails", "> 5.0.0"

gem "timecop"

eval_gemfile File.expand_path("../Gemfile", __dir__)
35 changes: 30 additions & 5 deletions sentry-sidekiq/lib/sentry/sidekiq/sentry_context_middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,24 @@

module Sentry
module Sidekiq
module Helpers
def set_span_data(span, id:, queue:, latency: nil, retry_count: nil)
if span
span.set_data("messaging.message.id", id)
span.set_data("messaging.destination.name", queue)
span.set_data("messaging.message.receive.latency", latency) if latency
span.set_data("messaging.message.retry.count", retry_count) if retry_count
end
end
end

class SentryContextServerMiddleware
OP_NAME = "queue.sidekiq"
include Sentry::Sidekiq::Helpers

OP_NAME = "queue.process"
SPAN_ORIGIN = "auto.queue.sidekiq"

def call(_worker, job, queue)
def call(worker, job, queue)
return yield unless Sentry.initialized?

context_filter = Sentry::Sidekiq::ContextFilter.new(job)
Expand All @@ -23,7 +36,12 @@ def call(_worker, job, queue)
scope.set_contexts(sidekiq: job.merge("queue" => queue))
scope.set_transaction_name(context_filter.transaction_name, source: :task)
transaction = start_transaction(scope, job["trace_propagation_headers"])
scope.set_span(transaction) if transaction

if transaction
scope.set_span(transaction)

set_span_data(transaction, id: job["jid"], queue: queue, latency: ((Time.now.to_f - job["enqueued_at"]) * 1000).to_i, retry_count: job["retry_count"] || 0)
end

begin
yield
Expand Down Expand Up @@ -63,13 +81,20 @@ def finish_transaction(transaction, status)
end

class SentryContextClientMiddleware
def call(_worker_class, job, _queue, _redis_pool)
include Sentry::Sidekiq::Helpers

def call(worker_class, job, queue, _redis_pool)
return yield unless Sentry.initialized?

user = Sentry.get_current_scope.user
job["sentry_user"] = user unless user.empty?
job["trace_propagation_headers"] ||= Sentry.get_trace_propagation_headers
yield

Sentry.with_child_span(op: "queue.publish", description: worker_class.to_s) do |span|
set_span_data(span, id: job["jid"], queue: queue)

yield
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require "spec_helper"
require "timecop"

RSpec.shared_context "sidekiq", shared_context: :metadata do
let(:user) { { "id" => rand(10_000) } }
Expand Down Expand Up @@ -65,6 +66,30 @@
expect(transaction.contexts.dig(:trace, :origin)).to eq('auto.queue.sidekiq')
end

it "adds a queue.process spans" do
Timecop.freeze do
execute_worker(processor, HappyWorker)
execute_worker(processor, HappyWorker, jid: '123456', timecop_delay: Time.now + 86400)

expect(transport.events.count).to eq(2)

transaction = transport.events[0]
expect(transaction).not_to be_nil
expect(transaction.spans.count).to eq(0)
expect(transaction.contexts[:trace][:data]['messaging.message.id']).to eq('123123') # Default defined in #execute_worker
expect(transaction.contexts[:trace][:data]['messaging.destination.name']).to eq('default')
expect(transaction.contexts[:trace][:data]['messaging.message.retry.count']).to eq(0)
expect(transaction.contexts[:trace][:data]['messaging.message.receive.latency']).to eq(0)

transaction = transport.events[1]
expect(transaction).not_to be_nil
expect(transaction.spans.count).to eq(0)
expect(transaction.contexts[:trace][:data]['messaging.message.id']).to eq('123456') # Explicitly set above.
expect(transaction.contexts[:trace][:data]['messaging.destination.name']).to eq('default')
expect(transaction.contexts[:trace][:data]['messaging.message.receive.latency']).to eq(86400000)
end
end

context "with trace_propagation_headers" do
let(:parent_transaction) { Sentry.start_transaction(op: "sidekiq") }

Expand All @@ -73,6 +98,7 @@
execute_worker(processor, HappyWorker, trace_propagation_headers: trace_propagation_headers)

expect(transport.events.count).to eq(1)

transaction = transport.events[0]
expect(transaction).not_to be_nil
expect(transaction.contexts.dig(:trace, :trace_id)).to eq(parent_transaction.trace_id)
Expand Down Expand Up @@ -156,5 +182,18 @@
expect(second_headers["sentry-trace"]).to eq(transaction.to_sentry_trace)
expect(second_headers["baggage"]).to eq(transaction.to_baggage)
end

it "has a queue.publish span" do
message_id = client.push('queue' => 'default', 'class' => HappyWorker, 'args' => [])

transaction.finish

expect(transport.events.count).to eq(1)
event = transport.events.last
expect(event.spans.count).to eq(1)
expect(event.spans[0][:op]).to eq("queue.publish")
expect(event.spans[0][:data]['messaging.message.id']).to eq(message_id)
expect(event.spans[0][:data]['messaging.destination.name']).to eq('default')
end
end
end
2 changes: 1 addition & 1 deletion sentry-sidekiq/spec/sentry/sidekiq_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def retry_last_failed_job
expect(transaction.contexts.dig(:trace, :trace_id)).to be_a(String)
expect(transaction.contexts.dig(:trace, :span_id)).to be_a(String)
expect(transaction.contexts.dig(:trace, :status)).to eq("ok")
expect(transaction.contexts.dig(:trace, :op)).to eq("queue.sidekiq")
expect(transaction.contexts.dig(:trace, :op)).to eq("queue.process")
end

it "records transaction with exception" do
Expand Down
9 changes: 7 additions & 2 deletions sentry-sidekiq/spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -229,15 +229,20 @@ def sidekiq_config(opts)

def execute_worker(processor, klass, **options)
klass_options = klass.sidekiq_options_hash || {}

# for Ruby < 2.6
klass_options.each do |k, v|
options[k.to_sym] = v
end

msg = Sidekiq.dump_json(jid: "123123", class: klass, args: [], **options)
jid = options.delete(:jid) || "123123"
timecop_delay = options.delete(:timecop_delay)

msg = Sidekiq.dump_json(created_at: Time.now.to_f, enqueued_at: Time.now.to_f, jid: jid, class: klass, args: [], **options)
Timecop.freeze(timecop_delay) if timecop_delay
work = Sidekiq::BasicFetch::UnitOfWork.new('queue:default', msg)
process_work(processor, work)
ensure
Timecop.return if timecop_delay
end

def process_work(processor, work)
Expand Down
Loading