# Copyright (c) 2019 Shotgun Software Inc.
#
# CONFIDENTIAL AND PROPRIETARY
#
# This work is provided "AS IS" and subject to the Shotgun Pipeline Toolkit
# Source Code License included in this distribution package. See LICENSE.
# By accessing, using, copying or modifying this work you indicate your
# agreement to the Shotgun Pipeline Toolkit Source Code License. All rights
# not expressly granted therein are reserved by Shotgun Software Inc.
import logging
import os
import re
import glob
import subprocess
import sys
import threading
import uuid
from contextlib import contextmanager
import sgtk
from sgtk.util.filesystem import ensure_folder_exists
try:
from tank_vendor import sgutils
except ImportError:
from tank_vendor import six as sgutils
class AfterEffectsEngine(sgtk.platform.Engine):
    """
    An After Effects CC engine for Shotgun Toolkit.

    The attributes below are class-level defaults. Several of them
    (for example ``_FAILED_PINGS`` or ``_HEARTBEAT_DISABLED``) are rebound
    on the instance by the engine's methods at runtime.
    """

    # the maximum size for a generated thumbnail
    MAX_THUMB_SIZE = 512

    # heartbeat settings. SHOTGUN_ADOBE_HEARTBEAT_TOLERANCE is the number of
    # failed pings _check_connection accepts before quitting the Qt app.
    SHOTGUN_ADOBE_HEARTBEAT_INTERVAL = 1.0
    SHOTGUN_ADOBE_HEARTBEAT_TOLERANCE = 2

    # network debugging is switched on by the mere presence of the variable
    SHOTGUN_ADOBE_NETWORK_DEBUG = "SHOTGUN_ADOBE_NETWORK_DEBUG" in os.environ

    # file name of the test entry point looked up by _run_tests
    TEST_SCRIPT_BASENAME = "run_tests.py"

    # python logging level names mapped to the level names sent to js
    PY_TO_JS_LOG_LEVEL_MAPPING = {
        "CRITICAL": "error",
        "ERROR": "error",
        "WARNING": "warn",
        "INFO": "info",
        "DEBUG": "debug",
    }

    # connection details handed down through the environment
    _SHOTGUN_ADOBE_PORT = os.environ.get("SHOTGUN_ADOBE_PORT")
    _SHOTGUN_ADOBE_APPID = os.environ.get("SHOTGUN_ADOBE_APPID")

    # internal runtime state
    _COMMAND_UID_COUNTER = 0
    _LOCK = threading.Lock()
    _FAILED_PINGS = 0
    _CONTEXT_CACHE = dict()
    _CHECK_CONNECTION_TIMER = None
    _CONTEXT_CHANGES_DISABLED = False
    _DIALOG_PARENT = None
    _WIN32_AFTEREFFECTS_MAIN_HWND = None
    _PROXY_WIN_HWND = None
    _HEARTBEAT_DISABLED = False
    _PROJECT_CONTEXT = None
    _AFX_PID = None
    _POPUP_CACHE = None

    # the windows window class name used by After Effects for modal dialogs
    _AFX_WIN32_DIALOG_WINDOW_CLASS = "#32770"
    __WIN32_GW_CHILD = 5

    # key under which the context cache is kept in the user settings
    _CONTEXT_CACHE_KEY = "aftereffects_context_cache"
    _HAS_CHECKED_CONTEXT_POST_LAUNCH = False

    # internal major version number -> marketing (CC) version name
    __CC_VERSION_MAPPING = {
        12: "2015",
        13: "2016",
        14: "2017",
        15: "2018",
        16: "2019",
        17: "2020",
        18: "2021",
        22: "2022",
        23: "2023",
        24: "2024",
    }

    # matches the frame token of an adobe-style sequence path: ###, @@@,
    # [###] or %0Nd. Raw string so the backslashes reach the regex engine
    # without relying on python's handling of invalid escape sequences.
    __IS_SEQUENCE_REGEX = re.compile(r"[\[]?([#@]+|[%]0\dd)[\]]?")
############################################################################
# context changing
def post_context_change(self, old_context, new_context):
    """
    Runs after a context change has occurred and pushes the resulting
    state to the Adobe CC host application.

    :param old_context: The previous context.
    :param new_context: The current context.
    """
    # the schema has to be (re)loaded for the new project before any of
    # the sg globals may be used, so the flag is reset first
    self.__schema_loaded = False

    def _mark_schema_loaded():
        # invoked by sg globals once the schema is available
        self.__schema_loaded = True

    # the project id is what sg globals keys the schema on
    project_id = new_context.project["id"] if new_context.project else None

    # sg globals calls back right away when the schema is already cached,
    # so this is usually cheap
    self.__shotgun_globals.run_on_schema_loaded(
        _mark_schema_loaded, project_id=project_id
    )

    # start sending the current state back to js
    self.__send_state()

    # When the context lives in the environment it is refreshed with the
    # new one, so that a CEP extension restart comes back up in the context
    # it went down with. This is done through ExtendScript because it is
    # the parent process of every CEP and Python process that gets spawned;
    # setting it there propagates it to all After Effects subprocesses.
    if "TANK_CONTEXT" not in os.environ:
        return
    self.adobe.dollar.setenv("TANK_CONTEXT", new_context.serialize())
############################################################################
# engine initialization
def pre_app_init(self):
    """
    Sets up the engine into an operational state. This method is called
    before any apps are loaded.

    It imports the bundled ``tk_aftereffects`` module, obtains the Adobe
    bridge, connects the bridge signals to the engine's handlers and starts
    the Shotgun data retriever.
    """
    # import and keep a handle on the bundled python module
    self.__tk_aftereffects = self.import_module("tk_aftereffects")

    # constant command uid lookups for these special commands
    self.__jump_to_sg_command_id = self.__get_command_uid()
    self.__jump_to_fs_command_id = self.__get_command_uid()

    # get the adobe instance. it may have been initialized already by a
    # previous instance of the engine. if not, initialize a new one.
    self._adobe = self.__tk_aftereffects.AdobeBridge.get_or_create(
        identifier=self.instance_name,
        port=self._SHOTGUN_ADOBE_PORT,
        logger=self.logger,
        network_debug=self.SHOTGUN_ADOBE_NETWORK_DEBUG,
    )

    self.logger.debug("Network debug logging is %s" % self._adobe.network_debug)
    self.logger.debug("%s: Initializing..." % (self,))

    # connect to all the adobe bridge signals. destroy_engine disconnects
    # the very same signal/handler pairs again.
    self.adobe.logging_received.connect(self._handle_logging)
    self.adobe.command_received.connect(self._handle_command)
    self.adobe.active_document_changed.connect(self._handle_active_document_change)
    self.adobe.run_tests_request_received.connect(self._run_tests)
    self.adobe.state_requested.connect(self.__send_state)

    # in order to use frameworks, they have to be imported via
    # import_module. so they're exposed in the bundled python.
    shotgun_data = self.__tk_aftereffects.shotgun_data
    settings = self.__tk_aftereffects.shotgun_settings

    # keep a handle for shotgun globals as they are needed in other
    # functions as well
    self.__shotgun_globals = self.__tk_aftereffects.shotgun_globals

    # import here since the engine is responsible for defining Qt.
    from sgtk.platform.qt import QtCore

    # create a data retriever for async querying of sg data
    self.__sg_data = shotgun_data.ShotgunDataRetriever(
        QtCore.QCoreApplication.instance()
    )

    # get ourselves a settings manager where we can store metadata.
    self.__settings_manager = settings.UserSettings(self)

    # connect the retriever signals
    self.__sg_data.work_completed.connect(self.__on_worker_signal)
    self.__sg_data.work_failure.connect(self.__on_worker_failure)

    # context request uids. we keep track of these to make sure we're only
    # processing the current requests.
    self.__context_find_uid = None
    self.__context_thumb_uid = None

    # keep track if sg global schema has been cached
    self.__schema_loaded = False

    # start the retriever thread
    self.__sg_data.start()

    # keep a list of handles on the launched dialogs
    self.__qt_dialogs = []
