Package flumotion :: Package worker :: Module base
[hide private]

Source Code for Module flumotion.worker.base

  1  # -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  worker-side objects to handle worker clients 
 24  """ 
 25   
 26  import os 
 27  import sys 
 28  import signal 
 29   
 30  from twisted.cred import portal 
 31  from twisted.internet import defer, reactor 
 32  from twisted.spread import pb 
 33  from zope.interface import implements 
 34   
 35  from flumotion.common import errors, log 
 36  from flumotion.common import worker, startset 
 37  from flumotion.common.process import signalPid 
 38  from flumotion.twisted import checkers, fdserver 
 39  from flumotion.twisted import pb as fpb 
 40   
__version__ = "$Rev: 7982 $"

# Delay (seconds, as passed to reactor.callLater) that
# BaseJobHeaven.shutdown() gives job processes to exit after stop() before
# it falls back to kill(), which sends SIGKILL by default.
JOB_SHUTDOWN_TIMEOUT = 5
 44   
 45   
def _getSocketPath():
    """
    Reserve a unique path in the temporary directory for the job socket.

    The path is created as an empty regular file (via C{tempfile.mkstemp})
    whose name starts with C{flumotion.worker.} and ends with C{.<pid>}.
    The caller must delete that placeholder file before binding a socket
    to the path.

    FIXME: there is no mkstemp equivalent for sockets, so between the
    placeholder being removed and the socket being bound there is a small
    window in which something else could create that name.

    @rtype: str
    """
    import tempfile
    handle, path = tempfile.mkstemp(suffix='.%d' % os.getpid(),
                                    prefix='flumotion.worker.')
    # only the unique name is wanted, not an open descriptor
    os.close(handle)
    return path


class JobInfo(object):
    """
    Plain record describing one job process spawned by the worker.

    @ivar pid:        PID of the child process
    @type pid:        int
    @ivar avatarId:   avatar identification string
    @type avatarId:   str
    @ivar type:       type of the component to create
    @type type:       str
    @ivar moduleName: name of the module to create the component from
    @type moduleName: str
    @ivar methodName: the factory method to use to create the component
    @type methodName: str
    @ivar nice:       the nice level to run the job as
    @type nice:       int
    @ivar bundles:    ordered list of (bundleName, bundlePath) needed to
                      create the component
    @type bundles:    list of (str, str)
    """
    # fixed attribute set: no per-instance __dict__, unknown attributes
    # are rejected
    __slots__ = ('pid', 'avatarId', 'type', 'moduleName', 'methodName',
                 'nice', 'bundles')

    def __init__(self, pid, avatarId, type, moduleName, methodName, nice,
                 bundles):
        (self.pid, self.avatarId, self.type, self.moduleName,
         self.methodName, self.nice, self.bundles) = (
            pid, avatarId, type, moduleName, methodName, nice, bundles)


class JobProcessProtocol(worker.ProcessProtocol):
    """
    Process protocol for a component job spawned by the worker.

    It ties the child's lifetime to the heaven's start set: an early exit
    fails the pending create, a pending shutdown is completed, and the
    heaven is told the pid is gone.
    """

    def __init__(self, heaven, avatarId, startSet):
        """
        @param heaven:   the job heaven that spawned the process
        @type heaven:    L{BaseJobHeaven}
        @param avatarId: avatar id of the component being started
        @type avatarId:  str
        @param startSet: start set tracking pending creates/shutdowns
        """
        self._startSet = startSet
        # remember the create deferred so processEnded can tell whether
        # the job ever got as far as logging in
        self._deferredStart = startSet.createRegistered(avatarId)
        worker.ProcessProtocol.__init__(self, heaven, avatarId,
                                        'component',
                                        heaven.getWorkerName())

    def sendMessage(self, message):
        """
        Forward a message about this component to the manager through the
        heaven's brain (remote method C{componentAddMessage}).
        """
        # self.loggable is used as the heaven here; presumably set by
        # worker.ProcessProtocol.__init__ from its first argument
        self.loggable.brain.callRemote('componentAddMessage',
                                       self.avatarId, message)

    def processEnded(self, status):
        """
        Handle exit of the job process.

        @param status: why the process ended; C{status.value.signal} is
                       read as the terminating signal number, if any
        """
        heaven = self.loggable
        startSet = self._startSet
        avatarId = self.avatarId
        signum = status.value.signal

        # If the job died before logging in to the worker, the create
        # deferred is still the one registered in __init__: fail it,
        # otherwise the manager keeps thinking the component is starting
        # up when it is dead. If the job had already attached, that
        # deferred has already been callbacked.
        stillStarting = (startSet.createRegistered(avatarId)
                         is self._deferredStart)
        if stillStarting:
            if signum:
                reason = "received signal %d" % signum
            else:
                reason = "unknown reason"
            startSet.createFailed(avatarId, errors.ComponentCreateError(
                "Component '%s' has exited early (%s)." % (avatarId, reason)))

        if startSet.shutdownRegistered(avatarId):
            startSet.shutdownSuccess(avatarId)

        heaven.jobStopped(self.pid)

        # chain up
        worker.ProcessProtocol.processEnded(self, status)


class BaseJobHeaven(pb.Root, log.Loggable):
    """
    Worker-side realm for the job processes this worker spawns.

    Similar to, but not the same as, a manager-side Heaven: job processes
    log in over a local UNIX socket and each gets an avatar of class
    C{avatarClass}, which subclasses must set.

    @ivar avatars: dict of avatarId -> avatar
    @type avatars: dict of str -> L{base.BaseJobAvatar}
    @ivar brain:   the worker brain
    @type brain:   L{worker.WorkerBrain}
    """

    logCategory = "job-heaven"
    implements(portal.IRealm)

    avatarClass = None

    def __init__(self, brain):
        """
        @param brain: a reference to the worker brain
        @type brain:  L{worker.WorkerBrain}
        """
        self.avatars = {}          # avatarId -> avatar
        self.brain = brain
        self._socketPath = _getSocketPath()
        self._port = None
        # set by shutdown(): a deferred fired when the last child exits
        self._onShutdown = None
        self._jobInfos = {}        # process id -> JobInfo

        def isRunning(avatarId):
            return avatarId in self.avatars

        self._startSet = startset.StartSet(
            isRunning,
            errors.ComponentAlreadyStartingError,
            errors.ComponentAlreadyRunningError)

    def listen(self):
        """
        Start accepting PB logins from job processes on the socket path.
        """
        assert self._port is None
        assert self.avatarClass is not None
        # FIXME: we should hand a username and password to log in with to
        # the job process instead of allowing anonymous
        checker = checkers.FlexibleCredentialsChecker()
        checker.allowPasswordless(True)
        factory = pb.PBServerFactory(portal.Portal(self, [checker]))

        # _getSocketPath() left a placeholder file at the path; remove it
        # so the socket can be bound there
        try:
            os.unlink(self._socketPath)
        except OSError:
            pass

        # listenWith (not listenUNIX) lets us choose the Port class, whose
        # transports know how to pass file descriptors
        self._port = reactor.listenWith(fdserver.FDPort, self._socketPath,
                                        factory)

    ### portal.IRealm method

    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Create and register an avatar for a job logging in.

        @raises NotImplementedError: if pb.IPerspective is not requested
        """
        if pb.IPerspective not in interfaces:
            raise NotImplementedError("no interface")
        avatar = self.avatarClass(self, avatarId, mind)
        assert avatarId not in self.avatars
        self.avatars[avatarId] = avatar
        return pb.IPerspective, avatar, avatar.logout

    def removeAvatar(self, avatarId):
        """
        Forget the avatar for avatarId, warning if it is unknown.
        """
        try:
            del self.avatars[avatarId]
        except KeyError:
            self.warning("some programmer is telling me about an avatar "
                         "I have no idea about: %s", avatarId)

    def getWorkerName(self):
        """
        Gets the name of the worker that spawns the process.

        @rtype: str
        """
        return self.brain.workerName

    def addJobInfo(self, processId, jobInfo):
        """
        Record the L{JobInfo} for a spawned process, keyed by process id.
        """
        self._jobInfos[processId] = jobInfo

    def getJobInfo(self, processId):
        """
        @raises KeyError: if no job is recorded under processId
        """
        return self._jobInfos[processId]

    def getJobInfos(self):
        """
        @returns: the recorded L{JobInfo} objects
        """
        return self._jobInfos.values()

    def getJobPids(self):
        """
        @returns: the process ids of the recorded jobs
        """
        return self._jobInfos.keys()

    def rotateChildLogFDs(self):
        """
        Hand every logged-in job our current stdout/stderr descriptors.
        """
        self.debug('telling kids about new log file descriptors')
        for avatar in self.avatars.values():
            avatar.logTo(sys.stdout.fileno(), sys.stderr.fileno())

    def jobStopped(self, pid):
        """
        Drop the job info for pid; fire the shutdown deferred, if one is
        pending, once no jobs remain.
        """
        if pid not in self._jobInfos:
            self.warning("some programmer is telling me about a pid "
                         "I have no idea about: %d", pid)
            return

        self.debug('Removing job info for %d', pid)
        del self._jobInfos[pid]

        if not self._jobInfos and self._onShutdown:
            self.debug("Last child exited")
            self._onShutdown.callback(None)

    def shutdown(self):
        """
        Ask all jobs to stop, then stop listening.

        If any avatars remain after stop() has been requested, the returned
        deferred fires only once the last job process has exited (see
        jobStopped); jobs still alive after JOB_SHUTDOWN_TIMEOUT seconds
        are killed.

        NOTE(review): the wait is armed on self.avatars but released on
        self._jobInfos becoming empty; jobs that never logged in are not
        waited for. Confirm this is intended.

        @rtype: L{twisted.internet.defer.Deferred}
        """
        self.debug('Shutting down JobHeaven')
        self.debug('Stopping all jobs')
        for avatar in self.avatars.values():
            avatar.stop()

        if not self.avatars:
            # everything's gone already, return success
            waiting = defer.succeed(None)
        else:
            # if the jobs do not go away nicely in time, kill them
            timeout = reactor.callLater(JOB_SHUTDOWN_TIMEOUT, self.kill)

            def cancelTimeout(result):
                # be nice to unit tests: leave no pending delayed call
                if timeout.active():
                    timeout.cancel()
                return result

            self._onShutdown = defer.Deferred()
            self._onShutdown.addCallback(cancelTimeout)
            waiting = self._onShutdown

        def stopListening(_):
            # _port is None if listen() was never called, as in some
            # test cases
            if self._port:
                port, self._port = self._port, None
                return port.stopListening()

        waiting.addCallback(stopListening)
        return waiting

    def kill(self, signum=signal.SIGKILL):
        """
        Send signum (SIGKILL by default) to every recorded job.
        """
        self.warning("Killing all children immediately")
        for pid in self.getJobPids():
            self.killJobByPid(pid, signum)

    def killJobByPid(self, pid, signum):
        """
        Send signum to the job recorded under pid.

        @raises errors.UnknownComponentError: if pid is not recorded
        """
        if pid not in self._jobInfos:
            raise errors.UnknownComponentError(pid)

        jobInfo = self._jobInfos[pid]
        self.debug("Sending signal %d to job %s at pid %d", signum,
                   jobInfo.avatarId, jobInfo.pid)
        signalPid(jobInfo.pid, signum)

    def killJob(self, avatarId, signum):
        """
        Send signum to every recorded job whose avatarId matches.
        """
        matching = [job.pid for job in self._jobInfos.values()
                    if job.avatarId == avatarId]
        for pid in matching:
            self.killJobByPid(pid, signum)


