Package madgraph :: Package various :: Module cluster
[hide private]
[frames] | no frames]

Source Code for Module madgraph.various.cluster

  1  ################################################################################ 
  2  # Copyright (c) 2009 The MadGraph Development team and Contributors              
  3  # 
  4  # This file is a part of the MadGraph 5 project, an application which            
  5  # automatically generates Feynman diagrams and matrix elements for arbitrary     
  6  # high-energy processes in the Standard Model and beyond.                        
  7  # 
  8  # It is subject to the MadGraph license which should accompany this              
  9  # distribution.                                                                  
 10  #                                                                                
 11  # For more information, please visit: http://madgraph.phys.ucl.ac.be             
 12  #                                                                                
 13  ################################################################################ 
 14  import subprocess 
 15  import logging 
 16  import hashlib 
 17  import os 
 18  import time 
 19  import re 
 20   
 21  logger = logging.getLogger('madgraph.cluster')  
 22   
 23  try: 
 24      from madgraph import MadGraph5Error 
 25  except: 
 26      from internal import MadGraph5Error 
class ClusterManagmentError(MadGraph5Error):
    """Error raised when talking to the batch system goes wrong.

    Used in this module for failed submissions, failing status queries and
    jobs found in a failed/held state.
    """
30
class NotImplemented(MadGraph5Error, NotImplementedError):
    """Raised by the generic Cluster methods that a backend did not override.

    NOTE: this name shadows the builtin ``NotImplemented`` constant inside
    this module; it is kept presumably because other modules import it under
    this name. Also deriving from ``NotImplementedError`` lets generic
    ``except NotImplementedError`` handlers catch it, while it remains a
    ``MadGraph5Error`` for existing handlers.
    """
33
def multiple_try(nb_try=5, sleep=20):
    """Decorator factory retrying a failing call.

    The decorated function is called up to ``nb_try`` times (at least once).
    After the i-th failed attempt (i counted from 1) the wrapper sleeps
    ``sleep * i`` seconds before trying again. The exception raised by the
    last attempt is re-raised unchanged.

    ``KeyboardInterrupt`` is never retried. Only ``Exception`` subclasses
    are retried, so ``SystemExit``/``GeneratorExit`` propagate at once.
    """
    import functools

    def deco_retry(f):
        @functools.wraps(f)
        def deco_f_retry(*args, **opt):
            attempts = max(1, nb_try)
            for i in range(attempts):
                try:
                    return f(*args, **opt)
                except KeyboardInterrupt:
                    raise
                except Exception:
                    if i == attempts - 1:
                        # Last attempt: propagate the real error right away.
                        # There is no useless final sleep, and no bare
                        # ``raise`` outside an ``except`` clause (a
                        # RuntimeError on Python 3).
                        raise
                    time.sleep(sleep * (i + 1))
        return deco_f_retry
    return deco_retry
def check_interupt(error=KeyboardInterrupt):
    """Decorator factory: clean up the cluster jobs when *error* is raised.

    The decorated method must belong to an object providing
    ``remove(*args, **opt)``. When the method raises *error*, ``self.remove``
    is called with the same arguments as the method, then the caught
    exception is re-raised.
    """
    import functools

    def deco_interupt(f):
        @functools.wraps(f)
        def deco_f_interupt(self, *args, **opt):
            try:
                return f(self, *args, **opt)
            except error as err:
                self.remove(*args, **opt)
                # Re-raise the caught instance. The previous ``raise error``
                # threw a fresh, message-less instance of the class.
                raise err
        return deco_f_interupt
    return deco_interupt
class Cluster(object):
    """Basic class for all cluster type submission.

    Concrete backends override submit() and control_one_job(), and usually
    control() and remove(). This class provides the generic waiting logic.
    """
    name = 'mother class'

    def __init__(self, cluster_queue=None):
        """Init the cluster.

        cluster_queue: queue (or requirement) name forwarded to the batch
        system by the subclasses of this module; they treat None and 'None'
        as "no queue".
        """
        self.submitted = 0        # jobs submitted since the last wait()
        self.submitted_ids = []   # cluster ids of the jobs still tracked
        self.finish = 0           # jobs seen finished by the default control()
        self.cluster_queue = cluster_queue

    def submit(self, prog, argument=None, cwd=None, stdout=None, stderr=None,
               log=None):
        """How to make one submission. Return status id on the cluster.

        Must be implemented by the subclasses.

        Raises:
            NotImplemented: always, for this base class.
        """
        raise NotImplemented('No implementation of how to submit a job to cluster \'%s\'' % self.name)

    def control(self, me_dir=None):
        """Check the status of the jobs associated to directory me_dir.

        Generic implementation that polls control_one_job() for every
        tracked job. Jobs reported finished ('F') are dropped from
        submitted_ids and counted in self.finish.

        Returns:
            (idle, run, finish, fail) counts.

        Raises:
            NotImplemented: when no job id is tracked.
        """
        if not self.submitted_ids:
            raise NotImplemented('No implementation of how to control the job status to cluster \'%s\'' % self.name)
        idle, run, fail = 0, 0, 0
        # Iterate over a copy: finished jobs are removed from the list.
        for job_id in self.submitted_ids[:]:
            status = self.control_one_job(job_id)
            if status == 'I':
                idle += 1
            elif status == 'R':
                run += 1
            elif status == 'F':
                self.finish += 1
                self.submitted_ids.remove(job_id)
            else:
                fail += 1
        return idle, run, self.finish, fail

    def control_one_job(self, id):
        """Control the status of a single job with its cluster id.

        Must be implemented by the subclasses.
        """
        raise NotImplemented('No implementation of how to control the job status to cluster \'%s\'' % self.name)

    @check_interupt()
    def wait(self, me_dir, fct):
        """Wait until all the submitted jobs are finished.

        Polls control(me_dir) every 30 s and calls fct(idle, run, finish)
        after each poll while jobs are still idle or running.

        Raises:
            ClusterManagmentError: if a job is reported in a failed state.
        """
        while True:
            idle, run, finish, fail = self.control(me_dir)
            if fail:
                raise ClusterManagmentError('Some Jobs are in a Hold/... state. Please try to investigate or contact the IT team')
            if idle + run == 0:
                time.sleep(20)  # security to ensure that the file are really written on the disk
                logger.info('All jobs finished')
                break
            fct(idle, run, finish)
            time.sleep(30)
        # Reset the bookkeeping for the next batch of jobs. self.finish was
        # previously kept, inflating the next batch's finished count.
        self.submitted = 0
        self.submitted_ids = []
        self.finish = 0

    @check_interupt()
    def launch_and_wait(self, prog, argument=None, cwd=None, stdout=None,
                        stderr=None, log=None):
        """Launch one job on the cluster and wait for it.

        stderr == -2 (subprocess.STDOUT) together with a stdout path asks for
        the error stream to be appended to stdout once the job is done.
        """
        if argument is None:
            argument = []
        special_output = False  # tag for concatenate the error with the output.
        if stderr == -2 and stdout:
            # We are supposed to send the error stream to stdout.
            special_output = True
            stderr = stdout + '.err'
        job_id = self.submit(prog, argument, cwd, stdout, stderr, log)
        while True:
            status = self.control_one_job(job_id)
            if status not in ['R', 'I']:
                time.sleep(20)  # security to ensure that the file are really written on the disk
                break
            time.sleep(30)

        if special_output:
            self._merge_stderr_into_stdout(stdout, stderr)

    def _merge_stderr_into_stdout(self, stdout, stderr):
        """Append the content of file *stderr* to file *stdout* and log it.

        Polls a few times (sleeping 5-10 s) because the files may not be
        visible yet when the job is reported finished. The merge is done at
        most once; the previous loop re-appended the same text on each pass.
        """
        for _ in range(5):
            if os.path.exists(stdout):
                if not os.path.exists(stderr):
                    time.sleep(5)
                if not os.path.exists(stderr):
                    return
                with open(stderr) as err_file:
                    err_text = err_file.read()
                if not err_text:
                    return
                logger.warning(err_text)
                with open(stdout, 'a') as out_file:
                    out_file.write(err_text)
                return
            time.sleep(10)

    def remove(self, *args, **opts):
        """Remove the submitted jobs: not supported here, only warns.

        Accepts and ignores any arguments, because check_interupt forwards
        both the positional and keyword arguments of the interrupted method.
        """
        logger.warning("""This cluster didn't support job removal,
            the jobs are still running on the cluster.""")
