1
2
3
4
5
6
7
8
9
10
11
12
13
14 import subprocess
15 import logging
16 import hashlib
17 import os
18 import time
19 import re
20
21 logger = logging.getLogger('madgraph.cluster')
22
23 try:
24 from madgraph import MadGraph5Error
25 except:
26 from internal import MadGraph5Error
30
33
35
def deco_retry(f):
    """Wrap *f* so that a failing call is retried before giving up.

    The retry policy comes from two free names that the enclosing scope
    must provide: ``nb_try`` (maximum number of calls) and ``sleep`` (base
    pause in seconds).  The pause grows linearly: ``sleep * attempt``
    seconds are waited after the attempt number ``attempt`` fails.

    KeyboardInterrupt, and the other signals that do not derive from
    ``Exception`` (SystemExit, GeneratorExit), are never retried.  When the
    last allowed attempt fails, the exception of that attempt is propagated
    unchanged, without a useless final pause.
    """
    import functools  # local import so the block does not rely on a new file-level import

    @functools.wraps(f)
    def deco_f_retry(*args, **opt):
        attempt = 1
        while True:
            try:
                return f(*args, **opt)
            except Exception:
                # Re-raise from inside the handler: the active exception is
                # well defined here, which a bare ``raise`` after the loop
                # cannot guarantee.
                if attempt >= nb_try:
                    raise
            time.sleep(sleep * attempt)
            attempt += 1

    return deco_f_retry
47 return deco_retry
48
50
def deco_interupt(f):
    """Wrap method *f* so that the instance cleans up when *f* is interrupted.

    ``error`` is a free name that the enclosing scope must provide: the
    exception class (or tuple of classes) to intercept.  When *f* raises
    it, ``self.remove`` is called with the same positional and keyword
    arguments that were given to *f*, and the very same exception is then
    propagated, keeping its message and traceback.
    """
    import functools  # local import so the block does not rely on a new file-level import

    @functools.wraps(f)
    def deco_f_interupt(self, *args, **opt):
        try:
            return f(self, *args, **opt)
        except error:
            self.remove(*args, **opt)
            # Bare raise: ``raise error`` would create a new, empty instance
            # and lose the information carried by the original exception.
            raise

    return deco_f_interupt
59 return deco_interupt
60
62 """Basic Class for all cluster type submission"""
63 name = 'mother class'
64
66 """Init the cluster"""
67 self.submitted = 0
68 self.submitted_ids = []
69 self.finish = 0
70 self.cluster_queue = cluster_queue
71
72
def submit(self, prog, argument=[], cwd=None, stdout=None, stderr=None, log=None):
    """Submit one job to the cluster; this generic version only raises.

    A working implementation is expected to return the id of the job on
    the cluster.  Here none of the arguments is used: the method always
    raises ``NotImplemented`` with a message naming the cluster type
    (``self.name``).
    """
    # NOTE(review): ``NotImplemented`` is presumably a project exception class
    # shadowing the builtin constant; its definition is not visible here.
    message = 'No implementation of how to submit a job to cluster \'%s\'' % self.name
    raise NotImplemented(message)
76
78 """Check the status of job associated to directory me_dir. return (idle, run, finish, fail)"""
79 if not self.submitted_ids:
80 raise NotImplemented, 'No implementation of how to control the job status to cluster \'%s\'' % self.name
81 idle, run, fail = 0, 0, 0
82 for id in self.submitted_ids[:]:
83 status = self.control_one_job(id)
84 if status == 'I':
85 idle += 1
86 elif status == 'R':
87 run += 1
88 elif status == 'F':
89 self.finish +=1
90 self.submitted_ids.remove(id)
91 else:
92 fail += 1
93
94 return idle, run, self.finish, fail
95
97 """ control the status of a single job with it's cluster id """
98 raise NotImplemented, 'No implementation of how to control the job status to cluster \'%s\'' % self.name
99
@check_interupt()
def wait(self, me_dir, fct):
    """Block until no job is idle or running any more.

    ``self.control(me_dir)`` is polled for the (idle, run, finish, fail)
    counters.  While some jobs are still idle or running, the progress
    callback ``fct(idle, run, finish)`` is called and the loop pauses 30
    seconds.  Once nothing is idle or running, the method pauses 20
    seconds, logs the completion and resets the submission bookkeeping.

    Raises ClusterManagmentError as soon as a poll reports failed jobs.
    """
    polling = True
    while polling:
        n_idle, n_run, n_done, n_fail = self.control(me_dir)
        if n_fail:
            raise ClusterManagmentError('Some Jobs are in a Hold/... state. Please try to investigate or contact the IT team')
        if n_idle + n_run:
            # Work still pending: report progress, then poll again later.
            fct(n_idle, n_run, n_done)
            time.sleep(30)
        else:
            time.sleep(20)
            logger.info('All jobs finished')
            polling = False

    # Everything is done: forget the finished submissions.
    self.submitted = 0
    self.submitted_ids = []
