Commit 50507d2b authored by Xavier Thompson's avatar Xavier Thompson

SlapPopen: Fix select-based timeout reads

Read from stdout or stderr only when the event returned by the
selector guarantees that the read will be non-blocking.

Switch to using high-level selectors API instead of low-level
select/poll.
parent 581b8aa6
...@@ -33,7 +33,7 @@ import hashlib ...@@ -33,7 +33,7 @@ import hashlib
import os import os
import pkg_resources import pkg_resources
import pwd import pwd
import select import selectors
import stat import stat
import sys import sys
import logging import logging
...@@ -155,49 +155,46 @@ class SlapPopen(subprocess.Popen): ...@@ -155,49 +155,46 @@ class SlapPopen(subprocess.Popen):
self.stdin.close() self.stdin.close()
self.stdin = None self.stdin = None
stderr_fileno = stdout_fileno = None
buffers = {} buffers = {}
if kwargs['stdout'] is subprocess.PIPE: if kwargs['stdout'] is subprocess.PIPE:
line_logger = LineLogger(logger) line_logger = LineLogger(logger)
stdout_fileno = self.stdout.fileno() buffers[self.stdout] = []
buffers[stdout_fileno] = []
if kwargs['stderr'] is subprocess.PIPE: if kwargs['stderr'] is subprocess.PIPE:
stderr_fileno = self.stderr.fileno() buffers[self.stderr] = []
buffers[stderr_fileno] = []
poll = select.poll() try:
for fd in buffers: with selectors.DefaultSelector() as selector:
poll.register(fd) for fileobj in buffers:
selector.register(fileobj, selectors.EVENT_READ)
active = len(buffers)
if timeout is not None: if timeout is not None:
deadline = time.time() + timeout deadline = time.time() + timeout
while active:
for fd, _ in poll.poll(timeout): while selector.get_map():
data = os.read(fd, 4096).decode('utf-8', 'replace') for key, _ in selector.select(timeout):
data = os.read(key.fd, 4096).decode('utf-8', 'replace')
if data: if data:
if fd == stdout_fileno: if key.fileobj == self.stdout:
line_logger.update(data) line_logger.update(data)
buffers[fd].append(data) buffers[key.fileobj].append(data)
else: else:
if fd == stdout_fileno: if key.fileobj == self.stdout:
line_logger.flush() line_logger.flush()
poll.unregister(fd) selector.unregister(key.fileobj)
active -= 1 key.fileobj.close()
if timeout is not None: if timeout is not None:
timeout = deadline - time.time() timeout = deadline - time.time()
if timeout <= 0: if timeout <= 0:
timeout = 0 timeout = 0
break break
try:
self.wait(timeout=timeout) self.wait(timeout=timeout)
except subprocess.TimeoutExpired as e: except subprocess.TimeoutExpired as e:
for p in killProcessTree(self.pid, logger): for p in killProcessTree(self.pid, logger):
p.wait(timeout=10) # arbitrary timeout, wait until process is killed p.wait(timeout=10) # arbitrary timeout, wait until process is killed
self.poll() # set returncode (and avoid still-running warning) self.poll() # set returncode (and avoid still-running warning)
e.output = e.stdout = ''.join(buffers.get(stdout_fileno, ())) e.output = e.stdout = ''.join(buffers.get(self.stdout, ()))
e.stderr = ''.join(buffers.get(stderr_fileno, ())) e.stderr = ''.join(buffers.get(self.stderr, ()))
raise raise
finally: finally:
for s in (self.stdout, self.stderr): for s in (self.stdout, self.stderr):
...@@ -207,8 +204,8 @@ class SlapPopen(subprocess.Popen): ...@@ -207,8 +204,8 @@ class SlapPopen(subprocess.Popen):
except OSError: except OSError:
pass pass
self.output = ''.join(buffers.get(stdout_fileno, ())) self.output = ''.join(buffers.get(self.stdout, ()))
self.error = ''.join(buffers.get(stderr_fileno, ())) self.error = ''.join(buffers.get(self.stderr, ()))
def md5digest(url): def md5digest(url):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment