Package flumotion :: Package admin :: Module admin
[hide private]

Source Code for Module flumotion.admin.admin

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  model abstraction for administration clients supporting different views 
 24  """ 
 25   
 26  from twisted.internet import error, defer, reactor 
 27  from zope.interface import implements 
 28   
 29  from flumotion.common import common, errors, interfaces, log 
 30  from flumotion.common import medium 
 31  from flumotion.common import messages, signals 
 32  from flumotion.common import planet, worker # register jelly 
 33  from flumotion.common.i18n import N_, gettexter 
 34  from flumotion.configure import configure 
 35  from flumotion.twisted import pb as fpb 
 36   
 37  __version__ = "$Rev: 8014 $" 
 38  T_ = gettexter() 
 39   
 40   
41 -class AdminClientFactory(fpb.ReconnectingFPBClientFactory):
42 perspectiveInterface = interfaces.IAdminMedium 43
44 - def __init__(self, medium, extraTenacious=False, maxDelay=20):
45 """ 46 @type medium: AdminModel 47 """ 48 fpb.ReconnectingFPBClientFactory.__init__(self) 49 self.medium = medium 50 self.maxDelay = maxDelay 51 52 self.extraTenacious = extraTenacious 53 self.hasBeenConnected = 0 54 self.hasBeenAuthenticated = 0 55 56 self._connector = None
57
58 - def startedConnecting(self, connector):
59 self._connector = connector 60 return fpb.ReconnectingFPBClientFactory.startedConnecting( 61 self, connector)
62
63 - def clientConnectionMade(self, broker):
64 self.hasBeenConnected = 1 65 66 fpb.ReconnectingFPBClientFactory.clientConnectionMade(self, broker)
67
68 - def clientConnectionLost(self, connector, reason):
69 """ 70 @type connector: implementation of 71 L{twisted.internet.interfaces.IConnector} 72 @param reason: L{twisted.spread.pb.failure.Failure} 73 """ 74 self.debug("Lost connection to %s: %s", 75 connector.getDestination(), log.getFailureMessage(reason)) 76 if self.hasBeenAuthenticated: 77 self.log("Have been authenticated before. Trying again.") 78 elif self.extraTenacious: 79 self.log("We are extra tenacious, trying again") 80 else: 81 self.log("Telling medium about connection failure") 82 self.medium.connectionFailed(reason) 83 return 84 85 RFC = fpb.ReconnectingFPBClientFactory 86 RFC.clientConnectionLost(self, connector, reason)
87
88 - def clientConnectionFailed(self, connector, reason):
89 """ 90 @type connector: implementation of 91 L{twisted.internet.interfaces.IConnector} 92 @param reason: L{twisted.spread.pb.failure.Failure} 93 """ 94 if reason.check(error.DNSLookupError): 95 self.debug('DNS lookup error') 96 if not self.extraTenacious: 97 self.medium.connectionFailed(reason) 98 return 99 elif (reason.check(error.ConnectionRefusedError) 100 or reason.check(error.ConnectError)): 101 # If we're logging in for the first time, we want to make this a 102 # real error; we present a dialog, etc. 103 # However, if we fail later on (e.g. manager shut down, and 104 # hasn't yet been restarted), we want to keep trying to reconnect, 105 # so we just log a message. 106 self.debug("Error connecting to %s: %s", 107 connector.getDestination(), 108 log.getFailureMessage(reason)) 109 if self.hasBeenConnected: 110 self.log("we've been connected before though, so going " 111 "to retry") 112 # fall through 113 elif self.extraTenacious: 114 self.log("trying again due to +100 tenacity") 115 # fall through 116 else: 117 self.log("telling medium about connection failure") 118 self.medium.connectionFailed(reason) 119 return 120 121 fpb.ReconnectingFPBClientFactory.clientConnectionFailed(self, 122 connector, reason)
123 124 # vmethod implementation 125
126 - def gotDeferredLogin(self, d):
127 128 def success(remote): 129 self.hasBeenAuthenticated = 1 130 self.medium.setRemoteReference(remote)
131 132 def error(failure): 133 if self.extraTenacious: 134 self.debug('connection problem to %s: %s', 135 self._connector.getDestination(), 136 log.getFailureMessage(failure)) 137 self.debug('we are tenacious, so trying again later') 138 self.disconnect() 139 elif failure.check(errors.ConnectionFailedError): 140 self.debug("emitting connection-failed") 141 self.medium.emit('connection-failed', "I failed my master") 142 self.debug("emitted connection-failed") 143 elif failure.check(errors.ConnectionRefusedError): 144 self.debug("emitting connection-refused") 145 self.medium.emit('connection-refused') 146 self.debug("emitted connection-refused") 147 elif failure.check(errors.NotAuthenticatedError): 148 # FIXME: unauthorized login emit ! 149 self.debug("emitting connection-refused") 150 self.medium.emit('connection-refused') 151 self.debug("emitted connection-refused") 152 else: 153 self.medium.emit('connection-error', failure) 154 self.warning('connection error to %s:: %s', 155 self._connector.getDestination(), 156 log.getFailureMessage(failure))
157 # swallow error 158 159 d.addCallbacks(success, error) 160 return d 161 162 # FIXME: stop using signals, we can provide a richer interface with actual 163 # objects and real interfaces for the views a model communicates with 164 165
166 -class AdminModel(medium.PingingMedium, signals.SignalMixin):
167 """ 168 I live in the admin client. 169 I am a data model for any admin view implementing a UI to 170 communicate with one manager. 171 I send signals when things happen. 172 173 Manager calls on us through L{flumotion.manager.admin.AdminAvatar} 174 """ 175 __signals__ = ('connected', 'disconnected', 'connection-refused', 176 'connection-failed', 'connection-error', 'reloading', 177 'message', 'update') 178 179 logCategory = 'adminmodel' 180 181 implements(interfaces.IAdminMedium) 182 183 # Public instance variables (read-only) 184 planet = None 185
186 - def __init__(self):
187 # All of these instance variables are private. Cuidado cabrones! 188 self.connectionInfo = None 189 self.keepTrying = None 190 self._writeConnection = True 191 192 self.managerId = '<uninitialized>' 193 194 self.connected = False 195 self.clientFactory = None 196 197 self._deferredConnect = None 198 199 self._components = {} # dict of components 200 self.planet = None 201 self._workerHeavenState = None
202
203 - def disconnectFromManager(self):
204 """ 205 Disconnects from the actual manager and frees the connection. 206 """ 207 if self.clientFactory: 208 # We are disconnecting, so we don't want to be 209 # notified by the model about it. 210 if self.remote: 211 self.remote.dontNotifyOnDisconnect(self._remoteDisconnected) 212 213 self.clientFactory.stopTrying() 214 215 self.clientFactory.disconnect() 216 self.clientFactory = None
217
218 - def connectToManager(self, connectionInfo, keepTrying=False, 219 writeConnection=True):
220 """ 221 Connects to the specified manager. 222 223 @param connectionInfo: data for establishing the connection 224 @type connectionInfo: a L{PBConnectionInfo} 225 @param keepTrying: when this is L{True} the Factory will try to 226 reconnect when it loses the connection 227 @type keepTrying: bool 228 @param writeConnection: when this is L{True} the connection is saved 229 for future uses on cache 230 @type writeConnection: bool 231 232 @rtype: L{twisted.internet.defer.Deferred} 233 """ 234 assert self.clientFactory is None 235 236 self.connectionInfo = connectionInfo 237 self._writeConnection = writeConnection 238 239 # give the admin an id unique to the manager -- if a program is 240 # adminning multiple managers, this id should tell them apart 241 # (and identify duplicates) 242 self.managerId = str(connectionInfo) 243 self.logName = self.managerId 244 245 self.info('Connecting to manager %s with %s', 246 self.managerId, connectionInfo.use_ssl and 'SSL' or 'TCP') 247 248 self.clientFactory = AdminClientFactory(self, 249 extraTenacious=keepTrying, 250 maxDelay=20) 251 self.clientFactory.startLogin(connectionInfo.authenticator) 252 253 if connectionInfo.use_ssl: 254 common.assertSSLAvailable() 255 from twisted.internet import ssl 256 reactor.connectSSL(connectionInfo.host, connectionInfo.port, 257 self.clientFactory, ssl.ClientContextFactory()) 258 else: 259 reactor.connectTCP(connectionInfo.host, connectionInfo.port, 260 self.clientFactory) 261 262 def connected(model, d): 263 # model is really "self". yay gobject? 264 d.callback(model)
265 266 def disconnected(model, d): 267 # can happen after setRemoteReference but before 268 # getPlanetState or getWorkerHeavenState returns 269 if not keepTrying: 270 d.errback(errors.ConnectionFailedError('Lost connection'))
271 272 def connection_refused(model, d): 273 if not keepTrying: 274 d.errback(errors.ConnectionRefusedError()) 275 276 def connection_failed(model, reason, d): 277 if not keepTrying: 278 d.errback(errors.ConnectionFailedError(reason)) 279 280 def connection_error(model, failure, d): 281 if not keepTrying: 282 d.errback(failure) 283 284 d = defer.Deferred() 285 ids = [] 286 ids.append(self.connect('connected', connected, d)) 287 ids.append(self.connect('disconnected', disconnected, d)) 288 ids.append(self.connect('connection-refused', connection_refused, d)) 289 ids.append(self.connect('connection-failed', connection_failed, d)) 290 ids.append(self.connect('connection-error', connection_error, d)) 291 292 def success(model): 293 map(self.disconnect, ids) 294 self._deferredConnect = None 295 return model 296 297 def failure(f): 298 map(self.disconnect, ids) 299 self._deferredConnect = None 300 return f 301 302 d.addCallbacks(success, failure) 303 self._deferredConnect = d 304 return d 305
306 - def bundleErrback(self, failure, fileName='<unknown>'):
307 """ 308 Handle all coding mistakes that could be triggered by loading bundles. 309 This is a convenience method to help in properly reporting problems. 310 The EntrySyntaxError should be caught and wrapped in a UI message, 311 with the message generated here as debug information. 312 313 @param failure: the failure to be handled 314 @type failure: L{twisted.python.failure.Failure} 315 @param filename: name of the file being loaded 316 @type filename: str 317 318 @raises: L{errors.EntrySyntaxError} 319 """ 320 try: 321 raise failure.value 322 except SyntaxError, e: 323 # the syntax error can happen in the entry file, or any import 324 where = getattr(e, 'filename', "<entry file>") 325 lineno = getattr(e, 'lineno', 0) 326 msg = "Syntax Error at %s:%d while executing %s" % ( 327 where, lineno, fileName) 328 self.warning(msg) 329 raise errors.EntrySyntaxError(msg) 330 except NameError, e: 331 msg = "NameError while executing %s: %s" % ( 332 fileName, " ".join(e.args)) 333 self.warning(msg) 334 raise errors.EntrySyntaxError(msg) 335 except ImportError, e: 336 msg = "ImportError while executing %s: %s" % (fileName, 337 " ".join(e.args)) 338 self.warning(msg) 339 raise errors.EntrySyntaxError(msg)
340
341 - def shutdown(self):
342 self.debug('shutting down') 343 if self.clientFactory is not None: 344 # order not semantically important, but this way we avoid a 345 # "reconnecting in X seconds" in the log 346 self.clientFactory.stopTrying() 347 self.clientFactory.disconnect() 348 self.clientFactory = None 349 350 if self._deferredConnect is not None: 351 # this can happen with keepTrying=True 352 self.debug('cancelling connection attempt') 353 self._deferredConnect.errback(errors.ConnectionCancelledError())
354
355 - def reconnect(self, keepTrying=False):
356 """Close any existing connection to the manager and 357 reconnect.""" 358 self.debug('asked to log in again') 359 self.shutdown() 360 return self.connectToManager(self.connectionInfo, keepTrying)
361 362 # FIXME: give these three sensible names 363
364 - def adminInfoStr(self):
365 return self.managerId
366
367 - def connectionInfoStr(self):
368 return '%s:%s (%s)' % (self.connectionInfo.host, 369 self.connectionInfo.port, 370 self.connectionInfo.use_ssl 371 and 'https' or 'http')
372 373 # used in fgc 374
375 - def managerInfoStr(self):
376 assert self.planet 377 return '%s (%s)' % (self.planet.get('name'), self.managerId)
378
379 - def connectionFailed(self, failure):
380 # called by client factory 381 if failure.check(error.DNSLookupError): 382 message = ("Could not look up host '%s'." 383 % self.connectionInfo.host) 384 elif failure.check(error.ConnectionRefusedError): 385 message = ("Could not connect to host '%s' on port %d." 386 % (self.connectionInfo.host, 387 self.connectionInfo.port)) 388 else: 389 message = ("Unexpected failure.\nDebug information: %s" 390 % log.getFailureMessage(failure)) 391 self.debug('emitting connection-failed') 392 self.emit('connection-failed', message) 393 self.debug('emitted connection-failed')
394
395 - def setRemoteReference(self, remoteReference):
396 self.debug("setRemoteReference %r", remoteReference) 397 398 def gotPlanetState(planet): 399 self.planet = planet 400 # monkey, Monkey, MONKEYPATCH!!!!! 401 self.planet.admin = self 402 self.debug('got planet state') 403 return self.callRemote('getWorkerHeavenState')
404 405 def gotWorkerHeavenState(whs): 406 self._workerHeavenState = whs 407 self.debug('got worker state') 408 409 self.debug('Connected to manager and retrieved all state') 410 self.connected = True 411 if self._writeConnection: 412 writeConnection() 413 self.emit('connected') 414 415 def writeConnection(): 416 i = self.connectionInfo 417 if not (i.authenticator.username 418 and i.authenticator.password): 419 self.log('not caching connection information') 420 return 421 s = ''.join(['<connection>', 422 '<host>%s</host>' % i.host, 423 '<manager>%s</manager>' % self.planet.get('name'), 424 '<port>%d</port>' % i.port, 425 '<use_insecure>%d</use_insecure>' 426 % ((not i.use_ssl) and 1 or 0), 427 '<user>%s</user>' % i.authenticator.username, 428 '<passwd>%s</passwd>' % i.authenticator.password, 429 '</connection>']) 430 431 import os 432 from flumotion.common import python 433 md5sum = python.md5(s).hexdigest() 434 f = os.path.join(configure.registrydir, '%s.connection' % md5sum) 435 try: 436 h = open(f, 'w') 437 h.write(s) 438 h.close() 439 except Exception, e: 440 self.info('failed to write connection cache file %s: %s', 441 f, log.getExceptionMessage(e)) 442 443 # chain up 444 medium.PingingMedium.setRemoteReference(self, remoteReference) 445 446 # fixme: push the disconnect notification upstream 447 448 self.remote.notifyOnDisconnect(self._remoteDisconnected) 449 450 d = self.callRemote('getPlanetState') 451 d.addCallback(gotPlanetState) 452 d.addCallback(gotWorkerHeavenState) 453 return d 454 455 ### model functions; called by UI's to send requests to manager or comp 456 457 ## view management functions 458
459 - def isConnected(self):
460 return self.connected
461 462 ## generic remote call methods 463
464 - def componentCallRemote(self, componentState, methodName, *args, **kwargs):
465 """ 466 Call the given method on the given component with the given args. 467 468 @param componentState: component to call the method on 469 @type componentState: L{flumotion.common.planet.AdminComponentState} 470 @param methodName: name of method to call; serialized to a 471 remote_methodName on the worker's medium 472 473 @rtype: L{twisted.internet.defer.Deferred} 474 """ 475 d = self.callRemote('componentCallRemote', 476 componentState, methodName, 477 *args, **kwargs) 478 479 def errback(failure): 480 msg = None 481 if failure.check(errors.NoMethodError): 482 msg = "Remote method '%s' does not exist." % methodName 483 msg += "\n" + failure.value 484 else: 485 msg = log.getFailureMessage(failure) 486 487 # FIXME: we probably need a nicer way of getting component 488 # messages shown from the admin model, but this allows us to 489 # make sure every type of admin has these messages 490 self.warning(msg) 491 m = messages.Warning(T_(N_("Internal error in component.")), 492 debug=msg) 493 componentState.observe_append('messages', m) 494 return failure
495 496 d.addErrback(errback) 497 # FIXME: dialog for other errors ? 498 return d 499
500 - def workerCallRemote(self, workerName, methodName, *args, **kwargs):
501 """ 502 Call the the given method on the given worker with the given args. 503 504 @param workerName: name of the worker to call the method on 505 @param methodName: name of method to call; serialized to a 506 remote_methodName on the worker's medium 507 508 @rtype: L{twisted.internet.defer.Deferred} 509 """ 510 return self.callRemote('workerCallRemote', workerName, 511 methodName, *args, **kwargs)
512 513 ## manager remote methods 514
515 - def loadConfiguration(self, xml_string):
516 return self.callRemote('loadConfiguration', xml_string)
517
518 - def getConfiguration(self):
519 return self.callRemote('getConfiguration')
520
521 - def getScenarios(self):
522 """ 523 Obtains the available scenarios from the manager. 524 525 @rtype: L{twisted.internet.defer.Deferred} 526 """ 527 return self.callRemote('getScenarios')
528
529 - def getScenarioByType(self, type):
530 """ 531 Obtains an scenario given its type. 532 533 @rtype: L{twisted.internet.defer.Deferred} 534 """ 535 return self.callRemote('getScenarioByType', type)
536
537 - def cleanComponents(self):
538 return self.callRemote('cleanComponents')
539 540 ## worker remote methods 541
542 - def checkElements(self, workerName, elements):
543 return self.workerCallRemote(workerName, 'checkElements', elements)
544
545 - def checkImport(self, workerName, moduleName):
546 return self.workerCallRemote(workerName, 'checkImport', moduleName)
547
548 - def workerRun(self, workerName, moduleName, functionName, *args, **kwargs):
549 """ 550 Run the given function and args on the given worker. If the 551 worker does not already have the module, or it is out of date, 552 it will be retrieved from the manager. 553 554 @rtype: L{twisted.internet.defer.Deferred} firing an 555 L{flumotion.common.messages.Result} 556 """ 557 return self.workerCallRemote(workerName, 'runFunction', moduleName, 558 functionName, *args, **kwargs)
559
560 - def getWizardEntries(self, wizardTypes=None, provides=None, accepts=None):
561 return self.callRemote('getWizardEntries', 562 wizardTypes, provides, accepts)
563
564 - def getWorkerHeavenState(self):
565 return self._workerHeavenState
566
567 - def _remoteDisconnected(self, remoteReference):
568 self.debug("emitting disconnected") 569 self.connected = False 570 self.emit('disconnected') 571 self.debug("emitted disconnected")
572