# Source code for tank.util.shotgun

# Copyright (c) 2013 Shotgun Software Inc.
# 
# CONFIDENTIAL AND PROPRIETARY
# 
# This work is provided "AS IS" and subject to the Shotgun Pipeline Toolkit 
# Source Code License included in this distribution package. See LICENSE.
# By accessing, using, copying or modifying this work you indicate your 
# agreement to the Shotgun Pipeline Toolkit Source Code License. All rights 
# not expressly granted therein are reserved by Shotgun Software Inc.

"""
Shotgun utilities

"""
from __future__ import with_statement

import os
import sys
import uuid
import urllib
import urllib2
import urlparse
import urllib
import pprint
import time
import threading
import tempfile

# use api json to cover py 2.5
from tank_vendor import shotgun_api3

from .errors import UnresolvableCoreConfigurationError, ShotgunAttachmentDownloadError
from .errors import ShotgunPublishError
from ..errors import TankError, TankMultipleMatchingTemplatesError
from ..log import LogManager
from .. import hook
from .shotgun_path import ShotgunPath
from . import constants
from . import login
from . import yaml_cache
from .zip import unzip_file
from . import filesystem
from .metrics import log_user_attribute_metric

log = LogManager.get_logger(__name__)


def __get_api_core_config_location():
    """
    Resolves the ``config/core`` folder that belongs to the current core API.

    The folder is looked up relative to the root reported by
    ``get_path_to_current_core()`` and is expected to follow this layout::

        Pipeline Configuration / Studio Location
           |
           |- Install
           |     `- Core
           |          `- Python
           |                `- tank
           |
           `- Config
                 `- Core

    :returns: Path to the ``config/core`` folder.
    :raises UnresolvableCoreConfigurationError: If the folder does not exist
        on disk. The error is given the folder containing this module.
    """
    # local import to avoid cyclic references
    from ..pipelineconfig_utils import get_path_to_current_core

    config_folder = os.path.join(get_path_to_current_core(), "config", "core")
    if os.path.exists(config_folder):
        return config_folder

    # hand the location of this module to the error to help diagnose the setup
    this_module_folder = os.path.abspath(os.path.dirname(__file__))
    raise UnresolvableCoreConfigurationError(this_module_folder)

def __get_sg_config():
    """
    Returns the location of the site level shotgun.yml file for this install.

    The file name is joined onto the core configuration folder; no check is
    made here that the file itself exists.

    :returns: full path to the shotgun.yml config file
    """
    return os.path.join(__get_api_core_config_location(), "shotgun.yml")

def get_project_name_studio_hook_location():
    """
    Returns the location of the studio level hook that is used to compute
    the default project name.

    :returns: The path to the studio level project naming hook.
    """
    # NOTE! This code lives here because it needs to be able to run without a
    # project. The natural home would have been the pipeline configuration
    # class, but that object assumes a project that exists.
    #
    # @todo longterm we should probably establish a place in the code where we
    # define an API or set of functions which can be executed outside the remit
    # of a pipeline configuration/Toolkit project.
    return os.path.join(
        __get_api_core_config_location(),
        constants.STUDIO_HOOK_PROJECT_NAME
    )

def __get_sg_config_data(shotgun_cfg_path, user="default"):
    """
    Returns the shotgun configuration yml parameters given a config file.
    
    The shotgun.yml may look like:

        host: str
        api_script: str
        api_key: str
        http_proxy: str
    
        or may now look like:
    
        <User>:
            host: str
            api_script: str
            api_key: str
            http_proxy: str
    
        <User>:
            host: str
            api_script: str
            api_key: str
            http_proxy: str

    The optional user param refers to the <User> in the shotgun.yml.
    If a user is not found the old style is attempted.    
    
    :param shotgun_cfg_path: path to config file
    :param user: Optional user to pass when a multi-user config is being read 

    :returns: dictionary with key host and optional keys api_script, api_key and http_proxy
    """
    # load the config file
    try:
        file_data = yaml_cache.g_yaml_cache.get(shotgun_cfg_path, deepcopy_data=False)
    except Exception, error:
        raise TankError("Cannot load config file '%s'. Error: %s" % (shotgun_cfg_path, error))

    return _parse_config_data(file_data, user, shotgun_cfg_path)

def __get_sg_config_data_with_script_user(shotgun_cfg_path, user="default"):
    """
    Same as ``__get_sg_config_data``, except that the script user credentials
    (``api_script`` and ``api_key``) must both be present and non-empty.

    :param shotgun_cfg_path: path to config file
    :param user: Optional user to pass when a multi-user config is being read

    :raises TankError: Raised if the script user is not configured.

    :returns: dictionary with mandatory keys host, api_script, api_key and optionally http_proxy
    """
    config_data = __get_sg_config_data(shotgun_cfg_path, user)

    # both halves of the credentials are needed for script based authentication
    script_user_configured = config_data.get("api_script") and config_data.get("api_key")
    if not script_user_configured:
        raise TankError("Missing required script user in config '%s'" % shotgun_cfg_path)

    return config_data


def _parse_config_data(file_data, user, shotgun_cfg_path):
    """
    Picks the relevant settings out of loaded configuration data, lets the
    studio level connection settings hook override them if that hook exists,
    and validates the result.

    :param file_data: Dictionary with all the values from the configuration data.
    :param user: Picks the configuration for a specific user in the configuration data.
    :param shotgun_cfg_path: Path the configuration was loaded from. Passed on to
        the hook and used in error messages.
    :raises: TankError if there are missing fields in the configuration. The accepted
        configurations are:
            - host
            - host, api_script, api_key
        In both cases, http_proxy is optional.
    :returns: A dictionary holding the configuration data. A falsy
        ``app_store_http_proxy`` entry is normalized to None.
    """
    # New style configs are grouped by user, old style configs are flat.
    config_data = file_data[user] if user in file_data else file_data

    # A studio level override hook may want to refine the settings.
    sg_hook_path = os.path.join(
        __get_api_core_config_location(),
        constants.STUDIO_HOOK_SG_CONNECTION_SETTINGS
    )
    if os.path.exists(sg_hook_path):
        # custom hook is available - whatever it returns replaces the settings
        config_data = hook.execute_hook(
            sg_hook_path,
            parent=None,
            config_data=config_data,
            user=user,
            cfg_path=shotgun_cfg_path
        )

    def _missing_field_error(field_name):
        # builds the error reported for any required field that is absent
        return TankError(
            "Missing required field '%s' in config '%s' for script user authentication." % (
                field_name, shotgun_cfg_path
            )
        )

    if not config_data.get("host"):
        raise _missing_field_error("host")

    # The script credentials can be left out entirely or be fully specified,
    # but a half configured pair is rejected.
    script_name = config_data.get("api_script")
    script_key = config_data.get("api_key")
    if script_name and not script_key:
        raise _missing_field_error("api_key")
    if script_key and not script_name:
        raise _missing_field_error("api_script")

    # An app store proxy that is present but falsy is stored as None.
    proxy_setting = "app_store_http_proxy"
    if proxy_setting in config_data and not config_data[proxy_setting]:
        config_data[proxy_setting] = None

    return config_data

