| 
735
 | 
     1 #!/usr/bin/env python
 | 
| 
 | 
     2 
 | 
| 
 | 
     3 """
 | 
| 
 | 
     4 multiprocessing/subprocess front-end
 | 
| 
 | 
     5 """
 | 
| 
 | 
     6 
 | 
| 
 | 
     7 # imports
 | 
| 
 | 
     8 import argparse
 | 
| 
 | 
     9 import atexit
 | 
| 
178
 | 
    10 import os
 | 
| 
735
 | 
    11 import signal
 | 
| 
 | 
    12 import subprocess # http://bugs.python.org/issue1731717
 | 
| 
178
 | 
    13 import sys
 | 
| 
735
 | 
    14 import time
 | 
| 
 | 
    15 import tempfile
 | 
| 
 | 
    16 
 | 
| 
 | 
    17 
 | 
| 
 | 
    18 # globals
 | 
| 
 | 
    19 __all__ = ['Process']
 | 
| 
 | 
    20 string = (str, unicode)
 | 
| 
 | 
    21 PIDS = set()
 | 
| 
 | 
    22 
 | 
| 
 | 
    23 # ensure subprocesses gets killed on exit
 | 
| 
 | 
    24 def killall():
 | 
| 
 | 
    25     """kill all subprocess PIDs"""
 | 
| 
 | 
    26     global PIDS
 | 
| 
 | 
    27     for pid in PIDS.copy():
 | 
| 
 | 
    28         try:
 | 
| 
 | 
    29             os.kill(pid, 9) # SIGKILL
 | 
| 
 | 
    30             PIDS.discard(pid)
 | 
| 
 | 
    31         except:
 | 
| 
 | 
    32             sys.stderr.write("Unable to kill PID {}\n".format(pid))
 | 
| 
 | 
    33 atexit.register(killall)
 | 
| 
 | 
    34 
 | 
| 
 | 
    35 
 | 
| 
 | 
    36 signals = (signal.SIGHUP, signal.SIGINT, signal.SIGQUIT, signal.SIGSEGV, signal.SIGTERM) # signals to handle
 | 
| 
 | 
    37 fatal = set([signal.SIGINT, signal.SIGSEGV, signal.SIGKILL, signal.SIGTERM])
 | 
| 
 | 
    38 # ensure subprocesses get killed on signals
 | 
| 
 | 
    39 def sighandler(signum, frame):
 | 
| 
 | 
    40     """https://docs.python.org/2/library/signal.html"""
 | 
| 
 | 
    41     sys.stderr.write('Signal handler called with signal {}\n; terminating subprocesses: {}'.format(signum,
 | 
| 
 | 
    42                                                                                                    ', '.join([str(pid) for pid in sorted(PIDS)])))
 | 
| 
 | 
    43     killall()
 | 
| 
 | 
    44     if signum in fatal:
 | 
| 
 | 
    45         print ("Caught signal {}; exiting".format(signum))
 | 
| 
 | 
    46         sys.exit()
 | 
| 
 | 
    47 for signum in signals:
 | 
| 
 | 
    48     try:
 | 
| 
 | 
    49         signal.signal(signum, sighandler)
 | 
| 
 | 
    50     except RuntimeError as e:
 | 
| 
 | 
    51         print ('[{}] {}'.format(signum, e))
 | 
| 
 | 
    52         raise
 | 
| 
 | 
    53 
 | 
| 
 | 
    54 class Process(subprocess.Popen):
 | 
| 
 | 
    55     """why would you name a subprocess object Popen?"""
 | 
| 
 | 
    56 
 | 
| 
 | 
    57     # http://docs.python.org/2/library/subprocess.html#popen-constructor
 | 
| 
 | 
    58     defaults = {'bufsize': 1, # line buffered
 | 
| 
 | 
    59                 'store_output': True, # store stdout
 | 
| 
 | 
    60                 }
 | 
| 
 | 
    61 
 | 
| 
 | 
    62     def __init__(self, command, **kwargs):
 | 
| 
 | 
    63 
 | 
| 
 | 
    64         # get verbosity
 | 
| 
 | 
    65         self.verbose = kwargs.pop('verbose', False)
 | 
