Skip to content

Commit

Permalink
enable new afc loading
Browse files Browse the repository at this point in the history
  • Loading branch information
rymarczy committed Apr 26, 2024
1 parent 5ad3acb commit 112049c
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 53 deletions.
6 changes: 3 additions & 3 deletions src/research_etl/etl_afc/afc_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def load_afc_data(s3_object: str, db_manager: DatabaseManager, afc_type: str) ->
delete_query = f"DELETE FROM afc.{afc_type} WHERE servicedate = '{service_date}';"
db_manager.execute(sa.text(delete_query))

afc_copy(s3_object, f"afc.{afc_type}", headers)
afc_copy(s3_object, f"afc.{afc_type}", headers, null_as=False)


def load_lookups(s3_object: str, db_manager: DatabaseManager) -> None:
Expand Down Expand Up @@ -197,8 +197,8 @@ def run(db_manager: DatabaseManager) -> None:
each found file should temporarily downoladed locally for processsing and
then deleted
"""
s3_in_path = "afc/in"
s3_error_path = "afc/error"
s3_in_path = "afc_oracle_db"
s3_error_path = "afc_oracle_db_error"
s3_in_bucket = os.getenv("AFC_IN_BUCKET", "")

process_log = ProcessLogger("afc_etl_job")
Expand Down
5 changes: 3 additions & 2 deletions src/research_etl/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

# from research_etl.etl_korbato.korbato_job import alt_run as odx_catch_up_job

# from research_etl.etl_afc.afc_job import run as afc_job
from research_etl.etl_afc.afc_job import run as afc_job

# from research_etl.mbta_open_data.csat_job import run as csat_job
# from research_etl.mbta_open_data.gse_job import run as gse_job

Expand All @@ -26,7 +27,7 @@ def run_jobs() -> None:
gtfs_job(db_manager)
odx_job(db_manager)
# odx_catch_up_job(db_manager)
# afc_job(db_manager)
afc_job(db_manager)
# csat_job(db_manager)
# gse_job(db_manager)

Expand Down
8 changes: 6 additions & 2 deletions src/research_etl/utils/util_rds.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ def copy_gzip_csv_to_db(local_path: str, destination_table: str) -> None:
run_psql_subprocess(psql, copy_log)


def afc_copy(obj_path: str, destination_table: str, headers: List[str]) -> None:
def afc_copy(obj_path: str, destination_table: str, headers: List[str], null_as: bool = True) -> None:
"""
load local csv or csv.gz file into DB using psql COPY command
Expand All @@ -351,7 +351,11 @@ def afc_copy(obj_path: str, destination_table: str, headers: List[str]) -> None:
elif obj_path.lower().endswith(".gz"):
copy_from = f"FROM PROGRAM 'gzip -dc {obj_path}' "

copy_command = f"\\COPY {destination_table} ({','.join(headers)}) {copy_from} WITH NULL '\\N' CSV"
null_as_str = "WITH NULL '\\N'"
if not null_as:
null_as_str = ""

copy_command = f"\\COPY {destination_table} ({','.join(headers)}) {copy_from} {null_as_str} CSV QUOTE AS '\"'"