157
class CondorCluster(Cluster):
    """Basic class for dealing with cluster submission (HTCondor)."""

    name = 'condor'

    # condor_q -format string: one status code per line and per job. The
    # backslash-n is passed literally to condor_q, as the shell-quoted
    # version of this command did.
    _status_format = '%-2s \\n'
    # ClassAd expression mapping JobStatus 0..6 to U, I, R, X, C, H, E;
    # other values are printed as the raw number.
    _status_expr = ('ifThenElse(JobStatus==0,"U",ifThenElse(JobStatus==1,"I",'
                    'ifThenElse(JobStatus==2,"R",ifThenElse(JobStatus==3,"X",'
                    'ifThenElse(JobStatus==4,"C",ifThenElse(JobStatus==5,"H",'
                    'ifThenElse(JobStatus==6,"E",string(JobStatus))))))))')

    @multiple_try()
    def submit(self, prog, argument=None, cwd=None, stdout=None, stderr=None, log=None):
        """Submit prog to the condor cluster and return its cluster id.

        The submit description is written to ./submit_condor (in the current
        working directory of this process) and passed to condor_submit.

        Raises:
            ClusterManagmentError: if no job id is found in the output of
                condor_submit.
        """
        text = """Executable = %(prog)s
output = %(stdout)s
error = %(stderr)s
log = %(log)s
%(argument)s
Universe = vanilla
notification = Error
Initialdir = %(cwd)s
%(requirement)s
getenv=True
queue 1
"""
        if self.cluster_queue not in ['None', None]:
            requirement = 'Requirements = %s=?=True' % self.cluster_queue
        else:
            requirement = ''

        if cwd is None:
            cwd = os.getcwd()
        if stdout is None:
            stdout = '/dev/null'
        if stderr is None:
            stderr = '/dev/null'
        if log is None:
            log = '/dev/null'
        if not os.path.exists(prog):
            prog = os.path.join(cwd, prog)
        if argument:
            argument = 'Arguments = %s' % ' '.join(argument)
        else:
            argument = ''

        dico = {'prog': prog, 'cwd': cwd, 'stdout': stdout,
                'stderr': stderr, 'log': log, 'argument': argument,
                'requirement': requirement}

        with open('submit_condor', 'w') as fsock:
            fsock.write(text % dico)
        proc = subprocess.Popen(['condor_submit', 'submit_condor'],
                                stdout=subprocess.PIPE)
        output = proc.communicate()[0]
        # Expected output, e.g.:
        #   Submitting job(s).
        #   Logging submit event(s).
        #   1 job(s) submitted to cluster 2253622.
        # \d+ (not \d*) so that an empty id is rejected.
        match = re.search(r"submitted to cluster (\d+)", output)
        if not match:
            raise ClusterManagmentError('fail to submit to the cluster: \n%s' % output)
        job_id = match.group(1)
        self.submitted += 1
        self.submitted_ids.append(job_id)
        return job_id

    def _condor_q(self, ids):
        """Run condor_q on *ids* and return its output lines (one status per job).

        Raises:
            ClusterManagmentError: if condor_q exits non-zero or writes on
                stderr.
        """
        cmd = (['condor_q'] + [str(one_id) for one_id in ids] +
               ['-format', self._status_format, self._status_expr])
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        # communicate() waits for the process, so returncode is meaningful
        # (it was always None before), and it avoids pipe deadlocks.
        output, error = proc.communicate()
        if proc.returncode or error:
            raise ClusterManagmentError('condor_q returns error: %s' % error)
        return output.splitlines()

    @multiple_try(nb_try=10, sleep=10)
    def control_one_job(self, id):
        """Return the status letter of job *id* ('' if condor_q lists nothing)."""
        lines = self._condor_q([id])
        return lines[0].strip() if lines else ''

    @check_interupt()
    @multiple_try(nb_try=10, sleep=10)
    def control(self, me_dir=None):
        """Return (idle, run, finish, fail) for the tracked jobs.

        finish is inferred as the submitted jobs no longer listed by
        condor_q; 'C' (completed) entries are not counted in any bucket.
        """
        if not self.submitted_ids:
            return 0, 0, 0, 0

        idle, run, fail = 0, 0, 0
        for line in self._condor_q(self.submitted_ids):
            status = line.strip()
            if not status:
                continue  # a blank line is not a job in a failed state
            if status in ['I', 'U']:
                idle += 1
            elif status == 'R':
                run += 1
            elif status != 'C':
                fail += 1

        return idle, run, self.submitted - (idle + run + fail), fail

    @multiple_try()
    def remove(self, *args, **opts):
        """Clean the jobs on the cluster.

        Accepts any arguments: check_interupt forwards the positional and
        keyword arguments of the interrupted method.
        """
        if not self.submitted_ids:
            return
        cmd = "condor_rm %s" % ' '.join(self.submitted_ids)
        with open(os.devnull, 'w') as devnull:
            subprocess.Popen(cmd, shell=True, stdout=devnull)