| 
 | 
    66 
 | 
| 
 | 
    67         # setup arguments
 | 
| 
 | 
    68         self.command = command
 | 
| 
 | 
    69         _kwargs = self.defaults.copy()
 | 
| 
 | 
    70         _kwargs.update(kwargs)
 | 
| 
 | 
    71 
 | 
| 
 | 
    72 
 | 
| 
 | 
    73         # on unix, ``shell={True|False}`` should always come from the
 | 
| 
 | 
    74         # type of command (string or list)
 | 
| 
 | 
    75         if not subprocess.mswindows:
 | 
| 
 | 
    76             _kwargs['shell'] = isinstance(command, string)
 | 
| 
 | 
    77 
 | 
| 
 | 
    78         # output buffer
 | 
| 
 | 
    79         self.location = 0
 | 
| 
 | 
    80         self.output_buffer = tempfile.SpooledTemporaryFile()
 | 
| 
 | 
    81         self.output = '' if _kwargs.pop('store_output') else None
 | 
| 
 | 
    82         _kwargs['stdout'] = self.output_buffer
 | 
| 
 | 
    83 
 | 
| 
 | 
    84         # ensure child in process group
 | 
| 
 | 
    85         # see :
 | 
| 
 | 
    86         # - http://pymotw.com/2/subprocess/#process-groups-sessions
 | 
| 
 | 
    87         # - http://ptspts.blogspot.com/2012/11/how-to-start-and-kill-unix-process-tree.html
 | 
| 
 | 
    88         _kwargs['preexec_fn'] = os.setpgrp
 | 
| 
 | 
    89 
 | 
| 
 | 
    90         # runtime
 | 
| 
 | 
    91         self.start = time.time()
 | 
| 
 | 
    92         self.end = None
 | 
| 
 | 
    93 
 | 
| 
 | 
    94         if self.verbose:
 | 
| 
 | 
    95             # print useful info
 | 
| 
 | 
    96             print ("Running `{}`; started: {}".format(str(self), self.start))
 | 
| 
 | 
    97 
 | 
| 
 | 
    98         # launch subprocess
 | 
| 
 | 
    99         try:
 | 
| 
 | 
   100             subprocess.Popen.__init__(self, command, **_kwargs)
 | 
| 
 | 
   101             PIDS.add(self.pid)
 | 
| 
 | 
   102             if self.verbose:
 | 
| 
 | 
   103                 # print the PID
 | 
| 
 | 
   104                 print ("PID: {}".format(self.pid))
 | 
| 
 | 
   105         except:
 | 
| 
 | 
   106             # print the command
 | 
| 
 | 
   107             print ("Failure to run:")
 | 
| 
 | 
   108             print (self.command)
 | 
| 
 | 
   109 
 | 
| 
 | 
   110             # reraise the hard way:
 | 
| 
 | 
   111             # http://www.ianbicking.org/blog/2007/09/re-raising-exceptions.html
 | 
| 
 | 
   112             exc = sys.exc_info()
 | 
| 
 | 
   113             raise exc[0], exc[1], exc[2]
 | 
| 
 | 
   114 
 | 
| 
 | 
   115 
 | 
| 
 | 
   116     def _finalize(self, process_output):
 | 
| 
 | 
   117         """internal function to finalize"""
 | 
| 
 | 
   118 
 | 
| 
 | 
   119         # read final output
 | 
| 
 | 
   120         if process_output is not None:
 | 
| 
 | 
   121             self.read(process_output)
 | 
| 
 | 
   122 
 | 
| 
 | 
   123         # reset output buffer location
 | 
| 
 | 
   124         self.output_buffer.seek(0)
 | 
| 
 | 
   125 
 | 
| 
 | 
   126         # set end time
 | 
| 
 | 
   127         self.end = time.time()
 | 
| 
 | 
   128 
 | 
| 
 | 
   129         # remove PID from list
 | 
| 
 | 
   130         PIDS.discard(self.pid)
 | 
| 
 | 
   131 
 | 
| 
 | 
   132     def poll(self, process_output=None):
 | 
| 
178
 | 
   133 
 | 
| 
735
 | 
   134         if process_output is not None:
 | 
| 
 | 
   135             self.read(process_output) # read from output buffer
 | 