psql = [
"psql",
Expand Down
92 changes: 46 additions & 46 deletions tests/files/init_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -601,12 +601,12 @@ CREATE SCHEMA afc;
CREATE TABLE afc.faregate (
trxtime timestamp without time zone,
servicedate date,
deviceclassid smallint,
deviceclassid integer,
deviceid integer,
uniquemsid integer,
eventsequno integer,
tariffversion smallint,
tarifflocationid smallint,
tariffversion integer,
tarifflocationid integer,
unplanned boolean,
eventcode integer,
inserted timestamp without time zone
Expand All @@ -615,22 +615,22 @@ CREATE TABLE afc.faregate (
CREATE INDEX ON afc.faregate (servicedate);

CREATE TABLE afc.ridership (
deviceclassid smallint,
deviceclassid integer,
deviceid integer,
uniquemsid integer,
salestransactionno integer,
sequenceno integer,
trxtime timestamp without time zone,
servicedate date,
branchlineid integer,
branchlineid character varying(50),
fareoptamount integer,
tariffversion smallint,
tariffversion integer,
articleno integer,
card character varying(50),
ticketstocktype smallint,
ticketstocktype integer,
tvmtarifflocationid integer,
movementtype smallint,
bookcanc smallint,
movementtype integer,
bookcanc integer,
correctioncounter bit(1),
correctionflag bit(1) DEFAULT 0::bit NOT NULL,
tempbooking bit(1) NOT NULL,
Expand All @@ -643,8 +643,8 @@ CREATE INDEX ON afc.ridership (servicedate);

CREATE TABLE afc.deviceclass (
deviceclassid integer PRIMARY KEY,
balancegroupid smallint,
deviceclasstype smallint,
balancegroupid integer,
deviceclasstype integer,
tvmtarversiongroupid integer,
tvmapltarversiongroupid integer,
tvmtechversiongroupid integer,
Expand All @@ -655,15 +655,15 @@ CREATE TABLE afc.deviceclass (
timenew timestamp without time zone,
userchange character varying(30),
timechange timestamp without time zone,
typeoftariffdownloaddata smallint,
typeoftariffdownloaddata integer,
parametergroupid integer
);

CREATE TABLE afc.event (
eventcode integer PRIMARY KEY,
eventdesc character varying(15),
eventtxt character varying(80),
eventgroupref smallint,
eventgroupref integer,
display bit(1),
logging bit(1),
alarm bit(1),
Expand All @@ -686,7 +686,7 @@ CREATE TABLE afc.eventgroup (
CREATE TABLE afc.holiday (
versionid integer NOT NULL,
datehour timestamp without time zone NOT NULL,
holidayclass smallint,
holidayclass integer,
description character varying(120),
usernew character varying(25) NOT NULL,
timenew timestamp without time zone NOT NULL,
Expand All @@ -697,15 +697,15 @@ CREATE TABLE afc.holiday (
CREATE TABLE afc.mbta_weekend_service (
servicehour timestamp without time zone,
servicedesc character varying(120),
servicetype smallint,
servicetype integer,
dateinserted timestamp without time zone
);

CREATE TABLE afc.routes (
routeid integer PRIMARY KEY,
description character varying(120) NOT NULL,
versionid smallint,
multimediagroupid smallint,
versionid integer,
multimediagroupid integer,
usernew character varying(25),
timenew timestamp without time zone,
userchange character varying(25),
Expand All @@ -716,10 +716,10 @@ CREATE TABLE afc.tariffversions (
versionid integer PRIMARY KEY,
validitystarttime timestamp without time zone,
validityendtime timestamp without time zone,
railroadid smallint,
railroadid integer,
description character varying(120),
status smallint,
theovertakerflag smallint,
status integer,
theovertakerflag integer,
usernew character varying(25),
timenew timestamp without time zone,
userchange character varying(25),
Expand All @@ -730,34 +730,34 @@ CREATE TABLE afc.tariffversions (
CREATE TABLE afc.tickettype (
versionid smallint NOT NULL,
tickettypeid integer NOT NULL,
summary smallint,
type smallint,
summary integer,
type integer,
amount integer,
balamt1 integer,
balamt2 smallint,
farecalculationruleid smallint,
multimediagroupid smallint,
statetaxid smallint,
sendonlevt smallint,
svcproviderid smallint,
validityid smallint,
genderinput smallint,
balamt2 integer,
farecalculationruleid integer,
multimediagroupid integer,
statetaxid integer,
sendonlevt integer,
svcproviderid integer,
validityid integer,
genderinput integer,
description character varying(120),
param1 smallint,
param2 smallint,
param3 smallint,
param4 smallint,
param5 smallint,
param6 smallint,
param7 smallint,
param8 smallint,
param1 integer,
param2 integer,
param3 integer,
param4 integer,
param5 integer,
param6 integer,
param7 integer,
param8 integer,
param9 character varying(15),
param10 character varying(15),
usernew character varying(25),
timenew timestamp without time zone,
userchange character varying(15),
timechange timestamp without time zone,
ticketmembergroupid smallint,
ticketmembergroupid integer,
externalid character varying(15)
);

Expand All @@ -767,22 +767,22 @@ CREATE TABLE afc.tvmstation (
namelong character varying(120),
name character varying(120),
town character varying(50),
tariffproperty smallint,
tariffzone smallint,
tariffproperty integer,
tariffzone integer,
usernew character varying(25),
timenew timestamp without time zone,
userchange character varying(15),
timechange timestamp without time zone,
companyid smallint,
graphickey smallint,
stationtype smallint,
companyid integer,
graphickey integer,
stationtype integer,
externalid smallint
);

CREATE TABLE afc.tvmtable (
deviceclassid integer NOT NULL,
deviceid integer NOT NULL,
balancegroupid smallint,
balancegroupid integer,
tvmabbreviation character varying(12) NOT NULL,
tvmlocation1 character varying(120),
tvmlocation2 character varying(120),
Expand Down Expand Up @@ -823,7 +823,7 @@ CREATE TABLE afc.tvmtable (
reserved2 bigint,
reserved1 bigint,
serialno integer,
fieldstate smallint,
fieldstate integer,
usernew character varying(20),
timenew timestamp without time zone,
userchange character varying(20),
Expand Down

0 comments on commit 112049c

Please sign in to comment.