Skip to content

Commit

Permalink
refactor renlearn
Browse files Browse the repository at this point in the history
  • Loading branch information
cbini committed Sep 18, 2023
1 parent c11a297 commit e010767
Show file tree
Hide file tree
Showing 9 changed files with 502 additions and 119 deletions.
2 changes: 1 addition & 1 deletion .trunk/trunk.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ lint:
enabled:
- [email protected]
- [email protected]
- [email protected].289
- [email protected].290
- [email protected]
- [email protected]
- git-diff-check
Expand Down
118 changes: 59 additions & 59 deletions pdm.lock

Large diffs are not rendered by default.

21 changes: 16 additions & 5 deletions src/teamster/core/renlearn/assets.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
from dagster import config_from_files
from dagster import (
MultiPartitionsDefinition,
StaticPartitionsDefinition,
config_from_files,
)

from teamster.core.renlearn.schema import ASSET_FIELDS
from teamster.core.sftp.assets import build_sftp_asset
Expand All @@ -13,10 +17,17 @@ def build_renlearn_sftp_asset(config_dir, code_location, timezone):
code_location=code_location,
source_system="renlearn",
asset_fields=ASSET_FIELDS,
partitions_def=FiscalYearPartitionsDefinition(
start_date=a["partition_start_date"],
timezone=timezone.name,
start_month=7,
partitions_def=MultiPartitionsDefinition(
{
"start_date": FiscalYearPartitionsDefinition(
start_date=a["partition_keys"]["start_date"],
timezone=timezone.name,
start_month=7,
),
"subject": StaticPartitionsDefinition(
a["partition_keys"]["subject"]
),
}
),
slugify_cols=False,
**a,
Expand Down
266 changes: 254 additions & 12 deletions src/teamster/core/renlearn/schema.py

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions src/teamster/core/sftp/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ def _asset(context: OpExecutionContext):
regexp=remote_file_regex, context=context
)

archive_filepath_regex_composed = compose_regex(
regexp=archive_filepath, context=context
)

# list files remote filepath
conn = ssh.get_connection()

Expand Down Expand Up @@ -120,9 +124,9 @@ def _asset(context: OpExecutionContext):
# unzip file, if necessary
if archive_filepath is not None:
with zipfile.ZipFile(file=local_filepath) as zf:
zf.extract(member=archive_filepath, path="./data")
zf.extract(member=archive_filepath_regex_composed, path="./data")

local_filepath = f"./data/{archive_filepath}"
local_filepath = f"./data/{archive_filepath_regex_composed}"

# exit if file is empty
if os.path.getsize(local_filepath) == 0:
Expand Down
49 changes: 35 additions & 14 deletions src/teamster/kippmiami/renlearn/config/assets.yaml
Original file line number Diff line number Diff line change
@@ -1,19 +1,40 @@
assets:
- asset_name: accelerated_reader
remote_filepath: .
# trunk-ignore(yamllint/quoted-strings)
remote_file_regex: 'KIPP Miami\.zip'
archive_filepath: AR.csv
partition_start_date: 2022-07-01
- asset_name: star_reading
remote_file_regex: KIPP Miami\.zip
archive_filepath: (?P<subject>)\.csv
partition_keys:
start_date: 2023-07-01
subject:
- AR
- asset_name: star
remote_filepath: .
# trunk-ignore(yamllint/quoted-strings)
remote_file_regex: 'KIPP Miami\.zip'
archive_filepath: SR.csv
partition_start_date: 2022-07-01
- asset_name: star_math
remote_file_regex: KIPP Miami\.zip
archive_filepath: (?P<subject>)\.csv
partition_keys:
start_date: 2023-07-01
subject:
- SM
- SR
- SEL
- asset_name: star_skill_area
remote_filepath: .
# trunk-ignore(yamllint/quoted-strings)
remote_file_regex: 'KIPP Miami\.zip'
archive_filepath: SM.csv
partition_start_date: 2022-07-01
remote_file_regex: KIPP Miami\.zip
archive_filepath: (?P<subject>)_SkillArea_v1\.csv
partition_keys:
start_date: 2023-07-01
subject:
- SM
- SR
- SEL
- asset_name: fast_star
remote_filepath: .
remote_file_regex: KIPP Miami\.zip
archive_filepath: FL_FAST_(?P<subject>)_K-2\.csv
partition_keys:
start_date: 2023-07-01
subject:
- SM
- SR
- SEL
- SEL_Domains
30 changes: 15 additions & 15 deletions src/teamster/kippnewark/renlearn/config/assets.yaml
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
assets:
- asset_name: accelerated_reader
remote_filepath: .
# trunk-ignore(yamllint/quoted-strings)
remote_file_regex: 'KIPP TEAM & Family\.zip'
archive_filepath: AR.csv
partition_start_date: 2022-07-01
- asset_name: star_reading
remote_file_regex: KIPP TEAM & Family\.zip
archive_filepath: (?P<subject>)\.csv
partition_keys:
start_date: 2023-07-01
subject:
- AR
- asset_name: star
remote_filepath: .
# trunk-ignore(yamllint/quoted-strings)
remote_file_regex: 'KIPP TEAM & Family\.zip'
archive_filepath: SR.csv
partition_start_date: 2022-07-01
- asset_name: star_math
remote_filepath: .
# trunk-ignore(yamllint/quoted-strings)
remote_file_regex: 'KIPP TEAM & Family\.zip'
archive_filepath: SM.csv
partition_start_date: 2022-07-01
remote_file_regex: KIPP TEAM & Family\.zip
archive_filepath: (?P<subject>)\.csv
partition_keys:
start_date: 2023-07-01
subject:
- SM
- SR
- SEL
31 changes: 28 additions & 3 deletions tests/renlearn/test_renlearn_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,28 @@
from dagster import EnvVar, build_resources

