Документ взят из кэша поисковой машины. Адрес оригинального документа : http://www.apo.nmsu.edu/Telescopes/TCC/html/queue_8py_source.html
Дата изменения: Tue Sep 15 02:25:37 2015
Дата индексирования: Sun Apr 10 00:28:24 2016
Кодировка:
lsst.tcc: python/tcc/cmd/queue.py Source File
lsst.tcc  1.2.2-3-g89ecb63
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
queue.py
Go to the documentation of this file.
1 from __future__ import division, absolute_import
2 
3 __all__ = ["queue"]
4 
5 import glob
6 from opscore.actor import ActorDispatcher
7 import os.path
8 from RO.Comm.TCPConnection import TCPConnection
9 from twistedActor import CommandError, ScriptRunner, BaseWrapper
10 from tcc.base import getJobsDir
11 
12 _CurrJob = None
13 
14 def queue(tccActor, userCmd):
15  """Implement the Queue command
16 
17  @param[in] tccActor tcc actor
18  @param[in,out] userCmd a twistedActor BaseCommand with parseCmd attribute
19  """
20  global _CurrJob
21 
22  cmdParam = userCmd.parsedCmd.paramDict["command"].valueList[0]
23  cmdName = cmdParam.keyword.lower()
24  if cmdName == "run":
25  if _CurrJob and _CurrJob.isRunning:
26  raise CommandError("%s is already running" % (_CurrJob.scriptRunner.name))
27 
28  # start running a new job
29  name = cmdParam.valueList[0]
30  _CurrJob = BatchJob(
31  tccActor = tccActor,
32  name = name,
33  userCmd = userCmd,
34  )
35 
36  elif cmdName == "list":
37  # list available jobs
38  jobDir = getJobsDir()
39  jobPaths = glob.glob(os.path.join(jobDir, "*.py"))
40  names = [os.path.basename(jp)[0:-3] for jp in jobPaths]
41  hubMsg = "JobNames=%s" % (", ".join(names))
42  userCmd.setState(userCmd.Done, hubMsg=hubMsg)
43 
44  elif cmdName == "stop":
45  # stop current job
46  if not _CurrJob:
47  userCmd.setState(userCmd.Done, textMsg="No batch job running; nothing to do")
48  return
49 
50  _CurrJob.stop(userCmd)
51 
52  elif cmdName == "status":
53  # show job status
54  if _CurrJob:
55  name = _CurrJob.name
56  jobState = _CurrJob.state
57  else:
58  name = '""'
59  jobState = BatchJob.Done
60  hubMsg = "JobStatus=%s, %s" % (name, jobState)
61  userCmd.setState(userCmd.Done, hubMsg=hubMsg)
62 
63  else:
64  raise CommandError("Unrecognized queues sub-command %r" % (cmdName,))
65 
66 
67 class BatchJob(object):
68  """A TCC batch job
69 
70  Attributes:
71  - scriptRunner: the current batch job script (a twistedActor.ScriptRunner)
72  - dispatcher: a dispatcher that talks to the TCC
73  """
74  Connecting = "Connecting"
75  ConnectionCancelled = "ConnectionCancelled"
76  ConnectionFailed = "ConnectionFailed"
77  Starting = "Starting"
78  Running = "Running"
79  Done = "Done"
80  Failed = "Failed"
81  Cancelled = "Cancelled"
82  CancelStates = set((ConnectionCancelled, Cancelled))
83  ErrorStates = CancelStates | set((ConnectionFailed, Failed))
84  DoneStates = ErrorStates | set((Done,))
85 
86  def __init__(self, tccActor, name, userCmd):
87  """Create and start a batch job
88 
89  @param[in] tccActor tcc actor
90  @param[in] name job name (name of file in jobs dir, without the .py suffix
91  @param[in,out] userCmd user command; the command is reported done when the command starts running,
92  or failed if it cannot be started.
93  """
94  self.name = name
95  self._runCmd = userCmd # command used to start job
96  self._stopCmd = None
97  self.state = self.Connecting
98  self.writeToUsers = tccActor.writeToUsers
99 
100  # parse batch job file
101  self.scriptRunner = None
102  filePath = os.path.join(getJobsDir(), self.name + ".py")
103  self._srArgs = self._getScript(filePath)
104 
105  # connect to TCC
107  userPort = tccActor.server.port,
108  stateCallback = self._wrapperStateChanged,
109  debug = False,
110  )
111 
112  def stop(self, userCmd):
113  """Abort the batch job
114 
115  @param[in,out] userCmd user command; the command is reported done when the command starts running,
116  or failed if it cannot be started.
117  """
118  self._stopCmd = userCmd
119  if self.scriptRunner and self.scriptRunner.isExecuting:
120  self.scriptRunner.cancel()
121  elif not self.dw.isDone:
122  self.dw.close()
123  else:
124  userCmd.setState(userCmd.Done, textMsg="No batch job running; nothing to do")
125 
126  @property
127  def isDone(self):
128  return self.state in self.DoneStates
129 
130  @property
131  def didFail(self):
132  return self.state in self.ErrorStates
133 
134  @property
135  def wasCancelled(self):
136  return self.state in self.CancelStates
137 
138  @property
139  def isRunning(self):
140  return self.state == self.Running
141 
142  def setState(self, state):
143  if self.isDone:
144  return
145 
146  self.state = state
147 
148  if self.didFail:
149  msgCode = "f"
150  elif self.isDone:
151  msgCode = ":"
152  else:
153  msgCode = "i"
154  self.writeToUsers(msgCode, "JobStatus=%s, %s" % (self.name, state))
155 
156  if self.isDone:
157  if not self._runCmd.isDone:
158  if self.didFail:
159  self._runCmd.setState(self._runCmd.Failed, textMsg=state)
160  else:
161  self._runCmd.setState(self._runCmd.Done)
162  if self._stopCmd and not self._stopCmd.isDone:
163  if self.wasCancelled:
164  self._stopCmd.setState(self._stopCmd.Done)
165  elif self.didFail:
166  self._stopCmd.setState(self._stopCmd.Failed, textMsg=state)
167  else:
168  self._stopCmd.setState(self._stopCmd.Done)
169  elif self.isRunning and not self._runCmd.isDone:
170  self._runCmd.setState(self._runCmd.Done)
171 
172  def _wrapperStateChanged(self, dw):
173  """The TCC device wrapper's state changed
174 
175  @param[in] dw tcc device wrapper
176  """
177  if self.isDone:
178  return
179 
180  if self._runCmd.isDone:
181  # the user command finished before the wrapper finished connecting or failing;
182  # close the wrapper and give up
183  self.setState(self.ConnectionCancelled)
184  dw.close()
185 
186  elif dw.didFail:
187  # could not connect
188  self.setState(self.ConnectionFailed)
189 
190  elif dw.isReady:
191  self.setState(self.Starting)
192  self.scriptRunner = ScriptRunner(
193  name = self.name,
194  dispatcher = self.dw.dispatcher,
195  stateFunc = self._scriptStateChanged,
196  **self._srArgs)
197  self.scriptRunner.start()
198 
199  def _scriptStateChanged(self, sr):
200  """ScriptRunner state changed
201  """
202  if sr.isDone:
203  if sr.state == sr.Cancelled:
204  self.setState(self.Cancelled)
205  elif sr.didFail:
206  self.setState(self.Failed)
207  else:
208  self.setState(self.Done)
209  self.dw.close()
210  elif sr.isExecuting:
211  self.setState(self.Running)
212 
213  def _getScript(self, filePath):
214  """Load a script file, returning the functions as a dict of twistedActor.ScriptRunner args
215 
216  @param[in] fileName name of script file (path is relative to the standard jobs dir)
217  """
218  if not os.path.isfile(filePath):
219  raise CommandError("Job file %r not found" % (filePath,))
220 
221  scriptLocals = {"__file__": filePath}
222  execfile(filePath, scriptLocals)
223 
224  scriptArgDict = dict()
225 
226  scriptClass = scriptLocals.get("ScriptClass")
227  if scriptClass:
228  scriptArgDict["scriptClass"] = scriptClass
229  else:
230  for attrName in ("run", "init", "end"):
231  attr = scriptLocals.get(attrName)
232  if attr:
233  scriptArgDict["%sFunc" % attrName] = attr
234  elif attrName == "run":
235  raise RuntimeError("%r has no %s function" % (filePath, attrName))
236  return scriptArgDict
237 
238  def __repr__(self):
239  return "%s(%s)" % (type(self).__name__, self.name)
240 
241 
242 class TCCDispatcherWrapper(BaseWrapper):
243  """A wrapper for an opscore.ActorDispatcher talking to the TCC
244 
245  This wrapper is responsible for starting and stopping everything:
246  - It builds a connection
247  - It builds a dispatcher when the connection is ready
248  - It stops the connection and dispatcher on close
249 
250  Public attributes include:
251  - connection: the connection (RO.Conn.TwistedSocket.TCPConnection)
252  - dispatcher: the actor dispatcher (twistedActor.ActorDispatcher); None until ready
253  - readyDeferred: called when the dispatcher is ready
254  (for tracking closure use the Deferred returned by the close method, or stateCallback).
255  """
256  def __init__(self,
257  userPort,
258  name = "tcc",
259  readCallback = None,
260  stateCallback = None,
261  debug = False,
262  ):
263  """Construct a TCCDispatcherWrapper that manages everything
264 
265  @param[in] userPort TCC port
266  @param[in] readCallback function to call when the actor dispatcher has data to read
267  @param[in] stateCallback function to call when connection state of of any socket changes;
268  receives one argument: this actor wrapper
269  @param[in] debug print debug messages to stdout?
270  """
271  BaseWrapper.__init__(self,
272  name = name,
273  stateCallback = stateCallback,
274  callNow = False,
275  debug = debug,
276  )
277  self.actor = "tcc"
278  self._readCallback = readCallback
279  self.dispatcher = None # the ActorDispatcher, once it's built
280 
281  connection = TCPConnection(
282  host = 'localhost',
283  port = userPort,
284  readLines = True,
285  name = "tccjob",
286  )
287  self._makeDispatcher(connection)
288  connection.addStateCallback(self._stateChanged)
289  if self._readCallback:
290  connection.addReadCallback(self._readCallback)
291  connection.connect()
292  self._stateChanged()
293 
294  def _makeDispatcher(self, connection):
295  self.debugMsg("_makeDispatcher()")
296  self.dispatcher = ActorDispatcher(
297  connection = connection,
298  name = self.actor, # name of keyword dictionary
299  )
300 
301  @property
302  def isReady(self):
303  """Return True if the dispatcher is connected to the TCC
304  """
305  return self.dispatcher is not None and self.dispatcher.connection.isConnected
306 
307  @property
308  def isDone(self):
309  """Return True if the dispatcher is disconnected from the TCC
310  """
311  return self.dispatcher is not None and self.dispatcher.connection.isDisconnected
312 
313  @property
314  def isFailing(self):
315  """Return True if there is a failure
316  """
317  return self.dispatcher is not None and self.dispatcher.connection.didFail
318 
319  def _basicClose(self):
320  """Close dispatcher and actor
321  """
322  if self.dispatcher:
323  self.dispatcher.disconnect()
string ConnectionFailed
Definition: queue.py:76
string ConnectionCancelled
Definition: queue.py:75
def getJobsDir
Get path to batch job directory.
Definition: getStdDirs.py:18
def queue
Definition: queue.py:14