271
class PBSCluster(Cluster):
    """Basic class for dealing with cluster submission (PBS)."""

    name = 'pbs'
    idle_tag = ['Q']
    running_tag = ['T', 'E', 'R']
    complete_tag = ['C']

    @staticmethod
    def _job_name(path):
        """Return the qsub job name (-N) tagging the jobs of directory *path*.

        It is the last 14 hex digits of the md5 of *path*, with the first
        character forced to a letter.
        """
        if not isinstance(path, bytes):
            path = path.encode('utf-8')
        name = hashlib.md5(path).hexdigest()[-14:]
        if not name[0].isalpha():
            name = 'a' + name[1:]
        return name

    @multiple_try()
    def submit(self, prog, argument=None, cwd=None, stdout=None, stderr=None, log=None):
        """Submit prog to the PBS cluster and return its job id.

        The command line is fed to qsub on stdin. The job is named after the
        process directory (the part of the path before '/SubProcesses') so
        that control() can find it.

        Raises:
            ClusterManagmentError: if qsub does not return a numeric job id.
        """
        # Default cwd BEFORE building the job name: os.path.join(None, prog)
        # used to fail, and that failure was retried for minutes.
        if cwd is None:
            cwd = os.getcwd()
            text = ""
        else:
            text = " cd %s;" % cwd
        job_name = self._job_name(
            os.path.realpath(os.path.join(cwd, prog)).rsplit('/SubProcesses', 1)[0])

        if stdout is None:
            stdout = '/dev/null'
        if stderr is None:
            stderr = '/dev/null'
        elif stderr == -2:  # -2 is subprocess.STDOUT
            stderr = stdout

        text += prog
        if argument:
            text += ' ' + ' '.join(argument)

        command = ['qsub', '-o', stdout, '-N', job_name, '-e', stderr, '-V']
        if self.cluster_queue and self.cluster_queue != 'None':
            command.extend(['-q', self.cluster_queue])

        proc = subprocess.Popen(command, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                stdin=subprocess.PIPE, cwd=cwd)
        output = proc.communicate(text)[0]
        # Presumably "<id>.<server>": keep the numeric part.
        job_id = output.strip().split('.')[0]
        if not job_id.isdigit():
            raise ClusterManagmentError('fail to submit to the cluster: \n%s' % output)
        self.submitted += 1
        self.submitted_ids.append(job_id)
        return job_id

    @multiple_try()
    def control_one_job(self, id):
        """Return 'I' (queued), 'R' (running) or 'F' (finished/unknown) for job *id*."""
        with open(os.devnull, 'w') as devnull:
            proc = subprocess.Popen(['qstat', str(id)], stdout=subprocess.PIPE,
                                    stderr=devnull)
            output = proc.communicate()[0]

        for line in output.splitlines():
            line = line.strip()
            if 'Unknown' in line:
                return 'F'
            fields = line.split()
            # Compare the whole numeric id. The former prefix test matched
            # job 123 against a listed job 1234.
            if fields and fields[0].split('.')[0] == str(id):
                status = fields[4]
                if status in self.idle_tag:
                    return 'I'
                if status in self.running_tag:
                    return 'R'
                return 'F'
        return 'F'

    @multiple_try()
    def control(self, me_dir):
        """Return (idle, run, finish, fail) for the jobs of directory me_dir.

        finish is inferred as the submitted jobs no longer listed by qstat.
        """
        # submit() hashes a realpath, so canonicalize me_dir the same way.
        # This also drops a trailing '/'.
        job_name = self._job_name(os.path.realpath(me_dir))
        proc = subprocess.Popen(['qstat'], stdout=subprocess.PIPE)
        output = proc.communicate()[0]

        idle, run, fail = 0, 0, 0
        for line in output.splitlines():
            if job_name not in line:
                continue
            status = line.split()[4]
            if status in self.idle_tag:
                idle += 1
            elif status in self.running_tag:
                run += 1
            elif status in self.complete_tag:
                continue
            else:
                fail += 1

        return idle, run, self.submitted - (idle + run + fail), fail

    @multiple_try()
    def remove(self, *args, **opts):
        """Clean the jobs on the cluster.

        Accepts any arguments: check_interupt forwards the positional and
        keyword arguments of the interrupted method.
        """
        if not self.submitted_ids:
            return
        cmd = "qdel %s" % ' '.join(self.submitted_ids)
        with open(os.devnull, 'w') as devnull:
            subprocess.Popen(cmd, shell=True, stdout=devnull)