| 
 | 
   136         poll = subprocess.Popen.poll(self)
 | 
| 
 | 
   137         if poll is not None:
 | 
| 
 | 
   138             self._finalize(process_output)
 | 
| 
 | 
   139         return poll
 | 
| 
 | 
   140 
 | 
| 
 | 
   141     def wait(self, maxtime=None, sleep=1., process_output=None):
 | 
| 
 | 
   142         """
 | 
| 
 | 
   143         maxtime -- timeout in seconds
 | 
| 
 | 
   144         sleep -- number of seconds to sleep between polling
 | 
| 
 | 
   145         """
 | 
| 
 | 
   146         while self.poll(process_output) is None:
 | 
| 
 | 
   147 
 | 
| 
 | 
   148             # check for timeout
 | 
| 
 | 
   149             curr_time = time.time()
 | 
| 
 | 
   150             run_time = self.runtime()
 | 
| 
 | 
   151             if maxtime is not None and run_time > maxtime:
 | 
| 
 | 
   152                 self.kill()
 | 
| 
 | 
   153                 self._finalize(process_output)
 | 
| 
 | 
   154                 return
 | 
| 
 | 
   155 
 | 
| 
 | 
   156             # naptime
 | 
| 
 | 
   157             if sleep:
 | 
| 
 | 
   158                 time.sleep(sleep)
 | 
| 
 | 
   159 
 | 
| 
 | 
   160         # finalize
 | 
| 
 | 
   161         self._finalize(process_output)
 | 
| 
 | 
   162 
 | 
| 
 | 
   163         return self.returncode # set by ``.poll()``
 | 
| 
 | 
   164 
 | 
| 
 | 
   165     def read(self, process_output=None):
 | 
| 
 | 
   166         """read from the output buffer"""
 | 
| 
 | 
   167 
 | 
| 
 | 
   168         self.output_buffer.seek(self.location)
 | 
| 
 | 
   169         read = self.output_buffer.read()
 | 
| 
 | 
   170         if self.output is not None:
 | 
| 
 | 
   171             self.output += read
 | 
| 
 | 
   172         if process_output:
 | 
| 
 | 
   173             process_output(read)
 | 
| 
 | 
   174         self.location += len(read)
 | 
| 
 | 
   175         return read
 | 
| 
 | 
   176 
 | 
| 
 | 
   177     def commandline(self):
 | 
| 
 | 
   178         """returns string of command line"""
 | 
| 
 | 
   179 
 | 
| 
 | 
   180         if isinstance(self.command, string):
 | 
| 
 | 
   181             return self.command
 | 
| 
 | 
   182         return subprocess.list2cmdline(self.command)
 | 
| 
 | 
   183 
 | 
| 
 | 
   184     __str__ = commandline
 | 
| 
 | 
   185 
 | 
| 
 | 
   186     def runtime(self):
 | 
| 
 | 
   187         """returns time spent running or total runtime if completed"""
 | 
| 
 | 
   188 
 | 
| 
 | 
   189         if self.end is None:
 | 
| 
 | 
   190             return time.time() - self.start
 | 
| 
 | 
   191         return self.end - self.start
 | 
| 
 | 
   192 
 | 
| 
 | 
   193 
 | 
| 
 | 
   194 def main(args=sys.argv[1:]):
 | 
| 
 | 
   195     """CLI"""
 | 
| 
 | 
   196 
 | 
| 
 | 
   197     description = """demonstration of how to do things with subprocess"""
 | 
| 
 | 
   198 
 | 
| 
 | 
   199     # available programs
 | 
| 
 | 
   200     progs = {'yes': ["yes"],
 | 
| 
 | 
   201              'ping': ['ping', 'google.com']}
 | 
| 
178
 | 
   202 
 | 
| 
735
 | 
   203     # parse command line
 | 
| 
 | 
   204     parser = argparse.ArgumentParser(description=description)
 | 
| 
 | 
   205     parser.add_argument("-t", "--time", dest="time",
 | 
| 
 | 
   206                         type=float, default=4.,
 | 
| 
 | 
   207                         help="seconds to run for (or <= 0 for forever)")
 | 
