1 from __future__
import division, absolute_import
6 from opscore.actor
import ActorDispatcher
8 from RO.Comm.TCPConnection
import TCPConnection
9 from twistedActor
import CommandError, ScriptRunner, BaseWrapper
15 """Implement the Queue command
17 @param[in] tccActor tcc actor
18 @param[in,out] userCmd a twistedActor BaseCommand with parseCmd attribute
22 cmdParam = userCmd.parsedCmd.paramDict[
"command"].valueList[0]
23 cmdName = cmdParam.keyword.lower()
25 if _CurrJob
and _CurrJob.isRunning:
26 raise CommandError(
"%s is already running" % (_CurrJob.scriptRunner.name))
29 name = cmdParam.valueList[0]
36 elif cmdName ==
"list":
39 jobPaths = glob.glob(os.path.join(jobDir,
"*.py"))
40 names = [os.path.basename(jp)[0:-3]
for jp
in jobPaths]
41 hubMsg =
"JobNames=%s" % (
", ".join(names))
42 userCmd.setState(userCmd.Done, hubMsg=hubMsg)
44 elif cmdName ==
"stop":
47 userCmd.setState(userCmd.Done, textMsg=
"No batch job running; nothing to do")
50 _CurrJob.stop(userCmd)
52 elif cmdName ==
"status":
56 jobState = _CurrJob.state
59 jobState = BatchJob.Done
60 hubMsg =
"JobStatus=%s, %s" % (name, jobState)
61 userCmd.setState(userCmd.Done, hubMsg=hubMsg)
64 raise CommandError(
"Unrecognized queues sub-command %r" % (cmdName,))
71 - scriptRunner: the current batch job script (a twistedActor.ScriptRunner)
72 - dispatcher: a dispatcher that talks to the TCC
74 Connecting =
"Connecting"
75 ConnectionCancelled =
"ConnectionCancelled"
76 ConnectionFailed =
"ConnectionFailed"
81 Cancelled =
"Cancelled"
82 CancelStates = set((ConnectionCancelled, Cancelled))
83 ErrorStates = CancelStates | set((ConnectionFailed, Failed))
84 DoneStates = ErrorStates | set((Done,))
87 """Create and start a batch job
89 @param[in] tccActor tcc actor
90 @param[in] name job name (name of file in jobs dir, without the .py suffix
91 @param[in,out] userCmd user command; the command is reported done when the command starts running,
92 or failed if it cannot be started.
107 userPort = tccActor.server.port,
113 """Abort the batch job
115 @param[in,out] userCmd user command; the command is reported done when the command starts running,
116 or failed if it cannot be started.
120 self.scriptRunner.cancel()
121 elif not self.dw.isDone:
124 userCmd.setState(userCmd.Done, textMsg=
"No batch job running; nothing to do")
157 if not self._runCmd.isDone:
159 self._runCmd.setState(self._runCmd.Failed, textMsg=state)
161 self._runCmd.setState(self._runCmd.Done)
162 if self.
_stopCmd and not self._stopCmd.isDone:
164 self._stopCmd.setState(self._stopCmd.Done)
166 self._stopCmd.setState(self._stopCmd.Failed, textMsg=state)
168 self._stopCmd.setState(self._stopCmd.Done)
169 elif self.
isRunning and not self._runCmd.isDone:
170 self._runCmd.setState(self._runCmd.Done)
172 def _wrapperStateChanged(self, dw):
173 """The TCC device wrapper's state changed
175 @param[in] dw tcc device wrapper
180 if self._runCmd.isDone:
194 dispatcher = self.dw.dispatcher,
197 self.scriptRunner.start()
199 def _scriptStateChanged(self, sr):
200 """ScriptRunner state changed
203 if sr.state == sr.Cancelled:
213 def _getScript(self, filePath):
214 """Load a script file, returning the functions as a dict of twistedActor.ScriptRunner args
216 @param[in] fileName name of script file (path is relative to the standard jobs dir)
218 if not os.path.isfile(filePath):
219 raise CommandError(
"Job file %r not found" % (filePath,))
221 scriptLocals = {
"__file__": filePath}
222 execfile(filePath, scriptLocals)
224 scriptArgDict = dict()
226 scriptClass = scriptLocals.get(
"ScriptClass")
228 scriptArgDict[
"scriptClass"] = scriptClass
230 for attrName
in (
"run",
"init",
"end"):
231 attr = scriptLocals.get(attrName)
233 scriptArgDict[
"%sFunc" % attrName] = attr
234 elif attrName ==
"run":
235 raise RuntimeError(
"%r has no %s function" % (filePath, attrName))
239 return "%s(%s)" % (type(self).__name__, self.
name)
243 """A wrapper for an opscore.ActorDispatcher talking to the TCC
245 This wrapper is responsible for starting and stopping everything:
246 - It builds a connection
247 - It builds a dispatcher when the connection is ready
248 - It stops the connection and dispatcher on close
250 Public attributes include:
251 - connection: the connection (RO.Conn.TwistedSocket.TCPConnection)
252 - dispatcher: the actor dispatcher (twistedActor.ActorDispatcher); None until ready
253 - readyDeferred: called when the dispatcher is ready
254 (for tracking closure use the Deferred returned by the close method, or stateCallback).
260 stateCallback =
None,
263 """Construct a TCCDispatcherWrapper that manages everything
265 @param[in] userPort TCC port
266 @param[in] readCallback function to call when the actor dispatcher has data to read
267 @param[in] stateCallback function to call when connection state of of any socket changes;
268 receives one argument: this actor wrapper
269 @param[in] debug print debug messages to stdout?
271 BaseWrapper.__init__(self,
273 stateCallback = stateCallback,
281 connection = TCPConnection(
288 connection.addStateCallback(self._stateChanged)
294 def _makeDispatcher(self, connection):
295 self.debugMsg(
"_makeDispatcher()")
297 connection = connection,
303 """Return True if the dispatcher is connected to the TCC
305 return self.
dispatcher is not None and self.dispatcher.connection.isConnected
309 """Return True if the dispatcher is disconnected from the TCC
311 return self.
dispatcher is not None and self.dispatcher.connection.isDisconnected
315 """Return True if there is a failure
317 return self.
dispatcher is not None and self.dispatcher.connection.didFail
319 def _basicClose(self):
320 """Close dispatcher and actor
323 self.dispatcher.disconnect()
string ConnectionCancelled
def getJobsDir
Get path to batch job directory.