Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Persist inbound stripe webhooks #2972

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions app/controllers/webhooks_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,16 @@

class WebhooksController < ApplicationController
def stripe
result = PaymentProviders::Stripe::HandleIncomingWebhookService.call(
result = InboundWebhooks::CreateService.call(
organization_id: params[:organization_id],
webhook_source: :stripe,
code: params[:code].presence,
body: request.body.read,
signature: request.headers['HTTP_STRIPE_SIGNATURE']
payload: request.body.read,
signature: request.headers["HTTP_STRIPE_SIGNATURE"],
event_type: params[:type]
)

unless result.success?
if result.error.is_a?(BaseService::ServiceFailure) && result.error.code == 'webhook_error'
return head(:bad_request)
end

result.raise_if_error!
end
return head(:bad_request) unless result.success?

head(:ok)
end
Expand Down
13 changes: 13 additions & 0 deletions app/jobs/clock/inbound_webhooks_cleanup_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# frozen_string_literal: true

module Clock
class InboundWebhooksCleanupJob < ApplicationJob
include SentryCronConcern

queue_as "clock"

def perform
InboundWebhook.where("updated_at < ?", 90.days.ago).destroy_all
end
end
end
15 changes: 15 additions & 0 deletions app/jobs/clock/inbound_webhooks_retry_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# frozen_string_literal: true

module Clock
class InboundWebhooksRetryJob < ApplicationJob
include SentryCronConcern

queue_as "clock"

def perform
InboundWebhook.retriable.find_each do |inbound_webhook|
InboundWebhooks::ProcessJob.perform_later(inbound_webhook:)
end
end
end
end
11 changes: 11 additions & 0 deletions app/jobs/inbound_webhooks/process_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# frozen_string_literal: true

module InboundWebhooks
class ProcessJob < ApplicationJob
queue_as :default

def perform(inbound_webhook:)
InboundWebhooks::ProcessService.call(inbound_webhook:).raise_if_error!
ancorcruz marked this conversation as resolved.
Show resolved Hide resolved
end
end
end
51 changes: 51 additions & 0 deletions app/models/inbound_webhook.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# frozen_string_literal: true

class InboundWebhook < ApplicationRecord
WEBHOOK_PROCESSING_WINDOW = 2.hours

belongs_to :organization

validates :event_type, :payload, :source, :status, presence: true

STATUSES = {
pending: "pending",
processing: "processing",
processed: "processed",
failed: "failed"
}

enum :status, STATUSES

scope :retriable, -> { reprocessable.or(old_pending) }
scope :reprocessable, -> { processing.where("processing_at <= ?", WEBHOOK_PROCESSING_WINDOW.ago) }
scope :old_pending, -> { pending.where("created_at <= ?", WEBHOOK_PROCESSING_WINDOW.ago) }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without indexes on these columns, this will be a full table scan always 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add two combined indices [status, processing_at] and [status, created_at] should do it.


def processing!
update!(status: :processing, processing_at: Time.zone.now)
end
end

# == Schema Information
#
# Table name: inbound_webhooks
#
# id :uuid not null, primary key
# code :string
# event_type :string not null
# payload :jsonb not null
# processing_at :datetime
# signature :string
# source :string not null
# status :string default("pending"), not null
# created_at :datetime not null
# updated_at :datetime not null
# organization_id :uuid not null
#
# Indexes
#
# index_inbound_webhooks_on_organization_id (organization_id)
#
# Foreign Keys
#
# fk_rails_... (organization_id => organizations.id)
#
40 changes: 40 additions & 0 deletions app/services/inbound_webhooks/create_service.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# frozen_string_literal: true

module InboundWebhooks
class CreateService < BaseService
def initialize(organization_id:, webhook_source:, payload:, event_type:, code: nil, signature: nil)
@organization_id = organization_id
@webhook_source = webhook_source
@code = code
@payload = payload
@signature = signature
@event_type = event_type

super
end

def call
inbound_webhook = InboundWebhook.create!(
organization_id:,
source: webhook_source,
code:,
payload:,
signature:,
event_type:
)

after_commit do
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we are not relying on transaction right? So I guess this after_commit block is not mandatory

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this ensures the job is not added to the queue and tried to be processed until the DB transaction is committed. This will avoid race condition between sidekiq and postgres where sidekiq picks the job but it is unable to find the resource in DB.

Copy link
Collaborator

@vincent-pochet vincent-pochet Dec 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, but here InboundWebhooks::CreateService is called directly from the controller and no transaction is created in it so the job will always be called after the InboundWebhook creation transaction.
That's why I was asking. No a major issue anyway ;)

InboundWebhooks::ProcessJob.perform_later(inbound_webhook:)
end

result.inbound_webhook = inbound_webhook
result
rescue ActiveRecord::RecordInvalid => e
result.record_validation_failure!(record: e.record)
end

private

attr_reader :organization_id, :webhook_source, :code, :payload, :signature, :event_type
end
end
56 changes: 56 additions & 0 deletions app/services/inbound_webhooks/process_service.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# frozen_string_literal: true

module InboundWebhooks
class ProcessService < BaseService
WEBHOOK_HANDLER_SERVICES = {
stripe: PaymentProviders::Stripe::HandleIncomingWebhookService
}

def initialize(inbound_webhook:)
@inbound_webhook = inbound_webhook

super
end

def call
return result if within_processing_window?
return result if inbound_webhook.failed?
return result if inbound_webhook.processed?

inbound_webhook.processing!
ancorcruz marked this conversation as resolved.
Show resolved Hide resolved

handler_result = handler_service_klass.call(inbound_webhook:)

unless handler_result.success?
inbound_webhook.failed!
return handler_result
end

inbound_webhook.processed!

result.inbound_webhook = inbound_webhook
result
rescue
inbound_webhook.failed!
raise
end

private

attr_reader :inbound_webhook

def handler_service_klass
WEBHOOK_HANDLER_SERVICES.fetch(webhook_source) do
raise NameError, "Invalid inbound webhook source: #{webhook_source}"
end
end

def webhook_source
inbound_webhook.source.to_sym
end

def within_processing_window?
inbound_webhook.processing? && inbound_webhook.processing_at > InboundWebhook::WEBHOOK_PROCESSING_WINDOW.ago
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
module PaymentProviders
module Stripe
class HandleIncomingWebhookService < BaseService
def initialize(organization_id:, body:, signature:, code: nil)
@organization_id = organization_id
@body = body
@signature = signature
@code = code
extend Forwardable

def initialize(inbound_webhook:)
@inbound_webhook = inbound_webhook

super
end
Expand All @@ -22,7 +21,7 @@ def call
return payment_provider_result unless payment_provider_result.success?

event = ::Stripe::Webhook.construct_event(
body,
payload,
signature,
payment_provider_result.payment_provider&.webhook_secret
)
Expand All @@ -42,7 +41,7 @@ def call

private

attr_reader :organization_id, :body, :signature, :code
def_delegators :@inbound_webhook, :code, :organization_id, :payload, :signature
end
end
end
12 changes: 12 additions & 0 deletions clock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ module Clockwork
.perform_later
end

every(1.day, 'schedule:clean_inbound_webhooks', at: '01:10') do
Clock::InboundWebhooksCleanupJob
.set(sentry: {"slug" => 'lago_clean_inbound_webhooks', "cron" => '5 1 * * *'})
.perform_later
end

unless ActiveModel::Type::Boolean.new.cast(ENV['LAGO_DISABLE_EVENTS_VALIDATION'])
every(1.hour, 'schedule:post_validate_events', at: '*:05') do
Clock::EventsValidationJob
Expand Down Expand Up @@ -146,4 +152,10 @@ module Clockwork
.set(sentry: {"slug" => "lago_retry_failed_invoices", "cron" => '*/15 * * * *'})
.perform_later
end

every(15.minutes, "schedule:retry_inbound_webhooks") do
Clock::InboundWebhooksRetryJob
.set(sentry: {"slug" => "lago_retry_inbound_webhooks", "cron" => '*/15 * * * *'})
.perform_later
end
end
17 changes: 17 additions & 0 deletions db/migrate/20241213182343_create_inbound_webhooks.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

class CreateInboundWebhooks < ActiveRecord::Migration[7.1]
def change
create_table :inbound_webhooks, id: :uuid do |t|
t.string :source, null: false
t.string :event_type, null: false
t.jsonb :payload, null: false
t.string :status, null: false, default: 'pending'
t.belongs_to :organization, null: false, foreign_key: true, type: :uuid, index: true
t.string :code
t.string :signature

t.timestamps
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# frozen_string_literal: true

class AddProcessingAtToInboundWebhooks < ActiveRecord::Migration[7.1]
def change
add_column :inbound_webhooks, :processing_at, :timestamp, precision: nil
end
end
17 changes: 16 additions & 1 deletion db/schema.rb

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 42 additions & 0 deletions spec/clockwork_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -177,4 +177,46 @@
expect(Clock::ProcessDunningCampaignsJob).to have_been_enqueued
end
end

describe "schedule:clean_inbound_webhooks" do
let(:job) { "schedule:clean_inbound_webhooks" }
let(:start_time) { Time.zone.parse("1 Apr 2022 00:01:00") }
let(:end_time) { Time.zone.parse("2 Apr 2022 00:00:00") }

it "enqueue a clean inbound webhooks job" do
Clockwork::Test.run(
file: clock_file,
start_time:,
end_time:,
tick_speed: 1.minute
)

expect(Clockwork::Test).to be_ran_job(job)
expect(Clockwork::Test.times_run(job)).to eq(1)

Clockwork::Test.block_for(job).call
expect(Clock::InboundWebhooksCleanupJob).to have_been_enqueued
end
end

describe "schedule:retry_inbound_webhooks" do
let(:job) { "schedule:retry_inbound_webhooks" }
let(:start_time) { Time.zone.parse("1 Apr 2022 00:05:00") }
let(:end_time) { Time.zone.parse("1 Apr 2022 00:20:00") }

it "enqueue a retry inbound webhooks job" do
Clockwork::Test.run(
file: clock_file,
start_time:,
end_time:,
tick_speed: 1.minute
)

expect(Clockwork::Test).to be_ran_job(job)
expect(Clockwork::Test.times_run(job)).to eq(1)

Clockwork::Test.block_for(job).call
expect(Clock::InboundWebhooksRetryJob).to have_been_enqueued
end
end
end
Loading
Loading