diff --git a/README.md b/README.md index 85783b8..dab5502 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,10 @@ + # data-pipeline The new HTTP Archive data pipeline built entirely on GCP ## Initial setup TODO: follow known instructions +https://beam.apache.org/get-started/quickstart-py/ TODO: python3.8 @@ -52,12 +54,24 @@ while IFS= read -r f; do gcloud pubsub topics publish projects/httparchive/topic ./run_pipeline_streaming.sh ``` -### WIP Read from GCS (batch) +### Read from GCS (batch) ```shell ./run_pipeline_batch.sh ``` +### Pipeline types + +By default, running the pipeline will run in "combined" mode to produce summary and non-summary tables. +This can be controlled using the `--pipeline_type` argument on either batch or streaming. + +> ⚠ Note: streaming to non-summary tables is only supported in the combined pipeline currently (i.e. not supported in non-summary-only) + +```shell +# example +./run_pipeline_batch.sh --pipeline_type=summary +``` + ## Update the pipeline ### Update streaming Supply the run script with a currently running job name @@ -98,6 +112,12 @@ Various incompatibilities due to missing features * missing dead-letter collections for batch loads * fixed vs auto-sharding +#### RuntimeError: VarLong too long + +This is a known issue when using the DirectRunner on Windows 10 with the Beam Python SDK + +https://issues.apache.org/jira/browse/BEAM-11037 + ### Response cache-control max-age Various parsing issues due to unhandled cases diff --git a/modules/combined_pipeline.py b/modules/combined_pipeline.py new file mode 100644 index 0000000..4cbb587 --- /dev/null +++ b/modules/combined_pipeline.py @@ -0,0 +1,182 @@ +import argparse +import logging + +import apache_beam as beam +from apache_beam.io import WriteToBigQuery +from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions + +from modules import summary_pipeline, non_summary_pipeline, constants +from modules.transformation import ReadHarFiles, HarJsonToSummaryDoFn + + +class CombinedPipelineOptions(PipelineOptions): + @classmethod + def _add_argparse_args(cls, parser): + super()._add_argparse_args(parser) + + parser.prog = "combined_pipeline" + + pipeline_types = ["combined", "summary", "non-summary"] + parser.add_argument( + "--pipeline_type", + default="combined", + choices=pipeline_types, + help=f"Type of pipeline to run. One of {','.join(pipeline_types)}", + ) + + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument( + "--input", + help="Input file glob to process. Example: gs://httparchive/crawls/*Jan_1_2022", + ) + + group.add_argument( + '--input_file', + help="Input file containing a list of HAR files. " + "Example: gs://httparchive/crawls_manifest/android-May_12_2022.txt" + ) + + group.add_argument( + "--subscription", + help="Pub/Sub subscription. Example: `projects/httparchive/subscriptions/har-gcs-pipeline`", + ) + + bq_write_methods = [ + WriteToBigQuery.Method.STREAMING_INSERTS, + WriteToBigQuery.Method.FILE_LOADS, + ] + parser.add_argument( + "--big_query_write_method_summary", + dest="big_query_write_method", + help=f"BigQuery write method. 
One of {','.join(bq_write_methods)}", + choices=bq_write_methods, + default=WriteToBigQuery.Method.STREAMING_INSERTS, + ) + + parser.add_argument( + "--dataset_summary_pages", + help="BigQuery dataset to write summary pages tables", + default=constants.BIGQUERY["datasets"]["summary_pages_all"], + ) + parser.add_argument( + "--dataset_summary_requests", + help="BigQuery dataset to write summary requests tables", + default=constants.BIGQUERY["datasets"]["summary_requests_all"], + ) + parser.add_argument( + "--dataset_summary_pages_home_only", + dest="dataset_summary_pages_home_only", + help="BigQuery dataset to write summary pages tables (home-page-only)", + default=constants.BIGQUERY["datasets"]["summary_pages_home"], + ) + parser.add_argument( + "--dataset_summary_requests_home_only", + help="BigQuery dataset to write summary requests tables (home-page-only)", + default=constants.BIGQUERY["datasets"]["summary_requests_home"], + ) + + parser.add_argument( + "--dataset_pages_home_only", + help="BigQuery dataset to write pages table (home-page-only)", + default=constants.BIGQUERY["datasets"]["pages_home"], + ) + parser.add_argument( + "--dataset_technologies_home_only", + help="BigQuery dataset to write technologies table (home-page-only)", + default=constants.BIGQUERY["datasets"]["technologies_home"], + ) + parser.add_argument( + "--dataset_lighthouse_home_only", + help="BigQuery dataset to write lighthouse table (home-page-only)", + default=constants.BIGQUERY["datasets"]["lighthouse_home"], + ) + parser.add_argument( + "--dataset_requests_home_only", + help="BigQuery dataset to write requests table (home-page-only)", + default=constants.BIGQUERY["datasets"]["requests_home"], + ) + parser.add_argument( + "--dataset_response_bodies_home_only", + help="BigQuery dataset to write response_bodies table (home-page-only)", + default=constants.BIGQUERY["datasets"]["response_bodies_home"], + ) + parser.add_argument( + "--dataset_pages", + help="BigQuery dataset to write pages table", + default=constants.BIGQUERY["datasets"]["pages_all"], + ) + parser.add_argument( + "--dataset_technologies", + help="BigQuery dataset to write technologies table", + default=constants.BIGQUERY["datasets"]["technologies_all"], + ) + parser.add_argument( + "--dataset_lighthouse", + help="BigQuery dataset to write lighthouse table", + default=constants.BIGQUERY["datasets"]["lighthouse_all"], + ) + parser.add_argument( + "--dataset_requests", + help="BigQuery dataset to write requests table", + default=constants.BIGQUERY["datasets"]["requests_all"], + ) + parser.add_argument( + "--dataset_response_bodies", + help="BigQuery dataset to write response_bodies table", + default=constants.BIGQUERY["datasets"]["response_bodies_all"], + ) + + parser.add_argument( + "--non_summary_partitions", + dest="partitions", + help="Number of partitions to split non-summary BigQuery write tasks", + default=non_summary_pipeline.NUM_PARTITIONS, + ) + + +def create_pipeline(argv=None): + parser = argparse.ArgumentParser() + known_args, pipeline_args = parser.parse_known_args(argv) + pipeline_options = PipelineOptions(pipeline_args, save_main_session=True) + standard_options = pipeline_options.view_as(StandardOptions) + combined_options = pipeline_options.view_as(CombinedPipelineOptions) + logging.info( + f"Pipeline Options: {known_args=},{pipeline_args=},{pipeline_options.get_all_options()}," + f"{standard_options},{combined_options}" + ) + + # TODO add metric counters for files in, processed, written to GCP & BQ + + p = 
beam.Pipeline(options=pipeline_options) + + files = p | ReadHarFiles(**combined_options.get_all_options()) + + # summary pipeline + if combined_options.pipeline_type in ["combined", "summary"]: + pages, requests = files | "ParseHarToSummary" >> beam.ParDo( + HarJsonToSummaryDoFn() + ).with_outputs("page", "requests") + + pages | summary_pipeline.WriteSummaryPagesToBigQuery( + combined_options, standard_options + ) + + requests | summary_pipeline.WriteSummaryRequestsToBigQuery( + combined_options, standard_options + ) + + # non-summary pipeline + if combined_options.pipeline_type in ["combined", "non-summary"]: + ( + files + | "MapJSON" >> beam.MapTuple(non_summary_pipeline.from_json) + | "AddDateAndClient" >> beam.Map(non_summary_pipeline.add_date_and_client) + | "WriteNonSummaryTables" + >> non_summary_pipeline.WriteNonSummaryToBigQuery( + **combined_options.get_all_options() + ) + ) + + # TODO detect DONE file, move temp table to final destination, shutdown pipeline (if streaming) + + return p diff --git a/modules/constants.py b/modules/constants.py index fec310e..748713a 100644 --- a/modules/constants.py +++ b/modules/constants.py @@ -1,40 +1,44 @@ import importlib.resources as pkg_resources import json + +def _get_schema(path): + return json.loads(pkg_resources.read_text("schema", path)) + + # TODO remove 'experimental' before going live -bigquery = { +BIGQUERY = { "datasets": { - "pages": "httparchive:experimental_summary_pages", - "requests": "httparchive:experimental_summary_requests", - "home_pages": "httparchive:summary_pages", - "home_requests": "httparchive:summary_requests", + "summary_pages_all": "httparchive:experimental_summary_pages", + "summary_requests_all": "httparchive:experimental_summary_requests", + "pages_all": "httparchive:experimental_pages", + "technologies_all": "httparchive:experimental_technologies", + "lighthouse_all": "httparchive:experimental_lighthouse", + "requests_all": "httparchive:experimental_requests", + "response_bodies_all": "httparchive:experimental_response_bodies", + "summary_pages_home": "httparchive:summary_pages", + "summary_requests_home": "httparchive:summary_requests", + "pages_home": "httparchive:pages", + "technologies_home": "httparchive:technologies", + "lighthouse_home": "httparchive:lighthouse", + "requests_home": "httparchive:requests", + "response_bodies_home": "httparchive:response_bodies", }, "schemas": { - "pages": { - "fields": json.loads( - pkg_resources.read_text("schema", "summary_pages.json") - ) - }, - "requests": { - "fields": json.loads( - pkg_resources.read_text("schema", "summary_requests.json") - ) - }, - "all_pages": { - "fields": json.loads( - pkg_resources.read_text("schema", "all_pages.json") - ) - }, - "all_requests": { - "fields": json.loads( - pkg_resources.read_text("schema", "all_requests.json") - ) - }, + "summary_pages": {"fields": _get_schema("summary_pages.json")}, + "summary_requests": {"fields": _get_schema("summary_requests.json")}, + "pages": {"fields": _get_schema("pages.json")}, + "technologies": {"fields": _get_schema("technologies.json")}, + "lighthouse": {"fields": _get_schema("lighthouse.json")}, + "requests": {"fields": _get_schema("requests.json")}, + "response_bodies": {"fields": _get_schema("response_bodies.json")}, + "all_pages": {"fields": _get_schema("all_pages.json")}, + "all_requests": {"fields": _get_schema("all_requests.json")}, }, } # mapping of headers to DB fields -ghReqHeaders = { +GH_REQ_HEADERS = { "accept": "req_accept", "accept-charset": "req_accept_charset", 
"accept-encoding": "req_accept_encoding", @@ -46,7 +50,7 @@ "referer": "req_referer", "user-agent": "req_user_agent", } -ghRespHeaders = { +GH_RESP_HEADERS = { "accept-ranges": "resp_accept_ranges", "age": "resp_age", "cache-control": "resp_cache_control", diff --git a/modules/import_all.py b/modules/import_all.py index 72ead5b..e55452b 100644 --- a/modules/import_all.py +++ b/modules/import_all.py @@ -14,7 +14,7 @@ from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.runners import DataflowRunner -from modules import constants +from modules import constants, utils # BigQuery can handle rows up to 100 MB. @@ -69,6 +69,7 @@ def get_page(har, client, crawl_date): rank = int(metadata.get('rank')) if metadata.get('rank') else None try: + page = trim_page(page) payload_json = to_json(page) except ValueError: logging.warning('Skipping pages payload for "%s": unable to stringify as JSON.', wptid) @@ -350,6 +351,10 @@ def get_requests(har, client, crawl_date): if response_body is not None: response_body = response_body[:MAX_CONTENT_SIZE] + mime_type = request.get('response').get('content').get('mimeType') + ext = utils.get_ext(request_url) + type = utils.pretty_type(mime_type, ext) + requests.append({ 'date': date, 'client': client, @@ -358,8 +363,7 @@ def get_requests(har, client, crawl_date): 'root_page': root_page, 'url': request_url, 'is_main_document': is_main_document, - # TODO: Get the type from the summary data. - 'type': '', + 'type': type, 'index': index, 'payload': payload, # TODO: Get the summary data. @@ -390,6 +394,18 @@ def trim_request(request): return request +def trim_page(page): + """Removes unneeded fields from the page object.""" + + if not page: + return None + + # Make a copy first so the data can be used later. + page = deepcopy(page) + page.pop('_parsed_css') + return page + + def to_json(obj): """Returns a JSON representation of the object. diff --git a/modules/import_har.py b/modules/import_har.py deleted file mode 100644 index 48ab81a..0000000 --- a/modules/import_har.py +++ /dev/null @@ -1,173 +0,0 @@ -import argparse -import logging - -import apache_beam as beam -from apache_beam.io.gcp.bigquery import BigQueryWriteFn, WriteToBigQuery -from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions -from apache_beam.runners import DataflowRunner - -from modules import constants, utils -from modules.transformation import ImportHarJson, ReadHarFiles, WriteBigQuery - - -def parse_args(argv): - parser = argparse.ArgumentParser() - - parser.add_argument( - "--input", - dest="input", - help="Input file to process. Example: gs://httparchive/crawls/*Jan_1_2022", - ) - - parser.add_argument( - "--subscription", - dest="subscription", - help="Pub/Sub subscription. Example: `projects/httparchive/subscriptions/har-gcs-pipeline`", - ) - - bq_write_methods = [ - WriteToBigQuery.Method.STREAMING_INSERTS, - WriteToBigQuery.Method.FILE_LOADS, - ] - parser.add_argument( - "--big_query_write_method", - dest="big_query_write_method", - help=f"BigQuery write method. 
One of {','.join(bq_write_methods)}", - choices=bq_write_methods, - default=WriteToBigQuery.Method.STREAMING_INSERTS, - ) - - parser.add_argument( - "--dataset_summary_pages", - dest="dataset_summary_pages", - help="BigQuery dataset to write summary pages tables", - default=constants.bigquery["datasets"]["pages"], - ) - - parser.add_argument( - "--dataset_summary_requests", - dest="dataset_summary_requests", - help="BigQuery dataset to write summary requests tables", - default=constants.bigquery["datasets"]["requests"], - ) - - parser.add_argument( - "--dataset_summary_pages_home_only", - dest="dataset_summary_pages_home_only", - help="BigQuery dataset to write summary pages tables (home-page-only)", - default=constants.bigquery["datasets"]["home_pages"], - ) - - parser.add_argument( - "--dataset_summary_requests_home_only", - dest="dataset_summary_requests_home_only", - help="BigQuery dataset to write summary requests tables (home-page-only)", - default=constants.bigquery["datasets"]["home_requests"], - ) - - return parser.parse_known_args(argv) - - -def create_pipeline(argv=None): - known_args, pipeline_args = parse_args(argv) - logging.info(f"Pipeline Options: known_args={known_args},pipeline_args={pipeline_args}") - pipeline_options = PipelineOptions(pipeline_args, save_main_session=True) - standard_options = pipeline_options.view_as(StandardOptions) - if not (known_args.subscription or known_args.input): - raise RuntimeError( - "Either one of --input or --subscription options must be provided" - ) - - # TODO log and persist execution arguments to storage for tracking - # https://beam.apache.org/documentation/patterns/pipeline-options/ - - # TODO add metric counters for files in, processed, written to GCP & BQ - - p = beam.Pipeline(options=pipeline_options) - - parsed = ( - p - | ReadHarFiles(known_args.subscription, known_args.input) - | "ParseHar" >> beam.ParDo(ImportHarJson()).with_outputs("page", "requests") - ) - pages, requests = parsed - requests = requests | "FlattenRequests" >> beam.FlatMap(lambda elements: elements) - - home_pages = pages | "FilterHomePages" >> beam.Filter(utils.is_home_page) - home_requests = requests | "FilterHomeRequests" >> beam.Filter(utils.is_home_page) - - deadletter_queues = {} - - deadletter_queues["pages"] = pages | "WritePagesToBigQuery" >> WriteBigQuery( - table=lambda row: utils.format_table_name( - row, known_args.dataset_summary_pages - ), - schema=constants.bigquery["schemas"]["pages"], - streaming=standard_options.streaming, - method=known_args.big_query_write_method, - ) - - deadletter_queues[ - "requests" - ] = requests | "WriteRequestsToBigQuery" >> WriteBigQuery( - table=lambda row: utils.format_table_name( - row, known_args.dataset_summary_requests - ), - schema=constants.bigquery["schemas"]["requests"], - streaming=standard_options.streaming, - method=known_args.big_query_write_method, - ) - - deadletter_queues[ - "home_pages" - ] = home_pages | "WriteHomePagesToBigQuery" >> WriteBigQuery( - table=lambda row: utils.format_table_name( - row, known_args.dataset_summary_pages_home_only - ), - schema=constants.bigquery["schemas"]["pages"], - streaming=standard_options.streaming, - method=known_args.big_query_write_method, - ) - - deadletter_queues[ - "home_requests" - ] = home_requests | "WriteHomeRequestsToBigQuery" >> WriteBigQuery( - table=lambda row: utils.format_table_name( - row, known_args.dataset_summary_requests_home_only - ), - schema=constants.bigquery["schemas"]["requests"], - streaming=standard_options.streaming, - 
method=known_args.big_query_write_method, - ) - - # deadletter logging - if standard_options.streaming: - for name, transform in deadletter_queues.items(): - transform_name = ( - f"Print{name.replace('_', ' ').title().replace(' ', '')}Errors" - ) - transform[BigQueryWriteFn.FAILED_ROWS] | transform_name >> beam.FlatMap( - lambda e: logging.error(f"Could not load {name} to BigQuery: {e}") - ) - - # TODO implement deadletter for FILE_LOADS? - # FAILED_ROWS not implemented for BigQueryBatchFileLoads in this version of beam (only _StreamToBigQuery) - # pages_result[BigQueryBatchFileLoads.DESTINATION_JOBID_PAIRS] | beam.Map(lambda e: print(f"jobidpair: {e}")) - # pages_result[BigQueryBatchFileLoads.DESTINATION_FILE_PAIRS] | beam.Map(lambda e: print(f"files: {e}")) - # pages_result[BigQueryBatchFileLoads.DESTINATION_COPY_JOBID_PAIRS] | beam.Map(lambda e: print(f"copies: {e}")) - - # TODO detect DONE file, move temp table to final destination, shutdown pipeline (if streaming) - - return p - - -def run(argv=None): - logging.getLogger().setLevel(logging.INFO) - p = create_pipeline() - pipeline_result = p.run(argv) - if not isinstance(p.runner, DataflowRunner): - pipeline_result.wait_until_finish() - - -if __name__ == '__main__': - run() diff --git a/modules/non_summary_pipeline.py b/modules/non_summary_pipeline.py new file mode 100644 index 0000000..bde3aa2 --- /dev/null +++ b/modules/non_summary_pipeline.py @@ -0,0 +1,518 @@ +"""HTTP Archive dataflow pipeline for generating HAR data on BigQuery.""" + +from __future__ import absolute_import + +import json +import logging +from copy import deepcopy +from hashlib import sha256 + +import apache_beam as beam + +from modules import utils, constants, transformation + +# BigQuery can handle rows up to 100 MB. +MAX_CONTENT_SIZE = 2 * 1024 * 1024 +# Number of times to partition the requests tables. +NUM_PARTITIONS = 4 + + +def get_page(har): + """Parses the page from a HAR object.""" + + if not har: + return None + + page = har.get("log").get("pages")[0] + url = page.get("_URL") + + metadata = get_metadata(har) + if metadata: + # The page URL from metadata is more accurate. + # See https://github.com/HTTPArchive/data-pipeline/issues/48 + url = metadata.get("tested_url", url) + + try: + page = trim_page(page) + payload_json = to_json(page) + except Exception: + logging.warning( + 'Skipping pages payload for "%s": unable to stringify as JSON.' % url + ) + return None + + payload_size = len(payload_json) + if payload_size > MAX_CONTENT_SIZE: + logging.warning( + 'Skipping pages payload for "%s": payload size (%s) exceeds the maximum content size of %s bytes.' + % (url, payload_size, MAX_CONTENT_SIZE) + ) + return None + + return [ + { + "url": url, + "payload": payload_json, + "date": har["date"], + "client": har["client"], + "metadata": metadata, + } + ] + + +def get_page_url(har): + """Parses the page URL from a HAR object.""" + + page = get_page(har) + + if not page: + logging.warning("Unable to get URL from page (see preceding warning).") + return None + + return page[0].get("url") + + +def get_metadata(har): + page = har.get("log").get("pages")[0] + metadata = page.get("_metadata") + return metadata + + +def is_home_page(mapped_har): + metadata = mapped_har.get("metadata") + if metadata and "crawl_depth" in metadata: + return metadata.get("crawl_depth") == 0 + # Only home pages have a crawl depth of 0. 
+ else: + return True + # legacy default + + +def partition_step(har, num_partitions): + """Returns a partition number based on the hashed HAR page URL""" + + if not har: + logging.warning("Unable to partition step, null HAR.") + return 0 + + page_url = get_page_url(har) + + if not page_url: + logging.warning("Skipping HAR: unable to get page URL (see preceding warning).") + return 0 + + _hash = hash_url(page_url) + + # shift partitions by one so the zero-th contains errors + offset = 1 + + return (_hash % (num_partitions - 1)) + offset + + +def get_requests(har): + """Parses the requests from a HAR object.""" + + if not har: + return None + + page_url = get_page_url(har) + + if not page_url: + # The page_url field indirectly depends on the get_page function. + # If the page data is unavailable for whatever reason, skip its requests. + logging.warning( + "Skipping requests payload: unable to get page URL (see preceding warning)." + ) + return None + + entries = har.get("log").get("entries") + + requests = [] + + for request in entries: + + request_url = request.get("_full_url") + + if not request_url: + logging.warning('Skipping empty request URL for "%s"', page_url) + continue + + try: + payload = to_json(trim_request(request)) + except Exception: + logging.warning( + 'Skipping requests payload for "%s": unable to stringify as JSON.' + % request_url + ) + continue + + payload_size = len(payload) + if payload_size > MAX_CONTENT_SIZE: + logging.warning( + 'Skipping requests payload for "%s": payload size (%s) exceeded maximum content size of %s bytes.' + % (request_url, payload_size, MAX_CONTENT_SIZE) + ) + continue + + metadata = get_metadata(har) + + requests.append( + { + "page": page_url, + "url": request_url, + "payload": payload, + "date": har["date"], + "client": har["client"], + "metadata": metadata, + } + ) + + return requests + + +def trim_request(request): + """Removes redundant fields from the request object.""" + + # Make a copy first so the response body can be used later. + request = deepcopy(request) + request.get("response").get("content").pop("text", None) + return request + + +def trim_page(page): + """Removes unneeded fields from the page object.""" + + if not page: + return None + + # Make a copy first so the data can be used later. + page = deepcopy(page) + page.pop("_parsed_css") + return page + + +def hash_url(url): + """Hashes a given URL to a process-stable integer value.""" + return int(sha256(url.encode("utf-8")).hexdigest(), 16) + + +def get_response_bodies(har): + """Parses response bodies from a HAR object.""" + + page_url = get_page_url(har) + requests = har.get("log").get("entries") + + response_bodies = [] + + for request in requests: + request_url = request.get("_full_url") + body = None + if request.get("response") and request.get("response").get("content"): + body = request.get("response").get("content").get("text", None) + + if body is None: + continue + + truncated = len(body) > MAX_CONTENT_SIZE + if truncated: + logging.warning( + 'Truncating response body for "%s". Response body size %s exceeds limit %s.' 
+ % (request_url, len(body), MAX_CONTENT_SIZE) + ) + + metadata = get_metadata(har) + + response_bodies.append( + { + "page": page_url, + "url": request_url, + "body": body[:MAX_CONTENT_SIZE], + "truncated": truncated, + "date": har["date"], + "client": har["client"], + "metadata": metadata, + } + ) + + return response_bodies + + +def get_technologies(har): + """Parses the technologies from a HAR object.""" + + if not har: + return None + + page = har.get("log").get("pages")[0] + page_url = page.get("_URL") + app_names = page.get("_detected_apps", {}) + categories = page.get("_detected", {}) + metadata = get_metadata(har) + + # When there are no detected apps, it appears as an empty array. + if isinstance(app_names, list): + app_names = {} + categories = {} + + app_map = {} + app_list = [] + for app, info_list in app_names.items(): + if not info_list: + continue + # There may be multiple info values. Add each to the map. + for info in info_list.split(","): + app_id = "%s %s" % (app, info) if len(info) > 0 else app + app_map[app_id] = app + + for category, apps in categories.items(): + for app_id in apps.split(","): + app = app_map.get(app_id) + info = "" + if app is None: + app = app_id + else: + info = app_id[len(app):].strip() + app_list.append( + { + "url": page_url, + "category": category, + "app": app, + "info": info, + "date": har["date"], + "client": har["client"], + "metadata": metadata, + } + ) + + return app_list + + +def get_lighthouse_reports(har): + """Parses Lighthouse results from a HAR object.""" + + if not har: + return None + + report = har.get("_lighthouse") + + if not report: + return None + + page_url = get_page_url(har) + + if not page_url: + logging.warning( + "Skipping lighthouse report: unable to get page URL (see preceding warning)." + ) + return None + + # Omit large UGC. + report.get("audits").get("screenshot-thumbnails", {}).get("details", {}).pop( + "items", None + ) + + try: + report_json = to_json(report) + except Exception: + logging.warning( + 'Skipping Lighthouse report for "%s": unable to stringify as JSON.' + % page_url + ) + return None + + report_size = len(report_json) + if report_size > MAX_CONTENT_SIZE: + logging.warning( + 'Skipping Lighthouse report for "%s": Report size (%s) exceeded maximum content size of %s bytes.' + % (page_url, report_size, MAX_CONTENT_SIZE) + ) + return None + + metadata = get_metadata(har) + + return [ + { + "url": page_url, + "report": report_json, + "date": har["date"], + "client": har["client"], + "metadata": metadata, + } + ] + + +def to_json(obj): + """Returns a JSON representation of the object. + + This method attempts to mirror the output of the + legacy Java Dataflow pipeline. For the most part, + the default `json.dumps` config does the trick, + but there are a few settings to make it more consistent: + + - Omit whitespace between properties + - Do not escape non-ASCII characters (preserve UTF-8) + + One difference between this Python implementation and the + Java implementation is the way long numbers are handled. + A Python-serialized JSON string might look like this: + + "timestamp":1551686646079.9998 + + while the Java-serialized string uses scientific notation: + + "timestamp":1.5516866460799998E12 + + Out of a sample of 200 actual request objects, this was + the only difference between implementations. This can be + considered an improvement. 
+ """ + + if not obj: + raise ValueError + + return json.dumps(obj, separators=(",", ":"), ensure_ascii=False) + + +def from_json(file_name, element): + """Returns an object from the JSON representation.""" + + try: + return file_name, json.loads(element) + except Exception as e: + logging.error('Unable to parse JSON object "%s...": %s' % (element[:50], e)) + return None + + +def add_date_and_client(element): + """Adds `date` and `client` attributes to facilitate BigQuery table routing""" + + file_name, har = element + date, client = utils.date_and_client_from_file_name(file_name) + page = har.get("log").get("pages")[0] + metadata = page.get("_metadata", {}) + har.update( + { + "date": "{:%Y_%m_%d}".format(date), + "client": metadata.get("layout", client).lower(), + } + ) + + return har + + +class WriteNonSummaryToBigQuery(beam.PTransform): + def __init__( + self, + streaming, + big_query_write_method, + partitions, + dataset_pages, + dataset_technologies, + dataset_lighthouse, + dataset_requests, + dataset_response_bodies, + dataset_pages_home_only, + dataset_technologies_home_only, + dataset_lighthouse_home_only, + dataset_requests_home_only, + dataset_response_bodies_home_only, + label=None, + **kwargs, + ): + # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3. + # super().__init__(label) + beam.PTransform.__init__(self) + self.label = label + + self.streaming = streaming + self.big_query_write_method = big_query_write_method + self.partitions = partitions + self.dataset_pages = dataset_pages + self.dataset_technologies = dataset_technologies + self.dataset_lighthouse = dataset_lighthouse + self.dataset_requests = dataset_requests + self.dataset_response_bodies = dataset_response_bodies + self.dataset_pages_home = dataset_pages_home_only + self.dataset_technologies_home = dataset_technologies_home_only + self.dataset_lighthouse_home = dataset_lighthouse_home_only + self.dataset_requests_home = dataset_requests_home_only + self.dataset_response_bodies_home = dataset_response_bodies_home_only + + def _transform_and_write_partition( + self, pcoll, name, index, fn, table_all, table_home, schema + ): + formatted_name = utils.title_case_beam_transform_name(name) + + all_rows = pcoll | f"Map{formatted_name}{index}" >> beam.FlatMap(fn) + + home_only_rows = all_rows | f"Filter{formatted_name}{index}" >> beam.Filter(is_home_page) + + all_rows | f"Write{formatted_name}All{index}" >> transformation.WriteBigQuery( + table=lambda row: utils.format_table_name(row, table_all), + schema=schema, + streaming=self.streaming, + method=self.big_query_write_method, + ) + + home_only_rows | f"Write{formatted_name}Home{index}" >> transformation.WriteBigQuery( + table=lambda row: utils.format_table_name(row, table_home), + schema=schema, + streaming=self.streaming, + method=self.big_query_write_method, + ) + + def expand(self, hars): + # Add one to the number of partitions to use the zero-th partition for failures + partitions = hars | beam.Partition(partition_step, self.partitions + 1) + + # enumerate starting from 1, discarding the 0th elements (failures) + for idx, part in enumerate(partitions, 1): + self._transform_and_write_partition( + pcoll=part, + name="pages", + index=idx, + fn=get_page, + table_all=self.dataset_pages, + table_home=self.dataset_pages_home, + schema=constants.BIGQUERY["schemas"]["pages"], + ) + + self._transform_and_write_partition( + pcoll=part, + name="technologies", + index=idx, + fn=get_technologies, + table_all=self.dataset_technologies, + 
table_home=self.dataset_technologies_home, + schema=constants.BIGQUERY["schemas"]["technologies"], + ) + + self._transform_and_write_partition( + pcoll=part, + name="lighthouse", + index=idx, + fn=get_lighthouse_reports, + table_all=self.dataset_lighthouse, + table_home=self.dataset_lighthouse_home, + schema=constants.BIGQUERY["schemas"]["lighthouse"], + ) + + self._transform_and_write_partition( + pcoll=part, + name="requests", + index=idx, + fn=get_requests, + table_all=self.dataset_requests, + table_home=self.dataset_requests_home, + schema=constants.BIGQUERY["schemas"]["requests"], + ) + + self._transform_and_write_partition( + pcoll=part, + name="response_bodies", + index=idx, + fn=get_response_bodies, + table_all=self.dataset_response_bodies, + table_home=self.dataset_response_bodies_home, + schema=constants.BIGQUERY["schemas"]["response_bodies"], + ) diff --git a/modules/summary_pipeline.py b/modules/summary_pipeline.py new file mode 100644 index 0000000..fa30c83 --- /dev/null +++ b/modules/summary_pipeline.py @@ -0,0 +1,85 @@ +import apache_beam as beam + +from modules import constants, utils +from modules.transformation import ( + WriteBigQuery, + add_deadletter_logging, +) + + +class WriteSummaryPagesToBigQuery(beam.PTransform): + def __init__(self, summary_options, standard_options, label=None): + super().__init__(label) + self.summary_options = summary_options + self.standard_options = standard_options + + def expand(self, pages): + home_pages = pages | "FilterHomePages" >> beam.Filter(utils.is_home_page) + + deadletter_queues = { + "pages": pages + | "WritePagesToBigQuery" + >> WriteBigQuery( + table=lambda row: utils.format_table_name( + row, self.summary_options.dataset_summary_pages + ), + schema=constants.BIGQUERY["schemas"]["summary_pages"], + streaming=self.standard_options.streaming, + method=self.summary_options.big_query_write_method, + ), + "home_pages": home_pages + | "WriteHomePagesToBigQuery" + >> WriteBigQuery( + table=lambda row: utils.format_table_name( + row, self.summary_options.dataset_summary_pages_home_only + ), + schema=constants.BIGQUERY["schemas"]["summary_pages"], + streaming=self.standard_options.streaming, + method=self.summary_options.big_query_write_method, + ), + } + + if self.standard_options.streaming: + add_deadletter_logging(deadletter_queues) + + +class WriteSummaryRequestsToBigQuery(beam.PTransform): + def __init__(self, summary_options, standard_options, label=None): + super().__init__(label) + self.summary_options = summary_options + self.standard_options = standard_options + + def expand(self, requests): + requests = requests | "FlattenRequests" >> beam.FlatMap( + lambda elements: elements + ) + + home_requests = requests | "FilterHomeRequests" >> beam.Filter( + utils.is_home_page + ) + + deadletter_queues = { + "requests": requests + | "WriteRequestsToBigQuery" + >> WriteBigQuery( + table=lambda row: utils.format_table_name( + row, self.summary_options.dataset_summary_requests + ), + schema=constants.BIGQUERY["schemas"]["summary_requests"], + streaming=self.standard_options.streaming, + method=self.summary_options.big_query_write_method, + ), + "home_requests": home_requests + | "WriteHomeRequestsToBigQuery" + >> WriteBigQuery( + table=lambda row: utils.format_table_name( + row, self.summary_options.dataset_summary_requests_home_only + ), + schema=constants.BIGQUERY["schemas"]["summary_requests"], + streaming=self.standard_options.streaming, + method=self.summary_options.big_query_write_method, + ), + } + + if 
self.standard_options.streaming: + add_deadletter_logging(deadletter_queues) diff --git a/modules/transformation.py b/modules/transformation.py index 75d5159..94050bf 100644 --- a/modules/transformation.py +++ b/modules/transformation.py @@ -6,17 +6,27 @@ import apache_beam as beam from apache_beam.io import ReadFromPubSub, WriteToBigQuery, BigQueryDisposition +from apache_beam.io.gcp.bigquery import BigQueryWriteFn from apache_beam.io.gcp.bigquery_tools import RetryStrategy from dateutil import parser as date_parser from modules import constants, utils +def add_deadletter_logging(deadletter_queues): + for name, transform in deadletter_queues.items(): + transform_name = f"Print{utils.title_case_beam_transform_name(name)}Errors" + transform[BigQueryWriteFn.FAILED_ROWS] | transform_name >> beam.FlatMap( + lambda e: logging.error(f"Could not load {name} to BigQuery: {e}") + ) + + class ReadHarFiles(beam.PTransform): - def __init__(self, subscription=None, _input=None): + def __init__(self, subscription=None, input=None, input_file=None, **kwargs): super().__init__() self.subscription = subscription - self.input = _input + self.input = input + self.input_file = input_file def expand(self, p): # PubSub pipeline @@ -31,19 +41,31 @@ def expand(self, p): ) # GCS pipeline else: - matching = ( - self.input if self.input.endswith(".har.gz") else f"{self.input}/*.har.gz" - ) + if self.input: + matching = ( + self.input + if self.input.endswith(".har.gz") + else f"{self.input}/*.har.gz" + # [x if x.endswith(".har.gz") else f"{x}/*.har.gz" for x in self.input] + ) - # using ReadAllFromText instead of ReadFromTextWithFilename to avoid listing file sizes locally - # https://stackoverflow.com/questions/60874942/avoid-recomputing-size-of-all-cloud-storage-files-in-beam-python-sdk - # https://issues.apache.org/jira/browse/BEAM-9620 - # not an issue for the java SDK - # https://beam.apache.org/releases/javadoc/2.37.0/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.Read.html#withHintMatchesManyFiles-- - files = ( - p + # using ReadAllFromText instead of ReadFromTextWithFilename to avoid listing file sizes locally + # https://stackoverflow.com/questions/60874942/avoid-recomputing-size-of-all-cloud-storage-files-in-beam-python-sdk + # https://issues.apache.org/jira/browse/BEAM-9620 + # not an issue for the java SDK + # https://beam.apache.org/releases/javadoc/2.37.0/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.Read.html#withHintMatchesManyFiles-- # TODO replace with match continuously for streaming? 
- | beam.Create([matching]) + reader = p | beam.Create([matching]) + else: + reader = ( + p + | beam.Create([self.input_file]) + | "ReadInputFile" >> beam.io.ReadAllFromText() + ) + + files = ( + reader + | beam.Reshuffle() | beam.io.ReadAllFromText(with_filename=True) ) @@ -84,33 +106,11 @@ def expand(self, pcoll, **kwargs): ) -def initialize_status_info(file_name, page): - # file name parsing kept for backward compatibility before 2022-03-01 - dir_name, base_name = os.path.split(file_name) - - date = utils.crawl_date(dir_name) - - metadata = page.get("_metadata", {}) - - return { - "archive": "All", # only one value found when porting logic from PHP - "label": "{dt:%b} {dt.day} {dt.year}".format(dt=date), - "crawlid": metadata.get("crawlid", 0), - "wptid": page.get("testID", base_name.split(".")[0]), - "medianRun": 1, # only available in RAW json (median.firstview.run), not HAR json - "page": metadata.get("tested_url", ""), - "pageid": utils.clamp_integer(metadata["page_id"]) if metadata.get("page_id") else None, - "rank": utils.clamp_integer(metadata["rank"]) if metadata.get("rank") else None, - "date": "{:%Y_%m_%d}".format(date), - "client": metadata.get("layout", utils.client_name(file_name)).lower(), - } - - -class ImportHarJson(beam.DoFn): +class HarJsonToSummaryDoFn(beam.DoFn): def process(self, element, **kwargs): file_name, data = element try: - page, requests = self.generate_pages(file_name, data) + page, requests = HarJsonToSummary.generate_pages(file_name, data) if page: yield beam.pvalue.TaggedOutput("page", page) if requests: @@ -118,9 +118,37 @@ def process(self, element, **kwargs): except Exception: logging.exception( f"Unable to unpack HAR, check previous logs for detailed errors. " - f"file_name={file_name}, element={element}" + f"{file_name=}, {element=}" ) + +class HarJsonToSummary: + @staticmethod + def initialize_status_info(file_name, page): + # file name parsing kept for backward compatibility before 2022-03-01 + dir_name, base_name = os.path.split(file_name) + + date, client_name = utils.date_and_client_from_file_name(file_name) + + metadata = page.get("_metadata", {}) + + return { + "archive": "All", # only one value found when porting logic from PHP + "label": "{dt:%b} {dt.day} {dt.year}".format(dt=date), + "crawlid": metadata.get("crawlid", 0), + "wptid": page.get("testID", base_name.split(".")[0]), + "medianRun": 1, # only available in RAW json (median.firstview.run), not HAR json + "page": metadata.get("tested_url", ""), + "pageid": utils.clamp_integer(metadata["page_id"]) + if metadata.get("page_id") + else None, + "rank": utils.clamp_integer(metadata["rank"]) + if metadata.get("rank") + else None, + "date": "{:%Y_%m_%d}".format(date), + "client": metadata.get("layout", client_name).lower(), + } + @staticmethod def generate_pages(file_name, element): if not element: @@ -139,24 +167,24 @@ def generate_pages(file_name, element): logging.warning(f"No pages found for: {file_name}") return None, None - status_info = initialize_status_info(file_name, pages[0]) + status_info = HarJsonToSummary.initialize_status_info(file_name, pages[0]) try: - page = ImportHarJson.import_page(pages[0], status_info) + page = HarJsonToSummary.import_page(pages[0], status_info) except Exception: logging.warning( f"import_page() failed for status_info:{status_info}", exc_info=True ) return None, None - entries, first_url, first_html_url = ImportHarJson.import_entries( + entries, first_url, first_html_url = HarJsonToSummary.import_entries( log["entries"], status_info ) if not entries: 
logging.warning(f"import_entries() failed for status_info:{status_info}") return None, None else: - agg_stats = ImportHarJson.aggregate_stats( + agg_stats = HarJsonToSummary.aggregate_stats( entries, first_url, first_html_url, status_info ) if not agg_stats: @@ -167,9 +195,9 @@ def generate_pages(file_name, element): else: page.update(agg_stats) - utils.clamp_integers(page, utils.int_columns_for_schema('pages')) + utils.clamp_integers(page, utils.int_columns_for_schema("pages")) for entry in entries: - utils.clamp_integers(entry, utils.int_columns_for_schema('requests')) + utils.clamp_integers(entry, utils.int_columns_for_schema("requests")) return page, entries @@ -178,11 +206,16 @@ def import_entries(entries, status_info): requests = [] first_url = "" first_html_url = "" + entry_number = 0 for entry in entries: + if entry.get("_number"): + entry_number = entry["_number"] + else: + entry_number += 1 ret_request = { - "requestid": (status_info["pageid"] << 32) + entry["_number"], + "requestid": (status_info["pageid"] << 32) + entry_number, "client": status_info["client"], "date": status_info["date"], "pageid": status_info["pageid"], @@ -212,7 +245,7 @@ def import_entries(entries, status_info): request_other_headers, request_cookie_size, ) = utils.parse_header( - request["headers"], constants.ghReqHeaders, cookie_key="cookie" + request["headers"], constants.GH_REQ_HEADERS, cookie_key="cookie" ) req_headers_size = ( @@ -285,7 +318,7 @@ def import_entries(entries, status_info): response_cookie_size, ) = utils.parse_header( response["headers"], - constants.ghRespHeaders, + constants.GH_RESP_HEADERS, cookie_key="set-cookie", output_headers=request_headers, ) @@ -406,9 +439,7 @@ def import_page(page, status_info): ) avg_dom_depth = ( - int(float(page.get("_avg_dom_depth"))) - if page.get("_avg_dom_depth") - else 0 + int(float(page.get("_avg_dom_depth"))) if page.get("_avg_dom_depth") else 0 ) return { diff --git a/modules/utils.py b/modules/utils.py index 12ad335..603701c 100644 --- a/modules/utils.py +++ b/modules/utils.py @@ -7,7 +7,7 @@ from modules import constants -BIGQUERY_MAX_INT = 2 ** 63 - 1 +BIGQUERY_MAX_INT = 2**63 - 1 def remove_empty_keys(d): @@ -142,6 +142,13 @@ def parse_header(input_headers, standard_headers, cookie_key, output_headers=Non return output_headers, ret_other, cookie_size +def date_and_client_from_file_name(file_name): + dir_name, base_name = os.path.split(file_name) + date = crawl_date(dir_name) + client = client_name(file_name) + return date, client + + def client_name(file_name): dir_name, base_name = os.path.split(file_name) client = dir_name.split("/")[-1].split("-")[0] @@ -154,15 +161,12 @@ def client_name(file_name): return client.lower() -def format_table_name(row, table, const=constants.bigquery): - table_name = "{}.{}_{}".format( - const["datasets"][table], row["date"], row["client"] - ) - - if not table_name: - logging.error(f"Unable to determine full table name. table={table},row={row}") - - return table_name +def format_table_name(row, table): + try: + return f"{table}.{row['date']}_{row['client']}" + except Exception: + logging.exception(f"Unable to determine full table name. 
{table=},{row=}") + raise def datetime_to_epoch(dt, status_info): @@ -199,8 +203,8 @@ def clamp_integers(data, columns): def int_columns_for_schema(schema_name): - schema = constants.bigquery['schemas'][schema_name]['fields'] - return [field['name'] for field in schema if field['type'] == 'INTEGER'] + schema = constants.BIGQUERY["schemas"][schema_name]["fields"] + return [field["name"] for field in schema if field["type"] == "INTEGER"] def is_home_page(element): @@ -211,3 +215,7 @@ def is_home_page(element): else: # legacy crawl data is all home-page only (i.e. no secondary pages) return True + + +def title_case_beam_transform_name(name): + return name.replace("_", " ").title().replace(" ", "") diff --git a/run_pipeline.py b/run_pipeline.py index 87be01e..3e7386f 100644 --- a/run_pipeline.py +++ b/run_pipeline.py @@ -1,7 +1,19 @@ +#!/usr/bin/env python3 + import logging -from modules import import_har +from apache_beam.runners import DataflowRunner -if __name__ == "__main__": +from modules import combined_pipeline + + +def run(argv=None): logging.getLogger().setLevel(logging.INFO) - import_har.run() + p = combined_pipeline.create_pipeline() + pipeline_result = p.run(argv) + if not isinstance(p.runner, DataflowRunner): + pipeline_result.wait_until_finish() + + +if __name__ == "__main__": + run() diff --git a/schema/lighthouse.json b/schema/lighthouse.json new file mode 100644 index 0000000..74676a8 --- /dev/null +++ b/schema/lighthouse.json @@ -0,0 +1,10 @@ +[ + { + "name": "url", + "type": "STRING" + }, + { + "name": "report", + "type": "STRING" + } +] diff --git a/schema/pages.json b/schema/pages.json new file mode 100644 index 0000000..bdf24f3 --- /dev/null +++ b/schema/pages.json @@ -0,0 +1,10 @@ +[ + { + "name": "url", + "type": "STRING" + }, + { + "name": "payload", + "type": "STRING" + } +] diff --git a/schema/requests.json b/schema/requests.json new file mode 100644 index 0000000..459fa42 --- /dev/null +++ b/schema/requests.json @@ -0,0 +1,14 @@ +[ + { + "name": "page", + "type": "STRING" + }, + { + "name": "url", + "type": "STRING" + }, + { + "name": "payload", + "type": "STRING" + } +] diff --git a/schema/response_bodies.json b/schema/response_bodies.json new file mode 100644 index 0000000..712d109 --- /dev/null +++ b/schema/response_bodies.json @@ -0,0 +1,18 @@ +[ + { + "name": "page", + "type": "STRING" + }, + { + "name": "url", + "type": "STRING" + }, + { + "name": "body", + "type": "STRING" + }, + { + "name": "truncated", + "type": "BOOLEAN" + } +] diff --git a/schema/technologies.json b/schema/technologies.json new file mode 100644 index 0000000..da3bc69 --- /dev/null +++ b/schema/technologies.json @@ -0,0 +1,18 @@ +[ + { + "name": "url", + "type": "STRING" + }, + { + "name": "category", + "type": "STRING" + }, + { + "name": "app", + "type": "STRING" + }, + { + "name": "info", + "type": "STRING" + } +] diff --git a/test/test_combined_pipeline.py b/test/test_combined_pipeline.py new file mode 100644 index 0000000..f5e7597 --- /dev/null +++ b/test/test_combined_pipeline.py @@ -0,0 +1,22 @@ +from unittest import TestCase + +import apache_beam as beam + +from modules.combined_pipeline import create_pipeline + + +class TestCombinedPipeline(TestCase): + def test_create_pipeline_serialization(self): + # batch/GCS file glob + p = create_pipeline(["--input", "foo"]) + beam.Pipeline.from_runner_api(p.to_runner_api(), p.runner, p._options) + + # batch/GCS file listing + p = create_pipeline(["--input_file", "bar"]) + beam.Pipeline.from_runner_api(p.to_runner_api(), p.runner, p._options) 
+ + # streaming/pubsub + p = create_pipeline( + ["--subscription", "projects/httparchive/subscriptions/foo"] + ) + beam.Pipeline.from_runner_api(p.to_runner_api(), p.runner, p._options) diff --git a/test/test_non_summary_pipeline.py b/test/test_non_summary_pipeline.py new file mode 100644 index 0000000..d0d781b --- /dev/null +++ b/test/test_non_summary_pipeline.py @@ -0,0 +1,52 @@ +from unittest import TestCase, mock + +from modules.non_summary_pipeline import partition_step + + +class TestNonSummaryPipeline(TestCase): + @mock.patch("modules.non_summary_pipeline.get_page_url", lambda _: "example.com") + def test_partition_step(self): + crawl_depths = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + hars = [] + + for crawl_depth in crawl_depths: + hars += { + "log": {"pages": [{"_metadata": {"crawl_depth": crawl_depth}}]}, + "date": None, + "client": None, + } + + partitions = 4 + min_partition = 1 + + for har in hars: + self.assertTrue( + min_partition <= partition_step(har, partitions) <= partitions + ) + + def test_partition_step_empty_har(self): + har = None + partitions = 4 + expected_warning = "Unable to partition step, null HAR." + expected_return = 0 + + with self.assertLogs(level="WARNING") as log: + ret = partition_step(har, partitions) + self.assertEqual(len(log.output), 1) + self.assertEqual(len(log.records), 1) + self.assertIn(expected_warning, log.output[0]) + self.assertEqual(ret, expected_return) + + @mock.patch("modules.non_summary_pipeline.get_page_url", lambda _: None) + def test_partition_step_empty_page_url(self): + har = {"foo"} + partitions = 4 + expected_warning = "Skipping HAR: unable to get page URL" + expected_return = 0 + + with self.assertLogs(level="WARNING") as log: + ret = partition_step(har, partitions) + self.assertEqual(len(log.output), 1) + self.assertEqual(len(log.records), 1) + self.assertIn(expected_warning, log.output[0]) + self.assertEqual(ret, expected_return) diff --git a/test/test_transformation.py b/test/test_transformation.py index 6c4dbd9..8ae6693 100644 --- a/test/test_transformation.py +++ b/test/test_transformation.py @@ -1,12 +1,12 @@ from unittest import TestCase -from modules.transformation import ImportHarJson +from modules.transformation import HarJsonToSummary, HarJsonToSummaryDoFn class TestImportHarJson(TestCase): def test_generate_pages_none_error(self): with self.assertLogs(level="WARNING") as log: - ret = ImportHarJson.generate_pages("foo", None) + ret = HarJsonToSummary.generate_pages("foo", None) self.assertEqual(len(log.output), 1) self.assertEqual(len(log.records), 1) self.assertIn("HAR file read error", log.output[0]) @@ -14,14 +14,14 @@ def test_generate_pages_none_error(self): def test_generate_pages_decode_warning(self): with self.assertLogs(level="WARNING") as log: - ret = ImportHarJson.generate_pages("foo", "garbage") + ret = HarJsonToSummary.generate_pages("foo", "garbage") self.assertEqual(len(log.output), 1) self.assertEqual(len(log.records), 1) self.assertEqual(ret, (None, None)) def test_generate_pages_empty_error(self): with self.assertLogs(level="WARNING") as log: - ret = ImportHarJson.generate_pages("foo", '{"log": {"pages": []}}') + ret = HarJsonToSummary.generate_pages("foo", '{"log": {"pages": []}}') self.assertEqual(len(log.output), 1) self.assertEqual(len(log.records), 1) self.assertIn("No pages found", log.output[0]) @@ -29,8 +29,8 @@ def test_generate_pages_empty_error(self): def test_import_page_empty_status_info(self): with self.assertRaises(Exception): - ImportHarJson.import_page(None, {}) + 
HarJsonToSummary.import_page(None, {}) def test_import_har_json_bad_data(self): with self.assertRaises(StopIteration): - next(ImportHarJson().process(("file_name", "data"))) + next(HarJsonToSummaryDoFn().process(("file_name", "data"))) diff --git a/test/test_utils.py b/test/test_utils.py index 43fa6f6..0028105 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -135,26 +135,26 @@ def test_clamp_integer_negative(self): self.assertEqual(utils.clamp_integer(-1000), None) def test_clamp_integer_str(self): - self.assertEqual(utils.clamp_integer('1000'), 1000) + self.assertEqual(utils.clamp_integer("1000"), 1000) def test_clamp_integer_bigint(self): self.assertEqual(utils.clamp_integer(2**64), utils.BIGQUERY_MAX_INT) def test_clamp_integers(self): b = utils.BIGQUERY_MAX_INT + 10 - cols = ['a', 'b', 'c', 'd'] - data = {'a': 1, 'b': b, 'c': None} + cols = ["a", "b", "c", "d"] + data = {"a": 1, "b": b, "c": None} with self.assertLogs(level="WARNING") as log: utils.clamp_integers(data, cols) - self.assertEqual(data['a'], 1) - self.assertEqual(data['b'], utils.BIGQUERY_MAX_INT) - self.assertEqual(data['c'], None) + self.assertEqual(data["a"], 1) + self.assertEqual(data["b"], utils.BIGQUERY_MAX_INT) + self.assertEqual(data["c"], None) self.assertIn("Clamping required for {'b': " + str(b), log.output[0]) def test_format_table_name(self): - constant = {"datasets": {"table_type": "project_name:dataset_name"}} + project_dataset = "project_name:dataset_name" row = {"date": "2022-01-01", "client": "test"} self.assertEqual( - utils.format_table_name(row, "table_type", constant), - f"{constant['datasets']['table_type']}.{row['date']}_{row['client']}", + utils.format_table_name(row, project_dataset), + f"{project_dataset}.{row['date']}_{row['client']}", )
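
The least obvious piece of the new non-summary writer is how it spreads BigQuery load across partitions. The sketch below reproduces only the partition arithmetic from `hash_url` and `partition_step` in `modules/non_summary_pipeline.py`; it is illustrative, not part of the pipeline. The `route` helper and the sample URLs are hypothetical — in the real code the same arithmetic is applied to whole HAR objects via `beam.Partition(partition_step, self.partitions + 1)`, and partition 0 is reserved for records whose page URL cannot be determined.

```python
# Standalone sketch of the partition routing added in modules/non_summary_pipeline.py.
# Only the hashing arithmetic is reproduced here; no Beam dependency is needed.
from hashlib import sha256

NUM_PARTITIONS = 4  # default taken from non_summary_pipeline.NUM_PARTITIONS


def hash_url(url):
    """Hashes a URL to a process-stable integer (same as hash_url in the diff)."""
    return int(sha256(url.encode("utf-8")).hexdigest(), 16)


def route(page_url, num_partitions=NUM_PARTITIONS + 1):
    """Mirrors partition_step: the transform is created with partitions + 1 so the
    zero-th partition only ever holds records that failed to parse."""
    if not page_url:
        return 0  # null HAR / missing page URL -> error partition
    offset = 1  # shift by one so valid records never land in partition 0
    return (hash_url(page_url) % (num_partitions - 1)) + offset


if __name__ == "__main__":
    # Hypothetical URLs, purely for illustration.
    for url in ["https://example.com/", "https://example.org/about", None]:
        print(url, "->", route(url))
    # Valid URLs land in partitions 1..4; the None case lands in partition 0,
    # which WriteNonSummaryToBigQuery skips by enumerating partitions from 1.
```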