def post_app_init(self):
    """
    Runs after all apps have been initialized.

    Makes sure the bridge has an event processor, starts the connection
    timer, sends the initial state to js, forwards the log file path,
    clears the stored context cache and finally opens the file named by
    the ``SGTK_FILE_TO_OPEN`` environment variable, if any.
    """
    if not self.adobe.event_processor:
        try:
            from sgtk.platform.qt import QtGui
        except ImportError:
            # best effort only: without Qt there is no event processor to
            # install, and the engine carries on without one.
            self.logger.debug(
                "Qt is not available, no event processor was installed."
            )
        else:
            self.adobe.event_processor = QtGui.QApplication.processEvents

    self.__setup_connection_timer()
    self.__send_state()

    # forward the log file path back to the js side. this is used to direct
    # clients to the file in the event of an error. The base file handler
    # may not be set (see _handle_logging, which guards the same way), in
    # which case there is no path to forward.
    file_handler = sgtk.LogManager().base_file_handler
    if file_handler:
        self.adobe.send_log_file_path(file_handler.baseFilename)

    # The stored context cache is cleared unconditionally at this point,
    # regardless of whether this is a restart situation or a fresh launch
    # of AE: it is data we don't need.
    self.logger.debug("Single document found, clearing stored context cache.")
    self.__settings_manager.store(
        self._CONTEXT_CACHE_KEY,
        dict(),
    )

    # Normally the bootstrap logic would handle the file open, but since
    # the bootstrap logic is handled by the adobe framework, and is
    # generic, we handle it here.
    file_to_open = os.environ.get("SGTK_FILE_TO_OPEN")
    if file_to_open:
        # open the specified script
        self.adobe.app.open(self.adobe.File(file_to_open))

        # clear the environment variable after loading so that it doesn't
        # get reopened on an engine restart.
        del os.environ["SGTK_FILE_TO_OPEN"]
def destroy_engine(self):
    """
    Called when the engine should tear down itself and all its apps.

    Stops the connection timer, closes the dialogs the engine knows about,
    stops the data retriever, disconnects from the server and finally
    detaches the engine's handlers from the bridge signals.
    """
    self.logger.debug("Destroying engine...")

    # Hand our parent widget back to the window manager instead of leaving
    # it owned by After Effects's application window.
    if self._PROXY_WIN_HWND and sgtk.util.is_windows():
        self.__tk_aftereffects.win_32_api.SetParent(self._PROXY_WIN_HWND, 0)

    # Stop polling for new messages from this engine.
    if self._CHECK_CONNECTION_TIMER:
        self._CHECK_CONNECTION_TIMER.stop()

    # Close every dialog we know about. This stops memory leaks and is
    # prudent anyway, since the socket.io connection the dialogs rely on is
    # about to be severed. A copy of the list is walked.
    for open_dialog in self.created_qt_dialogs[:]:
        open_dialog.close()

    # Gracefully stop the data retriever. This call blocks until the
    # currently-processing request has completed.
    self.__sg_data.stop()

    # Disconnect from the server.
    self.adobe.disconnect()

    # Detach the handlers in case references to this engine survive;
    # otherwise it would keep responding to signals from the adobe bridge.
    bridge = self.adobe
    handlers = (
        ("logging_received", self._handle_logging),
        ("command_received", self._handle_command),
        ("active_document_changed", self._handle_active_document_change),
        ("run_tests_request_received", self._run_tests),
        ("state_requested", self.__send_state),
    )
    for signal_name, handler in handlers:
        getattr(bridge, signal_name).disconnect(handler)
def post_qt_init(self):
    """
    Called externally once a ``QApplication`` has been created; completes
    the engine setup process.
    """
    from sgtk.platform.qt import QtGui

    # The RPC API has to call processEvents while it waits for a response,
    # otherwise that wait loop would block the UI thread.
    process_events = QtGui.QApplication.processEvents
    self.adobe.event_processor = process_events

    # We run in our own Qt event loop, so the bundled dark look and feel is
    # applied (breaking encapsulation to do so).
    self.logger.debug("Initializing default styling...")
    self._initialize_dark_look_and_feel()

    # Set up the heartbeat timer to run asynchronously.
    self.__setup_connection_timer(force=True)
def register_command(self, name, callback, properties=None):
    """
    Registers a new command with the engine.

    For Adobe RPC purposes a ``"uid"`` entry is added to the command's
    properties before the registration is delegated to the base class.

    :param name: Name of the command, passed through to the base class.
    :param callback: Callable of the command, passed through to the base
        class.
    :param properties: Optional properties dictionary. When given (and not
        empty) the ``"uid"`` key is written into this very dictionary.
    :returns: Whatever the base class ``register_command`` returns.
    """
    if not properties:
        properties = dict()
    properties["uid"] = self.__get_command_uid()

    base_engine = super(AfterEffectsEngine, self)
    return base_engine.register_command(name, callback, properties)
@property
def host_info(self):
    """
    Returns information about the application hosting this engine.

    :returns: A {"name": application name, "version": application version}
        dictionary. eg: {"name": "AfterFX", "version": "2017"}. The
        version is "unknown" while the bridge has not been started.
    :rtype: dict
    """
    info = {"name": "AfterFX", "version": "unknown"}

    # ``_adobe`` is only assigned in pre_app_init, so it is looked up
    # defensively: going through the ``adobe`` property would raise an
    # AttributeError instead of reporting "unknown" before that point.
    bridge = getattr(self, "_adobe", None)
    if not bridge:
        # Don't error out if the bridge was not yet started
        return info

    # app.version reports the internal version number (e.g. 18.1.1), which
    # is not what users see in the UI. The major number is mapped to the
    # marketing version where it is known; otherwise the reported version
    # string is used as is.
    version = sgutils.ensure_str(bridge.app.version)
    match = re.search(r"(\d+\.?\d*)", version)
    if match:
        major = int(float(match.group(1)))
        info["version"] = self.__CC_VERSION_MAPPING.get(major, version)
    elif version:
        # no number to be found, report what the host gave us
        info["version"] = version
    return info
def _initialize_dark_look_and_feel(self):
    """
    Override of the base engine method that adds styling specific to this
    DCC on top of the SG Toolkit style.
    """
    from sgtk.platform.qt import QtGui

    # Let the base class apply the SG Toolkit style to the application.
    super()._initialize_dark_look_and_feel()

    application = QtGui.QApplication.instance()
    palette = application.palette()

    # The default placeholder text for this DCC is black. It is set back to
    # the regular text color (as it was in Qt5) while keeping the alpha
    # value of the current placeholder text color.
    text_color = palette.text().color()
    current_placeholder = palette.placeholderText().color()
    text_color.setAlpha(current_placeholder.alpha())
    palette.setColor(QtGui.QPalette.PlaceholderText, text_color)

    # Hand the adjusted palette back to the application.
    application.setPalette(palette)
############################################################################
# engine host interaction methods
@property
def project_path(self):
    """
    Get the active project filepath.

    :returns: The current project path or an empty string
    :rtype: str
    """
    doc_obj = self.adobe.app.project.file

    # doc_obj will always be a ProxyWrapper instance, so an identity check
    # (`doc_obj is not None`) would always succeed. The comparison
    # operator is used on purpose so that the wrapper gets a say in
    # whether it stands for "no file".
    # NOTE(review): relies on ProxyWrapper implementing the comparison
    # with None -- confirm against the bridge implementation.
    if doc_obj != None:  # noqa: E711
        return doc_obj.fsName
    return ""
def save(self, path=None):
    """
    Save the project, either in place or to the given file path.

    Context changes are disabled while the save is carried out.

    :param str path: The target file path. optional.
    """
    with self.context_changes_disabled():
        if path is not None:
            # After Effects does not create the target folder when saving,
            # so it has to exist beforehand
            ensure_folder_exists(os.path.dirname(path))
            self.adobe.app.project.save(self.adobe.File(path))
        else:
            self.adobe.app.project.save()

        saved_path = self.project_path
        self.logger.info("Saved file to to {!r}".format(saved_path))
def save_as(self):
    """
    Launch a Qt file browser to select a file, then save the current
    project to that path.

    Nothing is saved when the dialog is cancelled or no file was selected.
    """
    from sgtk.platform.qt import QtGui

    doc_path = self.project_path

    # After Effects doesn't appear to have a "save as" dialog accessible
    # via python, so we open our own Qt file dialog.
    # Within one Qt name filter the patterns are separated by spaces; a
    # comma would become part of the pattern ("*.aep,") and hide .aep
    # files in the dialog.
    file_dialog = QtGui.QFileDialog(
        parent=self._get_dialog_parent(),
        caption="Save As",
        directory=doc_path,
        filter="After Effects Documents (*.aep *.aepx)",
    )
    file_dialog.setLabelText(QtGui.QFileDialog.Accept, "Save")
    file_dialog.setLabelText(QtGui.QFileDialog.Reject, "Cancel")
    file_dialog.setOption(QtGui.QFileDialog.DontResolveSymlinks)
    file_dialog.setOption(QtGui.QFileDialog.DontUseNativeDialog)

    # the user cancelled the dialog
    if not file_dialog.exec_():
        return

    path = file_dialog.selectedFiles()[0]
    if path:
        self.save(path)
@property
def AdobeItemTypes(self):
    """
    The constants class mapping constants to the internal class names of
    adobe aftereffects, as exposed by the bundled ``tk_aftereffects``
    module.

    :returns: AdobeItemTypes constants object. see :class:`~python.tk_aftereffects.AdobeItemTypes`
    :rtype: AdobeItemTypes
    """
    return self.import_module("tk_aftereffects").AdobeItemTypes