383
class SGECluster(Cluster):
    """Basic class for dealing with cluster submission (SGE)."""
    # Class written by Arian Abrahantes.

    name = 'sge'
    idle_tag = ['qw', 'hqw', 'hRqw', 'w']
    running_tag = ['r', 't', 'Rr', 'Rt']

    def def_get_path(self, location):
        """Return the realpath of location with $HOME substituted for the home dir."""
        location = os.path.realpath(location)
        homePath = os.getenv("HOME")
        if homePath:
            location = location.replace(homePath, '$HOME')
        return location

    @staticmethod
    def _job_name(path):
        """Return the qsub job name (-N) tagging the jobs of directory *path*.

        It is the last 10 hex digits of the md5 of *path*, with the first
        character forced to a letter.
        """
        if not isinstance(path, bytes):
            path = path.encode('utf-8')
        name = hashlib.md5(path).hexdigest()[-10:]
        if not name[0].isalpha():
            name = 'a' + name[1:]
        return name

    @multiple_try()
    def submit(self, prog, argument=None, cwd=None, stdout=None, stderr=None, log=None):
        """Submit prog to the SGE cluster and return its job id.

        The command line (with $HOME substituted for the home directory) is
        fed to qsub on stdin.

        Raises:
            ClusterManagmentError: if qsub does not return a numeric job id.
        """
        # Default cwd BEFORE building the job name: os.path.join(None, prog)
        # used to fail. Popen needs the real directory, not the
        # '$HOME'-substituted one, which Popen would not expand.
        if cwd is None:
            cwd = os.getcwd()
            text = ""
        else:
            text = " cd %s;" % self.def_get_path(cwd)
        job_name = self._job_name(
            os.path.realpath(os.path.join(cwd, prog)).rsplit('/SubProcesses', 1)[0])

        if stdout is None:
            stdout = '/dev/null'
        else:
            stdout = self.def_get_path(stdout)
        if stderr is None:
            stderr = '/dev/null'
        elif stderr == -2:  # -2 is subprocess.STDOUT
            stderr = stdout
        else:
            stderr = self.def_get_path(stderr)  # same treatment as stdout
        if log is None:
            log = '/dev/null'
        else:
            log = self.def_get_path(log)

        text += prog
        if argument:
            text += ' ' + ' '.join(argument)

        # Catch any home path that slipped through the arguments.
        homePath = os.getenv("HOME")
        if homePath:
            text = text.replace(homePath, '$HOME')

        logger.debug("!=== input %s" % text)
        logger.debug("!=== output %s" % stdout)
        logger.debug("!=== error %s" % stderr)
        logger.debug("!=== logs %s" % log)

        command = ['qsub', '-o', stdout, '-N', job_name, '-e', stderr, '-V']
        if self.cluster_queue and self.cluster_queue != 'None':
            command.extend(['-q', self.cluster_queue])

        proc = subprocess.Popen(command, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                stdin=subprocess.PIPE, cwd=cwd)
        output = proc.communicate(text)[0]
        # The id is expected as the third space-separated word; a short
        # output used to raise an uncaught IndexError.
        words = output.split(' ')
        job_id = words[2] if len(words) > 2 else ''
        if not job_id.isdigit():
            raise ClusterManagmentError('fail to submit to the cluster: \n%s' % output)
        self.submitted += 1
        self.submitted_ids.append(job_id)
        logger.debug(output)
        return job_id

    @multiple_try()
    def control_one_job(self, id):
        """Return 'I' (queued), 'R' (running) or 'F' (finished/other) for job *id*."""
        proc = subprocess.Popen(['qstat'], stdout=subprocess.PIPE)
        output = proc.communicate()[0]
        for line in output.splitlines():
            fields = line.split()
            # Match the job-id column exactly. A substring test also matched
            # other ids, dates or job names containing the same digits.
            if not fields or fields[0] != str(id):
                continue
            status = fields[4]
            if status in self.idle_tag:
                return 'I'
            if status in self.running_tag:
                return 'R'
            return 'F'
        return 'F'

    @multiple_try()
    def control(self, me_dir):
        """Return (idle, run, finish, fail) for the jobs of directory me_dir.

        finish is inferred as the submitted jobs no longer listed by qstat.
        """
        # submit() hashes a realpath, so canonicalize me_dir the same way.
        # This also drops a trailing '/'.
        job_name = self._job_name(os.path.realpath(me_dir))
        proc = subprocess.Popen(['qstat'], stdout=subprocess.PIPE)
        output = proc.communicate()[0]

        idle, run, fail = 0, 0, 0
        for line in output.splitlines():
            if job_name not in line:
                continue
            status = line.split()[4]
            if status in self.idle_tag:
                idle += 1
            elif status in self.running_tag:
                run += 1
            else:
                logger.debug(line)
                fail += 1

        return idle, run, self.submitted - (idle + run + fail), fail

    @multiple_try()
    def remove(self, *args, **opts):
        """Clean the jobs on the cluster.

        Accepts any arguments: check_interupt forwards the positional and
        keyword arguments of the interrupted method.
        """
        if not self.submitted_ids:
            return
        cmd = "qdel %s" % ' '.join(self.submitted_ids)
        with open(os.devnull, 'w') as devnull:
            subprocess.Popen(cmd, shell=True, stdout=devnull)
