Source code for hal.system.process

# -*- coding: utf-8 -*-

"""OS tools """

import os
import shlex
from subprocess import PIPE, STDOUT, Popen, call

import psutil


[docs]class Process: """OS process""" def __init__(self, cmd): """ :param cmd: Command to execute """ self.cmd = cmd
[docs] def get_simple_output(self, stderr=STDOUT): """Executes a simple external command and get its output The command contains no pipes. Error messages are redirected to the standard output by default :param stderr: where to put stderr :return: output of command """ args = shlex.split(self.cmd) proc = Popen(args, stdout=PIPE, stderr=stderr) return proc.communicate()[0].decode("utf8")
[docs] def get_complex_output(self, stderr=STDOUT): """Executes a piped command and get the lines of the output in a list :param stderr: where to put stderr :return: output of command """ proc = Popen(self.cmd, shell=True, stdout=PIPE, stderr=stderr) return proc.stdout.readlines()
[docs] def get_output_from_pipe(self, input_file): """Executes an external command and get its output. The command receives its input_file from the stdin through a pipe :param input_file: input file :return: output of command """ args = shlex.split(self.cmd) p = Popen(args, stdout=PIPE, stdin=PIPE) # | grep es p.stdin.write(bytearray(input_file.encode("utf8"))) # echo test | return p.communicate()[0].decode("utf8")
[docs] def get_return_code(self, stderr=STDOUT): """Executes a simple external command and return its exit status :param stderr: where to put stderr :return: return code of command """ args = shlex.split(self.cmd) return call(args, stdout=PIPE, stderr=stderr)
[docs] def get_exit_code(self): """Executes the external command and get its exitcode, stdout and stderr :return: exit code of command """ args = shlex.split(self.cmd) proc = Popen(args, stdout=PIPE, stderr=PIPE) out, err = proc.communicate() out, err = out.decode("utf8"), err.decode("utf8") exitcode = proc.returncode # return exitcode, out, err
[docs] def execute(self): """Executes a simple external command""" args = shlex.split(self.cmd) call(args)
[docs] def execute_in_background(self): """Executes a (shell) command in the background :return: the process' pid """ # http://stackoverflow.com/questions/1605520 args = shlex.split(self.cmd) p = Popen(args) return p.pid
[docs] def keep_alive(self): """Keeps a process alive. If the process terminates, it will restart it The terminated processes become zombies. They die when their parent terminates """ while True: pid = self.execute_in_background() p = psutil.Process(pid) while p.is_running() and str(p.status) != 'zombie': os.system('sleep 5')
[docs] @staticmethod def get_process_list(): """Gets the list of running processes :return: (generator of) running processes """ return psutil.process_iter()