From 5f70280ba9f0004c78b30b56ebf0d0035dea0b78 Mon Sep 17 00:00:00 2001 From: Michael Andrews Date: Thu, 12 Mar 2015 11:22:25 -0700 Subject: [PATCH 1/7] Optionally quote raw json fields before CSV parse --- lib/masamune/schema/catalog.rb | 4 +++ lib/masamune/schema/map.rb | 36 +++++++++++++++++++---- lib/masamune/schema/store.rb | 1 + spec/masamune/schema/map_spec.rb | 50 +++++++++++++++++++++++++++++++- 4 files changed, 85 insertions(+), 6 deletions(-) diff --git a/lib/masamune/schema/catalog.rb b/lib/masamune/schema/catalog.rb index 067057e..3305163 100644 --- a/lib/masamune/schema/catalog.rb +++ b/lib/masamune/schema/catalog.rb @@ -51,6 +51,10 @@ def format @options.key?(:format) ? @options[:format] : __getobj__.format end + def json_encoding + @options.key?(:json_encoding) ? @options[:json_encoding] : __getobj__.json_encoding + end + def headers @options.key?(:headers) ? @options[:headers] : __getobj__.headers end diff --git a/lib/masamune/schema/map.rb b/lib/masamune/schema/map.rb index d8a3773..b4260e4 100644 --- a/lib/masamune/schema/map.rb +++ b/lib/masamune/schema/map.rb @@ -24,6 +24,34 @@ module Masamune::Schema class Map + class JSONEncoder < SimpleDelegator + def initialize(io, store) + super io + @store = store + end + + def gets(*a) + line = __getobj__.gets(*a) + return unless line + return line if skip? + encode(line.split(separator)).join(separator) + end + + private + + def skip? + @store.json_encoding == :quoted + end + + def encode(fields = []) + fields.map { |field| field =~ /^{|}$/ ? %Q{"#{field.gsub('"', '""')}"} : field } + end + + def separator + @separator ||= (@store.format == :tsv ? "\t" : ',') + end + end + class Buffer extend Forwardable @@ -42,7 +70,7 @@ def bind(io) def each(&block) raise 'must call Buffer#bind first' unless @io - CSV.parse(@io, options.merge(headers: @store.headers || @table.columns.keys)) do |data| + CSV.parse(JSONEncoder.new(@io, @store), options.merge(headers: @store.headers || @table.columns.keys)) do |data| next if data.to_s =~ /\A#/ row = Masamune::Schema::Row.new(parent: @table, values: data.to_hash, strict: false) yield row.to_hash @@ -59,10 +87,8 @@ def append(data) end def options - if @store.format == :tsv - { skip_blanks: true, col_sep: "\t" } - else - { skip_blanks: true } + {skip_blanks: true}.tap do | opts| + opts[:col_sep] = "\t" if @store.format == :tsv end end end diff --git a/lib/masamune/schema/store.rb b/lib/masamune/schema/store.rb index b86eb00..33f4562 100644 --- a/lib/masamune/schema/store.rb +++ b/lib/masamune/schema/store.rb @@ -30,6 +30,7 @@ class Store { type: nil, format: ->(store) { store.type == :postgres ? :csv : :tsv }, + json_encoding: ->(store) { store.type == :postgres ? :quoted : :raw }, headers: ->(store) { store.type == :postgres ? true : false }, debug: false } diff --git a/spec/masamune/schema/map_spec.rb b/spec/masamune/schema/map_spec.rb index f118dfa..2396d90 100644 --- a/spec/masamune/schema/map_spec.rb +++ b/spec/masamune/schema/map_spec.rb @@ -202,7 +202,7 @@ it_behaves_like 'apply input/output' end - context 'from event to postgres dimension' do + context 'from event to postgres dimension with quoted json' do before do catalog.schema :files do map from: hive.user_event, to: postgres.user_dimension do |row| @@ -481,5 +481,53 @@ it_behaves_like 'apply input/output' end + + context 'from event to postgres dimension with raw json' do + before do + catalog.schema :files do + map from: hive.user_event, to: postgres.user_dimension do |row| + { + 'tenant_id' => row[:tenant_id], + 'user_id' => row[:id], + 'user_account_state.name' => row[:type] =~ /delete/ ? 'deleted' : 'active', + 'admin' => row[:type] =~ /delete/ ? row[:admin_was] : row[:admin_now], + 'preferences_now' => row[:preferences_now], + 'preferences_was' => row[:preferences_was], + 'source' => 'user_event', + 'cluster_id' => 100 + } + end + end + end + + let(:source) do + catalog.hive.user_event + end + + let(:target) do + catalog.postgres.user_dimension + end + + let(:source_data) do + <<-EOS.strip_heredoc + X user_create 1 30 0 \\N \\N \\N + Y user_delete 2 40 0 1 {"enabled":true} \\N + EOS + end + + let(:target_data) do + <<-EOS.strip_heredoc + tenant_id,user_id,user_account_state_type_name,admin,preferences_now,preferences_was,source,cluster_id + 30,1,active,FALSE,{},{},user_event,100 + 40,2,deleted,TRUE,"{""enabled"":true}",{},user_event,100 + EOS + end + + it 'should match target data' do + is_expected.to eq(target_data) + end + + it_behaves_like 'apply input/output' + end end end From ef56774ed35376559ef8e677be2aec7a5bfb4ff6 Mon Sep 17 00:00:00 2001 From: Michael Andrews Date: Thu, 12 Mar 2015 11:49:23 -0700 Subject: [PATCH 2/7] Add coverage for converting unix epoch timestamps --- lib/masamune/schema/column.rb | 8 +++++++- spec/masamune/schema/column_spec.rb | 10 ++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/lib/masamune/schema/column.rb b/lib/masamune/schema/column.rb index 083f009..1c17e4e 100644 --- a/lib/masamune/schema/column.rb +++ b/lib/masamune/schema/column.rb @@ -243,7 +243,13 @@ def ruby_value(value, recursive = true) when Date, DateTime value.to_time when String - Time.parse(value.to_s) + if value =~ /^\d+$/ + Time.at(value.to_i) + else + Time.parse(value) + end + when Integer + Time.at(value) when nil nil end diff --git a/spec/masamune/schema/column_spec.rb b/spec/masamune/schema/column_spec.rb index b03b9a5..28db58e 100644 --- a/spec/masamune/schema/column_spec.rb +++ b/spec/masamune/schema/column_spec.rb @@ -263,6 +263,16 @@ it { is_expected.to eq(value) } end + context 'when Integer' do + let(:value) { Time.now.utc.to_i } + it { is_expected.to eq(Time.at(value)) } + end + + context 'when String encoded Integer' do + let(:value) { "#{Time.now.utc.to_i}" } + it { is_expected.to eq(Time.at(value.to_i)) } + end + context 'when YYYY-mm-dd' do let(:value) { '2015-01-01' } it { is_expected.to eq(Time.parse(value)) } From f7d56a2615d58a065bb25e76fb5a024e15488be2 Mon Sep 17 00:00:00 2001 From: Michael Andrews Date: Mon, 16 Mar 2015 14:44:06 -0700 Subject: [PATCH 3/7] Add ruby-1.9.2 --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index c88a4ed..0a0e6a4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,6 @@ language: ruby rvm: + - 1.9.2 - 2.0.0 - 2.1.2 - 2.1.5 From 9e9c28b8b7bd831b584629fa94d2038b8895e65c Mon Sep 17 00:00:00 2001 From: Michael Andrews Date: Mon, 16 Mar 2015 15:43:49 -0700 Subject: [PATCH 4/7] Add ruby-1.9.3 --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 0a0e6a4..591db0a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: ruby rvm: - - 1.9.2 + - 1.9.3 - 2.0.0 - 2.1.2 - 2.1.5 From 4f0f69ed4a6794d35f2f4cfcb2f02a7892c9e34d Mon Sep 17 00:00:00 2001 From: Michael Andrews Date: Mon, 16 Mar 2015 16:33:50 -0700 Subject: [PATCH 5/7] Remove ruby-1.9.3 --- .travis.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 591db0a..c88a4ed 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,5 @@ language: ruby rvm: - - 1.9.3 - 2.0.0 - 2.1.2 - 2.1.5 From 90b2a555cf99158c6434d14c395689fe6d5db9b0 Mon Sep 17 00:00:00 2001 From: Michael Andrews Date: Mon, 16 Mar 2015 16:42:10 -0700 Subject: [PATCH 6/7] Match on string instead of line --- lib/masamune/schema/column.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/masamune/schema/column.rb b/lib/masamune/schema/column.rb index 1c17e4e..05e1ac9 100644 --- a/lib/masamune/schema/column.rb +++ b/lib/masamune/schema/column.rb @@ -243,7 +243,7 @@ def ruby_value(value, recursive = true) when Date, DateTime value.to_time when String - if value =~ /^\d+$/ + if value =~ /\A\d+\z/ Time.at(value.to_i) else Time.parse(value) From bc3c14500e8aedfd5d2dc6b429f82eb9ca6fd122 Mon Sep 17 00:00:00 2001 From: Michael Andrews Date: Mon, 16 Mar 2015 17:21:18 -0700 Subject: [PATCH 7/7] Ensure double quotes are not escaped twice --- lib/masamune/schema/map.rb | 6 +++++- spec/masamune/schema/map_spec.rb | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/lib/masamune/schema/map.rb b/lib/masamune/schema/map.rb index b4260e4..9657952 100644 --- a/lib/masamune/schema/map.rb +++ b/lib/masamune/schema/map.rb @@ -44,7 +44,11 @@ def skip? end def encode(fields = []) - fields.map { |field| field =~ /^{|}$/ ? %Q{"#{field.gsub('"', '""')}"} : field } + fields.map { |field| field =~ /^{|}$/ ? quote(field) : field } + end + + def quote(field) + %Q{"#{field.gsub(/(?