From 5e0afd4d5729d148c8e5dbc726b25b2ae3cacc8b Mon Sep 17 00:00:00 2001
From: Salvador Olmos Miralles
Date: Wed, 5 Jul 2023 18:46:38 +0200
Subject: [PATCH] added pytest tests ported from the old unittests

---
 tests/conftest.py         | 206 +++++++++++++++++++++++++
 tests/test_authoring.py   | 317 ++++++++++++++++++++++++++++++++++++++
 tests/test_placeholder.py |   2 -
 tests/test_session.py     |  75 +++++++++
 tox.ini                   |   1 +
 5 files changed, 599 insertions(+), 2 deletions(-)
 create mode 100644 tests/conftest.py
 create mode 100644 tests/test_authoring.py
 delete mode 100644 tests/test_placeholder.py
 create mode 100644 tests/test_session.py

diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..4a99ba9
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,206 @@
+import dataclasses
+import uuid
+from typing import List
+
+import ftrack_api
+import pytest
+
+from trackteroid import SESSION
+
+
+@dataclasses.dataclass
+class TestScenario:
+    project_id: str
+    sequence_ids: List[str] = dataclasses.field(default_factory=list)
+    shot_ids: List[str] = dataclasses.field(default_factory=list)
+    assetbuild_ids: List[str] = dataclasses.field(default_factory=list)
+    task_ids: List[str] = dataclasses.field(default_factory=list)
+    asset_ids: List[str] = dataclasses.field(default_factory=list)
+    version_ids: List[str] = dataclasses.field(default_factory=list)
+    component_ids: List[str] = dataclasses.field(default_factory=list)
+
+    def query_project(self, session, projections):
+        return session.query(
+            f"select {', '.join(projections)} from Project "
+            f"where id is {self.project_id}"
+        ).one()
+
+    def grab(self, session, entity_type, required_fields):
+        type_ids = {
+            "Sequence": self.sequence_ids,
+            "Shot": self.shot_ids,
+            "AssetBuild": self.assetbuild_ids,
+            "Task": self.task_ids,
+            "Asset": self.asset_ids,
+            "AssetVersion": self.version_ids,
+            "Component": self.component_ids,
+        }
+
+        if entity_type not in type_ids:
+            raise KeyError(entity_type)
+        if not type_ids[entity_type]:
+            raise ValueError(f"No entities of type {entity_type} exist in the scenario")
+
+        query = f"select {', '.join(required_fields)} from {entity_type} where "
+        for field in required_fields:
+            query += f"{field} like '%' "
+        query += f"and id in ({', '.join(type_ids[entity_type])})"
+        return session.query(query).all()
+
+
+@pytest.fixture(autouse=True)
+def reconnect_session():
+    return SESSION.reconnect()
+
+
+@pytest.fixture()
+def ftrack_session():
+    with ftrack_api.Session() as session:
+        yield session
+
+
+@pytest.fixture(scope="session")
+def ftrack_project_id():
+    session = ftrack_api.Session()
+    name = "unittests_{0}".format(uuid.uuid1().hex)
+
+    # Naively pick the first project schema. For this to work the schema
+    # must contain `Shot` and `Sequence` object types.
+    required_types = ["Sequence", "Shot"]
+    project_schema = None
+    for schema in session.query("ProjectSchema").all():
+        types = [x["name"] for x in schema["object_types"]]
+        if all([x in types for x in required_types]):
+            project_schema = schema
+            break
+
+    if not project_schema:
+        raise ValueError(
+            f"A project schema with the following types could not be found on {session.server_url}:"
+            f" {', '.join(required_types)}"
+        )
+
+    # Create the project with the chosen schema.
+    project = session.create(
+        "Project",
+        {"name": name, "full_name": name + "_full", "project_schema": project_schema},
+    )
+    session.commit()
+
+    yield project["id"]
+
+    session.delete(project)
+    session.commit()
+
+
+@pytest.fixture
+def scenario_project(ftrack_project_id) -> TestScenario:
+    return TestScenario(project_id=ftrack_project_id)
+
+
+@pytest.fixture
+def scenario_sequence(ftrack_session, scenario_project) -> TestScenario:
+    project = scenario_project.query_project(ftrack_session, ["project_schema"])
+
+    # Create sequences.
+    for sequence_number in range(1, 5):
+        sequence = ftrack_session.create(
+            "Sequence",
+            {"name": "seq_{0}".format(uuid.uuid4()), "parent": project},
+        )
+
+        scenario_project.sequence_ids.append(sequence["id"])
+    ftrack_session.commit()
+    return scenario_project
+
+
+@pytest.fixture
+def scenario_assetbuild(ftrack_session, scenario_project) -> TestScenario:
+    project = scenario_project.query_project(ftrack_session, ["project_schema"])
+
+    for _ in range(4):
+        assetbuild = ftrack_session.create(
+            "AssetBuild",
+            {"name": "ab_{0}".format(uuid.uuid4()), "parent": project},
+        )
+
+        scenario_project.assetbuild_ids.append(assetbuild["id"])
+    ftrack_session.commit()
+    return scenario_project
+
+
+@pytest.fixture
+def scenario_shot(ftrack_session, scenario_sequence) -> TestScenario:
+    project = scenario_sequence.query_project(ftrack_session, ["project_schema"])
+    project_schema = project["project_schema"]
+    default_shot_status = project_schema.get_statuses("Shot")[0]
+
+    for sequence_id in scenario_sequence.sequence_ids:
+        sequence = ftrack_session.get("Sequence", sequence_id)
+        for shot_number in range(1, 8):
+            shot = ftrack_session.create(
+                "Shot",
+                {
+                    "name": "shot_{}".format(uuid.uuid4()),
+                    "parent": sequence,
+                    "status": default_shot_status,
+                },
+            )
+            scenario_sequence.shot_ids.append(shot["id"])
+    ftrack_session.commit()
+    return scenario_sequence
+
+
+def _create_tasks(session, scenario, parent_ids):
+    for id_ in parent_ids:
+        parent = session.get("TypedContext", id_)
+        for task_number in range(1, 5):
+            task = session.create(
+                "Task",
+                {
+                    "name": "task_{0}".format(uuid.uuid4()),
+                    "parent": parent
+                },
+            )
+            scenario.task_ids.append(task["id"])
+
+    session.commit()
+    return scenario
+
+
+@pytest.fixture
+def scenario_shot_task(ftrack_session, scenario_shot) -> TestScenario:
+    return _create_tasks(ftrack_session, scenario_shot, scenario_shot.shot_ids)
+
+
+@pytest.fixture
+def scenario_assetbuild_task(ftrack_session, scenario_assetbuild) -> TestScenario:
+    return _create_tasks(ftrack_session, scenario_assetbuild, scenario_assetbuild.assetbuild_ids)
+
+
+def _create_assets(session, scenario, parent_ids):
+    for id_ in parent_ids:
+        parent = session.get("TypedContext", id_)
+        for asset_number in range(1, 5):
+            asset = session.create(
+                "Asset",
+                {
+                    "name": "asset_{0}".format(uuid.uuid4()),
+                    "parent": parent
+                },
+            )
+            scenario.asset_ids.append(asset["id"])
+
+    session.commit()
+    return scenario
+
+
+@pytest.fixture
+def scenario_shot_asset(ftrack_session, scenario_shot) -> TestScenario:
+    return _create_assets(ftrack_session, scenario_shot, scenario_shot.shot_ids)
+
+
+@pytest.fixture
+def scenario_assetbuild_asset(ftrack_session, scenario_assetbuild) -> TestScenario:
+    return _create_assets(ftrack_session, scenario_assetbuild, scenario_assetbuild.assetbuild_ids)
diff --git a/tests/test_authoring.py b/tests/test_authoring.py
new file mode 100644
index 0000000..c34cd5f
--- /dev/null
+++ b/tests/test_authoring.py
@@ -0,0 +1,317 @@
+import random
+import uuid
+from collections import OrderedDict
+
+import pytest
+
+from trackteroid import (
+    Query,
+    Sequence,
+    Project,
+    Shot,
+    Task,
+    PROJECT_SCHEMAS,
+    TASK_TYPES,
+    NoteCategory,
+    Note,
+)
+from trackteroid.entities.base import EntityCollection, Entity
+
+
+def test_link_inputs(scenario_sequence):
+    sequences = (
+        Query(Sequence).by_id(*scenario_sequence.sequence_ids).get_all(order_by="name")
+    )
+    sequence1 = sequences[0]
+    sequence2 = sequences[1]
+    sequence3 = sequences[2:4]
+
+    after_link1 = sequence1.link_inputs(sequence2)
+    after_link2 = sequence2.link_inputs(sequence3)
+
+    assert isinstance(after_link1, EntityCollection)
+    assert "Sequence" == after_link1._entity.__class__.__name__
+    assert isinstance(after_link2, EntityCollection)
+    assert "Sequence" == after_link2._entity.__class__.__name__
+
+    sequence1.commit()
+
+    assert (
+        Query(Sequence)
+        .by_id(*sequence1.id)
+        .get_one(projections=["incoming_links.from_id"])
+        .incoming_links.from_id
+        == sequence2.id
+    )
+
+    assert (
+        Query(Sequence)
+        .by_id(*sequence2.id)
+        .get_one(projections=["incoming_links.from_id"])
+        .incoming_links.from_id
+        == sequence3.id
+    )
+
+
+def test_create(scenario_project, ftrack_session):
+    project = Query(Project).by_id(scenario_project.project_id).get_one()
+
+    test_entities = []
+
+    # create a sequence
+    sequence = project.children[Sequence].create(name="Sequence")
+    test_entities.append(("Sequence", sequence.id[0]))
+
+    # create some shots
+    shot1 = sequence.children[Shot].create(name="Shot1")
+    shot2 = sequence.children[Shot].create(name="Shot2")
+    test_entities.append(("Shot", shot1.id[0]))
+    test_entities.append(("Shot", shot2.id[0]))
+
+    # create some tasks
+    shot1_task1, shot1_task2 = shot1.children[Task].create_batch(
+        {"name": "Modeling", "type": "Modeling"},
+        {"name": "Rigging", "type": "Rigging"},
+    )
+    shot2_tasks = shot2.children[Task].create_batch(
+        {"name": "Modeling", "type": "Modeling"},
+    )
+    shot2_tasks2 = shot2_tasks.create_batch(
+        {"name": "Rigging", "type": "Rigging"},
+        {"name": "Animation", "type": "Animation"},
+    )
+    tasks = [
+        shot1_task1,
+        shot1_task2,
+        shot2_tasks,
+        shot2_tasks2,
+    ]
+    for task in tasks:
+        for id in task.id:
+            test_entities.append(("Task", id))
+
+    asset_types = ftrack_session.query("AssetType").all()
+
+    # create some assets
+    asset1 = shot1.assets.create(name="Asset1", type=asset_types[0]["name"])
+    asset2 = shot1.assets.create(name="Asset2", type=asset_types[1]["name"])
+    test_entities.append(("Asset", asset1.id[0]))
+    test_entities.append(("Asset", asset2.id[0]))
+
+    # create some versions
+    assetversion1 = asset1.versions.create(task=shot1_task1)
+    assetversion2 = asset2.versions.create(task=shot1_task2)
+    test_entities.append(("AssetVersion", assetversion1.id[0]))
+    test_entities.append(("AssetVersion", assetversion2.id[0]))
+
+    # link some versions
+    assetversion1.link_outputs(assetversion2)
+
+    # push to server
+    project.commit()
+
+    retrieved_entities = []
+    for entity in test_entities:
+        retrieved_entities.append(ftrack_session.get(*entity))
+    assert all(retrieved_entities), "Some entities were not created"
+
+
+def test_create_project(ftrack_session):
+    some_project = Query(Project).get_first()
+
+    with pytest.raises(AssertionError):
+        some_project.create()
+
+    with pytest.raises(AssertionError):
+        some_project.create(name="Foobar")
+
+    with pytest.raises(AssertionError):
+        some_project.create(name="Foobar", project_schema="DuDoedl")
+
+    created_project = some_project.create(
+        name=str(uuid.uuid4()),
+        project_schema=random.choice(list(PROJECT_SCHEMAS.types.keys())),
+    )
+
+    try:
+        assert isinstance(created_project, EntityCollection)
+        assert "Project" == created_project._entity.__class__.__name__
+
+        created_project.commit()
+
+        assert Query(Project).by_id(*created_project.id).get_one() == created_project
+    finally:
+        ftrack_session.delete(ftrack_session.get("Project", created_project.id[0]))
+        ftrack_session.commit()
+
+
+def test_create_sequence(scenario_project):
+    test_project = Query(Project).by_id(scenario_project.project_id).get_one()
+
+    with pytest.raises(AssertionError):
+        test_project.children[Sequence].create()
+
+    created_sequence = test_project.children[Sequence].create(name=str(uuid.uuid4()))
+
+    assert isinstance(created_sequence, EntityCollection)
+    assert "Sequence" == created_sequence._entity.__class__.__name__
+
+    test_project.commit()
+
+    queried_sequence = Query(Sequence).by_id(*created_sequence.id).get_one()
+    assert isinstance(queried_sequence, EntityCollection)
+    assert test_project.id == queried_sequence.parent_id
+
+
+def test_create_shot(scenario_sequence):
+    test_sequence = (
+        Query(Sequence).by_id(Project, scenario_sequence.project_id).get_all()
+    )
+
+    with pytest.raises(AssertionError):
+        test_sequence.children[Shot].create()
+
+    with pytest.raises(AssertionError) as excinfo:
+        test_sequence.children[Shot].create(name=str(uuid.uuid4()))
+
+    assert "Ambiguous context" in str(excinfo.value)
+
+    test_sequence = (
+        Query(Sequence).by_id(Project, scenario_sequence.project_id).get_first()
+    )
+    created_shot = test_sequence.children[Shot].create(name=str(uuid.uuid4()))
+
+    assert isinstance(created_shot, EntityCollection)
+    assert "Shot" == created_shot._entity.__class__.__name__
+
+    test_sequence.commit()
+
+    queried_shot = Query(Shot).by_id(*created_shot.id).get_one()
+    assert queried_shot == created_shot
+    assert test_sequence.id == queried_shot.parent_id
+
+
+def test_create_task(scenario_shot):
+    test_shot = (
+        Query(Shot)
+        .by_id(Project, scenario_shot.project_id)
+        .get_all(projections=["project.project_schema._task_type_schema.types.name"])
+    )
+    task_types = [
+        TASK_TYPES._to_camel_case(_)
+        for _ in test_shot.project.project_schema._task_type_schema.types.name
+    ]
+
+    with pytest.raises(AssertionError) as context:
+        test_shot.children[Task].create()
+
+    with pytest.raises(AssertionError) as context:
+        test_shot.children[Task].create(name=str(uuid.uuid4()))
+
+    with pytest.raises(AssertionError) as context:
+        test_shot.children[Task].create(
+            name=str(uuid.uuid4()), type=random.choice(task_types)
+        )
+    assert "Ambiguous context" in str(context.value)
+
+    test_shot = Query(Shot).by_id(Project, scenario_shot.project_id).get_first()
+
+    created_task = test_shot.children[Task].create(
+        name=str(uuid.uuid4()), type=random.choice(task_types)
+    )
+
+    assert isinstance(created_task, EntityCollection)
+    assert "Task" == created_task._entity.__class__.__name__
+
+    test_shot.commit()
+
+    queried_task = Query(Task).by_id(*created_task.id).get_one()
+
+    assert queried_task == created_task
+    assert test_shot.id == queried_task.parent_id
+
+
+def _construct_collection_from_ftrack_entities(ftrack_entities, session):
+    import trackteroid.entities
+    assert ftrack_entities
+
+    if not isinstance(ftrack_entities, list):
+        ftrack_entities = [ftrack_entities]
+    entities = []
+
+    for ftrack_entity in ftrack_entities:
+        entities.append(
+            (
+                ftrack_entity["id"],
+                Entity(
+                    _cls=getattr(trackteroid.entities, ftrack_entity.entity_type),
+                    ftrack_entity=ftrack_entity,
+                ),
+            )
+        )
+
+    collection = EntityCollection(
+        _cls=entities[0][1].__class__, entities=OrderedDict(entities), session=session
+    )
+    collection.query = Query(entities[0][1].__class__).by_id(*[_["id"] for _ in ftrack_entities])
+    return collection
+
+
+def test_create_on_ambiguous_context(scenario_shot_asset, scenario_assetbuild_asset, ftrack_session):
+    shots = scenario_shot_asset.grab(ftrack_session, "Shot", ["assets.id"])
+    asset_builds = scenario_assetbuild_asset.grab(ftrack_session, "AssetBuild", ["assets.id"])
+
+    mix = [shots[0]["assets"][0], asset_builds[0]["assets"][0]]
+
+    collection4 = _construct_collection_from_ftrack_entities(mix, ftrack_session)
+
+    collection5 = _construct_collection_from_ftrack_entities(
+        shots[1]["assets"][0], ftrack_session
+    )
+
+    asset_type = ftrack_session.query("AssetType").first()
+    with pytest.raises(AssertionError) as context:
+        collection4.intersection(collection5).create(
+            name=str(uuid.uuid4()), type=asset_type["name"]
+        )
+    assert "Ambiguous context" in str(context.value)
+
+
+def test_create_note(scenario_shot):
+    categories = Query(NoteCategory).get_all()
+    # Run it three times: once on an empty collection, once on a collection with
+    # a single entity and once with two (the last run checks that the parent
+    # ambiguity error is not raised).
+    for i in range(3):
+        chosen_category = random.choice(categories)
+        # Query every time to ensure the cache is not fooling the tests
+        test_shot = Query(Shot).by_id(*scenario_shot.shot_ids).get_first(projections=["notes.content"])
+
+        assert (
+            len(test_shot.notes) == i
+        ), "Expected shot to have {} notes, {} found".format(i, len(test_shot.notes))
+
+        with pytest.raises(AssertionError):
+            test_shot.notes.create(contents="A note", category="A String")
+
+        note = test_shot.notes.create(content="A note", category=chosen_category)
+        note.commit()
+
+        # For some unknown reason, note.commit() converts parent_type to "Resource",
+        # so we skip testing parent_type, as it would be wrong anyway.
+        assert (
+            note.parent_id[0] == test_shot.id[0]
+        ), "Expected parent id {!r}, got {!r}".format(
+            test_shot.id[0], note.parent_id[0]
+        )
+
+        # To ensure it's on the entity and not only in the session cache, a new
+        # query should do the trick.
+        notes = Query(Note).inject("parent_id is {}".format(test_shot.id[0])).get_all()
+        assert len(notes) == (i + 1), "Expected {} notes, {} found".format(
+            i + 1, len(notes)
+        )
+        assert (
+            notes.filter(lambda x: x.id[0] == note.id[0]).category.name[0]
+            == chosen_category.name[0]
+        )
diff --git a/tests/test_placeholder.py b/tests/test_placeholder.py
deleted file mode 100644
index 3ada1ee..0000000
--- a/tests/test_placeholder.py
+++ /dev/null
@@ -1,2 +0,0 @@
-def test_placeholder():
-    assert True
diff --git a/tests/test_session.py b/tests/test_session.py
new file mode 100644
index 0000000..a38436e
--- /dev/null
+++ b/tests/test_session.py
@@ -0,0 +1,75 @@
+import dbm
+import os
+import shutil
+import tempfile
+
+import pytest
+
+from trackteroid import SESSION, Query, AssetVersion
+
+
+@pytest.fixture
+def dumped_operations_file():
+    temp = tempfile.gettempdir()
+    yield os.path.join(temp, "operations.dbm")
+    # TODO: This fails as it's in use when it reaches teardown. Why?
+    # shutil.rmtree(temp)
+
+
+@pytest.fixture
+def initial_operations_file():
+    return os.path.join(tempfile.gettempdir(), "operations.dbm")
+
+
+def test_deferred_operations(dumped_operations_file):
+    # case 1. clear an existing cache temporarily
+    query_result = Query(AssetVersion).get_first(projections=["task", "version"])
+    operations_count = len(SESSION.recorded_operations)
+
+    # do a query -> cache a result
+    with SESSION.deferred_operations(dumped_operations_file):
+        assert 0 == len(SESSION.recorded_operations)
+
+        avs = Query(AssetVersion).by_id(*query_result.id).get_one(projections=["task"])
+        avs.version = avs.version[0] + 10
+        avs2 = avs.create(task=avs.task)  # -> create entity, update task, update asset
+        avs2.version = avs.version[0] + 10
+
+        assert 5 == len(SESSION.recorded_operations)
+
+    assert operations_count == len(SESSION.recorded_operations)
+    # check the created file database
+    database = dbm.open(dumped_operations_file, "r")
+
+    def make_keys(entity_collection):
+        return entity_collection.map(
+            lambda x: "('{}', ['{}'])".format(x.entity_type.__name__, x.id[0])
+        )
+
+    expected_keys = make_keys(avs.union(avs2)) + make_keys(avs.task) + make_keys(avs.asset) + ["__operations__"]
+    assert sorted(expected_keys) == sorted(key.decode() for key in database.keys())
+
+
+def test_reconnect_and_commit(initial_operations_file):
+    SESSION.reconnect_and_commit(initial_operations_file)
+
+    assetversion = Query(AssetVersion).by_id("fbb682b6-e9e6-4111-8edb-38d0797c9ffe").get_one(
+        projections=["components.name"]
+    )
+    assert 99 == assetversion.version[0]
+    assert "pymelle" == assetversion.components.name[0]
+
+
+def test_get_cached_collections(initial_operations_file):
+    SESSION.reconnect_and_commit(initial_operations_file)
+
+    avs = SESSION.get_cached_collections()[AssetVersion]
+    avs.fetch_attributes("asset.versions")
+
+    v99, rest = avs.asset.versions.partition(lambda x: x.version[0] == 99)
+    v99.uses_versions = rest
+
+    SESSION.commit()
diff --git a/tox.ini b/tox.ini
index 590288c..613de32 100644
--- a/tox.ini
+++ b/tox.ini
@@ -11,3 +11,4 @@ deps =
     pytest-sugar
 commands =
     pytest {posargs:tests}
+passenv = FTRACK_API_KEY,FTRACK_API_USER,FTRACK_SERVER