Trees | Indices | Help |
---|
|
# -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*-
# vi:si:et:sw=4:sts=4:ts=4
#
# Flumotion - a streaming media server
# Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
# All rights reserved.

# This file may be distributed and/or modified under the terms of
# the GNU General Public License version 2 as published by
# the Free Software Foundation.
# This file is distributed without any warranty; without even the implied
# warranty of merchantability or fitness for a particular purpose.
# See "LICENSE.GPL" in the source distribution for more information.

# Licensees having purchased or holding a valid Flumotion Advanced
# Streaming Server license may use this file in accordance with the
# Flumotion Advanced Streaming Server Commercial License Agreement.
# See "LICENSE.Flumotion" in the source distribution for more information.

# Headers in this file shall remain intact.

"""
worker-side objects to handle worker clients
"""

import os
import sys
import signal

from twisted.cred import portal
from twisted.internet import defer, reactor
from twisted.spread import pb
from zope.interface import implements

from flumotion.common import errors, log
from flumotion.common import worker, startset
from flumotion.common.process import signalPid
from flumotion.twisted import checkers, fdserver
from flumotion.twisted import pb as fpb

__version__ = "$Rev: 7982 $"

# seconds to wait for jobs to shut down nicely before killing them
JOB_SHUTDOWN_TIMEOUT = 5


def _getSocketPath():
    """
    Reserve and return a unique filesystem path usable for a UNIX socket.

    A temporary file is created (to claim a unique name) and immediately
    closed; the caller must delete the file before binding a socket to it.

    @rtype: str
    """
    # FIXME: there is mkstemp for sockets, so we have a small window
    # here in which the socket could be created by something else
    # I didn't succeed in preparing a socket file with that name either

    # caller needs to delete name before using
    import tempfile
    fd, name = tempfile.mkstemp('.%d' % os.getpid(), 'flumotion.worker.')
    os.close(fd)

    return name


class JobInfo(object):
    """
    I hold information about a job.

    @cvar pid:        PID of the child process
    @type pid:        int
    @cvar avatarId:   avatar identification string
    @type avatarId:   str
    @cvar type:       type of the component to create
    @type type:       str
    @cvar moduleName: name of the module to create the component from
    @type moduleName: str
    @cvar methodName: the factory method to use to create the component
    @type methodName: str
    @cvar nice:       the nice level to run the job as
    @type nice:       int
    @cvar bundles:    ordered list of (bundleName, bundlePath) needed to
                      create the component
    @type bundles:    list of (str, str)
    """
    __slots__ = ('pid', 'avatarId', 'type', 'moduleName', 'methodName',
                 'nice', 'bundles')

    def __init__(self, pid, avatarId, type, moduleName, methodName,
                 nice, bundles):
        # NOTE(review): the __init__ header was lost in the extraction;
        # the parameter list is reconstructed from the assignments below
        # and the documented slots.
        self.pid = pid
        self.avatarId = avatarId
        self.type = type
        self.moduleName = moduleName
        self.methodName = methodName
        self.nice = nice
        self.bundles = bundles


class JobProcessProtocol(worker.ProcessProtocol):
    # NOTE(review): the class/method headers were lost in the extraction;
    # the base class is confirmed by the chain-up calls below, the class
    # and method names are reconstructed — verify against upstream.

    def __init__(self, heaven, avatarId, startSet):
        self._startSet = startSet
        # remember the create-deferred registered at spawn time so that
        # processEnded can tell whether the job died before attaching
        self._deferredStart = startSet.createRegistered(avatarId)
        worker.ProcessProtocol.__init__(self, heaven, avatarId,
                                        'component',
                                        heaven.getWorkerName())

    def sendMessage(self, message):
        # forward a message from the job to the manager via the brain
        heaven = self.loggable
        heaven.brain.callRemote('componentAddMessage', self.avatarId,
                                message)

    def processEnded(self, status):
        heaven = self.loggable
        dstarts = self._startSet
        signum = status.value.signal

        # we need to trigger a failure on the create deferred
        # if the job failed before logging in to the worker;
        # otherwise the manager still thinks it's starting up when it's
        # dead. If the job already attached to the worker however,
        # the create deferred will already have callbacked.
        deferred = dstarts.createRegistered(self.avatarId)
        if deferred is self._deferredStart:
            if signum:
                reason = "received signal %d" % signum
            else:
                reason = "unknown reason"
            text = ("Component '%s' has exited early (%s)." %
                    (self.avatarId, reason))
            dstarts.createFailed(self.avatarId,
                                 errors.ComponentCreateError(text))

        if dstarts.shutdownRegistered(self.avatarId):
            dstarts.shutdownSuccess(self.avatarId)

        heaven.jobStopped(self.pid)

        # chain up
        worker.ProcessProtocol.processEnded(self, status)


