Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/i10 pytests #65

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/trackteroid/entities/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1603,7 +1603,8 @@ def _prepare_note_entity(self, ftrack_note_entity, kwargs, parent_source):
for resource_id in resource_ids:
recipient = recipients.create(
note_id=ftrack_note_entity["id"],
resource_id=resource_id
resource_id=resource_id,
no_parent=True
)
ftrack_note_entity["recipients"].append(list(recipient.values())[0].ftrack_entity)

Expand Down
249 changes: 131 additions & 118 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,119 @@
import dataclasses
from collections import OrderedDict
import logging
import random
import uuid
from typing import List

import ftrack_api
from ftrack_api.collection import Collection
import pytest

from trackteroid import SESSION
from trackteroid import SESSION, Query
from trackteroid.entities import *
from trackteroid.entities.base import Entity, EntityCollection


@dataclasses.dataclass
class TestScenario:
project_id: str
sequence_ids: List[str] = dataclasses.field(default_factory=list)
shot_ids: List[str] = dataclasses.field(default_factory=list)
assetbuild_ids: List[str] = dataclasses.field(default_factory=list)
task_ids: List[str] = dataclasses.field(default_factory=list)
asset_ids: List[str] = dataclasses.field(default_factory=list)
version_ids: List[str] = dataclasses.field(default_factory=list)
component_ids: List[str] = dataclasses.field(default_factory=list)
__test__ = False

def __init__(self, project_ids=[]):
self.entity_ids = {
"Project": project_ids,
"Folder": [],
"Sequence": [],
"Shot": [],
"AssetBuild": [],
"Task": [],
"Asset": [],
"AssetVersion": [],
"Component": []
}

def query_project(self, session, projections):
return session.query(
f"select {', '.join(projections)} from Project "
f"where id is {self.project_id}"
f"where id is {self.entity_ids['Project'][0]}"
).one()

def grab(self, session, entity_type, required_fields):

type_ids = {
"Sequence": self.sequence_ids,
"Shot": self.shot_ids,
"AssetBuild": self.assetbuild_ids,
"Task": self.task_ids,
"Asset": self.asset_ids,
"AssetVersion": self.version_ids,
"Component": self.component_ids,
}

if entity_type not in type_ids:
def create_entity(self, ftrack_session, entity_type, params):
entity = ftrack_session.create(entity_type, params)
self.entity_ids[entity_type].append(entity["id"])
return entity["id"]

def create_assets_tasks_versions(self, ftrack_session):

for parent_id in self.entity_ids["AssetBuild"] + self.entity_ids["Shot"]:
parent = ftrack_session.get("TypedContext", parent_id)
for _ in range(3):
params = {"name": "asset_{0}".format(uuid.uuid4()), "parent": parent}
asset_id = self.create_entity(ftrack_session, "Asset", params)
params = {"name": "task_{0}".format(uuid.uuid4()), "parent": parent}
task_id = self.create_entity(ftrack_session, "Task", params=params)
for version in range(1, 4):
asset = ftrack_session.get("Asset", asset_id)
task = ftrack_session.get("Task", task_id)
params = {"version": version, "asset": asset, "task": task}
self.create_entity(ftrack_session, "AssetVersion", params=params)

def grab(self, session, entity_type, required_fields=[], limit=0):
if entity_type not in self.entity_ids:
raise KeyError(entity_type)
if not type_ids[entity_type]:
if not self.entity_ids[entity_type]:
raise ValueError(f"No entities of type {entity_type} exists in the scenario")

query = f"select {', '.join(required_fields)} from {entity_type} where "
for field in required_fields:
query += f"{field} like '%' "
query += f"and id in ({', '.join(type_ids[entity_type])})"
return session.query(query).all()
query = ""
if required_fields:
query += f"select {', '.join(required_fields)} from "
query += f"{entity_type} where "
query += f"id in ({', '.join(self.entity_ids[entity_type])})"
if limit > 0:
query += f" limit {limit}"
result = session.query(query).all()
random.shuffle(result)
return result

