Skip to content
This repository has been archived by the owner on Oct 5, 2023. It is now read-only.

Commit

Permalink
Merge pull request #85 from socialcast/atomic-swap
Browse files Browse the repository at this point in the history
Atomic swap
  • Loading branch information
mandrews committed Jan 12, 2015
2 parents 9599b89 + f770db3 commit 77f5093
Show file tree
Hide file tree
Showing 25 changed files with 596 additions and 648 deletions.
4 changes: 0 additions & 4 deletions lib/masamune.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ module Masamune
require 'masamune/helpers'
require 'masamune/configuration'
require 'masamune/data_plan'
require 'masamune/data_plan_rule'
require 'masamune/data_plan_elem'
require 'masamune/data_plan_set'
require 'masamune/data_plan_builder'
require 'masamune/thor'
require 'masamune/thor_loader'
require 'masamune/filesystem'
Expand Down
30 changes: 15 additions & 15 deletions lib/masamune/actions/data_flow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ module DataFlow

include Masamune::Actions::DateParse

def data_plan
self.class.data_plan
def engine
self.class.engine
end

def targets
data_plan.targets(current_command_name)
engine.targets(current_command_name)
end

def sources
data_plan.sources(current_command_name)
engine.sources(current_command_name)
end

def target
Expand Down Expand Up @@ -46,23 +46,23 @@ def parse_file_type(key)
end

base.after_initialize(:final) do |thor, options|
# Only execute this block if DataPlan is not currently executing
next if thor.data_plan.current_rule.present?
thor.data_plan.environment = thor.environment
thor.data_plan.filesystem.environment = thor.environment
# Only execute this block if DataPlan::Engine is not currently executing
next if thor.engine.executing?
thor.engine.environment = thor.environment
thor.engine.filesystem.environment = thor.environment

raise Thor::RequiredArgumentMissingError, "No value provided for required options '--start' or '--at'" unless options[:start] || options[:at] || options[:sources] || options[:targets]
raise Thor::MalformattedArgumentError, "Cannot specify both option '--sources' and option '--targets'" if options[:sources] && options[:targets]

desired_sources = Masamune::DataPlanSet.new thor.current_command_name, thor.parse_file_type(:sources)
desired_targets = Masamune::DataPlanSet.new thor.current_command_name, thor.parse_file_type(:targets)
desired_sources = Masamune::DataPlan::Set.new thor.current_command_name, thor.parse_file_type(:sources)
desired_targets = Masamune::DataPlan::Set.new thor.current_command_name, thor.parse_file_type(:targets)

if thor.start_time && thor.stop_time
desired_targets.merge thor.data_plan.targets_for_date_range(thor.current_command_name, thor.start_time, thor.stop_time)
desired_targets.merge thor.engine.targets_for_date_range(thor.current_command_name, thor.start_time, thor.stop_time)
end

thor.data_plan.prepare(thor.current_command_name, options.merge(sources: desired_sources, targets: desired_targets))
thor.data_plan.execute(thor.current_command_name, options)
thor.engine.prepare(thor.current_command_name, options.merge(sources: desired_sources, targets: desired_targets))
thor.engine.execute(thor.current_command_name, options)
exit 0 if thor.top_level?
end if defined?(base.after_initialize)
end
Expand Down Expand Up @@ -96,8 +96,8 @@ def create_command(*a)
end
end

def data_plan
@@data_plan ||= Masamune::DataPlanBuilder.instance.build(@@namespaces, @@commands, @@sources, @@targets)
def engine
@@engine ||= Masamune::DataPlan::Builder.instance.build(@@namespaces, @@commands, @@sources, @@targets)
end

private
Expand Down
171 changes: 7 additions & 164 deletions lib/masamune/data_plan.rb
Original file line number Diff line number Diff line change
@@ -1,166 +1,9 @@
require 'active_support'
require 'active_support/core_ext/numeric/time'

require 'masamune/data_plan_set'

class Masamune::DataPlan
MAX_DEPTH = 10

include Masamune::HasEnvironment
include Masamune::Accumulate

def initialize
@target_rules = Hash.new
@source_rules = Hash.new
@command_rules = Hash.new
@targets = Hash.new { |set,rule| set[rule] = Masamune::DataPlanSet.new(@target_rules[rule]) }
@sources = Hash.new { |set,rule| set[rule] = Masamune::DataPlanSet.new(@source_rules[rule]) }
@set_cache = Hash.new { |cache,level| cache[level] = Hash.new }
@current_rule = nil
@current_depth = 0
end

def filesystem
@filesystem ||= Masamune::CachedFilesystem.new(environment.filesystem)
end