116
@check_interupt()
def launch_and_wait(self, prog, argument=[], cwd=None, stdout=None,
                    stderr=None, log=None):
    """Launch one job on the cluster and wait for it.

    The job is submitted through ``self.submit`` and its state is polled
    every 30 seconds with ``self.control_one_job`` until it is neither
    running ('R') nor idle ('I'); a further 20 second pause follows.

    When ``stderr`` is -2 (the value of ``subprocess.STDOUT``) and a
    ``stdout`` path is given, the job writes its errors to
    ``stdout + '.err'``.  Afterwards the method waits, for at most five
    rounds, for the output file to show up; a non-empty error file is
    logged as a warning and appended exactly once to the output file.
    """
    special_output = False  # True when stderr has to be merged into stdout
    if stderr == -2 and stdout:
        special_output = True
        stderr = stdout + '.err'

    job_id = self.submit(prog, argument, cwd, stdout, stderr, log)
    while self.control_one_job(job_id) in ('R', 'I'):
        time.sleep(30)
    time.sleep(20)

    if not special_output:
        return

    # The files may appear with some delay: retry a few times.
    for _ in range(5):
        if os.path.exists(stdout):
            if not os.path.exists(stderr):
                time.sleep(5)
            if not os.path.exists(stderr):
                return
            with open(stderr) as err_file:
                err_text = err_file.read()
            if err_text:
                logger.warning(err_text)
                with open(stdout, 'a') as out_file:
                    out_file.write(err_text)
            # Merge done (or nothing to merge): stop here so that the error
            # text is not appended again on the next round.
            return
        time.sleep(10)
152
154 """ """
155 logger.warning("""This cluster didn't support job removal,
156 the jobs are still running on the cluster.""")
157
159 """Basic class for dealing with cluster submission"""
160
161 name = 'condor'
162
@multiple_try()
def submit(self, prog, argument=[], cwd=None, stdout=None, stderr=None, log=None):
    """Submit one job to Condor and return its cluster id (a string).

    A submit description is written to the file ``submit_condor`` in the
    current directory and handed to ``condor_submit``.  ``cwd`` defaults
    to the current directory; ``stdout``, ``stderr`` and ``log`` default
    to ``/dev/null``.  When ``prog`` does not exist as given, it is looked
    up relative to ``cwd``.  When ``self.cluster_queue`` is set, it is
    turned into a ``Requirements`` line.

    The id is recorded in ``self.submitted_ids`` and ``self.submitted`` is
    incremented.  Raises ClusterManagmentError when the output of
    ``condor_submit`` does not announce a cluster id.
    """
    text = """Executable = %(prog)s
output = %(stdout)s
error = %(stderr)s
log = %(log)s
%(argument)s
Universe = vanilla
notification = Error
Initialdir = %(cwd)s
%(requirement)s
getenv=True
queue 1
"""

    if self.cluster_queue not in ['None', None]:
        requirement = 'Requirements = %s=?=True' % self.cluster_queue
    else:
        requirement = ''

    if cwd is None:
        cwd = os.getcwd()
    if stdout is None:
        stdout = '/dev/null'
    if stderr is None:
        stderr = '/dev/null'
    if log is None:
        log = '/dev/null'
    if not os.path.exists(prog):
        prog = os.path.join(cwd, prog)
    if argument:
        argument = 'Arguments = %s' % ' '.join(argument)
    else:
        argument = ''

    dico = {'prog': prog, 'cwd': cwd, 'stdout': stdout,
            'stderr': stderr, 'log': log, 'argument': argument,
            'requirement': requirement}

    # Close (and therefore flush) the description before condor_submit reads it.
    with open('submit_condor', 'w') as submit_file:
        submit_file.write(text % dico)

    a = subprocess.Popen(['condor_submit', 'submit_condor'],
                         stdout=subprocess.PIPE)
    output = a.communicate()[0]

    match = re.search(r"submitted to cluster (\d*)", output)
    if match is None:
        raise ClusterManagmentError('fail to submit to the cluster: \n%s'
                                    % output)
    job_id = match.group(1)
    self.submitted += 1
    self.submitted_ids.append(job_id)
    return job_id