class BaseJobHeaven(pb.Root, log.Loggable):
    # NOTE(review): class header lost in extraction; name confirmed by the
    # L{flumotion.worker.base.BaseJobHeaven} reference in BaseJobAvatar —
    # base classes reconstructed (Loggable implied by self.debug/warning).
    """
    I am similar to but not quite the same as a manager-side Heaven.
    I manage avatars inside the worker for job processes spawned by the
    worker.

    @ivar avatars: dict of avatarId -> avatar
    @type avatars: dict of str -> L{base.BaseJobAvatar}
    @ivar brain:   the worker brain
    @type brain:   L{worker.WorkerBrain}
    """

    logCategory = "job-heaven"
    implements(portal.IRealm)

    # subclasses must set this to the avatar class to instantiate
    avatarClass = None

    def __init__(self, brain):
        """
        @param brain: a reference to the worker brain
        @type brain: L{worker.WorkerBrain}
        """
        self.avatars = {}  # componentId -> avatar
        self.brain = brain
        self._socketPath = _getSocketPath()
        self._port = None
        self._onShutdown = None  # If set, a deferred to fire when
                                 # our last child process exits

        self._jobInfos = {}  # processid -> JobInfo

        self._startSet = startset.StartSet(
            lambda x: x in self.avatars,
            errors.ComponentAlreadyStartingError,
            errors.ComponentAlreadyRunningError)

    def listen(self):
        """
        Start listening on our UNIX socket for job processes to log in.
        """
        assert self._port is None
        assert self.avatarClass is not None
        # FIXME: we should hand a username and password to log in with to
        # the job process instead of allowing anonymous
        checker = checkers.FlexibleCredentialsChecker()
        checker.allowPasswordless(True)
        p = portal.Portal(self, [checker])
        f = pb.PBServerFactory(p)
        # the temp file reserving the socket name must go away before we
        # can bind a socket to that path
        try:
            os.unlink(self._socketPath)
        except OSError:
            pass

        # Rather than a listenUNIX(), we use listenWith so that we can
        # specify our particular Port, which creates Transports that we
        # know how to pass FDs over.
        port = reactor.listenWith(fdserver.FDPort, self._socketPath, f)
        self._port = port

    ### portal.IRealm method

    def requestAvatar(self, avatarId, mind, *interfaces):
        if pb.IPerspective in interfaces:
            avatar = self.avatarClass(self, avatarId, mind)
            assert avatarId not in self.avatars
            self.avatars[avatarId] = avatar
            return pb.IPerspective, avatar, avatar.logout
        else:
            raise NotImplementedError("no interface")

    def removeAvatar(self, avatarId):
        if avatarId in self.avatars:
            del self.avatars[avatarId]
        else:
            self.warning("some programmer is telling me about an avatar "
                         "I have no idea about: %s", avatarId)

    def getWorkerName(self):
        """
        Gets the name of the worker that spawns the process.

        @rtype: str
        """
        return self.brain.workerName

    # NOTE(review): the three accessors below (orig lines 218-226) were
    # completely lost in the extraction; names and bodies are reconstructed
    # from the call sites (kill() uses getJobPids; something must populate
    # _jobInfos) — verify against upstream before relying on them.

    def addJobInfo(self, processPid, jobInfo):
        self._jobInfos[processPid] = jobInfo

    def getJobInfo(self, processPid):
        return self._jobInfos[processPid]

    def getJobInfos(self):
        return self._jobInfos.values()

    def getJobPids(self):
        return self._jobInfos.keys()

    def rotateChildLogFDs(self):
        # hand each job fresh copies of our (possibly reopened) stdio fds
        self.debug('telling kids about new log file descriptors')
        for avatar in self.avatars.values():
            avatar.logTo(sys.stdout.fileno(), sys.stderr.fileno())

    def jobStopped(self, pid):
        if pid in self._jobInfos:
            self.debug('Removing job info for %d', pid)
            del self._jobInfos[pid]

            # if a shutdown is in progress, fire its deferred once the
            # last child is gone
            if not self._jobInfos and self._onShutdown:
                self.debug("Last child exited")
                self._onShutdown.callback(None)
        else:
            self.warning("some programmer is telling me about a pid "
                         "I have no idea about: %d", pid)

    def shutdown(self):
        """
        Stop all jobs and stop listening on our socket.

        @returns: a deferred firing when all jobs have exited and the
                  port has stopped listening.
        """
        self.debug('Shutting down JobHeaven')
        self.debug('Stopping all jobs')
        for avatar in self.avatars.values():
            avatar.stop()

        if self.avatars:
            # If our jobs fail to shut down nicely within some period of
            # time, shut them down less nicely
            dc = reactor.callLater(JOB_SHUTDOWN_TIMEOUT, self.kill)

            def cancelDelayedCall(res, dc):
                # be nice to unit tests
                if dc.active():
                    dc.cancel()
                return res

            self._onShutdown = defer.Deferred()
            self._onShutdown.addCallback(cancelDelayedCall, dc)
            ret = self._onShutdown
        else:
            # everything's gone already, return success
            ret = defer.succeed(None)

        def stopListening(_):
            # possible for it to be None, if we haven't been told to
            # listen yet, as in some test cases
            if self._port:
                port = self._port
                self._port = None
                return port.stopListening()

        ret.addCallback(stopListening)
        return ret

    def kill(self, signum=signal.SIGKILL):
        # NOTE(review): header lost in extraction; the name is confirmed
        # by reactor.callLater(JOB_SHUTDOWN_TIMEOUT, self.kill) above, the
        # default signal is reconstructed — verify against upstream.
        self.warning("Killing all children immediately")
        for pid in self.getJobPids():
            self.killJobByPid(pid, signum)

    def killJobByPid(self, pid, signum):
        """
        Send the given signal to the job with the given pid.

        @raises errors.UnknownComponentError: if no job with that pid
        """
        if pid not in self._jobInfos:
            raise errors.UnknownComponentError(pid)

        jobInfo = self._jobInfos[pid]
        self.debug("Sending signal %d to job %s at pid %d", signum,
                   jobInfo.avatarId, jobInfo.pid)
        signalPid(jobInfo.pid, signum)

    def killJob(self, avatarId, signum):
        # signal every job belonging to the given avatar
        for job in self._jobInfos.values():
            if job.avatarId == avatarId:
                self.killJobByPid(job.pid, signum)