def is_item_of_type(self, item, adobe_type):
    """
    Tells whether the given item is of the given AdobeItemType.

    The decision is made by comparing the ``"instanceof"`` entry of the
    item's ``data`` with the given type; a missing entry counts as an
    empty string.

    :param item: item to be checked.
    :type item: `adobe.ItemObject`_
    :param str adobe_type: AdobeItemType-constant. One of the constants held by :class:`~python.tk_aftereffects.AdobeItemTypes`
    :returns: Indicating if the given item is of the given type
    :rtype: bool
    """
    item_class_name = item.data.get("instanceof", "")
    return item_class_name == adobe_type
@property
def selected_item(self):
    """
    The item that is currently active in the after effects project.

    :returns: The active item
    :rtype: `adobe.ItemObject`_
    """
    current_project = self.adobe.app.project
    return current_project.activeItem
def is_adobe_sequence(self, path):
    """
    Tells whether an adobe-style render path describes a sequence.

    The path is considered a sequence path as soon as it contains one of
    the following patterns::

        ###
        @@@
        [###]
        %0xd

    :param str path: filepath to check
    :returns: True if the path describes a sequence
    :rtype: bool
    """
    found = re.search(self.__IS_SEQUENCE_REGEX, path)
    return True if found else False
def check_sequence(self, path, queue_item):
    """
    Tells whether every render file of a given queue item exists on disk.

    :param str path: filepath to check
    :param queue_item: an after effects render queue item
    :type queue_item: `adobe.RenderQueueItemObject`_
    :returns: True if all files yielded by :meth:`get_render_files` exist,
        False as soon as one of them is missing
    :rtype: bool
    """
    render_files = self.get_render_files(path, queue_item)
    return all(os.path.exists(render_file) for render_file, _ in render_files)
def iter_collection(self, collection_item):
    """
    Iterates safely over an adobe-collection item, whose index starts
    at 1 rather than 0.

    Example::

        item_collection = engine.adobe.project.items
        for item in engine.iter_collection(item_collection):
            print(item.name)

    :param collection_item: the after-effects collection item to iter
    :type collection_item: `adobe.ItemCollectionObject`_
    :yields: the next child item of the collection
    :rtype: `adobe.ItemObject`_
    """
    # the collection is 1-based, so the last valid index equals its length
    first_index = 1
    stop_index = collection_item.length + 1
    for index in range(first_index, stop_index):
        child = collection_item[index]
        yield child
def get_render_files(self, path, queue_item):
    """
    Yields all render files, and their frame numbers, of a given
    after effects render queue item.

    The path is needed to uniquely identify the correct output_module

    :param str path: filepath to iter
    :param queue_item: an after effects render queue item
    :type queue_item: `adobe.RenderQueueItemObject`_
    :yields: 2-item-tuple where the first item is the resolved path (str)
        of the render file and the second item the frame number, or
        None if the path is not an image sequence.
    :rtype: tuple
    """
    # is the given render-path a sequence?
    match = re.search(self.__IS_SEQUENCE_REGEX, path)
    if not match:
        # if not, the path itself is the only render file
        yield path, None
        return  # Exit the iterator

    # if yes, one path per rendered frame is produced. The time span of
    # the queue item is converted from seconds into frame counts.
    frame_time = queue_item.comp.frameDuration
    start_time = int(round(queue_item.timeSpanStart / frame_time, 3))
    frame_numbers = int(round(queue_item.timeSpanDuration / frame_time, 3))
    skip_frames = queue_item.skipFrames + 1

    # Work out the zero padding from the frame token. For ###/@@@ tokens
    # it is the number of characters; for a printf-style token (%0Nd) it is
    # the number N spelled out inside the token, not the token's length.
    token = match.group(1)
    if token.startswith("%"):
        padding = int(token[1:-1])
    else:
        padding = len(token)

    # The frame number is substituted directly rather than %-formatting
    # the whole path, so that other percent signs in the path (or a
    # repeated token) cannot break the formatting.
    placeholder = match.group(0)
    for frame_no in range(start_time, start_time + frame_numbers, skip_frames):
        frame_str = "%0*d" % (padding, frame_no)
        yield path.replace(placeholder, frame_str), frame_no
def import_filepath(self, path):
    """
    Imports footage into the current comp.

    The import options are created with ``sequence`` switched off and are
    then handed to the ``set_import_options`` method of the
    "import_footage_hook", which may decide as what the given file path
    is imported (PROJECT, FOOTAGE, COMP etc.). Implement that hook to
    influence the logic.

    All new items are collected and returned, as one import may lead to
    more than one (1) Item-object being imported.

    :param str path: filepath to be imported. Sequences should give either the
        first frame or the frames replaced by hashes (#), at (@) or the
        percent formatting (%04d) syntax.
    :returns: All new items that were imported
    :rtype: list-of-`adobe.CompItemObject`_
    """
    source_file = self.adobe.File(path)

    options = self.adobe.ImportOptions()
    options.file = source_file
    options.sequence = False

    # give the hook the chance to adjust the options before importing
    self.execute_hook_method(
        "import_footage_hook",
        "set_import_options",
        import_options=options,
    )
    return self.__import_file(options)
def add_items_to_comp(self, item_collection, comp_item):
    """
    Adds the "best-matching" item of a given adobe.CollectionItem to a
    given CompItem.

    "Best-matching" means that the first CompItem found is added, or,
    when there is none, the first Footage item. When neither is present,
    the folders of the collection are searched recursively, one after the
    other, until an item could be added.

    :param item_collection: object to be analyzed
    :type item_collection: `adobe.ItemCollectionObject`_
    :param comp_item: comp that should receive the items from item_collection
    :type comp_item: `adobe.CompItemObject`_
    :returns: Indicating if an item was added or not.
    :rtype: bool
    """
    # sort the children by type first, because each type has a different
    # priority when it comes to being added
    folder_items = []
    comp_items = []
    footage_items = []
    for child in self.iter_collection(item_collection):
        if self.is_item_of_type(child, self.AdobeItemTypes.FOLDER_ITEM):
            folder_items.append(child)
        elif self.is_item_of_type(child, self.AdobeItemTypes.COMP_ITEM):
            comp_items.append(child)
        elif self.is_item_of_type(child, self.AdobeItemTypes.FOOTAGE_ITEM):
            footage_items.append(child)

    # a comp is preferred over footage, as it is likely to include the
    # other layers already. Only the very first candidate is added.
    candidates = comp_items + footage_items
    if candidates:
        comp_item.layers.add(candidates[0])
        return True

    # nothing to add on this level: descend into the folders, stopping at
    # the first one that yields an item
    for folder_item in folder_items:
        if self.add_items_to_comp(folder_item.items, comp_item):
            return True
    return False
def render_queue_item(self, queue_item):
    """
    Renders a given queue_item by disabling all other queued items and
    enabling only the given one. The state of the render queue is
    reverted once rendering is over.

    :param queue_item: The render queue item to be rendered
    :type queue_item: `adobe.RenderQueueItemObject`_
    :returns: Indicating successful rendering or not
    :rtype: bool
    """
    # remember the render flag of the given item and of every queued item,
    # and switch the queued ones off
    saved_flags = [(queue_item, queue_item.render)]
    for queued in self.iter_collection(self.adobe.app.project.renderQueue.items):
        # only items that are still queued are remembered and switched off
        if queued.status == self.adobe.RQItemStatus.QUEUED:
            saved_flags.append((queued, queued.render))
            queued.render = False

    success = False
    queue_item.render = True
    self.logger.debug("Start rendering..")
    try:
        self.adobe.app.project.renderQueue.render()
    except Exception as error:
        # Errors thrown during the AfterEffects render process end up
        # here. Various different errors may be raised; the situation
        # never occurred during development.
        self.logger.error(
            "Skipping item due to an error while rendering: {}".format(error)
        )
    finally:
        processed_states = [
            self.adobe.RQItemStatus.DONE,
            self.adobe.RQItemStatus.ERR_STOPPED,
            self.adobe.RQItemStatus.RENDERING,
        ]
        # give every item that was not processed its original flag back
        for item, render_flag in saved_flags:
            if item.status in processed_states:
                continue
            item.render = render_flag

        # the render counts as successful when the status of the queue
        # item has changed to DONE
        success = queue_item.status == self.adobe.RQItemStatus.DONE
    return success