220
221 @multiple_try(nb_try=10, sleep=10)
223 """ control the status of a single job with it's cluster id """
224 cmd = 'condor_q '+str(id)+" -format \'%-2s \\n\' \'ifThenElse(JobStatus==0,\"U\",ifThenElse(JobStatus==1,\"I\",ifThenElse(JobStatus==2,\"R\",ifThenElse(JobStatus==3,\"X\",ifThenElse(JobStatus==4,\"C\",ifThenElse(JobStatus==5,\"H\",ifThenElse(JobStatus==6,\"E\",string(JobStatus))))))))\'"
225 status = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE,
226 stderr=subprocess.PIPE)
227
228 error = status.stderr.read()
229 if status.returncode or error:
230 raise ClusterManagmentError, 'condor_q returns error: %s' % error
231
232 return status.stdout.readline().strip()
233
234 @check_interupt()
235 @multiple_try(nb_try=10, sleep=10)
237 """ control the status of a single job with it's cluster id """
238
239 if not self.submitted_ids:
240 return 0, 0, 0, 0
241
242 cmd = "condor_q " + ' '.join(self.submitted_ids) + " -format \'%-2s \\n\' \'ifThenElse(JobStatus==0,\"U\",ifThenElse(JobStatus==1,\"I\",ifThenElse(JobStatus==2,\"R\",ifThenElse(JobStatus==3,\"X\",ifThenElse(JobStatus==4,\"C\",ifThenElse(JobStatus==5,\"H\",ifThenElse(JobStatus==6,\"E\",string(JobStatus))))))))\'"
243 status = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE,
244 stderr=subprocess.PIPE)
245 error = status.stderr.read()
246 if status.returncode or error:
247 raise ClusterManagmentError, 'condor_q returns error: %s' % error
248
249
250 idle, run, fail = 0, 0, 0
251 for line in status.stdout:
252 status = line.strip()
253 if status in ['I','U']:
254 idle += 1
255 elif status == 'R':
256 run += 1
257 elif status != 'C':
258 fail += 1
259
260 return idle, run, self.submitted - (idle+run+fail), fail
261
262 @multiple_try()
264 """Clean the jobson the cluster"""
265
266 if not self.submitted_ids:
267 return
268 cmd = "condor_rm %s" % ' '.join(self.submitted_ids)
269
270 status = subprocess.Popen([cmd], shell=True, stdout=open(os.devnull,'w'))
271
273 """Basic class for dealing with cluster submission"""
274
275 name = 'pbs'
276 idle_tag = ['Q']
277 running_tag = ['T','E','R']
278 complete_tag = ['C']
279
@multiple_try()
def submit(self, prog, argument=[], cwd=None, stdout=None, stderr=None, log=None):
    """Submit the prog to the PBS cluster and return the job id (a string).

    The command line (``cd`` to ``cwd`` when one is given, then ``prog``
    and its arguments) is piped to ``qsub``.  The job is named after the
    last 14 hexadecimal characters of the md5 of the path preceding
    ``/SubProcesses``, with the first character forced to be a letter.
    ``stdout`` and ``stderr`` default to ``/dev/null``; ``stderr == -2``
    sends the errors to the ``stdout`` file.  ``log`` is accepted for
    interface compatibility but is not used.

    The id is recorded in ``self.submitted_ids`` and ``self.submitted`` is
    incremented.  Raises ClusterManagmentError when the output of ``qsub``
    does not start with a numeric id.
    """
    text = ""
    # Resolve the working directory first: the job name below is built from
    # it, so the default cwd=None must be replaced before the path is joined.
    if cwd is None:
        cwd = os.getcwd()
    else:
        text = " cd %s;" % cwd

    me_dir = os.path.realpath(os.path.join(cwd, prog)).rsplit('/SubProcesses', 1)[0]
    me_dir = hashlib.md5(me_dir).hexdigest()[-14:]
    if not me_dir[0].isalpha():
        me_dir = 'a' + me_dir[1:]

    if stdout is None:
        stdout = '/dev/null'
    if stderr is None:
        stderr = '/dev/null'
    elif stderr == -2:
        stderr = stdout

    text += prog
    if argument:
        text += ' ' + ' '.join(argument)

    command = ['qsub', '-o', stdout,
               '-N', me_dir,
               '-e', stderr,
               '-V']

    if self.cluster_queue and self.cluster_queue != 'None':
        command.extend(['-q', self.cluster_queue])

    a = subprocess.Popen(command, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT,
                         stdin=subprocess.PIPE, cwd=cwd)

    output = a.communicate(text)[0]
    job_id = output.split('.')[0]
    if not job_id.isdigit():
        raise ClusterManagmentError('fail to submit to the cluster: \n%s'
                                    % output)
    self.submitted += 1
    self.submitted_ids.append(job_id)
    return job_id
