Simple Python client for interacting with Google BigQuery.
This client provides an API for retrieving and inserting BigQuery data by wrapping Google's low-level API client library. It also provides facilities that make it convenient to access data that is tied to an App Engine appspot, such as request logs.
pip install bigquery-python
from bigquery import get_client
# BigQuery project id as listed in the Google Developers Console.
project_id = 'project_id'
# Service account email address as listed in the Google Developers Console.
service_account = '[email protected]'
# PKCS12 or PEM key provided by Google.
key = 'key.pem'
client = get_client(project_id, service_account=service_account,
                    private_key_file=key, readonly=True)
# JSON key provided by Google
json_key = 'key.json'
client = get_client(json_key_file=json_key, readonly=True)
# Submit an async query.
job_id, _results = client.query('SELECT * FROM dataset.my_table LIMIT 1000')
# Check if the query has finished running.
complete, row_count = client.check_job(job_id)
# Retrieve the results.
results = client.get_query_rows(job_id)
The BigQuery client allows you to execute raw queries against a dataset. The query method inserts a query job into BigQuery. By default, query runs asynchronously with a timeout of 0. When a non-zero timeout value is specified, the call waits for the results and raises a BigQueryTimeoutException if the job does not finish in time. When you run an async query, you can use the returned job_id to poll for job status later with check_job.
# Submit an async query
job_id, _results = client.query('SELECT * FROM dataset.my_table LIMIT 1000')
# Do other work
# Poll for query completion.
complete, row_count = client.check_job(job_id)
# Retrieve the results.
if complete:
    results = client.get_query_rows(job_id)
You can also specify a non-zero timeout value if you want your query to be synchronous.
# Submit a synchronous query
# (BigQueryTimeoutException is assumed to be importable from bigquery.errors)
from bigquery.errors import BigQueryTimeoutException

try:
    _job_id, results = client.query('SELECT * FROM dataset.my_table LIMIT 1000', timeout=10)
except BigQueryTimeoutException:
    print("Timeout")
The query_builder module provides an API for generating query strings that can be run using the BigQuery client.
from bigquery.query_builder import render_query
selects = {
    'start_time': {
        'alias': 'Timestamp',
        'format': 'INTEGER-FORMAT_UTC_USEC'
    }
}
conditions = [
    {
        'field': 'Timestamp',
        'type': 'INTEGER',
        'comparators': [
            {
                'condition': '>=',
                'negate': False,
                'value': 1399478981
            }
        ]
    }
]
grouping = ['Timestamp']
having = [
    {
        'field': 'Timestamp',
        'type': 'INTEGER',
        'comparators': [
            {
                'condition': '==',
                'negate': False,
                'value': 1399478981
            }
        ]
    }
]
order_by = {'fields': ['Timestamp'], 'direction': 'desc'}
query = render_query(
    'dataset',
    ['table'],
    select=selects,
    conditions=conditions,
    groupings=grouping,
    having=having,
    order_by=order_by,
    limit=47
)
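# render_query returns a plain SQL string, so it can be printed here to
# inspect the generated query before submitting the job.
print(query)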
job_id, _ = client.query(query)
The BigQuery client provides facilities to manage dataset tables, including creating, deleting, and checking the existence of tables, as well as retrieving table metadata.
# Create a new table.
schema = [
    {'name': 'foo', 'type': 'STRING', 'mode': 'nullable'},
    {'name': 'bar', 'type': 'FLOAT', 'mode': 'nullable'}
]
created = client.create_table('dataset', 'my_table', schema)
# Delete an existing table.
deleted = client.delete_table('dataset', 'my_table')
# Check if a table exists.
exists = client.check_table('dataset', 'my_table')
# Get a table's full metadata. Includes numRows, numBytes, etc.
# See: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables
metadata = client.get_table('dataset', 'my_table')
There is also functionality for retrieving tables that are associated with a Google App Engine appspot, assuming table names are in the form of appid_YYYY_MM or YYYY_MM_appid. This allows tables within a date range to be selected and queried.
# Get appspot tables falling within a start and end time.
from datetime import datetime, timedelta
range_end = datetime.utcnow()
range_start = range_end - timedelta(weeks=12)
tables = client.get_tables('dataset', 'appid', range_start, range_end)
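The returned tables can then be queried. A minimal sketch, assuming get_tables returns the matching table names as plain strings and that legacy SQL table references are used:
# Build a FROM clause from the returned table names
# (assumes `tables` is a list of table name strings).
if tables:
    table_refs = ', '.join('[dataset.%s]' % name for name in tables)
    job_id, _ = client.query('SELECT * FROM %s LIMIT 1000' % table_refs)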
The client provides an API for inserting data into a BigQuery table. The last parameter is the name of an optional per-row key whose value is used as the insert ID to avoid duplicate entries.
# Insert data into table.
rows = [
    {'one': 'ein', 'two': 'zwei'},
    {'id': 'NzAzYmRiY', 'one': 'uno', 'two': 'dos'},
    {'id': 'NzAzYmRiY', 'one': 'ein', 'two': 'zwei'}  # duplicate entry
]
inserted = client.push_rows('dataset', 'table', rows, 'id')
You can write query results directly to a table. When either the dataset or table parameter is omitted, the query result is written to a temporary table.
# write to permanent table
job = client.write_to_table('SELECT * FROM dataset.original_table LIMIT 100',
                            'dataset',
                            'table')
try:
    job_resource = client.wait_for_job(job, timeout=60)
    print(job_resource)
except BigQueryTimeoutException:
    print("Timeout")
# write to permanent table with UDF in query string
external_udf_uris = ["gs://bigquery-sandbox-udf/url_decode.js"]
query = """SELECT requests, title
FROM
urlDecode(
SELECT
title, sum(requests) AS num_requests
FROM
[fh-bigquery:wikipedia.pagecounts_201504]
WHERE language = 'fr'
GROUP EACH BY title
)
WHERE title LIKE '%ç%'
ORDER BY requests DESC
LIMIT 100
"""
job = client.write_to_table(
    query,
    'dataset',
    'table',
    external_udf_uris=external_udf_uris
)
try:
    job_resource = client.wait_for_job(job, timeout=60)
    print(job_resource)
except BigQueryTimeoutException:
    print("Timeout")
# write to temporary table
job = client.write_to_table('SELECT * FROM dataset.original_table LIMIT 100')
try:
    job_resource = client.wait_for_job(job, timeout=60)
    print(job_resource)
except BigQueryTimeoutException:
    print("Timeout")
schema = [ {"name": "username", "type": "string", "mode": "nullable"} ]
job = client.import_data_from_uris( ['gs://mybucket/mydata.json'],
'dataset',
'table',
schema,
source_format=JOB_SOURCE_FORMAT_JSON)
try:
job_resource = client.wait_for_job(job, timeout=60)
print job_resource
except BigQueryTimeoutException:
print "Timeout"
# Export table data to files in Google Cloud Storage.
job = client.export_data_to_uris(['gs://mybucket/mydata.json'],
                                 'dataset',
                                 'table')
try:
    job_resource = client.wait_for_job(job, timeout=60)
    print(job_resource)
except BigQueryTimeoutException:
    print("Timeout")
The client provides an API for listing, creating, deleting, updating and patching datasets.
# List datasets
datasets = client.get_datasets()
# Create dataset
dataset = client.create_dataset('mydataset', friendly_name="My Dataset", description="A dataset created by me")
# Get dataset
client.get_dataset('mydataset')
# Delete dataset
client.delete_dataset('mydataset')
client.delete_dataset('mydataset', delete_contents=True) # delete even if it contains data
# Update dataset (replaces the whole resource, so the description is removed)
client.update_dataset('mydataset', friendly_name="mon Dataset")
# Patch dataset (only friendly_name changes; the description is preserved)
client.patch_dataset('mydataset', friendly_name="mon Dataset")
# Check if dataset exists.
exists = client.check_dataset('mydataset')
The schema_from_record helper generates a table schema from a sample record, including nested fields.
from bigquery import schema_from_record
schema = schema_from_record({"id": 123, "posts": [{"id": 123, "text": "this is a post"}], "username": "bob"})
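The generated schema can be passed straight to create_table. A minimal sketch, assuming the target dataset already exists and using a placeholder table name:
# Create a table from the schema generated above ('users' is a placeholder name).
created = client.create_table('dataset', 'users', schema)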
Requirements for contributing:
- Branch off master, PR back to master.
- Your code should pass Flake8.
- Unit test coverage is required.
- Good docstrings are required.
- Good commit messages are required.