Source code for qmaster

#!/usr/bin/env python
# -*- coding: utf-8 -*-

##  Transmogrifier: qmaster
##  A Python interface for submitting and monitoring compressor/qmaster jobs.
##  Written by: Beau Hunter
##  318 Inc 01/2011
##  Copyright © 2009-2011 Beau Hunter, 318 Inc.
##  This file is part of Transmogrifier.
##  Transmogrifier is free software: you can redistribute it and/or modify
##  it under the terms of version 3 the GNU General Public License as published
##  by the Free Software Foundation, either version 3 of the License, or
##  (at your option) any later version.
##  Transmogrifier is distributed in the hope that it will be useful,
##  but WITHOUT ANY WARRANTY; without even the implied warranty of
##  GNU General Public License for more details.
##  You should have received a copy of the GNU General Public License
##  along with Transmogrifier. If not, see <>.

import HTMLParser
import os
import re
import sys
import datetime
import signal
import subprocess
import time

#import createDailyReel

## init our vars
version = '1.0b'
build = '2011011202'

global debug
debug = False

######################### START FUNCTIONS ###############################

######################### END FUNCTIONS #################################

[docs]class batch(): '''Our main qmaster batch class, used to submit and monitor qmaster batches''' ## Init vars sourceFile = '' destinationFile = '' destinationDirectory = '' batchID = '' compressorCluster = 'FCS Cluster' compressorPriority = 'low' compressorSettingFile = '/Applications/ 8-bit .setting' compressorSubmissionTimeout = 600 localCompressorFallback = True ## Fall back to 'This Computer' if specified cluster fails. transcodeSuffix = '.mov' ## suffix of format to be transcoded into. monitorSleeptime = 30 ## Sleep time between checking transcode status batchMonitorPath = '/Applications/Utilities/Batch Monitor' compressorPath = '/Applications/' ## Logging vars log = [] lastError = "" lastMSG = "" debug = False keepFiles = False isError = False logOffset = 1 printLogs = False printLogDate = True printClassInLog = False def __init__(self,sourceFile='',destinationFile='',compressorCluster='', compressorPriority='',compressorSettingFile='',batchID='',batchName=''): self.batchID = '' self.batchName = '' self.sourceFile = '' self.destinationFile = '' self.destinationDirectory = '' self.compressorCluster = 'This Computer' self.compressorSettingFile = '/Applications/ 8-bit .setting' self.compressorPriority = 'low' self.compressorSubmissionTimeout = 600 self.localCompressorFallback = True self.monitorSleeptime = 30 if sourceFile: self.sourceFile = sourceFile if destinationFile: self.destinationFile = destinationFile if compressorCluster: self.compressorCluster = compressorCluster if compressorPriority: self.compressorPriority = compressorPriority if compressorSettingFile: self.compressorSettingFile = compressorSettingFile if batchID: self.batchID = batchID if batchName: self.batchName = batchName ## Logging vars self.log = [] self.lastError = '' self.lastMSG = '' self.debug = False self.keepFiles = False self.isError = False self.logOffset = 1 self.printLogs = True self.printLogDate = True self.printClassInLog = False
[docs] def submitBatch(self): '''Submits the currently provided batch to compressor/qmaster for transcoding''' ## sanity checks: if not self.sourceFile: raise SourceFileError('No source file was provided!') if not os.path.isfile(self.sourceFile): raise SourceFileError('No source file was present at path: \'%s\'!' % self.sourceFile ) ## If no destination file was provided, check for destination directory, ## if it was provided, determine the new filename and suffix if not self.destinationFile: if self.destinationDirectory: baseFileName = os.path.splitext(os.path.basename(self.sourceFile))[0] self.destinationFile = os.path.join(self.destinationDirectory,"%s%s" % (baseFileName,self.transcodeSuffix)) else: raise DestinationError('No destination directory or file path has been provided!') if not self.batchName: self.batchName = "%s Transcode" % os.path.basename(self.sourceFile) self.logger('Submitting job to Compressor - Transcoding file: "%s" to ' 'destination: "%s"' % (self.sourceFile,self.destinationFile),'detailed') submitCMDString = ('%s -clustername "%s" -batchname "%s" -priority "%s"' ' -jobpath \"%s\" -settingpath \"%s\" -destinationpath \"%s\"' % (self.compressorPath,self.compressorCluster,self.batchName, self.compressorPriority,self.sourceFile,self.compressorSettingFile, self.destinationFile)) self.logger('Submission syntax: %s' % submitCMDString,'debug') ## Set our timeout # Set the signal handler and a 5-second alarm signal.signal(signal.SIGALRM, self.qmasterTimeoutHandler) signal.alarm(self.compressorSubmissionTimeout) try: submitCMD = subprocess.Popen(submitCMDString,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE,universal_newlines=True) cmd_STDOUT,cmd_STDERR = submitCMD.communicate() ## Throw an error if non-zero return code from compressor if not submitCMD.returncode == 0: raise QmasterSubmissionError(error=cmd_STDERR,retCode=submitCMD.returncode) except QmasterSubmissionTimeoutError: ## If we are Python 2.5, use os to kill the process. versionInfo = sys.version_info if versionInfo[0] == 2 and versionInfo[1] <= 5: os.kill(, signal.SIGKILL) else: submitCMD.kill() raise QmasterSubmissionTimeoutError() ## Cancel our timeout signal.alarm(0) if not cmd_STDERR: raise QmasterNullDataError() ## Parse our submission output, fetch our batchID try: searchResults ='.*<jobID (.*) />.*<batchID (.*) />.*',cmd_STDERR) self.batchID = searchResults.groups()[1] except: ## If batchID couldn't be fetched, check for a submission error try: searchResults ='.*Submission Error: (.*)',cmd_STDERR) errorString = searchResults.groups()[0] raise QmasterSubmissionError(error=errorString) except: ## If submission error string could not be parsed, search for cluster down try: searchResults ='.*(Could not find cluster "(.*)").*',cmd_STDERR) errorString = searchResults.groups()[0] ## If we aren't submitting to local instance, and local fallback is on ## retry with 'This Computer' if not self.compressorCluster == 'This Computer' and self.localCompressorFallback: self.logger('Could not find cluster: "%s", falling back to ' '"This Computer"' % self.compressorCluster,'error') self.compressorCluster = 'This Computer' return self.submitBatch() else: raise QmasterClusterNotFound(clusterName = self.compressorCluster) except: ## Report a generic error raise QmasterSubmissionError(error=('An unknown error occurred ' 'while submitting file: %s! ' % os.path.basename(self.sourceFile))) return True
[docs] def submitBatchAndWait(self): '''This method will submit a running job and will only return once the job has completed successfully or failed entirely''' transcodeDidTimeout = False transcodeStatus = 'Processing' self.submitBatch() transcodeStatus = self.getStatus() while (transcodeStatus == 'Processing' or transcodeStatus == 'Waiting' or transcodeStatus == 'Waiting ' or transcodeStatus == 'PostProcessing' or transcodeStatus == 'Hold'): time.sleep(self.monitorSleeptime) transcodeStatus = self.getStatus() if transcodeStatus == 'Cancelled': raise QmasterJobCancelled(fileName=os.path.basename(self.sourceFile)) elif not transcodeStatus == 'Successful': raise QmasterJobFailed(status=transcodeStatus,fileName=os.path.basename(self.sourceFile))
[docs] def getStatus(self): '''Returns the status of a running job''' ## Sanity checks if not self.batchID: raise SyntaxError(error='Could not check status: no batch ID was provided!') statusCMDString = ('"%s" -clustername "%s" -batchid "%s" -query 0' % (self.batchMonitorPath,self.compressorCluster,self.batchID)) self.logger('Checking batch status using syntax: \'%s\'' % statusCMDString,'debug') statusCMD = subprocess.Popen(statusCMDString,shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) cmd_STDOUT,cmd_STDERR = statusCMD.communicate() ## Batch Monitor returns malformed markup, we have to modify ## the closing tag to parse this properly data=cmd_STDOUT.replace('/batchStatus','/').replace('/jobStatus','/') batchmonitor = batchmonitorParser(data=data) return batchmonitor.status
def qmasterTimeoutHandler(signum,frame): raise QmasterSubmissionTimeoutError
[docs] def logger(self, logMSG, logLevel='normal'): '''(very) Basic Logging Function, we'll probably migrate to msg module''' if logLevel == 'error' or logLevel == 'normal' or self.debug: i = 0 headerText = '' while i < self.logOffset: headerText+=' ' i+=1 if logLevel.lower() == 'error': headerText = ' ERROR : %s' % headerText elif logLevel.lower() == 'debug': headerText = ' DEBUG : %s' % headerText elif logLevel.lower() == 'warning': headerText = 'WARNING : %s' % headerText elif logLevel.lower() == 'detailed': headerText = ' DETAIL : %s' % headerText else: headerText = ' INFO : %s' % headerText if self.printLogDate: dateString ="%b %d %H:%M:%S") headerText = "%s: %s" % (dateString,headerText) if self.printLogs: if self.printClassInLog: print '%s%s: %s' % (headerText,self.__class__.__name__,logMSG) else: print '%s%s' % (headerText,logMSG) sys.stdout.flush() self.lastMSG = logMSG if logLevel == 'error': self.lastError = logMSG self.log.append({'logLevel' : logLevel, 'logMSG' : logMSG})
[docs] def logs(self,logLevel=''): '''Returns an array of logs matching logLevel''' returnedLogs = [] logs = self.log for log in logs: if logLevel and logLevel.lower() == log['logLevel'].lower(): returnedLogs.append(log)
[docs] def lastError(self): '''Returns last error''' errorLogs = self.logs('error') return errorLogs[len(errorLogs)] ######################### MAIN SCRIPT START #############################
[docs]def main(): '''Our main function, filters passed arguments and loads the appropriate object''' ## Init vars sourceFile = "" filePath = "" afvNumber = "" assetID = "" ## Path to directory to save Kyte downloads compressorSubmissionTimeout = 900 ## Whether we should relocate failed media files to 'failedPath' relocateFailedMediaFiles = True ##errorPath = '/Volumes/FCSProxy/FCSSupport/XMLout_Kyte_Error' ## Set whether we keep files: if set to False, XML files will be deleted ## after an 'update' action fires. This only applies to update actions: ## 'download' actions will never remove the XML file. keepFiles = True addToDailyReel = False action = 'download' global debug ## Get our flags try: optlist, list = getopt.getopt(sys.argv[1:],':hvdu:f::',['help', 'version', 'file=', 'download', 'downloadAndTranscode', 'deleteFiles', 'addToDailyReel', 'updateAssetWithID=']) except getopt.GetoptError: print "Syntax Error!" helpMessage() return 1 ## If no options are passed, output help if len(optlist) == 0: printVersionInfo() helpMessage() return 1 #### PROCESS OUR PASSED ARGUMENTS #### for opt in optlist: if opt[0] == '-f' or opt[0] == '--file': filePath = opt[1] elif opt[0] == '-h' or opt[0] == '--help': helpMessage() return 0 elif opt[0] == '-v' or opt[0] == '--version': printVersionInfo() return 0 elif opt[0] == '-d' or opt[0] == '--download': action = 'download' elif opt[0] == '--downloadAndTranscode': action = 'downloadAndTranscode' elif opt[0] == '--deleteFiles': keepFiles = False elif opt[0] == '--addToDailyReel': addToDailyReel = True elif opt[0] == '-u' or opt[0] == '--updateAssetWithID': action = 'update' assetID = opt[1] ## If we called this file directly call main()
if __name__ == '__main__': sys.exit(main()) class SourceFileError(Exception): def __init__(self, error): self.error = error def __str__(self): return repr(self.error) class DestinationError(Exception): def __init__(self, error): self.error = error def __str__(self): return repr(self.error) class QmasterClusterNotFound(Exception): def __init__(self, clusterName =''): self.clusterName = clusterName def __str__(self): return repr("Could not find cluster: %s" % (self.clusterName)) class QmasterSubmissionError(Exception): def __init__(self, error,retCode = ''): self.error = error self.retCode = retCode def __str__(self): errorString = 'Submission error: %s ' % self.error if self.retCode: errorString += 'Qmaster exited with non-zero return code: %s' % self.retCode return repr(errorString) class QmasterJobCancelled(Exception): def __init__(self,fileName=''): self.fileName = fileName def __str__(self): if self.fileName: return repr('Processing for file: %s was cancelled by user!' % self.fileName) else: return repr('Processing was cancelled by user!') class QmasterJobFailed(Exception): def __init__(self,fileName='',status=''): self.fileName = fileName self.status = status def __str__(self): returnString = 'Job failed ' if self.fileName: returnString += 'for file: %s ' % self.fileName if self.status: returnString += ' with status: \'%s\' ' % self.status return repr(returnString) class QmasterSubmissionTimeoutError(Exception): pass class SyntaxError(Exception): def __init__(self, error): self.error = error def __str__(self): return repr(self.error) class JobTimeOutError(Exception): def __init__(self, error): self.error = error def __str__(self): return repr(self.error)
[docs]class batchmonitorParser(HTMLParser.HTMLParser): '''HTMLParser subclass to parse batch monitor output''' status = '' debug = False def __init__(self, data): self.status = '' self.debug = False HTMLParser.HTMLParser.__init__(self) if self.debug: print 'Parsing data: %s' % data self.feed(data) if self.debug: print "Final Status: %s" % self.status def handle_startendtag(self,tag,attrs): if tag == 'batchstatus': if self.debug: print 'Found batchStatus tag: => %s' % (attrs) for key,value in attrs: if key == 'status': if self.debug: print 'Setting new status: \'%s\'' % value self.status = value