Skip to content

Commit

Permalink
Move to Spyke
Browse files Browse the repository at this point in the history
  • Loading branch information
Sethi, Ankur committed Aug 24, 2023
1 parent b9eee2f commit 6d9d57e
Show file tree
Hide file tree
Showing 18 changed files with 183 additions and 134 deletions.
1 change: 1 addition & 0 deletions .ruby-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.2.2
13 changes: 6 additions & 7 deletions camunda-workflow.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@ Gem::Specification.new do |spec|
spec.license = 'MIT'
spec.required_ruby_version = ">= 2.5"

spec.add_dependency 'faraday', '~> 1.10.1'
spec.add_dependency 'faraday_middleware', '~> 1.2.0'
spec.add_dependency 'her', '~> 1.1.1'
spec.add_dependency 'faraday', '~> 2.7'
spec.add_dependency 'spyke', '~> 7.1.1'
# We use .camelcase, .underscore and ActiveSupport::Concern
spec.add_dependency 'activesupport', '>= 3.2.0'
spec.add_development_dependency 'pry-rescue', '~> 1.5.0'
spec.add_development_dependency 'pry-stack_explorer', '~> 0.4.9.3'
spec.add_development_dependency 'rails', '>= 7.0.2'
spec.add_development_dependency 'rspec', '~> 3.9.0'
spec.add_development_dependency 'debug', ">= 1.0.0"
spec.add_development_dependency 'rails', '~> 7.0'
spec.add_development_dependency 'rspec', '~> 3.12.0'
spec.add_development_dependency 'rspec-debug', ">= 0.2.0"
spec.add_development_dependency 'rubocop', '~> 0.77'
spec.add_development_dependency 'rubocop-rspec', '~> 1.37.0'
spec.add_development_dependency 'simplecov', '~> 0.17.1'
Expand Down
90 changes: 71 additions & 19 deletions lib/camunda.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
require 'active_support/core_ext/string/inflections'
require 'active_support/core_ext/object/blank'
require 'her'
require 'spyke'
require 'faraday'
require 'faraday_middleware'
require 'camunda/railtie' if defined?(Rails)
# Top level module for camunda-workflow.
module Camunda
Expand All @@ -19,31 +18,84 @@ def logger
end
end
end
##
# Responsible for handling deserialization of variables.
class Her::Middleware::SnakeCase < Faraday::Response::Middleware
# Check if variables are an Array or JSON and ensure variable names are transformed back from camelCase to snake_case.
# @param env [Array,Hash]

class JSONParser < Faraday::Middleware
def on_complete(env)
return if env[:body].blank?
json = MultiJson.load(env.body, symbolize_keys: true)
env.body = {
data: json[:result],
metadata: json[:extra],
errors: json[:errors]
}
end
end
module Middleware
class Camunda::FirstLevelParseJSON < JSONParser
# Taken from Her::Middleware::FirstLevelParseJSON
# Parse the response body
#
# @param [String] body The response body
# @return [Mixed] the parsed response
# @private
def parse(body)
json = JSON.parse body
if json.respond_to?(:keys)
errors = json.delete(:errors)
metadata = json.delete(:metadata)
end
error ||= {}
metadata ||= {}
{
:data => json,
:errors => errors,
:metadata => metadata
}
end

json = JSON.parse(env[:body])
case json
when Array
json.map { |hash| transform_hash!(hash) }
when Hash
transform_hash!(json)
# This method is triggered when the response has been received. It modifies
# the value of `env[:body]`.
#
# @param [Hash] env The response environment
# @private
def on_complete(env)
env[:body] = case env[:status]
when 204
parse('{}')
when 200
parse(env[:body])
when 400..599
body = env[:body] == '' ? '{}' : env[:body]
{ errors: parse(body) }
end
end
env[:body] = JSON.generate(json)
end