| 
 | 
   208     parser.add_argument("-s", "--sleep", dest="sleep",
 | 
| 
 | 
   209                         type=float, default=1.,
 | 
| 
 | 
   210                         help="sleep this number of seconds between polling")
 | 
| 
 | 
   211     parser.add_argument("-p", "--prog", dest='program',
 | 
| 
 | 
   212                         choices=progs.keys(), default='ping',
 | 
| 
 | 
   213                         help="subprocess to run")
 | 
| 
 | 
   214     parser.add_argument("--list-programs", dest='list_programs',
 | 
| 
 | 
   215                         action='store_true', default=False,
 | 
| 
 | 
   216                         help="list available programs")
 | 
| 
 | 
   217     parser.add_argument("--wait", dest='wait',
 | 
| 
 | 
   218                         action='store_true', default=False,
 | 
| 
 | 
   219                         help="run with ``.wait()`` and a callback")
 | 
| 
 | 
   220     parser.add_argument("--callback", dest='callback',
 | 
| 
 | 
   221                         action='store_true', default=False,
 | 
| 
 | 
   222                         help="run with polling and a callback")
 | 
| 
 | 
   223     options = parser.parse_args(args)
 | 
| 
 | 
   224 
 | 
| 
 | 
   225     # list programs
 | 
| 
 | 
   226     if options.list_programs:
 | 
| 
 | 
   227         for key in sorted(progs.keys()):
 | 
| 
 | 
   228             print ('{}: {}'.format(key, subprocess.list2cmdline(progs[key])))
 | 
| 
 | 
   229         sys.exit(0)
 | 
| 
 | 
   230 
 | 
| 
 | 
   231     # select program
 | 
| 
 | 
   232     prog = progs[options.program]
 | 
| 
 | 
   233 
 | 
| 
 | 
   234     # start process
 | 
| 
 | 
   235     proc = Process(prog)
 | 
| 
 | 
   236 
 | 
| 
 | 
   237     # demo function for processing output
 | 
| 
 | 
   238     def output_processor(output):
 | 
| 
 | 
   239         print ('[{}]:\n{}\n{}'.format(proc.runtime(),
 | 
| 
 | 
   240                                       output.upper(),
 | 
| 
 | 
   241                                       '-==-'*10))
 | 
| 
 | 
   242     if options.callback:
 | 
| 
 | 
   243         process_output = output_processor
 | 
| 
 | 
   244     else:
 | 
| 
 | 
   245         process_output = None
 | 
| 
 | 
   246 
 | 
| 
 | 
   247     if options.wait:
 | 
| 
 | 
   248         # wait for being done
 | 
| 
 | 
   249         proc.wait(maxtime=options.time, sleep=options.sleep, process_output=output_processor)
 | 
| 
 | 
   250     else:
 | 
| 
 | 
   251         # start the main subprocess loop
 | 
| 
 | 
   252         while proc.poll(process_output) is None:
 | 
| 
 | 
   253 
 | 
| 
 | 
   254             if options.time > 0 and proc.runtime() > options.time:
 | 
| 
 | 
   255                 proc.kill()
 | 
| 
 | 
   256 
 | 
| 
 | 
   257             if options.sleep:
 | 
| 
 | 
   258                 time.sleep(options.sleep)
 | 
| 
 | 
   259 
 | 
| 
 | 
   260             if process_output is None:
 | 
| 
 | 
   261                 # process the output with ``.read()`` call
 | 
| 
 | 
   262                 read = proc.read()
 | 
| 
 | 
   263                 output_processor(read)
 | 
| 
 | 
   264 
 | 
| 
 | 
   265     # correctness tests
 | 
| 
 | 
   266     assert proc.end is not None
 | 
| 
 | 
   267 
 | 
| 
 | 
   268     # print summary
 | 
| 
 | 
   269     output = proc.output
 | 
| 
 | 
   270     n_lines = len(output.splitlines())
 | 
| 
 | 
   271     print ("{}: {} lines, ran for {} seconds".format(subprocess.list2cmdline(prog), n_lines, proc.runtime()))
 | 
| 
178
 | 
   272 
 | 
| 
 | 
   273 if __name__ == '__main__':
 | 
| 
735
 | 
   274     main()
 |