diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/.idea/embulk-output-bigquery.iml b/.idea/embulk-output-bigquery.iml new file mode 100644 index 0000000..d6ebd48 --- /dev/null +++ b/.idea/embulk-output-bigquery.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..639900d --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..60e4079 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/lib/embulk/output/bigquery.rb b/lib/embulk/output/bigquery.rb index 10cbd00..87e998d 100644 --- a/lib/embulk/output/bigquery.rb +++ b/lib/embulk/output/bigquery.rb @@ -63,8 +63,6 @@ def self.configure(config, schema, task_count) 'payload_column' => config.param('payload_column', :string, :default => nil), 'payload_column_index' => config.param('payload_column_index', :integer, :default => nil), - 'description' => config.param('description', :string, :default => nil), - 'open_timeout_sec' => config.param('open_timeout_sec', :integer, :default => nil), 'timeout_sec' => config.param('timeout_sec', :integer, :default => nil), # google-api-ruby-client < v0.11.0 'send_timeout_sec' => config.param('send_timeout_sec', :integer, :default => nil), # google-api-ruby-client >= v0.11.0 @@ -89,6 +87,7 @@ def self.configure(config, schema, task_count) 'ignore_unknown_values' => config.param('ignore_unknown_values', :bool, :default => false), 'allow_quoted_newlines' => config.param('allow_quoted_newlines', :bool, :default => false), 'time_partitioning' => config.param('time_partitioning', :hash, :default => nil), + 'range_partitioning' => config.param('range_partitioning', :hash, :default => nil), 'clustering' => config.param('clustering', :hash, :default => nil), # google-api-ruby-client >= v0.21.0 'schema_update_options' => config.param('schema_update_options', :array, :default => nil), diff --git a/lib/embulk/output/bigquery/bigquery_client.rb b/lib/embulk/output/bigquery/bigquery_client.rb index 66400a0..5076cb3 100644 --- a/lib/embulk/output/bigquery/bigquery_client.rb +++ b/lib/embulk/output/bigquery/bigquery_client.rb @@ -121,7 +121,7 @@ def load_from_gcs(object_uris, table) opts = {} Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" } - response = with_network_retry { client.insert_job(@project, body, **opts) } + response = with_network_retry { client.insert_job(@project, body, opts) } unless @task['is_skip_job_result_check'] response = wait_load('Load', response) end @@ -222,7 +222,7 @@ def load(path, table, write_disposition: 'WRITE_APPEND') # }, } Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" } - response = with_network_retry { client.insert_job(@project, body, **opts) } + response = with_network_retry { client.insert_job(@project, body, opts) } if @task['is_skip_job_result_check'] response else @@ -278,7 +278,7 @@ def copy(source_table, destination_table, destination_dataset = nil, write_dispo opts = {} Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" } - response = with_network_retry { client.insert_job(@project, body, **opts) } + response = with_network_retry { client.insert_job(@project, body, opts) } wait_load('Copy', response) rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e response = {status_code: e.status_code, message: e.message, error_class: e.class} @@ -372,7 +372,7 @@ def create_dataset(dataset = nil, reference: nil) end opts = {} Embulk.logger.debug { "embulk-output-bigquery: insert_dataset(#{@project}, #{dataset}, #{@location_for_log}, #{body}, #{opts})" } - with_network_retry { client.insert_dataset(@project, body, **opts) } + with_network_retry { client.insert_dataset(@project, body, opts) } rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e if e.status_code == 409 && /Already Exists:/ =~ e.message # ignore 'Already Exists' error @@ -410,6 +410,7 @@ def create_table_if_not_exists(table, dataset: nil, options: nil) dataset ||= @dataset options ||= {} options['time_partitioning'] ||= @task['time_partitioning'] + options['range_partitioning'] ||= @task['range_partitioning'] if Helper.has_partition_decorator?(table) options['time_partitioning'] ||= {'type' => 'DAY'} table = Helper.chomp_partition_decorator(table) @@ -420,7 +421,6 @@ def create_table_if_not_exists(table, dataset: nil, options: nil) table_reference: { table_id: table, }, - description: @task['description'], schema: { fields: fields, } @@ -434,6 +434,17 @@ def create_table_if_not_exists(table, dataset: nil, options: nil) } end + if options['range_partitioning'] + body[:range_partitioning] = { + field: options['range_partitioning']['field'], + } + body[:range_partitioning][:range] = { + start: options['range_partitioning']['range']['start'], + end: options['range_partitioning']['range']['end'], + interval: options['range_partitioning']['range']['interval'], + } + end + options['clustering'] ||= @task['clustering'] if options['clustering'] body[:clustering] = { @@ -447,8 +458,8 @@ def create_table_if_not_exists(table, dataset: nil, options: nil) end opts = {} - Embulk.logger.debug { "embulk-output-bigquery: insert_table(#{@destination_project}, #{dataset}, #{@location_for_log}, #{body}, #{opts})" } - with_network_retry { client.insert_table(@destination_project, dataset, body, **opts) } + Embulk.logger.debug { "embulk-output-bigquery: insert_table(#{@project}, #{dataset}, #{@location_for_log}, #{body}, #{opts})" } + with_network_retry { client.insert_table(@project, dataset, body, opts) } rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e if e.status_code == 409 && /Already Exists:/ =~ e.message # ignore 'Already Exists' error @@ -457,7 +468,7 @@ def create_table_if_not_exists(table, dataset: nil, options: nil) response = {status_code: e.status_code, message: e.message, error_class: e.class} Embulk.logger.error { - "embulk-output-bigquery: insert_table(#{@destination_project}, #{dataset}, #{@location_for_log}, #{body}, #{opts}), response:#{response}" + "embulk-output-bigquery: insert_table(#{@project}, #{dataset}, #{@location_for_log}, #{body}, #{opts}), response:#{response}" } raise Error, "failed to create table #{@destination_project}:#{dataset}.#{table} in #{@location_for_log}, response:#{response}" end