def find_sequence_range(self, path):
    """
    Parses the file name in an attempt to determine the first and last
    frame number of a sequence. This assumes some sort of common convention
    for the file names, where the frame number is an integer at the end of
    the basename, just ahead of the file extension, such as
    ``file.0001.jpg``, or ``file_001.jpg``. We also check for input file names with
    abstracted frame number tokens, such as ``file.####.jpg`` or ``file.%04d.jpg``.

    :param str path: The file path to parse.
    :returns: None if no range could be determined (the name carries no
        frame number or token, or no numbered frame exists on disk),
        otherwise (min, max)
    :rtype: tuple or None
    """
    # This pattern will match the following at the end of a string and
    # retain the frame number or frame token as group(2) in the resulting
    # match object (group(1) is the separator in front of it):
    #
    # 0001
    # ####
    # @@@@
    # [####]
    # %04d
    #
    # The number of digits or hashes does not matter; we match as many as
    # exist.
    frame_pattern = re.compile(r"(^|[\.\_\- ])[\[]?([0-9#@]+|[%]0\d+d)[\]]?$")
    root, ext = os.path.splitext(path)
    match = frame_pattern.search(root)

    # If we did not match, we don't know how to parse the file name, or there
    # is no frame number to extract.
    if not match:
        return None

    # We need to get all files that match the pattern from disk so that we
    # can determine what the min and max frame number is. Everything but
    # the frame part is escaped so that glob metacharacters that happen to
    # be part of the path are taken literally.
    prefix = root[: match.start()] + match.group(1)
    glob_path = "%s*%s" % (glob.escape(prefix), glob.escape(ext))

    frames = []
    for file_path in glob.glob(glob_path):
        # Our pattern from above matches against the file root, so we need
        # to chop off the extension at the end.
        file_match = frame_pattern.search(os.path.splitext(file_path)[0])

        # The wildcard is broader than the frame pattern: files without a
        # frame part, or with a frame token instead of a number, may have
        # been picked up. They do not contribute to the range.
        if file_match and file_match.group(2).isdigit():
            frames.append(int(file_match.group(2)))

    # nothing numbered on disk, so there is no range to report
    if not frames:
        return None
    return (min(frames), max(frames))
############################################################################
# RPC
def _check_connection(self):
    """
    Make sure we are still connected to the adobe cc product.

    Pings the host and keeps count of consecutive failures; once the count
    has reached ``SHOTGUN_ADOBE_HEARTBEAT_TOLERANCE`` the Qt application is
    asked to quit. Queued messages are processed afterwards, and a
    one-time check of the active document's context is carried out once
    the engine has finished initializing.
    """
    # If we're in a disabled state, then we don't do anything here. This
    # is controlled by the heartbeat_disabled context manager provided
    # by this engine.
    if self._HEARTBEAT_DISABLED:
        return

    try:
        self.adobe.ping()
    except Exception:
        # any failure of the ping counts as a missed heartbeat
        if self._FAILED_PINGS >= self.SHOTGUN_ADOBE_HEARTBEAT_TOLERANCE:
            from sgtk.platform.qt import QtCore

            QtCore.QCoreApplication.instance().quit()
        else:
            self._FAILED_PINGS += 1
    else:
        # a successful ping resets the failure count
        self._FAILED_PINGS = 0

    # Will allow queued up messages (like logging calls)
    # to be handled on the Python end.
    self.adobe.process_new_messages()

    # We also have a one-time check we need to make after the timer is
    # started. In the event that the user opened a document before the
    # integration completed its initialization, we need to make sure
    # that the context is correct. Since we rely on After Effects sending
    # an event on active document change to control our context, if that
    # occurred before we were listening then we likely missed it and
    # need to make sure we're not in a stale state.
    #
    # Note: This is occurring in the timer here because the context
    # change can only occur once the engine initialization process has
    # completed. As such, we can rely on the timer to delay its execution
    # until we're in a state where the context change will succeed.
    if not self._HAS_CHECKED_CONTEXT_POST_LAUNCH:
        if sgtk.platform.current_engine():
            self.logger.debug(
                "Engine initialization complete -- checking active "
                "document context..."
            )
            active_document_path = self.project_path

            if active_document_path:
                self.logger.debug(
                    "There is an active document. Checking to see if a "
                    "context change is required."
                )
                self._handle_active_document_change(active_document_path)
            else:
                self.logger.debug(
                    "There is no active document, so there is no need to change context."
                )

            # the check is done once only, whatever its outcome
            self._HAS_CHECKED_CONTEXT_POST_LAUNCH = True
        else:
            self.logger.debug(
                "Engine initialization has not completed -- waiting to "
                "check the active document context..."
            )
def _emit_log_message(self, handler, record):
    """
    Called by the engine whenever a new log message is available.
    All log messages from the toolkit logging namespace will be passed to
    this method.

    :param handler: Log handler that this message was dispatched from
    :type handler: :class:`~python.logging.LogHandler`
    :param record: Std python logging record
    :type record: :class:`~python.logging.LogRecord`
    """
    # ``record.message`` only exists once some formatter has processed the
    # record, so the message is built from the record itself.
    message = record.getMessage()

    # If the _adobe attribute is set, then we can forward logging calls
    # back to the js process via rpc.
    if hasattr(self, "_adobe"):
        # level names without a js counterpart are reported as "info"
        # rather than failing inside the logging machinery
        level = self.PY_TO_JS_LOG_LEVEL_MAPPING.get(record.levelname, "info")

        # log the message back to js via rpc
        self.adobe.log_message(level, message)

    # prior to the _adobe attribute being set, we rely on the js process
    # handling stdout and logging it.
    else:
        # we don't use the handler's format method here because the adobe
        # side expects a certain format.
        msg_str = "[%s]: %s" % (record.levelname, message)
        sys.stdout.write(msg_str)
        sys.stdout.flush()
def _handle_active_document_change(self, active_document_path):
    """
    Gets the active document from the host application, determines which
    context it belongs to, and changes to that context.

    :param str active_document_path: The path to the new active document.
    :returns: True if the context changed, False if it did not.
    :rtype: bool
    """
    # If the config says to not change context on active document change,
    # then we don't do anything here.
    if not self.get_setting("automatic_context_switch"):
        self.logger.debug(
            "Engine setting automatic_context_switch is false. "
            "Not changing context."
        )
        return False

    # Make sure we have a properly-encoded string for the path. We can
    # possibly get a file path/name that contains unicode, and we don't
    # want to deal with that later on.
    if active_document_path is not None:
        active_document_path = sgutils.ensure_str(active_document_path)

    with self.heartbeat_disabled():
        # This will be True if the context_changes_disabled context manager
        # is used. We're just in a temporary state of not allowing context
        # changes, which is useful when an app is doing a lot of After
        # Effects work that might be triggering active document changes
        # that we don't want to result in PTR context changes.
        if self._CONTEXT_CHANGES_DISABLED:
            self.logger.debug(
                "Engine is in 'no context changes' mode. Not changing context."
            )
            return False

        # without an active document there is nothing to derive a context
        # from
        if not active_document_path:
            return False

        self.logger.debug("New active document is %s" % active_document_path)

        cached_context = self.__get_from_context_cache(active_document_path)
        if cached_context:
            context = cached_context
            self.logger.debug("Document found in context cache: %r" % context)
        else:
            try:
                context = self.sgtk.context_from_path(
                    active_document_path,
                    previous_context=self.context,
                )
                self.__add_to_context_cache(active_document_path, context)
            except Exception:
                self.logger.debug(
                    "Unable to determine context from path. Setting the Project context."
                )

                # clear the context finding task ids so that any tasks that
                # finish won't send data to js.
                self.__context_find_uid = None
                self.__context_thumb_uid = None

                # We go to the project context if this is a file outside of
                # PTR control. The project context is built once and
                # reused afterwards.
                if self._PROJECT_CONTEXT is None:
                    self._PROJECT_CONTEXT = sgtk.Context(
                        tk=self.context.sgtk,
                        project=self.context.project,
                    )
                context = self._PROJECT_CONTEXT

        if not context.project and not self.context.project:
            self.logger.debug(
                "New context doesn't have a Project entity. Not changing "
                "context."
            )
            return False
        elif not context.project:
            # fall back to a context built from the current project
            context = self.sgtk.context_from_entity(
                self.context.project["type"], self.context.project["id"]
            )

        if context and context != self.context:
            self.adobe.context_about_to_change()
            sgtk.platform.change_context(context)
            return True

    return False
def _handle_command(self, uid):
    """
    Handles an RPC engine command execution request.

    The heartbeat is paused while the command runs.

    :param int uid: The unique id of the engine command to run.
    """
    self.logger.debug("Handling command request for uid: %s" % (uid,))

    with self.heartbeat_disabled():
        from sgtk.platform.qt import QtGui

        # the two special commands are handled first
        if uid == self.__jump_to_fs_command_id:
            self._jump_to_fs()
            return
        if uid == self.__jump_to_sg_command_id:
            self._jump_to_sg()
            return

        # otherwise a registered command was triggered: run every command
        # carrying the requested uid
        for command in self.commands.values():
            if command.get("properties", dict()).get("uid") != uid:
                continue

            self.logger.debug("Executing callback for command: %s" % (command,))
            result = command["callback"]()

            # keep a handle on any widget the callback returns
            if isinstance(result, QtGui.QWidget):
                self.__qt_dialogs.append(result)