327
328 @multiple_try()
330 """ control the status of a single job with it's cluster id """
331 cmd = 'qstat '+str(id)
332 status = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE,
333 stderr=open(os.devnull,'w'))
334
335 for line in status.stdout:
336 line = line.strip()
337 if 'Unknown' in line:
338 return 'F'
339 elif line.startswith(str(id)):
340 status = line.split()[4]
341 if status in self.idle_tag:
342 return 'I'
343 elif status in self.running_tag:
344 return 'R'
345 return 'F'
346
347
348 @multiple_try()
350 """ control the status of a single job with it's cluster id """
351 cmd = "qstat"
352 status = subprocess.Popen([cmd], stdout=subprocess.PIPE)
353
354 if me_dir.endswith('/'):
355 me_dir = me_dir[:-1]
356 me_dir = hashlib.md5(me_dir).hexdigest()[-14:]
357 if not me_dir[0].isalpha():
358 me_dir = 'a' + me_dir[1:]
359
360 idle, run, fail = 0, 0, 0
361 for line in status.stdout:
362 if me_dir in line:
363 status = line.split()[4]
364 if status in self.idle_tag:
365 idle += 1
366 elif status in self.running_tag:
367 run += 1
368 elif status in self.complete_tag:
369 continue
370 else:
371 fail += 1
372
373 return idle, run, self.submitted - (idle+run+fail), fail
374
375 @multiple_try()
377 """Clean the jobs on the cluster"""
378
379 if not self.submitted_ids:
380 return
381 cmd = "qdel %s" % ' '.join(self.submitted_ids)
382 status = subprocess.Popen([cmd], shell=True, stdout=open(os.devnull,'w'))
383
386 """Basic class for dealing with cluster submission"""
387
388
389 name = 'sge'
390 idle_tag = ['qw', 'hqw','hRqw','w']
391 running_tag = ['r','t','Rr','Rt']
392
394 """replace string for path issues"""
395 location = os.path.realpath(location)
396 homePath = os.getenv("HOME")
397 if homePath:
398 location = location.replace(homePath,'$HOME')
399 return location
400
@multiple_try()
def submit(self, prog, argument=[], cwd=None, stdout=None, stderr=None, log=None):
    """Submit the prog to the SGE cluster and return the job id (a string).

    The command line (``cd`` to the working directory, then ``prog`` and
    its arguments) is piped to ``qsub``.  The paths written in it, as well
    as ``stdout`` and ``log``, go through ``self.def_get_path``, and any
    remaining occurrence of the home directory in the command line is
    replaced by ``$HOME``.  The job is named after the last 10 hexadecimal
    characters of the md5 of the path preceding ``/SubProcesses``, with
    the first character forced to be a letter.  ``stdout``, ``stderr`` and
    ``log`` default to ``/dev/null``; ``stderr == -2`` sends the errors to
    the ``stdout`` file.

    The id is recorded in ``self.submitted_ids`` and ``self.submitted`` is
    incremented.  Raises ClusterManagmentError when the third
    space-separated field of the ``qsub`` output is not a numeric id.
    """
    # Real directory, used for the job name and as working directory of
    # qsub itself.  It is kept apart from the ``$HOME``-rewritten form,
    # which is only meaningful inside the shell command sent to the cluster.
    run_dir = os.getcwd() if cwd is None else cwd

    me_dir = os.path.realpath(os.path.join(run_dir, prog)).rsplit('/SubProcesses', 1)[0]
    me_dir = hashlib.md5(me_dir).hexdigest()[-10:]
    if not me_dir[0].isalpha():
        me_dir = 'a' + me_dir[1:]

    text = " cd %s;" % self.def_get_path(run_dir)
    if stdout is None:
        stdout = '/dev/null'
    else:
        stdout = self.def_get_path(stdout)
    if stderr is None:
        stderr = '/dev/null'
    elif stderr == -2:
        stderr = stdout
    if log is None:
        log = '/dev/null'
    else:
        log = self.def_get_path(log)

    text += prog
    if argument:
        text += ' ' + ' '.join(argument)

    homePath = os.getenv("HOME")
    if homePath:
        text = text.replace(homePath, '$HOME')

    logger.debug("!=== input  %s" % text)
    logger.debug("!=== output %s" % stdout)
    logger.debug("!=== error  %s" % stderr)
    logger.debug("!=== logs   %s" % log)

    command = ['qsub', '-o', stdout,
               '-N', me_dir,
               '-e', stderr,
               '-V']

    if self.cluster_queue and self.cluster_queue != 'None':
        command.extend(['-q', self.cluster_queue])

    a = subprocess.Popen(command, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT,
                         stdin=subprocess.PIPE, cwd=run_dir)

    output = a.communicate(text)[0]
    # The id is read from the third space-separated field of the answer.
    fields = output.split(' ')
    if len(fields) < 3 or not fields[2].isdigit():
        raise ClusterManagmentError('fail to submit to the cluster: \n%s'
                                    % output)
    job_id = fields[2]
    self.submitted += 1
    self.submitted_ids.append(job_id)
    logger.debug(output)

    return job_id
