From 50ab2f63bc7a8ca4721302614934ec66ef2ad085 Mon Sep 17 00:00:00 2001 From: Ancor Cruz Date: Mon, 16 Dec 2024 10:58:59 +0000 Subject: [PATCH 01/17] Add inbound webhooks model --- app/models/inbound_webhook.rb | 35 +++++++++++++++++++ .../20241213182343_create_inbound_webhooks.rb | 17 +++++++++ db/schema.rb | 14 ++++++++ spec/factories/inbound_webhooks.rb | 17 +++++++++ spec/models/inbound_webhook_spec.rb | 16 +++++++++ 5 files changed, 99 insertions(+) create mode 100644 app/models/inbound_webhook.rb create mode 100644 db/migrate/20241213182343_create_inbound_webhooks.rb create mode 100644 spec/factories/inbound_webhooks.rb create mode 100644 spec/models/inbound_webhook_spec.rb diff --git a/app/models/inbound_webhook.rb b/app/models/inbound_webhook.rb new file mode 100644 index 00000000000..994e91b9f30 --- /dev/null +++ b/app/models/inbound_webhook.rb @@ -0,0 +1,35 @@ +# frozen_string_literal: true + +class InboundWebhook < ApplicationRecord + belongs_to :organization + + validates :event_type, :payload, :source, :status, presence: true + + STATUSES = {pending: "pending"} + + enum :status, STATUSES +end + +# == Schema Information +# +# Table name: inbound_webhooks +# +# id :uuid not null, primary key +# code :string +# event_type :string not null +# payload :jsonb not null +# signature :string +# source :string not null +# status :string default("pending"), not null +# created_at :datetime not null +# updated_at :datetime not null +# organization_id :uuid not null +# +# Indexes +# +# index_inbound_webhooks_on_organization_id (organization_id) +# +# Foreign Keys +# +# fk_rails_... (organization_id => organizations.id) +# diff --git a/db/migrate/20241213182343_create_inbound_webhooks.rb b/db/migrate/20241213182343_create_inbound_webhooks.rb new file mode 100644 index 00000000000..8c49d305595 --- /dev/null +++ b/db/migrate/20241213182343_create_inbound_webhooks.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +class CreateInboundWebhooks < ActiveRecord::Migration[7.1] + def change + create_table :inbound_webhooks, id: :uuid do |t| + t.string :source, null: false + t.string :event_type, null: false + t.jsonb :payload, null: false + t.string :status, null: false, default: 'pending' + t.belongs_to :organization, null: false, foreign_key: true, type: :uuid, index: true + t.string :code + t.string :signature + + t.timestamps + end + end +end diff --git a/db/schema.rb b/db/schema.rb index 184c0a4afa2..6cc5cbb33c2 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -721,6 +721,19 @@ t.index ["parent_group_id"], name: "index_groups_on_parent_group_id" end + create_table "inbound_webhooks", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| + t.string "source", null: false + t.string "event_type", null: false + t.jsonb "payload", null: false + t.string "status", default: "pending", null: false + t.uuid "organization_id", null: false + t.string "code" + t.string "signature" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["organization_id"], name: "index_inbound_webhooks_on_organization_id" + end + create_table "integration_collection_mappings", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| t.uuid "integration_id", null: false t.integer "mapping_type", null: false @@ -1412,6 +1425,7 @@ add_foreign_key "group_properties", "groups", on_delete: :cascade add_foreign_key "groups", "billable_metrics", on_delete: :cascade add_foreign_key "groups", "groups", column: "parent_group_id" + add_foreign_key "inbound_webhooks", "organizations" add_foreign_key "integration_collection_mappings", "integrations" add_foreign_key "integration_customers", "customers" add_foreign_key "integration_customers", "integrations" diff --git a/spec/factories/inbound_webhooks.rb b/spec/factories/inbound_webhooks.rb new file mode 100644 index 00000000000..020bad6cc45 --- /dev/null +++ b/spec/factories/inbound_webhooks.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +FactoryBot.define do + factory :inbound_webhook do + organization + + source { "stripe" } + event_type { "payment_intent.succeeded" } + status { "pending" } + code { "webhook-endpoint-code" } + signature { "MySignature" } + + payload do + File.read(Rails.root.join('spec/fixtures/stripe/payment_intent_event.json')) + end + end +end diff --git a/spec/models/inbound_webhook_spec.rb b/spec/models/inbound_webhook_spec.rb new file mode 100644 index 00000000000..68b69a14d59 --- /dev/null +++ b/spec/models/inbound_webhook_spec.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe InboundWebhook, type: :model do + subject(:inbound_webhook) { build(:inbound_webhook) } + + it { is_expected.to belong_to(:organization) } + + it { is_expected.to validate_presence_of(:event_type) } + it { is_expected.to validate_presence_of(:payload) } + it { is_expected.to validate_presence_of(:source) } + it { is_expected.to validate_presence_of(:status) } + + it { is_expected.to be_pending } +end From 3f0c2fabc9d9b7f90a1f8d57abf003cc21a6f6c4 Mon Sep 17 00:00:00 2001 From: Ancor Cruz Date: Tue, 17 Dec 2024 13:04:25 +0000 Subject: [PATCH 02/17] Persist and process stripe webhooks --- app/controllers/webhooks_controller.rb | 16 ++-- app/jobs/inbound_webhooks/process_job.rb | 11 +++ app/models/inbound_webhook.rb | 7 +- .../inbound_webhooks/create_service.rb | 40 ++++++++++ .../inbound_webhooks/process_service.rb | 48 ++++++++++++ .../stripe/handle_incoming_webhook_service.rb | 13 ++-- .../jobs/inbound_webhooks/process_job_spec.rb | 33 ++++++++ spec/requests/webhooks_controller_spec.rb | 72 ++++++++---------- .../inbound_webhooks/create_service_spec.rb | 61 +++++++++++++++ .../inbound_webhooks/process_service_spec.rb | 75 +++++++++++++++++++ .../handle_incoming_webhook_service_spec.rb | 12 +-- 11 files changed, 320 insertions(+), 68 deletions(-) create mode 100644 app/jobs/inbound_webhooks/process_job.rb create mode 100644 app/services/inbound_webhooks/create_service.rb create mode 100644 app/services/inbound_webhooks/process_service.rb create mode 100644 spec/jobs/inbound_webhooks/process_job_spec.rb create mode 100644 spec/services/inbound_webhooks/create_service_spec.rb create mode 100644 spec/services/inbound_webhooks/process_service_spec.rb diff --git a/app/controllers/webhooks_controller.rb b/app/controllers/webhooks_controller.rb index 51c0f697b68..1922cb628f1 100644 --- a/app/controllers/webhooks_controller.rb +++ b/app/controllers/webhooks_controller.rb @@ -2,20 +2,16 @@ class WebhooksController < ApplicationController def stripe - result = PaymentProviders::Stripe::HandleIncomingWebhookService.call( + result = InboundWebhooks::CreateService.call( organization_id: params[:organization_id], + webhook_source: :stripe, code: params[:code].presence, - body: request.body.read, - signature: request.headers['HTTP_STRIPE_SIGNATURE'] + payload: request.body.read, + signature: request.headers["HTTP_STRIPE_SIGNATURE"], + event_type: params[:type] ) - unless result.success? - if result.error.is_a?(BaseService::ServiceFailure) && result.error.code == 'webhook_error' - return head(:bad_request) - end - - result.raise_if_error! - end + return head(:bad_request) unless result.success? head(:ok) end diff --git a/app/jobs/inbound_webhooks/process_job.rb b/app/jobs/inbound_webhooks/process_job.rb new file mode 100644 index 00000000000..7ef646e9373 --- /dev/null +++ b/app/jobs/inbound_webhooks/process_job.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +module InboundWebhooks + class ProcessJob < ApplicationJob + queue_as :default + + def perform(inbound_webhook:) + InboundWebhooks::ProcessService.call(inbound_webhook:).raise_if_error! + end + end +end diff --git a/app/models/inbound_webhook.rb b/app/models/inbound_webhook.rb index 994e91b9f30..cf3d85a6508 100644 --- a/app/models/inbound_webhook.rb +++ b/app/models/inbound_webhook.rb @@ -5,7 +5,12 @@ class InboundWebhook < ApplicationRecord validates :event_type, :payload, :source, :status, presence: true - STATUSES = {pending: "pending"} + STATUSES = { + pending: "pending", + processing: "processing", + processed: "processed", + failed: "failed" + } enum :status, STATUSES end diff --git a/app/services/inbound_webhooks/create_service.rb b/app/services/inbound_webhooks/create_service.rb new file mode 100644 index 00000000000..9dce52f42f3 --- /dev/null +++ b/app/services/inbound_webhooks/create_service.rb @@ -0,0 +1,40 @@ +# frozen_string_literal: true + +module InboundWebhooks + class CreateService < BaseService + def initialize(organization_id:, webhook_source:, payload:, event_type:, code: nil, signature: nil) + @organization_id = organization_id + @webhook_source = webhook_source + @code = code + @payload = payload + @signature = signature + @event_type = event_type + + super + end + + def call + inbound_webhook = InboundWebhook.create!( + organization_id:, + source: webhook_source, + code:, + payload:, + signature:, + event_type: + ) + + after_commit do + InboundWebhooks::ProcessJob.perform_later(inbound_webhook:) + end + + result.inbound_webhook = inbound_webhook + result + rescue ActiveRecord::RecordInvalid => e + result.record_validation_failure!(record: e.record) + end + + private + + attr_reader :organization_id, :webhook_source, :code, :payload, :signature, :event_type + end +end diff --git a/app/services/inbound_webhooks/process_service.rb b/app/services/inbound_webhooks/process_service.rb new file mode 100644 index 00000000000..2646defed74 --- /dev/null +++ b/app/services/inbound_webhooks/process_service.rb @@ -0,0 +1,48 @@ +# frozen_string_literal: true + +module InboundWebhooks + class ProcessService < BaseService + WEBHOOK_HANDLER_SERVICES = { + stripe: PaymentProviders::Stripe::HandleIncomingWebhookService + } + + def initialize(inbound_webhook:) + @inbound_webhook = inbound_webhook + + super + end + + def call + inbound_webhook.processing! + + handler_result = handler_service_klass.call(inbound_webhook:) + + unless handler_result.success? + inbound_webhook.failed! + return handler_result + end + + inbound_webhook.processed! + + result.inbound_webhook = inbound_webhook + result + rescue + inbound_webhook.failed! + raise + end + + private + + attr_reader :inbound_webhook + + def handler_service_klass + WEBHOOK_HANDLER_SERVICES.fetch(webhook_source) do + raise NameError, "Invalid inbound webhook source: #{webhook_source}" + end + end + + def webhook_source + inbound_webhook.source.to_sym + end + end +end diff --git a/app/services/payment_providers/stripe/handle_incoming_webhook_service.rb b/app/services/payment_providers/stripe/handle_incoming_webhook_service.rb index 333094718bc..c254496aa0f 100644 --- a/app/services/payment_providers/stripe/handle_incoming_webhook_service.rb +++ b/app/services/payment_providers/stripe/handle_incoming_webhook_service.rb @@ -3,11 +3,10 @@ module PaymentProviders module Stripe class HandleIncomingWebhookService < BaseService - def initialize(organization_id:, body:, signature:, code: nil) - @organization_id = organization_id - @body = body - @signature = signature - @code = code + extend Forwardable + + def initialize(inbound_webhook:) + @inbound_webhook = inbound_webhook super end @@ -22,7 +21,7 @@ def call return payment_provider_result unless payment_provider_result.success? event = ::Stripe::Webhook.construct_event( - body, + payload, signature, payment_provider_result.payment_provider&.webhook_secret ) @@ -42,7 +41,7 @@ def call private - attr_reader :organization_id, :body, :signature, :code + def_delegators :@inbound_webhook, :code, :organization_id, :payload, :signature end end end diff --git a/spec/jobs/inbound_webhooks/process_job_spec.rb b/spec/jobs/inbound_webhooks/process_job_spec.rb new file mode 100644 index 00000000000..765038121e9 --- /dev/null +++ b/spec/jobs/inbound_webhooks/process_job_spec.rb @@ -0,0 +1,33 @@ +# frozen_string_literal: true + +require "rails_helper" + +RSpec.describe InboundWebhooks::ProcessJob, type: :job do + subject(:process_job) { described_class } + + let(:inbound_webhook) { create :inbound_webhook } + let(:result) { BaseService::Result.new } + + before do + allow(InboundWebhooks::ProcessService).to receive(:call).and_return(result) + end + + it "calls the process webhook service" do + process_job.perform_now(inbound_webhook:) + + expect(InboundWebhooks::ProcessService) + .to have_received(:call) + .with(inbound_webhook:) + end + + context "when result is a failure" do + let(:result) do + BaseService::Result.new.service_failure!(code: "error", message: "error message") + end + + it "raises an error" do + expect { process_job.perform_now(inbound_webhook:) } + .to raise_error(BaseService::FailedResult) + end + end +end diff --git a/spec/requests/webhooks_controller_spec.rb b/spec/requests/webhooks_controller_spec.rb index 65fd6f86315..6c523334a46 100644 --- a/spec/requests/webhooks_controller_spec.rb +++ b/spec/requests/webhooks_controller_spec.rb @@ -1,77 +1,67 @@ # frozen_string_literal: true -require 'rails_helper' +require "rails_helper" RSpec.describe WebhooksController, type: :request do - describe 'POST /stripe' do - let(:organization) { create(:organization) } - - let(:stripe_provider) do - create( - :stripe_provider, - organization:, - webhook_secret: 'secrests' - ) - end - - let(:stripe_service) { instance_double(PaymentProviders::StripeService) } + describe "POST /stripe" do + let(:organization_id) { Faker::Internet.uuid } + let(:code) { "stripe_1" } + let(:signature) { "signature" } + let(:event_type) { "payment_intent.succeeded" } let(:event) do - path = Rails.root.join('spec/fixtures/stripe/payment_intent_event.json') + path = Rails.root.join("spec/fixtures/stripe/payment_intent_event.json") JSON.parse(File.read(path)) end - let(:result) do - result = BaseService::Result.new - result.event = Stripe::Event.construct_from(event) - result - end + let(:payload) { event.merge(code:) } + let(:result) { BaseService::Result.new } before do - allow(PaymentProviders::Stripe::HandleIncomingWebhookService) + allow(InboundWebhooks::CreateService) .to receive(:call) .with( - organization_id: organization.id, - code: nil, - body: event.to_json, - signature: 'signature' + organization_id:, + webhook_source: :stripe, + code:, + payload: payload.to_json, + signature:, + event_type: ) .and_return(result) end - it 'handle stripe webhooks' do + it "handle stripe webhooks" do post( - "/webhooks/stripe/#{stripe_provider.organization_id}", - params: event.to_json, + "/webhooks/stripe/#{organization_id}", + params: payload.to_json, headers: { - 'HTTP_STRIPE_SIGNATURE' => 'signature', - 'Content-Type' => 'application/json' + "HTTP_STRIPE_SIGNATURE" => signature, + "Content-Type" => "application/json" } ) expect(response).to have_http_status(:success) - expect(PaymentProviders::Stripe::HandleIncomingWebhookService) - .to have_received(:call) + expect(InboundWebhooks::CreateService).to have_received(:call) end - context 'when failing to handle stripe event' do - let(:result) do - BaseService::Result.new.service_failure!(code: 'webhook_error', message: 'Invalid payload') + context "when InboundWebhooks::CreateService is not successful" do + before do + result.record_validation_failure!(record: build(:inbound_webhook)) end - it 'returns a bad request' do + it "returns a bad request" do post( - "/webhooks/stripe/#{stripe_provider.organization_id}", - params: event.to_json, + "/webhooks/stripe/#{organization_id}", + params: payload.to_json, headers: { - 'HTTP_STRIPE_SIGNATURE' => 'signature', - 'Content-Type' => 'application/json' + "HTTP_STRIPE_SIGNATURE" => signature, + "Content-Type" => "application/json" } ) expect(response).to have_http_status(:bad_request) - expect(PaymentProviders::Stripe::HandleIncomingWebhookService) - .to have_received(:call) + expect(InboundWebhooks::CreateService).to have_received(:call) end end end diff --git a/spec/services/inbound_webhooks/create_service_spec.rb b/spec/services/inbound_webhooks/create_service_spec.rb new file mode 100644 index 00000000000..f14db412f2c --- /dev/null +++ b/spec/services/inbound_webhooks/create_service_spec.rb @@ -0,0 +1,61 @@ +# frozen_string_literal: true + +require "rails_helper" + +RSpec.describe InboundWebhooks::CreateService, type: :service do + subject(:result) do + described_class.call( + organization_id: organization.id, + webhook_source:, + code:, + payload:, + signature:, + event_type: + ) + end + + let(:organization) { create :organization } + let(:code) { "stripe_1" } + let(:webhook_source) { "stripe" } + let(:signature) { "signature" } + let(:payload) { event.merge(code:).to_json } + let(:event_type) { "payment_intent.successful" } + + let(:event) do + path = Rails.root.join("spec/fixtures/stripe/payment_intent_event.json") + JSON.parse(File.read(path)) + end + + it "creates an inbound webhook" do + expect { result }.to change(InboundWebhook, :count).by(1) + end + + it "returns a pending inbound webhook in the result" do + expect(result.inbound_webhook).to be_a(InboundWebhook) + expect(result.inbound_webhook).to be_pending + end + + it "queues an InboundWebhook::ProcessJob job" do + result + + expect(InboundWebhooks::ProcessJob) + .to have_been_enqueued + .with(inbound_webhook: result.inbound_webhook) + end + + context "with validation error" do + let(:webhook_source) { nil } + + it "returns an error" do + expect(result).not_to be_success + expect(result.error).to be_a(BaseService::ValidationFailure) + expect(result.error.messages[:source]).to eq(["value_is_mandatory"]) + end + + it "does not queue an InboundWebhook::ProcessJob job" do + result + + expect(InboundWebhooks::ProcessJob).not_to have_been_enqueued + end + end +end diff --git a/spec/services/inbound_webhooks/process_service_spec.rb b/spec/services/inbound_webhooks/process_service_spec.rb new file mode 100644 index 00000000000..ef5fb517dec --- /dev/null +++ b/spec/services/inbound_webhooks/process_service_spec.rb @@ -0,0 +1,75 @@ +# frozen_string_literal: true + +require "rails_helper" + +RSpec.describe InboundWebhooks::ProcessService, type: :service do + subject(:result) { described_class.call(inbound_webhook:) } + + let(:inbound_webhook) { create :inbound_webhook, source: webhook_source } + let(:webhook_source) { "stripe" } + let(:handle_incoming_webhook_service_result) { BaseService::Result.new } + + before do + allow(PaymentProviders::Stripe::HandleIncomingWebhookService) + .to receive(:call) + .and_return(handle_incoming_webhook_service_result) + end + + it "updateds inbound webhook status to processing" do + allow(inbound_webhook).to receive(:processing!) + + result + expect(inbound_webhook).to have_received(:processing!).once + end + + context "when inbound webhook source is invalid" do + let(:webhook_source) { "invalid_source" } + + it "flags inbound webhook as failed and raises an error" do + expect { result } + .to change(inbound_webhook, :status).to("failed") + .and raise_error( + NameError, + "Invalid inbound webhook source: invalid_source" + ) + end + end + + context "when webhook source is Stripe" do + let(:webhook_source) { "stripe" } + + before do + allow(PaymentProviders::Stripe::HandleIncomingWebhookService) + .to receive(:call) + .and_return(handle_incoming_webhook_service_result) + end + + it "delegates the call to the Stripe webhook hanlder service" do + expect(result).to be_success + expect(PaymentProviders::Stripe::HandleIncomingWebhookService) + .to have_received(:call) + .with(inbound_webhook:) + end + + it "updated inbound webhook status to processed" do + expect { result }.to change(inbound_webhook, :status).to("processed") + end + + context "when the stripe webhook handling fails" do + before do + handle_incoming_webhook_service_result.service_failure!( + code: "error", message: "error message" + ) + end + + it "returns the handler results" do + expect(result).not_to be_success + expect(result).to eq(handle_incoming_webhook_service_result) + end + + it "updates inbound webhook status to failed" do + expect { result }.to change(inbound_webhook, :status).to("failed") + end + end + end +end diff --git a/spec/services/payment_providers/stripe/handle_incoming_webhook_service_spec.rb b/spec/services/payment_providers/stripe/handle_incoming_webhook_service_spec.rb index 8b4bdb8120d..56b13b28737 100644 --- a/spec/services/payment_providers/stripe/handle_incoming_webhook_service_spec.rb +++ b/spec/services/payment_providers/stripe/handle_incoming_webhook_service_spec.rb @@ -3,18 +3,12 @@ require "rails_helper" RSpec.describe PaymentProviders::Stripe::HandleIncomingWebhookService, type: :service do - subject(:result) do - described_class.call( - organization_id: organization.id, - body: event.to_json, - signature: "signature", - code: - ) - end + subject(:result) { described_class.call(inbound_webhook:) } + let(:inbound_webhook) { create :inbound_webhook, organization:, code: } + let(:code) { nil } let(:membership) { create(:membership) } let(:organization) { membership.organization } - let(:code) { nil } let(:stripe_provider) { create(:stripe_provider, organization:) } let(:event_result) { Stripe::Event.construct_from(event) } From e01430e9da2c48fcd8754cd15457730ce264d4c7 Mon Sep 17 00:00:00 2001 From: Ancor Cruz Date: Tue, 17 Dec 2024 13:56:54 +0000 Subject: [PATCH 03/17] Add clock job to clean old (older than 90 days) inbound webhooks from DB --- .../clock/inbound_webhooks_cleanup_job.rb | 13 +++++++++++ clock.rb | 6 +++++ spec/clockwork_spec.rb | 21 +++++++++++++++++ .../inbound_webhooks_cleanup_job_spec.rb | 23 +++++++++++++++++++ 4 files changed, 63 insertions(+) create mode 100644 app/jobs/clock/inbound_webhooks_cleanup_job.rb create mode 100644 spec/jobs/clock/inbound_webhooks_cleanup_job_spec.rb diff --git a/app/jobs/clock/inbound_webhooks_cleanup_job.rb b/app/jobs/clock/inbound_webhooks_cleanup_job.rb new file mode 100644 index 00000000000..2b8e07ff205 --- /dev/null +++ b/app/jobs/clock/inbound_webhooks_cleanup_job.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +module Clock + class InboundWebhooksCleanupJob < ApplicationJob + include SentryCronConcern + + queue_as 'clock' + + def perform + InboundWebhook.where('updated_at < ?', 90.days.ago).destroy_all + end + end +end diff --git a/clock.rb b/clock.rb index 3db485ebb01..438110ef7ae 100644 --- a/clock.rb +++ b/clock.rb @@ -119,6 +119,12 @@ module Clockwork .perform_later end + every(1.day, 'schedule:clean_inbound_webhooks', at: '01:10') do + Clock::InboundWebhooksCleanupJob + .set(sentry: {"slug" => 'lago_clean_inbound_webhooks', "cron" => '5 1 * * *'}) + .perform_later + end + unless ActiveModel::Type::Boolean.new.cast(ENV['LAGO_DISABLE_EVENTS_VALIDATION']) every(1.hour, 'schedule:post_validate_events', at: '*:05') do Clock::EventsValidationJob diff --git a/spec/clockwork_spec.rb b/spec/clockwork_spec.rb index f5a3d65f4b0..a77fb9fddb4 100644 --- a/spec/clockwork_spec.rb +++ b/spec/clockwork_spec.rb @@ -177,4 +177,25 @@ expect(Clock::ProcessDunningCampaignsJob).to have_been_enqueued end end + + describe "schedule:clean_inbound_webhooks" do + let(:job) { "schedule:clean_inbound_webhooks" } + let(:start_time) { Time.zone.parse("1 Apr 2022 00:01:00") } + let(:end_time) { Time.zone.parse("2 Apr 2022 00:00:00") } + + it "enqueue a clean inbound webhooks job" do + Clockwork::Test.run( + file: clock_file, + start_time:, + end_time:, + tick_speed: 1.minute + ) + + expect(Clockwork::Test).to be_ran_job(job) + expect(Clockwork::Test.times_run(job)).to eq(1) + + Clockwork::Test.block_for(job).call + expect(Clock::InboundWebhooksCleanupJob).to have_been_enqueued + end + end end diff --git a/spec/jobs/clock/inbound_webhooks_cleanup_job_spec.rb b/spec/jobs/clock/inbound_webhooks_cleanup_job_spec.rb new file mode 100644 index 00000000000..e3765701b80 --- /dev/null +++ b/spec/jobs/clock/inbound_webhooks_cleanup_job_spec.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +require "rails_helper" + +describe Clock::InboundWebhooksCleanupJob, job: true do + subject(:inbound_webhooks_cleanup_job) { described_class } + + describe ".perform" do + it "removes all old inbound webhooks" do + create(:inbound_webhook, updated_at: 90.days.ago) + + expect { inbound_webhooks_cleanup_job.perform_now } + .to change(InboundWebhook, :count).to(0) + end + + it "does not delete recent inbound webhooks" do + create(:inbound_webhook, updated_at: 89.days.ago) + + expect { inbound_webhooks_cleanup_job.perform_now } + .not_to change(InboundWebhook, :count) + end + end +end From d3f4322a06b0a5ad235be6bd3c66e4b97ea9568a Mon Sep 17 00:00:00 2001 From: Ancor Cruz Date: Tue, 17 Dec 2024 14:20:09 +0000 Subject: [PATCH 04/17] Add a Retry job for failed inbound webhooks --- app/jobs/clock/inbound_webhooks_retry_job.rb | 15 ++++++++++ clock.rb | 6 ++++ spec/clockwork_spec.rb | 21 +++++++++++++ .../clock/inbound_webhooks_retry_job_spec.rb | 30 +++++++++++++++++++ 4 files changed, 72 insertions(+) create mode 100644 app/jobs/clock/inbound_webhooks_retry_job.rb create mode 100644 spec/jobs/clock/inbound_webhooks_retry_job_spec.rb diff --git a/app/jobs/clock/inbound_webhooks_retry_job.rb b/app/jobs/clock/inbound_webhooks_retry_job.rb new file mode 100644 index 00000000000..4b35a747e43 --- /dev/null +++ b/app/jobs/clock/inbound_webhooks_retry_job.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +module Clock + class InboundWebhooksRetryJob < ApplicationJob + include SentryCronConcern + + queue_as 'clock' + + def perform + InboundWebhook.failed.find_each do |inbound_webhook| + InboundWebhooks::ProcessJob.perform_later(inbound_webhook:) + end + end + end +end diff --git a/clock.rb b/clock.rb index 438110ef7ae..76fdaacfca7 100644 --- a/clock.rb +++ b/clock.rb @@ -152,4 +152,10 @@ module Clockwork .set(sentry: {"slug" => "lago_retry_failed_invoices", "cron" => '*/15 * * * *'}) .perform_later end + + every(15.minutes, "schedule:retry_failed_inbound_webhooks") do + Clock::InboundWebhooksRetryJob + .set(sentry: {"slug" => "lago_retry_failed_inbound_webhooks", "cron" => '*/15 * * * *'}) + .perform_later + end end diff --git a/spec/clockwork_spec.rb b/spec/clockwork_spec.rb index a77fb9fddb4..8e50669d660 100644 --- a/spec/clockwork_spec.rb +++ b/spec/clockwork_spec.rb @@ -198,4 +198,25 @@ expect(Clock::InboundWebhooksCleanupJob).to have_been_enqueued end end + + describe "schedule:retry_failed_inbound_webhooks" do + let(:job) { "schedule:retry_failed_inbound_webhooks" } + let(:start_time) { Time.zone.parse("1 Apr 2022 00:05:00") } + let(:end_time) { Time.zone.parse("1 Apr 2022 00:20:00") } + + it "enqueue a retry failed inbound webhooks job" do + Clockwork::Test.run( + file: clock_file, + start_time:, + end_time:, + tick_speed: 1.minute + ) + + expect(Clockwork::Test).to be_ran_job(job) + expect(Clockwork::Test.times_run(job)).to eq(1) + + Clockwork::Test.block_for(job).call + expect(Clock::InboundWebhooksRetryJob).to have_been_enqueued + end + end end diff --git a/spec/jobs/clock/inbound_webhooks_retry_job_spec.rb b/spec/jobs/clock/inbound_webhooks_retry_job_spec.rb new file mode 100644 index 00000000000..e198f7e54f6 --- /dev/null +++ b/spec/jobs/clock/inbound_webhooks_retry_job_spec.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +require 'rails_helper' + +describe Clock::InboundWebhooksRetryJob, job: true do + subject(:inbound_webhooks_retry_job) { described_class } + + describe '.perform' do + let(:pending_inbound_webhook) { create :inbound_webhook, status: "pending" } + let(:failed_inbound_webhook) { create :inbound_webhook, status: "failed" } + let(:processing_inbound_webhoook) { create :inbound_webhook, status: "processing" } + let(:processed_inbound_webhook) { create :inbound_webhook, status: "processed" } + + before do + pending_inbound_webhook + failed_inbound_webhook + processed_inbound_webhook + processed_inbound_webhook + end + + it "queues a job to process the failed inbound webhook" do + inbound_webhooks_retry_job.perform_now + + expect(InboundWebhooks::ProcessJob).to have_been_enqueued.once + expect(InboundWebhooks::ProcessJob) + .to have_been_enqueued + .with(inbound_webhook: failed_inbound_webhook) + end + end +end From 61e77a24a51c222c5cb4bc580d7a9c9eb7ed0499 Mon Sep 17 00:00:00 2001 From: Ancor Cruz Date: Tue, 17 Dec 2024 17:02:20 +0000 Subject: [PATCH 05/17] Align with dive-in, retry inbound webhooks when stuck for 12 hours skip failed and processed. --- .../clock/inbound_webhooks_cleanup_job.rb | 4 +- app/jobs/clock/inbound_webhooks_retry_job.rb | 7 +- clock.rb | 4 +- spec/clockwork_spec.rb | 6 +- .../clock/inbound_webhooks_retry_job_spec.rb | 82 +++++++++++++++---- 5 files changed, 80 insertions(+), 23 deletions(-) diff --git a/app/jobs/clock/inbound_webhooks_cleanup_job.rb b/app/jobs/clock/inbound_webhooks_cleanup_job.rb index 2b8e07ff205..1026d56899c 100644 --- a/app/jobs/clock/inbound_webhooks_cleanup_job.rb +++ b/app/jobs/clock/inbound_webhooks_cleanup_job.rb @@ -4,10 +4,10 @@ module Clock class InboundWebhooksCleanupJob < ApplicationJob include SentryCronConcern - queue_as 'clock' + queue_as "clock" def perform - InboundWebhook.where('updated_at < ?', 90.days.ago).destroy_all + InboundWebhook.where("updated_at < ?", 90.days.ago).destroy_all end end end diff --git a/app/jobs/clock/inbound_webhooks_retry_job.rb b/app/jobs/clock/inbound_webhooks_retry_job.rb index 4b35a747e43..0ae52f07c5f 100644 --- a/app/jobs/clock/inbound_webhooks_retry_job.rb +++ b/app/jobs/clock/inbound_webhooks_retry_job.rb @@ -4,10 +4,13 @@ module Clock class InboundWebhooksRetryJob < ApplicationJob include SentryCronConcern - queue_as 'clock' + queue_as "clock" def perform - InboundWebhook.failed.find_each do |inbound_webhook| + InboundWebhook + .where(status: ["pending", "processing"]) + .where("updated_at < ?", 12.hours.ago) + .find_each do |inbound_webhook| InboundWebhooks::ProcessJob.perform_later(inbound_webhook:) end end diff --git a/clock.rb b/clock.rb index 76fdaacfca7..074f888ae0f 100644 --- a/clock.rb +++ b/clock.rb @@ -153,9 +153,9 @@ module Clockwork .perform_later end - every(15.minutes, "schedule:retry_failed_inbound_webhooks") do + every(15.minutes, "schedule:retry_inbound_webhooks") do Clock::InboundWebhooksRetryJob - .set(sentry: {"slug" => "lago_retry_failed_inbound_webhooks", "cron" => '*/15 * * * *'}) + .set(sentry: {"slug" => "lago_retry_inbound_webhooks", "cron" => '*/15 * * * *'}) .perform_later end end diff --git a/spec/clockwork_spec.rb b/spec/clockwork_spec.rb index 8e50669d660..bd08f12de0d 100644 --- a/spec/clockwork_spec.rb +++ b/spec/clockwork_spec.rb @@ -199,12 +199,12 @@ end end - describe "schedule:retry_failed_inbound_webhooks" do - let(:job) { "schedule:retry_failed_inbound_webhooks" } + describe "schedule:retry_inbound_webhooks" do + let(:job) { "schedule:retry_inbound_webhooks" } let(:start_time) { Time.zone.parse("1 Apr 2022 00:05:00") } let(:end_time) { Time.zone.parse("1 Apr 2022 00:20:00") } - it "enqueue a retry failed inbound webhooks job" do + it "enqueue a retry inbound webhooks job" do Clockwork::Test.run( file: clock_file, start_time:, diff --git a/spec/jobs/clock/inbound_webhooks_retry_job_spec.rb b/spec/jobs/clock/inbound_webhooks_retry_job_spec.rb index e198f7e54f6..92954fbe5dc 100644 --- a/spec/jobs/clock/inbound_webhooks_retry_job_spec.rb +++ b/spec/jobs/clock/inbound_webhooks_retry_job_spec.rb @@ -1,30 +1,84 @@ # frozen_string_literal: true -require 'rails_helper' +require "rails_helper" describe Clock::InboundWebhooksRetryJob, job: true do subject(:inbound_webhooks_retry_job) { described_class } - describe '.perform' do - let(:pending_inbound_webhook) { create :inbound_webhook, status: "pending" } + describe ".perform" do + let(:inbound_webhook) { create :inbound_webhook, status:, updated_at: } let(:failed_inbound_webhook) { create :inbound_webhook, status: "failed" } let(:processing_inbound_webhoook) { create :inbound_webhook, status: "processing" } let(:processed_inbound_webhook) { create :inbound_webhook, status: "processed" } - before do - pending_inbound_webhook - failed_inbound_webhook - processed_inbound_webhook - processed_inbound_webhook + before { inbound_webhook } + + context "when inbound webhook is pending" do + let(:status) { "pending" } + let(:updated_at) { 11.hours.ago } + + it "does not queue a job" do + inbound_webhooks_retry_job.perform_now + + expect(InboundWebhooks::ProcessJob).not_to have_been_enqueued + end + + context "when inbound webhook has not being updated for more than 12 hours" do + let(:updated_at) { 12.hours.ago } + + it "queues a job to process the failed inbound webhook" do + inbound_webhooks_retry_job.perform_now + + expect(InboundWebhooks::ProcessJob) + .to have_been_enqueued + .with(inbound_webhook: inbound_webhook) + end + end + end + + context "when inbound webhook is processing" do + let(:status) { "processing" } + let(:updated_at) { 11.hours.ago } + + it "does not queue a job" do + inbound_webhooks_retry_job.perform_now + + expect(InboundWebhooks::ProcessJob).not_to have_been_enqueued + end + + context "when inbound webhook has not being updated for more than 12 hours" do + let(:updated_at) { 12.hours.ago } + + it "queues a job to process the failed inbound webhook" do + inbound_webhooks_retry_job.perform_now + + expect(InboundWebhooks::ProcessJob) + .to have_been_enqueued + .with(inbound_webhook: inbound_webhook) + end + end end - it "queues a job to process the failed inbound webhook" do - inbound_webhooks_retry_job.perform_now + context "when inbound webhook is failed" do + let(:status) { "failed" } + let(:updated_at) { 1.day.ago } + + it "does not queue a job" do + inbound_webhooks_retry_job.perform_now + + expect(InboundWebhooks::ProcessJob).not_to have_been_enqueued + end + end + + context "when inbound webhook is processed" do + let(:status) { "processed" } + let(:updated_at) { 1.day.ago } + + it "does not queue a job" do + inbound_webhooks_retry_job.perform_now - expect(InboundWebhooks::ProcessJob).to have_been_enqueued.once - expect(InboundWebhooks::ProcessJob) - .to have_been_enqueued - .with(inbound_webhook: failed_inbound_webhook) + expect(InboundWebhooks::ProcessJob).not_to have_been_enqueued + end end end end From 21425791403e97ea16576ad1b4f6647abf928626 Mon Sep 17 00:00:00 2001 From: Ancor Cruz Date: Tue, 17 Dec 2024 17:09:37 +0000 Subject: [PATCH 06/17] retry inbound webhooks after two hours being stuck in pending or processing status... --- app/jobs/clock/inbound_webhooks_retry_job.rb | 2 +- spec/jobs/clock/inbound_webhooks_retry_job_spec.rb | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/app/jobs/clock/inbound_webhooks_retry_job.rb b/app/jobs/clock/inbound_webhooks_retry_job.rb index 0ae52f07c5f..a555e486033 100644 --- a/app/jobs/clock/inbound_webhooks_retry_job.rb +++ b/app/jobs/clock/inbound_webhooks_retry_job.rb @@ -9,7 +9,7 @@ class InboundWebhooksRetryJob < ApplicationJob def perform InboundWebhook .where(status: ["pending", "processing"]) - .where("updated_at < ?", 12.hours.ago) + .where("updated_at < ?", 2.hours.ago) .find_each do |inbound_webhook| InboundWebhooks::ProcessJob.perform_later(inbound_webhook:) end diff --git a/spec/jobs/clock/inbound_webhooks_retry_job_spec.rb b/spec/jobs/clock/inbound_webhooks_retry_job_spec.rb index 92954fbe5dc..14d68eda683 100644 --- a/spec/jobs/clock/inbound_webhooks_retry_job_spec.rb +++ b/spec/jobs/clock/inbound_webhooks_retry_job_spec.rb @@ -15,7 +15,7 @@ context "when inbound webhook is pending" do let(:status) { "pending" } - let(:updated_at) { 11.hours.ago } + let(:updated_at) { 110.minutes.ago } it "does not queue a job" do inbound_webhooks_retry_job.perform_now @@ -23,8 +23,8 @@ expect(InboundWebhooks::ProcessJob).not_to have_been_enqueued end - context "when inbound webhook has not being updated for more than 12 hours" do - let(:updated_at) { 12.hours.ago } + context "when inbound webhook has not being updated for more than 2 hours" do + let(:updated_at) { 2.hours.ago } it "queues a job to process the failed inbound webhook" do inbound_webhooks_retry_job.perform_now @@ -38,7 +38,7 @@ context "when inbound webhook is processing" do let(:status) { "processing" } - let(:updated_at) { 11.hours.ago } + let(:updated_at) { 110.minutes.ago } it "does not queue a job" do inbound_webhooks_retry_job.perform_now @@ -46,8 +46,8 @@ expect(InboundWebhooks::ProcessJob).not_to have_been_enqueued end - context "when inbound webhook has not being updated for more than 12 hours" do - let(:updated_at) { 12.hours.ago } + context "when inbound webhook has not being updated for more than 2 hours" do + let(:updated_at) { 2.hours.ago } it "queues a job to process the failed inbound webhook" do inbound_webhooks_retry_job.perform_now From 70a516e699de7e76047cdc62c1e0b55bff676fec Mon Sep 17 00:00:00 2001 From: Ancor Cruz Date: Thu, 19 Dec 2024 12:35:41 +0000 Subject: [PATCH 07/17] Add proccessing_at to InboundWebhooks --- app/models/inbound_webhook.rb | 5 +++++ ...1219122151_add_processing_at_to_inbound_webhooks.rb | 7 +++++++ db/schema.rb | 3 ++- spec/models/inbound_webhook_spec.rb | 10 ++++++++++ 4 files changed, 24 insertions(+), 1 deletion(-) create mode 100644 db/migrate/20241219122151_add_processing_at_to_inbound_webhooks.rb diff --git a/app/models/inbound_webhook.rb b/app/models/inbound_webhook.rb index cf3d85a6508..9514e234f56 100644 --- a/app/models/inbound_webhook.rb +++ b/app/models/inbound_webhook.rb @@ -13,6 +13,10 @@ class InboundWebhook < ApplicationRecord } enum :status, STATUSES + + def processing! + update!(status: :processing, processing_at: Time.zone.now) + end end # == Schema Information @@ -23,6 +27,7 @@ class InboundWebhook < ApplicationRecord # code :string # event_type :string not null # payload :jsonb not null +# processing_at :datetime # signature :string # source :string not null # status :string default("pending"), not null diff --git a/db/migrate/20241219122151_add_processing_at_to_inbound_webhooks.rb b/db/migrate/20241219122151_add_processing_at_to_inbound_webhooks.rb new file mode 100644 index 00000000000..7325997062e --- /dev/null +++ b/db/migrate/20241219122151_add_processing_at_to_inbound_webhooks.rb @@ -0,0 +1,7 @@ +# frozen_string_literal: true + +class AddProcessingAtToInboundWebhooks < ActiveRecord::Migration[7.1] + def change + add_column :inbound_webhooks, :processing_at, :timestamp, precision: nil + end +end diff --git a/db/schema.rb b/db/schema.rb index 6cc5cbb33c2..b80cf3c0aff 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.1].define(version: 2024_12_17_120924) do +ActiveRecord::Schema[7.1].define(version: 2024_12_19_122151) do # These are extensions that must be enabled in order to support this database enable_extension "pgcrypto" enable_extension "plpgsql" @@ -731,6 +731,7 @@ t.string "signature" t.datetime "created_at", null: false t.datetime "updated_at", null: false + t.datetime "processing_at", precision: nil t.index ["organization_id"], name: "index_inbound_webhooks_on_organization_id" end diff --git a/spec/models/inbound_webhook_spec.rb b/spec/models/inbound_webhook_spec.rb index 68b69a14d59..a4a0df8dbda 100644 --- a/spec/models/inbound_webhook_spec.rb +++ b/spec/models/inbound_webhook_spec.rb @@ -13,4 +13,14 @@ it { is_expected.to validate_presence_of(:status) } it { is_expected.to be_pending } + + describe "#processing!" do + it "updates status and processing_at" do + freeze_time do + expect { inbound_webhook.processing! } + .to change(inbound_webhook, :status).to("processing") + .and change(inbound_webhook, :processing_at).to(Time.zone.now) + end + end + end end From d852e15452e67bb637963ab02125cb83eadda787 Mon Sep 17 00:00:00 2001 From: Ancor Cruz Date: Thu, 19 Dec 2024 13:33:17 +0000 Subject: [PATCH 08/17] Add guard clauses based on inbound webhook status for process service --- .../inbound_webhooks/process_service.rb | 10 ++++ .../inbound_webhooks/process_service_spec.rb | 54 +++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/app/services/inbound_webhooks/process_service.rb b/app/services/inbound_webhooks/process_service.rb index 2646defed74..2ac844b7ba8 100644 --- a/app/services/inbound_webhooks/process_service.rb +++ b/app/services/inbound_webhooks/process_service.rb @@ -2,6 +2,8 @@ module InboundWebhooks class ProcessService < BaseService + WEBHOOK_PROCESSING_WINDOW = 2.hours + WEBHOOK_HANDLER_SERVICES = { stripe: PaymentProviders::Stripe::HandleIncomingWebhookService } @@ -13,6 +15,10 @@ def initialize(inbound_webhook:) end def call + return result if within_processing_window? + return result if inbound_webhook.failed? + return result if inbound_webhook.processed? + inbound_webhook.processing! handler_result = handler_service_klass.call(inbound_webhook:) @@ -44,5 +50,9 @@ def handler_service_klass def webhook_source inbound_webhook.source.to_sym end + + def within_processing_window? + inbound_webhook.processing? && inbound_webhook.processing_at > WEBHOOK_PROCESSING_WINDOW.ago + end end end diff --git a/spec/services/inbound_webhooks/process_service_spec.rb b/spec/services/inbound_webhooks/process_service_spec.rb index ef5fb517dec..fe10061dea7 100644 --- a/spec/services/inbound_webhooks/process_service_spec.rb +++ b/spec/services/inbound_webhooks/process_service_spec.rb @@ -35,6 +35,60 @@ end end + context "when inbound webhook is within processing window" do + let(:inbound_webhook) do + create( + :inbound_webhook, + source: webhook_source, + status: "processing", + processing_at: 119.minutes.ago + ) + end + + it "does not process the webhook" do + expect(result).to be_success + expect(PaymentProviders::Stripe::HandleIncomingWebhookService) + .not_to have_received(:call) + end + end + + context "when inbound webhook is outside the processing window" do + let(:inbound_webhook) do + create( + :inbound_webhook, + source: webhook_source, + status: "processing", + processing_at: 121.minutes.ago + ) + end + + it "processes the webhook as normal" do + expect(result).to be_success + end + end + + context "when inbound webhook has failed" do + let(:inbound_webhook) { create :inbound_webhook, source: webhook_source, status: } + let(:status) { "failed" } + + it "does not process the webhook" do + expect(result).to be_success + expect(PaymentProviders::Stripe::HandleIncomingWebhookService) + .not_to have_received(:call) + end + end + + context "when inbound webhook has been processed" do + let(:inbound_webhook) { create :inbound_webhook, source: webhook_source, status: } + let(:status) { "processed" } + + it "does not process the webhook" do + expect(result).to be_success + expect(PaymentProviders::Stripe::HandleIncomingWebhookService) + .not_to have_received(:call) + end + end + context "when webhook source is Stripe" do let(:webhook_source) { "stripe" } From f7e5254dda50f9d8ada25dfc2ee91d06d57255d8 Mon Sep 17 00:00:00 2001 From: Ancor Cruz Date: Thu, 19 Dec 2024 14:38:37 +0000 Subject: [PATCH 09/17] Retry retriable inbound webhooks from clock job retriable is defined as inbound webhooks in: - processing status, and with processing_at earlier than 2 hours ago - pending status, and created_at earlier than 2 hours ago --- app/jobs/clock/inbound_webhooks_retry_job.rb | 5 +--- app/models/inbound_webhook.rb | 4 +++ .../clock/inbound_webhooks_retry_job_spec.rb | 27 +++++++++---------- 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/app/jobs/clock/inbound_webhooks_retry_job.rb b/app/jobs/clock/inbound_webhooks_retry_job.rb index a555e486033..13c46e9757d 100644 --- a/app/jobs/clock/inbound_webhooks_retry_job.rb +++ b/app/jobs/clock/inbound_webhooks_retry_job.rb @@ -7,10 +7,7 @@ class InboundWebhooksRetryJob < ApplicationJob queue_as "clock" def perform - InboundWebhook - .where(status: ["pending", "processing"]) - .where("updated_at < ?", 2.hours.ago) - .find_each do |inbound_webhook| + InboundWebhook.retriable.find_each do |inbound_webhook| InboundWebhooks::ProcessJob.perform_later(inbound_webhook:) end end diff --git a/app/models/inbound_webhook.rb b/app/models/inbound_webhook.rb index 9514e234f56..bc9788c6ebd 100644 --- a/app/models/inbound_webhook.rb +++ b/app/models/inbound_webhook.rb @@ -14,6 +14,10 @@ class InboundWebhook < ApplicationRecord enum :status, STATUSES + scope :retriable, -> { reprocessable.or(old_pending) } + scope :reprocessable, -> { processing.where("processing_at <= ?", 2.hours.ago) } + scope :old_pending, -> { pending.where("created_at <= ?", 2.hours.ago) } + def processing! update!(status: :processing, processing_at: Time.zone.now) end diff --git a/spec/jobs/clock/inbound_webhooks_retry_job_spec.rb b/spec/jobs/clock/inbound_webhooks_retry_job_spec.rb index 14d68eda683..89117c24a31 100644 --- a/spec/jobs/clock/inbound_webhooks_retry_job_spec.rb +++ b/spec/jobs/clock/inbound_webhooks_retry_job_spec.rb @@ -6,16 +6,12 @@ subject(:inbound_webhooks_retry_job) { described_class } describe ".perform" do - let(:inbound_webhook) { create :inbound_webhook, status:, updated_at: } - let(:failed_inbound_webhook) { create :inbound_webhook, status: "failed" } - let(:processing_inbound_webhoook) { create :inbound_webhook, status: "processing" } - let(:processed_inbound_webhook) { create :inbound_webhook, status: "processed" } - before { inbound_webhook } context "when inbound webhook is pending" do + let(:inbound_webhook) { create :inbound_webhook, status:, created_at: } let(:status) { "pending" } - let(:updated_at) { 110.minutes.ago } + let(:created_at) { 119.minutes.ago } it "does not queue a job" do inbound_webhooks_retry_job.perform_now @@ -23,10 +19,10 @@ expect(InboundWebhooks::ProcessJob).not_to have_been_enqueued end - context "when inbound webhook has not being updated for more than 2 hours" do - let(:updated_at) { 2.hours.ago } + context "when inbound webhook was created more than 2 hours ago" do + let(:created_at) { 121.hours.ago } - it "queues a job to process the failed inbound webhook" do + it "queues a job to process the inbound webhook" do inbound_webhooks_retry_job.perform_now expect(InboundWebhooks::ProcessJob) @@ -37,8 +33,9 @@ end context "when inbound webhook is processing" do + let(:inbound_webhook) { create :inbound_webhook, status:, processing_at: } let(:status) { "processing" } - let(:updated_at) { 110.minutes.ago } + let(:processing_at) { 119.minutes.ago } it "does not queue a job" do inbound_webhooks_retry_job.perform_now @@ -46,10 +43,10 @@ expect(InboundWebhooks::ProcessJob).not_to have_been_enqueued end - context "when inbound webhook has not being updated for more than 2 hours" do - let(:updated_at) { 2.hours.ago } + context "when inbound webhook started processing more than 2 hours ago" do + let(:processing_at) { 121.minutes.ago } - it "queues a job to process the failed inbound webhook" do + it "queues a job to process the inbound webhook" do inbound_webhooks_retry_job.perform_now expect(InboundWebhooks::ProcessJob) @@ -60,8 +57,8 @@ end context "when inbound webhook is failed" do + let(:inbound_webhook) { create :inbound_webhook, status: } let(:status) { "failed" } - let(:updated_at) { 1.day.ago } it "does not queue a job" do inbound_webhooks_retry_job.perform_now @@ -71,8 +68,8 @@ end context "when inbound webhook is processed" do + let(:inbound_webhook) { create :inbound_webhook, status: } let(:status) { "processed" } - let(:updated_at) { 1.day.ago } it "does not queue a job" do inbound_webhooks_retry_job.perform_now From eb955bc70c3c822fff9424c9af7397d64ce501f2 Mon Sep 17 00:00:00 2001 From: Ancor Cruz Date: Thu, 19 Dec 2024 14:41:32 +0000 Subject: [PATCH 10/17] refactor --- app/models/inbound_webhook.rb | 6 ++++-- app/services/inbound_webhooks/process_service.rb | 4 +--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/app/models/inbound_webhook.rb b/app/models/inbound_webhook.rb index bc9788c6ebd..8ca86032b92 100644 --- a/app/models/inbound_webhook.rb +++ b/app/models/inbound_webhook.rb @@ -1,6 +1,8 @@ # frozen_string_literal: true class InboundWebhook < ApplicationRecord + WEBHOOK_PROCESSING_WINDOW = 2.hours + belongs_to :organization validates :event_type, :payload, :source, :status, presence: true @@ -15,8 +17,8 @@ class InboundWebhook < ApplicationRecord enum :status, STATUSES scope :retriable, -> { reprocessable.or(old_pending) } - scope :reprocessable, -> { processing.where("processing_at <= ?", 2.hours.ago) } - scope :old_pending, -> { pending.where("created_at <= ?", 2.hours.ago) } + scope :reprocessable, -> { processing.where("processing_at <= ?", WEBHOOK_PROCESSING_WINDOW.ago) } + scope :old_pending, -> { pending.where("created_at <= ?", WEBHOOK_PROCESSING_WINDOW.ago) } def processing! update!(status: :processing, processing_at: Time.zone.now) diff --git a/app/services/inbound_webhooks/process_service.rb b/app/services/inbound_webhooks/process_service.rb index 2ac844b7ba8..9990396ecec 100644 --- a/app/services/inbound_webhooks/process_service.rb +++ b/app/services/inbound_webhooks/process_service.rb @@ -2,8 +2,6 @@ module InboundWebhooks class ProcessService < BaseService - WEBHOOK_PROCESSING_WINDOW = 2.hours - WEBHOOK_HANDLER_SERVICES = { stripe: PaymentProviders::Stripe::HandleIncomingWebhookService } @@ -52,7 +50,7 @@ def webhook_source end def within_processing_window? - inbound_webhook.processing? && inbound_webhook.processing_at > WEBHOOK_PROCESSING_WINDOW.ago + inbound_webhook.processing? && inbound_webhook.processing_at > InboundWebhook::WEBHOOK_PROCESSING_WINDOW.ago end end end From 665b64c6400b684a4d9896559cbd082925f48cd9 Mon Sep 17 00:00:00 2001 From: Ancor Cruz Date: Mon, 23 Dec 2024 13:59:39 +0000 Subject: [PATCH 11/17] Add Stripe Service to Validate incoming webhooks payload --- .../validate_incoming_webhook_service.rb | 36 +++++++++++++ .../validate_incoming_webhook_service_spec.rb | 53 +++++++++++++++++++ 2 files changed, 89 insertions(+) create mode 100644 app/services/payment_providers/stripe/validate_incoming_webhook_service.rb create mode 100644 spec/services/payment_providers/stripe/validate_incoming_webhook_service_spec.rb diff --git a/app/services/payment_providers/stripe/validate_incoming_webhook_service.rb b/app/services/payment_providers/stripe/validate_incoming_webhook_service.rb new file mode 100644 index 00000000000..c0a544c89a3 --- /dev/null +++ b/app/services/payment_providers/stripe/validate_incoming_webhook_service.rb @@ -0,0 +1,36 @@ +# frozen_string_literal: true + +module PaymentProviders + module Stripe + class ValidateIncomingWebhookService < BaseService + def initialize(payload:, signature:, payment_provider:) + @payload = payload + @signature = signature + @provider = payment_provider + + super + end + + def call + ::Stripe::Webhook::Signature.verify_header( + payload, + signature, + webhook_secret, + tolerance: ::Stripe::Webhook::DEFAULT_TOLERANCE + ) + + result + rescue ::Stripe::SignatureVerificationError + result.service_failure!(code: "webhook_error", message: "Invalid signature") + end + + private + + attr_reader :payload, :signature, :provider + + def webhook_secret + provider.webhook_secret + end + end + end +end diff --git a/spec/services/payment_providers/stripe/validate_incoming_webhook_service_spec.rb b/spec/services/payment_providers/stripe/validate_incoming_webhook_service_spec.rb new file mode 100644 index 00000000000..871b05bcf3c --- /dev/null +++ b/spec/services/payment_providers/stripe/validate_incoming_webhook_service_spec.rb @@ -0,0 +1,53 @@ +# frozen_string_literal: true + +require "rails_helper" + +RSpec.describe PaymentProviders::Stripe::ValidateIncomingWebhookService, type: :service do + subject(:result) do + described_class.call(payload:, signature:, payment_provider:) + end + + let(:payload) { "webhook_payload" } + let(:signature) { "signature" } + let(:payment_provider) { create(:stripe_provider, webhook_secret:) } + let(:webhook_secret) { "webhook_secret" } + let(:payload) { "{stripe incoming webhook JSON payload}" } + let(:stripe_default_tolerance) { 300 } + + before do + allow(::Stripe::Webhook::Signature).to receive(:verify_header).and_return(true) + end + + it "validates the payload" do + expect(result).to be_success + + expect(::Stripe::Webhook::Signature) + .to have_received(:verify_header) + .with( + payload, + signature, + webhook_secret, + tolerance: stripe_default_tolerance + ).once + end + + context "when signature is invalid" do + before do + allow(::Stripe::Webhook::Signature) + .to receive(:verify_header) + .and_raise( + ::Stripe::SignatureVerificationError.new( + "Unable to extract timestamp and signatures from header", + signature, + http_body: payload + ) + ) + end + + it "returns a service failure" do + expect(result).not_to be_success + expect(result.error).to be_a(BaseService::ServiceFailure) + expect(result.error.message).to eq("webhook_error: Invalid signature") + end + end +end From fbf273d44d3b3829e823220c80dde7b9820078b2 Mon Sep 17 00:00:00 2001 From: Ancor Cruz Date: Mon, 23 Dec 2024 14:13:21 +0000 Subject: [PATCH 12/17] Add service to validate payload of inbound webhooks --- .../validate_payload_service.rb | 54 ++++++++++++++ .../validate_payload_service_spec.rb | 73 +++++++++++++++++++ 2 files changed, 127 insertions(+) create mode 100644 app/services/inbound_webhooks/validate_payload_service.rb create mode 100644 spec/services/inbound_webhooks/validate_payload_service_spec.rb diff --git a/app/services/inbound_webhooks/validate_payload_service.rb b/app/services/inbound_webhooks/validate_payload_service.rb new file mode 100644 index 00000000000..99a6afbbd2c --- /dev/null +++ b/app/services/inbound_webhooks/validate_payload_service.rb @@ -0,0 +1,54 @@ +# frozen_string_literal: true + +module InboundWebhooks + class ValidatePayloadService < BaseService + WEBHOOK_SOURCES = { + stripe: PaymentProviders::Stripe::ValidateIncomingWebhookService + } + + def initialize(organization_id:, code:, payload:, webhook_source:, signature:) + @organization_id = organization_id + @code = code + @payload = payload + @signature = signature + @webhook_source = webhook_source&.to_sym + + super + end + + def call + return result.service_failure!(code: "webhook_error", message: "Invalid webhook source") unless webhook_source_valid? + return payment_provider_result unless payment_provider_result.success? + + validate_webhook_payload_result + end + + private + + attr_reader :organization_id, :code, :payload, :signature, :webhook_source + + def webhook_source_valid? + WEBHOOK_SOURCES.include?(webhook_source) + end + + def validate_webhook_payload_result + WEBHOOK_SOURCES[webhook_source].call( + payload:, + signature:, + payment_provider: + ) + end + + def payment_provider + payment_provider_result.payment_provider + end + + def payment_provider_result + @payment_provider_result ||= PaymentProviders::FindService.call( + organization_id:, + code:, + payment_provider_type: webhook_source.to_s + ) + end + end +end diff --git a/spec/services/inbound_webhooks/validate_payload_service_spec.rb b/spec/services/inbound_webhooks/validate_payload_service_spec.rb new file mode 100644 index 00000000000..3cfa48957c5 --- /dev/null +++ b/spec/services/inbound_webhooks/validate_payload_service_spec.rb @@ -0,0 +1,73 @@ +# frozen_string_literal: true + +require "rails_helper" + +RSpec.describe InboundWebhooks::ValidatePayloadService, type: :service do + subject(:result) do + described_class.call( + organization_id: organization.id, + code:, + payload:, + webhook_source:, + signature: + ) + end + + let(:organization) { create(:organization) } + let(:code) { "payment_provider_1" } + let(:payload) { "webhook_payload" } + let(:signature) { "signature" } + let(:webhook_source) { "stripe" } + + context "when webhook source is unknown" do + let(:webhook_source) { "unknown" } + + it "returns an error" do + expect(result).not_to be_success + expect(result.error).to be_a(BaseService::ServiceFailure) + expect(result.error.message).to eq("webhook_error: Invalid webhook source") + end + end + + context "when payment provider is not found" do + it "returns a service failure" do + expect(result).not_to be_success + expect(result.error).to be_a(BaseService::ServiceFailure) + expect(result.error.message).to eq("payment_provider_not_found: Payment provider not found") + end + end + + context "when webhook source is stripe" do + let(:webhook_source) { "stripe" } + let(:payload) { "webhook_payload" } + + before do + allow(::Stripe::Webhook::Signature).to receive(:verify_header).and_return(true) + create(:stripe_provider, organization:, code:) + end + + it "validates the payload" do + expect(result).to be_success + end + + context "when signature is invalid" do + before do + allow(::Stripe::Webhook::Signature) + .to receive(:verify_header) + .and_raise( + ::Stripe::SignatureVerificationError.new( + "Unable to extract timestamp and signatures from header", + signature, + http_body: payload + ) + ) + end + + it "returns a service failure" do + expect(result).not_to be_success + expect(result.error).to be_a(BaseService::ServiceFailure) + expect(result.error.message).to eq("webhook_error: Invalid signature") + end + end + end +end From 7ef935f92df993dbdae4a937f2fb6a56742f751f Mon Sep 17 00:00:00 2001 From: Ancor Cruz Date: Mon, 23 Dec 2024 14:19:06 +0000 Subject: [PATCH 13/17] Validate webhook payload before create inbound webhook and queue job --- .../inbound_webhooks/create_service.rb | 12 ++++++++++ .../inbound_webhooks/create_service_spec.rb | 23 ++++++++++++++++++- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/app/services/inbound_webhooks/create_service.rb b/app/services/inbound_webhooks/create_service.rb index 9dce52f42f3..b7544985847 100644 --- a/app/services/inbound_webhooks/create_service.rb +++ b/app/services/inbound_webhooks/create_service.rb @@ -14,6 +14,8 @@ def initialize(organization_id:, webhook_source:, payload:, event_type:, code: n end def call + return validate_payload_result unless validate_payload_result.success? + inbound_webhook = InboundWebhook.create!( organization_id:, source: webhook_source, @@ -36,5 +38,15 @@ def call private attr_reader :organization_id, :webhook_source, :code, :payload, :signature, :event_type + + def validate_payload_result + @validate_payload_result ||= InboundWebhooks::ValidatePayloadService.call( + organization_id:, + code:, + payload:, + signature:, + webhook_source: + ) + end end end diff --git a/spec/services/inbound_webhooks/create_service_spec.rb b/spec/services/inbound_webhooks/create_service_spec.rb index f14db412f2c..2f6e12e349e 100644 --- a/spec/services/inbound_webhooks/create_service_spec.rb +++ b/spec/services/inbound_webhooks/create_service_spec.rb @@ -20,12 +20,19 @@ let(:signature) { "signature" } let(:payload) { event.merge(code:).to_json } let(:event_type) { "payment_intent.successful" } + let(:validation_payload_result) { BaseService::Result.new } let(:event) do path = Rails.root.join("spec/fixtures/stripe/payment_intent_event.json") JSON.parse(File.read(path)) end + before do + allow(InboundWebhooks::ValidatePayloadService) + .to receive(:call) + .and_return(validation_payload_result) + end + it "creates an inbound webhook" do expect { result }.to change(InboundWebhook, :count).by(1) end @@ -43,7 +50,7 @@ .with(inbound_webhook: result.inbound_webhook) end - context "with validation error" do + context "with record validation error" do let(:webhook_source) { nil } it "returns an error" do @@ -58,4 +65,18 @@ expect(InboundWebhooks::ProcessJob).not_to have_been_enqueued end end + + context "when payload validation fails" do + let(:validation_payload_result) do + BaseService::Result.new.service_failure!( + code: "webhook_error", message: "Invalid signature" + ) + end + + it "returns an error" do + expect(result).not_to be_success + expect(result.error).to be_a(BaseService::ServiceFailure) + expect(result.error.message).to eq "webhook_error: Invalid signature" + end + end end From 31b59d6bef3a021a38788f8b24b8458629762a6c Mon Sep 17 00:00:00 2001 From: Ancor Cruz Date: Mon, 23 Dec 2024 14:32:21 +0000 Subject: [PATCH 14/17] Update stripe handle incoming webhook only contruct event as paylaoad is already validated. --- .../stripe/handle_incoming_webhook_service.rb | 32 +++++++------------ .../handle_incoming_webhook_service_spec.rb | 32 ++----------------- 2 files changed, 15 insertions(+), 49 deletions(-) diff --git a/app/services/payment_providers/stripe/handle_incoming_webhook_service.rb b/app/services/payment_providers/stripe/handle_incoming_webhook_service.rb index c254496aa0f..d24f4699952 100644 --- a/app/services/payment_providers/stripe/handle_incoming_webhook_service.rb +++ b/app/services/payment_providers/stripe/handle_incoming_webhook_service.rb @@ -12,36 +12,28 @@ def initialize(inbound_webhook:) end def call - payment_provider_result = PaymentProviders::FindService.call( - organization_id:, - code:, - payment_provider_type: "stripe" - ) - - return payment_provider_result unless payment_provider_result.success? - - event = ::Stripe::Webhook.construct_event( - payload, - signature, - payment_provider_result.payment_provider&.webhook_secret - ) - PaymentProviders::Stripe::HandleEventJob.perform_later( - organization: payment_provider_result.payment_provider.organization, - event: event.to_json + organization:, + event: stripe_event.to_json ) - result.event = event + result.event = stripe_event result rescue JSON::ParserError result.service_failure!(code: "webhook_error", message: "Invalid payload") - rescue ::Stripe::SignatureVerificationError - result.service_failure!(code: "webhook_error", message: "Invalid signature") end private - def_delegators :@inbound_webhook, :code, :organization_id, :payload, :signature + def_delegators :@inbound_webhook, :organization, :payload + + def stripe_event + @stripe_event ||= ::Stripe::Event.construct_from(json_payload) + end + + def json_payload + JSON.parse(payload, symbolize_names: true) + end end end end diff --git a/spec/services/payment_providers/stripe/handle_incoming_webhook_service_spec.rb b/spec/services/payment_providers/stripe/handle_incoming_webhook_service_spec.rb index 56b13b28737..29fd0d621e5 100644 --- a/spec/services/payment_providers/stripe/handle_incoming_webhook_service_spec.rb +++ b/spec/services/payment_providers/stripe/handle_incoming_webhook_service_spec.rb @@ -5,11 +5,7 @@ RSpec.describe PaymentProviders::Stripe::HandleIncomingWebhookService, type: :service do subject(:result) { described_class.call(inbound_webhook:) } - let(:inbound_webhook) { create :inbound_webhook, organization:, code: } - let(:code) { nil } - let(:membership) { create(:membership) } - let(:organization) { membership.organization } - let(:stripe_provider) { create(:stripe_provider, organization:) } + let(:inbound_webhook) { create :inbound_webhook } let(:event_result) { Stripe::Event.construct_from(event) } let(:event) do @@ -17,42 +13,20 @@ JSON.parse(File.read(path)) end - before { stripe_provider } - it "checks the webhook" do - allow(::Stripe::Webhook).to receive(:construct_event) - .and_return(event_result) - expect(result).to be_success expect(result.event).to eq(event_result) expect(PaymentProviders::Stripe::HandleEventJob).to have_been_enqueued end context "when failing to parse payload" do - it "returns an error" do - allow(::Stripe::Webhook).to receive(:construct_event) - .and_raise(JSON::ParserError) - - expect(result).not_to be_success - expect(result.error).to be_a(BaseService::ServiceFailure) - expect(result.error.code).to eq("webhook_error") - expect(result.error.error_message).to eq("Invalid payload") - end - end + let(:event) { "invalid" } - context "when failing to validate the signature" do it "returns an error" do - allow(::Stripe::Webhook).to receive(:construct_event) - .and_raise( - ::Stripe::SignatureVerificationError.new( - "error", "signature", http_body: event.to_json - ) - ) - expect(result).not_to be_success expect(result.error).to be_a(BaseService::ServiceFailure) expect(result.error.code).to eq("webhook_error") - expect(result.error.error_message).to eq("Invalid signature") + expect(result.error.error_message).to eq("Invalid payload") end end end From 1718a1f9af12d3629399eb28dbbb7d8551f4aef2 Mon Sep 17 00:00:00 2001 From: Ancor Cruz Date: Mon, 23 Dec 2024 15:22:11 +0000 Subject: [PATCH 15/17] Update app/jobs/inbound_webhooks/process_job.rb Co-authored-by: Vincent Pochet --- app/jobs/inbound_webhooks/process_job.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/jobs/inbound_webhooks/process_job.rb b/app/jobs/inbound_webhooks/process_job.rb index 7ef646e9373..c631c8d4930 100644 --- a/app/jobs/inbound_webhooks/process_job.rb +++ b/app/jobs/inbound_webhooks/process_job.rb @@ -5,7 +5,7 @@ class ProcessJob < ApplicationJob queue_as :default def perform(inbound_webhook:) - InboundWebhooks::ProcessService.call(inbound_webhook:).raise_if_error! + InboundWebhooks::ProcessService.call!(inbound_webhook:) end end end From 3b43f6f7aef10b70a5cd5476daa7d3b19dab1b68 Mon Sep 17 00:00:00 2001 From: Ancor Cruz Date: Mon, 23 Dec 2024 16:11:07 +0000 Subject: [PATCH 16/17] Use enum for inbound webhook status column and add indices --- app/models/inbound_webhook.rb | 8 +++++--- .../inbound_webhooks/process_service.rb | 4 ++-- .../20241213182343_create_inbound_webhooks.rb | 4 +++- ...223154437_add_indices_to_inbound_webhooks.rb | 17 +++++++++++++++++ db/schema.rb | 7 +++++-- .../inbound_webhooks/process_service_spec.rb | 8 ++++---- 6 files changed, 36 insertions(+), 12 deletions(-) create mode 100644 db/migrate/20241223154437_add_indices_to_inbound_webhooks.rb diff --git a/app/models/inbound_webhook.rb b/app/models/inbound_webhook.rb index 8ca86032b92..cfc45cd54f0 100644 --- a/app/models/inbound_webhook.rb +++ b/app/models/inbound_webhook.rb @@ -10,7 +10,7 @@ class InboundWebhook < ApplicationRecord STATUSES = { pending: "pending", processing: "processing", - processed: "processed", + succeeded: "succeeded", failed: "failed" } @@ -36,14 +36,16 @@ def processing! # processing_at :datetime # signature :string # source :string not null -# status :string default("pending"), not null +# status :enum default("pending"), not null # created_at :datetime not null # updated_at :datetime not null # organization_id :uuid not null # # Indexes # -# index_inbound_webhooks_on_organization_id (organization_id) +# index_inbound_webhooks_on_organization_id (organization_id) +# index_inbound_webhooks_on_status_and_created_at (status,created_at) WHERE (status = 'pending'::inbound_webhook_status) +# index_inbound_webhooks_on_status_and_processing_at (status,processing_at) WHERE (status = 'processing'::inbound_webhook_status) # # Foreign Keys # diff --git a/app/services/inbound_webhooks/process_service.rb b/app/services/inbound_webhooks/process_service.rb index 9990396ecec..c56a3f31d1a 100644 --- a/app/services/inbound_webhooks/process_service.rb +++ b/app/services/inbound_webhooks/process_service.rb @@ -15,7 +15,7 @@ def initialize(inbound_webhook:) def call return result if within_processing_window? return result if inbound_webhook.failed? - return result if inbound_webhook.processed? + return result if inbound_webhook.succeeded? inbound_webhook.processing! @@ -26,7 +26,7 @@ def call return handler_result end - inbound_webhook.processed! + inbound_webhook.succeeded! result.inbound_webhook = inbound_webhook result diff --git a/db/migrate/20241213182343_create_inbound_webhooks.rb b/db/migrate/20241213182343_create_inbound_webhooks.rb index 8c49d305595..5c4e619ddcc 100644 --- a/db/migrate/20241213182343_create_inbound_webhooks.rb +++ b/db/migrate/20241213182343_create_inbound_webhooks.rb @@ -2,11 +2,13 @@ class CreateInboundWebhooks < ActiveRecord::Migration[7.1] def change + create_enum :inbound_webhook_status, %w[pending processing succeeded failed] + create_table :inbound_webhooks, id: :uuid do |t| t.string :source, null: false t.string :event_type, null: false t.jsonb :payload, null: false - t.string :status, null: false, default: 'pending' + t.enum :status, enum_type: "inbound_webhook_status", null: false, default: 'pending' t.belongs_to :organization, null: false, foreign_key: true, type: :uuid, index: true t.string :code t.string :signature diff --git a/db/migrate/20241223154437_add_indices_to_inbound_webhooks.rb b/db/migrate/20241223154437_add_indices_to_inbound_webhooks.rb new file mode 100644 index 00000000000..c949d8a3155 --- /dev/null +++ b/db/migrate/20241223154437_add_indices_to_inbound_webhooks.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +class AddIndicesToInboundWebhooks < ActiveRecord::Migration[7.1] + disable_ddl_transaction! + + def change + add_index :inbound_webhooks, + %i[status processing_at], + where: "status = 'processing'", + algorithm: :concurrently + + add_index :inbound_webhooks, + %i[status created_at], + where: "status = 'pending'", + algorithm: :concurrently + end +end diff --git a/db/schema.rb b/db/schema.rb index b80cf3c0aff..9c8ca1af163 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.1].define(version: 2024_12_19_122151) do +ActiveRecord::Schema[7.1].define(version: 2024_12_23_154437) do # These are extensions that must be enabled in order to support this database enable_extension "pgcrypto" enable_extension "plpgsql" @@ -21,6 +21,7 @@ create_enum "billable_metric_rounding_function", ["round", "floor", "ceil"] create_enum "billable_metric_weighted_interval", ["seconds"] create_enum "customer_type", ["company", "individual"] + create_enum "inbound_webhook_status", ["pending", "processing", "succeeded", "failed"] create_enum "payment_payable_payment_status", ["pending", "processing", "succeeded", "failed"] create_enum "subscription_invoicing_reason", ["subscription_starting", "subscription_periodic", "subscription_terminating", "in_advance_charge", "in_advance_charge_periodic", "progressive_billing"] create_enum "tax_status", ["pending", "succeeded", "failed"] @@ -725,7 +726,7 @@ t.string "source", null: false t.string "event_type", null: false t.jsonb "payload", null: false - t.string "status", default: "pending", null: false + t.enum "status", default: "pending", null: false, enum_type: "inbound_webhook_status" t.uuid "organization_id", null: false t.string "code" t.string "signature" @@ -733,6 +734,8 @@ t.datetime "updated_at", null: false t.datetime "processing_at", precision: nil t.index ["organization_id"], name: "index_inbound_webhooks_on_organization_id" + t.index ["status", "created_at"], name: "index_inbound_webhooks_on_status_and_created_at", where: "(status = 'pending'::inbound_webhook_status)" + t.index ["status", "processing_at"], name: "index_inbound_webhooks_on_status_and_processing_at", where: "(status = 'processing'::inbound_webhook_status)" end create_table "integration_collection_mappings", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| diff --git a/spec/services/inbound_webhooks/process_service_spec.rb b/spec/services/inbound_webhooks/process_service_spec.rb index fe10061dea7..cac3a5c7614 100644 --- a/spec/services/inbound_webhooks/process_service_spec.rb +++ b/spec/services/inbound_webhooks/process_service_spec.rb @@ -78,9 +78,9 @@ end end - context "when inbound webhook has been processed" do + context "when inbound webhook has been succeeded" do let(:inbound_webhook) { create :inbound_webhook, source: webhook_source, status: } - let(:status) { "processed" } + let(:status) { "succeeded" } it "does not process the webhook" do expect(result).to be_success @@ -105,8 +105,8 @@ .with(inbound_webhook:) end - it "updated inbound webhook status to processed" do - expect { result }.to change(inbound_webhook, :status).to("processed") + it "updated inbound webhook status to succeeded" do + expect { result }.to change(inbound_webhook, :status).to("succeeded") end context "when the stripe webhook handling fails" do From e2becd3da163fc800c51e57a62593e3ebeaf7ff3 Mon Sep 17 00:00:00 2001 From: Ancor Cruz Date: Tue, 24 Dec 2024 07:33:54 +0000 Subject: [PATCH 17/17] remove after_commit --- app/services/inbound_webhooks/create_service.rb | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/app/services/inbound_webhooks/create_service.rb b/app/services/inbound_webhooks/create_service.rb index b7544985847..450baba9796 100644 --- a/app/services/inbound_webhooks/create_service.rb +++ b/app/services/inbound_webhooks/create_service.rb @@ -25,9 +25,7 @@ def call event_type: ) - after_commit do - InboundWebhooks::ProcessJob.perform_later(inbound_webhook:) - end + InboundWebhooks::ProcessJob.perform_later(inbound_webhook:) result.inbound_webhook = inbound_webhook result