Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write JSON state #997

Merged
merged 22 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
b4a7e16
Add types to DynamoDB restore flow. Add Persistable abstract class an…
KaspariK Sep 17, 2024
c081772
Remove old comments. Delete mcp state metadata.
KaspariK Sep 17, 2024
88de418
Return these
KaspariK Sep 17, 2024
909e9b2
Try itest validation without mcp_state
KaspariK Sep 17, 2024
1a5238b
Fix isinstance check for Action to_json
KaspariK Sep 17, 2024
c18cce4
Comment cleanup. Add cleanup TODOs for TRON-2293. Rename val in dynam…
KaspariK Sep 18, 2024
7fb271d
Remove shelve key test since we no longer save mcp_state or state_met…
KaspariK Sep 18, 2024
677d6e3
Remove deprecated itest. Rename some variables to make things a bit c…
KaspariK Sep 18, 2024
e262cbc
Remove unnecessary list call
KaspariK Sep 19, 2024
83d7c65
Replace ignore with proper types for sorted_groups
KaspariK Oct 18, 2024
002f935
Remove skip_validation since removing MCP StateMetadata
KaspariK Oct 18, 2024
f5ab20c
Add more types and a few TODO tickets
KaspariK Oct 18, 2024
00a50a9
Remove debug logs, add constant for transact_write_items, update tran…
KaspariK Oct 18, 2024
0eca03f
Check that rendered_command and docker_image are not None before crea…
KaspariK Oct 18, 2024
d727236
Remove comment from before my enlightenment
KaspariK Oct 18, 2024
a02aedc
Add remaining types, add tickets to valid TODOs, and delete irrelevan…
KaspariK Oct 23, 2024
f26f855
Fix time_zone type. Remove unreachable code after typing trigger_down…
KaspariK Oct 23, 2024
c3c8c77
Maintain most of the parsing logic as is. Add a few more tests. Add T…
KaspariK Oct 24, 2024
7a0cb98
Add error handling in various to_json() funcs. Break JSON write out f…
KaspariK Oct 24, 2024
e79ae25
Update to_json exception logging. On DynamoDB write failure only pass…
KaspariK Oct 25, 2024
e8dcc8e
Raise these so that we get the full picture on failure and can decide…
KaspariK Oct 25, 2024
b0186d1
Add serialization error counter
KaspariK Oct 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ endif
@echo "make deb_bionic - Generate bionic deb package"
@echo "make itest_bionic - Run tests and integration checks"
@echo "make _itest_bionic - Run only integration checks"
@echo "make deb_jammy - Generate bionic deb package"
@echo "make deb_jammy - Generate jammy deb package"
@echo "make itest_jammy - Run tests and integration checks"
@echo "make _itest_jammy - Run only integration checks"
@echo "make release - Prepare debian info for new release"
Expand Down
1 change: 0 additions & 1 deletion bin/tronfig
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ if __name__ == "__main__":
client = Client(args.server)

if args.print_config:
# TODO: use maybe_encode()
content = client.config(args.source)["config"]
if type(content) is not bytes:
content = content.encode("utf8")
Expand Down
4 changes: 2 additions & 2 deletions docs/source/jobs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ Optional Fields
after this duration.

**trigger_downstreams** (bool or dict)
Upon successfull completion of an action, will emit a trigger for every
Upon successful completion of an action, will emit a trigger for every
item in the dictionary. When set to ``true``, a default dict of
``{shortdate: "{shortdate}"}`` is assumed. Emitted triggers will be in form:
``<namespace>.<job>.<action>.<dict key>.<rendered value>``. See
Expand All @@ -220,7 +220,7 @@ Optional Fields
have been emitted by upstream actions. Unlike with ``requires`` attribute,
dependent actions don't have to belong to the same job. ``triggered_by``
template may contain any pattern allowed in ``command`` attribute.
See :ref:`shortdate` for an explantion of shortdate
See :ref:`shortdate` for an explanation of shortdate

Example:

Expand Down
11 changes: 0 additions & 11 deletions itest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,3 @@ fi

kill -SIGTERM $TRON_PID
wait $TRON_PID || true

/opt/venvs/tron/bin/python - <<EOF
import os
from tron.serialize.runstate.shelvestore import ShelveStateStore, ShelveKey
db = ShelveStateStore('$TRON_WORKDIR/tron_state')
key = ShelveKey('mcp_state', 'StateMetadata')
res = db.restore([key])
ts = res[key][u'create_time']
print("assert db time {} > start time {}".format(ts, int(os.environ['TRON_START_TIME'])))
assert ts > int(os.environ['TRON_START_TIME'])
EOF
1 change: 1 addition & 0 deletions requirements-dev-minimal.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pylint
pytest
pytest-asyncio
requirements-tools
types-pytz
types-PyYAML
types-requests<2.31.0.7 # newer types-requests requires urllib3>=2
types-simplejson
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ requirements-tools==1.2.1
toml==0.10.2
tomli==2.0.1
tomlkit==0.11.6
types-pytz==2024.2.0.20240913
types-PyYAML==6.0.12
types-requests==2.31.0.5
types-simplejson==3.19.0.20240310
Expand Down
81 changes: 52 additions & 29 deletions tests/serialize/runstate/dynamodb_state_store_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import pickle
import json
from unittest import mock