469
470 @multiple_try()
472 """ control the status of a single job with it's cluster id """
473
474 cmd = 'qstat '
475 status = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE)
476 for line in status.stdout:
477
478
479
480
481
482
483 if str(id) in line:
484 status = line.split()[4]
485
486 if status in self.idle_tag:
487 return 'I'
488 elif status in self.running_tag:
489 return 'R'
490 return 'F'
491
492 @multiple_try()
494 """ control the status of a single job with it's cluster id """
495 cmd = "qstat "
496 status = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE)
497
498 if me_dir.endswith('/'):
499 me_dir = me_dir[:-1]
500 me_dir = hashlib.md5(me_dir).hexdigest()[-10:]
501 if not me_dir[0].isalpha():
502 me_dir = 'a' + me_dir[1:]
503
504 idle, run, fail = 0, 0, 0
505 for line in status.stdout:
506 if me_dir in line:
507 status = line.split()[4]
508 if status in self.idle_tag:
509 idle += 1
510 elif status in self.running_tag:
511 run += 1
512 else:
513 logger.debug(line)
514 fail += 1
515
516 return idle, run, self.submitted - (idle+run+fail), fail
517
518
519
520 @multiple_try()
522 """Clean the jobs on the cluster"""
523
524 if not self.submitted_ids:
525 return
526 cmd = "qdel %s" % ' '.join(self.submitted_ids)
527 status = subprocess.Popen([cmd], shell=True, stdout=open(os.devnull,'w'))
528
531 """Basic class for dealing with cluster submission"""
532
533 name = 'lsf'
534
@multiple_try()
def submit(self, prog, argument=[], cwd=None, stdout=None, stderr=None, log=None):
    """Submit the prog to the LSF cluster and return the job id (a string).

    The command line (``cd`` to ``cwd`` when one is given, then ``prog``
    and its arguments) is piped to ``bsub``.  The job is named after the
    last 14 hexadecimal characters of the md5 of the path preceding
    ``/SubProcesses``, with the first character forced to be a letter.
    ``stdout`` and ``stderr`` default to ``/dev/null``; ``stderr == -2``
    sends the errors to the ``stdout`` file.  ``log`` is accepted for
    interface compatibility but is not used.

    The id is the text between the first ``<`` and ``>`` of the ``bsub``
    output.  It is recorded in ``self.submitted_ids`` and
    ``self.submitted`` is incremented.  Raises ClusterManagmentError when
    no numeric id can be extracted.
    """
    text = ""
    # Resolve the working directory first: the job name below is built from
    # it, so the default cwd=None must be replaced before the path is joined.
    if cwd is None:
        cwd = os.getcwd()
    else:
        text = " cd %s;" % cwd

    me_dir = os.path.realpath(os.path.join(cwd, prog)).rsplit('/SubProcesses', 1)[0]
    me_dir = hashlib.md5(me_dir).hexdigest()[-14:]
    if not me_dir[0].isalpha():
        me_dir = 'a' + me_dir[1:]

    if stdout is None:
        stdout = '/dev/null'
    if stderr is None:
        stderr = '/dev/null'
    elif stderr == -2:
        stderr = stdout

    text += prog
    if argument:
        text += ' ' + ' '.join(argument)

    command = ['bsub', '-o', stdout,
               '-J', me_dir,
               '-e', stderr]

    if self.cluster_queue and self.cluster_queue != 'None':
        command.extend(['-q', self.cluster_queue])

    a = subprocess.Popen(command, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT,
                         stdin=subprocess.PIPE, cwd=cwd)

    output = a.communicate(text)[0]

    # Keep what precedes the first '>' and take what follows its first '<'.
    pieces = output.split('>', 1)[0].split('<')
    if len(pieces) < 2 or not pieces[1].isdigit():
        raise ClusterManagmentError('fail to submit to the cluster: \n%s'
                                    % output)
    job_id = pieces[1]
    self.submitted += 1
    self.submitted_ids.append(job_id)
    return job_id
