Source code for eye.procutils
# this project is licensed under the WTFPLv2, see COPYING.txt for details
import logging
import os
from PyQt5.QtCore import QProcess
from eye.qt import Signal, Slot
__all__ = ('find_command', 'LineProcess', 'run_blocking')
LOGGER = logging.getLogger(__name__)
[docs]
def find_command(cmd):
"""Find `cmd` in `$PATH`"""
for elem in os.getenv('PATH').split(os.pathsep):
path = os.path.join(elem, cmd)
if os.path.isfile(path):
return path
[docs]
class LineProcess(QProcess):
"""Process with stdout/stderr line handling
The process is run in background. Signals are emitted when full lines are read from stdout or stderr.
"""
stdout_line_read = Signal(str)
"""Signal stdout_line_read(str)"""
stderr_line_read = Signal(str)
"""Signal stderr_line_read(str)"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.bufs = [b'', b'']
self.encoding = 'utf-8'
self.linesep = b'\n'
self.readyReadStandardError.connect(self.on_stderr)
self.readyReadStandardOutput.connect(self.on_stdout)
self.stateChanged.connect(self.on_state_changed)
self.finished.connect(self.on_finish)
self.errorOccurred.connect(self.on_error)
[docs]
def stop(self, wait=0):
"""Terminate process"""
if wait and self.state() != self.NotRunning:
self.terminate()
self.waitForFinished(wait)
if self.state() != self.NotRunning:
self.kill()
[docs]
@Slot(QProcess.ProcessState)
def on_state_changed(self, state):
if state == self.Starting:
cmd = [self.program()] + self.arguments()
LOGGER.debug('starting process %r', cmd)
[docs]
def set_encoding(self, encoding):
"""Set encoding for reading stdout/stderr"""
self.encoding = encoding
def _perform(self, fd, incoming, sig):
# TODO decode before searching newline?
self.bufs[fd] += bytes(incoming)
while b'\n' in self.bufs[fd]:
line, self.bufs[fd] = self.bufs[fd].split(self.linesep, 1)
sig.emit(line.decode(self.encoding))
[docs]
@Slot()
def on_stdout(self):
self._perform(0, self.readAllStandardOutput(), self.stdout_line_read)
[docs]
@Slot()
def on_stderr(self):
self._perform(1, self.readAllStandardError(), self.stderr_line_read)
[docs]
@Slot()
def on_error(self):
cmd = [self.program()] + self.arguments()
LOGGER.warning('error when running %r: %s', cmd, self.errorString())
[docs]
@Slot(int)
def on_finish(self, ret):
if ret != 0:
cmd = [self.program()] + self.arguments()
LOGGER.info('command %r exited with code %r', cmd, ret)
class ReadingProcess(QProcess):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.stdin = b''
self.buf = []
self.started.connect(self.on_started)
self.bytesWritten.connect(self.on_started)
self.readyReadStandardOutput.connect(self.on_stdout)
@Slot()
def on_started(self, _=None):
while True:
written = self.write(self.stdin)
if written < 0:
LOGGER.warning('error writing data to stdin for command %r', self.arguments())
return
elif written == 0:
break
self.stdin = self.stdin[written:]
if not self.stdin:
self.closeWriteChannel()
@Slot()
def on_stdout(self):
while self.bytesAvailable() > 0:
self.buf.append(self.read(self.bytesAvailable()))
def set_standard_input(self, data):
self.stdin = data
def get_buffer(self):
return b''.join(self.buf)
[docs]
def run_blocking(cmd, stdin=b'', cwd=''):
proc = ReadingProcess()
if stdin:
proc.set_standard_input(stdin)
if cwd:
proc.setWorkingDirectory(cwd)
proc.start(cmd[0], cmd[1:])
proc.waitForFinished(-1)
return proc.exitCode(), proc.get_buffer()