from teamster.core.ssh.resources import SSHConfigurableResource
from teamster.core.utils.functions import regex_pattern_replace

# Archive member names that the location tests below pass as ``members``; each
# one is handed to ``ZipFile.extract`` for the downloaded zip.
MEMBER_FILES = [
    "AR.csv",
    "FL_FAST_SEL_Domains_K-2.csv",
    "FL_FAST_SEL_K-2.csv",
    "FL_FAST_SM_K-2.csv",
    "FL_FAST_SR_K-2.csv",
    "SEL_Dashboard_Standards_v2.csv",
    "SEL_SkillArea_v1.csv",
    "SEL.csv",
    "SM_Dashboard_Standards_v2.csv",
    "SM_SkillArea_v1.csv",
    "SM.csv",
    "SR_Dashboard_Standards_v2.csv",
    "SR_SkillArea_v1.csv",
    "SR.csv",
]


def test_regex_pattern_replace():
    """Call ``regex_pattern_replace`` with a ``None`` pattern and no replacements.

    There are no assertions; the test passes as long as the call does not raise.
    """
    no_replacements = {}
    regex_pattern_replace(None, replacements=no_replacements)


def _test(code_location, remote_filepath, members):
Expand All @@ -25,20 +47,23 @@ def _test(code_location, remote_filepath, members):
if members is not None:
with zipfile.ZipFile(file=local_filepath) as zf:
for member in members:
zf.extract(member=member, path=f"./env/renlearn/{code_location}")
try:
zf.extract(member=member, path=f"./env/renlearn/{code_location}")
except Exception:
continue


def test_kippmiami():
_test(
code_location="KIPPMIAMI",
remote_filepath="KIPP Miami.zip",
members=["AR.csv", "SR.csv", "SM.csv"],
members=MEMBER_FILES,
)


def test_kippnj():
_test(
code_location="KIPPNJ",
remote_filepath="KIPP TEAM & Family.zip",
members=["AR.csv", "SR.csv", "SM.csv"],
members=MEMBER_FILES,
)
96 changes: 88 additions & 8 deletions tests/renlearn/test_renlearn_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@
from teamster.core.renlearn.schema import ASSET_FIELDS
from teamster.core.utils.functions import get_avro_record_schema

# Directory name under env/renlearn/ that every test in this module reads its
# CSV files from. Swap the active line with the commented one to run the same
# tests against the other code location's files.
CODE_LOCATION = "KIPPMIAMI"
# CODE_LOCATION = "KIPPNJ"


def _test(local_filepath, asset_name):
df = read_csv(filepath_or_buffer=local_filepath, low_memory=False).replace(
{nan: None}
)
df = read_csv(filepath_or_buffer=local_filepath, low_memory=False)

df.replace({nan: None}, inplace=True)
# df.rename(columns=lambda x: slugify(text=x, separator="_"), inplace=True)

count = df.shape[0]
records = df.to_dict(orient="records")
# dtypes_dict = df.dtypes.to_dict()
# print(dtypes_dict)
# print(df.dtypes.to_dict())

