diff --git a/containers/airflow/Dockerfile b/containers/airflow/Dockerfile
index 96068ff..db1658c 100644
--- a/containers/airflow/Dockerfile
+++ b/containers/airflow/Dockerfile
@@ -73,13 +73,8 @@ COPY --chown=airflow containers/airflow/envs/* /opt/envs/
 
 USER airflow
 
-ARG POSTGRES_EPIGRAPH_HOST
-ARG POSTGRES_EPIGRAPH_PORT
-ARG POSTGRES_EPIGRAPH_USER
-ARG POSTGRES_EPIGRAPH_PASSWORD
-ARG POSTGRES_EPIGRAPH_DB
-ENV DB_USER "${POSTGRES_EPIGRAPH_USER}:${POSTGRES_EPIGRAPH_PASSWORD}"
-ENV DB_URI "${DB_USER}@${POSTGRES_EPIGRAPH_HOST}:${POSTGRES_EPIGRAPH_PORT}/${POSTGRES_EPIGRAPH_DB}"
+ARG DB_URI
+ENV DB_URI "${DB_URI}"
 
 RUN /usr/local/bin/python -m virtualenv /opt/envs/py310 --python="/opt/py310/bin/python3.10" \
   && sed -i "s/include-system-site-packages = false/include-system-site-packages = true/" /opt/envs/py310/pyvenv.cfg \
@@ -94,7 +89,9 @@ RUN /usr/local/bin/python -m virtualenv /opt/envs/py311 --python="/opt/py311/bin
   && source /opt/envs/py311/bin/activate \
   && pip install "cython<3.0.0" \
   && pip install --no-build-isolation "pyyaml<6.0" \
-  && pip install -r /opt/envs/pysus.txt
+  && pip install \
+    psycopg2-binary \
+    -r /opt/envs/pysus.txt
 
 WORKDIR ${AIRFLOW_HOME}
 
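Note: the image now takes a single DB_URI build argument instead of the five POSTGRES_EPIGRAPH_* values. As a minimal sketch (not part of the diff), code running inside the image could consume the baked-in environment variable roughly like this, assuming SQLAlchemy and a Postgres driver are available in the active environment:

    import os
    from sqlalchemy import create_engine, text

    # DB_URI is set by the ARG/ENV pair above; this consumer is purely illustrative
    engine = create_engine(os.environ["DB_URI"])
    with engine.connect() as conn:
        print(conn.execute(text("SELECT 1")).scalar())
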
diff --git a/containers/airflow/dags/brasil/sinan/chikungunya.py b/containers/airflow/dags/brasil/sinan/chikungunya.py
new file mode 100644
index 0000000..f4e037c
--- /dev/null
+++ b/containers/airflow/dags/brasil/sinan/chikungunya.py
@@ -0,0 +1,180 @@
+import pendulum
+
+from datetime import timedelta
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models import Variable
+
+
+default_args = {
+    "owner": "epigraphhub",
+    "depends_on_past": False,
+    "start_date": pendulum.datetime(2023, 1, 3),
+    "email": ["epigraphhub@thegraphnetwork.org"],
+    "email_on_failure": True,
+    "email_on_retry": False,
+    "retries": 2,
+    "retry_delay": timedelta(minutes=1),
+}
+
+with DAG(
+    dag_id='SINAN_CHIK',
+    tags=['SINAN', 'Brasil', 'Chikungunya'],
+    schedule='0 0 3 * *',
+    default_args=default_args,
+    catchup=False,
+) as dag:
+
+    CONN = Variable.get('egh_conn', deserialize_json=True)
+
+    @task.external_python(
+        task_id='update_chik',
+        python='/opt/py311/bin/python3.11'
+    )
+    def update_chik(egh_conn: dict):
+        """
+        This task runs in an isolated Python environment containing the PySUS
+        package. It fetches all Chikungunya years from DATASUS and inserts
+        them into the EGH database.
+        """
+        import os
+        import logging
+        import pandas as pd
+
+        from sqlalchemy import create_engine, text
+        from pysus.ftp.databases.sinan import SINAN
+
+        sinan = SINAN().load()
+        dis_code = "CHIK"
+        tablename = "sinan_chikungunya_m"
+        files = sinan.get_files(dis_code=dis_code)
+
+        def to_sql_include_cols(df: pd.DataFrame, prelim: bool, engine):
+            """
+            Insert dataframe into db, include missing columns if needed
+            """
+            df.columns = df.columns.str.lower()
+
+            with create_engine(egh_conn['URI']).connect() as conn:
+                # Get columns
+                res = conn.execute(
+                    text(f'SELECT * FROM brasil.{tablename} LIMIT 1'))
+                sql_columns = set(i[0] for i in res.cursor.description)
+
+            df_columns = set(df.columns)
+            columns_to_add = df_columns.difference(sql_columns)
+
+            if columns_to_add:
+                sql_statements = [f"ALTER TABLE brasil.{tablename}"]
+                for column in columns_to_add:
+                    sql_statements.append(
+                        f"ADD COLUMN {column} TEXT,")  # object
+
+                sql_statements[-1] = sql_statements[-1].replace(',', ';')
+
+                with create_engine(egh_conn['URI']).connect() as conn:
+                    sql = ' '.join(sql_statements)
+                    logging.warning(f"EXECUTING: {sql}")
+                    conn.execute(text(sql))
+                    conn.commit()
+
+            for col, dtype in df.dtypes.items():
+                if col in ['dt_notific', 'dt_sin_pri']:
+                    try:
+                        df[col] = pd.to_datetime(df[col]).dt.strftime(
+                            '%d%m%Y').astype('object')
+                        dtype = 'object'
+                        logging.warning(
+                            f"Column '{col}' of type 'DATE' has been parsed to 'TEXT'"
+                        )
+                    except ValueError as error:
+                        logging.error(
+                            f'Could not format date column correctly: {error}')
+                        df[col] = df[col].astype('object')
+                        dtype = 'object'
+
+                if str(dtype) != 'object':
+                    df[col] = df[col].astype('object')
+                    logging.warning(
+                        f"Column '{col}' of type '{dtype}' has been parsed to 'object'"
+                    )
+
+            df['year'] = year
+            df['prelim'] = prelim
+
+            df.to_sql(
+                name=tablename,
+                con=engine,
+                schema="brasil",
+                if_exists='append',
+                index=False
+            )
+
+        def insert_parquets(parquet_dir: str, year: int, prelim: bool):
+            """
+            Insert parquet dir into database using its chunks. Delete each
+            chunk and the directory after insertion.
+            """
+            for parquet in os.listdir(parquet_dir):
+                file = os.path.join(parquet_dir, parquet)
+                df = pd.read_parquet(str(file), engine='fastparquet')
+
+                to_sql_include_cols(df, prelim, create_engine(egh_conn['URI']))
+                logging.warning(f"{len(df)} rows inserted into {tablename}")
+
+                del df
+                os.remove(file)
+            os.rmdir(parquet_dir)
+
+        f_stage = {}
+        for file in files:
+            code, year = sinan.format(file)
+            stage = 'prelim' if 'PRELIM' in file.path else 'final'
+
+            if stage not in f_stage:
+                f_stage[stage] = [year]
+            else:
+                f_stage[stage].append(year)
+
+        for year in f_stage['final']:
+            # Check if final is already in DB
+            with create_engine(egh_conn['URI']).connect() as conn:
+                cur = conn.execute(text(
+                    f'SELECT COUNT(*) FROM brasil.{tablename}'
+                    f" WHERE year = '{year}' AND prelim = False"
+                ))
+                count = cur.fetchone()[0]
+
+            logging.info(f"Final year {year}: {count}")
+
+            if not count:
+                # Check on prelims
+                with create_engine(egh_conn['URI']).connect() as conn:
+                    cur = conn.execute(text(
+                        f'SELECT COUNT(*) FROM brasil.{tablename}'
+                        f" WHERE year = '{year}' AND prelim = True"
+                    ))
+                    count = cur.fetchone()[0]
+
+                    if count:
+                        # Drop prelim rows; final data will replace them
+                        cur = conn.execute(text(
+                            f'DELETE FROM brasil.{tablename}'
+                            f" WHERE year = '{year}' AND prelim = True"
+                        ))
+
+                parquets = sinan.download(sinan.get_files(dis_code, year))
+                insert_parquets(parquets.path, year, False)
+
+        for year in f_stage['prelim']:
+            with create_engine(egh_conn['URI']).connect() as conn:
+                # Drop old prelim rows before re-inserting
+                cur = conn.execute(text(
+                    f'DELETE FROM brasil.{tablename}'
+                    f" WHERE year = '{year}' AND prelim = True"
+                ))
+
+            parquets = sinan.download(sinan.get_files(dis_code, year))
+            insert_parquets(parquets.path, year, True)
+
+    update_chik(CONN)
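The task receives egh_conn as a plain dict with a single URI key, which makes it easy to spot-check the prelim/final promotion logic directly against the target table. A sketch with placeholder connection details (every value below is hypothetical, not part of the DAG):

    from sqlalchemy import create_engine, text

    egh_conn = {"URI": "postgresql://user:password@db-host:5432/dbname"}  # placeholder values

    # After a run, each year should show up either as prelim or as final, not both
    with create_engine(egh_conn["URI"]).connect() as conn:
        res = conn.execute(text(
            "SELECT year, prelim, COUNT(*) FROM brasil.sinan_chikungunya_m"
            " GROUP BY year, prelim ORDER BY year"
        ))
        for row in res:
            print(row)
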
diff --git a/containers/airflow/dags/brasil/sinan/dengue.py b/containers/airflow/dags/brasil/sinan/dengue.py
new file mode 100644
index 0000000..97232e5
--- /dev/null
+++ b/containers/airflow/dags/brasil/sinan/dengue.py
@@ -0,0 +1,178 @@
+import pendulum
+
+from datetime import timedelta
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models import Variable
+
+
+default_args = {
+    "owner": "epigraphhub",
+    "depends_on_past": False,
+    "start_date": pendulum.datetime(2023, 1, 1),
+    "email": ["epigraphhub@thegraphnetwork.org"],
+    "email_on_failure": True,
+    "email_on_retry": False,
+    "retries": 2,
+    "retry_delay": timedelta(minutes=1),
+}
+
+with DAG(
+    dag_id='SINAN_DENG',
+    tags=['SINAN', 'Brasil', 'Dengue'],
+    schedule='@monthly',
+    default_args=default_args,
+    catchup=False,
+) as dag:
+
+    CONN = Variable.get('egh_conn', deserialize_json=True)
+
+    @task.external_python(
+        task_id='update_dengue',
+        python='/opt/py311/bin/python3.11'
+    )
+    def update_dengue(egh_conn: dict):
+        """
+        This task runs in an isolated Python environment containing the PySUS
+        package. It fetches all Dengue years from DATASUS and inserts them
+        into the EGH database.
+        """
+        import os
+        import logging
+        import pandas as pd
+
+        from sqlalchemy import create_engine, text
+        from pysus.ftp.databases.sinan import SINAN
+
+        sinan = SINAN().load()
+        dis_code = "DENG"
+        tablename = "sinan_dengue_m"
+        files = sinan.get_files(dis_code=dis_code)
+
+
+        def to_sql_include_cols(df: pd.DataFrame, prelim: bool, engine):
+            """
+            Insert dataframe into db, include missing columns if needed
+            """
+            df.columns = df.columns.str.lower()
+
+            with create_engine(egh_conn['URI']).connect() as conn:
+                # Get columns
+                res = conn.execute(text(f'SELECT * FROM brasil.{tablename} LIMIT 1'))
+                sql_columns = set(i[0] for i in res.cursor.description)
+
+            df_columns = set(df.columns)
+            columns_to_add = df_columns.difference(sql_columns)
+
+            if columns_to_add:
+                sql_statements = [f"ALTER TABLE brasil.{tablename}"]
+                for column in columns_to_add:
+                    sql_statements.append(f"ADD COLUMN {column} TEXT,")  # object
+
+                sql_statements[-1] = sql_statements[-1].replace(',', ';')
+
+                with create_engine(egh_conn['URI']).connect() as conn:
+                    sql = ' '.join(sql_statements)
+                    logging.warning(f"EXECUTING: {sql}")
+                    conn.execute(text(sql))
+                    conn.commit()
+
+            for col, dtype in df.dtypes.items():
+                if col in ['dt_notific', 'dt_sin_pri']:
+                    try:
+                        df[col] = pd.to_datetime(df[col]).dt.strftime('%d%m%Y').astype('object')
+                        dtype = 'object'
+                        logging.warning(
+                            f"Column '{col}' of type 'DATE' has been parsed to 'TEXT'"
+                        )
+                    except ValueError as error:
+                        logging.error(f'Could not format date column correctly: {error}')
+                        df[col] = df[col].astype('object')
+                        dtype = 'object'
+
+                if str(dtype) != 'object':
+                    df[col] = df[col].astype('object')
+                    logging.warning(
+                        f"Column '{col}' of type '{dtype}' has been parsed to 'object'"
+                    )
+
+            df['year'] = year
+            df['prelim'] = prelim
+
+            df.to_sql(
+                name=tablename,
+                con=engine,
+                schema="brasil",
+                if_exists='append',
+                index=False
+            )
+
+        def insert_parquets(parquet_dir: str, year: int, prelim: bool):
+            """
+            Insert parquet dir into database using its chunks. Delete each
+            chunk and the directory after insertion.
+            """
+            for parquet in os.listdir(parquet_dir):
+                file = os.path.join(parquet_dir, parquet)
+                df = pd.read_parquet(str(file), engine='fastparquet')
+
+                to_sql_include_cols(df, prelim, create_engine(egh_conn['URI']))
+                logging.warning(f"{len(df)} rows inserted into {tablename}")
+
+                del df
+                os.remove(file)
+            os.rmdir(parquet_dir)
+
+
+        f_stage = {}
+        for file in files:
+            code, year = sinan.format(file)
+            stage = 'prelim' if 'PRELIM' in file.path else 'final'
+
+            if stage not in f_stage:
+                f_stage[stage] = [year]
+            else:
+                f_stage[stage].append(year)
+
+        for year in f_stage['final']:
+            # Check if final is already in DB
+            with create_engine(egh_conn['URI']).connect() as conn:
+                cur = conn.execute(text(
+                    f'SELECT COUNT(*) FROM brasil.{tablename}'
+                    f" WHERE year = '{year}' AND prelim = False"
+                ))
+                count = cur.fetchone()[0]
+
+            logging.info(f"Final year {year}: {count}")
+
+            if not count:
+                # Check on prelims
+                with create_engine(egh_conn['URI']).connect() as conn:
+                    cur = conn.execute(text(
+                        f'SELECT COUNT(*) FROM brasil.{tablename}'
+                        f" WHERE year = '{year}' AND prelim = True"
+                    ))
+                    count = cur.fetchone()[0]
+
+                    if count:
+                        # Drop prelim rows; final data will replace them
+                        cur = conn.execute(text(
+                            f'DELETE FROM brasil.{tablename}'
+                            f" WHERE year = '{year}' AND prelim = True"
+                        ))
+
+                parquets = sinan.download(sinan.get_files(dis_code, year))
+                insert_parquets(parquets.path, year, False)
+
+        for year in f_stage['prelim']:
+            with create_engine(egh_conn['URI']).connect() as conn:
+                # Drop old prelim rows before re-inserting
+                cur = conn.execute(text(
+                    f'DELETE FROM brasil.{tablename}'
+                    f" WHERE year = '{year}' AND prelim = True"
+                ))
+
+            parquets = sinan.download(sinan.get_files(dis_code, year))
+            insert_parquets(parquets.path, year, True)
+
+    update_dengue(CONN)
diff --git a/containers/airflow/dags/brasil/sinan/zika.py b/containers/airflow/dags/brasil/sinan/zika.py
new file mode 100644
index 0000000..da6ab7c
--- /dev/null
+++ b/containers/airflow/dags/brasil/sinan/zika.py
@@ -0,0 +1,180 @@
+import pendulum
+
+from datetime import timedelta
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models import Variable
+
+
+default_args = {
+    "owner": "epigraphhub",
+    "depends_on_past": False,
+    "start_date": pendulum.datetime(2023, 1, 2),
+    "email": ["epigraphhub@thegraphnetwork.org"],
+    "email_on_failure": True,
+    "email_on_retry": False,
+    "retries": 2,
+    "retry_delay": timedelta(minutes=1),
+}
+
+with DAG(
+    dag_id='SINAN_ZIKA',
+    tags=['SINAN', 'Brasil', 'Zika'],
+    schedule='0 0 2 * *',
+    default_args=default_args,
+    catchup=False,
+) as dag:
+
+    CONN = Variable.get('egh_conn', deserialize_json=True)
+
+    @task.external_python(
+        task_id='update_zika',
+        python='/opt/py311/bin/python3.11'
+    )
+    def update_zika(egh_conn: dict):
+        """
+        This task runs in an isolated Python environment containing the PySUS
+        package. It fetches all Zika years from DATASUS and inserts them
+        into the EGH database.
+        """
+        import os
+        import logging
+        import pandas as pd
+
+        from sqlalchemy import create_engine, text
+        from pysus.ftp.databases.sinan import SINAN
+
+        sinan = SINAN().load()
+        dis_code = "ZIKA"
+        tablename = "sinan_zika_m"
+        files = sinan.get_files(dis_code=dis_code)
+
+        def to_sql_include_cols(df: pd.DataFrame, prelim: bool, engine):
+            """
+            Insert dataframe into db, include missing columns if needed
+            """
+            df.columns = df.columns.str.lower()
+
+            with create_engine(egh_conn['URI']).connect() as conn:
+                # Get columns
+                res = conn.execute(
+                    text(f'SELECT * FROM brasil.{tablename} LIMIT 1'))
+                sql_columns = set(i[0] for i in res.cursor.description)
+
+            df_columns = set(df.columns)
+            columns_to_add = df_columns.difference(sql_columns)
+
+            if columns_to_add:
+                sql_statements = [f"ALTER TABLE brasil.{tablename}"]
+                for column in columns_to_add:
+                    sql_statements.append(
+                        f"ADD COLUMN {column} TEXT,")  # object
+
+                sql_statements[-1] = sql_statements[-1].replace(',', ';')
+
+                with create_engine(egh_conn['URI']).connect() as conn:
+                    sql = ' '.join(sql_statements)
+                    logging.warning(f"EXECUTING: {sql}")
+                    conn.execute(text(sql))
+                    conn.commit()
+
+            for col, dtype in df.dtypes.items():
+                if col in ['dt_notific', 'dt_sin_pri']:
+                    try:
+                        df[col] = pd.to_datetime(df[col]).dt.strftime(
+                            '%d%m%Y').astype('object')
+                        dtype = 'object'
+                        logging.warning(
+                            f"Column '{col}' of type 'DATE' has been parsed to 'TEXT'"
+                        )
+                    except ValueError as error:
+                        logging.error(
+                            f'Could not format date column correctly: {error}')
+                        df[col] = df[col].astype('object')
+                        dtype = 'object'
+
+                if str(dtype) != 'object':
+                    df[col] = df[col].astype('object')
+                    logging.warning(
+                        f"Column '{col}' of type '{dtype}' has been parsed to 'object'"
+                    )
+
+            df['year'] = year
+            df['prelim'] = prelim
+
+            df.to_sql(
+                name=tablename,
+                con=engine,
+                schema="brasil",
+                if_exists='append',
+                index=False
+            )
+
+        def insert_parquets(parquet_dir: str, year: int, prelim: bool):
+            """
+            Insert parquet dir into database using its chunks. Delete each
+            chunk and the directory after insertion.
+            """
+            for parquet in os.listdir(parquet_dir):
+                file = os.path.join(parquet_dir, parquet)
+                df = pd.read_parquet(str(file), engine='fastparquet')
+
+                to_sql_include_cols(df, prelim, create_engine(egh_conn['URI']))
+                logging.warning(f"{len(df)} rows inserted into {tablename}")
+
+                del df
+                os.remove(file)
+            os.rmdir(parquet_dir)
+
+        f_stage = {}
+        for file in files:
+            code, year = sinan.format(file)
+            stage = 'prelim' if 'PRELIM' in file.path else 'final'
+
+            if stage not in f_stage:
+                f_stage[stage] = [year]
+            else:
+                f_stage[stage].append(year)
+
+        for year in f_stage['final']:
+            # Check if final is already in DB
+            with create_engine(egh_conn['URI']).connect() as conn:
+                cur = conn.execute(text(
+                    f'SELECT COUNT(*) FROM brasil.{tablename}'
+                    f" WHERE year = '{year}' AND prelim = False"
+                ))
+                count = cur.fetchone()[0]
+
+            logging.info(f"Final year {year}: {count}")
+
+            if not count:
+                # Check on prelims
+                with create_engine(egh_conn['URI']).connect() as conn:
+                    cur = conn.execute(text(
+                        f'SELECT COUNT(*) FROM brasil.{tablename}'
+                        f" WHERE year = '{year}' AND prelim = True"
+                    ))
+                    count = cur.fetchone()[0]
+
+                    if count:
+                        # Drop prelim rows; final data will replace them
+                        cur = conn.execute(text(
+                            f'DELETE FROM brasil.{tablename}'
+                            f" WHERE year = '{year}' AND prelim = True"
+                        ))
+
+                parquets = sinan.download(sinan.get_files(dis_code, year))
+                insert_parquets(parquets.path, year, False)
+
+        for year in f_stage['prelim']:
+            with create_engine(egh_conn['URI']).connect() as conn:
+                # Drop old prelim rows before re-inserting
+                cur = conn.execute(text(
+                    f'DELETE FROM brasil.{tablename}'
+                    f" WHERE year = '{year}' AND prelim = True"
+                ))
+
+            parquets = sinan.download(sinan.get_files(dis_code, year))
+            insert_parquets(parquets.path, year, True)
+
+    update_zika(CONN)
diff --git a/containers/airflow/env.tpl b/containers/airflow/env.tpl
new file mode 100644
index 0000000..9b4705a
--- /dev/null
+++ b/containers/airflow/env.tpl
@@ -0,0 +1,20 @@
+AIRFLOW_PROJ_DIR=${AIRFLOW_PROJ_DIR}
+AIRFLOW_UID=${AIRFLOW_UID}
+AIRFLOW_PORT=${AIRFLOW_PORT}
+_AIRFLOW_WWW_USER_USERNAME=${_AIRFLOW_WWW_USER_USERNAME}
+_AIRFLOW_WWW_USER_PASSWORD=${_AIRFLOW_WWW_USER_PASSWORD}
+
+AIRFLOW__CORE__FERNET_KEY=${AIRFLOW__CORE__FERNET_KEY}
+
+AIRFLOW__SMTP__SMTP_HOST=${AIRFLOW__SMTP__SMTP_HOST}
+AIRFLOW__SMTP__SMTP_USER=${AIRFLOW__SMTP__SMTP_USER}
+AIRFLOW__SMTP__SMTP_PASSWORD=${AIRFLOW__SMTP__SMTP_PASSWORD}
+AIRFLOW__SMTP__SMTP_PORT=${AIRFLOW__SMTP__SMTP_PORT:-587}
+AIRFLOW__SMTP__SMTP_MAIL_FROM=${AIRFLOW__SMTP__SMTP_MAIL_FROM}
+
+POSTGRES_EPIGRAPH_DB=${POSTGRES_EPIGRAPH_DB}
+POSTGRES_EPIGRAPH_HOST=${POSTGRES_EPIGRAPH_HOST}
+POSTGRES_EPIGRAPH_PORT=${POSTGRES_EPIGRAPH_PORT}
+POSTGRES_EPIGRAPH_USER=${POSTGRES_EPIGRAPH_USER}
+POSTGRES_EPIGRAPH_PASSWORD=${POSTGRES_EPIGRAPH_PASSWORD}
+AIRFLOW_VAR_EGH_CONN='{"URI":"postgresql://${POSTGRES_EPIGRAPH_USER}:${POSTGRES_EPIGRAPH_PASSWORD}@${POSTGRES_EPIGRAPH_HOST}:${POSTGRES_EPIGRAPH_PORT}/${POSTGRES_EPIGRAPH_DB}"}'
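The last line is what wires the DAGs to the database: Airflow exposes any AIRFLOW_VAR_<NAME> environment variable as the Variable <name>, so AIRFLOW_VAR_EGH_CONN becomes the egh_conn Variable read at the top of each DAG. A minimal sketch of that round trip, with placeholder credentials:

    import json
    import os

    from airflow.models import Variable

    # What the rendered .env provides (the connection values here are placeholders)
    os.environ["AIRFLOW_VAR_EGH_CONN"] = json.dumps(
        {"URI": "postgresql://user:password@db-host:5432/dbname"}
    )

    conn = Variable.get("egh_conn", deserialize_json=True)
    print(conn["URI"])  # exactly what the SINAN DAGs pass into their tasks
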
diff --git a/containers/airflow/envs/pysus.txt b/containers/airflow/envs/pysus.txt
index 79e71e0..3508e37 100644
--- a/containers/airflow/envs/pysus.txt
+++ b/containers/airflow/envs/pysus.txt
@@ -1 +1,2 @@
 pysus >= 0.10.2
+SQLAlchemy >= 2.0.21
diff --git a/containers/compose-airflow.yaml b/containers/compose-airflow.yaml
index 7f7fbd5..428d887 100644
--- a/containers/compose-airflow.yaml
+++ b/containers/compose-airflow.yaml
@@ -5,17 +5,14 @@ x-airflow-common:
   build:
     context: ..
     dockerfile: containers/airflow/Dockerfile
     args:
-      POSTGRES_EPIGRAPH_HOST: ${POSTGRES_EPIGRAPH_HOST}
-      POSTGRES_EPIGRAPH_PORT: ${POSTGRES_EPIGRAPH_PORT}
-      POSTGRES_EPIGRAPH_USER: ${POSTGRES_EPIGRAPH_USER}
-      POSTGRES_EPIGRAPH_PASSWORD: ${POSTGRES_EPIGRAPH_PASSWORD}
-      POSTGRES_EPIGRAPH_DB: ${POSTGRES_EPIGRAPH_DB}
+      DB_URI: ${DB_URI}
   environment: &airflow-common-env
     AIRFLOW_HOME: /opt/airflow
     AIRFLOW__CORE__FERNET_KEY: ${AIRFLOW__CORE__FERNET_KEY}
     AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
     AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
+    AIRFLOW_VAR_EGH_CONN: ${AIRFLOW_VAR_EGH_CONN}
   volumes:
     - ${AIRFLOW_PROJ_DIR}/dags:/opt/airflow/dags
     - ${AIRFLOW_PROJ_DIR}/logs:/opt/airflow/logs
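With psycopg2-binary added to the py311 environment and AIRFLOW_VAR_EGH_CONN passed through compose, a quick connectivity check from inside the container confirms the pieces line up. A sketch, assuming the rendered URI is at hand (the value shown is a placeholder):

    # run with: /opt/py311/bin/python3.11
    from sqlalchemy import create_engine, text

    uri = "postgresql://user:password@db-host:5432/dbname"  # replace with the egh_conn URI
    with create_engine(uri).connect() as conn:
        assert conn.execute(text("SELECT 1")).scalar() == 1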