def add_target_rule(rule, target_options = {})
@target_rules[rule] = Masamune::DataPlanRule.new(self, rule, :target, target_options)
end

def get_target_rule(rule)
@target_rules[rule]
end

def add_source_rule(rule, source_options = {})
@source_rules[rule] = Masamune::DataPlanRule.new(self, rule, :source, source_options)
end

def get_source_rule(rule)
@source_rules[rule]
end

def add_command_rule(rule, command)
@command_rules[rule] = command
end

# TODO use constructed reference instead
def rule_for_target(target)
target_matches = @target_rules.select { |rule, matcher| matcher.primary? && matcher.matches?(target) }
source_matches = @source_rules.select { |rule, matcher| matcher.matches?(target) }

if target_matches.empty?
if source_matches.empty?
raise "No rule matches target #{target}"
else
Masamune::DataPlanRule::TERMINAL
end
else
logger.error("Multiple rules match target #{target}") if target_matches.length > 1
target_matches.map(&:first).first
end
end

def targets_for_date_range(rule, start, stop, &block)
target_template = @target_rules[rule]
return unless target_template
target_template.generate(start.to_time.utc, stop.to_time.utc) do |target|
yield target
end
end
method_accumulate :targets_for_date_range, lambda { |plan, rule, _, _| Masamune::DataPlanSet.new(plan.get_target_rule(rule)) }

def targets_for_source(rule, source, &block)
source_template = @source_rules[rule]
target_template = @target_rules[rule]
source_instance = source.is_a?(Masamune::DataPlanElem) ? source : source_template.bind_input(source)
source_template.generate_via_unify(source_instance, target_template) do |target|
yield target
end
end
method_accumulate :targets_for_source, lambda { |plan, rule, source| Masamune::DataPlanSet.new(plan.get_target_rule(rule)) }

def sources_for_target(rule, target, &block)
source_template = @source_rules[rule]
target_template = @target_rules[rule]
target_instance = target.is_a?(Masamune::DataPlanElem) ? target : target_template.bind_input(target)
target_template.generate_via_unify(target_instance, source_template) do |source|
yield source
end
end
method_accumulate :sources_for_target, lambda { |plan, rule, target| Masamune::DataPlanSet.new(plan.get_source_rule(rule)) }

def targets(rule)
@set_cache[:targets_for_rule][rule] ||= @targets[rule].union(@sources[rule].targets)
end

def sources(rule)
@set_cache[:sources_for_rule][rule] ||= @sources[rule].union(@targets[rule].sources).adjacent
end

def prepare(rule, options = {})
@targets[rule].merge options.fetch(:targets, [])
@sources[rule].merge options.fetch(:sources, [])

dirty = false
targets(rule).incomplete do |target|
if target.removable?
logger.warn("Detected incomplete target #{target.input}, removing")
target.remove
dirty = true
end
end

targets(rule).stale do |target|
if target.removable?
logger.warn("Detected stale target #{target.input}, removing")
target.remove
dirty = true
else
logger.warn("Detected stale target #{target.input}, skipping")
end
end

clear! if dirty

constrain_max_depth(rule) do
sources(rule).group_by { |source| rule_for_target(source.input) }.each do |derived_rule, sources|
prepare(derived_rule, targets: sources.map(&:input)) if derived_rule != Masamune::DataPlanRule::TERMINAL
end
end if options.fetch(:resolve, true)
end

def execute(rule, options = {})
return if targets(rule).missing.empty?

constrain_max_depth(rule) do
sources(rule).missing.group_by { |source| rule_for_target(source.input) }.each do |derived_rule, sources|
execute(derived_rule, options) if derived_rule != Masamune::DataPlanRule::TERMINAL
end
end if options.fetch(:resolve, true)

@current_rule = rule
@command_rules[rule].call(self, rule, options)
clear!
ensure
@current_rule = nil
end

def current_rule
@current_rule
end

def constrain_max_depth(rule, &block)
@current_depth += 1
raise "Max depth of #{MAX_DEPTH} exceeded for rule '#{rule}'" if @current_depth > MAX_DEPTH
yield
ensure
@current_depth -= 1
end

def clear!
@set_cache.clear
filesystem.clear!
environment.postgres_helper.clear!
module Masamune
module DataPlan
require 'masamune/data_plan/builder'
require 'masamune/data_plan/elem'
require 'masamune/data_plan/engine'
require 'masamune/data_plan/rule'
require 'masamune/data_plan/set'
end
end
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
require 'singleton'

class Masamune::DataPlanBuilder
class Masamune::DataPlan::Builder
include Singleton