class BaseJobAvatar(fpb.Avatar, log.Loggable):
    # NOTE(review): class header lost in extraction; the name is confirmed
    # by the L{base.BaseJobAvatar} reference in BaseJobHeaven's docstring —
    # base classes reconstructed (fpb.Avatar.__init__ is chained below).
    """
    I am an avatar for the job living in the worker.
    """
    logCategory = 'job-avatar'

    def __init__(self, heaven, avatarId, mind):
        """
        @type heaven: L{flumotion.worker.base.BaseJobHeaven}
        @type avatarId: str
        """
        fpb.Avatar.__init__(self, avatarId)
        self._heaven = heaven
        self.setMind(mind)
        self.pid = None

    def setMind(self, mind):
        """
        @param mind: reference to the job's JobMedium on which we can call
        @type mind: L{twisted.spread.pb.RemoteReference}
        """
        fpb.Avatar.setMind(self, mind)
        self.haveMind()

    def haveMind(self):
        # hook called once the mind is set; subclasses override
        # (body lost in extraction — reconstructed as a no-op)
        pass

    def logout(self):
        self.log('logout called, %s disconnected', self.avatarId)

        self._heaven.removeAvatar(self.avatarId)

    def stop(self):
        """
        Stop the job; called by the heaven during shutdown.
        (Body lost in extraction — reconstructed as abstract;
        subclasses must implement it.)
        """
        raise NotImplementedError

    def _sendFileDescriptor(self, fd, message):
        """
        Send a file descriptor plus an accompanying message string over
        the FD-passing transport to the job.

        @returns: True if the descriptor was sent, False on OSError
        """
        try:
            # FIXME: pay attention to the return value of
            # sendFileDescriptor; is the same as the return value of
            # sendmsg(2)
            self.mind.broker.transport.sendFileDescriptor(fd, message)
            return True
        except OSError as e:
            # OSError is what is thrown by the C code doing this
            # when there are issues
            self.warning("Error %s sending file descriptors",
                         log.getExceptionMessage(e))
            return False

    def logTo(self, stdout, stderr):
        """
        Tell the job to log to the given file descriptors.
        """
        self.debug('Giving job new stdout and stderr')
        if self.mind:
            self._sendFileDescriptor(stdout, "redirectStdout")
            # BUGFIX: the original passed stdout here as well, leaving the
            # stderr parameter unused and redirecting the job's stderr to
            # the worker's stdout descriptor.
            self._sendFileDescriptor(stderr, "redirectStderr")
Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Sat Mar 13 05:15:39 2010 | http://epydoc.sourceforge.net |