528
class LSFCluster(Cluster):
    """Basic class for dealing with cluster submission (LSF)."""

    name = 'lsf'

    @staticmethod
    def _job_name(path):
        """Return the bsub job name (-J) tagging the jobs of directory *path*.

        It is the last 14 hex digits of the md5 of *path*, with the first
        character forced to a letter.
        """
        if not isinstance(path, bytes):
            path = path.encode('utf-8')
        name = hashlib.md5(path).hexdigest()[-14:]
        if not name[0].isalpha():
            name = 'a' + name[1:]
        return name

    @multiple_try()
    def submit(self, prog, argument=None, cwd=None, stdout=None, stderr=None, log=None):
        """Submit prog to the LSF cluster and return its job id.

        The command line is fed to bsub on stdin.

        Raises:
            ClusterManagmentError: if no numeric job id is found in the
                output of bsub.
        """
        # Default cwd BEFORE building the job name: os.path.join(None, prog)
        # used to fail.
        if cwd is None:
            cwd = os.getcwd()
            text = ""
        else:
            text = " cd %s;" % cwd
        job_name = self._job_name(
            os.path.realpath(os.path.join(cwd, prog)).rsplit('/SubProcesses', 1)[0])

        if stdout is None:
            stdout = '/dev/null'
        if stderr is None:
            stderr = '/dev/null'
        elif stderr == -2:  # -2 is subprocess.STDOUT
            stderr = stdout

        text += prog
        if argument:
            text += ' ' + ' '.join(argument)

        command = ['bsub', '-o', stdout, '-J', job_name, '-e', stderr]
        if self.cluster_queue and self.cluster_queue != 'None':
            command.extend(['-q', self.cluster_queue])

        proc = subprocess.Popen(command, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                stdin=subprocess.PIPE, cwd=cwd)
        output = proc.communicate(text)[0]
        # Expected: "Job <nnnn> is submitted to default queue <normal>."
        try:
            job_id = output.split('>', 1)[0].split('<')[1]
        except IndexError:
            job_id = ''
        if not job_id.isdigit():
            raise ClusterManagmentError('fail to submit to the cluster: \n%s' % output)
        self.submitted += 1
        self.submitted_ids.append(job_id)
        return job_id

    @multiple_try()
    def control_one_job(self, id):
        """Return 'R', 'I', 'F' (done or not listed) or 'H' (any other state) for job *id*."""
        proc = subprocess.Popen(['bjobs', str(id)], stdout=subprocess.PIPE)
        output = proc.communicate()[0]

        for line in output.splitlines():
            line = line.strip().upper()
            if 'JOBID' in line:
                continue  # header
            fields = line.split()
            # Match the JOBID column exactly instead of a substring test.
            if not fields or fields[0] != str(id):
                continue
            status = fields[2]
            if status == 'RUN':
                return 'R'
            elif status == 'PEND':
                return 'I'
            elif status == 'DONE':
                return 'F'
            else:
                return 'H'
        return 'F'

    @multiple_try()
    def control(self, me_dir):
        """Return (idle, run, finish, fail) for the tracked jobs.

        finish is inferred as the submitted jobs not counted otherwise.
        """
        if not self.submitted_ids:
            return 0, 0, 0, 0

        proc = subprocess.Popen(['bjobs'] + list(self.submitted_ids),
                                stdout=subprocess.PIPE)
        output = proc.communicate()[0]

        idle, run, fail = 0, 0, 0
        for line in output.splitlines():
            line = line.strip()
            if 'JOBID' in line:
                continue  # header
            splitline = line.split()
            if not splitline:
                continue  # a blank line used to raise IndexError
            if splitline[0] not in self.submitted_ids:
                continue
            status = splitline[2]
            if status == 'RUN':
                run += 1
            elif status == 'PEND':
                idle += 1
            elif status == 'DONE':
                pass
            else:
                fail += 1

        return idle, run, self.submitted - (idle + run + fail), fail

    @multiple_try()
    def remove(self, *args, **opts):
        """Clean the jobs on the cluster.

        LSF kills jobs with ``bkill``. The previous ``bdel`` command is not
        an LSF command, so nothing was removed. Accepts any arguments:
        check_interupt forwards the positional and keyword arguments of the
        interrupted method.
        """
        if not self.submitted_ids:
            return
        cmd = "bkill %s" % ' '.join(self.submitted_ids)
        with open(os.devnull, 'w') as devnull:
            subprocess.Popen(cmd, shell=True, stdout=devnull)
