diff --git a/docs/code_demos/recipes/database_to_api.py b/docs/code_demos/recipes/database_to_api.py
index 8534f32..3b796ff 100644
--- a/docs/code_demos/recipes/database_to_api.py
+++ b/docs/code_demos/recipes/database_to_api.py
@@ -5,37 +5,47 @@
 import datetime as dt
 
 import aiohttp
-from etlhelper import iter_chunks
+import etlhelper as etl
 
+# import the DbParams object holding the Oracle connection details
 from db import ORACLE_DB
 
 logger = logging.getLogger("copy_sensors_async")
 
+# SQL query to get data from Oracle
 SELECT_SENSORS = """
     SELECT CODE, DESCRIPTION
     FROM BGS.DIC_SEN_SENSOR
     WHERE date_updated BETWEEN :startdate AND :enddate
     ORDER BY date_updated
     """
+
+# URL of the API we want to send data to
 BASE_URL = "http://localhost:9200/"
+# Headers to tell the API we are sending data in JSON format
 HEADERS = {"Content-Type": "application/json"}
 
 
 def copy_sensors(startdate: dt.datetime, enddate: dt.datetime) -> None:
-    """Read sensors from Oracle and post to REST API."""
+    """Read sensors from Oracle and post to REST API.
+    Requires startdate and enddate to filter rows to those changed within a given time period.
+    """
     logger.info("Copying sensors with timestamps from %s to %s",
                 startdate.isoformat(), enddate.isoformat())
     row_count = 0
 
+    # Connect using the imported DbParams
    with ORACLE_DB.connect("ORACLE_PASSWORD") as conn:
         # chunks is a generator that yields lists of dictionaries
-        chunks = iter_chunks(
+        # passing in our select query, connection object, bind variable parameters and custom transform function
+        chunks = etl.iter_chunks(
             SELECT_SENSORS,
             conn,
             parameters={"startdate": startdate, "enddate": enddate},
             transform=transform_sensors,
         )
 
+        # chunks are processed in series; rows within each chunk are posted to the API concurrently
         for chunk in chunks:
             result = asyncio.run(post_chunk(chunk))
             row_count += len(result)
@@ -65,10 +75,14 @@ def transform_sensors(chunk: list[tuple]) -> list[tuple]:
 
 async def post_chunk(chunk: list[tuple]) -> list:
     """Post multiple items to API asynchronously."""
+    # open an aiohttp session to make the requests
     async with aiohttp.ClientSession() as session:
         # Build list of tasks
         tasks = []
+        # add a task for each row; asyncio will execute them concurrently
         for item in chunk:
+            # calling post_one creates a coroutine for a single row; it is
+            # not executed until the tasks are gathered below
             tasks.append(post_one(item, session))
 
         # Process tasks in parallel. An exception in any will be raised.
@@ -83,6 +97,7 @@ async def post_one(item: tuple, session: aiohttp.ClientSession) -> int:
         response = await session.post(
             BASE_URL + "sensors/_doc",
             headers=HEADERS,
+            # convert Python dict to a JSON string
             data=json.dumps(item),
         )
 
@@ -108,6 +123,6 @@ async def post_one(item: tuple, session: aiohttp.ClientSession) -> int:
     logger.setLevel(logging.INFO)
     logger.addHandler(handler)
 
-    # Copy data from 1 January 2000 to 00:00:00 today
+    # Copy data that was updated between 1 January 2000 and 00:00:00 today
     today = dt.datetime.combine(dt.date.today(), dt.time.min)
     copy_sensors(dt.datetime(2000, 1, 1), today)
diff --git a/docs/etl_functions/extract.rst b/docs/etl_functions/extract.rst
index fe3eb88..f5e031c 100644
--- a/docs/etl_functions/extract.rst
+++ b/docs/etl_functions/extract.rst
@@ -56,6 +56,8 @@ Keyword arguments
 All extract functions are derived from :func:`iter_chunks() <etlhelper.iter_chunks>`
 and take the same keyword arguments, which are passed through.
 
+.. _parameters:
+
 parameters
 """"""""""
diff --git a/docs/recipes/database_to_api.rst b/docs/recipes/database_to_api.rst
index 903341b..47468e6 100644
--- a/docs/recipes/database_to_api.rst
+++ b/docs/recipes/database_to_api.rst
@@ -7,9 +7,13 @@
 ETL for posting data from a database into an HTTP API.
 The API could be a NoSQL document store (e.g. ElasticSearch, Cassandra)
 or some other web service.
 
-This example transfers data from Oracle to ElasticSearch. It uses
-``iter_chunks`` to fetch data from the database without loading it all
-into memory at once. A custom transform function creates a dictionary
+This example posts data from an Oracle database to an HTTP API. It uses
+:func:`iter_chunks() <etlhelper.iter_chunks>` to fetch data from the
+database without loading it all into memory at once.
+:ref:`Parameters <parameters>` are sent with the database query to filter
+rows to only those changed within a specified time period. This means
+that only data which has changed since the last time the script was run
+is transferred. A custom transform function creates a dictionary
 structure from each row of data. This is “dumped” into JSON and posted
 to the API via ``aiohttp``.
 
@@ -23,3 +27,24 @@ transfer as opposed to posting records in series.
 
 In this example, failed rows will fail the whole job. Removing the
 ``raise_for_status()`` call will let them just be logged instead.
+
+To provide the database connection, a :class:`DbParams <etlhelper.DbParams>`
+object is imported from a separate ``db`` file.
+
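+A minimal ``db.py`` might look like the sketch below; the host, port,
+database name and user shown are placeholder values, not part of the recipe:
+
+.. code-block:: python
+
+    from etlhelper import DbParams
+
+    # Connection details for the source Oracle database (example values).
+    # The password is read from the ORACLE_PASSWORD environment variable
+    # when ORACLE_DB.connect("ORACLE_PASSWORD") is called.
+    ORACLE_DB = DbParams(
+        dbtype="ORACLE",
+        host="localhost",
+        port=1521,
+        dbname="mydata",
+        user="oracle_user",
+    )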