def _handle_logging(self, level, message):
    """
    Handles an RPC logging request.

    :param str level: One of "debug", "info", "warning", or "error".
    :param str message: The log message.
    """
    # A record is created by hand for the standard file handler. It is
    # made to look like the regular logs, with '.js' tacked onto the logger
    # name to show that it came from javascript.
    record_fields = {
        "levelname": level.upper(),
        "name": "%s.js" % (self.logger.name,),
        "msg": message,
    }
    record = logging.makeLogRecord(record_fields)

    # Hand the record to the base file handler, if there is one, so that
    # it is logged appropriately.
    file_handler = sgtk.LogManager().base_file_handler
    if file_handler:
        file_handler.handle(record)
def _run_tests(self):
    """
    Runs the test suite for the tk-aftereffects bundle.

    The root directory of the tests is read from the
    ``SHOTGUN_ADOBE_TESTS_ROOT`` environment variable. It has to contain
    the ``run_tests.py`` module, whose ``run_tests`` function is called
    with the engine. Problems are logged as errors and never raised.
    """
    # If we don't know what the tests root directory path is
    # via the environment, then we shouldn't be here.
    tests_root = os.environ.get("SHOTGUN_ADOBE_TESTS_ROOT")
    if tests_root is None:
        self.logger.error(
            "The SHOTGUN_ADOBE_TESTS_ROOT environment variable "
            "must be set to the root directory of the tests to be "
            "run. Not running tests!"
        )
        return

    # Make sure we can find the run_tests.py file within the root
    # that was specified in the environment.
    self.logger.debug("Test root path found. Looking for run_tests.py.")
    test_module = os.path.join(tests_root, self.TEST_SCRIPT_BASENAME)

    if not os.path.exists(test_module):
        self.logger.error(
            "Unable to find run_tests.py in the directory "
            "specified by the SHOTGUN_ADOBE_TESTS_ROOT "
            "environment variable. Not running tests!"
        )
        return

    self.logger.debug("Found run_tests.py. Importing to run tests.")

    # We need to prepend to sys.path. The original list is captured before
    # anything can fail, so that it is always there to be put back.
    original_sys_path = sys.path
    try:
        python_root = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "python")
        )
        sys.path = [tests_root, python_root] + original_sys_path

        import run_tests

        # The run_tests.py module should make available a run_tests
        # function. We need to run that, giving it the engine pointer
        # so that it can use that for logging purposes.
        run_tests.run_tests(self)
    except Exception:
        # If we got an unhandled exception, then something went very
        # wrong in the test suite. We'll just trap that and print it
        # as an error without letting it bubble up any farther.
        import traceback

        # format_exc reports the exception currently being handled; its
        # first parameter is a frame limit, not the exception.
        self.logger.error(
            "Tests raised the following:\n%s" % traceback.format_exc()
        )
    finally:
        # Reset sys.path back to what it was before we started.
        sys.path = original_sys_path
############################################################################
# properties
@property
def adobe(self):
    """
    Handle to the Adobe RPC API used by this engine.

    :returns: The object stored on the engine as ``_adobe``.
    """
    rpc_handle = self._adobe
    return rpc_handle
@property
def app_id(self):
    """
    The runtime app id of the host application.

    The value comes from the class-level ``_SHOTGUN_ADOBE_APPID``, which is
    read from the ``SHOTGUN_ADOBE_APPID`` environment variable. It is
    expected to be a short string such as ``AEFT`` for After Effects.

    :returns: The app id, or ``None`` when the environment variable was
        not set.
    """
    runtime_app_id = self._SHOTGUN_ADOBE_APPID
    return runtime_app_id
@property
def context_change_allowed(self):
    """
    Tells Toolkit whether this engine allows context changes.

    :returns: Always ``True``.
    """
    allowed = True
    return allowed
############################################################################
# context manager
@contextmanager
def context_changes_disabled(self):
    """
    A context manager that disables context changes on enter, and enables
    them on exit. This is useful in apps that might be performing operations
    that require changes in the active document that don't want to trigger
    a context change.

    ``_CONTEXT_CHANGES_DISABLED`` is set to ``True`` for the duration of
    the ``with`` block and is always set back to ``False`` afterwards, also
    when the block raises. The exception itself is not swallowed.
    """
    self._CONTEXT_CHANGES_DISABLED = True
    try:
        yield
    finally:
        # must run on error as well, otherwise one failing operation would
        # leave context changes switched off
        self._CONTEXT_CHANGES_DISABLED = False
@contextmanager
def heartbeat_disabled(self):
    """
    A context manager that disables the heartbeat and message processing
    timer on enter, and restarts it on exit.

    The pause is requested by setting ``_HEARTBEAT_DISABLED`` to ``True``.
    A failure to do so is logged and the ``with`` block still runs. On
    exit the flag is always set back to ``False``, also when the block
    raises. The exception itself is not swallowed.
    """
    try:
        self.logger.debug("Pausing heartbeat...")
        self._HEARTBEAT_DISABLED = True
    except Exception as e:
        self.logger.debug("Unable to pause heartbeat as requested.")
        self.logger.error(str(e))
    else:
        self.logger.debug("Heartbeat paused.")

    try:
        yield
    finally:
        # must run on error as well, otherwise the heartbeat would stay
        # paused after a failing operation
        self._HEARTBEAT_DISABLED = False
        self.logger.debug("Heartbeat restarted.")
############################################################################
# UI
def _define_qt_base(self):
    """
    Called at initialisation time to control how QT is used by Toolkit.

    The base implementation provides the dictionary, which is described as
    carrying these keys:

    * qt_core - the QtCore module to use
    * qt_gui - the QtGui module to use
    * dialog_base - base class for to use for Toolkit's dialog factory

    On top of the base behaviour, the codec for C strings is set to utf-8
    and the static ``QMessageBox`` methods are wrapped through
    :meth:`_override_qmessagebox`.

    :returns: dict, as returned by the base implementation
    :raises ImportError: If the base implementation returns nothing.
    """
    qt_base = super(AfterEffectsEngine, self)._define_qt_base()
    if not qt_base:
        raise ImportError("Unable to find a QT Python module")

    qt_core, qt_gui = qt_base["qt_core"], qt_base["qt_gui"]

    # tell QT4 to interpret C strings as utf-8
    # note: this will be ignored on QT5 via our shim
    utf8_codec = qt_core.QTextCodec.codecForName("utf-8")
    qt_core.QTextCodec.setCodecForCStrings(utf8_codec)

    # make the static message boxes activate the python process first
    self._override_qmessagebox(qt_gui.QMessageBox)

    return qt_base
