chore(sinan DAGS): create DAG to fetch dengue data from SINAN #201

Open · wants to merge 12 commits into main
containers/airflow/Dockerfile (5 additions, 8 deletions)

@@ -73,13 +73,8 @@
COPY --chown=airflow containers/airflow/envs/* /opt/envs/

USER airflow

-ARG POSTGRES_EPIGRAPH_HOST
-ARG POSTGRES_EPIGRAPH_PORT
-ARG POSTGRES_EPIGRAPH_USER
-ARG POSTGRES_EPIGRAPH_PASSWORD
-ARG POSTGRES_EPIGRAPH_DB
-ENV DB_USER "${POSTGRES_EPIGRAPH_USER}:${POSTGRES_EPIGRAPH_PASSWORD}"
-ENV DB_URI "${DB_USER}@${POSTGRES_EPIGRAPH_HOST}:${POSTGRES_EPIGRAPH_PORT}/${POSTGRES_EPIGRAPH_DB}"
+ARG DB_URI
+ENV DB_URI "${DB_URI}"

RUN /usr/local/bin/python -m virtualenv /opt/envs/py310 --python="/opt/py310/bin/python3.10" \
&& sed -i "s/include-system-site-packages = false/include-system-site-packages = true/" /opt/envs/py310/pyvenv.cfg \
@@ -94,7 +89,9 @@
RUN /usr/local/bin/python -m virtualenv /opt/envs/py311 --python="/opt/py311/bin/python3.11" \
&& source /opt/envs/py311/bin/activate \
&& pip install "cython<3.0.0" \
&& pip install --no-build-isolation "pyyaml<6.0" \
-&& pip install -r /opt/envs/pysus.txt
+&& pip install \
+    psycopg2-binary \
+    -r /opt/envs/pysus.txt

WORKDIR ${AIRFLOW_HOME}
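
The image now receives the fully formed connection URI as a single DB_URI build argument, rather than assembling it from five POSTGRES_EPIGRAPH_* arguments. For context, a short sketch of how code inside the image might consume the baked-in variable; this is illustrative, not part of the PR:

# DB_URI is set by the Dockerfile's ENV instruction above.
import os
from sqlalchemy import create_engine

engine = create_engine(os.environ['DB_URI'])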

containers/airflow/dags/brasil/sinan/chikungunya.py (180 additions)

@@ -0,0 +1,180 @@
import pendulum

from datetime import timedelta
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable


default_args = {
    "owner": "epigraphhub",
    "depends_on_past": False,
    "start_date": pendulum.datetime(2023, 1, 3),
    "email": ["[email protected]"],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=1),
}

with DAG(
    dag_id='SINAN_CHIK',
    tags=['SINAN', 'Brasil', 'Chikungunya'],
    schedule='0 0 3 * *',
    default_args=default_args,
    catchup=False,
) as dag:

    CONN = Variable.get('egh_conn', deserialize_json=True)
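
    # Assumption (not stated in the PR): the 'egh_conn' Variable above holds
    # JSON shaped like {"URI": "postgresql://user:password@host:5432/db"},
    # since the task below reads egh_conn['URI'] to build SQLAlchemy engines.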

    @task.external_python(
        task_id='update_chik',
        python='/opt/py311/bin/python3.11'
    )
    def update_chik(egh_conn: dict):
        """
        This task runs in an isolated Python environment that contains
        the PySUS package. It fetches all available Chikungunya years
        from DATASUS and inserts them into the EGH database.
        """
        import os
        import logging
        import pandas as pd

        from sqlalchemy import create_engine, text
        from pysus.ftp.databases.sinan import SINAN

        sinan = SINAN().load()
        dis_code = "CHIK"
        tablename = "sinan_chikungunya_m"
        files = sinan.get_files(dis_code=dis_code)

        def to_sql_include_cols(df: pd.DataFrame, year: int, prelim: bool, engine):
            """
            Insert a dataframe into the database, first adding any
            columns that the target table is missing.
            """
            df.columns = df.columns.str.lower()

            with create_engine(egh_conn['URI']).connect() as conn:
                # Get the target table's current columns
                res = conn.execute(
                    text(f'SELECT * FROM brasil.{tablename} LIMIT 1'))
                sql_columns = set(i[0] for i in res.cursor.description)

            df_columns = set(df.columns)
            columns_to_add = df_columns.difference(sql_columns)

            if columns_to_add:
                sql_statements = [f"ALTER TABLE brasil.{tablename}"]
                for column in columns_to_add:
                    sql_statements.append(
                        f"ADD COLUMN {column} TEXT,")  # object

                sql_statements[-1] = sql_statements[-1].replace(',', ';')

                with create_engine(egh_conn['URI']).connect() as conn:
                    sql = ' '.join(sql_statements)
                    logging.warning(f"EXECUTING: {sql}")
                    conn.execute(text(sql))
                    conn.commit()

            for col, dtype in df.dtypes.items():
                if col in ['dt_notific', 'dt_sin_pri']:
                    try:
                        df[col] = pd.to_datetime(df[col]).dt.strftime(
                            '%d%m%Y').astype('object')
                        dtype = 'object'
                        logging.warning(
                            f"Column '{col}' of type 'DATE' has been parsed to 'TEXT'"
                        )
                    except ValueError as error:
                        logging.error(
                            f'Could not format date column correctly: {error}')
                        df[col] = df[col].astype('object')
                        dtype = 'object'

                if str(dtype) != 'object':
                    df[col] = df[col].astype('object')
                    logging.warning(
                        f"Column '{col}' of type '{dtype}' has been parsed to 'object'"
                    )

            df['year'] = year
            df['prelim'] = prelim

            df.to_sql(
                name=tablename,
                con=engine,
                schema="brasil",
                if_exists='append',
                index=False
            )

        def insert_parquets(parquet_dir: str, year: int, prelim: bool):
            """
            Insert a directory of parquet chunks into the database,
            deleting each chunk after insertion and the directory
            itself at the end.
            """
            for parquet in os.listdir(parquet_dir):
                file = os.path.join(parquet_dir, parquet)
                df = pd.read_parquet(str(file), engine='fastparquet')

                to_sql_include_cols(df, year, prelim, create_engine(egh_conn['URI']))
                logging.warning(f"{len(df)} rows inserted into {tablename}")

                del df
                os.remove(file)
            os.rmdir(parquet_dir)

        # Group the available years by release stage: preliminary vs final
        f_stage = {}
        for file in files:
            code, year = sinan.format(file)
            stage = 'prelim' if 'PRELIM' in file.path else 'final'

            if stage not in f_stage:
                f_stage[stage] = [year]
            else:
                f_stage[stage].append(year)

        for year in f_stage.get('final', []):
            # Check if the final data for this year is already in the DB
            with create_engine(egh_conn['URI']).connect() as conn:
                cur = conn.execute(text(
                    f'SELECT COUNT(*) FROM brasil.{tablename}'
                    f" WHERE year = '{year}' AND prelim = False"
                ))
                count = cur.fetchone()[0]

            logging.info(f"Final year {year}: {count}")

            if not count:
                # Check for preliminary rows for the same year
                with create_engine(egh_conn['URI']).connect() as conn:
                    cur = conn.execute(text(
                        f'SELECT COUNT(*) FROM brasil.{tablename}'
                        f" WHERE year = '{year}' AND prelim = True"
                    ))
                    count = cur.fetchone()[0]

                    if count:
                        # Promote: drop the preliminary rows, then insert final data
                        conn.execute(text(
                            f'DELETE FROM brasil.{tablename}'
                            f" WHERE year = '{year}' AND prelim = True"
                        ))
                        conn.commit()

                parquets = sinan.download(sinan.get_files(dis_code, year))
                insert_parquets(parquets.path, year, False)

        for year in f_stage.get('prelim', []):
            # Preliminary data is always refreshed: drop existing rows first
            with create_engine(egh_conn['URI']).connect() as conn:
                conn.execute(text(
                    f'DELETE FROM brasil.{tablename}'
                    f" WHERE year = '{year}' AND prelim = True"
                ))
                conn.commit()

            parquets = sinan.download(sinan.get_files(dis_code, year))
            insert_parquets(parquets.path, year, True)

    update_chik(CONN)
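
For local testing, the 'egh_conn' Variable that the SINAN DAGs read can be registered ahead of time. A minimal sketch, assuming a plain PostgreSQL URI (the key name and JSON shape come from the DAG code; host and credentials are placeholders):

# Register the JSON Variable the SINAN DAGs read at parse time.
from airflow.models import Variable

Variable.set(
    'egh_conn',
    {"URI": "postgresql://user:password@localhost:5432/epigraphhub"},
    serialize_json=True,  # the DAGs fetch it with deserialize_json=True
)

The same can be done from the CLI with: airflow variables set egh_conn '{"URI": "..."}'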
containers/airflow/dags/brasil/sinan/dengue.py (178 additions)

@@ -0,0 +1,178 @@
import pendulum

from datetime import timedelta
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable


default_args = {
    "owner": "epigraphhub",
    "depends_on_past": False,
    "start_date": pendulum.datetime(2023, 1, 1),
    "email": ["[email protected]"],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=1),
}

with DAG(
    dag_id='SINAN_DENG',
    tags=['SINAN', 'Brasil', 'Dengue'],
    schedule='@monthly',
    default_args=default_args,
    catchup=False,
) as dag:

    CONN = Variable.get('egh_conn', deserialize_json=True)
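
    # Same assumption as in the Chikungunya DAG: 'egh_conn' holds JSON like
    # {"URI": "postgresql://user:password@host:5432/db"}.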

    @task.external_python(
        task_id='update_dengue',
        python='/opt/py311/bin/python3.11'
    )
    def update_dengue(egh_conn: dict):
        """
        This task runs in an isolated Python environment that contains
        the PySUS package. It fetches all available Dengue years from
        DATASUS and inserts them into the EGH database.
        """
        import os
        import logging
        import pandas as pd

        from sqlalchemy import create_engine, text
        from pysus.ftp.databases.sinan import SINAN

        sinan = SINAN().load()
        dis_code = "DENG"
        tablename = "sinan_dengue_m"
        files = sinan.get_files(dis_code=dis_code)

        def to_sql_include_cols(df: pd.DataFrame, year: int, prelim: bool, engine):
            """
            Insert a dataframe into the database, first adding any
            columns that the target table is missing.
            """
            df.columns = df.columns.str.lower()

            with create_engine(egh_conn['URI']).connect() as conn:
                # Get the target table's current columns
                res = conn.execute(
                    text(f'SELECT * FROM brasil.{tablename} LIMIT 1'))
                sql_columns = set(i[0] for i in res.cursor.description)

            df_columns = set(df.columns)
            columns_to_add = df_columns.difference(sql_columns)

            if columns_to_add:
                sql_statements = [f"ALTER TABLE brasil.{tablename}"]
                for column in columns_to_add:
                    sql_statements.append(
                        f"ADD COLUMN {column} TEXT,")  # object

                sql_statements[-1] = sql_statements[-1].replace(',', ';')

                with create_engine(egh_conn['URI']).connect() as conn:
                    sql = ' '.join(sql_statements)
                    logging.warning(f"EXECUTING: {sql}")
                    conn.execute(text(sql))
                    conn.commit()

            for col, dtype in df.dtypes.items():
                if col in ['dt_notific', 'dt_sin_pri']:
                    try:
                        df[col] = pd.to_datetime(df[col]).dt.strftime(
                            '%d%m%Y').astype('object')
                        dtype = 'object'
                        logging.warning(
                            f"Column '{col}' of type 'DATE' has been parsed to 'TEXT'"
                        )
                    except ValueError as error:
                        logging.error(
                            f'Could not format date column correctly: {error}')
                        df[col] = df[col].astype('object')
                        dtype = 'object'

                if str(dtype) != 'object':
                    df[col] = df[col].astype('object')
                    logging.warning(
                        f"Column '{col}' of type '{dtype}' has been parsed to 'object'"
                    )

            df['year'] = year
            df['prelim'] = prelim

            df.to_sql(
                name=tablename,
                con=engine,
                schema="brasil",
                if_exists='append',
                index=False
            )

        def insert_parquets(parquet_dir: str, year: int, prelim: bool):
            """
            Insert a directory of parquet chunks into the database,
            deleting each chunk after insertion and the directory
            itself at the end.
            """
            for parquet in os.listdir(parquet_dir):
                file = os.path.join(parquet_dir, parquet)
                df = pd.read_parquet(str(file), engine='fastparquet')

                to_sql_include_cols(df, year, prelim, create_engine(egh_conn['URI']))
                logging.warning(f"{len(df)} rows inserted into {tablename}")

                del df
                os.remove(file)
            os.rmdir(parquet_dir)


        # Group the available years by release stage: preliminary vs final
        f_stage = {}
        for file in files:
            code, year = sinan.format(file)
            stage = 'prelim' if 'PRELIM' in file.path else 'final'

            if stage not in f_stage:
                f_stage[stage] = [year]
            else:
                f_stage[stage].append(year)

        for year in f_stage.get('final', []):
            # Check if the final data for this year is already in the DB
            with create_engine(egh_conn['URI']).connect() as conn:
                cur = conn.execute(text(
                    f'SELECT COUNT(*) FROM brasil.{tablename}'
                    f" WHERE year = '{year}' AND prelim = False"
                ))
                count = cur.fetchone()[0]

            logging.info(f"Final year {year}: {count}")

            if not count:
                # Check for preliminary rows for the same year
                with create_engine(egh_conn['URI']).connect() as conn:
                    cur = conn.execute(text(
                        f'SELECT COUNT(*) FROM brasil.{tablename}'
                        f" WHERE year = '{year}' AND prelim = True"
                    ))
                    count = cur.fetchone()[0]

                    if count:
                        # Promote: drop the preliminary rows, then insert final data
                        conn.execute(text(
                            f'DELETE FROM brasil.{tablename}'
                            f" WHERE year = '{year}' AND prelim = True"
                        ))
                        conn.commit()

                parquets = sinan.download(sinan.get_files(dis_code, year))
                insert_parquets(parquets.path, year, False)

        for year in f_stage.get('prelim', []):
            # Preliminary data is always refreshed: drop existing rows first
            with create_engine(egh_conn['URI']).connect() as conn:
                conn.execute(text(
                    f'DELETE FROM brasil.{tablename}'
                    f" WHERE year = '{year}' AND prelim = True"
                ))
                conn.commit()

            parquets = sinan.download(sinan.get_files(dis_code, year))
            insert_parquets(parquets.path, year, True)

    update_dengue(CONN)
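
Both DAGs assume the target tables already exist with the two bookkeeping columns they filter on ('year' and 'prelim'); every other SINAN column is added on the fly by to_sql_include_cols. A minimal bootstrap sketch under that assumption; the column types are guesses, not taken from this PR:

# One-off bootstrap for the tables the DAGs append into.
from sqlalchemy import create_engine, text

engine = create_engine('postgresql://user:password@localhost:5432/epigraphhub')  # placeholder URI
with engine.connect() as conn:
    for table in ('sinan_chikungunya_m', 'sinan_dengue_m'):
        conn.execute(text(
            f'CREATE TABLE IF NOT EXISTS brasil.{table} ('
            " year TEXT,"  # the DAGs compare year as a quoted literal
            ' prelim BOOLEAN)'
        ))
    conn.commit()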