def build(namespaces, commands, sources, targets)
Masamune::DataPlan.new.tap do |data_plan|
Masamune::DataPlan::Engine.new.tap do |engine|
sources_for, sources_anon = partition_by_for(sources)
targets_for, targets_anon = partition_by_for(targets)

Expand All @@ -15,10 +15,10 @@ def build(namespaces, commands, sources, targets)
target_options = targets_for[name] || targets_anon.shift or next
next if source_options[:skip] || target_options[:skip]

data_plan.add_source_rule(command_name, source_options)
data_plan.add_target_rule(command_name, target_options)
engine.add_source_rule(command_name, source_options)
engine.add_target_rule(command_name, target_options)

data_plan.add_command_rule(command_name, thor_command_wrapper)
engine.add_command_rule(command_name, thor_command_wrapper)
end
end
end
Expand All @@ -35,9 +35,9 @@ def partition_by_for(annotations)
end

def thor_command_wrapper
Proc.new do |plan, rule, _|
plan.environment.with_exclusive_lock(rule) do
plan.environment.parent.invoke(rule)
Proc.new do |engine, rule, _|
engine.environment.with_exclusive_lock(rule) do
engine.environment.parent.invoke(rule)
end
end
end
Expand Down
43 changes: 14 additions & 29 deletions lib/masamune/data_plan_elem.rb → lib/masamune/data_plan/elem.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class Masamune::DataPlanElem
class Masamune::DataPlan::Elem
MISSING_MODIFIED_AT = Time.new(0)

include Masamune::Accumulate
Expand All @@ -21,12 +21,13 @@ def input
def partition
input.split('_').last
end
alias :suffix :partition

def exists?
if rule.for_path?
rule.plan.filesystem.exists?(path)
rule.engine.filesystem.exists?(path)
elsif rule.for_table_with_partition?
rule.plan.postgres_helper.table_exists?(table)
rule.engine.postgres_helper.table_exists?(table)
elsif rule.for_table?
table
end
Expand All @@ -40,61 +41,45 @@ def complete?

def last_modified_at
if rule.for_path?
rule.plan.filesystem.stat(path).try(:mtime)
rule.engine.filesystem.stat(path).try(:mtime)
elsif rule.for_table?
rule.plan.postgres_helper.table_last_modified_at(table, @options)
rule.engine.postgres_helper.table_last_modified_at(table, @options)
end || MISSING_MODIFIED_AT
end

def remove
if rule.for_path?
if rule.for_hive_table?
rule.plan.hive_helper.drop_partition(rule.hive_table, start_time.strftime(rule.hive_partition))
else
rule.plan.filesystem.remove_file(path)
end
elsif rule.for_table_with_partition?
rule.plan.postgres_helper.drop_table(table)
end
end

def removable?
!rule.immutable? && exists?
end

def explode(&block)
if rule.for_path?
file_glob = path
file_glob += '/' unless path.include?('*') || path.include?('.')
file_glob += '*' unless path.include?('*')
rule.plan.filesystem.glob(file_glob) do |new_path|
rule.engine.filesystem.glob(file_glob) do |new_path|
yield rule.bind_input(new_path)
end
elsif rule.for_table_with_partition?
yield table if rule.plan.postgres_helper.table_exists?(table)
yield table if rule.engine.postgres_helper.table_exists?(table)
end
end
method_accumulate :explode

def targets(&block)
return Masamune::DataPlanSet::EMPTY if @rule.for_targets?
rule.plan.targets_for_source(rule.name, self) do |target|
return Masamune::DataPlan::Set::EMPTY if @rule.for_targets?
rule.engine.targets_for_source(rule.name, self) do |target|
yield target
end
end
method_accumulate :targets, lambda { |elem| Masamune::DataPlanSet.new(elem.rule.plan.get_target_rule(elem.rule.name)) }
method_accumulate :targets, lambda { |elem| Masamune::DataPlan::Set.new(elem.rule.engine.get_target_rule(elem.rule.name)) }

def target
targets.first
end

def sources(&block)
return Masamune::DataPlanSet::EMPTY if @rule.for_sources?
rule.plan.sources_for_target(rule.name, self) do |source|
return Masamune::DataPlan::Set::EMPTY if @rule.for_sources?
rule.engine.sources_for_target(rule.name, self) do |source|
yield source
end
end
method_accumulate :sources, lambda { |elem| Masamune::DataPlanSet.new(elem.rule.plan.get_source_rule(elem.rule.name)) }
method_accumulate :sources, lambda { |elem| Masamune::DataPlan::Set.new(elem.rule.engine.get_source_rule(elem.rule.name)) }

def source
sources.first
Expand Down
Loading

0 comments on commit 77f5093

Please sign in to comment.