651
class GECluster(Cluster):
    """Class for dealing with cluster submission on a GE cluster"""

    name = 'ge'
    idle_tag = ['qw']
    running_tag = ['r']

    @multiple_try()
    def submit(self, prog, argument=None, cwd=None, stdout=None, stderr=None, log=None):
        """Submit prog to the GE cluster and return its job id.

        The command line is written to <cwd>/tmp_submit, which is passed to
        qsub. By default, output and error go to <cwd>/log.<prog> and
        <cwd>/err.<prog>.

        Raises:
            ClusterManagmentError: if no job id is found in the output of qsub.
        """
        text = ""
        if cwd is None:
            cwd = os.getcwd()
        else:
            text = " cd %s; bash " % cwd
        prog_name = prog.split('/')[-1]
        if stdout is None:
            stdout = os.path.join(cwd, "log.%s" % prog_name)
        if stderr is None:
            stderr = os.path.join(cwd, "err.%s" % prog_name)
        elif stderr == -2:  # -2 is subprocess.STDOUT
            stderr = stdout

        text += prog
        if argument:
            text += ' ' + ' '.join(argument)
        text += '\n'
        tmp_submit = os.path.join(cwd, 'tmp_submit')
        with open(tmp_submit, 'w') as fsock:
            fsock.write(text)

        proc = subprocess.Popen(['qsub', '-o', stdout, '-e', stderr, tmp_submit],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                stdin=subprocess.PIPE, cwd=cwd)
        output = proc.communicate()[0]
        # Expected: Your job 874511 ("test.sh") has been submitted
        match = re.search(r"Your job (\d+) \(", output)
        if not match:
            raise ClusterManagmentError('fail to submit to the cluster: \n%s' % output)
        job_id = match.group(1)
        self.submitted += 1
        self.submitted_ids.append(job_id)
        return job_id

    @multiple_try()
    def control_one_job(self, id):
        """Return 'F' (not listed), 'I' (idle), 'R' (running) or 'H' (other) for job *id*.

        Expected qstat line, e.g.:
        874516 0.00000 test.sh alwall qw 03/04/2012 22:30:35 1
        """
        proc = subprocess.Popen(['qstat'], stdout=subprocess.PIPE)
        output = proc.communicate()[0]

        stat = ''
        for line in output.splitlines():
            fields = line.split()
            # Column split instead of a regex: the old pattern raised on any
            # job name containing characters outside [\w.] (e.g. '-').
            if len(fields) < 5 or fields[0] != str(id):
                continue
            stat = fields[4]
        if not stat:
            return 'F'
        if stat in self.idle_tag:
            return 'I'
        if stat in self.running_tag:
            return 'R'
        # Any other state used to return None implicitly; callers in this
        # module treat both values as "neither idle nor running".
        return 'H'

    @multiple_try()
    def control(self, me_dir=None):
        """Check the status of the tracked jobs. Return (idle, run, finish, fail).

        Queries qstat for pending (p), running (r) and 'sh' jobs. Tracked
        jobs absent from all three listings are dropped from submitted_ids.
        """
        if not self.submitted_ids:
            return 0, 0, 0, 0
        idle, run, fail = 0, 0, 0
        ongoing = []
        id_pattern = re.compile(r"^(\d+)")
        for statusflag in ['p', 'r', 'sh']:
            proc = subprocess.Popen(['qstat', '-s', statusflag],
                                    stdout=subprocess.PIPE)
            output = proc.communicate()[0]
            for line in output.split('\n'):
                match = id_pattern.match(line.strip())
                if not match:
                    continue
                job_id = match.group(1)
                if job_id not in self.submitted_ids:
                    continue
                ongoing.append(job_id)
                if statusflag == 'p':
                    idle += 1
                elif statusflag == 'r':
                    run += 1
                elif statusflag == 'sh':
                    fail += 1

        self.submitted_ids = ongoing

        return idle, run, self.submitted - idle - run - fail, fail

    @multiple_try()
    def remove(self, *args, **opts):
        """Clean the jobs on the cluster.

        Accepts any arguments: check_interupt forwards the positional and
        keyword arguments of the interrupted method.
        """
        if not self.submitted_ids:
            return
        cmd = "qdel %s" % ' '.join(self.submitted_ids)
        with open(os.devnull, 'w') as devnull:
            subprocess.Popen(cmd, shell=True, stdout=devnull)
# Lookup table from a cluster type name to the class implementing it.
from_name = dict(condor=CondorCluster,
                 pbs=PBSCluster,
                 sge=SGECluster,
                 lsf=LSFCluster,
                 ge=GECluster)