586
587
588 @multiple_try()
590 """ control the status of a single job with it's cluster id """
591
592 cmd = 'bjobs '+str(id)
593 status = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE)
594
595 for line in status.stdout:
596 line = line.strip().upper()
597 if 'JOBID' in line:
598 continue
599 elif str(id) not in line:
600 continue
601 status = line.split()[2]
602 if status == 'RUN':
603 return 'R'
604 elif status == 'PEND':
605 return 'I'
606 elif status == 'DONE':
607 return 'F'
608 else:
609 return 'H'
610 return 'F'
611
612 @multiple_try()
614 """ control the status of a single job with it's cluster id """
615
616 if not self.submitted_ids:
617 return 0, 0, 0, 0
618
619 cmd = "bjobs " + ' '.join(self.submitted_ids)
620 status = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE)
621
622 idle, run, fail = 0, 0, 0
623 for line in status.stdout:
624 line = line.strip()
625 if 'JOBID' in line:
626 continue
627 splitline = line.split()
628 id = splitline[0]
629 if id not in self.submitted_ids:
630 continue
631 status = splitline[2]
632 if status == 'RUN':
633 run += 1
634 elif status == 'PEND':
635 idle += 1
636 elif status == 'DONE':
637 pass
638 else:
639 fail += 1
640
641 return idle, run, self.submitted - (idle+run+fail), fail
642
643 @multiple_try()
645 """Clean the jobs on the cluster"""
646
647 if not self.submitted_ids:
648 return
649 cmd = "bdel %s" % ' '.join(self.submitted_ids)
650 status = subprocess.Popen([cmd], shell=True, stdout=open(os.devnull,'w'))
651
653 """Class for dealing with cluster submission on a GE cluster"""
654
655 name = 'ge'
656 idle_tag = ['qw']
657 running_tag = ['r']
658
@multiple_try()
def submit(self, prog, argument=[], cwd=None, stdout=None, stderr=None, log=None):
    """Submit the prog to the GE cluster and return the job id (a string).

    The command line is written to the file ``tmp_submit`` inside the
    working directory and that file is given to ``qsub``.  When ``cwd`` is
    provided the command starts with ``cd <cwd>; bash``; otherwise the
    current directory is used and ``prog`` is written as is.  ``stdout``
    and ``stderr`` default to ``log.<prog file name>`` and
    ``err.<prog file name>`` inside the working directory;
    ``stderr == -2`` sends the errors to the ``stdout`` file.  ``log`` is
    accepted for interface compatibility but is not used.

    The id is recorded in ``self.submitted_ids`` and ``self.submitted`` is
    incremented.  Raises ClusterManagmentError when the ``qsub`` output
    does not contain ``Your job <id> (``.
    """
    text = ""
    if cwd is None:
        cwd = os.getcwd()
    else:
        text = " cd %s; bash " % cwd

    prog_name = prog.split('/')[-1]
    if stdout is None:
        stdout = os.path.join(cwd, "log.%s" % prog_name)
    if stderr is None:
        stderr = os.path.join(cwd, "err.%s" % prog_name)
    elif stderr == -2:
        stderr = stdout

    text += prog
    if argument:
        text += ' ' + ' '.join(argument)
    text += '\n'

    tmp_submit = os.path.join(cwd, 'tmp_submit')
    # Close (and therefore flush) the script before qsub reads it.
    with open(tmp_submit, 'w') as script:
        script.write(text)

    a = subprocess.Popen(['qsub', '-o', stdout,
                          '-e', stderr,
                          tmp_submit],
                         stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT,
                         stdin=subprocess.PIPE, cwd=cwd)

    output = a.communicate()[0]

    match = re.search(r"Your job (\d*) \(", output)
    if match is None:
        raise ClusterManagmentError('fail to submit to the cluster: \n%s'
                                    % output)
    job_id = match.group(1)
    self.submitted += 1
    self.submitted_ids.append(job_id)
    return job_id
