Package backend :: Module dispatcher

Source Code for Module backend.dispatcher

import fcntl
import re
import os
import sys
import multiprocessing
import time
import Queue
import json
import mockremote
from callback import FrontendCallback
from bunch import Bunch
import errors
import ansible
import ansible.playbook
import ansible.errors
from ansible import callbacks
import requests
import subprocess
import string
import setproctitle
from IPy import IP

try:
    import fedmsg
except ImportError:
    pass  # fedmsg is optional


class SilentPlaybookCallbacks(callbacks.PlaybookCallbacks):
    ''' playbook callbacks - quietly! '''

    def __init__(self, verbose=False):
        super(SilentPlaybookCallbacks, self).__init__()
        self.verbose = verbose

    def on_start(self):
        callbacks.call_callback_module('playbook_on_start')

    def on_notify(self, host, handler):
        callbacks.call_callback_module('playbook_on_notify', host, handler)

    def on_no_hosts_matched(self):
        callbacks.call_callback_module('playbook_on_no_hosts_matched')

    def on_no_hosts_remaining(self):
        callbacks.call_callback_module('playbook_on_no_hosts_remaining')

    def on_task_start(self, name, is_conditional):
        callbacks.call_callback_module('playbook_on_task_start', name, is_conditional)

    def on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None):
        result = None
        sys.stderr.write("***** VARS_PROMPT WILL NOT BE RUN IN THIS KIND OF PLAYBOOK *****\n")
        callbacks.call_callback_module('playbook_on_vars_prompt', varname, private=private, prompt=prompt,
                                       encrypt=encrypt, confirm=confirm, salt_size=salt_size, salt=salt)
        return result

    def on_setup(self):
        callbacks.call_callback_module('playbook_on_setup')

    def on_import_for_host(self, host, imported_file):
        callbacks.call_callback_module('playbook_on_import_for_host', host, imported_file)

    def on_not_import_for_host(self, host, missing_file):
        callbacks.call_callback_module('playbook_on_not_import_for_host', host, missing_file)

    def on_play_start(self, pattern):
        callbacks.call_callback_module('playbook_on_play_start', pattern)

    def on_stats(self, stats):
        callbacks.call_callback_module('playbook_on_stats', stats)


class WorkerCallback(object):
    def __init__(self, logfile=None):
        self.logfile = logfile

    def log(self, msg):
        if self.logfile:
            now = time.strftime('%F %T')
            try:
                with open(self.logfile, 'a') as lf:
                    fcntl.flock(lf, fcntl.LOCK_EX)
                    lf.write(str(now) + ': ' + msg + '\n')
                    fcntl.flock(lf, fcntl.LOCK_UN)
            except (IOError, OSError), e:
                print >>sys.stderr, 'Could not write to logfile %s - %s' % (self.logfile, str(e))


class Worker(multiprocessing.Process):
    def __init__(self, opts, jobs, events, worker_num, ip=None, create=True, callback=None):

        # base class initialization
        multiprocessing.Process.__init__(self, name="worker-builder")

        # job management stuff
        self.jobs = jobs
        self.events = events  # event queue for communicating back to dispatcher
        self.worker_num = worker_num
        self.ip = ip
        self.opts = opts
        self.kill_received = False
        self.callback = callback
        self.create = create
        self.frontend_callback = FrontendCallback(opts)
        if not self.callback:
            self.logfile = self.opts.worker_logdir + '/worker-%s.log' % self.worker_num
            self.callback = WorkerCallback(logfile=self.logfile)

        if ip:
            self.callback.log('creating worker: %s' % ip)
            self.event('worker.create', 'creating worker: {ip}', dict(ip=ip))
        else:
            self.callback.log('creating worker: dynamic ip')
            self.event('worker.create', 'creating worker: dynamic ip')

    def event(self, topic, template, content=None):
        """ Multi-purpose logging method.

        Logs messages to three different destinations:
            - The log file.
            - The internal "events" queue for communicating back to the
              dispatcher.
            - The fedmsg bus.  Messages are posted asynchronously to a
              zmq.PUB socket.

        """

        content = content or {}
        what = template.format(**content)

        if self.ip:
            who = 'worker-%s-%s' % (self.worker_num, self.ip)
        else:
            who = 'worker-%s' % (self.worker_num)

        self.callback.log("event: who: %s, what: %s" % (who, what))
        self.events.put({'when': time.time(), 'who': who, 'what': what})
        try:
            content['who'] = who
            content['what'] = what
            if self.opts.fedmsg_enabled:
                fedmsg.publish(modname="copr", topic=topic, msg=content)
        # pylint: disable=W0703
        except Exception, e:
            # XXX - Maybe log traceback as well with traceback.format_exc()
            self.callback.log('failed to publish message: %s' % e)

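    # Illustration: with a worker_num of 1 and a builder ip of 10.0.0.5 (both
    # hypothetical values), a call like
    #     self.event('build.start', 'build start: user:{user}', dict(user='bob'))
    # puts
    #     {'when': <time.time()>, 'who': 'worker-1-10.0.0.5', 'what': 'build start: user:bob'}
    # on the events queue and, when opts.fedmsg_enabled is set, publishes the same
    # content dict (plus the 'who'/'what' keys) via
    #     fedmsg.publish(modname="copr", topic='build.start', msg=content)
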
    def spawn_instance(self, retry=0):
        """call the spawn playbook to startup/provision a building instance"""

        self.callback.log('spawning instance begin')
        start = time.time()

        # Does not work, do not know why. See:
        # https://groups.google.com/forum/#!topic/ansible-project/DNBD2oHv5k8
        #stats = callbacks.AggregateStats()
        #playbook_cb = SilentPlaybookCallbacks(verbose=False)
        #runner_cb = callbacks.DefaultRunnerCallbacks()
        ## fixme - extra_vars to include ip as a var if we need to specify ips
        ## also to include info for instance type to handle the memory requirements of builds
        #play = ansible.playbook.PlayBook(stats=stats, playbook=self.opts.spawn_playbook,
        #                                 callbacks=playbook_cb, runner_callbacks=runner_cb,
        #                                 remote_user='root', transport='ssh')
        #play.run()
        try:
            result = subprocess.check_output("ansible-playbook -c ssh %s" % self.opts.spawn_playbook,
                                             shell=True)
        except subprocess.CalledProcessError, e:
            result = e.output
            sys.stderr.write("%s\n" % result)
            self.callback.log("CalledProcessError: %s" % result)
            # mostly we run out of space in OpenStack - wait some time and try again
            if retry < 3:
                time.sleep(self.opts.sleeptime)
                return self.spawn_instance(retry + 1)
            else:
                raise

        self.callback.log('Raw output from playbook: %s' % result)
        match = re.search(r'IP=([^\{\}"]+)', result, re.MULTILINE)

        if not match:
            return None

        ipaddr = match.group(1)

        self.callback.log('spawning instance end')
        self.callback.log('got instance ip: %s' % ipaddr)
        self.callback.log('Instance spawn/provision took %s sec' % (time.time() - start))

        if self.ip:
            return self.ip

        #for i in play.SETUP_CACHE:
        #    if i == 'localhost':
        #        continue
        #    return i
        try:
            IP(ipaddr)
            return ipaddr
        except ValueError:
            # if we get here we're in trouble
            self.callback.log('No IP back from spawn_instance - dumping cache output')
            self.callback.log(str(result))
            self.callback.log('Test spawn_instance playbook manually')
            return None

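    # The IP= convention: spawn_instance() only finds the builder address if the
    # spawn playbook prints a line matching IP=<address> somewhere in its output,
    # e.g. via a (hypothetical) debug task such as
    #     - debug: msg="IP={{ builder_ip }}"
    # where builder_ip is whatever variable the playbook registers for the new instance.
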
    def terminate_instance(self, ip):
        """call the terminate playbook to destroy the building instance"""
        self.callback.log('terminate instance begin')

        #stats = callbacks.AggregateStats()
        #playbook_cb = SilentPlaybookCallbacks(verbose=False)
        #runner_cb = callbacks.DefaultRunnerCallbacks()
        #play = ansible.playbook.PlayBook(host_list=ip + ',', stats=stats, playbook=self.opts.terminate_playbook,
        #                                 callbacks=playbook_cb, runner_callbacks=runner_cb,
        #                                 remote_user='root', transport='ssh')
        #play.run()
        subprocess.check_output("/usr/bin/ansible-playbook -c ssh -i '%s,' %s " % (ip, self.opts.terminate_playbook), shell=True)
        self.callback.log('terminate instance end')

    def parse_job(self, jobfile):
        # read in the json of the job and break out what we need,
        # returning a Bunch of the info we need
        try:
            build = json.load(open(jobfile))
        except ValueError:
            # empty file?
            return None
        jobdata = Bunch()
        jobdata.pkgs = build['pkgs'].split(' ')
        jobdata.repos = [r for r in build['repos'].split(' ') if r.strip()]
        jobdata.chroot = build['chroot']
        jobdata.buildroot_pkgs = build['buildroot_pkgs']
        jobdata.memory_reqs = build['memory_reqs']
        if build['timeout']:
            jobdata.timeout = build['timeout']
        else:
            jobdata.timeout = self.opts.timeout
        jobdata.destdir = os.path.normpath(self.opts.destdir + '/' + build['copr']['owner']['name'] + '/' + build['copr']['name'])
        jobdata.build_id = build['id']
        jobdata.results = self.opts.results_baseurl + '/' + build['copr']['owner']['name'] + '/' + build['copr']['name'] + '/'
        jobdata.copr_id = build['copr']['id']
        jobdata.user_id = build['user_id']
        jobdata.user_name = build['copr']['owner']['name']
        jobdata.copr_name = build['copr']['name']
        return jobdata
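
    # A hypothetical example of the job JSON parse_job() expects -- the keys are
    # exactly the ones read above, the values are made up, and the real frontend
    # payload may carry additional fields:
    #     {
    #         "id": 42,
    #         "pkgs": "http://example.com/foo-1.0-1.src.rpm",
    #         "repos": "",
    #         "chroot": "fedora-18-x86_64",
    #         "buildroot_pkgs": "",
    #         "memory_reqs": 2048,
    #         "timeout": 3600,
    #         "user_id": 7,
    #         "copr": {"id": 3,
    #                  "name": "example-project",
    #                  "owner": {"name": "someuser"}}
    #     }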

    # maybe we move this to the callback?
    def post_to_frontend(self, data):
        """send data to frontend"""
        i = 10
        while i > 0:
            result = self.frontend_callback.post_to_frontend(data)
            if not result:
                self.callback.log(self.frontend_callback.msg)
                i -= 1
                time.sleep(5)
            else:
                i = 0
        return result

    # maybe we move this to the callback?
    def mark_started(self, job):
        """tell the frontend that this build has started (status 3 = running)"""
        build = {'id': job.build_id,
                 'started_on': job.started_on,
                 'results': job.results,
                 'chroot': job.chroot,
                 'status': 3,  # running
                 }
        data = {'builds': [build]}

        if not self.post_to_frontend(data):
            raise errors.CoprWorkerError, "Could not communicate to front end to submit status info"

    # maybe we move this to the callback?
    def return_results(self, job):
        """report the final status of a finished build to the frontend and remove its jobfile"""
        self.callback.log('%s status %s. Took %s seconds' % (job.build_id, job.status, job.ended_on - job.started_on))
        build = {'id': job.build_id,
                 'ended_on': job.ended_on,
                 'status': job.status,
                 'chroot': job.chroot,
                 }
        data = {'builds': [build]}

        if not self.post_to_frontend(data):
            raise errors.CoprWorkerError, "Could not communicate to front end to submit results"

        os.unlink(job.jobfile)

    def run(self):
        """ Worker should startup and check if it can function.
        For each job it takes from the jobs queue:
            run opts.spawn_playbook to create the instance
            do the build (mockremote)
            terminate the instance
        """

        setproctitle.setproctitle("worker %s" % self.worker_num)
        while not self.kill_received:
            try:
                jobfile = self.jobs.get()
            except Queue.Empty:
                break

            # parse the job json into our info
            job = self.parse_job(jobfile)

            if job is None:
                self.callback.log('jobfile %s is mangled, please investigate' % jobfile)
                time.sleep(self.opts.sleeptime)
                continue

            # FIXME
            # this is our best place to sanity check the job before starting
            # up any longer process

            job.jobfile = jobfile

            # spin up our build instance
            if self.create:
                try:
                    ip = self.spawn_instance()
                    if not ip:
                        raise errors.CoprWorkerError, "No IP found from creating instance"

                except ansible.errors.AnsibleError, e:
                    self.callback.log('failure to setup instance: %s' % e)
                    raise

            try:
                # This assumes there are certs and a fedmsg config on disk
                try:
                    if self.opts.fedmsg_enabled:
                        fedmsg.init(name="relay_inbound", cert_prefix="copr", active=True)
                except Exception, e:
                    self.callback.log('failed to initialize fedmsg: %s' % e)

                status = 1  # succeeded
                job.started_on = time.time()
                self.mark_started(job)

                template = 'build start: user:{user} copr:{copr} build:{build} ip:{ip} pid:{pid}'
                content = dict(user=job.user_name, copr=job.copr_name,
                               build=job.build_id, ip=ip, pid=self.pid)
                self.event('build.start', template, content)

                template = 'chroot start: chroot:{chroot} user:{user} copr:{copr} build:{build} ip:{ip} pid:{pid}'
                content = dict(chroot=job.chroot, user=job.user_name,
                               copr=job.copr_name, build=job.build_id,
                               ip=ip, pid=self.pid)
                self.event('chroot.start', template, content)

                chroot_destdir = os.path.normpath(job.destdir + '/' + job.chroot)
                # setup our target dir locally
                if not os.path.exists(chroot_destdir):
                    try:
                        os.makedirs(chroot_destdir)
                    except (OSError, IOError), e:
                        msg = "Could not make results dir for job: %s - %s" % (chroot_destdir, str(e))
                        self.callback.log(msg)
                        status = 0  # fail

                if status == 1:  # succeeded
                    # FIXME
                    # need a plugin hook or some mechanism to check random
                    # info about the pkgs
                    # this should use ansible to download the pkg on the remote system
                    # and run a series of checks on the package before we
                    # start the build - most importantly license checks.

                    self.callback.log('Starting build: id=%r builder=%r timeout=%r destdir=%r chroot=%r repos=%r' % (job.build_id, ip, job.timeout, job.destdir, job.chroot, str(job.repos)))
                    self.callback.log('building pkgs: %s' % ' '.join(job.pkgs))
                    try:
                        chroot_repos = list(job.repos)
                        chroot_repos.append(job.results + '/' + job.chroot)
                        chrootlogfile = chroot_destdir + '/build-%s.log' % job.build_id
                        macros = {'copr_username': job.user_name,
                                  'copr_projectname': job.copr_name,
                                  'vendor': "Fedora Project COPR (%s/%s)" % (job.user_name, job.copr_name)}
                        mr = mockremote.MockRemote(builder=ip, timeout=job.timeout,
                                                   destdir=job.destdir, chroot=job.chroot, cont=True, recurse=True,
                                                   repos=chroot_repos, macros=macros, buildroot_pkgs=job.buildroot_pkgs,
                                                   callback=mockremote.CliLogCallBack(quiet=True, logfn=chrootlogfile))
                        mr.build_pkgs(job.pkgs)
                    except mockremote.MockRemoteError, e:
                        # record and break
                        self.callback.log('%s - %s' % (ip, e))
                        status = 0  # failure
                    else:
                        # we can't really trace back if we just fail normally
                        # check if any pkgs didn't build
                        if mr.failed:
                            status = 0  # failure
                    self.callback.log('Finished build: id=%r builder=%r timeout=%r destdir=%r chroot=%r repos=%r' % (job.build_id, ip, job.timeout, job.destdir, job.chroot, str(job.repos)))

                job.ended_on = time.time()
                job.status = status
                self.return_results(job)
                self.callback.log('worker finished build: %s' % ip)
                template = 'build end: user:{user} copr:{copr} build:{build} ip:{ip} pid:{pid} status:{status}'
                content = dict(user=job.user_name, copr=job.copr_name,
                               build=job.build_id, ip=ip, pid=self.pid,
                               status=job.status)
                self.event('build.end', template, content)
            finally:
                # clean up the instance
                if self.create:
                    self.terminate_instance(ip)
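
A minimal, hypothetical sketch of how the dispatcher side might drive this class. The option names are exactly the attributes Worker reads from opts in the listing above; the values, file paths, and queue handling are illustrative assumptions, and FrontendCallback will additionally need whatever frontend options it expects (not shown in this module):

    import multiprocessing
    from bunch import Bunch
    from backend.dispatcher import Worker

    opts = Bunch(worker_logdir='/var/log/copr/workers',                # assumed path
                 spawn_playbook='/etc/copr/builder_spawn.yml',         # assumed path
                 terminate_playbook='/etc/copr/builder_terminate.yml', # assumed path
                 sleeptime=30,
                 timeout=3600,
                 destdir='/var/lib/copr/results',                      # assumed path
                 results_baseurl='http://example.com/results',
                 fedmsg_enabled=False)

    jobs = multiprocessing.Queue()    # jobfile paths handed out by the dispatcher
    events = multiprocessing.Queue()  # read back by the dispatcher for logging

    jobs.put('/var/lib/copr/jobs/42.json')  # hypothetical jobfile written by the frontend

    w = Worker(opts, jobs, events, worker_num=1)
    w.start()   # spawn a builder, run the build via mockremote, terminate the builder
    w.join()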