private
# Responsible for handling deserialization of variables.
class Camunda::Middleware::SnakeCase < Faraday::Middleware
# Check if variables are an Array or JSON and ensure variable names are transformed back from camelCase to snake_case.
# @param env [Array,Hash]
def on_complete(env)
return if env[:body].blank?

# Return a new hash with all keys converted by the block operation.
def transform_hash!(hash)
hash.deep_transform_keys!(&:underscore)
json = JSON.parse(env[:body])
case json
when Array
json.map { |hash| transform_hash!(hash) }
when Hash
transform_hash!(json)
end
env[:body] = JSON.generate(json)
end

private

# Return a new hash with all keys converted by the block operation.
def transform_hash!(hash)
hash.deep_transform_keys!(&:underscore)
end
end
end

# Error when class corresponding to Camunda bpmn task does not exist.
class MissingImplementationClass < StandardError
# Initializes message for MissingImplementationClass
Expand Down
12 changes: 8 additions & 4 deletions lib/camunda/deployment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
# @note You must supply the paths of the BPMN files as a param titled file_names to deploy the BPMN file
# and deploy BPMN, DMN, CMMN definitions in the Camunda engine.
class Camunda::Deployment < Camunda::Model
collection_path 'deployment'
uri 'deployment/(:id)'

# Deploys a new process definition to Camunda and returns an instance of Camunda::ProcessDefinition.
# @note Only supporting .create which uses a POST on deployment/create.
# @example
Expand All @@ -21,9 +22,8 @@ def self.create(file_names:, tenant_id: nil, deployment_source: 'Camunda Workflo
tenant_id ||= Camunda::Workflow.configuration.tenant_id
args = file_data(file_names).merge('deployment-name' => deployment_name, 'deployment-source' => deployment_source)
args.merge!("tenant-id": tenant_id) if tenant_id
response = post_raw('deployment/create', args)

deployed_process_definitions(response[:parsed_data][:data][:deployed_process_definitions])
response = request(:post, "deployment/create", args)
deployed_process_definitions(response.body[:data][:deployed_process_definitions])
end

# Convenience method for dealing with files and IO that are to be uploaded
Expand All @@ -47,4 +47,8 @@ def self.deployed_process_definitions(definitions_hash)

definitions_hash.values.map { |process_definition| Camunda::ProcessDefinition.new process_definition }
end

def destroy(params = {})
self.attributes = delete(params)
end
end
39 changes: 17 additions & 22 deletions lib/camunda/external_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@
class Camunda::ExternalTask < Camunda::Model
# Camunda engine doesn't searching on snake_case variables.
include Camunda::VariableSerialization
# collection_path sets the path for Her in Camunda::Model
collection_path 'external-task'
custom_post :fetchAndLock, :unlock
uri 'external-task/(:id)'

%i[fetchAndLock unlock].each do |name|
define_singleton_method(name) do |hash|
with("external-task/#{name}").where(hash).post
end
end

# @note long_polling_duration is defaulted to 30 seconds in Camunda::Workflow.configuration.
# @return [Integer] default polling duration from Camunda::Workflow configuration
Expand All @@ -40,29 +44,25 @@ def self.lock_duration
# @param input_variables [Hash] process variables
def failure(exception, input_variables={})
variables_information = "Input variables are #{input_variables.inspect}\n\n" if input_variables.present?
self.class.post_raw("#{collection_path}/#{id}/failure",
workerId: worker_id, errorMessage: exception.message,
errorDetails:
variables_information.to_s + exception.message +
backtrace_cleaner.clean(exception.backtrace).join("\n"))[:response]
self.class.request(:post, "external-task/#{id}/failure", workerId: worker_id, errorMessage: exception.message,
errorDetails:
variables_information.to_s + exception.message +
backtrace_cleaner.clean(exception.backtrace).join("\n"))
end

# Reports the error to Camunda and creates an incident for the process instance.
# @param bpmn_exception [Camunda::BpmnError]
def bpmn_error(bpmn_exception)
self.class.post_raw("#{collection_path}/#{id}/bpmnError",
workerId: worker_id, variables: serialize_variables(bpmn_exception.variables),
errorCode: bpmn_exception.error_code, errorMessage: bpmn_exception.message)[:response]
self.class.request(:post, "external-task/#{id}/bpmnError", workerId: worker_id, variables: serialize_variables(bpmn_exception.variables),
errorCode: bpmn_exception.error_code, errorMessage: bpmn_exception.message)
end

# Completes the process instance of a fetched task
# @param variables [Hash] submitted when starting the process definition
# @raise [Camunda::ExternalTask::SubmissionError] if Camunda does not accept the task submission
def complete(variables={})
self.class.post_raw("#{collection_path}/#{id}/complete",
workerId: worker_id, variables: serialize_variables(variables))[:response]
.tap do |response|
raise SubmissionError, response.body[:data][:message] unless response.success?
self.class.request(:post, "external-task/#{id}/complete", workerId: worker_id, variables: serialize_variables(variables)).tap do |response|
raise SubmissionError, response.body["errors"]["data"]["message"] unless response.errors.blank?
end
end

Expand All @@ -73,12 +73,6 @@ def worker_id
self.class.worker_id
end

# Helper method for instances since collection_path is a class method
# @return [String]
def collection_path
self.class.collection_path
end

# deserializes JSON attributes from variables returned by Camunda API
def variables
super.transform_values do |details|
Expand Down Expand Up @@ -145,7 +139,8 @@ def self.fetch_and_lock(topics, lock_duration: nil, long_polling_duration: nil)
# @param long_polling_duration [Integer]
# @return [Camunda::ExternalTask]
def self.fetch_and_queue(topics, lock_duration: nil, long_polling_duration: nil)
fetch_and_lock(topics, lock_duration: lock_duration, long_polling_duration: long_polling_duration).each do |task|
response = fetch_and_lock(topics, lock_duration: lock_duration, long_polling_duration: long_polling_duration)
response.each do |task|
task.queue_task
rescue Camunda::MissingImplementationClass => e
task.failure(e)
Expand Down
2 changes: 1 addition & 1 deletion lib/camunda/incident.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
# that happen in the process engine. Such incidents usually indicate some kind of problem
# related to process execution. Incidents are reported on failures that occur.
class Camunda::Incident < Camunda::Model
collection_path 'incident'
uri 'incident/(:id)'
end
41 changes: 7 additions & 34 deletions lib/camunda/model.rb
Original file line number Diff line number Diff line change
@@ -1,43 +1,12 @@
require 'her/model'
require 'spyke/base'
# This class in the main element of Her. It defines which API models will be bound to.
class Camunda::Model
include Her::Model

def self.log_details?
defined?(Rails) && Rails.env.development?
end

# We use a lambda so that this is evaluated after Camunda::Workflow.configuration is set
api = lambda do
# Configuration for Her and Faraday requests and responses
Her::API.new(url: File.join(Camunda::Workflow.configuration.engine_url)) do |c|
c.path_prefix = Camunda::Workflow.configuration.engine_route_prefix
# Request
c.use Faraday::Request::Multipart
c.use FaradayMiddleware::EncodeJson
c.use Faraday::Request::BasicAuthentication, Camunda::Workflow.configuration.camunda_user,
Camunda::Workflow.configuration.camunda_password
# Response
c.use Faraday::Response::Logger, ActiveSupport::Logger.new($stdout), bodies: true if log_details?
c.use Her::Middleware::FirstLevelParseJSON

c.use Her::Middleware::SnakeCase
# Adapter
c.adapter :net_http

# HTTP proxy
c.proxy = Camunda::Workflow.configuration.http_proxy if Camunda::Workflow.configuration.http_proxy
end
end

use_api api

class Camunda::Model < Spyke::Base
# Returns result of find_by but raises an exception instead of returning nil
# @param params [Hash] query parameters
# @return [Camunda::Model]
# @raise [Camunda::Model::RecordNotFound] if query returns no results
def self.find_by!(params)
find_by(params).tap do |result|
with(base_path).where(params).first.tap do |result|
raise Camunda::Model::RecordNotFound unless result
end
end
Expand All @@ -49,6 +18,10 @@ def self.worker_id
Camunda::Workflow.configuration.worker_id
end

def self.base_path
uri.sub(/\(:id\)/, '')
end

class RecordNotFound < StandardError
end
end
14 changes: 7 additions & 7 deletions lib/camunda/process_definition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# @see Camunda::ProcessInstance
class Camunda::ProcessDefinition < Camunda::Model
include Camunda::VariableSerialization
collection_path 'process-definition'
uri 'process-definition/(:id)'
# Starts an individual process instance by key and supplies process variables to be included in the process instance. In
# the example below a business key is provided. A business key is a domain-specific identifier of a process instance,
# it makes querying for task more efficient. The business key is displayed prominently in applications like Camunda Cockpit.
Expand All @@ -22,7 +22,7 @@ def self.start_by_key(key, hash={})
tenant_id = hash.delete(:tenant_id)
tenant_id ||= Camunda::Workflow.configuration.tenant_id

response = post_raw start_path_for_key(key, tenant_id), hash
response = request(:post, start_path_for_key(key, tenant_id), hash)
process_instance_result(response)
end

Expand All @@ -37,7 +37,7 @@ def self.start_by_key(key, hash={})
# @raise [Camunda::ProcessEngineException] if submission was unsuccessful
def start(hash={})
hash[:variables] = serialize_variables(hash[:variables]) if hash[:variables]
response = self.class.post_raw "process-definition/#{id}/start", hash
response = self.class.request(:post, "process-definition/#{id}/start", hash)
self.class.process_instance_result(response)
end

Expand All @@ -49,11 +49,11 @@ def self.start_path_for_key(key, tenant_id)
end

def self.process_instance_result(response)
unless response[:response].status == 200
raise Camunda::ProcessEngineException,
"#{response[:parsed_data][:data][:message]} HTTP Status: #{response[:response].status}"
errors = response.body["errors"] || {}
if errors.fetch("data",{})["type"] == "RestException"
raise Camunda::ProcessEngineException, response.body["errors"]["data"]["message"]
end

Camunda::ProcessInstance.new response[:parsed_data][:data]
Camunda::ProcessInstance.new response.body["data"]
end
end
6 changes: 3 additions & 3 deletions lib/camunda/process_instance.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
# @see https://docs.camunda.org/manual/7.4/user-guide/process-engine/process-engine-concepts/
# @see Camunda::ProcessDefinition
class Camunda::ProcessInstance < Camunda::Model
collection_path 'process-instance'
uri 'process-instance/(:id)'
# GETs the process instance and deserializes the variables
def variables
response = self.class.get_raw "process-instance/#{id}/variables"
deserialize_variables response[:parsed_data][:data]
response = self.class.request(:get, "process-instance/#{id}/variables")
deserialize_variables response.body[:data]
end

private
Expand Down
4 changes: 2 additions & 2 deletions lib/camunda/signal.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
# `Camunda::Signal.create name: 'Signal Name', variables: {foo: "bar"}`
class Camunda::Signal < Camunda::Model
include Camunda::VariableSerialization
collection_path 'signal'
uri 'signal/(:id)'
# Creates a signal within the process definition on the Camunda engine
# @param hash [Hash] variables that are sent to Camunda engine
# @return [{Symbol => Hash,Faraday::Response}]
def self.create(hash={})
hash[:variables] = serialize_variables(hash[:variables]) if hash[:variables]
post_raw collection_path, hash
request :post, 'signal', hash
end
end
Loading

0 comments on commit 6d9d57e

Please sign in to comment.