@LogManager.log_timing
[docs]def download_url(sg, url, location, use_url_extension=False): """ Convenience method that downloads a file from a given url. This method will take into account any proxy settings which have been defined in the Shotgun connection parameters. In some cases, the target content of the url is not known beforehand. For example, the url ``https://my-site.shotgunstudio.com/thumbnail/full/Asset/1227`` may redirect into ``https://some-site/path/to/a/thumbnail.png``. In such cases, you can set the optional use_url_extension parameter to True - this will cause the method to append the file extension of the resolved url to the filename passed in via the location parameter. So for the urls given above, you would get the following results: - location="/path/to/file" and use_url_extension=False would return "/path/to/file" - location="/path/to/file" and use_url_extension=True would return "/path/to/file.png" :param sg: Shotgun API instance to get proxy connection settings from :param url: url to download :param location: path on disk where the payload should be written. this path needs to exists and the current user needs to have write permissions :param bool use_url_extension: Optionally append the file extension of the resolved URL's path to the input ``location`` to construct the full path name to the downloaded contents. The newly constructed full path name will be returned. :returns: Full filepath to the downloaded file. This may have been altered from the input ``location`` if ``use_url_extension`` is True and a file extension could be determined from the resolved url. :raises: :class:`TankError` on failure. 
""" # We only need to set the auth cookie for downloads from Shotgun server, # input URLs like: https://my-site.shotgunstudio.com/thumbnail/full/Asset/1227 if sg.config.server in url: # this method also handles proxy server settings from the shotgun API __setup_sg_auth_and_proxy(sg) elif sg.config.proxy_handler: # These input URLs have generally already been authenticated and are # in the form: https://sg-media-staging-usor-01.s3.amazonaws.com/9d93f... # %3D&response-content-disposition=filename%3D%22jackpot_icon.png%22. # Grab proxy server settings from the shotgun API opener = urllib2.build_opener(sg.config.proxy_handler) urllib2.install_opener(opener) # inherit the timeout value from the sg API timeout = sg.config.timeout_secs # download the given url try: request = urllib2.Request(url) if timeout and sys.version_info >= (2,6): # timeout parameter only available in python 2.6+ response = urllib2.urlopen(request, timeout=timeout) else: # use system default response = urllib2.urlopen(request) if use_url_extension: # Make sure the disk location has the same extension as the url path. # Would be nice to see this functionality moved to back into Shotgun # API and removed from here. url_ext = os.path.splitext(urlparse.urlparse(response.geturl()).path)[-1] if url_ext: location = "%s%s" % (location, url_ext) f = open(location, "wb") try: f.write(response.read()) finally: f.close() except Exception, e: raise TankError("Could not download contents of url '%s'. Error reported: %s" % (url, e)) return location
def __setup_sg_auth_and_proxy(sg): """ Borrowed from the Shotgun Python API, setup urllib2 with a cookie for authentication on Shotgun instance. Looks up session token and sets that in a cookie in the :mod:`urllib2` handler. This is used internally for downloading attachments from the Shotgun server. :param sg: Shotgun API instance """ # Importing this module locally to reduce clutter and facilitate clean up when/if this # functionality gets ported back into the Shotgun API. import cookielib sid = sg.get_session_token() cj = cookielib.LWPCookieJar() c = cookielib.Cookie('0', '_session_id', sid, None, False, sg.config.server, False, False, "/", True, False, None, True, None, None, {}) cj.set_cookie(c) cookie_handler = urllib2.HTTPCookieProcessor(cj) if sg.config.proxy_handler: opener = urllib2.build_opener(sg.config.proxy_handler, cookie_handler) else: opener = urllib2.build_opener(cookie_handler) urllib2.install_opener(opener) @LogManager.log_timing def download_and_unpack_attachment(sg, attachment_id, target, retries=5): """ Downloads the given attachment from Shotgun, assumes it is a zip file and attempts to unpack it into the given location. :param sg: Shotgun API instance :param attachment_id: Attachment to download :param target: Folder to unpack zip to. if not created, the method will try to create it. :param retries: Number of times to retry before giving up :raises: ShotgunAttachmentDownloadError on failure """ # @todo: progress feedback here - when the SG api supports it! # sometimes people report that this download fails (because of flaky connections etc) # engines can often be 30-50MiB - as a quick fix, just retry the download if it fails attempt = 0 done = False while not done and attempt < retries: zip_tmp = os.path.join(tempfile.gettempdir(), "%s_tank.zip" % uuid.uuid4().hex) try: time_before = time.time() log.debug("Downloading attachment id %s..." % attachment_id) bundle_content = sg.download_attachment(attachment_id) log.debug("Download complete. 
Saving into %s" % zip_tmp) with open(zip_tmp, "wb") as fh: fh.write(bundle_content) file_size = os.path.getsize(zip_tmp) # log connection speed time_to_download = time.time() - time_before broadband_speed_bps = file_size * 8.0 / time_to_download broadband_speed_mibps = broadband_speed_bps / (1024 * 1024) log.debug("Download speed: %4f Mbit/s" % broadband_speed_mibps) log_user_attribute_metric("Tk attachment download speed", "%4f Mbit/s" % broadband_speed_mibps) log.debug("Unpacking %s bytes to %s..." % (file_size, target)) filesystem.ensure_folder_exists(target) unzip_file(zip_tmp, target) except Exception, e: log.warning( "Attempt %s: Attachment download of id %s from %s failed: %s" % (attempt, attachment_id, sg.base_url, e) ) attempt += 1 # sleep 500ms before we retry time.sleep(0.5) else: done = True finally: # remove zip file filesystem.safe_delete_file(zip_tmp) if not done: # we were not successful raise ShotgunAttachmentDownloadError( "Failed to download from '%s' after %s retries. See error log for details." % (sg.base_url, retries) ) else: log.debug("Attachment download and unpack complete.") def get_associated_sg_base_url(): """ Returns the shotgun url which is associated with this Toolkit setup. This is an optimization, allowing code to get the Shotgun site URL without having to create a shotgun connection and then inspecting the base_url property. This method is equivalent to calling: create_sg_connection().base_url :returns: The base url for the associated Shotgun site """ # Avoids cyclic imports. from .. import api sg_user = api.get_authenticated_user() if sg_user: return sg_user.host else: # look up in core/shotgun.yml return get_associated_sg_config_data()["host"] def get_associated_sg_config_data(): """ Returns the shotgun configuration which is associated with this Toolkit setup. :returns: The configuration data dictionary with keys host and optional entries api_script, api_key and http_proxy. 
""" cfg = __get_sg_config() return __get_sg_config_data(cfg) def get_deferred_sg_connection(): """ Returns a shotgun API instance that is lazily initialized. This is a method intended only to support certain legacy cases where some operations in Toolkit are not fully authenticated. When descriptor objects are constructed, they are associated with a SG API handle. This handle is not necessary for basic operations such as path resolution. By passing a deferred connection object to descriptors, authentication is essentially deferred until the need for more complex operations arises, allowing for simple, *legacy* non-authenticated pathways. :return: Proxied SG API handle """ class DeferredInitShotgunProxy(object): def __init__(self): self._sg = None def __getattr__(self, key): if self._sg is None: self._sg = get_sg_connection() return getattr(self._sg, key) return DeferredInitShotgunProxy() _g_sg_cached_connections = threading.local() def get_sg_connection(): """ Returns a shotgun connection and maintains a global cache of connections so that only one API instance is ever returned per thread, no matter how many times this call is made. .. note:: Because Shotgun API instances are not safe to share across threads, this method caches SG Instances per-thread. :return: SG API handle """ global _g_sg_cached_connections sg = getattr(_g_sg_cached_connections, "sg", None) if sg is None: sg = create_sg_connection() _g_sg_cached_connections.sg = sg return sg @LogManager.log_timing def create_sg_connection(user="default"): """ Creates a standard tank shotgun connection. Note! This method returns *a brand new sg API instance*. It is slow. Always consider using tk.shotgun and if you don't have a tk instance, consider using get_sg_connection(). Whenever a Shotgun API instance is created, it pings the server to check that it is running the right versions etc. This is slow and inefficient and means that there will be a delay every time create_sg_connection is called. 
:param user: Optional shotgun config user to use when connecting to shotgun, as defined in shotgun.yml. This is a deprecated flag and should not be used. :returns: SG API instance """ # Avoids cyclic imports. from .. import api sg_user = api.get_authenticated_user() # If there is no user, that's probably because we're running in an old script that doesn't use # the authenticated user concept. In that case, we'll do what we've always been doing in the # past, which is read shotgun.yml and expect there to be a script user. if sg_user is None: log.debug( "This tk session has no associated authenticated user. Falling back to " "creating a shotgun API instance based on script based credentials in the " "shotgun.yml configuration file." ) # try to find the shotgun.yml path try: config_file_path = __get_sg_config() except TankError, e: log.error( "Trying to create a shotgun connection but this tk session does not have " "an associated authenticated user. Therefore attempted to fall back on " "a legacy authentication method where script based credentials are " "located in a file relative to the location of the core API code. This " "lookup in turn failed. No credentials can be determined and no connection " "to Shotgun can be made. Details: %s" % e ) raise TankError("Cannot connect to Shotgun - this tk session does not have " "an associated user and attempts to determine a valid shotgun " "via legacy configuration files failed. Details: %s" % e) log.debug("Creating shotgun connection based on details in %s" % config_file_path) config_data = __get_sg_config_data_with_script_user(config_file_path, user) # Credentials were passed in, so let's run the legacy authentication # mechanism for script user. api_handle = shotgun_api3.Shotgun( config_data["host"], script_name=config_data["api_script"], api_key=config_data["api_key"], http_proxy=config_data.get("http_proxy"), connect=False ) else: # Otherwise use the authenticated user to create the connection. 
log.debug("Creating shotgun connection from %r..." % sg_user) api_handle = sg_user.create_sg_connection() # bolt on our custom user agent manager so that we can # send basic version metrics back via http headers. api_handle.tk_user_agent_handler = ToolkitUserAgentHandler(api_handle) return api_handle g_entity_display_name_lookup = None
def get_entity_type_display_name(tk, entity_type_code):
    """
    Returns the display name for an entity type given its type name.
    For example, if a custom entity is named "Workspace" in the
    Shotgun preferences, but is addressed as "CustomEntity03" in the
    Shotgun API, this method will resolve the display name::

        >>> get_entity_type_display_name(tk, "CustomEntity03")
        'Workspace'

    The entity schema is read from Shotgun on the first call and then kept
    in a module level cache for subsequent calls. If no display name can be
    resolved, the entity type name itself is returned.

    :param tk: :class:`~sgtk.Sgtk` instance
    :param entity_type_code: API entity type name
    :returns: display name
    """
    global g_entity_display_name_lookup

    if g_entity_display_name_lookup is None:
        # now resolve the entity types into display names using the schema_entity_read method.
        g_entity_display_name_lookup = tk.shotgun.schema_entity_read()
        # returns a dictionary on the following form:
        # { 'Booking': {'name': {'editable': False, 'value': 'Booking'}}, ... }

    display_name = entity_type_code
    try:
        if entity_type_code in g_entity_display_name_lookup:
            display_name = g_entity_display_name_lookup[entity_type_code]["name"]["value"]
    except Exception:
        # Best effort: fall back on the type name when the schema data is not
        # on the expected form. Catching Exception rather than everything lets
        # KeyboardInterrupt and SystemExit propagate.
        pass

    return display_name
@LogManager.log_timing
def find_publish(tk, list_of_paths, filters=None, fields=None):
    """
    Finds publishes in Shotgun given paths on disk.
    This method is similar to the find method in the Shotgun API,
    except that instead of an Entity type, a list of files is
    passed into the function.

    In addition to a list of files, shotgun filters can also be specified.
    This may be useful if for example all publishes with a certain status
    should be retrieved.

    By default, the shotgun id is returned for each successfully identified path.
    If you want to retrieve additional fields, you can specify these using
    the fields parameter.

    The method will return a dictionary, keyed by path. The value will be
    a standard shotgun query result dictionary, for example::

        {
            "/foo/bar" : { "id": 234, "type": "TankPublishedFile", "code": "some data" },
            "/foo/baz" : { "id": 23,  "type": "TankPublishedFile", "code": "some more data" }
        }

    Paths that are not found, or filtered out by the filters parameter,
    are not returned in the dictionary. When a path has been published
    more than once, the most recently created publish is returned.

    :param tk: :class:`~sgtk.Sgtk` instance
    :param list_of_paths: List of full paths for which information should be retrieved
    :param filters: Optional list of shotgun filters to apply.
    :param fields: Optional list of fields from the matched entities to
                   return. Defaults to id and type.
    :returns: dictionary keyed by path
    """
    # Map path caches to full paths, grouped by storage.
    # In case of sequences, there will be more than one file per path cache:
    # {<storage name>: { path_cache: [full_path, full_path]}}
    storages_paths = _group_by_storage(tk, list_of_paths)

    filters = filters or []
    fields = fields or []

    # Work on a copy of the requested fields and pull in the extra ones
    # which are needed for the internal processing of the result set.
    sg_fields = fields[:]
    sg_fields.extend(["created_at", "path_cache"])

    def _paths_for_storage(storage_name):
        # 0.12 backwards compatibility: the storage named Tank is the
        # same thing as the primary storage.
        if storage_name == "Tank":
            return storages_paths[constants.PRIMARY_STORAGE_NAME]
        return storages_paths[storage_name]

    # PASS 1
    # The file location of a publish is split into two fields in shotgun:
    # - the path_cache, which is a storage relative, platform agnostic path
    # - a link to a storage entity
    # ...so the paths are handled per storage, with one shotgun query per
    # storage on the form "find all records where path_cache in /foo, /bar, ..."

    # all storages to look up - for 0.12 backwards compatibility, add the Tank storage.
    local_storage_names = list(storages_paths.keys())
    if constants.PRIMARY_STORAGE_NAME in local_storage_names:
        local_storage_names.append("Tank")

    published_file_entity_type = get_published_file_entity_type(tk)

    published_files = {}
    for local_storage_name in local_storage_names:

        local_storage = tk.shotgun.find_one("LocalStorage", [["code", "is", local_storage_name]])
        if not local_storage:
            # fail gracefully here - it may be a storage which has been deleted
            published_files[local_storage_name] = []
            continue

        # every normalized path for this storage goes into a single "in" filter
        path_cache_filter = ["path_cache", "in"]
        path_cache_filter.extend(_paths_for_storage(local_storage_name).keys())

        # extend a copy of the caller's filters with the storage specific ones
        sg_filters = filters[:]
        sg_filters.append(path_cache_filter)
        sg_filters.append(["path_cache_storage", "is", local_storage])

        # organize the returned data by storage
        published_files[local_storage_name] = tk.shotgun.find(
            published_file_entity_type,
            sg_filters,
            sg_fields
        )

    # PASS 2
    # Turn the shotgun data, grouped by storage, into the final structure
    # which is keyed by full path.
    matches = {}
    for local_storage_name, publishes in published_files.items():

        # maps shotgun path caches to file system paths
        full_paths_by_path_cache = _paths_for_storage(local_storage_name)

        for publish in publishes:
            for full_path in full_paths_by_path_cache.get(publish["path_cache"], []):
                # Keep the publish if it is the first one seen for the path. If
                # the file has been published more than once, keep the entity
                # data for the publish that is more recent.
                if (full_path not in matches or
                        matches[full_path]["created_at"] < publish["created_at"]):
                    matches[full_path] = publish

    # PASS 3
    # Clean up the result set. Additional fields were pulled in from shotgun
    # (path_cache, created_at etc) - unless these were specifically asked for
    # by the caller, get rid of them.
    always_returned = ("id", "type")
    for publish in matches.values():
        # collect first, delete afterwards, so the dict is not resized while iterated
        unrequested = [
            field_name for field_name in publish
            if field_name not in fields and field_name not in always_returned
        ]
        for field_name in unrequested:
            del publish[field_name]

    return matches
def _group_by_storage(tk, list_of_paths):
    r"""
    Given a list of paths on disk, groups them into a data structure suitable for
    shotgun. In shotgun, the path_cache field contains an abstracted representation
    of the publish field, with a normalized path and the storage chopped off.

    This method aims to process the paths to make them useful for later shotgun processing.

    Returns a dictionary, keyed by storage name. Each storage in the dict contains another dict,
    with an item for each path_cache entry. Paths for which no path cache can be
    calculated are left out.

    Examples::

        ['/studio/project_code/foo/bar.0003.exr', '/secondary_storage/foo/bar']

        {'Tank':
            {'project_code/foo/bar.%04d.exr': ['/studio/project_code/foo/bar.0003.exr'] }

         'Secondary_Storage':
            {'foo/bar': ['/secondary_storage/foo/bar'] }
        }

        ['c:\studio\project_code\foo\bar', '/secondary_storage/foo/bar']

        {'Tank':
            {'project_code/foo/bar': ['c:\studio\project_code\foo\bar'] }

         'Secondary_Storage':
            {'foo/bar': ['/secondary_storage/foo/bar'] }
        }
    """
    grouped_paths = {}

    for full_path in list_of_paths:

        # use abstracted path if path is part of a sequence
        abstract_path = _translate_abstract_fields(tk, full_path)
        storage_name, path_cache = _calc_path_cache(tk, abstract_path)

        # make sure that the path is even remotely valid, otherwise skip
        if path_cache is None:
            continue

        # file the original path under its storage and path cache
        storage_entries = grouped_paths.setdefault(storage_name, {})
        storage_entries.setdefault(path_cache, []).append(full_path)

    return grouped_paths


@LogManager.log_timing
def create_event_log_entry(tk, context, event_type, description, metadata=None):
    """
    Creates an event log entry inside of Shotgun.
    Event log entries can be handy if you want to track a process or a sequence of events.

    The entry is linked to the entity and project of the given context. If a
    current user can be determined, the entry is associated with that user as well.

    :param tk: :class:`~sgtk.Sgtk` instance

    :param context: A :class:`~sgtk.Context` to associate with the event log entry.

    :param event_type: String which defines the event type. The Shotgun standard suggests
                       that this should be on the form Company_Item_Action. Examples include::

                           Shotgun_Asset_New
                           Shotgun_Asset_Change
                           Shotgun_User_Login

    :param description: A verbose description explaining the meaning of this event.

    :param metadata: A dictionary of metadata information which will be attached to the event
                     log record in Shotgun. This dictionary may only contain simple data types
                     such as ints, strings etc.

    :returns: The newly created shotgun record
    """
    event_data = {
        "description": description,
        "event_type": event_type,
        "entity": context.entity,
        "project": context.project,
        "meta": metadata,
    }

    # only attach a user when one could be resolved
    current_user = login.get_current_user(tk)
    if current_user:
        event_data["user"] = current_user

    return tk.shotgun.create("EventLogEntry", event_data)
def get_published_file_entity_type(tk):
    """
    Return the entity type that this toolkit uses for its Publishes.

    The value is looked up on the pipeline configuration associated with
    the given toolkit instance.

    .. note:: This is for backwards compatibility situations only.
              Code targeting new installations can assume that
              the published file type in Shotgun is always ``PublishedFile``.

    :param tk: :class:`~sgtk.Sgtk` instance
    :returns: "PublishedFile" or "TankPublishedFile"
    """
    pipeline_config = tk.pipeline_configuration
    return pipeline_config.get_published_file_entity_type()
@LogManager.log_timing
[docs]def register_publish(tk, context, path, name, version_number, **kwargs): """ Creates a Published File in Shotgun. **Introduction** The publish will be associated with the current context and point at the given file. The method will attempt to add the publish to Shotgun as a local file link, and failing that it will generate a ``file://`` url to represent the path. In addition to the path, a version number and a name needs to be provided. The version number should reflect the iteration or revision of the publish and will be used to populate the number field of the publish that is created in Shotgun. The name should represent the name of the item, without any version number. This is used to group together publishes in Shotgun and various integrations. For example, if the file you are publishing belongs to Shot AAA_003 and is named ``AAA_003_foreground.v012.ma``, you could set the name to be ``foreground`` and the version to be ``12``. The Shot name will be implied by the associated context. The path will first be checked against the current template definitions. If it matches any template definition and is determined to be a sequence of some kind (per frame, per eye), any sequence tokens such as ``@@@@``, ``$4F`` etc. will be normalised to a ``%04d`` form before written to Shotgun. If the path matches any local storage roots defined by the toolkit project, it will be uploaded as a local file link to Shotgun. If not matching roots are found, the method will retrieve the list of local storages from Shotgun and try to locate a suitable storage. Failing that, it will fall back on a register the path as a ``file://`` url. **Examples** The example below shows a basic publish. 
In addition to the required parameters, it is recommended to supply at least a comment and a Publish Type:: >>> file_path = '/studio/demo_project/sequences/Sequence-1/shot_010/Anm/publish/layout.v001.ma' >>> name = 'layout' >>> version_number = 1 >>> >>> sgtk.util.register_publish( tk, context, file_path, name, version_number, comment = 'Initial layout composition.', published_file_type = 'Layout Scene' ) {'code': 'layout.v001.ma', 'created_by': {'id': 40, 'name': 'John Smith', 'type': 'HumanUser'}, 'description': 'Initial layout composition.', 'entity': {'id': 2, 'name': 'shot_010', 'type': 'Shot'}, 'id': 2, 'name': 'layout', 'path': {'content_type': None, 'link_type': 'local', 'local_path': '/studio/demo_project/sequences/Sequence-1/shot_010/Anm/publish/layout.v001.ma', 'local_path_linux': '/studio/demo_project/sequences/Sequence-1/shot_010/Anm/publish/layout.v001.ma', 'local_path_mac': '/studio/demo_project/sequences/Sequence-1/shot_010/Anm/publish/layout.v001.ma', 'local_path_windows': 'c:\\studio\\demo_project\\sequences\\Sequence-1\\shot_010\\Anm\\publish\\layout.v001.ma', 'local_storage': {'id': 1, 'name': 'primary', 'type': 'LocalStorage'}, 'name': 'layout.v001.ma', 'url': 'file:///studio/demo_project/sequences/Sequence-1/shot_010/Anm/publish/layout.v001.ma'}, 'path_cache': 'demo_project/sequences/Sequence-1/shot_010/Anm/publish/layout.v001.ma', 'project': {'id': 4, 'name': 'Demo Project', 'type': 'Project'}, 'published_file_type': {'id': 12, 'name': 'Layout Scene', 'type': 'PublishedFileType'}, 'task': None, 'type': 'PublishedFile', 'version_number': 1} **Parameters** :param tk: :class:`~sgtk.Sgtk` instance :param context: A :class:`~sgtk.Context` to associate with the publish. This will populate the ``task`` and ``entity`` link in Shotgun. :param path: The path to the file or sequence we want to publish. If the path is a sequence path it will be abstracted so that any sequence keys are replaced with their default values. 
:param name: A name, without version number, which helps distinguish this publish from other publishes. This is typically used for grouping inside of Shotgun so that all the versions of the same "file" can be grouped into a cluster. For example, for a Maya publish, where we track only the scene name, the name would simply be that: the scene name. For something like a render, it could be the scene name, the name of the AOV and the name of the render layer. :param version_number: The version number of the item we are publishing. In addition to the above, the following optional arguments exist: - ``task`` - A shotgun entity dictionary with id and type (which should always be Task). if no value is specified, the task will be grabbed from the context object. - ``comment`` - A string containing a description of the comment - ``thumbnail_path`` - A path to a thumbnail (png or jpeg) which will be uploaded to shotgun and associated with the publish. - ``dependency_paths`` - A list of file system paths that should be attempted to be registered as dependencies. Files in this listing that do not appear as publishes in shotgun will be ignored. - ``dependency_ids`` - A list of publish ids which should be registered as dependencies. - ``published_file_type`` - A tank type in the form of a string which should match a tank type that is registered in Shotgun. - ``update_entity_thumbnail`` - Push thumbnail up to the attached entity - ``update_task_thumbnail`` - Push thumbnail up to the attached task - ``created_by`` - Override for the user that will be marked as creating the publish. This should be in the form of shotgun entity, e.g. {"type":"HumanUser", "id":7}. If not set, the user will be determined using :meth:`sgtk.util.get_current_user`. - ``created_at`` - Override for the date the publish is created at. 
This should be a python datetime object - ``version_entity`` - The Shotgun version entity this published file should be linked to - ``sg_fields`` - Some additional Shotgun fields as a dict (e.g. ``{'tag_list': ['foo', 'bar']}``) :raises: :class:`ShotgunPublishError` on failure. :returns: The created entity dictionary. """ log.debug( "Publish: Begin register publish for context %s and path %s" % (context, path) ) entity = None try: # get the task from the optional args, fall back on context task if not set task = kwargs.get("task") if task is None: task = context.task thumbnail_path = kwargs.get("thumbnail_path") comment = kwargs.get("comment") dependency_paths = kwargs.get('dependency_paths', []) dependency_ids = kwargs.get('dependency_ids', []) published_file_type = kwargs.get("published_file_type") if not published_file_type: # check for legacy name: published_file_type = kwargs.get("tank_type") update_entity_thumbnail = kwargs.get("update_entity_thumbnail", False) update_task_thumbnail = kwargs.get("update_task_thumbnail", False) created_by_user = kwargs.get("created_by") created_at = kwargs.get("created_at") version_entity = kwargs.get("version_entity") sg_fields = kwargs.get("sg_fields", {}) published_file_entity_type = get_published_file_entity_type(tk) log.debug("Publish: Resolving the published file type") sg_published_file_type = None # query shotgun for the published_file_type if published_file_type: if not isinstance(published_file_type, basestring): raise TankError("published_file_type must be a string") if published_file_entity_type == "PublishedFile": filters = [["code", "is", published_file_type]] sg_published_file_type = tk.shotgun.find_one('PublishedFileType', filters=filters) if not sg_published_file_type: # create a published file type on the fly sg_published_file_type = tk.shotgun.create("PublishedFileType", {"code": published_file_type}) else:# == TankPublishedFile filters = [ ["code", "is", published_file_type], ["project", "is", 
context.project] ] sg_published_file_type = tk.shotgun.find_one('TankType', filters=filters) if not sg_published_file_type: # create a tank type on the fly sg_published_file_type = tk.shotgun.create("TankType", {"code": published_file_type, "project": context.project}) # create the publish log.debug("Publish: Creating publish in Shotgun") entity = _create_published_file(tk, context, path, name, version_number, task, comment, sg_published_file_type, created_by_user, created_at, version_entity, sg_fields) # upload thumbnails log.debug("Publish: Uploading thumbnails") if thumbnail_path and os.path.exists(thumbnail_path): # publish tk.shotgun.upload_thumbnail(published_file_entity_type, entity["id"], thumbnail_path) # entity if update_entity_thumbnail == True and context.entity is not None: tk.shotgun.upload_thumbnail(context.entity["type"], context.entity["id"], thumbnail_path) # task if update_task_thumbnail == True and task is not None: tk.shotgun.upload_thumbnail("Task", task["id"], thumbnail_path) else: # no thumbnail found - instead use the default one this_folder = os.path.abspath(os.path.dirname(__file__)) no_thumb = os.path.join(this_folder, "resources", "no_preview.jpg") tk.shotgun.upload_thumbnail(published_file_entity_type, entity.get("id"), no_thumb) # register dependencies log.debug("Publish: Register dependencies") _create_dependencies(tk, entity, dependency_paths, dependency_ids) log.debug("Publish: Complete") return entity except Exception, e: # Log the exception so the original traceback is available log.exception(e) # Raise our own exception with the original message and the created entity, # if any raise ShotgunPublishError( error_message="%s" % e, entity=entity )
def _translate_abstract_fields(tk, path):
    """
    Translates abstract fields for a path into the default abstract value.
    For example, the path /foo/bar/xyz.0003.exr will be transformed into
    /foo/bar/xyz.%04d.exr

    If the path matches no template, or matches more than one template,
    it is returned unchanged.

    :param tk: :class:`~sgtk.Sgtk` instance
    :param path: a normalized path with slashes matching os.path.sep

    :returns: the path with any abstract fields normalized.
    """
    try:
        template = tk.template_from_path(path)
    except TankMultipleMatchingTemplatesError:
        log.debug(
            "Path matches multiple templates. Not translating abstract fields: %s" % path
        )
        return path

    if not template:
        log.debug(
            "Path does not match a template. Not translating abstract fields: %s" % path
        )
        return path

    abstract_key_names = [k.name for k in template.keys.values() if k.is_abstract]

    if len(abstract_key_names) > 0:
        # we want to use the default values for abstract keys, so strip the
        # concrete values out before re-applying the fields to the template
        cur_fields = template.get_fields(path)
        for abstract_key_name in abstract_key_names:
            del cur_fields[abstract_key_name]
        path = template.apply_fields(cur_fields)

    return path


def _create_dependencies(tk, publish_entity, dependency_paths, dependency_ids):
    """
    Creates dependencies in shotgun from a given entity to
    a list of paths and ids. Paths not recognized are skipped.

    All dependency records are created with a single batch call.

    :param tk: API handle
    :param publish_entity: The publish entity to set the dependencies for. This is a dictionary
                           with keys type and id.
    :param dependency_paths: List of paths on disk. List of strings. ``None`` is
                             treated as an empty list.
    :param dependency_ids: List of publish entity ids to associate. List of ints.
                           ``None`` is treated as an empty list.
    """
    dependency_paths = dependency_paths or []
    dependency_ids = dependency_ids or []

    published_file_entity_type = get_published_file_entity_type(tk)

    # the entity and field names differ between the current and the legacy schema
    if published_file_entity_type == "PublishedFile":
        dependency_entity_type = "PublishedFileDependency"
        source_field = "published_file"
        target_field = "dependent_published_file"
        target_entity_type = "PublishedFile"
    else:
        # == "TankPublishedFile"
        dependency_entity_type = "TankDependency"
        source_field = "tank_published_file"
        target_field = "dependent_tank_published_file"
        target_entity_type = "TankPublishedFile"

    # only go looking for publishes when there is something to look for
    if dependency_paths:
        publishes = find_publish(tk, dependency_paths)
    else:
        publishes = {}

    # create a single batch request for maximum speed
    sg_batch_data = []

    for dependency_path in dependency_paths:
        # did we manage to resolve this file path against
        # a publish in shotgun?
        published_file = publishes.get(dependency_path)
        if published_file:
            sg_batch_data.append({
                "request_type": "create",
                "entity_type": dependency_entity_type,
                "data": {
                    source_field: publish_entity,
                    target_field: published_file
                }
            })

    for dependency_id in dependency_ids:
        sg_batch_data.append({
            "request_type": "create",
            "entity_type": dependency_entity_type,
            "data": {
                source_field: publish_entity,
                target_field: {"type": target_entity_type, "id": dependency_id}
            }
        })

    # push to shotgun in a single xact
    if len(sg_batch_data) > 0:
        tk.shotgun.batch(sg_batch_data)


def _create_published_file(tk, context, path, name, version_number, task, comment, published_file_type,
                           created_by_user, created_at, version_entity, sg_fields=None):
    """
    Creates a publish entity in shotgun given some standard fields.

    :param tk: :class:`~sgtk.Sgtk` instance
    :param context: A :class:`~sgtk.Context` to associate with the publish. This will
                    populate the ``task`` and ``entity`` link in Shotgun.
    :param path: The path to the file or sequence we want to publish. If the
                 path is a sequence path it will be abstracted so that
                 any sequence keys are replaced with their default values.
                 A value with a url scheme of more than one character is
                 treated as a url rather than a file system path.
    :param name: A name, without version number, which helps distinguish
               this publish from other publishes. This is typically
               used for grouping inside of Shotgun so that all the
               versions of the same "file" can be grouped into a cluster.
               For example, for a Maya publish, where we track only
               the scene name, the name would simply be that: the scene
               name. For something like a render, it could be the scene
               name, the name of the AOV and the name of the render layer.
    :param version_number: The version number of the item we are publishing.
    :param task: Shotgun Task dictionary to associate with publish or ``None``
    :param comment: Comments string to associate with publish
    :param published_file_type: Shotgun publish type dictionary to
                associate with publish
    :param created_by_user: User entity to associate with publish or ``None``
                if current user (via :meth:`sgtk.util.get_current_user`)
                should be used.
    :param created_at: Timestamp to associate with publish or None for default.
    :param version_entity: Version dictionary to associate with publish or ``None``.
    :param sg_fields: Dictionary of additional data to add to publish. Values
                in this dictionary never override the standard fields which
                are computed by this method.
    :raises: :class:`TankError` if the context does not have a project.
    :returns: The result of the shotgun API create method.
    """
    # we set the optional additional fields first so we don't allow
    # overwriting the standard parameters
    data = dict(sg_fields or {})
    data.update({
        "description": comment,
        "name": name,
        "task": task,
        "version_number": version_number,
    })

    if created_by_user:
        data["created_by"] = created_by_user
    else:
        # use current user
        sg_user = login.get_current_user(tk)
        if sg_user:
            data["created_by"] = sg_user

    if created_at:
        data["created_at"] = created_at

    published_file_entity_type = get_published_file_entity_type(tk)

    if published_file_type:
        if published_file_entity_type == "PublishedFile":
            data["published_file_type"] = published_file_type
        else:
            # using legacy type TankPublishedFile
            data["tank_type"] = published_file_type

    if version_entity:
        data["version"] = version_entity

    # Determine the value of the link field based on the given context
    if context.project is None:
        # when running toolkit as a standalone plugin, the context may be
        # empty and not contain a project. Publishes are project entities
        # in Shotgun, so we cannot proceed without a project.
        raise TankError("Your context needs to at least have a project set in order to publish.")

    elif context.entity is None:
        # If the context does not have an entity, link it up to the project.
        # This happens for project specific workflows such as editorial
        # workflows, ingest and when running zero config toolkit plugins in
        # a generic project mode.
        data["entity"] = context.project

    else:
        data["entity"] = context.entity

    # set the associated project
    data["project"] = context.project

    # Check if path is a url or a straight file path.  Path
    # is assumed to be a url if it has a scheme:
    #
    #     scheme://netloc/path
    #
    path_is_url = False
    res = urlparse.urlparse(path)
    if res.scheme:
        # handle windows drive letters - note this adds a limitation
        # but one that is not likely to be a problem as single-character
        # schemes are unlikely!
        if len(res.scheme) > 1 or not res.scheme.isalpha():
            path_is_url = True

    # naming and path logic is different depending on url
    if path_is_url:

        # extract name from url:
        #
        # scheme://hostname.com/path/to/file.ext -> file.ext
        # scheme://hostname.com -> hostname.com
        if res.path:
            # scheme://hostname.com/path/to/file.ext -> file.ext
            data["code"] = res.path.split("/")[-1]
        else:
            # scheme://hostname.com -> hostname.com
            data["code"] = res.netloc

        # make sure that the url is escaped properly, otherwise
        # shotgun might not accept it.
        #
        # for quoting logic, see bugfix here:
        # http://svn.python.org/view/python/trunk/Lib/urllib.py?r1=71780&r2=71779&pathrev=71780
        #
        # note: by applying a safe pattern like this, we guarantee that already quoted paths
        #       are not touched, e.g. quote('foo bar') == quote('foo%20bar')
        data["path"] = {
            "url": urllib.quote(path, safe="%/:=&?~#+!$,;'@()*[]"),
            "name": data["code"]  # same as publish name
        }

    else:

        # normalize the path to native slashes
        norm_path = ShotgunPath.from_current_os_path(path).current_os
        if norm_path != path:
            log.debug("Normalized input path '%s' -> '%s'" % (path, norm_path))
            path = norm_path

        # convert the abstract fields to their defaults
        path = _translate_abstract_fields(tk, path)

        # name of publish is the filename
        data["code"] = os.path.basename(path)

        # Make path platform agnostic and determine if it belongs
        # to a storage that is associated with this toolkit config.
        storage_name, path_cache = _calc_path_cache(tk, path)

        if path_cache:
            # there is a toolkit storage mapping defined for this storage
            log.debug(
                "The path '%s' is associated with config root '%s'." % (path, storage_name)
            )

            # specify the full path in shotgun
            data["path"] = {"local_path": path}

            # note - #30005 - there appears to be an issue on the serverside
            # related to the explicit storage format and paths containing
            # sequence tokens such as %04d. Commenting out the logic to handle
            # the new explicit storage format for the time being while this is
            # being investigated.

            # # check if the shotgun server supports the storage and relative_path parameters
            # # which allows us to specify exactly which storage to bind a publish to rather
            # # than relying on Shotgun to compute this
            # supports_specific_storage_syntax = (
            #     hasattr(tk.shotgun, "server_caps") and
            #     tk.shotgun.server_caps.version and
            #     tk.shotgun.server_caps.version >= (6, 3, 17)
            # )
            #
            # if supports_specific_storage_syntax:
            #     # explicitly pass relative path and storage to shotgun
            #     storage = tk.shotgun.find_one("LocalStorage", [["code", "is", storage_name]])
            #
            #     if storage is None:
            #         # there is no storage in Shotgun that matches the one toolkit expects.
            #         # this *may* be ok because there may be another storage in Shotgun that
            #         # magically picks up the publishes and associates with them. In this case,
            #         # issue a warning and fall back on the server-side functionality
            #         log.warning(
            #             "Could not find the expected storage '%s' in Shotgun to associate "
            #             "publish '%s' with - falling back to Shotgun's built-in storage "
            #             "resolution logic. It is recommended that you add the '%s' storage "
            #             "to Shotgun" % (storage_name, path, storage_name))
            #         data["path"] = {"local_path": path}
            #
            #     else:
            #         data["path"] = {"relative_path": path_cache, "local_storage": storage}
            #
            # else:
            #     # use previous syntax where we pass the whole path to Shotgun
            #     # and shotgun will do the storage/relative path split server side.
            #     # This operation may do unexpected things if you have multiple
            #     # storages that are identical or overlapping
            #     data["path"] = {"local_path": path}

            # fill in the path cache field which is used for filtering in Shotgun.
            # NOTE(review): the original comment was cut off after "because SG
            # does not support" - presumably filtering on the path field; confirm.
            data["path_cache"] = path_cache

        else:

            # path does not map to any configured root - fall back gracefully:
            # 1. look for storages in Shotgun and see if we can create a local path
            # 2. failing that, just register the entry as a file:// resource.
            log.debug("Path '%s' does not have an associated config root." % path)
            log.debug("Will check shotgun local storages to see if there is a match.")

            matching_local_storage = False

            log.debug("Retrieving local storages from Shotgun...")
            storages = tk.shotgun.find(
                "LocalStorage",
                [],
                ["code"] + ShotgunPath.SHOTGUN_PATH_FIELDS
            )

            # compare on forward slashes so that the boundary check below
            # behaves the same on all platforms
            comparable_path = path.replace(os.sep, "/")

            for storage in storages:
                local_storage_path = ShotgunPath.from_shotgun_dict(storage).current_os
                # assume case preserving file systems rather than case sensitive.
                # The match must end on a path separator so that a storage at
                # /studio does not claim paths under /studio2.
                if local_storage_path and _is_path_under_root(
                    comparable_path,
                    local_storage_path.replace(os.sep, "/")
                ):
                    log.debug("Path matches Shotgun local storage '%s'" % storage["code"])
                    matching_local_storage = True
                    break

            if matching_local_storage:
                # there is a local storage matching this path
                # so use that when publishing
                data["path"] = {"local_path": path}

            else:
                # no local storage defined so publish as a file:// url
                log.debug(
                    "No local storage matching path '%s' - path will be "
                    "registered as a file:// url." % (path, )
                )

                # (see http://stackoverflow.com/questions/11687478/convert-a-filename-to-a-file-url)
                file_url = urlparse.urljoin("file:", urllib.pathname2url(path))
                log.debug("Converting '%s' -> '%s'" % (path, file_url))
                data["path"] = {
                    "url": file_url,
                    "name": data["code"]  # same as publish name
                }

    # now call out to hook just before publishing
    data = tk.execute_core_hook(constants.TANK_PUBLISH_HOOK_NAME, shotgun_data=data, context=context)

    log.debug("Registering publish in Shotgun: %s" % pprint.pformat(data))
    return tk.shotgun.create(published_file_entity_type, data)


def _is_path_under_root(norm_path, norm_root_path):
    """
    Checks if a path is equal to, or located below, a given root path.

    The comparison is case insensitive and only matches on path segment
    boundaries, e.g. ``/studio/project_b`` is *not* considered to be
    located below ``/studio/project``.

    :param norm_path: Path to test, using forward slashes.
    :param norm_root_path: Root path to test against, using forward slashes.
    :returns: True if the path is the root or is located below it.
    """
    # drop trailing slashes so that roots such as 'T:/' and '/' are handled
    # by the same boundary check as every other root
    root = norm_root_path.rstrip("/").lower()
    candidate = norm_path.lower()

    if not candidate.startswith(root):
        return False

    remainder = candidate[len(root):]
    return remainder == "" or remainder.startswith("/")


def _calc_path_cache(tk, path):
    """
    Calculates root path name and relative path (including project directory).
    returns (root_name, path_cache)

    The root lookup is case insensitive and only matches on path segment
    boundaries.

    If the location cannot be computed, because the path does not belong
    to a valid root, (None, None) is returned.

    :param tk: :class:`~sgtk.Sgtk` instance
    :param path: Path to split up.
    :returns: Tuple with root name and the path relative to the parent
              folder of that root, using forward slashes.
    """
    # paths may be c:/foo in Maya on windows - don't rely on os.sep here!

    # normalize input path first c:\foo -> c:/foo
    norm_path = path.replace(os.sep, "/")

    # get roots - don't assume data is returned on any particular form
    # may return c:\foo, c:/foo or /foo - assume that we need to normalize this path
    roots = tk.pipeline_configuration.get_data_roots()

    for root_name, root_path in roots.items():

        if not root_path:
            # nothing to compare against for this root
            continue

        norm_root_path = root_path.replace(os.sep, "/")

        if _is_path_under_root(norm_path, norm_root_path):
            norm_parent_dir = os.path.dirname(norm_root_path)
            # Remove parent dir plus "/" - be careful to handle the case where
            # the parent dir ends with a '/', e.g. 'T:/' for a Windows drive
            path_cache = norm_path[len(norm_parent_dir):].lstrip("/")
            log.debug(
                "Split up path '%s' into storage %s and relative path '%s'" % (path, root_name, path_cache)
            )
            return root_name, path_cache

    # not found, return None values
    log.debug("Unable to split path '%s' into a storage and a relative path." % path)
    return None, None


#################################################################################################
# wrappers around the shotgun API's http header API methods


class ToolkitUserAgentHandler(object):
    """
    Convenience wrapper to handle the user agent management.

    Keeps track of the currently active core, engine, app and framework
    and mirrors that state into the user agent list of the wrapped
    Shotgun API instance.
    """

    # prefixes of the user agent entries which are owned by this class
    _TOOLKIT_AGENT_PREFIXES = ("tk-core", "tk-app", "tk-engine", "tk-fw")

    def __init__(self, sg):
        """
        :param sg: Shotgun API instance whose ``_user_agents`` list
                   should be managed.
        """
        self._sg = sg

        self._app = None
        self._framework = None
        self._engine = None

        self._core_version = None

    def __clear_bundles(self):
        """
        Resets the currently active bundle.
        """
        self._app = None
        self._framework = None
        self._engine = None

    def set_current_app(self, name, version, engine_name, engine_version):
        """
        Update the user agent headers for the currently active app

        :param name: Name of the app.
        :param version: Version of the app.
        :param engine_name: Name of the engine the app is running in.
        :param engine_version: Version of that engine.
        """
        # first clear out the other bundle settings - there can only
        # be one active bundle at a time
        self.__clear_bundles()

        # populate the currently running bundle data
        self._app = (name, version)
        self._engine = (engine_name, engine_version)

        # push to shotgun
        self.__update()

    def set_current_framework(self, name, version, engine_name, engine_version):
        """
        Update the user agent headers for the currently active framework

        :param name: Name of the framework.
        :param version: Version of the framework.
        :param engine_name: Name of the engine the framework is running in.
        :param engine_version: Version of that engine.
        """
        # first clear out the other bundle settings - there can only
        # be one active bundle at a time
        self.__clear_bundles()

        # populate the currently running bundle data
        self._framework = (name, version)
        self._engine = (engine_name, engine_version)

        # push to shotgun
        self.__update()

    def set_current_engine(self, name, version):
        """
        Update the user agent headers for the currently active engine

        :param name: Name of the engine.
        :param version: Version of the engine.
        """
        # first clear out the other bundle settings - there can only
        # be one active bundle at a time
        self.__clear_bundles()

        # populate the currently running bundle data
        self._engine = (name, version)

        # push to shotgun
        self.__update()

    def set_current_core(self, core_version):
        """
        Update the user agent headers for the currently active core

        :param core_version: Version of the core API.
        """
        self._core_version = core_version
        self.__update()

    def __update(self):
        """
        Perform changes to the Shotgun API
        """
        # note that because of shortcomings in the API,
        # we have to reference the member variable directly.
        #
        # sg._user_agents is a list of strings. By default,
        # its value is [ "shotgun-json (1.2.3)" ]

        # First, remove any old Toolkit settings
        new_agents = [
            agent for agent in self._sg._user_agents
            if not agent.startswith(self._TOOLKIT_AGENT_PREFIXES)
        ]

        # Add new Toolkit settings
        if self._core_version:
            new_agents.append("tk-core (%s)" % self._core_version)

        if self._engine:
            new_agents.append("tk-engine (%s %s)" % self._engine)

        if self._app:
            new_agents.append("tk-app (%s %s)" % self._app)

        if self._framework:
            new_agents.append("tk-fw (%s %s)" % self._framework)

        # and update shotgun
        self._sg._user_agents = new_agents