Source code for tank.context

# Copyright (c) 2013 Shotgun Software Inc.
#
# CONFIDENTIAL AND PROPRIETARY
#
# This work is provided "AS IS" and subject to the Shotgun Pipeline Toolkit
# Source Code License included in this distribution package. See LICENSE.
# By accessing, using, copying or modifying this work you indicate your
# agreement to the Shotgun Pipeline Toolkit Source Code License. All rights
# not expressly granted therein are reserved by Shotgun Software Inc.

"""
Management of the current context, e.g. the current shotgun entity/step/task.

"""

import os
import copy
import json

from tank_vendor import yaml
from . import authentication

from .util import login
from .util import shotgun_entity
from .util import shotgun
from .util import get_sg_entity_name_field
from .util import pickle, json as sgjson
from . import constants
from .errors import TankError, TankContextDeserializationError
from .path_cache import PathCache
from .template import TemplatePath


[docs]class Context(object): """ A context instance is used to collect a set of key fields describing the current Context. We sometimes refer to the context as the current work area. Typically this would be the current shot or asset that someone is working on. The context captures the current point in both shotgun and the file system and context objects are launch a toolkit engine via the :meth:`sgtk.platform.start_engine` method. The context points the engine to a particular point in shotgun and on disk - it could be something as detailed as a task inside a Shot, and something as vague as an empty context. The context is split up into several levels of granularity, reflecting the fundamental hierarchy of Shotgun itself. - The project level defines which shotgun project the context reflects. - The entity level defines which entity the context reflects. For example, this may be a Shot or an Asset. Note that in the case of a Shot, the context does not contain any direct information of which sequence the shot is linked to, however the context can still resolve such relationships implicitly if needed - typically via the :meth:`Context.as_context_fields` method. - The step level defines the current pipeline step. This is often a reflection of a department or a general step in a workflow or pipeline (e.g. Modeling, Rigging). - The task level defines a current Shotgun task. - The user level defines the current user. The data forms a hierarchy, so implicitly, the task belongs to the entity which in turn belongs to the project. The exception to this is the user, which simply reflects the currently operating user. """ def __init__( self, tk, project=None, entity=None, step=None, task=None, user=None, additional_entities=None, source_entity=None, ): """ Context objects are not constructed by hand but are fabricated by the methods :meth:`Sgtk.context_from_entity`, :meth:`Sgtk.context_from_entity_dictionary` and :meth:`Sgtk.context_from_path`. """ self.__tk = tk self.__project = project self.__entity = entity self.__step = step self.__task = task self.__user = user self.__additional_entities = additional_entities or [] self.__source_entity = source_entity self._entity_fields_cache = {} def __repr__(self): # multi line repr msg = [] msg.append(" Project: %s" % str(self.__project)) msg.append(" Entity: %s" % str(self.__entity)) msg.append(" Step: %s" % str(self.__step)) msg.append(" Task: %s" % str(self.__task)) msg.append(" User: %s" % str(self.__user)) msg.append(" PTR URL: %s" % self.shotgun_url) msg.append(" Additional Entities: %s" % str(self.__additional_entities)) msg.append(" Source Entity: %s" % str(self.__source_entity)) return "<Sgtk Context: %s>" % ("\n".join(msg)) def __str__(self): """ String representation for context """ if self.project is None: # We're in a "site" context, so we'll give the site's url # minus the "https://" if that's attached. ctx_name = self.shotgun_url.split("//")[-1] elif self.entity is None: # project-only, e.g 'Project foobar' ctx_name = "Project %s" % self.project.get("name") elif self.step is None and self.task is None: # entity only # e.g. Shot ABC_123 # resolve custom entities to their real display entity_display_name = shotgun.get_entity_type_display_name( self.__tk, self.entity.get("type") ) ctx_name = "%s %s" % (entity_display_name, self.entity.get("name")) else: # we have either step or task task_step = None if self.step: task_step = self.step.get("name") if self.task: task_step = self.task.get("name") # e.g. Lighting, Shot ABC_123 # resolve custom entities to their real display entity_display_name = shotgun.get_entity_type_display_name( self.__tk, self.entity.get("type") ) ctx_name = "%s, %s %s" % ( task_step, entity_display_name, self.entity.get("name"), ) return ctx_name def __eq__(self, other): """ Test if this Context instance is equal to the other Context instance :param other: The other Context instance to compare with :returns: True if self represents the same context as other, otherwise False """ def _entity_dicts_eq(d1, d2): """ Test to see if two entity dictionaries are equal. They are considered equal if both are dictionaries containing 'type' and 'id' with the same values for both keys, For example: Comparing these two dictionaries would return True: - {"type":"Shot", "id":123, "foo":"foo"} - {"type":"Shot", "id":123, "foo":"bar", "bar":"foo"} But comparing these two dictionaries would return False: - {"type":"Shot", "id":123, "foo":"foo"} - {"type":"Shot", "id":567, "foo":"foo"} :param d1: First entity dictionary :param d2: Second entity dictionary :returns: True if d1 and d2 are considered equal, otherwise False. """ if d1 == d2 == None: return True if d1 == None or d2 == None: return False return d1["type"] == d2["type"] and d1["id"] == d2["id"] if not isinstance(other, Context): return NotImplemented if not _entity_dicts_eq(self.project, other.project): return False if not _entity_dicts_eq(self.entity, other.entity): return False if not _entity_dicts_eq(self.step, other.step): return False if not _entity_dicts_eq(self.task, other.task): return False # compare additional entities if self.additional_entities and other.additional_entities: # compare type, id tuples of all additional entities to ensure they are exactly the same. # this compare ignores duplicates in either list and just ensures that the intersection # of both lists contains all unique elements from both lists. types_and_ids = set( [(e["type"], e["id"]) for e in self.additional_entities if e] ) other_types_and_ids = set( [(e["type"], e["id"]) for e in other.additional_entities if e] ) if types_and_ids != other_types_and_ids: return False elif self.additional_entities or other.additional_entities: return False # finally compare the user - this may result in a Shotgun look-up # so do this last! if not _entity_dicts_eq(self.user, other.user): return False return True def __ne__(self, other): """ Test if this Context instance is not equal to the other Context instance :param other: The other Context instance to compare with :returns: True if self != other, False otherwise """ is_equal = self.__eq__(other) if is_equal is NotImplemented: return NotImplemented return not is_equal def __hash__(self): """ Generates a unique hash for the Context. :returns: int hash for this Context. """ # Use sort_keys to ensure dict order does not affect hash. # This hash is only guaranteed to be stable within a Python session, # however this is expected behavior for __hash__. return hash(json.dumps(self.to_dict(), sort_keys=True)) def __deepcopy__(self, memo): """ Allow Context objects to be deepcopied - Note that the tk member is _never_ copied """ # construct copy with current api instance: ctx_copy = Context(self.__tk) # deepcopy all other members: ctx_copy.__project = copy.deepcopy(self.__project, memo) ctx_copy.__entity = copy.deepcopy(self.__entity, memo) ctx_copy.__step = copy.deepcopy(self.__step, memo) ctx_copy.__task = copy.deepcopy(self.__task, memo) ctx_copy.__user = copy.deepcopy(self.__user, memo) ctx_copy.__additional_entities = copy.deepcopy(self.__additional_entities, memo) ctx_copy.__source_entity = copy.deepcopy(self.__source_entity, memo) # except: # ctx_copy._entity_fields_cache return ctx_copy ################################################################################################ # properties @property def project(self): """ The shotgun project associated with this context. If the context is incomplete, it is possible that the property is None. Example:: >>> import sgtk >>> tk = sgtk.sgtk_from_path("/studio.08/demo_project") >>> ctx = tk.context_from_path("/studio.08/demo_project/sequences/AAA/ABC/Light/work") >>> ctx.project {'type': 'Project', 'id': 4, 'name': 'demo_project'} :returns: A std shotgun link dictionary with keys id, type and name, or None if not defined """ return self.__project @property def entity(self): """ The shotgun entity associated with this context. If the context is incomplete, it is possible that the property is None. Example:: >>> import sgtk >>> tk = sgtk.sgtk_from_path("/studio.08/demo_project") >>> ctx = tk.context_from_path("/studio.08/demo_project/sequences/AAA/ABC/Light/work") >>> ctx.entity {'type': 'Shot', 'id': 412, 'name': 'ABC'} :returns: A std shotgun link dictionary with keys id, type and name, or None if not defined """ return self.__entity @property def source_entity(self): """ The Shotgun entity that was used to construct this Context. This is not necessarily the same as the context's "entity", as there are situations where a context is interpreted from an input entity, such as when a PublishedFile entity is used to determine a context. In that case, the original PublishedFile becomes the source_entity, and project, entity, task, and step are determined by what the PublishedFile entity is linked to. A specific example of where this is useful is in a pick_environment core hook. In that hook, an environment is determined based on a provided Context object. In the case where we want to provide a specific environment for a Context built from a PublishedFile entity, the context's source_entity can be used to know for certain that it was constructured from a PublishedFile. :returns: A Shotgun entity dictionary. :rtype: dict or None """ return self.__source_entity @property def step(self): """ The shotgun step associated with this context. If the context is incomplete, it is possible that the property is None. Example:: >>> import sgtk >>> tk = sgtk.sgtk_from_path("/studio.08/demo_project") >>> ctx = tk.context_from_path("/studio.08/demo_project/sequences/AAA/ABC/Light/work") >>> ctx.step {'type': 'Step', 'id': 12, 'name': 'Light'} :returns: A std shotgun link dictionary with keys id, type and name, or None if not defined """ return self.__step @property def task(self): """ The shotgun task associated with this context. If the context is incomplete, it is possible that the property is None. Example:: >>> import sgtk >>> tk = sgtk.sgtk_from_path("/studio.08/demo_project") >>> ctx = tk.context_from_path("/studio.08/demo_project/sequences/AAA/ABC/Lighting/first_pass_lgt/work") >>> ctx.task {'type': 'Task', 'id': 212, 'name': 'first_pass_lgt'} :returns: A std shotgun link dictionary with keys id, type and name, or None if not defined """ return self.__task @property def user(self): """ A property which holds the user associated with this context. If the context is incomplete, it is possible that the property is None. The user property is special - either it represents a user value that was baked into a template path upon folder creation, or it represents the current user:: >>> import sgtk >>> tk = sgtk.sgtk_from_path("/studio.08/demo_project") >>> ctx = tk.context_from_path("/studio.08/demo_project/sequences/AAA/ABC/Lighting/dirk.gently/work") >>> ctx.user {'type': 'HumanUser', 'id': 23, 'name': 'Dirk Gently'} :returns: A std shotgun link dictionary with keys id, type and name, or None if not defined """ # NOTE! get_shotgun_user returns more fields than just type, id and name # so make sure we get rid of those. We should make sure we return the data # in a consistent way, similar to all other entities. No more. No less. if self.__user is None: user = login.get_current_user(self.__tk) if user is not None: self.__user = { "type": user.get("type"), "id": user.get("id"), "name": user.get("name"), } return self.__user @property def additional_entities(self): """ List of entities that are required to provide a full context in non-standard configurations. The "context_additional_entities" core hook gives the context construction code hints about how this data should be populated. .. warning:: This is an old and advanced option and may be deprecated in the future. We strongly recommend not using it. :returns: A list of std shotgun link dictionaries. Will be an empty list in most cases. """ return self.__additional_entities @property def entity_locations(self): """ A list of paths on disk which correspond to the **entity** which this context represents. If no folders have been created for this context yet, the value of this property will be an empty list:: >>> import sgtk >>> tk = sgtk.sgtk_from_path("/studio.08/demo_project") >>> ctx = tk.context_from_entity("Task", 8) >>> ctx.entity_locations ['/studio.08/demo_project/sequences/AAA/ABC'] :returns: A list of paths """ if self.entity is None: return [] paths = self.__tk.paths_from_entity(self.entity["type"], self.entity["id"]) return paths @property def shotgun_url(self): """ Returns the shotgun detail page url that best represents this context. Depending on the context, this may be a task, a shot, an asset or a project. If the context is completely empty, the root url of the associated shotgun installation is returned. >>> import sgtk >>> tk = sgtk.sgtk_from_path("/studio.08/demo_project") >>> ctx = tk.context_from_entity("Task", 8) >>> ctx.shotgun_url 'https://mystudio.shotgunstudio.com/detail/Task/8' """ # walk up task -> entity -> project -> site if self.task is not None: return "%s/detail/%s/%d" % (self.__tk.shotgun_url, "Task", self.task["id"]) if self.entity is not None: return "%s/detail/%s/%d" % ( self.__tk.shotgun_url, self.entity["type"], self.entity["id"], ) if self.project is not None: return "%s/detail/%s/%d" % ( self.__tk.shotgun_url, "Project", self.project["id"], ) # fall back on just the site main url return self.__tk.shotgun_url @property def filesystem_locations(self): """ A property which holds a list of paths on disk which correspond to this context. If no folders have been created for this context yet, the value of this property will be an empty list:: >>> import sgtk >>> tk = sgtk.sgtk_from_path("/studio.08/demo_project") >>> ctx = tk.context_from_entity("Task", 8) >>> ctx.filesystem_locations ['/studio.08/demo_project/sequences/AAA/ABC/light/initial_pass'] :returns: A list of paths """ # first handle special cases: empty context if self.project is None: return [] # first handle special cases: project context if self.entity is None: return self.__tk.paths_from_entity("Project", self.project["id"]) # at this stage we know that the context contains an entity # start off with all the paths matching this entity and then cull it down # based on constraints. entity_paths = self.__tk.paths_from_entity( self.entity["type"], self.entity["id"] ) # for each of these paths, get the context and compare it against our context # todo: optimize this! matching_paths = [] for p in entity_paths: ctx = self.__tk.context_from_path(p) # the stuff we need to compare against are all the "child" levels # below entity: task and user matching = False if ctx.user is None and self.user is None: # no user data in either context matching = True elif ctx.user is not None and self.user is not None: # both contexts have user data - is it matching? if ctx.user["id"] == self.user["id"]: matching = True if matching: # ok so user looks good, now check task. # it is possible that with a context that comes from shotgun # there is a task populated which is not being used in the file system # so when we compare tasks, only if there are differing task ids, # we should treat it as a mismatch. task_matching = True if ctx.task is not None and self.task is not None: if ctx.task["id"] != self.task["id"]: task_matching = False if task_matching: # both user and task is matching matching_paths.append(p) return matching_paths @property def sgtk(self): """ The Toolkit API instance associated with this context :returns: :class:`Sgtk` """ return self.__tk @property def tank(self): """ Legacy equivalent of :meth:`sgtk` :returns: :class:`Sgtk` """ return self.__tk ################################################################################################ # public methods
[docs] def as_template_fields(self, template, validate=False): """ Returns the context object as a dictionary of template fields. This is useful if you want to use a Context object as part of a call to the Sgtk API. In order for the system to pass suitable values, you need to pass the template you intend to use the data with as a parameter to this method. The values are derived from existing paths on disk, or in the case of keys with shotgun_entity_type and shotgun_entity_field settings, direct queries to the Shotgun server. The validate parameter can be used to ensure that the method returns all context fields required by the template and if it can't then a :class:`TankError` will be raised. Example:: >>> import sgtk >>> tk = sgtk.sgtk_from_path("/studio.08/demo_project") # Create a template based on a path on disk. Because this path has been # generated through Toolkit's folder processing and there are corresponding # FilesystemLocation entities stored in Shotgun, the context can resolve # the path into a set of Shotgun entities. # # Note how the context object, once resolved, does not contain # any information about the sequence associated with the Shot. >>> ctx = tk.context_from_path("/studio.08/demo_project/sequences/AAA/ABC/Lighting/work") >>> ctx.project {'type': 'Project', 'id': 4, 'name': 'demo_project'} >>> ctx.entity {'type': 'Shot', 'id': 2, 'name': 'ABC'} >>> ctx.step {'type': 'Step', 'id': 1, 'name': 'Light'} # now if we have a template object that we want to turn into a path, # we can request that the context object attempts to resolve as many # fields as it can. These fields can then be plugged into the template # object to generate a path on disk >>> templ = tk.templates["maya_shot_publish"] >>> templ <Sgtk TemplatePath maya_shot_publish: sequences/{Sequence}/{Shot}/{Step}/publish/{name}.v{version}.ma> >>> fields = ctx.as_template_fields(templ) >>> fields {'Step': 'Lighting', 'Shot': 'ABC', 'Sequence': 'AAA'} # the fields dictionary above contains all the 'high level' data that is necessary to realise # the template path. An app or integration can now go ahead and populate the fields specific # for the app's business logic - in this case name and version - and resolve the fields dictionary # data into a path. :param template: :class:`Template` for which the fields will be used. :param validate: If True then the fields found will be checked to ensure that all expected fields for the context were found. If a field is missing then a :class:`TankError` will be raised :returns: A dictionary of template files representing the context. Handy to pass to for example :meth:`Template.apply_fields`. :raises: :class:`TankError` if the fields can't be resolved for some reason or if 'validate' is True and any of the context fields for the template weren't found. """ # Get all entities into a dictionary entities = {} if self.entity: entities[self.entity["type"]] = self.entity if self.step: entities["Step"] = self.step if self.task: entities["Task"] = self.task if self.user: entities["HumanUser"] = self.user if self.project: entities["Project"] = self.project # If there are any additional entities, use them as long as they don't # conflict with types we already have values for (Step, Task, Shot/Asset/etc) for add_entity in self.additional_entities: if add_entity["type"] not in entities: entities[add_entity["type"]] = add_entity fields = {} # Try to populate fields using paths caches for entity if isinstance(template, TemplatePath): # first, sanity check that we actually have a path cache entry # this relates to ticket 22541 where it is possible to create # a context object purely from Shotgun without having it in the path cache # (using tk.context_from_entity(Task, 1234) for example) # # Such a context can result in erronous lookups in the later commands # since these make the assumption that the path cache contains the information # that is being saught after. # # therefore, if the context object contains an entity object and this entity is # not represented in the path cache, raise an exception. if self.entity and len(self.entity_locations) == 0: # context has an entity associated but no path cache entries raise TankError( "Cannot resolve template data for context '%s' - this context " "does not have any associated folders created on disk yet and " "therefore no template data can be extracted. Please run the folder " "creation for %s and try again!" % (self, self.shotgun_url) ) # first look at which ENTITY paths are associated with this context object # and use these to extract the right fields for this template fields = self._fields_from_entity_paths(template) # filter the list of fields to just those that don't have a 'None' value. # Note: A 'None' value for a field indicates an ambiguity and was set in the # _fields_from_entity_paths method (!) non_none_fields = dict( [(key, value) for key, value in fields.items() if value is not None] ) # Determine additional field values by walking down the template tree fields.update( self._fields_from_template_tree(template, non_none_fields, entities) ) # get values for shotgun query keys in template fields.update(self._fields_from_shotgun(template, entities, validate)) if validate: # check that all context template fields were found and if not then raise a TankError missing_fields = [] for key_name in template.keys.keys(): if key_name in entities and key_name not in fields: # we have a template key that should have been found but wasn't! missing_fields.append(key_name) if missing_fields: raise TankError( "Cannot resolve template fields for context '%s' - the following " "keys could not be resolved: '%s'. Please run the folder creation " "for '%s' and try again!" % (self, ", ".join(missing_fields), self.shotgun_url) ) return fields
[docs] def create_copy_for_user(self, user): """ Provides the ability to create a copy of an existing Context for a specific user. This is useful if you need to determine a user specific version of a path, e.g. when copying files between different user sandboxes. Example:: >>> import sgtk >>> tk = sgtk.sgtk_from_path("/studio.08/demo_project") >>> ctx = tk.context_from_path("/studio.08/demo_project/sequences/AAA/ABC/Lighting/dirk.gently/work") >>> ctx.user {'type': 'HumanUser', 'id': 23, 'name': 'Dirk Gently'} >>> >>> copied_ctx = tk.create_copy_for_user({'type': 'HumanUser', 'id': 7, 'name': 'John Snow'}) >>> copied_ctx.user {'type': 'HumanUser', 'id': 23, 'name': 'John Snow'} :param user: The Shotgun user entity dictionary that should be set on the copied context :returns: :class:`Context` """ ctx_copy = copy.deepcopy(self) ctx_copy.__user = user return ctx_copy
################################################################################################ # serialization
[docs] def serialize(self, with_user_credentials=True, use_json=False): """ Serializes the context into a string. Any Context object can be serialized to/deserialized from a string. This can be useful if you need to pass a Context between different processes. As an example, the ``tk-multi-launchapp`` uses this mechanism to pass the Context from the launch process (e.g. for example PTR desktop app) to the Application (e.g. Maya) being launched. Example: >>> import sgtk >>> tk = sgtk.sgtk_from_path("/studio.08/demo_project") >>> ctx = tk.context_from_path("/studio.08/demo_project/sequences/AAA/ABC/Lighting/dirk.gently/work") >>> context_str = ctx.serialize(ctx) >>> new_ctx = sgtk.Context.deserialize(context_str) :param with_user_credentials: If ``True``, the currently authenticated user's credentials, as returned by :meth:`sgtk.get_authenticated_user`, will also be serialized with the context. :param use_json: If ``True``, the context will be stored in the JSON representation instead of using the pickled representation. .. note:: For example, credentials should be omitted (``with_user_credentials=False``) when serializing the context from a user's current session to send it to a render farm. By doing so, invoking :meth:`sgtk.Context.deserialize` on the render farm will only restore the context and not the authenticated user. :returns: String representation """ # Avoids cyclic imports from .api import get_authenticated_user data = self.to_dict() data["_pc_path"] = self.tank.pipeline_configuration.get_path() if with_user_credentials: # If there is an authenticated user. user = get_authenticated_user() if user: # We should serialize it as well so that the next process knows who to # run as. data["_current_user"] = authentication.serialize_user( user, use_json=use_json ) if use_json: return json.dumps(data) else: return pickle.dumps(data)
[docs] @classmethod def deserialize(cls, context_str): """ The inverse of :meth:`Context.serialize`. :param context_str: String representation of context, created with :meth:`Context.serialize` .. note:: If the context was serialized with the user credentials, the currently authenticated user will be updated with these credentials. :returns: :class:`Context` """ # lazy load this to avoid cyclic dependencies from .api import Tank, set_authenticated_user try: # If the serialized payload starts with a {, we have a # JSON-encoded string. if context_str[0] in ("{", b"{"): data = sgjson.loads(context_str) else: data = pickle.loads(context_str) except Exception as e: raise TankContextDeserializationError(str(e)) # first get the pipeline config path out of the dict pipeline_config_path = data["_pc_path"] del data["_pc_path"] # Authentication in Toolkit requires that credentials are passed from # one process to another so the currently authenticated user is carried # from one process to another. The current user needs to be part of the # context because multiple DCCs can run at the same time under different # users, e.g. launching Maya from the site as user A and Nuke from the tank # command as user B. user_string = data.get("_current_user") if user_string: # Remove it from the data del data["_current_user"] # and set the authenticated user user. user = authentication.deserialize_user(user_string) set_authenticated_user(user) # create a Sgtk API instance. tk = Tank(pipeline_config_path) data["tk"] = tk # add it to the constructor instance # and lastly make the obejct return cls._from_dict(data)
[docs] def to_dict(self): """ Converts the context into a dictionary with keys ``project``, ``entity``, ``user``, ``step``, ``task``, ``additional_entities`` and ``source_entity``. .. note :: Contrary to :meth:`Context.serialize`, this method discards information about the Toolkit instance associated with the context or the currently authenticated user. :returns: A dictionary representing the context. """ return { "project": self._cleanup_entity(self.project), "entity": self._cleanup_entity(self.entity), "user": self._cleanup_entity(self.user), "step": self._cleanup_entity(self.step), "task": self._cleanup_entity(self.task), "additional_entities": [ self._cleanup_entity(entity) for entity in self.additional_entities ], "source_entity": self._cleanup_entity(self.source_entity), }
def _cleanup_entity(self, entity): """ Cleanup the entity dictionary. :param dict entity: Shotgun entity object. On Python 3, we have to be very careful about what we pickle because if we start serializing non-builtin types like datetimes we won't be able to cast back the pickle into a string and we won't be able to use those values in environment variables. Because of this, we will keep the smallest amount of fields we need for the context. This is the type, id, name and the actual real name field for the entity (code, content, etc...) :returns: Dictionary with keys id and type and optionally name and the entity's real name field. """ if entity is None: return None filtered_entity = {"type": entity["type"], "id": entity["id"]} if "name" in entity: filtered_entity["name"] = entity["name"] return filtered_entity
[docs] @classmethod def from_dict(cls, tk, data): """ Converts a dictionary into a :class:`Context` object. You should only pass in a dictionary that was created with the :meth:`Context.to_dict` method. :param dict data: A dictionary generated from :meth:`Context.to_dict`. :param tk: Toolkit instance to associate with the context. :type tk: :class:`Sgtk` :returns: A newly created :class:`Context` object. """ data = copy.deepcopy(data) data["tk"] = tk return cls._from_dict(data)
@classmethod def _from_dict(cls, data): """ Creates a Context object based on the arguments found in a dictionary, but only the ones that the Context understands. This ensures that if a more recent version of Toolkit serializes a context and this API reads it that it won't blow-up. :param dict data: Data for the context. :returns: :class:`Context` """ return Context( tk=data.get("tk"), project=data.get("project"), entity=data.get("entity"), step=data.get("step"), task=data.get("task"), user=data.get("user"), additional_entities=data.get("additional_entities"), source_entity=data.get("source_entity"), ) ################################################################################################ # private methods def _fields_from_shotgun(self, template, entities, validate): """ Query Shotgun server for keys used by this template whose values come directly from Shotgun fields. :param template: Template to retrieve Shotgun fields for. :param entities: Dictionary of entities for the current context. :param validate: If True, missing fields will raise a TankError. :returns: Dictionary of field values extracted from Shotgun. :rtype: dict :raises TankError: Raised if a key is missing from the entities list when ``validate`` is ``True``. """ fields = {} # for any sg query field for key in template.keys.values(): # check each key to see if it has shotgun query information that we should resolve if key.shotgun_field_name: # this key is a shotgun value that needs fetching! # ensure that the context actually provides the desired entities if not key.shotgun_entity_type in entities: if validate: raise TankError( "Key '%s' in template '%s' could not be populated by " "context '%s' because the context does not contain a " "PTR entity of type '%s'!" % (key, template, self, key.shotgun_entity_type) ) else: continue entity = entities[key.shotgun_entity_type] # check the context cache cache_key = (entity["type"], entity["id"], key.shotgun_field_name) if cache_key in self._entity_fields_cache: # already have the value cached - no need to fetch from shotgun fields[key.name] = self._entity_fields_cache[cache_key] else: # get the value from shotgun filters = [["id", "is", entity["id"]]] query_fields = [key.shotgun_field_name] result = self.__tk.shotgun.find_one( key.shotgun_entity_type, filters, query_fields ) if not result: # no record with that id in shotgun! raise TankError( "Could not retrieve PTR data for key '%s' in " "template '%s'. No records in PTR are matching " "entity '%s' (Which is part of the current " "context '%s')" % (key, template, entity, self) ) value = result.get(key.shotgun_field_name) # note! It is perfectly possible (and may be valid) to return None values from # shotgun at this point. In these cases, a None field will be returned in the # fields dictionary from as_template_fields, and this may be injected into # a template with optional fields. if value is None: processed_val = None else: # now convert the shotgun value to a string. # note! This means that there is no way currently to create an int key # in a tank template which matches an int field in shotgun, since we are # force converting everything into strings... processed_val = shotgun_entity.sg_entity_to_string( self.__tk, key.shotgun_entity_type, entity.get("id"), key.shotgun_field_name, value, ) if not key.validate(processed_val): raise TankError( "Template validation failed for value '%s'. This " "value was retrieved from entity %s in PTR to " "represent key '%s' in " "template '%s'." % (processed_val, entity, key, template) ) # all good! # populate dictionary and cache fields[key.name] = processed_val self._entity_fields_cache[cache_key] = processed_val return fields def _fields_from_entity_paths(self, template): """ Determines a template's key values based on context by walking up the context entities paths until matches for the template are found. :param template: The template to find fields for :returns: A dictionary of field name, value pairs for any fields found for the template """ fields = {} project_roots = self._get_project_roots() # get all locations on disk for our context object from the path cache path_cache_locations = self.entity_locations # now loop over all those locations and check if one of the locations # are matching the template that is passed in. In that case, try to # extract the fields values. for cur_path in path_cache_locations: previous_path = None # walk up path until we reach the project root and get values while cur_path not in project_roots: cur_fields = template.validate_and_get_fields(cur_path) if cur_fields is not None: # If there are conflicts, there is ambiguity in the schema for key, value in cur_fields.items(): if value != fields.get(key, value): # Value is ambiguous for this key cur_fields[key] = None fields.update(cur_fields) break else: cur_path = os.path.dirname(cur_path) # There's odd behavior on Windows in Python 2.7.14 that causes # os.path.dirname to leave the last folder on a UNC path # unchanged, including the trailing delimiter: # # Example (on Windows using Python 2.7.14): # # >>> os.path.dirname(r"\\foo\bar\") # '\\foo\bar\' # # The problem is really that the trailing slash isn't removed, when # it seems to have been in older versions of Python. The trailing slash # trips up the logic behind validate_and_get_fields, so we end up in an # infinite loop. The fix is to just remove the trailing path separator # if it's there. # # The fact that it does not consider "\\foo" to be the dirname for # "\\foo\bar\" is actually correct, as explained here: # # https://bugs.python.org/issue27403 # if cur_path.endswith(os.path.sep): cur_path = cur_path[:-1] # We really should never be in this case now with the fix above, but # just in case, we'll raise here if it looks like we're just looping # over the same path multiple times. This will also make our tests # fail in a reasonable way if there's a problem rather than just # hanging up forever until the process is killed. if cur_path == previous_path: raise TankError( "There was a problem parsing an entity location to determine " "its project root. The path in question is %s, and the " "project's roots are %s. This problem would have resulted in " "an infinite loop, but has now been stopped." % (cur_path, project_roots) ) else: previous_path = cur_path return fields def _fields_from_template_tree(self, template, known_fields, context_entities): """ Determines values for a template's keys based on the context by walking down the template tree matching template keys with entity types. This method attempts to find as many fields as possible from the path cache but will try to ensure that incorrect fields are never returned, even if the path cache is not 100% clean (e.g. contains out-of-date paths for one or more of the entities in the context). :param template: The template to find fields for :param known_fields: Dictionary of fields that are already known for this template. The logic in this method will ensure that any fields found match these. :param context_entities: A dictionary of {entity_type:entity_dict} that contains all the entities belonging to this context. :returns: A dictionary of all fields found by this method """ # Step 1 - Walk up the template tree and collect templates # # Use cached paths to find field values # these will be returned in top-down order: # [<Sgtk TemplatePath sequences/{Sequence}>, # <Sgtk TemplatePath sequences/{Sequence}/{Shot}>, # <Sgtk TemplatePath sequences/{Sequence}/{Shot}/{Step}>, # <Sgtk TemplatePath sequences/{Sequence}/{Shot}/{Step}/publish>, # <Sgtk TemplatePath sequences/{Sequence}/{Shot}/{Step}/publish/maya>, # <Sgtk TemplatePath maya_shot_publish: sequences/{Sequence}/{Shot}/{Step}/publish/maya/{name}.v{version}.ma>] templates = _get_template_ancestors(template) # Step 2 - walk templates from the root down. # for each template, get all paths we have stored in the database and find any fields we can for it, making # sure that none of the found fields conflict with the list of entities provided to this method # # build up a list of fields as we go so that each level matches # at least the fields from all previous levels found_fields = {} # get a path cache handle path_cache = PathCache(self.__tk) try: for template in templates: # iterate over all keys in the list of keys for the template # from lowest to highest looking for any that represent context # entities (key name == entity type) for key in reversed(template.ordered_keys): key_name = key.name # Check to see if we already have a value for this key: if key_name in known_fields or key_name in found_fields: # already have a value so skip continue if key_name not in context_entities: # key doesn't represent an entity so skip continue # find fields for any paths associated with this entity by looking in the path cache: entity_fields = _values_from_path_cache( context_entities[key_name], template, path_cache, required_fields=found_fields, ) # entity_fields may contain additional fields that correspond to entities # so we should be sure to validate these as well if we can. # # The following example illustrates where the code could previously return incorrect entity # information from this method: # # With the following template: # /{Sequence}/{Shot}/{Step} # # And a path cache that contains: # Type | Id | Name | Path # ---------------------------------------------------- # Sequence | 001 | Seq_001 | /Seq_001 # Shot | 002 | Shot_A | /Seq_001/Shot_A # Step | 003 | Lighting | /Seq_001/Shot_A/Lighting # Step | 003 | Lighting | /Seq_001/blah/Shot_B/Lighting <- this is out of date! # Shot | 004 | Shot_B | /Seq_001/blah/Shot_B <- this is out of date! # # (Note: the schema/templates have been changed since the entries for Shot_b were added) # # The sub-templates used to search for fields are: # /{Sequence} # /{Sequence}/{Shot} # /{Sequence}/{Shot}/{Step} # # And the entities passed into the method are: # Sequence: Seq_001 # Shot: Shot_B # Step: Lighting # # We are searching for fields for 'Shot_B' that has a broken entry in the path cache so the fields # returned for each level of the template will be: # /{Sequence} -> {"Sequence":"Seq_001"} <- Correct # /{Sequence}/{Shot} -> {} <- entry not found for Shot_B matching # the template # /{Sequence}/{Shot}/{Step} -> {"Sequence":"Seq_001", <- Correct # "Shot":"Shot_A", <- Wrong! # "Step":"Lighting"} <- Correct # # In previous implementations, the final fields would incorrectly be returned as: # # {"Sequence":"Seq_001", # "Shot":"Shot_A", # "Step":"Lighting"} # # The wrong Shot (Shot_A) is returned and not caught because the code only tested that the Step # entity matches and just assumes that the rest is correct - this isn't the case when there is # a one-to-many relationship between entities! # # Therefore, we need to validate that we didn't find any entity fields that we should have found # previously/higher up in the template definition. If we did then the entries that were found # may not be correct so we have to discard them! found_mismatching_field = False for field_name, field_value in entity_fields.items(): if field_name in known_fields: # We found a field we already knew about... if field_value != known_fields[field_name]: # ...but it doesn't match! found_mismatching_field = True elif field_name in found_fields: # We found a field we found before... if field_value != found_fields[field_name]: # ...but it doesn't match! found_mismatching_field = True elif field_name == key_name: # We found a field that matches the entity we were searching for so it must be valid! found_fields[field_name] = field_value elif field_name in context_entities: # We found an entity type that we should have found before (in a previous/shorter # template). This means we can't trust any other fields that were found as they # may belong to a completely different entity/path! found_mismatching_field = True if not found_mismatching_field: # all fields are ok so we can add them all to the list of found fields :) found_fields.update(entity_fields) finally: path_cache.close() return found_fields def _get_project_roots(self): """ Gets the project root paths for the current pipeline configuration. :returns: A list of project root file paths as strings. :rtype: list """ return list(self.__tk.pipeline_configuration.get_data_roots().values())
################################################################################################ # factory methods for constructing new Context objects, primarily called from the Tank object def create_empty(tk): """ Constructs an empty context. :returns: a context object """ return Context(tk) def from_entity(tk, entity_type, entity_id): """ Constructs a context from a shotgun entity. For more information, see :meth:`Sgtk.context_from_entity`. :param tk: Sgtk API handle :param entity_type: The shotgun entity type to produce a context for :param entity_id: The shotgun entity id to produce a context for :returns: :class:`Context` """ return _from_entity_type_and_id(tk, dict(type=entity_type, id=entity_id)) def _from_entity_type_and_id(tk, entity, source_entity=None): """ Constructs a context from the entity type and id as stored in the given entity. Any other data necessary to construct the context beyond the type and id keys will be queried from Shotgun. To get a context from a fully populated entity dictionary, see the from_entity_dictionary function. For more information, see :meth:`Sgtk.context_from_entity`. :param tk: Sgtk API handle :param dict entity: The entity to construct the context from, containing a minimum of type and id keys. :param dict source_entity: The entity dictionary to add to the context as its source_entity. The source entity can be different from the entity, which is useful in the situation where a context is being built from what the source entity is linked to, but its desirable to maintain a reference back to the original entity. A specific example of when this is used is for PublishedFile entities, where the Context object represents the location in the pipeline of what the PublishedFile is linked to. In that situation, we store the original PublishedFile entity as the source entity, which can then be used in a pick_environment hook to return a specific environment for PublishedFiles. :returns: :class:`Context` """ entity_type = entity.get("type") entity_id = entity.get("id") if entity_type is None: raise TankError("Cannot create a context from an entity type 'None'!") if entity_id is None: raise TankError("Cannot create a context from an entity id set to 'None'!") # prep our return data structure context = { "tk": tk, "project": None, "entity": None, "step": None, "user": None, "task": None, "additional_entities": [], "source_entity": source_entity, } if entity_type == "Task": # For tasks get data from shotgun query task_context = _task_from_sg(tk, entity_id) context.update(task_context) elif entity_type in ["PublishedFile", "TankPublishedFile"]: sg_entity = tk.shotgun.find_one( entity_type, [["id", "is", entity_id]], ["project", "entity", "task"] ) if sg_entity is None: raise TankError( "Entity %s with id %s not found in Flow Production Tracking!" % (entity_type, entity_id) ) if sg_entity.get("task"): # base the context on the task for the published file return _from_entity_type_and_id(tk, sg_entity["task"], sg_entity) elif sg_entity.get("entity"): # base the context on the entity that the published is linked with return _from_entity_type_and_id(tk, sg_entity["entity"], sg_entity) elif sg_entity.get("project"): # base the context on the project that the published is linked with return _from_entity_type_and_id(tk, sg_entity["project"], sg_entity) else: # Get data from path cache entity_context = _context_data_from_cache(tk, entity_type, entity_id) # make sure this was actually found in the cache # fall back on a shotgun lookup if not found if entity_context["project"] is None: entity_context = _entity_from_sg(tk, entity_type, entity_id) context.update(entity_context) if entity_type == "Project": # no need to set entity to point at project in this case # that only produces double entries. context["entity"] = None # If there isn't an explicit source_entity, we set it to be # the same as the entity property. context["source_entity"] = context["source_entity"] or context["entity"] return Context._from_dict(context) def from_entity_dictionary(tk, entity_dictionary): """ Constructs a context from a shotgun entity dictionary. For more information, see :meth:`Sgtk.context_from_entity_dictionary`. :param tk: :class:`Sgtk` :param dict entity_dictionary: The entity dictionary to create the context from containing at least: {"type":entity_type, "id":entity_id} :returns: :class:`Context` """ return _from_entity_dictionary(tk, entity_dictionary) def _from_entity_dictionary(tk, entity_dictionary, source_entity=None): """ Constructs a context from a Shotgun entity dictionary. For more information, see :meth:`Sgtk.context_from_entity_dictionary`. :param tk: :class:`Sgtk` :param entity_dictionary: The entity dictionary to create the context from containing at least: {"type":entity_type, "id":entity_id} :param dict source_entity: The entity dictionary to add to the context as its source_entity. The source entity can be different from the entity, which is useful in the situation where a context is being built from what the source entity is linked to, but its desirable to maintain a reference back to the original entity. A specific example of when this is used is for PublishedFile entities, where the Context object represents the location in the pipeline of what the PublishedFile is linked to. In that situation, we store the original PublishedFile entity as the source entity, which can then be used in a pick_environment hook to return a specific environment for PublishedFiles. :returns: :class:`Context` """ # perform validation of the entity dictionary: if not isinstance(entity_dictionary, dict): raise TankError( "Cannot create a context from an empty or invalid entity dictionary!" ) if "type" not in entity_dictionary: raise TankError("Cannot create a context without an entity type!") if "id" not in entity_dictionary: raise TankError("Cannot create a context without an entity id!") # prep our context data structure context = { "tk": tk, "project": None, "entity": None, "step": None, "user": None, "task": None, "additional_entities": [], "source_entity": copy.deepcopy(source_entity or entity_dictionary), } entity_type = entity_dictionary["type"] entity_id = entity_dictionary["id"] # try to determine the various entities from the entity dictionary: project = None entity = None step = None task = None fallback_to_ctx_from_entity = False if entity_type == "Project": # find entities for a project context project = entity_dictionary elif entity_type == "Task": # find entities for a task context task = entity_dictionary if "project" not in task or "entity" not in task or "step" not in task: fallback_to_ctx_from_entity = True else: project = task["project"] entity = task["entity"] step = task["step"] elif entity_type in ["PublishedFile", "TankPublishedFile"]: # special case handling for published files: if entity_dictionary.get("task"): # construct a task context return _from_entity_dictionary( tk, entity_dictionary["task"], source_entity=context["source_entity"] ) elif entity_dictionary.get("entity"): # construct an entity context return _from_entity_dictionary( tk, entity_dictionary["entity"], source_entity=context["source_entity"] ) elif entity_dictionary.get("project"): # construct project context return _from_entity_dictionary( tk, entity_dictionary["project"], source_entity=context["source_entity"] ) else: # fall back on from_entity: fallback_to_ctx_from_entity = True else: # find entities for an entity context entity = entity_dictionary if "project" not in entity: fallback_to_ctx_from_entity = True else: project = entity["project"] if not fallback_to_ctx_from_entity: # clean up entities and populate context structure: def _build_clean_entity(ent): """ Ensure entity has id, type and name fields and build a clean entity dictionary containing just those fields to return, stripping out all other fields. :param ent: The entity dictionary to build a clean dictionary from :returns: A clean entity dictionary containing just 'type', 'id' and 'name' if all three exist in the input dictionary or None if they don't. """ # make sure we have id, type and name: if "id" not in ent or "type" not in ent: return None ent_name = _get_entity_name(ent) if ent_name == None: return None # return a clean dictionary: return {"type": ent["type"], "id": ent["id"], "name": ent_name} if project: context["project"] = _build_clean_entity(project) if not context["project"]: fallback_to_ctx_from_entity = True if not fallback_to_ctx_from_entity and entity: context["entity"] = _build_clean_entity(entity) if not context["entity"]: fallback_to_ctx_from_entity = True if not fallback_to_ctx_from_entity and step: context["step"] = _build_clean_entity(step) if not context["step"]: fallback_to_ctx_from_entity = True if not fallback_to_ctx_from_entity and task: context["task"] = _build_clean_entity(task) if not context["task"]: fallback_to_ctx_from_entity = True if fallback_to_ctx_from_entity: # entity dict doesn't contain enough information to build a # safe, valid context so fall back on 'from_entity': return _from_entity_type_and_id( tk, entity_dictionary, source_entity=context["source_entity"] ) if task: # one final check if we have a task: additional_fields = tk.execute_core_hook("context_additional_entities").get( "entity_fields_on_task", [] ) if additional_fields: # unfortunately we have to fall back to an sg query to get the additional entities :( task_context = _task_from_sg(tk, task["id"], additional_fields) context.update(task_context) return Context._from_dict(context) def from_path(tk, path, previous_context=None): """ Factory method that constructs a context object from a path on disk. The algorithm will navigate upwards in the file system and collect as much tank metadata as possible to construct a Tank context. :param path: a file system path :param previous_context: A context object to use to try to automatically extend the generated context if it is incomplete when extracted from the path. For example, the Task may be carried across from the previous context if it is suitable and if the task wasn't already expressed in the file system path passed in via the path argument. :type previous_context: :class:`Context` :returns: :class:`Context` """ # prep our return data structure context = { "tk": tk, "project": None, "entity": None, "step": None, "user": None, "task": None, "additional_entities": [], } # ask hook for extra entity types we should recognize and insert into the additional_entities list. additional_types = tk.execute_core_hook("context_additional_entities").get( "entity_types_in_path", [] ) # get a cache handle path_cache = PathCache(tk) # gather all roots as lower case project_roots = [ x.lower() for x in tk.pipeline_configuration.get_data_roots().values() ] # first gather entities entities = [] secondary_entities = [] curr_path = path while True: curr_entity = path_cache.get_entity(curr_path) if curr_entity: # Don't worry about entity types we've already got in the context. In the future # we should look for entity ids that conflict in order to flag a degenerate schema. entities.append(curr_entity) # add secondary entities secondary_entities.extend(path_cache.get_secondary_entities(curr_path)) if curr_path.lower() in project_roots: # TODO this could fail with windows path variations # we have reached a root! break # and continue with parent path parent_path = os.path.abspath(os.path.join(curr_path, "..")) if curr_path == parent_path: # We're at the disk root, probably a degenerate path break else: curr_path = parent_path path_cache.close() # now populate the context # go from the root down, so that in the case there are a path with # multiple entities (like PROJECT/SEQUENCE/SHOT), the last entry # is the most relevant one, and will be assigned as the entity for curr_entity in entities[::-1]: # handle the special context fields first if curr_entity["type"] == "Project": context["project"] = curr_entity elif curr_entity["type"] == "Step": context["step"] = curr_entity elif curr_entity["type"] == "Task": context["task"] = curr_entity elif curr_entity["type"] == "HumanUser": context["user"] = curr_entity elif curr_entity["type"] in additional_types: context["additional_entities"].append(curr_entity) else: context["entity"] = curr_entity # now that the context has been populated as much as possible using the # primary entities, fill in any blanks based on the secondary entities. for curr_entity in secondary_entities[::-1]: # handle the special context fields first if curr_entity["type"] == "Project": if context["project"] is None: context["project"] = curr_entity elif curr_entity["type"] == "Step": if context["step"] is None: context["step"] = curr_entity elif curr_entity["type"] == "Task": if context["task"] is None: context["task"] = curr_entity elif curr_entity["type"] == "HumanUser": if context["user"] is None: context["user"] = curr_entity elif curr_entity["type"] in additional_types: # is this entity in the list already if curr_entity not in context["additional_entities"]: context["additional_entities"].append(curr_entity) else: if context["entity"] is None: context["entity"] = curr_entity # see if we can populate it based on the previous context if ( previous_context and context.get("entity") == previous_context.entity and context.get("additional_entities") == previous_context.additional_entities ): # cool, everything is matching down to the step/task level. # if context is missing a step and a task, we try to auto populate it. # (note: weird edge that a context can have a task but no step) if context.get("task") is None and context.get("step") is None: context["step"] = previous_context.step # now try to assign previous task but only if the step matches! if context.get("task") is None and context.get("step") == previous_context.step: context["task"] = previous_context.task # ensure that we don't have a Project as the entity. Projects should only # appear on the projects level, despite being entities. if ( context["project"] and context["entity"] and context["entity"]["type"] == "Project" ): # remove double entry! context["entity"] = None return Context._from_dict(context) ################################################################################################ # serialization def serialize(context): """ Serializes the context into a string. .. deprecated:: v0.18.12 Use :meth:`Context.serialize` """ return context.serialize() def deserialize(context_str): """ The inverse of :meth:`serialize`. .. deprecated:: v0.18.12 Use :meth:`Context.deserialize` """ return Context.deserialize(context_str) ################################################################################################ # YAML representer/constructor def context_yaml_representer(dumper, context): """ Custom serializer. Creates yaml code for a context object. Legacy, kept for compatibility reasons, can probably be removed at this point. .. note:: Contrary to :meth:`sgtk.Context.serialize`, this method doesn't serialize the currently authenticated user. """ # first get the stuff which represents all the Context() # constructor parameters context_dict = context.to_dict() # now we also need to pass a TK instance to the constructor when we # are deserializing the object. For this purpose, pass a # pipeline config path as part of the dict context_dict["_pc_path"] = context.tank.pipeline_configuration.get_path() return dumper.represent_mapping(u"!TankContext", context_dict) def context_yaml_constructor(loader, node): """ Custom deserializer. Constructs a context object given the yaml data provided. Legacy, kept for compatibility reasons, can probably be removed at this point. .. note:: Contrary to :meth:`sgtk.Context.deserialize`, this method doesn't can't restore the currently authenticated user. """ # lazy load this to avoid cyclic dependencies from .api import Tank # get the dict from yaml context_constructor_dict = loader.construct_mapping(node, deep=True) # first get the pipeline config path out of the dict pipeline_config_path = context_constructor_dict["_pc_path"] del context_constructor_dict["_pc_path"] # create a Sgtk API instance. tk = Tank(pipeline_config_path) context_constructor_dict["tk"] = tk # and lastly make the object return Context._from_dict(context_constructor_dict) yaml.add_representer(Context, context_yaml_representer) yaml.add_constructor(u"!TankContext", context_yaml_constructor) ################################################################################################ # utility methods def _get_entity_name(entity_dictionary): """ Extract the entity name from the specified entity dictionary if it can be found. The entity dictionary must contain at least 'type' :param entity_dictionary: An entity dictionary to extract the name from :returns: The name of the entity if found in the entity dictionary, otherwise None """ name_field = shotgun_entity.get_sg_entity_name_field(entity_dictionary["type"]) entity_name = entity_dictionary.get(name_field) if entity_name == None: # Also check to see if entity contains 'name': if name_field != "name": entity_name = entity_dictionary.get("name") return entity_name def _task_from_sg(tk, task_id, additional_fields=None): """ Constructs a context from a shotgun task. Because we are constructing the context from a task, we will get a context which has both a project, an entity a step and a task associated with it. Manne 9 April 2013: could we use the path cache primarily and fall back onto a shotgun lookup? :param tk: An Sgtk API instance :param task_id: The shotgun task id to produce a context for. :param additional_fields: List of additional fields to query for additional entities. If this is 'None' then the function will execute the hook to determine them. """ context = {} # Look up task's step and entity. This information should be static in practice, so we could # likely cache it in the future. standard_fields = ["content", "entity", "step", "project"] # theses keys map directly to linked entities, users will be handled separately context_keys = ["project", "entity", "step", "task"] if additional_fields is None: # ask hook for extra Task entity fields we should query and insert into the additional_entities list. additional_fields = tk.execute_core_hook("context_additional_entities").get( "entity_fields_on_task", [] ) task = tk.shotgun.find_one( "Task", [["id", "is", task_id]], standard_fields + additional_fields ) if not task: raise TankError("Unable to locate Task with id %s in Flow Production Tracking" % task_id) # add task so it can be processed with other shotgun entities task["task"] = {"type": "Task", "id": task_id, "name": task["content"]} for key in context_keys + additional_fields: data = task.get(key) if data is None: # gracefully skip stuff we don't have # for example tasks may not have a step continue # be explicit about what we pull in - make no assumptions about what is # being returned from sg (the unit tests mocker doesn't return the same as the API) value = { "name": data.get("name"), "id": data.get("id"), "type": data.get("type"), } if key in context_keys: context[key] = value elif key in additional_fields: additional_entities = context.get("additional_entities", []) additional_entities.append(value) context["additional_entities"] = additional_entities return context def _entity_from_sg(tk, entity_type, entity_id): """ Determines the entity details for the specified entity type and id by querying Shotgun. If entity_type is 'Project' then this will return a single dictionary for the project. For all other entity types, this will return dictionaries for both the entity and the project the entity exists under. :param tk: The sgtk api instance :param entity_type: The entity type to build a context for :param entity_id: The entity id to build a context for :returns: Dictionary containing either a project entity-dictionary or both project and entity entity-dictionaries depending on the input entity type. e.g. { "project":{"type":"Project", "id":123, "name":"My Project"}, "entity":{"type":"Shot", "id":456, "name":"My Shot"} } """ # get the sg name field for the specified entity type: name_field = shotgun_entity.get_sg_entity_name_field(entity_type) # get the entity data from Shotgun data = tk.shotgun.find_one( entity_type, [["id", "is", entity_id]], ["project", name_field] ) if not data: raise TankError( "Unable to locate %s with id %s in Flow Production Tracking" % (entity_type, entity_id) ) # create context context = {} if entity_type == "Project": context["project"] = { "type": "Project", "id": entity_id, "name": data.get(name_field), } else: context["entity"] = { "type": entity_type, "id": entity_id, "name": data.get(name_field), } context["project"] = data.get("project") return context def _context_data_from_cache(tk, entity_type, entity_id): """Adds data to context based on path cache. :param tk: a Sgtk API instance :param entity_type: a Shotgun entity type :param entity_id: a Shotgun entity id """ context = {} # Set entity info for input entity context["entity"] = {"type": entity_type, "id": entity_id} # Map entity types to context fields types_fields = {"Project": "project", "Step": "step", "Task": "task"} # Use the path cache to look up all paths linked to the entity and use that to extract # extra entities we should include in the context path_cache = PathCache(tk) # Grab all project roots project_roots = list(tk.pipeline_configuration.get_data_roots().values()) # Special case for project as we have the primary data path, which # always points at a project. We only check if the associated configuration # has any associated data roots, otherwise a primary config won't exist. if tk.pipeline_configuration.has_associated_data_roots(): context["project"] = path_cache.get_entity( tk.pipeline_configuration.get_primary_data_root() ) else: context["project"] = None paths = path_cache.get_paths(entity_type, entity_id, primary_only=True) for path in paths: # now recurse upwards and look for entity types we haven't found yet curr_path = path curr_entity = path_cache.get_entity(curr_path) if curr_entity is None: # this is some sort of anomaly! the path returned by get_paths # does not resolve in get_entity. This can happen if the storage # mappings are not consistent or if there is not a 1 to 1 relationship # # This can also happen if there are extra slashes at the end of the path # in the local storage defs and in the pipeline_configuration.yml file. raise TankError( "The path '%s' associated with %s id %s does not " "resolve correctly. This may be an indication of an issue " "with the local storage setup. Please contact support at %s." % (curr_path, entity_type, entity_id, constants.SUPPORT_URL) ) # grab the name for the context entity if curr_entity["type"] == entity_type and curr_entity["id"] == entity_id: context["entity"]["name"] = curr_entity["name"] # note - paths returned by get_paths are always prefixed with a # project root so there is no risk we end up with an infinite loop here.. while curr_path not in project_roots: curr_path = os.path.abspath(os.path.join(curr_path, "..")) curr_entity = path_cache.get_entity(curr_path) if curr_entity: cur_type = curr_entity["type"] if cur_type in types_fields: field_name = types_fields[cur_type] context[field_name] = curr_entity path_cache.close() return context def _values_from_path_cache(entity, cur_template, path_cache, required_fields): """ Determine values for template fields based on an entities cached paths. :param entity: The entity to search for fields for :param cur_template: The template to use to search the path cache :path_cache: An instance of the path_cache to search in :param required_fields: A list of fields that must exist in any matched path :return: Dictionary of fields found by matching the template against all paths found for the entity """ # use the databsae to go from shotgun type/id --> paths entity_paths = path_cache.get_paths(entity["type"], entity["id"], primary_only=True) # Mapping for field values found in conjunction with this entities paths unique_fields = {} # keys whose values should be removed from return values remove_keys = set() for path in entity_paths: # validate path and get fields: path_fields = cur_template.validate_and_get_fields( path, required_fields=required_fields ) if not path_fields: continue # Check values against those found for other paths for key, value in path_fields.items(): if key in unique_fields and value != unique_fields[key]: # value for this key isn't unique! if key == entity["type"]: # Ambiguity for Entity key # now it is possible that we have ambiguity here, but it is normally # an edge case. For example imagine that an asset has paths # /proj/hero_HIGH # /proj/hero_LOW # and we are mapping against template /%(Project)s/%(Asset)s # both paths are valid matches, so we have ambiguous state for the entity msg = "Ambiguous data. Multiple paths cached for %s which match template %s" raise TankError(msg % (str(entity), str(cur_template))) else: # ambiguity for Static key unique_fields[key] = None remove_keys.add(key) else: unique_fields[key] = value # we want to remove the None/ambiguous values so they don't interfere with other entities for remove_key in remove_keys: del unique_fields[remove_key] return unique_fields def _get_template_ancestors(template): """Return templates branch of the template tree, ordered from first template below the project root down to and including the input template. """ # TODO this would probably be better as the Template's responsibility templates = [template] cur_template = template while cur_template.parent is not None and len(cur_template.parent.keys) > 0: next_template = cur_template.parent templates.insert(0, next_template) cur_template = next_template return templates