Skip to content

Commit

Permalink
Batch aggregation (#4303)
Browse files Browse the repository at this point in the history
* Aggregation migration, model, spec, factory

* Client & spec

* Policy and spec

* Schemas

* Update policy & schemas

* Update status enum

* Add default to status migration

* Update serializer

* Update factory

* Controller and spec

* Fix migration

* Hound

* Hound again

* Third round hound

* Feed hound

* Consistency

* Add project_id to Aggregations to allow Doorkeeper to scope by project

* Clarify admin specs, add collab spec

* Remove ignored_columns

* Add spec for failed service connections

* Add serializer spec

* Implement #destroy, add error specs for collisions

* Aggregation documentation

* reorder spec

* Fix hash alignment

* Remove aggregation association from subject

* Remove user uniqueness constraint

* Migrate database, update structure.sql

* Resolve migrations
  • Loading branch information
zwolf authored Jun 13, 2024
1 parent 6b7c85e commit 509c773
Show file tree
Hide file tree
Showing 18 changed files with 604 additions and 153 deletions.
29 changes: 21 additions & 8 deletions app/controllers/api/v1/aggregations_controller.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
class Api::V1::AggregationsController < Api::ApiController
# include JsonApiController::PunditPolicy
# frozen_string_literal: true

# THIS FUNCTIONALITY IS BEING DEPRECATED EFFECTIVE IMMEDIATELY
# A REPLACEMENT IS FORTHCOMING.
class Api::V1::AggregationsController < Api::ApiController
include JsonApiController::PunditPolicy

# require_authentication :create, :update, scopes: [:project]
# resource_actions :create, :update, :show, :index
# schema_type :json_schema
# before_action :filter_by_subject_set, only: :index
require_authentication :index, :show, :update, :destroy, :create, scopes: [:project]
resource_actions :index, :show, :create, :update, :destroy
schema_type :json_schema

# Creates an Aggregation for a workflow and kicks off a batch aggregation run.
#
# The workflow is looked up from the request's `links`, its project id is
# written back into the links, and the aggregation service is asked to start a
# run. The created record is then updated with the service's task id and
# marked as pending.
#
# Renders a 503 (service_unavailable) when the aggregation service cannot be
# reached or responds with an error (any AggregationClient::ConnectionError).
def create
  workflow = Workflow.find(create_params['links']['workflow'])
  project_id = workflow.project.id
  create_params['links']['project'] = project_id

  # send_aggregation_request returns the response *body* (the client's
  # handle_response unwraps it), so the task id is read from it directly
  # instead of calling `.body` a second time.
  # NOTE(review): assumes the JSON body is parsed with string keys - confirm
  # against the Faraday response middleware configuration.
  response_body = AggregationClient.new.send_aggregation_request(
    project_id,
    workflow.id,
    create_params['links']['user']
  )

  super do |agg|
    agg.update({ task_id: response_body['task_id'], status: 'pending' })
  end
rescue AggregationClient::ConnectionError
  json_api_render(:service_unavailable, 'The aggregation service is unavailable or not responding')
end
end
24 changes: 10 additions & 14 deletions app/models/aggregation.rb
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
# frozen_string_literal: true

class Aggregation < ApplicationRecord

belongs_to :workflow
belongs_to :subject

validates_presence_of :workflow, :subject, :aggregation
validates_uniqueness_of :subject_id, scope: :workflow_id
validate :aggregation, :workflow_version_present

private
belongs_to :project
belongs_to :user
validates :project, :workflow, :user, presence: true
validates :workflow, uniqueness: true

def workflow_version_present
wv_key = :workflow_version
if aggregation && !aggregation.symbolize_keys.has_key?(wv_key)
errors.add(:aggregation, "must have #{wv_key} metadata")
end
end
enum status: {
created: 0,
pending: 1,
completed: 2,
failed: 3
}
end
1 change: 0 additions & 1 deletion app/models/subject.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ class Subject < ApplicationRecord
class_name: "Medium",
as: :linked
has_many :recents, dependent: :destroy
has_many :aggregations, dependent: :destroy
has_many :tutorial_workflows,
class_name: 'Workflow',
foreign_key: 'tutorial_subject_id',
Expand Down
2 changes: 1 addition & 1 deletion app/models/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class Workflow < ApplicationRecord
has_many :user_seen_subjects, dependent: :destroy
has_many :workflow_tutorials, dependent: :destroy
has_many :tutorials, through: :workflow_tutorials
has_many :aggregations, dependent: :destroy
has_one :aggregation, dependent: :destroy
has_many :attached_images, -> { where(type: "workflow_attached_image") }, class_name: "Medium",
as: :linked
has_one :classifications_export, -> { where(type: "workflow_classifications_export").order(created_at: :desc) },
Expand Down
27 changes: 12 additions & 15 deletions app/policies/aggregation_policy.rb
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
# frozen_string_literal: true

class AggregationPolicy < ApplicationPolicy
class ReadScope < Scope
# Short circuiting scopes for private aggregations before they get removed next PR
class Scope < Scope
def resolve(action)
parent_scope = policy_for(Workflow).scope_for(action)
parent_scope = policy_for(Workflow).scope_for(:update)
scope.where(workflow_id: parent_scope.select(:id))
end
end

class WriteScope < Scope
def resolve(action)
parent_scope = policy_for(Workflow).scope_for(action)
scope.where(workflow_id: parent_scope.select(:id))
end
end
scope :index, :show, :create, :update, :destroy, with: Scope

scope :index, :show, with: ReadScope
scope :update, :destroy, :versions, :version, with: WriteScope
def linkable_workflows
policy_for(Workflow).scope_for(:update)
end

def linkable_subjects
policy_for(Subject).scope_for(:show)
def linkable_projects
policy_for(Project).scope_for(:update)
end

def linkable_workflows
policy_for(Workflow).scope_for(:update)
def linkable_users
policy_for(User).scope_for(:update)
end
end
35 changes: 22 additions & 13 deletions app/schemas/aggregation_create_schema.rb
Original file line number Diff line number Diff line change
@@ -1,26 +1,35 @@
class AggregationCreateSchema < JsonSchema
schema do
type "object"
description "An Aggregation for a subject"
required "links"
type 'object'
description 'An Aggregation for a workflow'
required 'links'
additional_properties false

property "aggregation" do
type "object"
property 'uuid' do
type 'string'
end

property "links" do
type "object"
additional_properties false
property 'task_id' do
type 'string'
end

required "subject", "workflow"
property 'status' do
type 'string'
end

property 'links' do
type 'object'
required 'workflow'
additional_properties false

property "subject" do
type "integer", "string"
property 'workflow' do
type 'integer', 'string'
pattern '^[0-9]*$'
end

property "workflow" do
type "integer", "string"
property 'user' do
type 'integer', 'string'
pattern '^[0-9]*$'
end
end
end
Expand Down
26 changes: 9 additions & 17 deletions app/schemas/aggregation_update_schema.rb
Original file line number Diff line number Diff line change
@@ -1,27 +1,19 @@
class AggregationUpdateSchema < JsonSchema
schema do
type "object"
description "An Aggregation for a subject"
required "aggregation", "links"
type 'object'
description 'An Aggregation for a workflow'
additional_properties false

property "aggregation" do
type "object"
property 'uuid' do
type 'string'
end

property "links" do
type "object"
additional_properties false

required "subject", "workflow"

property "subject" do
type "integer", "string"
end
property 'task_id' do
type 'string'
end

property "workflow" do
type "integer", "string"
end
property 'status' do
type 'string'
end
end
end
6 changes: 3 additions & 3 deletions app/serializers/aggregation_serializer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ class AggregationSerializer
include Serialization::PanoptesRestpack
include CachedSerializer

attributes :id, :created_at, :updated_at, :aggregation, :href
can_include :workflow, :subject
attributes :id, :href, :created_at, :updated_at, :uuid, :task_id, :status
can_include :project, :workflow, :user

can_filter_by :workflow, :subject
can_filter_by :project, :workflow
end
58 changes: 58 additions & 0 deletions app/services/aggregation_client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# frozen_string_literal: true

# HTTP client for the external batch aggregation service.
#
# Wraps a Faraday connection pointed at the host named by the
# AGGREGATION_HOST environment variable and translates transport failures and
# error statuses into the error classes defined below.
class AggregationClient
  # Base error; also raised for timeouts and failed connections.
  class ConnectionError < StandardError; end
  # Raised when the service answers with a 404.
  class ResourceNotFound < ConnectionError; end
  # Raised for any other status in the 400..600 range.
  class ServerError < ConnectionError; end

  attr_reader :connection

  # @param adapter the Faraday adapter (or adapter + arguments) to use;
  #   defaults to Faraday's default adapter.
  def initialize(adapter = Faraday.default_adapter)
    # The host must be resolved BEFORE the connection is built, otherwise
    # connect! hands Faraday a nil base URL and the configured host is ignored.
    @host = ENV.fetch('AGGREGATION_HOST', 'http://test.example.com')
    @connection = connect!(adapter)
  end

  # Builds the Faraday connection used for every request.
  #
  # @param adapter the Faraday adapter (or adapter + arguments), splatted into
  #   `faraday.adapter`.
  # @return the configured Faraday connection
  def connect!(adapter)
    # NOTE(review): TLS certificate verification is disabled here. Confirm this
    # is intentional for the aggregation host before relying on it.
    Faraday.new(@host, ssl: { verify: false }) do |faraday|
      faraday.request :json
      faraday.response :json, content_type: /\bjson$/
      faraday.adapter(*adapter)
    end
  end

  # Asks the aggregation service to start a run.
  #
  # @param project_id id of the project being aggregated
  # @param workflow_id id of the workflow being aggregated
  # @param user_id id of the requesting user
  # @return the response body as returned by the connection
  # @raise [ConnectionError] on timeout, connection failure or error status
  def send_aggregation_request(project_id, workflow_id, user_id)
    params = { project_id: project_id, workflow_id: workflow_id, user_id: user_id }

    request(:post, '/run_aggregation') do |req|
      req.body = params.to_json
    end
  end

  private

  # Performs a request with JSON headers and fixed timeouts, yielding the
  # request object so the caller can set a body.
  #
  # @param http_method [Symbol] connection method to invoke (e.g. :post)
  # @param params request arguments, splatted into the connection call
  #   (in practice the request path)
  # @return the response body (see #handle_response)
  # @raise [ConnectionError] when Faraday times out or cannot connect
  def request(http_method, params)
    response = connection.public_send(http_method, *params) do |req|
      req.headers['Accept'] = 'application/json'
      req.headers['Content-Type'] = 'application/json'
      req.options.timeout = 5 # open/read timeout in seconds
      req.options.open_timeout = 2 # connection open timeout in seconds
      yield req if block_given?
    end

    handle_response(response)
  rescue Faraday::TimeoutError,
         Faraday::ConnectionFailed => e
    raise ConnectionError, e.message
  end

  # Maps the response status to either an error or the response body.
  #
  # @raise [ResourceNotFound] for a 404
  # @raise [ServerError] for any other status within 400..600
  # @return the response body for every other status
  def handle_response(response)
    case response.status
    when 404
      raise ResourceNotFound, status: response.status, body: response.body
    when 400..600
      raise ServerError, response.body
    else
      response.body
    end
  end
end
31 changes: 31 additions & 0 deletions db/migrate/20240304201959_refactor_aggregation_model.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# frozen_string_literal: true

# Reshapes the aggregations table from per-subject aggregation results to
# per-workflow batch aggregation runs: drops subject_id/aggregation and adds
# project_id, user_id, uuid, task_id and status.
class RefactorAggregationModel < ActiveRecord::Migration[6.1]
  def up
    # delete existing unused columns
    safety_assured { remove_column :aggregations, :subject_id }
    safety_assured { remove_column :aggregations, :aggregation }

    # add the new aggregations columns
    add_column :aggregations, :project_id, :integer
    add_foreign_key :aggregations, :projects, column: :project_id, validate: false

    add_column :aggregations, :user_id, :integer
    add_foreign_key :aggregations, :users, column: :user_id, validate: false

    add_column :aggregations, :uuid, :string
    add_column :aggregations, :task_id, :string
    add_column :aggregations, :status, :integer, default: 0
  end

  def down
    # Restore the dropped columns with their original definitions:
    # aggregation was `jsonb DEFAULT '{}' NOT NULL`.
    add_column :aggregations, :subject_id, :integer
    add_column :aggregations, :aggregation, :jsonb, default: {}, null: false

    # Dropping subject_id also dropped the unique index and the foreign key
    # that referenced it, so recreate both to get back to the prior schema.
    add_index :aggregations, %i[subject_id workflow_id], unique: true
    add_foreign_key :aggregations, :subjects, on_update: :cascade, on_delete: :cascade

    remove_column :aggregations, :user_id
    remove_column :aggregations, :project_id
    remove_column :aggregations, :uuid
    remove_column :aggregations, :task_id
    remove_column :aggregations, :status
  end
end
41 changes: 23 additions & 18 deletions db/structure.sql
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,13 @@ ALTER SEQUENCE public.access_control_lists_id_seq OWNED BY public.access_control
CREATE TABLE public.aggregations (
id integer NOT NULL,
workflow_id integer,
subject_id integer,
aggregation jsonb DEFAULT '{}'::jsonb NOT NULL,
created_at timestamp without time zone,
updated_at timestamp without time zone
updated_at timestamp without time zone,
project_id integer,
user_id integer,
uuid character varying,
task_id character varying,
status integer DEFAULT 0
);


Expand Down Expand Up @@ -2801,13 +2804,6 @@ CREATE INDEX index_access_control_lists_on_resource_id_and_resource_type ON publ
CREATE INDEX index_access_control_lists_on_user_group_id ON public.access_control_lists USING btree (user_group_id);


--
-- Name: index_aggregations_on_subject_id_and_workflow_id; Type: INDEX; Schema: public; Owner: -
--

CREATE UNIQUE INDEX index_aggregations_on_subject_id_and_workflow_id ON public.aggregations USING btree (subject_id, workflow_id);


--
-- Name: index_aggregations_on_workflow_id; Type: INDEX; Schema: public; Owner: -
--
Expand Down Expand Up @@ -3909,14 +3905,6 @@ ALTER TABLE ONLY public.subject_groups
ADD CONSTRAINT fk_rails_283ede5252 FOREIGN KEY (project_id) REFERENCES public.projects(id);


--
-- Name: aggregations fk_rails_28a7ada458; Type: FK CONSTRAINT; Schema: public; Owner: -
--

ALTER TABLE ONLY public.aggregations
ADD CONSTRAINT fk_rails_28a7ada458 FOREIGN KEY (subject_id) REFERENCES public.subjects(id) ON UPDATE CASCADE ON DELETE CASCADE;


--
-- Name: project_contents fk_rails_305e6d8bf1; Type: FK CONSTRAINT; Schema: public; Owner: -
--
Expand Down Expand Up @@ -4069,6 +4057,14 @@ ALTER TABLE ONLY public.collections_projects
ADD CONSTRAINT fk_rails_895b025564 FOREIGN KEY (project_id) REFERENCES public.projects(id);


--
-- Name: aggregations fk_rails_8eb620b6f6; Type: FK CONSTRAINT; Schema: public; Owner: -
--

ALTER TABLE ONLY public.aggregations
ADD CONSTRAINT fk_rails_8eb620b6f6 FOREIGN KEY (user_id) REFERENCES public.users(id) NOT VALID;


--
-- Name: set_member_subjects fk_rails_93073bf3b1; Type: FK CONSTRAINT; Schema: public; Owner: -
--
Expand Down Expand Up @@ -4221,6 +4217,14 @@ ALTER TABLE ONLY public.organization_versions
ADD CONSTRAINT fk_rails_be858ed31d FOREIGN KEY (organization_id) REFERENCES public.organizations(id);


--
-- Name: aggregations fk_rails_c7d229ada4; Type: FK CONSTRAINT; Schema: public; Owner: -
--

ALTER TABLE ONLY public.aggregations
ADD CONSTRAINT fk_rails_c7d229ada4 FOREIGN KEY (project_id) REFERENCES public.projects(id) NOT VALID;


--
-- Name: subject_set_imports fk_rails_d596712569; Type: FK CONSTRAINT; Schema: public; Owner: -
--
Expand Down Expand Up @@ -4598,6 +4602,7 @@ INSERT INTO "schema_migrations" (version) VALUES
('20240216142515'),
('20240216171653'),
('20240216171937'),
('20240304201959'),
('20240531184258');


Loading

0 comments on commit 509c773

Please sign in to comment.