def _override_qmessagebox(self, q_message_box):
"""
Redefine the method calls for QMessageBox static methods.
These are often called from within apps and because QT is running in a
separate process, they will pop up behind the After Effects window. Wrap
each of these calls in a raise method to activate the QT process.
:param q_message_box: The QMessageBox class to patch.
"""
info_fn = q_message_box.information
critical_fn = q_message_box.critical
question_fn = q_message_box.question
warning_fn = q_message_box.warning
@staticmethod
def _info_wrapper(*args, **kwargs):
self.__activate_python()
return info_fn(*args, **kwargs)
@staticmethod
def _critical_wrapper(*args, **kwargs):
self.__activate_python()
return critical_fn(*args, **kwargs)
@staticmethod
def _question_wrapper(*args, **kwargs):
self.__activate_python()
return question_fn(*args, **kwargs)
@staticmethod
def _warning_wrapper(*args, **kwargs):
self.__activate_python()
return warning_fn(*args, **kwargs)
q_message_box.information = _info_wrapper
q_message_box.critical = _critical_wrapper
q_message_box.question = _question_wrapper
q_message_box.warning = _warning_wrapper
def __check_for_popups(self):
    """
    Check whether After Effects opened a popup dialog and, if so, raise it
    to the front.

    The After Effects process id is looked up once with the Windows
    ``tasklist`` command and kept in ``_AFX_PID``. When no process id can
    be read from the command output, ``_AFX_PID`` is set to ``-1`` and
    this call, as well as all later ones, returns without doing anything.

    ``_POPUP_CACHE`` remembers the dialogs seen on the previous call, so a
    dialog that is still open is not raised again. It is reset to ``None``
    once no dialog is open.

    NOTE:
        This is only done in windows.

    TODO: Implement an equivalent method on mac
    """
    if not sgtk.util.is_windows():
        return

    # to get all dialogs from After Effects, we have to query the
    # After Effects process id first. As this code runs inside a
    # separate python process, we use a subprocess for this.
    if self._AFX_PID == -1:
        # an earlier lookup failed, do not try again
        return

    if self._AFX_PID is None:
        # the windows tasklist command will provide the process id
        # of the After Effects executable. As there is always only one
        # instance of After Effects, this should be safe to determine the
        # process id.
        pid_query_process = subprocess.Popen(
            [
                "tasklist",
                "/FI",
                "ImageName eq AfterFX.exe",
                "/FO",
                "CSV",
                "/NH",
            ],
            stdout=subprocess.PIPE,
        )
        out_string, _ = pid_query_process.communicate()

        # The out_string will look like:
        # "AfterFX.ext","1234","SessionName","SessionNum","MemoryUsage"
        # where 1234 describes the pid of After Effects
        match = re.match(b"[^0-9]+([0-9]+).*", out_string, re.DOTALL)
        if not match:
            # remember the failure and stop here, there is no match object
            # to read a process id from
            self._AFX_PID = -1
            return
        self._AFX_PID = int(match.group(1))

    # with the process id of After Effects, we can get all HWNDS that point to
    # dialog classes.
    win_32_api = self.__tk_aftereffects.win_32_api
    hwnds = win_32_api.find_windows(
        process_id=self._AFX_PID,
        class_name=self._AFX_WIN32_DIALOG_WINDOW_CLASS,
        stop_if_found=True,
    )

    # To avoid raising a dialog, that we raised before,
    # we will compare the list of visible dialog-hwnds with the list
    # that we already raised.
    # As we cannot compare the hwnd-pointers directly, we need to compare
    # the integer value (hwnd long) of the hwnd pointers.
    # we build a dict that maps the hwnd longs to the current hwnd pointer
    all_hwnds = {}
    GetWindow = win_32_api.ctypes.windll.user32.GetWindow
    for hwnd in hwnds:
        # GetWindow with GW_CHILD is used to get the hwnd long that identifies the child window
        # at the top of the Z order, of the specified parent window pointer.
        all_hwnds[GetWindow(hwnd, self.__WIN32_GW_CHILD)] = hwnd

    # by comparing the cached hwnd longs with the current list of hwnd longs,
    # we find out which hwnds are actually pointing to new (unraised) dialogs.
    new_hwnds = set(all_hwnds.keys()) - (self._POPUP_CACHE or set([]))

    # in case there is a new dialog, we bring it to front and update the
    # cache. Only the first new dialog is handled per call.
    for hwnd_long in new_hwnds:
        win_32_api.bring_to_front(all_hwnds[hwnd_long])
        self._POPUP_CACHE = set(all_hwnds.keys())
        return

    # in case there are open dialogs, but we already raised them,
    # we do nothing.
    if all_hwnds:
        return

    # In case there is no open dialog, we will reset the cache to None
    self._POPUP_CACHE = None
def _win32_get_aftereffects_main_hwnd(self):
"""
Windows specific method to find the main After Effects window
handle (HWND)
"""
if not self._WIN32_AFTEREFFECTS_MAIN_HWND:
for major in sorted(self.__CC_VERSION_MAPPING.keys()):
for minor in range(10):
found_hwnds = self.__tk_aftereffects.win_32_api.find_windows(
class_name="AE_CApplication_{}.{}".format(major, minor),
stop_if_found=True,
)
if found_hwnds:
self._WIN32_AFTEREFFECTS_MAIN_HWND = found_hwnds[0]
return self._WIN32_AFTEREFFECTS_MAIN_HWND
return self._WIN32_AFTEREFFECTS_MAIN_HWND
def _win32_get_proxy_window(self):
    """
    Windows-specific method to get the proxy window that will 'own' all
    Toolkit dialogs. This will be parented to the main After Effects
    application.

    When the window handle of the proxy widget can be found, it is parented
    to the After Effects main window and stored in ``_PROXY_WIN_HWND``.
    Otherwise a warning is logged and no parenting takes place.

    :returns: The proxy QWidget, or ``None`` when the After Effects main
        window could not be found. The widget is also returned when its
        own window handle could not be determined.
    """
    ae_hwnd = self._win32_get_aftereffects_main_hwnd()
    proxy_widget = None
    proxy_hwnd = None

    if not ae_hwnd:
        self.logger.debug(
            "Unable to determine the HWND of After Effects itself. This means "
            "that we can't properly setup window parenting for Toolkit apps."
        )
    else:
        from sgtk.platform.qt import QtGui, QtCore

        # Create the proxy QWidget with a title that is unique enough to
        # find the widget again by its window text.
        proxy_widget = QtGui.QWidget()
        unique_title = "Flow Production Tracking Parent Widget {0}".format(
            uuid.uuid4().hex
        )
        proxy_widget.setWindowTitle(unique_title)

        # With PySide2, we're required to look up our proxy parent
        # widget's HWND the hard way, following the same logic used
        # to find After Effects's main window. To do that, we actually have
        # to show our widget so that Windows knows about it. We can make
        # it effectively invisible if we zero out its size, so we do that,
        # show the widget, and then look up its HWND by window title before
        # hiding it.
        proxy_widget.setGeometry(0, 0, 0, 0)
        proxy_widget.show()
        try:
            matching_hwnds = self.__tk_aftereffects.win_32_api.find_windows(
                stop_if_found=True, window_text=unique_title
            )
        finally:
            proxy_widget.hide()

        if matching_hwnds:
            proxy_hwnd = matching_hwnds[0]

    # Parent to the After Effects application window if we found everything
    # we needed. Without a proxy window handle we'll just end up with no
    # window parenting, but apps will still launch.
    if proxy_hwnd is not None:
        # Set the window style/flags. We don't need or want our Python
        # dialogs to notify the After Effects application window when they're
        # opened or closed, so we'll disable that behavior.
        current_ex_style = self.__tk_aftereffects.win_32_api.GetWindowLong(
            proxy_hwnd,
            self.__tk_aftereffects.win_32_api.GWL_EXSTYLE,
        )
        self.__tk_aftereffects.win_32_api.SetWindowLong(
            proxy_hwnd,
            self.__tk_aftereffects.win_32_api.GWL_EXSTYLE,
            current_ex_style
            | self.__tk_aftereffects.win_32_api.WS_EX_NOPARENTNOTIFY,
        )
        self.__tk_aftereffects.win_32_api.SetParent(proxy_hwnd, ae_hwnd)
        self._PROXY_WIN_HWND = proxy_hwnd
    else:
        self.logger.warning(
            "Unable setup window parenting properly. Dialogs shown will "
            "not be parented to After Effects, but they will still function "
            "properly otherwise."
        )

    return proxy_widget
def _get_dialog_parent(self):
    """
    Get the QWidget parent for all dialogs created through
    show_dialog & show_modal.

    The parent is determined on first use and cached in ``_DIALOG_PARENT``.

    :returns: On Windows the proxy window from
        :meth:`_win32_get_proxy_window`, otherwise the active window of
        the QApplication.
    """
    from sgtk.platform.qt import QtGui

    if self._DIALOG_PARENT:
        return self._DIALOG_PARENT

    if sgtk.util.is_windows():
        # for windows, we create a proxy window parented to the
        # main application window that we can then set as the owner
        # for all Toolkit dialogs
        self._DIALOG_PARENT = self._win32_get_proxy_window()
    else:
        self._DIALOG_PARENT = QtGui.QApplication.activeWindow()

    return self._DIALOG_PARENT
def show_dialog(self, title, bundle, widget_class, *args, **kwargs):
    """
    Shows a non-modal dialog window in a way suitable for this engine.
    The engine will attempt to parent the dialog nicely to the host
    application.

    :param title: The title of the window
    :param bundle: The app, engine or framework object that is associated
        with this window
    :param widget_class: The class of the UI to be constructed. This must
        derive from QWidget.

    Additional parameters specified will be passed through to the
    widget_class constructor.

    :returns: the created widget_class instance, or ``None`` when the
        engine has no UI
    """
    if not self.has_ui:
        self.logger.error(
            "Sorry, this environment does not support UI display! Cannot "
            "show the requested window '%s'." % title
        )
        return None

    new_dialog, new_widget = self._create_dialog_with_widget(
        title, bundle, widget_class, *args, **kwargs
    )

    # Note - the base engine implementation will try to clean up
    # dialogs and widgets after they've been closed. However this
    # can cause a crash in After Effects as the system may try to send
    # an event after the dialog has been deleted.
    # Holding on to every dialog here ensures this doesn't happen.
    self.__qt_dialogs.append(new_dialog)

    # make python active if possible
    self.__activate_python()

    # show and raise the window so it doesn't end up behind the main
    # After Effects window
    self.logger.debug("Showing dialog: %s" % (title,))
    new_dialog.show()
    new_dialog.raise_()
    new_dialog.activateWindow()

    return new_widget
