Add parameters for BigQuery datasets and write method (#90)
giancarloaf authored Jun 10, 2022
1 parent 9154b39 commit 694a507
Showing 4 changed files with 97 additions and 26 deletions.
80 changes: 73 additions & 7 deletions modules/import_har.py
@@ -2,7 +2,7 @@
import logging

import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryWriteFn
from apache_beam.io.gcp.bigquery import BigQueryWriteFn, WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.runners import DataflowRunner

@@ -12,21 +12,65 @@

def parse_args(argv):
parser = argparse.ArgumentParser()

parser.add_argument(
"--input",
dest="input",
help="Input file to process. Example: gs://httparchive/crawls/*Jan_1_2022",
)

parser.add_argument(
"--subscription",
dest="subscription",
help="Pub/Sub subscription. Example: `projects/httparchive/subscriptions/har-gcs-pipeline`",
)

bq_write_methods = [
WriteToBigQuery.Method.STREAMING_INSERTS,
WriteToBigQuery.Method.FILE_LOADS,
]
parser.add_argument(
"--big_query_write_method",
dest="big_query_write_method",
help=f"BigQuery write method. One of {','.join(bq_write_methods)}",
choices=bq_write_methods,
default=WriteToBigQuery.Method.STREAMING_INSERTS,
)

parser.add_argument(
"--dataset_summary_pages",
dest="dataset_summary_pages",
help="BigQuery dataset to write summary pages tables",
default=constants.bigquery["datasets"]["pages"],
)

parser.add_argument(
"--dataset_summary_requests",
dest="dataset_summary_requests",
help="BigQuery dataset to write summary requests tables",
default=constants.bigquery["datasets"]["requests"],
)

parser.add_argument(
"--dataset_summary_pages_home_only",
dest="dataset_summary_pages_home_only",
help="BigQuery dataset to write summary pages tables (home-page-only)",
default=constants.bigquery["datasets"]["home_pages"],
)

parser.add_argument(
"--dataset_summary_requests_home_only",
dest="dataset_summary_requests_home_only",
help="BigQuery dataset to write summary requests tables (home-page-only)",
default=constants.bigquery["datasets"]["home_requests"],
)

return parser.parse_known_args(argv)


def run(argv=None):
def create_pipeline(argv=None):
known_args, pipeline_args = parse_args(argv)
logging.info(f"Pipeline Options: known_args={known_args},pipeline_args={pipeline_args}")
pipeline_options = PipelineOptions(pipeline_args, save_main_session=True)
standard_options = pipeline_options.view_as(StandardOptions)
if not (known_args.subscription or known_args.input):
@@ -55,33 +99,45 @@ def run(argv=None):
deadletter_queues = {}

deadletter_queues["pages"] = pages | "WritePagesToBigQuery" >> WriteBigQuery(
table=lambda row: utils.format_table_name(row, "pages"),
table=lambda row: utils.format_table_name(
row, known_args.dataset_summary_pages
),
schema=constants.bigquery["schemas"]["pages"],
streaming=standard_options.streaming,
method=known_args.big_query_write_method,
)

deadletter_queues[
"requests"
] = requests | "WriteRequestsToBigQuery" >> WriteBigQuery(
table=lambda row: utils.format_table_name(row, "requests"),
table=lambda row: utils.format_table_name(
row, known_args.dataset_summary_requests
),
schema=constants.bigquery["schemas"]["requests"],
streaming=standard_options.streaming,
method=known_args.big_query_write_method,
)

deadletter_queues[
"home_pages"
] = home_pages | "WriteHomePagesToBigQuery" >> WriteBigQuery(
table=lambda row: utils.format_table_name(row, "home_pages"),
table=lambda row: utils.format_table_name(
row, known_args.dataset_summary_pages_home_only
),
schema=constants.bigquery["schemas"]["pages"],
streaming=standard_options.streaming,
method=known_args.big_query_write_method,
)

deadletter_queues[
"home_requests"
] = home_requests | "WriteHomeRequestsToBigQuery" >> WriteBigQuery(
table=lambda row: utils.format_table_name(row, "home_requests"),
table=lambda row: utils.format_table_name(
row, known_args.dataset_summary_requests_home_only
),
schema=constants.bigquery["schemas"]["requests"],
streaming=standard_options.streaming,
method=known_args.big_query_write_method,
)

# deadletter logging
@@ -102,6 +158,16 @@ def run(argv=None):

# TODO detect DONE file, move temp table to final destination, shutdown pipeline (if streaming)

pipeline_result = p.run()
return p


def run(argv=None):
logging.getLogger().setLevel(logging.INFO)
p = create_pipeline()
pipeline_result = p.run(argv)
if not isinstance(p.runner, DataflowRunner):
pipeline_result.wait_until_finish()


if __name__ == '__main__':
run()
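
For orientation, a minimal sketch of how the new flags might be exercised from the repository root; the flag values and dataset names are illustrative, not the project's real defaults:

# Hypothetical invocation of the new arguments; dataset names are examples only.
# "FILE_LOADS" matches WriteToBigQuery.Method.FILE_LOADS, which Beam defines as a plain string.
from modules.import_har import parse_args

known_args, pipeline_args = parse_args([
    "--input=gs://httparchive/crawls/*Jan_1_2022",
    "--big_query_write_method=FILE_LOADS",
    "--dataset_summary_pages=my_summary_pages",
    "--dataset_summary_requests=my_summary_requests",
])

# The chosen write method and dataset overrides are then threaded through
# create_pipeline() into each WriteBigQuery transform.
print(known_args.big_query_write_method, known_args.dataset_summary_pages)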
27 changes: 11 additions & 16 deletions modules/transformation.py
@@ -6,7 +6,7 @@

import apache_beam as beam
from apache_beam.io import ReadFromPubSub, WriteToBigQuery, BigQueryDisposition
from apache_beam.io.gcp.bigquery_tools import FileFormat, RetryStrategy
from apache_beam.io.gcp.bigquery_tools import RetryStrategy
from dateutil import parser as date_parser

from modules import constants, utils
@@ -51,37 +51,32 @@ def expand(self, p):


class WriteBigQuery(beam.PTransform):
def __init__(self, table, schema, streaming=None):
def __init__(self, table, schema, streaming=None, method=None):
super().__init__()
self.table = table
self.schema = schema
self.streaming = streaming
self.method = method

def resolve_params(self):
if self.streaming:
# streaming pipeline
if self.method == WriteToBigQuery.Method.STREAMING_INSERTS:
return {
"method": WriteToBigQuery.Method.STREAMING_INSERTS,
"create_disposition": BigQueryDisposition.CREATE_IF_NEEDED,
"write_disposition": BigQueryDisposition.WRITE_APPEND,
"with_auto_sharding": True,
# parameters for STREAMING_INSERTS
"method": WriteToBigQuery.Method.STREAMING_INSERTS,
"ignore_unknown_columns": True,
"insert_retry_strategy": RetryStrategy.RETRY_ON_TRANSIENT_ERROR,
# parameters for FILE_LOADS
# "method": WriteToBigQuery.Method.FILE_LOADS,
# "triggering_frequency": 5 * 60, # seconds
# "additional_bq_parameters": {"ignoreUnknownValues": True},
"with_auto_sharding": self.streaming,
"ignore_unknown_columns": True,
}
else:
# batch pipeline
if self.method == WriteToBigQuery.Method.FILE_LOADS:
return {
"method": WriteToBigQuery.Method.FILE_LOADS,
"create_disposition": BigQueryDisposition.CREATE_IF_NEEDED,
"write_disposition": BigQueryDisposition.WRITE_TRUNCATE,
"method": WriteToBigQuery.Method.FILE_LOADS,
"temp_file_format": FileFormat.JSON,
"additional_bq_parameters": {"ignoreUnknownValues": True},
}
else:
raise RuntimeError(f"BigQuery write method not supported: {self.method}")

def expand(self, pcoll, **kwargs):
return pcoll | WriteToBigQuery(
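
A usage sketch of the reworked dispatch (illustrative only; it relies on the transform being instantiable outside a pipeline, which holds here because __init__ merely stores its arguments):

from apache_beam.io import WriteToBigQuery
from modules.transformation import WriteBigQuery

# Hypothetical table and schema values, for illustration only.
write = WriteBigQuery(
    table="my_dataset.my_table",
    schema={"fields": []},
    streaming=False,
    method=WriteToBigQuery.Method.FILE_LOADS,
)

# resolve_params() now keys off `method` rather than `streaming`,
# and an unsupported method raises RuntimeError.
params = write.resolve_params()
print(params["write_disposition"])  # BigQueryDisposition.WRITE_TRUNCATE for FILE_LOADS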
4 changes: 2 additions & 2 deletions modules/utils.py
@@ -154,9 +154,9 @@ def client_name(file_name):
return client.lower()


def format_table_name(row, table):
def format_table_name(row, table, const=constants.bigquery):
table_name = "{}.{}_{}".format(
constants.bigquery["datasets"][table], row["date"], row["client"]
const["datasets"][table], row["date"], row["client"]
)

if not table_name:
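
A minimal sketch of the new injection point, mirroring the unit test added below; the mapping and row are hypothetical stand-ins for constants.bigquery, and the sketch assumes the collapsed remainder of the function returns the formatted name:

from modules import utils

# Hypothetical constants mapping and row, for illustration only.
fake_bigquery = {"datasets": {"pages": "my_project:summary_pages"}}
row = {"date": "2022_06_01", "client": "desktop"}

# The optional `const` argument lets tests swap in a stand-in mapping
# instead of the module-level constants.bigquery.
utils.format_table_name(row, "pages", fake_bigquery)
# -> "my_project:summary_pages.2022_06_01_desktop"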
12 changes: 11 additions & 1 deletion test/test_utils.py
@@ -124,7 +124,9 @@ def test_datetime_to_epoch_empty(self):

def test_crawl_date(self):
dir_name = "gs://httparchive/crawls/android-Apr_1_2022"
self.assertEqual(utils.crawl_date(dir_name), datetime.datetime(2022, 4, 1, 0, 0))
self.assertEqual(
utils.crawl_date(dir_name), datetime.datetime(2022, 4, 1, 0, 0)
)

def test_clamp_integer_normal(self):
self.assertEqual(utils.clamp_integer(1000), 1000)
@@ -148,3 +150,11 @@ def test_clamp_integers(self):
self.assertEqual(data['b'], utils.BIGQUERY_MAX_INT)
self.assertEqual(data['c'], None)
self.assertIn("Clamping required for {'b': " + str(b), log.output[0])

def test_format_table_name(self):
constant = {"datasets": {"table_type": "project_name:dataset_name"}}
row = {"date": "2022-01-01", "client": "test"}
self.assertEqual(
utils.format_table_name(row, "table_type", constant),
f"{constant['datasets']['table_type']}.{row['date']}_{row['client']}",
)