def construct_collection_from_ftrack_entities(self, ftrack_entities):
entities = []
for ftrack_entity in ftrack_entities:
entities.append(
(ftrack_entity["id"], Entity(_cls=globals()[ftrack_entity.entity_type], ftrack_entity=ftrack_entity)))
collection = EntityCollection(
_cls=entities[0][1].__class__,
entities=OrderedDict(entities),
session=SESSION
)
collection.query = Query(entities[0][1].__class__).by_id(*[_["id"] for _ in ftrack_entities])
return collection

def construct_simple_collection(self, session, entity_type, limit=0, required_fields=[]):
entities = self.grab(session, entity_type, limit=limit, required_fields=required_fields)
return self.construct_collection_from_ftrack_entities(entities)

def _assert_attributes(self, ftrack_entity, entity_collection):
not_implemented_msg = "Skipped attribute testing for '{key}', because required Entity is not implemented"

for key, value in ftrack_entity.items():
if isinstance(value, Collection):
try:
_collection_ids = []
if len(value) > 0:
for _ in value:
_collection_ids.append(_["id"])
logging.debug("Test for secondary")
assert _collection_ids == getattr(entity_collection, key).id
except NotImplementedError:
logging.warning(not_implemented_msg.format(key=key))
else:
# testing primary attributes
logging.debug("Testing against attribute '{}'".format(key))
try:
_attribute_value = getattr(entity_collection, key)
if isinstance(_attribute_value, EntityCollection):
_ftrack_entities = [_.ftrack_entity for _ in _attribute_value._entities.values()]
assert [value] == _ftrack_entities
else:
assert [value] == getattr(entity_collection, key)
except NotImplementedError:
logging.warning(not_implemented_msg.format(key=key))


@pytest.fixture(autouse=True)
Expand All @@ -60,7 +127,7 @@ def ftrack_session():
yield session


@pytest.fixture(scope="session")
@pytest.fixture
def ftrack_project_id():
session = ftrack_api.Session()
name = "unittests_{0}".format(uuid.uuid1().hex)
Expand Down Expand Up @@ -96,111 +163,57 @@ def ftrack_project_id():

@pytest.fixture
def scenario_project(ftrack_project_id) -> TestScenario:
return TestScenario(project_id=ftrack_project_id)
return TestScenario(project_ids=[ftrack_project_id])


@pytest.fixture
def scenario_sequence(ftrack_session, scenario_project) -> TestScenario:
def scenario_sequence_folder(ftrack_session, scenario_project) -> TestScenario:
project = scenario_project.query_project(ftrack_session, ["project_schema"])

# Create sequences, shots and tasks.
for sequence_number in range(1, 5):
sequence = ftrack_session.create(
"Sequence",
{"name": "seq_{0}".format(uuid.uuid4()), "parent": project},
)

scenario_project.sequence_ids.append(sequence["id"])
for _ in range(2):
params = {"name": "seq_{0}".format(uuid.uuid4()), "parent": project}
scenario_project.create_entity(ftrack_session=ftrack_session, entity_type="Sequence", params=params)
params = {"name": "asset_builds".format(uuid.uuid4()), "parent": project}
scenario_project.create_entity(ftrack_session=ftrack_session, entity_type="Folder", params=params)
ftrack_session.commit()
return scenario_project


@pytest.fixture
def scenario_assetbuild(ftrack_session, scenario_project) -> TestScenario:
project = scenario_project.query_project(ftrack_session, ["project_schema"])

for _ in range(4):
sequence = ftrack_session.create(
"AssetBuild",
{"name": "ab_{0}".format(uuid.uuid4()), "parent": project},
)

scenario_project.assetbuild_ids.append(sequence["id"])
ftrack_session.commit()
return scenario_project


