Combine summary and non-summary pipelines (#91)

* Reimplement HAR data pipeline with Apache Beam in Python (#93)

* python dataflow pipeline

* example dataflow job

* add support for LH, requests, and pages

* run the whole pipeline

* implement a maximum content size limit

make a deepcopy of the request before deleting the response body

* file notice

* optimize pipeline input

see https://stackoverflow.com/questions/60874942/avoid-recomputing-size-of-all-cloud-storage-files-in-beam-python-sdk

* improve pipeline

fix edge case in technologies transform
try to do more work in more CPUs

* use beam bq sink

* almost working

* integrate python pipeline into sync_har

* ujson

* handle nulls

* install ujson deps on workers

* FIXES

undo ujson, fix flatmap, upgrade to beam 2.26

* it works!

* Fix response_bodies pipeline (#123)

* python dataflow pipeline

* fix response_bodies

* Omit null response bodies (#125)

* Update HAR location to crawls directory

* Add extra check to prevent errors

* Improved exceptions

* Add support for LH desktop (#165)

* Use page URL from metadata (#166)

* Use 4 partitions for requests and response_bodies (#169)

* Update bigquery_import.py

* Update bigquery_import.py

* default url

* partitions

* debugging

* baa baa

* fix

* Ignore secondary pages in non-summary pipeline (#174)

* home pages only

* fix crawl_depth to 0

* move bigquery_import.py

* rename `ImportHarJson` to `HarJsonToSummary`

* refactor bigquery_import.py

* rename `import_har.py` to `summary_pipeline.py`

* rename `bigquery_import.py` to `non_summary_pipeline.py`

* Summary and non-summary pipeline refactors

* summary_pipeline.py: factor out steps following the read (i.e. flattening, BigQuery writes, dead-lettering)

* new custom `PipelineOptions` class for both summary_pipeline.py and non_summary_pipeline.py

* Add combined_pipeline.py

* Work in progress on combined_pipeline.py

* combined_pipeline.py: non-summary tables working correctly

* constants.py: added non-summary tables and schemas

* non_summary_pipeline.py: updated partitioning logic

* non_summary_pipeline.py: added `client` and `date` to parsed data for programmatic routing to BigQuery tables

* run_pipeline.py: updated module name

* summary_pipeline.py: updated naming conventions

* transformation.py: separated HAR to summary code into separate classes for re-usability

* utils.py: added and updated some helper functions

* Rename summary BQ write ptransform

* Fix unittests

* Work in progress on combined_pipeline.py

* combined_pipeline.py: summary tables working correctly

* constants.py: corrected non-summary schema formatting

* schemas: added non-summary table json files

* transformation.py: added deadletter logging helper; updated requestid logic

* Fix dataflow runner pickling

* add python pipeline runner scripts

* rename `non_summary_pipeline.WriteBigQuery` to `WriteNonSummaryToBigQuery`

* Remove call to `WriteNonSummaryToBigQuery.__init__().super()`

* remove `non_summary_pipeline.get_gcs_dir()`

* various updates to summary_pipeline.py

* linting fixes

* linting fixes

* linting fixes

* linting fixes

* linting fixes

* linting fixes

* Update scripts and docs for run_pipeline.py changes

* linting fixes

* Partitioning: parameterize and add unit test

* Partitioning: add unit test

* delete run_combined_pipeline.py

* Various updates

* combined_pipeline.py: added `CombinedPipelineOptions`; removed `run()`; added combined/summary/non-summary pipeline conditional logic

* non_summary_pipeline.py: explicit options for `WriteNonSummaryToBigQuery`; removed `NonSummaryPipelineOptions`, `create_pipeline()` and `run()`

* run_pipeline.py: removed conditional pipeline logic; added/centralized `run()`

* summary_pipeline.py: removed `SummaryPipelineOptions`, `create_pipeline()` and `run()`

* Add pipeline serialization unittest

* linting fixes

* Add home-only/secondary logic to non-summary pipeline

* Update non-summary partitioning logic

* linting fixes

* Add `--input_file` argument and fix partitioning

* trim parsed_css from pages payload (#99)

* response type (#100)

* Update modules/non_summary_pipeline.py

* Update modules/combined_pipeline.py

* linter

* linter

Co-authored-by: Rick Viscomi <[email protected]>
Co-authored-by: Barry <[email protected]>
3 people authored Jul 1, 2022
1 parent 28aa69c commit e7d1897
Showing 19 changed files with 1,131 additions and 284 deletions.
README.md (22 changes: 21 additions & 1 deletion)
@@ -1,8 +1,10 @@

# data-pipeline
The new HTTP Archive data pipeline built entirely on GCP

## Initial setup
TODO: follow known instructions
https://beam.apache.org/get-started/quickstart-py/

TODO: python3.8

@@ -52,12 +54,24 @@ while IFS= read -r f; do gcloud pubsub topics publish projects/httparchive/topic
./run_pipeline_streaming.sh
```

### WIP Read from GCS (batch)
### Read from GCS (batch)

```shell
./run_pipeline_batch.sh
```

### Pipeline types

By default, the pipeline runs in "combined" mode, producing both summary and non-summary tables.
This can be controlled with the `--pipeline_type` argument in either batch or streaming mode.

> ⚠ Note: streaming to non-summary tables is currently only supported in the combined pipeline (i.e. it is not supported in the non-summary-only pipeline)

```shell
# example
./run_pipeline_batch.sh --pipeline_type=summary
```

## Update the pipeline
### Update streaming
Supply the run script with a currently running job name
@@ -98,6 +112,12 @@ Various incompatibilities due to missing features
* missing dead-letter collections for batch loads
* fixed vs auto-sharding

#### RuntimeError: VarLong too long

This is a known issue when using the DirectRunner on Windows 10 with the Beam Python SDK

https://issues.apache.org/jira/browse/BEAM-11037

### Response cache-control max-age

Various parsing issues due to unhandled cases
modules/combined_pipeline.py (182 changes: 182 additions & 0 deletions)
@@ -0,0 +1,182 @@
import argparse
import logging

import apache_beam as beam
from apache_beam.io import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

from modules import summary_pipeline, non_summary_pipeline, constants
from modules.transformation import ReadHarFiles, HarJsonToSummaryDoFn


class CombinedPipelineOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
super()._add_argparse_args(parser)

parser.prog = "combined_pipeline"

pipeline_types = ["combined", "summary", "non-summary"]
parser.add_argument(
"--pipeline_type",
default="combined",
choices=pipeline_types,
help=f"Type of pipeline to run. One of {','.join(pipeline_types)}",
)

group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
"--input",
help="Input file glob to process. Example: gs://httparchive/crawls/*Jan_1_2022",
)

group.add_argument(
'--input_file',
help="Input file containing a list of HAR files. "
"Example: gs://httparchive/crawls_manifest/android-May_12_2022.txt"
)

group.add_argument(
"--subscription",
help="Pub/Sub subscription. Example: `projects/httparchive/subscriptions/har-gcs-pipeline`",
)

bq_write_methods = [
WriteToBigQuery.Method.STREAMING_INSERTS,
WriteToBigQuery.Method.FILE_LOADS,
]
parser.add_argument(
"--big_query_write_method_summary",
dest="big_query_write_method",
help=f"BigQuery write method. One of {','.join(bq_write_methods)}",
choices=bq_write_methods,
default=WriteToBigQuery.Method.STREAMING_INSERTS,
)

parser.add_argument(
"--dataset_summary_pages",
help="BigQuery dataset to write summary pages tables",
default=constants.BIGQUERY["datasets"]["summary_pages_all"],
)
parser.add_argument(
"--dataset_summary_requests",
help="BigQuery dataset to write summary requests tables",
default=constants.BIGQUERY["datasets"]["summary_requests_all"],
)
parser.add_argument(
"--dataset_summary_pages_home_only",
dest="dataset_summary_pages_home_only",
help="BigQuery dataset to write summary pages tables (home-page-only)",
default=constants.BIGQUERY["datasets"]["summary_pages_home"],
)
parser.add_argument(
"--dataset_summary_requests_home_only",
help="BigQuery dataset to write summary requests tables (home-page-only)",
default=constants.BIGQUERY["datasets"]["summary_requests_home"],
)

parser.add_argument(
"--dataset_pages_home_only",
help="BigQuery dataset to write pages table (home-page-only)",
default=constants.BIGQUERY["datasets"]["pages_home"],
)
parser.add_argument(
"--dataset_technologies_home_only",
help="BigQuery dataset to write technologies table (home-page-only)",
default=constants.BIGQUERY["datasets"]["technologies_home"],
)
parser.add_argument(
"--dataset_lighthouse_home_only",
help="BigQuery dataset to write lighthouse table (home-page-only)",
default=constants.BIGQUERY["datasets"]["lighthouse_home"],
)
parser.add_argument(
"--dataset_requests_home_only",
help="BigQuery dataset to write requests table (home-page-only)",
default=constants.BIGQUERY["datasets"]["requests_home"],
)
parser.add_argument(
"--dataset_response_bodies_home_only",
help="BigQuery dataset to write response_bodies table (home-page-only)",
default=constants.BIGQUERY["datasets"]["response_bodies_home"],
)
parser.add_argument(
"--dataset_pages",
help="BigQuery dataset to write pages table",
default=constants.BIGQUERY["datasets"]["pages_all"],
)
parser.add_argument(
"--dataset_technologies",
help="BigQuery dataset to write technologies table",
default=constants.BIGQUERY["datasets"]["technologies_all"],
)
parser.add_argument(
"--dataset_lighthouse",
help="BigQuery dataset to write lighthouse table",
default=constants.BIGQUERY["datasets"]["lighthouse_all"],
)
parser.add_argument(
"--dataset_requests",
help="BigQuery dataset to write requests table",
default=constants.BIGQUERY["datasets"]["requests_all"],
)
parser.add_argument(
"--dataset_response_bodies",
help="BigQuery dataset to write response_bodies table",
default=constants.BIGQUERY["datasets"]["response_bodies_all"],
)

parser.add_argument(
"--non_summary_partitions",
dest="partitions",
help="Number of partitions to split non-summary BigQuery write tasks",
default=non_summary_pipeline.NUM_PARTITIONS,
)


def create_pipeline(argv=None):
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args, save_main_session=True)
standard_options = pipeline_options.view_as(StandardOptions)
combined_options = pipeline_options.view_as(CombinedPipelineOptions)
logging.info(
f"Pipeline Options: {known_args=},{pipeline_args=},{pipeline_options.get_all_options()},"
f"{standard_options},{combined_options}"
)

# TODO add metric counters for files in, processed, written to GCP & BQ

p = beam.Pipeline(options=pipeline_options)

files = p | ReadHarFiles(**combined_options.get_all_options())

# summary pipeline
if combined_options.pipeline_type in ["combined", "summary"]:
pages, requests = files | "ParseHarToSummary" >> beam.ParDo(
HarJsonToSummaryDoFn()
).with_outputs("page", "requests")

pages | summary_pipeline.WriteSummaryPagesToBigQuery(
combined_options, standard_options
)

requests | summary_pipeline.WriteSummaryRequestsToBigQuery(
combined_options, standard_options
)

# non-summary pipeline
if combined_options.pipeline_type in ["combined", "non-summary"]:
(
files
| "MapJSON" >> beam.MapTuple(non_summary_pipeline.from_json)
| "AddDateAndClient" >> beam.Map(non_summary_pipeline.add_date_and_client)
| "WriteNonSummaryTables"
>> non_summary_pipeline.WriteNonSummaryToBigQuery(
**combined_options.get_all_options()
)
)

# TODO detect DONE file, move temp table to final destination, shutdown pipeline (if streaming)

return p
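Note that `create_pipeline()` above only builds the pipeline graph; the runner scripts referenced in the commit message execute it. A minimal sketch of such an entry point, assuming the centralized `run()` in run_pipeline.py looks roughly like this (that file is not included in this excerpt):

```python
# Hypothetical sketch of the run_pipeline.py entry point; that module is not
# shown in this diff, so the names here are assumptions.
import logging

from modules import combined_pipeline


def run(argv=None):
    # create_pipeline() builds the combined/summary/non-summary graph based
    # on --pipeline_type; running it executes the graph on the chosen runner.
    pipeline = combined_pipeline.create_pipeline(argv)
    result = pipeline.run()
    result.wait_until_finish()


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()
```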
modules/constants.py (58 changes: 31 additions & 27 deletions)
@@ -1,40 +1,44 @@
import importlib.resources as pkg_resources
import json


def _get_schema(path):
return json.loads(pkg_resources.read_text("schema", path))


# TODO remove 'experimental' before going live
bigquery = {
BIGQUERY = {
"datasets": {
"pages": "httparchive:experimental_summary_pages",
"requests": "httparchive:experimental_summary_requests",
"home_pages": "httparchive:summary_pages",
"home_requests": "httparchive:summary_requests",
"summary_pages_all": "httparchive:experimental_summary_pages",
"summary_requests_all": "httparchive:experimental_summary_requests",
"pages_all": "httparchive:experimental_pages",
"technologies_all": "httparchive:experimental_technologies",
"lighthouse_all": "httparchive:experimental_lighthouse",
"requests_all": "httparchive:experimental_requests",
"response_bodies_all": "httparchive:experimental_response_bodies",
"summary_pages_home": "httparchive:summary_pages",
"summary_requests_home": "httparchive:summary_requests",
"pages_home": "httparchive:pages",
"technologies_home": "httparchive:technologies",
"lighthouse_home": "httparchive:lighthouse",
"requests_home": "httparchive:requests",
"response_bodies_home": "httparchive:response_bodies",
},
"schemas": {
"pages": {
"fields": json.loads(
pkg_resources.read_text("schema", "summary_pages.json")
)
},
"requests": {
"fields": json.loads(
pkg_resources.read_text("schema", "summary_requests.json")
)
},
"all_pages": {
"fields": json.loads(
pkg_resources.read_text("schema", "all_pages.json")
)
},
"all_requests": {
"fields": json.loads(
pkg_resources.read_text("schema", "all_requests.json")
)
},
"summary_pages": {"fields": _get_schema("summary_pages.json")},
"summary_requests": {"fields": _get_schema("summary_requests.json")},
"pages": {"fields": _get_schema("pages.json")},
"technologies": {"fields": _get_schema("technologies.json")},
"lighthouse": {"fields": _get_schema("lighthouse.json")},
"requests": {"fields": _get_schema("requests.json")},
"response_bodies": {"fields": _get_schema("response_bodies.json")},
"all_pages": {"fields": _get_schema("all_pages.json")},
"all_requests": {"fields": _get_schema("all_requests.json")},
},
}

# mapping of headers to DB fields
ghReqHeaders = {
GH_REQ_HEADERS = {
"accept": "req_accept",
"accept-charset": "req_accept_charset",
"accept-encoding": "req_accept_encoding",
@@ -46,7 +50,7 @@
"referer": "req_referer",
"user-agent": "req_user_agent",
}
ghRespHeaders = {
GH_RESP_HEADERS = {
"accept-ranges": "resp_accept_ranges",
"age": "resp_age",
"cache-control": "resp_cache_control",
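The renamed `BIGQUERY` constant keeps dataset IDs and table schemas side by side so the write transforms can look both up by key. As an illustration only (the real writes live in summary_pipeline.py and non_summary_pipeline.py, and the destination table name below is a made-up example), a consumer might use it like this:

```python
# Illustrative only; the destination table name is an assumption, not taken
# from this diff.
import apache_beam as beam
from apache_beam.io import WriteToBigQuery

from modules import constants


def write_pages_home(pages):
    """Writes home-page rows to the pages dataset using the shared schema."""
    return pages | "WritePagesHome" >> WriteToBigQuery(
        table=f"{constants.BIGQUERY['datasets']['pages_home']}.2022_07_01_desktop",
        schema=constants.BIGQUERY["schemas"]["pages"],
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    )
```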
modules/import_all.py (22 changes: 19 additions & 3 deletions)
@@ -14,7 +14,7 @@
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners import DataflowRunner

from modules import constants
from modules import constants, utils


# BigQuery can handle rows up to 100 MB.
@@ -69,6 +69,7 @@ def get_page(har, client, crawl_date):
rank = int(metadata.get('rank')) if metadata.get('rank') else None

try:
page = trim_page(page)
payload_json = to_json(page)
except ValueError:
logging.warning('Skipping pages payload for "%s": unable to stringify as JSON.', wptid)
@@ -350,6 +351,10 @@ def get_requests(har, client, crawl_date):
if response_body is not None:
response_body = response_body[:MAX_CONTENT_SIZE]

mime_type = request.get('response').get('content').get('mimeType')
ext = utils.get_ext(request_url)
type = utils.pretty_type(mime_type, ext)

requests.append({
'date': date,
'client': client,
@@ -358,8 +363,7 @@
'root_page': root_page,
'url': request_url,
'is_main_document': is_main_document,
# TODO: Get the type from the summary data.
'type': '',
'type': type,
'index': index,
'payload': payload,
# TODO: Get the summary data.
@@ -390,6 +394,18 @@ def trim_request(request):
return request


def trim_page(page):
"""Removes unneeded fields from the page object."""

if not page:
return None

# Make a copy first so the data can be used later.
page = deepcopy(page)
page.pop('_parsed_css')
return page


def to_json(obj):
"""Returns a JSON representation of the object.
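The new `type` column above is derived from the response MIME type plus the URL's file extension via `utils.get_ext()` and `utils.pretty_type()`. modules/utils.py is not included in this excerpt, so the following is only a rough, hypothetical sketch of what helpers like these could look like:

```python
# Hypothetical sketch only; the real helpers live in modules/utils.py, which
# is not included in this diff.
from urllib.parse import urlparse


def get_ext(url):
    """Returns the lower-cased file extension from a URL path, or ''."""
    filename = urlparse(url).path.rsplit("/", 1)[-1]
    return filename.rsplit(".", 1)[-1].lower() if "." in filename else ""


def pretty_type(mime_type, ext):
    """Maps a MIME type (with the extension as a fallback hint) to a coarse
    resource type such as 'html', 'css', 'script', 'image' or 'font'."""
    mime_type = (mime_type or "").lower()
    if "html" in mime_type:
        return "html"
    if "css" in mime_type or ext == "css":
        return "css"
    if "javascript" in mime_type or ext in ("js", "mjs"):
        return "script"
    if mime_type.startswith("image/"):
        return "image"
    if "font" in mime_type or ext in ("woff", "woff2", "ttf", "otf"):
        return "font"
    return "other"
```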
(diffs for the remaining changed files are not shown)
