Skip to content

Commit

Permalink
Treat warehouse schedules as in the opscenter-configured timezone. (sundeck-io#397)
Browse files Browse the repository at this point in the history

* Treat warehouse schedules as in the opscenter-configured timezone.

1. All warehouse schedule tasks should be marked as running in the
   configured opscenter timezone.
2. When the task runs, convert the current time and the last task run
   time to the configured timezone. Then, apply those times against the
   task schedule and execute the logic per usual.

Adds some (contrived) SQL unit tests to validate the logic of this task
before we have the admin procedures.

Closes sundeck-io#396
  • Loading branch information
joshelser authored Oct 2, 2023
1 parent d510240 commit 9d9a9f0
Show file tree
Hide file tree
Showing 7 changed files with 432 additions and 36 deletions.
53 changes: 48 additions & 5 deletions app/crud/test_wh_task_scheduling.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import datetime
import pytest
from pytz import timezone
from .wh_sched import WarehouseSchedules, update_task_state, task_offsets
from .test_fixtures import MockSession

_pacific = timezone("America/Los_Angeles")
_eastern = timezone("America/New_York")
_london = timezone("Europe/London")
_kolkata = timezone("Asia/Kolkata")


def _make_schedule(
name: str,
Expand All @@ -28,7 +35,7 @@ def _make_schedule(


def test_no_schedules_disables_task(session: MockSession):
assert not update_task_state(session, [])
assert not update_task_state(session, [], tz=_pacific)
assert len(session._sql) == 1
for offset in task_offsets:
assert (
Expand All @@ -44,7 +51,7 @@ def test_disabled_schedules_disables_task(session: MockSession):
),
]

assert not update_task_state(session, schedules)
assert not update_task_state(session, schedules, tz=_pacific)
assert len(session._sql) == 1
for offset in task_offsets:
assert (
Expand All @@ -60,7 +67,7 @@ def test_weekday_schedules(session: MockSession):
_make_schedule("COMPUTE_WH", datetime.time(17, 30), datetime.time(23, 59)),
]

assert update_task_state(session, schedules) is True
assert update_task_state(session, schedules, tz=_pacific) is True
assert len(session._sql) == 1
script = session._sql[0].lower()

Expand Down Expand Up @@ -112,7 +119,7 @@ def test_weekdays_and_weekends_schedule(session: MockSession):
),
]

assert update_task_state(session, schedules) is True
assert update_task_state(session, schedules, tz=_pacific) is True
assert len(session._sql) == 1
script = session._sql[0].lower()

Expand Down Expand Up @@ -161,7 +168,7 @@ def test_multiple_warehouses(session: MockSession):
_make_schedule("BATCH_WH", datetime.time(12, 45), datetime.time(23, 59)),
]

assert update_task_state(session, schedules) is True
assert update_task_state(session, schedules, tz=_pacific) is True
assert len(session._sql) == 1
script = session._sql[0].lower()

Expand Down Expand Up @@ -195,3 +202,39 @@ def test_multiple_warehouses(session: MockSession):
f"alter task if exists tasks.warehouse_scheduling_45 set schedule = 'using cron {expected_cron_45}';"
in script
)


@pytest.mark.parametrize("tz", [_pacific, _eastern, _london, _kolkata])
def test_timezones(session: MockSession, tz: timezone):
    """
    The cron expressions written by update_task_state must carry the name of the
    timezone that was passed in, whichever zone that is.
    """
    windows = [
        (datetime.time(0, 0), datetime.time(9, 0)),
        (datetime.time(9, 0), datetime.time(17, 30)),
        (datetime.time(17, 30), datetime.time(23, 59)),
    ]
    schedules = [_make_schedule("COMPUTE_WH", begin, end) for begin, end in windows]

    assert update_task_state(session, schedules, tz=tz) is True
    assert len(session._sql) == 1
    # The script is compared lower-cased, so the zone name is lower-cased too.
    script = session._sql[0].lower()
    zone = tz.zone.lower()

    # A suspend statement is expected for every offset, including _15 and _45
    # which have no schedule boundary in this test.
    for offset in (0, 15, 30, 45):
        assert (
            f"alter task if exists tasks.warehouse_scheduling_{offset} suspend;"
            in script
        )

    # _0 and _30 should be rescheduled in the requested timezone and resumed.
    expected_crons = {
        0: f"0 0,9 * * 1-5 {zone}",
        30: f"30 17 * * 1-5 {zone}",
    }
    for offset, cron in expected_crons.items():
        assert (
            f"alter task if exists tasks.warehouse_scheduling_{offset} set schedule = 'using cron {cron}';"
            in script
        )
        assert (
            f"alter task if exists tasks.warehouse_scheduling_{offset} resume;"
            in script
        )