def show_modal(self, title, bundle, widget_class, *args, **kwargs):
    """
    Shows a modal dialog window in a way suitable for this engine. The
    engine will attempt to integrate it as seamlessly as possible into the
    host application. This call is blocking
    until the user closes the dialog.

    :param title: The title of the window
    :param bundle: The app, engine or framework object that is associated
        with this window
    :param widget_class: The class of the UI to be constructed. This must
        derive from QWidget. Additional parameters specified will be passed
        through to the widget_class constructor.

    :returns: (a standard QT dialog status return code, the created
        widget_class instance), or ``None`` when the engine has no UI
    """
    if not self.has_ui:
        self.logger.error(
            "Sorry, this environment does not support UI display! Cannot "
            "show the requested window '%s'." % title
        )
        return

    modal_dialog, modal_widget = self._create_dialog_with_widget(
        title, bundle, widget_class, *args, **kwargs
    )

    # Note - the base engine implementation will try to clean up
    # dialogs and widgets after they've been closed. However this
    # can cause a crash in After Effects as the system may try to send
    # an event after the dialog has been deleted.
    # Holding on to every dialog here ensures this doesn't happen.
    self.__qt_dialogs.append(modal_dialog)

    # make python active if possible
    self.__activate_python()

    self.logger.debug("Showing modal: %s" % (title,))
    modal_dialog.raise_()
    modal_dialog.activateWindow()

    # blocks until the dialog has been closed
    return modal_dialog.exec_(), modal_widget
############################################################################
# internal methods
def __get_command_uid(self):
"""
Returns a guaranteed unique command id.
"""
with self._LOCK:
self._COMMAND_UID_COUNTER += 1
return self._COMMAND_UID_COUNTER
def __get_icon_path(self, properties):
"""
Processes the command properties dictionary to find the most appropriate
icon path.
This code looks for an `icons` dictionary of the following form::
{
"dark": {
"png": "/path/to/dark_icon.png",
"svg": "/path/to/dark_icon.svg"
},
"light": {
"png": "/path/to/light_icon.png",
"svg": "/path/to/light_icon.svg"
}
}
For Adobe, the preference is dark png then light png
If neither of these is found, fall back to the standard
`properties["icon"]`.
"""
icon_path = None
icons = properties.get("icons")
if icons:
dark = icons.get("dark")
light = icons.get("light")
# check for dark icon
if dark:
icon_path = dark.get("png", icon_path)
# if no dark icon, check the light:
if not icon_path and light:
icon_path = dark.get("png", icon_path)
# still no icon path, fall back to regular icon
if not icon_path:
icon_path = properties.get("icon")
return icon_path
def __send_state(self):
    """
    Sends information back to javascript representing the current context.

    The panel is first told that the context is about to change. Pending
    context requests are then cleared and the display data for the current
    context entity is requested. Finally every registered engine command is
    put into one of three groups (favorites, context menu commands, other
    commands) and the groups are sent to the panel, with the built-in
    "jump to" commands leading the context menu group.
    """
    # alert js that the state is about to change. this allows the panel to
    # clear its current state and display a loading message.
    self.adobe.context_about_to_change()

    # ---- process the context for display

    # clear existing context requests to prevent unnecessary processing
    self.__context_find_uid = None
    self.__context_thumb_uid = None
    self.__sg_data.clear()

    # determine the best entity to show for the current context, then
    # inspect it and do any additional queries for the fields that are
    # required to show it
    context_entity = self.__get_context_entity()
    self.__request_context_display(context_entity)

    # ---- the engine already has access to all the commands that need to
    #      be display for the current context. so go ahead and process those
    #      and send them back separately

    # the app instance name combined with the display name should be unique
    # and gives an easy key to match against. the position of each favorite
    # in the setting is remembered so the favorites can be sorted later.
    favorite_positions = {}
    for position, favorite in enumerate(self.get_setting("shelf_favorites")):
        favorite_key = favorite["app_instance"] + favorite["name"]
        favorite_positions[favorite_key] = position

    # keep a list of each type of command since they'll be displayed
    # differently on the adobe side.
    favorites = []
    context_menu_cmds = []
    commands = []

    for command_name, command_info in self.commands.items():
        # commands come with a dict of properties that may or may not
        # contain certain data.
        properties = command_info.get("properties", {})

        # ---- determine the app's instance name by checking this
        #      command's app against the engine's apps.
        owning_app = properties.get("app", None)
        app_name = None
        if owning_app:
            for instance_name, instance_obj in self.apps.items():
                if instance_obj == owning_app:
                    app_name = instance_name

        cmd_type = properties.get("type", "default")

        # the command description to hand over to adobe
        command = {
            "uid": properties.get("uid"),
            "display_name": command_name,
            "icon_path": self.__get_icon_path(properties),
            "description": properties.get("description"),
            "type": cmd_type,
        }

        # the key to look this command up in the favorites
        favorite_key = str(app_name) + command_name

        if cmd_type == "context_menu":
            # these commands will show up in the panel flyout menu
            context_menu_cmds.append(command)
        elif favorite_key in favorite_positions:
            # remember the position so that the favorites can be sorted
            # once all of them are identified.
            command["fav_index"] = favorite_positions[favorite_key]
            favorites.append(command)
        else:
            commands.append(command)

    # ---- include the "jump to" commands that are common to all engines.
    #      their icons are bundled with the engine.
    jump_commands = [
        {
            "uid": self.__jump_to_sg_command_id,
            "display_name": "Jump to Flow Production Tracking",
            "icon_path": os.path.join(
                self.disk_location, "resources", "shotgun_logo.png"
            ),
            "description": "Open the current context in a web browser.",
            "type": "context_menu",
        }
    ]

    if self.context.filesystem_locations:
        jump_commands.append(
            {
                "uid": self.__jump_to_fs_command_id,
                "display_name": "Jump to File System",
                "icon_path": os.path.join(
                    self.disk_location, "resources", "shotgun_folder.png"
                ),
                "description": "Open the current context in a file browser.",
                "type": "context_menu",
            }
        )

    def _by_display_name(cmd):
        return cmd["display_name"]

    # favorites keep the order of the setting, everything else is sorted
    # alphabetically by display name
    favorites.sort(key=lambda cmd: cmd["fav_index"])
    commands.sort(key=_by_display_name)

    # the jump commands are forced onto the front of the context menu list
    # to match other integrations.
    context_menu_cmds = jump_commands + sorted(
        context_menu_cmds, key=_by_display_name
    )

    # ---- hand the state structure over to adobe
    self.adobe.send_commands(
        {
            "favorites": favorites,
            "commands": commands,
            "context_menu_cmds": context_menu_cmds,
        }
    )
def __setup_connection_timer(self, force=False):
    """
    Sets up the connection timer that handles monitoring of the live
    connection, as well as the triggering of message processing.

    On every timeout the timer calls ``_check_connection`` and the popup
    check. The interval is ``SHOTGUN_ADOBE_HEARTBEAT_INTERVAL`` seconds.

    :param bool force: Forces the creation of a new connection timer,
                       even if one already exists. When this occurs,
                       if a timer already exists, it will be stopped.
    """
    if self._CHECK_CONNECTION_TIMER is not None and not force:
        # a timer is already in place and no new one was requested
        return

    self.logger.debug("Creating connection timer...")

    if self._CHECK_CONNECTION_TIMER:
        self.logger.debug(
            "Connection timer already exists, so it will be stopped."
        )

        try:
            self._CHECK_CONNECTION_TIMER.stop()
        except Exception:
            # No reason to be alarmed here. Just let it go and it'll
            # garbage collected when appropriate.
            pass

    from sgtk.platform.qt import QtCore

    timer = QtCore.QTimer(
        parent=QtCore.QCoreApplication.instance(),
    )

    timer.timeout.connect(self._check_connection)
    timer.timeout.connect(self.__check_for_popups)

    # The class variable is in seconds, so multiply to get milliseconds.
    # QTimer.start() takes a whole number of milliseconds, so the float
    # product is converted explicitly instead of relying on an implicit
    # float-to-int conversion by the Qt binding.
    timer.start(int(self.SHOTGUN_ADOBE_HEARTBEAT_INTERVAL * 1000.0))

    self._CHECK_CONNECTION_TIMER = timer
    self.logger.debug("Connection timer created and started.")
def _jump_to_sg(self):
    """
    Opens the url of the current context (``self.context.shotgun_url``)
    through ``QDesktopServices``.
    """
    from sgtk.platform.qt import QtGui, QtCore

    context_url = QtCore.QUrl(self.context.shotgun_url)
    QtGui.QDesktopServices.openUrl(context_url)