import boto3
Expand Down Expand Up @@ -100,24 +100,41 @@ def store():

@pytest.fixture
def small_object():
yield pickle.dumps({"this": "data"})
yield {
"job_name": "example_job",
"run_num": 1,
"run_time": None,
"node_name": "example_node",
"runs": [],
"cleanup_run": None,
"manual": False,
}


@pytest.fixture
def large_object():
yield pickle.dumps([i for i in range(1000000)])
yield {
"job_name": "example_job",
"run_num": 1,
"run_time": None,
"node_name": "example_node",
"runs": [],
"cleanup_run": None,
"manual": False,
"large_data": [i for i in range(1_000_000)],
}


@pytest.mark.usefixtures("store", "small_object", "large_object")
class TestDynamoDBStateStore:
def test_save(self, store, small_object, large_object):
key_value_pairs = [
(
store.build_key("DynamoDBTest", "two"),
store.build_key("job_state", "two"),
small_object,
),
(
store.build_key("DynamoDBTest2", "four"),
store.build_key("job_run_state", "four"),
small_object,
),
]
Expand All @@ -126,21 +143,27 @@ def test_save(self, store, small_object, large_object):

assert store.save_errors == 0
keys = [
store.build_key("DynamoDBTest", "two"),
store.build_key("DynamoDBTest2", "four"),
store.build_key("job_state", "two"),
store.build_key("job_run_state", "four"),
]
vals = store.restore(keys)
for key, value in key_value_pairs:
assert_equal(vals[key], value)

for key in keys:
item = store.table.get_item(Key={"key": key, "index": 0})
assert "Item" in item
assert "json_val" in item["Item"]
assert_equal(json.loads(item["Item"]["json_val"]), small_object)

def test_delete_if_val_is_none(self, store, small_object, large_object):
key_value_pairs = [
(
store.build_key("DynamoDBTest", "two"),
store.build_key("job_state", "two"),
small_object,
),
(
store.build_key("DynamoDBTest2", "four"),
store.build_key("job_run_state", "four"),
small_object,
),
]
Expand All @@ -149,7 +172,7 @@ def test_delete_if_val_is_none(self, store, small_object, large_object):

delete = [
(
store.build_key("DynamoDBTest", "two"),
store.build_key("job_state", "two"),
None,
),
]
Expand All @@ -159,55 +182,55 @@ def test_delete_if_val_is_none(self, store, small_object, large_object):
assert store.save_errors == 0
# Try to restore both, we should just get one back
keys = [
store.build_key("DynamoDBTest", "two"),
store.build_key("DynamoDBTest2", "four"),
store.build_key("job_state", "two"),
store.build_key("job_run_state", "four"),
]
vals = store.restore(keys)
assert vals == {keys[1]: small_object}

def test_save_more_than_4KB(self, store, small_object, large_object):
key_value_pairs = [
(
store.build_key("DynamoDBTest", "two"),
store.build_key("job_state", "two"),
large_object,
),
]
store.save(key_value_pairs)
store._consume_save_queue()

assert store.save_errors == 0
keys = [store.build_key("DynamoDBTest", "two")]
keys = [store.build_key("job_state", "two")]
vals = store.restore(keys)
for key, value in key_value_pairs:
assert_equal(vals[key], value)

def test_restore_more_than_4KB(self, store, small_object, large_object):
keys = [store.build_key("thing", i) for i in range(3)]
value = pickle.loads(large_object)
keys = [store.build_key("job_state", i) for i in range(3)]
value = large_object
pairs = zip(keys, (value for i in range(len(keys))))
store.save(pairs)
store._consume_save_queue()

assert store.save_errors == 0
vals = store.restore(keys)
for key in keys:
assert_equal(pickle.dumps(vals[key]), large_object)
assert_equal(vals[key], large_object)

def test_restore(self, store, small_object, large_object):
keys = [store.build_key("thing", i) for i in range(3)]
value = pickle.loads(small_object)
keys = [store.build_key("job_state", i) for i in range(3)]
value = small_object
pairs = zip(keys, (value for i in range(len(keys))))
store.save(pairs)
store._consume_save_queue()

assert store.save_errors == 0
vals = store.restore(keys)
for key in keys:
assert_equal(pickle.dumps(vals[key]), small_object)
assert_equal(vals[key], small_object)