702
703 @multiple_try()
705 """ control the status of a single job with it's cluster id """
706 cmd = 'qstat | grep '+str(id)
707 status = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE)
708 if not status:
709 return 'F'
710
711 pat = re.compile("^(\d+)\s+[\d\.]+\s+[\w\d\.]+\s+[\w\d\.]+\s+(\w+)\s")
712 stat = ''
713 for line in status.stdout.read().split('\n'):
714 if not line:
715 continue
716 line = line.strip()
717 try:
718 groups = pat.search(line).groups()
719 except:
720 raise ClusterManagmentError, 'bad syntax for stat: \n\"%s\"' % line
721 if groups[0] != id: continue
722 stat = groups[1]
723 if not stat:
724 return 'F'
725 if stat in self.idle_tag:
726 return 'I'
727 if stat in self.running_tag:
728 return 'R'
729
730 @multiple_try()
732 """Check the status of job associated to directory me_dir. return (idle, run, finish, fail)"""
733 if not self.submitted_ids:
734 return 0, 0, 0, 0
735 idle, run, fail = 0, 0, 0
736 ongoing = []
737 for statusflag in ['p', 'r', 'sh']:
738 cmd = 'qstat -s %s' % statusflag
739 status = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE)
740
741 pat = re.compile("^(\d+)")
742 for line in status.stdout.read().split('\n'):
743 line = line.strip()
744 try:
745 id = pat.search(line).groups()[0]
746 except:
747 pass
748 else:
749 if id not in self.submitted_ids:
750 continue
751 ongoing.append(id)
752 if statusflag == 'p':
753 idle += 1
754 if statusflag == 'r':
755 run += 1
756 if statusflag == 'sh':
757 fail += 1
758
759 self.submitted_ids = ongoing
760
761 return idle, run, self.submitted - idle - run - fail, fail
762
763 @multiple_try()
765 """Clean the jobs on the cluster"""
766
767 if not self.submitted_ids:
768 return
769 cmd = "qdel %s" % ' '.join(self.submitted_ids)
770 status = subprocess.Popen([cmd], shell=True, stdout=open(os.devnull,'w'))
771
# Lookup table: cluster type name -> class handling that kind of cluster.
from_name = dict(
    condor=CondorCluster,
    pbs=PBSCluster,
    sge=SGECluster,
    lsf=LSFCluster,
    ge=GECluster
)
774