schema = get_avro_record_schema(name=asset_name, fields=ASSET_FIELDS[asset_name])
# print(schema)
Expand All @@ -41,12 +44,89 @@ def _test(local_filepath, asset_name):


def test_ar():
_test(local_filepath="env/renlearn/KIPPNJ/AR.csv", asset_name="accelerated_reader")
_test(
local_filepath=f"env/renlearn/{CODE_LOCATION}/AR.csv",
asset_name="accelerated_reader",
)


def test_sm():
_test(local_filepath="env/renlearn/KIPPNJ/SM.csv", asset_name="star_math")
_test(local_filepath=f"env/renlearn/{CODE_LOCATION}/SM.csv", asset_name="star")


def test_sr():
_test(local_filepath="env/renlearn/KIPPMIAMI/SR.csv", asset_name="star_reading")
_test(local_filepath=f"env/renlearn/{CODE_LOCATION}/SR.csv", asset_name="star")


def test_sel():
    """Run the shared ``_test`` helper on SEL.csv using the ``star`` asset fields."""
    csv_path = f"env/renlearn/{CODE_LOCATION}/SEL.csv"
    _test(local_filepath=csv_path, asset_name="star")


def test_sel_dashboard_standards():
    """Run ``_test`` on the SEL dashboard-standards CSV.

    The file is checked with the ``star_dashboard_standards`` asset fields.
    """
    csv_path = f"env/renlearn/{CODE_LOCATION}/SEL_Dashboard_Standards_v2.csv"
    _test(local_filepath=csv_path, asset_name="star_dashboard_standards")


def test_sm_dashboard_standards():
    """Run ``_test`` on the SM dashboard-standards CSV.

    The file is checked with the ``star_dashboard_standards`` asset fields.
    """
    csv_path = f"env/renlearn/{CODE_LOCATION}/SM_Dashboard_Standards_v2.csv"
    _test(local_filepath=csv_path, asset_name="star_dashboard_standards")


def test_sr_dashboard_standards():
    """Run ``_test`` on the SR dashboard-standards CSV.

    The file is checked with the ``star_dashboard_standards`` asset fields.
    """
    csv_path = f"env/renlearn/{CODE_LOCATION}/SR_Dashboard_Standards_v2.csv"
    _test(local_filepath=csv_path, asset_name="star_dashboard_standards")


def test_sel_skillarea():
    """Run ``_test`` on the SEL skill-area CSV with the ``star_skill_area`` fields."""
    csv_path = f"env/renlearn/{CODE_LOCATION}/SEL_SkillArea_v1.csv"
    _test(local_filepath=csv_path, asset_name="star_skill_area")


def test_sm_skillarea():
    """Run ``_test`` on the SM skill-area CSV with the ``star_skill_area`` fields."""
    csv_path = f"env/renlearn/{CODE_LOCATION}/SM_SkillArea_v1.csv"
    _test(local_filepath=csv_path, asset_name="star_skill_area")


def test_sr_skillarea():
    """Run ``_test`` on the SR skill-area CSV with the ``star_skill_area`` fields."""
    csv_path = f"env/renlearn/{CODE_LOCATION}/SR_SkillArea_v1.csv"
    _test(local_filepath=csv_path, asset_name="star_skill_area")


def test_fast_sm():
    """Run ``_test`` on the FL FAST SM K-2 CSV with the ``fast_star`` fields."""
    csv_path = f"env/renlearn/{CODE_LOCATION}/FL_FAST_SM_K-2.csv"
    _test(local_filepath=csv_path, asset_name="fast_star")


def test_fast_sr():
    """Run ``_test`` on the FL FAST SR K-2 CSV with the ``fast_star`` fields."""
    csv_path = f"env/renlearn/{CODE_LOCATION}/FL_FAST_SR_K-2.csv"
    _test(local_filepath=csv_path, asset_name="fast_star")


def test_fast_sel():
    """Run ``_test`` on the FL FAST SEL K-2 CSV with the ``fast_star`` fields."""
    csv_path = f"env/renlearn/{CODE_LOCATION}/FL_FAST_SEL_K-2.csv"
    _test(local_filepath=csv_path, asset_name="fast_star")


def test_fast_sel_domains():
    """Run ``_test`` on the FL FAST SEL Domains K-2 CSV with the ``fast_star`` fields."""
    csv_path = f"env/renlearn/{CODE_LOCATION}/FL_FAST_SEL_Domains_K-2.csv"
    _test(local_filepath=csv_path, asset_name="fast_star")

0 comments on commit e010767

Please sign in to comment.