def test_delete_item(self, store, small_object, large_object):
keys = [store.build_key("thing", i) for i in range(3)]
value = pickle.loads(large_object)
keys = [store.build_key("job_state", i) for i in range(3)]
value = large_object
pairs = list(zip(keys, (value for i in range(len(keys)))))
store.save(pairs)

Expand All @@ -222,8 +245,8 @@ def test_retry_saving(self, store, small_object, large_object):
"moto.dynamodb2.responses.DynamoHandler.transact_write_items",
side_effect=KeyError("foo"),
) as mock_failed_write:
keys = [store.build_key("thing", i) for i in range(1)]
value = pickle.loads(small_object)
keys = [store.build_key("job_state", i) for i in range(1)]
value = small_object
pairs = zip(keys, (value for i in range(len(keys))))
try:
store.save(pairs)
Expand All @@ -236,7 +259,7 @@ def test_retry_reading(self, store, small_object, large_object):
store.name: [
{
"index": {"N": "0"},
"key": {"S": "thing 0"},
"key": {"S": "job_state 0"},
},
],
},
Expand All @@ -246,15 +269,15 @@ def test_retry_reading(self, store, small_object, large_object):
"Keys": [
{
"index": {"N": "0"},
"key": {"S": "thing 0"},
"key": {"S": "job_state 0"},
}
],
},
},
"ResponseMetadata": {},
}
keys = [store.build_key("thing", i) for i in range(1)]
value = pickle.loads(small_object)
keys = [store.build_key("job_state", i) for i in range(1)]
value = small_object
pairs = zip(keys, (value for i in range(len(keys))))
store.save(pairs)
with mock.patch.object(
Expand All @@ -269,7 +292,7 @@ def test_retry_reading(self, store, small_object, large_object):

def test_restore_exception_propagation(self, store, small_object):
# This test is to ensure that restore propagates exceptions upwards: see DAR-2328
keys = [store.build_key("thing", i) for i in range(3)]
keys = [store.build_key("job_state", i) for i in range(3)]

mock_future = mock.MagicMock()
mock_future.result.side_effect = Exception("mocked exception")
Expand Down
44 changes: 1 addition & 43 deletions tests/serialize/runstate/statemanager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
from tron.serialize.runstate.statemanager import PersistenceStoreError
from tron.serialize.runstate.statemanager import PersistentStateManager
from tron.serialize.runstate.statemanager import StateChangeWatcher
from tron.serialize.runstate.statemanager import StateMetadata
from tron.serialize.runstate.statemanager import StateSaveBuffer
from tron.serialize.runstate.statemanager import VersionMismatchError


class TestPersistenceManagerFactory(TestCase):
Expand All @@ -42,24 +40,6 @@ def test_from_config_shelve(self):
shutil.rmtree(tmpdir)


class TestStateMetadata(TestCase):
def test_validate_metadata(self):
metadata = {"version": (0, 5, 2)}
StateMetadata.validate_metadata(metadata)

def test_validate_metadata_no_state_data(self):
metadata = None
StateMetadata.validate_metadata(metadata)

def test_validate_metadata_mismatch(self):
metadata = {"version": (200, 1, 1)}
assert_raises(
VersionMismatchError,
StateMetadata.validate_metadata,
metadata,
)


class TestStateSaveBuffer(TestCase):
@setup
def setup_buffer(self):
Expand Down Expand Up @@ -103,15 +83,7 @@ def test_keys_for_items(self):

def test_restore(self):
job_names = ["one", "two"]
with mock.patch.object(
self.manager,
"_restore_metadata",
autospec=True,
) as mock_restore_metadata, mock.patch.object(
self.manager,
"_restore_dicts",
autospec=True,
) as mock_restore_dicts, mock.patch.object(
with mock.patch.object(self.manager, "_restore_dicts", autospec=True,) as mock_restore_dicts, mock.patch.object(
self.manager,
"_restore_runs_for_job",
autospect=True,
Expand All @@ -125,7 +97,6 @@ def test_restore(self):
]

restored_state = self.manager.restore(job_names)
mock_restore_metadata.assert_called_once_with()
assert mock_restore_dicts.call_args_list == [
mock.call(runstate.JOB_STATE, job_names),
]
Expand Down Expand Up @@ -280,19 +251,6 @@ def test_save_job(self):
mock_job.state_data,
)

@mock.patch(
"tron.serialize.runstate.statemanager.StateMetadata",
autospec=None,
)
def test_save_metadata(self, mock_state_metadata):
self.watcher.save_metadata()
meta_data = mock_state_metadata.return_value
self.watcher.state_manager.save.assert_called_with(
runstate.MCP_STATE,
meta_data.name,
meta_data.state_data,
)

def test_shutdown(self):
self.watcher.shutdown()
assert not self.watcher.state_manager.enabled
Expand Down
Loading
Loading