def _jump_to_fs(self):
    """
    Jump from context to FS.

    One file browser window is launched for every path in
    ``self.context.filesystem_locations``. A launch that fails is logged
    as an error and the remaining paths are still processed.

    :raises Exception: If the current platform is neither macOS, Windows
        nor Linux.
    """
    # launch one window for each location on disk
    paths = self.context.filesystem_locations
    self.logger.debug("FS paths: %s" % (str(paths),))

    for disk_location in paths:
        if sgtk.util.is_macos():
            launcher = "open"
        elif sgtk.util.is_windows():
            # "start" is a cmd.exe builtin, see below
            launcher = None
        elif sgtk.util.is_linux():
            launcher = "xdg-open"
        else:
            raise Exception("Platform '%s' is not supported." % sys.platform)

        if launcher is None:
            # "start" needs its quoted window title argument, so the
            # command line stays a single string on Windows.
            cmd = 'cmd.exe /C start "Folder" "%s"' % disk_location
            exit_code = os.system(cmd)
        else:
            # The path is passed as its own argument, without a shell in
            # between. Quotes, "$" or backticks in the path can therefore
            # neither break the command nor be interpreted by a shell.
            cmd = '%s "%s"' % (launcher, disk_location)
            try:
                exit_code = subprocess.call([launcher, disk_location])
            except OSError:
                # the launcher itself could not be started
                exit_code = -1

        if exit_code != 0:
            self.logger.error("Failed to launch '%s'!" % cmd)
##########################################################################################
# context data methods
def __add_to_context_cache(self, path, context):
"""
Adds the given active document path to the context cache, associating
it with the given context object. This will trigger the storing of a
serialized cache as a user setting for use during panel extension
restarts.
:param str path: The document path to add to the cache.
:param context: The context object to associate with the document.
"""
if path not in self._CONTEXT_CACHE:
# We're storing the context cache in a sgtk user setting at the project
# level. This will ensure that when we read the cache back, we'll only
# be getting contexts in our current project. Anything outside of that
# scope would be unusable, as we don't allow context changing across
# project boundaries.
self._CONTEXT_CACHE[path] = context
serial_cache = dict()
for k, v in self._CONTEXT_CACHE.items():
serial_cache[k] = v.serialize()
self.logger.debug("Storing context cache: %s" % serial_cache)
self.__settings_manager.store(
self._CONTEXT_CACHE_KEY,
serial_cache,
self.__settings_manager.SCOPE_PROJECT,
)
def __get_from_context_cache(self, path):
"""
Gets the document path's associated context object, if one has been cached.
:returns: Context object, or None
"""
return self._CONTEXT_CACHE.get(path)
def __request_context_display(self, entity):
"""
Request fields to show in the context header for the given entity.
Always includes the image url to display a thumbnail.
"""
if not entity:
# no entity. this will retrieve the html to display for the site.
fields_html = self.execute_hook_method(
"context_fields_display_hook",
"get_context_html",
entity=None,
sg_globals=self.__shotgun_globals,
)
self.adobe.send_context_display(fields_html)
# go ahead and forward the site thumbnail back to js
data = dict(
thumb_path="../images/default_Site_thumb_dark.png",
url=self.sgtk.shotgun_url,
)
self.adobe.send_context_thumbnail(data)
return
# get the fields to query from the hook
fields = self.execute_hook_method(
"context_fields_display_hook",
"get_entity_fields",
entity_type=entity["type"],
)
# always try to query the image for the entity
if "image" not in fields:
fields.append("image")
entity_type = entity["type"]
entity_id = entity["id"]
# kick off an async request to query the necessary fields
self.__context_find_uid = self.__sg_data.execute_find_one(
entity_type, [["id", "is", entity_id]], fields
)
def __on_worker_failure(self, uid, msg):
    """
    Asynchronous callback - the worker thread errored.

    Only the two context requests are handled: a failed field query sends
    an error text to the context header and logs the error, a failed
    thumbnail request is only logged. Any other request id is ignored.

    :param uid: The id of the request that failed.
    :param msg: The error message reported for the request.
    """
    if uid == self.__context_find_uid:
        # clear the find id since we are now processing it
        self.__context_find_uid = None

        # the error message shown in the context header
        error_html = """
There was an error retrieving fields for this context. Please
see the logs for the specific error message. If this is a
recurring error and you need further assistance, please
contact our support team via <a href='{url}'>
{url}</a>.
""".format(
            url=sgtk.support_url
        )
        self.adobe.send_context_display(error_html)

        self.logger.error("Failed to query context fields: %s" % (msg,))
        return

    if uid == self.__context_thumb_uid:
        # clear the thumb id since we are now processing it
        self.__context_thumb_uid = None

        # log this. the panel will display a default thumbnail, so this
        # should be sufficient
        self.logger.error("Failed to query context thumbnail: %s" % (msg,))
def __on_worker_signal(self, uid, request_type, data):
"""
Signaled whenever the worker completes something.
"""
self.logger.debug("Worker signal: %s" % (data,))
# the find query for the context entity with the specified fields
if uid == self.__context_find_uid:
# clear the find id since we are now processing it
self.__context_find_uid = None
context_entity = data["sg"]
# should have an image url now. submit a request to download the
# entity's thumbnail.
if "image" in context_entity and context_entity["image"]:
self.__context_thumb_uid = self.__sg_data.request_thumbnail(
context_entity["image"],
context_entity["type"],
context_entity["id"],
"image",
load_image=False,
)
# no image, use a default image based on the entity type
else:
if context_entity["type"] in ["Asset", "Project", "Shot", "Task"]:
thumb_path = "../images/default_%s_thumb_dark.png" % (
context_entity["type"]
)
data["thumb_path"] = thumb_path
else:
thumb_path = "../images/default_Entity_thumb_dark.png"
data = dict(
thumb_path=thumb_path,
url=self.get_entity_url(context_entity),
)
self.adobe.send_context_thumbnail(data)
# now that we have all the field values, go back to the hook and
# build the html to display them.
fields_html = self.execute_hook_method(
"context_fields_display_hook",
"get_context_html",
entity=context_entity,
sg_globals=self.__shotgun_globals,
)
# forward the display html back to the js panel
self.adobe.send_context_display(fields_html)
# thumbnail download. forward the path and a url back to js
elif uid == self.__context_thumb_uid:
# clear the thumb id since we already processed it
self.__context_thumb_uid = None
context_entity = self.__get_context_entity()
# add a url to allow the panel to make the thumbnail clickable
data["url"] = self.get_entity_url(context_entity)
self.adobe.send_context_thumbnail(data)
def __get_project_id(self):
"""Helper method to return the project id for the current context."""
if self.context.project:
return self.context.project["id"]
else:
return None
def __get_context_entity(self):
"""Helper method to return an entity to display for current context."""
context = self.context
# determine the best entity for displaying the thumbnail. just return
# the first of task, entity, project that is defined
for entity in [context.task, context.entity, context.project]:
if not entity:
continue
return entity
def get_entity_url(self, entity):
    """
    Helper method to return a PTR url for the supplied entity.

    :param dict entity: Entity dictionary with "type" and "id" keys.
    :returns: The site url followed by ``/detail/<type>/<id>``.
    """
    url_parts = (self.sgtk.shotgun_url, entity["type"], entity["id"])
    return "%s/detail/%s/%d" % url_parts
def get_panel_link(self, url, text):
    """
    Helper method to return an html link to display in the panel and
    will launch the supplied url in the default browser.

    Both values are inserted into the markup as they are.

    :param url: The url handed to the panel's ``open_external_url``.
    :param text: The text of the link.
    :returns: The html of the link.
    """
    link_template = """
<a
href='#'
class='sg_value_link'
onclick='sg_panel.Panel.open_external_url("{url}")'
>{text}</a>
"""
    return link_template.format(url=url, text=text)
def __activate_python(self):
    """
    Do the Os-specific thing to show this process above all others.

    On macOS an AppleScript is run through ``osascript`` to make this
    process the frontmost one; a failure is logged as an error. Nothing is
    done on Windows or on any other platform.
    """
    if sgtk.util.is_macos():
        # a little action script to activate the given python process.
        activate_script = """
tell application "System Events"
set frontmost of the first process whose unix id is {pid} to true
end tell
""".format(
            pid=os.getpid()
        )

        # force this python process to the front. a non zero exit status
        # means that osascript failed.
        if subprocess.call(["osascript", "-e", activate_script]):
            self.logger.error("Could not activate python.")
    elif sgtk.util.is_windows():
        # nothing to do on Windows
        pass
def __import_file(self, import_options):
"""
Does the import and gets all new items, by caching the list of existing
items before importing and comparing them to the list of existing items
after importing.
:param import_options: Import options object that should be imported
:type import_options: `adobe.ImportOptionsObject`_
:returns: All new items that were imported
:rtype: list-of-`adobe.CompItemObject`_
"""
# as the return value of importFile will not
# reliably return the new items, one
# has to track the changes
item_cache = []
for item in self.iter_collection(self.adobe.app.project.rootFolder.items):
item_cache.append(item)
# do the import
self.adobe.app.project.importFile(import_options)
new_items = []
for item in self.iter_collection(self.adobe.app.project.rootFolder.items):
if item not in item_cache:
new_items.append(item)
return new_items