301 -class BaseJobAvatar(fpb.Avatar, log.Loggable):
302 """ 303 I am an avatar for the job living in the worker. 304 """ 305 logCategory = 'job-avatar' 306
307 - def __init__(self, heaven, avatarId, mind):
308 """ 309 @type heaven: L{flumotion.worker.base.BaseJobHeaven} 310 @type avatarId: str 311 """ 312 fpb.Avatar.__init__(self, avatarId) 313 self._heaven = heaven 314 self.setMind(mind) 315 self.pid = None
316
317 - def setMind(self, mind):
318 """ 319 @param mind: reference to the job's JobMedium on which we can call 320 @type mind: L{twisted.spread.pb.RemoteReference} 321 """ 322 fpb.Avatar.setMind(self, mind) 323 self.haveMind()
324
325 - def haveMind(self):
326 # implement me in subclasses 327 pass
328
329 - def logout(self):
330 self.log('logout called, %s disconnected', self.avatarId) 331 332 self._heaven.removeAvatar(self.avatarId)
333
334 - def stop(self):
335 """ 336 returns: a deferred marking completed stop. 337 """ 338 raise NotImplementedError
339
340 - def _sendFileDescriptor(self, fd, message):
341 try: 342 # FIXME: pay attention to the return value of 343 # sendFileDescriptor; is the same as the return value of 344 # sendmsg(2) 345 self.mind.broker.transport.sendFileDescriptor(fd, message) 346 return True 347 except OSError, e: 348 # OSError is what is thrown by the C code doing this 349 # when there are issues 350 self.warning("Error %s sending file descriptors", 351 log.getExceptionMessage(e)) 352 return False
353
354 - def logTo(self, stdout, stderr):
355 """ 356 Tell the job to log to the given file descriptors. 357 """ 358 self.debug('Giving job new stdout and stderr') 359 if self.mind: 360 self._sendFileDescriptor(stdout, "redirectStdout") 361 self._sendFileDescriptor(stdout, "redirectStderr")
362