@pytest.fixture
def scenario_shot(ftrack_session, scenario_sequence) -> TestScenario:
project = scenario_sequence.query_project(ftrack_session, ["project_schema"])
def scenario_assetbuild_shot(ftrack_session, scenario_sequence_folder) -> TestScenario:
assetbuild_folder_id = scenario_sequence_folder.entity_ids["Folder"][0]
assetbuild_folder = ftrack_session.get("Folder", assetbuild_folder_id)
for _ in range(3):
params = {"name": "asset_build_{0}".format(uuid.uuid4()), "parent": assetbuild_folder}
scenario_sequence_folder.create_entity(ftrack_session, "AssetBuild", params=params)

project = scenario_sequence_folder.query_project(ftrack_session, ["project_schema"])
project_schema = project["project_schema"]
default_shot_status = project_schema.get_statuses("Shot")[0]

for sequence_id in scenario_sequence.sequence_ids:
for sequence_id in scenario_sequence_folder.entity_ids["Sequence"]:
sequence = ftrack_session.get("Sequence", sequence_id)
for shot_number in range(1, 8):
shot = ftrack_session.create(
"Shot",
{
"name": "shot_{}".format(uuid.uuid4()),
"parent": sequence,
"status": default_shot_status,
},
)
scenario_sequence.shot_ids.append(shot["id"])
for shot in range(3):
params = {"name": "shot_{}".format(uuid.uuid4()), "parent": sequence, "status": default_shot_status}
scenario_sequence_folder.create_entity(ftrack_session, "Shot", params=params)
ftrack_session.commit()
return scenario_sequence


def _create_tasks(session, scenario, parent_ids):
for id_ in parent_ids:
parent = session.get("TypedContext", id_)
for task_number in range(1, 5):
task = session.create(
"Task",
{
"name": "task_{0}".format(uuid.uuid4()),
"parent": parent
},
)
scenario.task_ids.append(task["id"])

session.commit()
return scenario
return scenario_sequence_folder


@pytest.fixture
def scenario_shot_task(ftrack_session, scenario_shot) -> TestScenario:
return _create_tasks(ftrack_session, scenario_shot, scenario_shot.shot_ids)


@pytest.fixture
def scenario_assetbuild_task(ftrack_session, scenario_shot) -> TestScenario:
return _create_tasks(ftrack_session, scenario_shot, scenario_shot.assetbuild_ids)


def _create_assets(session, scenario, parent_ids):
for id_ in parent_ids:
parent = session.get("TypedContext", id_)
for task_number in range(1, 5):
task = session.create(
"Asset",
{
"name": "asset_{0}".format(uuid.uuid4()),
"parent": parent
},
)
scenario.asset_ids.append(task["id"])
def scenario_asset_task_version(ftrack_session, scenario_assetbuild_shot) -> TestScenario:
scenario_assetbuild_shot.create_assets_tasks_versions(ftrack_session)
ftrack_session.commit()

session.commit()
return scenario
return scenario_assetbuild_shot


@pytest.fixture
def scenario_shot_asset(ftrack_session, scenario_shot) -> TestScenario:
return _create_assets(ftrack_session, scenario_shot, scenario_shot.shot_ids)

def scenario_components(ftrack_session, scenario_asset_task_version) -> TestScenario:
for assetversion_id in scenario_asset_task_version.entity_ids["AssetVersion"]:
assetversion = ftrack_session.get("AssetVersion", assetversion_id)
params = {"name": "component_{}".format(uuid.uuid4()), "parent": assetversion}
scenario_asset_task_version.create_entity(ftrack_session, "Component", params=params)
ftrack_session.commit()

@pytest.fixture
def scenario_assetbuild_asset(ftrack_session, scenario_assetbuild) -> TestScenario:
return _create_assets(ftrack_session, scenario_assetbuild, scenario_assetbuild.assetbuild_ids)
return scenario_asset_task_version
Loading