37 changes: 30 additions & 7 deletions app/crud/wh_sched.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import datetime
from .base import BaseOpsCenterModel
from pydantic import validator, root_validator, Field
from pytz import timezone
from pytz.exceptions import UnknownTimeZoneError
from snowflake.snowpark.functions import col, max as sp_max
from snowflake.snowpark import Row, Session
import pandas as pd
Expand Down Expand Up @@ -319,21 +321,39 @@ def regenerate_alter_statements(session: Session, schedules: List[WarehouseSched
WarehouseAlterStatements.batch_write(session, alter_stmts, overwrite=True)


def update_task_state(session: Session, schedules: List[WarehouseSchedules]) -> bool:
def get_schedule_timezone(session: Session) -> timezone:
    """
    Resolve the timezone in which warehouse schedules are interpreted.

    Asks the 'get_config' procedure for the 'default_timezone' setting. If the setting is
    missing or empty, or names a timezone pytz does not recognise, the timezone for
    'America/Los_Angeles' is returned instead.
    """
    configured = session.call("get_config", "default_timezone")
    if not configured:
        configured = "America/Los_Angeles"

    try:
        resolved = timezone(configured)
    except UnknownTimeZoneError:
        # An unparseable setting falls back to the default rather than failing the caller.
        resolved = timezone("America/Los_Angeles")
    return resolved


def update_task_state(
session: Session, schedules: List[WarehouseSchedules], tz=None
) -> bool:
# Make sure we have at least one enabled schedule.
enabled_schedules = [sch for sch in schedules if sch.enabled]
if len(enabled_schedules) == 0:
disable_all_tasks(session)
return False

# Indirection for unit tests. Caller is not expected to provide a timezone.
if not tz:
tz = get_schedule_timezone(session)

# Build the cron list for the enabled schedules
alter_statements = []
for offset in task_offsets:
# For each "offset", generate multiple statements
# 1. ALTER TASK ... SUSPEND
# 2. ALTER TASK ... SET SCHEDULE = 'USING CRON ...'
# 3. ALTER TASK ... RESUME
alter_statements.extend(_make_alter_task_statements(schedules, offset))
alter_statements.extend(_make_alter_task_statements(schedules, offset, tz))

# Collect the statements together
alter_body = "\n".join(alter_statements)
Expand All @@ -349,13 +369,15 @@ def update_task_state(session: Session, schedules: List[WarehouseSchedules]) ->


def _make_alter_task_statements(
schedules: List[WarehouseSchedules], offset: int
schedules: List[WarehouseSchedules],
offset: int,
tz: timezone,
) -> List[str]:
"""
Generates a list of ALTER TASK statements given the list of WarehouseSchedules and the task offset (the quarterly
minute offset from the hour).
"""
cron_schedule, should_run = _make_cron_schedule(schedules, offset)
cron_schedule, should_run = _make_cron_schedule(schedules, offset, tz)
if should_run:
return [
f"alter task if exists {task_name}_{offset} suspend;",
Expand All @@ -367,7 +389,9 @@ def _make_alter_task_statements(


def _make_cron_schedule(
schedules: List[WarehouseSchedules], offset: int
schedules: List[WarehouseSchedules],
offset: int,
tz: timezone,
) -> (str, bool):
"""
Takes a list of schedules and returns the cron schedule string which cover all schedule boundaries.
Expand Down Expand Up @@ -398,8 +422,7 @@ def _make_cron_schedule(
# Execute only weekends
days_of_week = "0,6"

# TODO use the configured timezone
return f"{offset} {cron_hours} * * {days_of_week} America/Los_Angeles", True
return f"{offset} {cron_hours} * * {days_of_week} {tz.zone}", True


def get_last_run(session: Session) -> datetime.datetime:
Expand Down
9 changes: 8 additions & 1 deletion app/ui/pages/06_WarehouseSchedule.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import streamlit as st
import sthelp
import warehouse_schedule
from modules import add_custom_modules


sthelp.chrome("Warehouse Schedule")

# Load custom OpsCenter python modules
# A failed load is only reported as a warning in the UI; the script still
# continues to the import and display() call below.
if not add_custom_modules():
    st.warning("Unable to load OpsCenter modules.")

# Imported here rather than at the top of the file so that it runs after
# add_custom_modules() has been called (hence the E402 suppression).
# NOTE(review): presumably warehouse_schedule depends on the modules loaded
# above -- confirm.
import warehouse_schedule  # noqa E402

warehouse_schedule.display()
30 changes: 26 additions & 4 deletions bootstrap/011_warehouse_schedules.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,36 @@ CREATE OR REPLACE PROCEDURE INTERNAL.UPDATE_WAREHOUSE_SCHEDULES(last_run timesta
DECLARE
task_outcome variant default (select object_construct());
BEGIN
-- Get the configured timezone or default to 'America/Los_Angeles'
let tz text;
call internal.get_config('default_timezone') into :tz;
if (tz is null) then
tz := 'America/Los_Angeles';
end if;
task_outcome := (select object_insert(:task_outcome, 'opscenter timezone', :tz));
task_outcome := (select object_insert(:task_outcome, 'account timezone', internal.get_current_timezone()));

-- The task calls this procedure with NULL and lets the procedure figure out the details.
-- The ability to specify timestamps is only to enable testing.
if (this_run is NULL) then
this_run := (select current_timestamp());
this_run := (select CONVERT_TIMEZONE(internal.get_current_timezone(), :tz, current_timestamp()));
else
this_run := (select CONVERT_TIMEZONE(internal.get_current_timezone(), :tz, :this_run));
end if;
task_outcome := (select object_insert(:task_outcome, 'this_run', :this_run));

if (last_run is NULL) then
last_run := (select run from internal.task_warehouse_schedule order by run desc limit 1);
last_run := (select CONVERT_TIMEZONE(internal.get_current_timezone(), :tz, run) from internal.task_warehouse_schedule order by run desc limit 1);
-- If we don't have any rows in internal.task_warehouse_schedule, rewind far enough that we will just pick
-- the current WH schedule and not think it has already been run.
if (last_run is NULL) then
last_run := (select timestampadd('days', -1, current_timestamp));
last_run := (select CONVERT_TIMEZONE(internal.get_current_timezone(), :tz, timestampadd('days', -1, current_timestamp)));
end if;
else
last_run := (select CONVERT_TIMEZONE(internal.get_current_timezone(), :tz, :last_run));
end if;

-- TODO handle looking back over a weekend boundary (from python)
-- TODO handle the timestamp from config (from python)
-- TODO the WEEK_START session parameter can alter what DAYOFWEEK returns.
let is_weekday boolean := (select DAYOFWEEK(:this_run) not in (0, 6));

Expand Down Expand Up @@ -94,3 +107,12 @@ EXCEPTION
INSERT INTO internal.task_warehouse_schedule SELECT :this_run, FALSE, :task_outcome;
RAISE;
END;

-- owners rights procedures can't get the timezone from the session parameters.
-- This helper instead returns the time zone name that the JavaScript runtime's
-- Intl API resolves as its default.
-- NOTE(review): INTERNAL.UPDATE_WAREHOUSE_SCHEDULES records this value as the
-- 'account timezone' and uses it as the source zone for CONVERT_TIMEZONE; confirm
-- that the JavaScript runtime's zone really tracks the account TIMEZONE parameter.
CREATE OR REPLACE FUNCTION INTERNAL.GET_CURRENT_TIMEZONE()
RETURNS STRING
LANGUAGE JAVASCRIPT
AS
$$
return Intl.DateTimeFormat().resolvedOptions().timeZone;
$$;
15 changes: 15 additions & 0 deletions test/common_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import uuid
from typing import List, Dict


def generate_unique_name(prefix, timestamp_string) -> str:
Expand Down Expand Up @@ -51,3 +52,17 @@ def delete_list_of_probes(conn, sql):
for name in cur.execute(sql).fetchall():
delete_probe_statement = f"call ADMIN.DELETE_PROBE('{name[0]}');"
assert run_proc(conn, delete_probe_statement) is None


def fetch_all_warehouse_schedules(conn) -> List[Dict]:
    """
    Return every row of internal.wh_schedules, ordered by name, weekday and start_at.

    ``conn`` is called to obtain a connection, which is used as a context manager.
    The rows are returned exactly as the cursor's ``fetchall()`` produces them.
    """
    query = "select * from internal.wh_schedules order by name, weekday, start_at"
    with conn() as cnx:
        with cnx.cursor() as cur:
            rows = cur.execute(query).fetchall()
    return rows


def reset_timezone(conn):
    """
    Set the 'default_timezone' config entry back to 'America/Los_Angeles'.

    ``conn`` is used directly as an open connection: its ``cursor()`` is called
    without invoking ``conn`` first.
    """
    statement = "call internal.set_config('default_timezone', 'America/Los_Angeles')"
    with conn.cursor() as cur:
        # The single result row is fetched and discarded.
        cur.execute(statement).fetchone()
17 changes: 16 additions & 1 deletion test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
import datetime
import pytest
from contextlib import contextmanager
from common_utils import delete_list_of_labels, delete_list_of_probes
from common_utils import (
delete_list_of_labels,
delete_list_of_probes,
fetch_all_warehouse_schedules,
reset_timezone,
)

sys.path.append("../deploy")
import helpers # noqa E402
Expand Down Expand Up @@ -68,3 +73,13 @@ def timestamp_string(conn):

# call a function that deletes all the labels that were created in the session
delete_list_of_probes(conn, sql)

scheds = fetch_all_warehouse_schedules(conn)
str_scheds = [str(s) for s in scheds]
print("Warehouse Schedules:\n" + "\n".join(str_scheds))


@pytest.fixture(autouse=True)
def reset_timezone_before_test(conn):
    """
    Autouse fixture: restore the default timezone setting before each test body runs.

    Opens a connection from the ``conn`` fixture and hands it to ``reset_timezone``.
    """
    with conn() as connection:
        reset_timezone(connection)
Loading

0 comments on commit 9d9a9f0

Please sign in to comment.