Commit e973d814 authored by Denis Bilenko's avatar Denis Bilenko

FileObjectThread: synchronize calls with semaphore

parent 6e968707
from __future__ import absolute_import from __future__ import absolute_import, with_statement
import sys import sys
import os import os
from gevent.hub import get_hub from gevent.hub import get_hub
from gevent.socket import EBADF from gevent.socket import EBADF
from gevent.os import _read, _write, ignored_errors from gevent.os import _read, _write, ignored_errors
from gevent.lock import Semaphore, DummySemaphore
try: try:
...@@ -215,8 +216,15 @@ class FileObjectThread(object): ...@@ -215,8 +216,15 @@ class FileObjectThread(object):
def __init__(self, fobj, *args, **kwargs): def __init__(self, fobj, *args, **kwargs):
self._close = kwargs.pop('close', True) self._close = kwargs.pop('close', True)
self.threadpool = kwargs.pop('threadpool', None) self.threadpool = kwargs.pop('threadpool', None)
self.lock = kwargs.pop('lock', True)
if kwargs: if kwargs:
raise TypeError('Unexpected arguments: %r' % kwargs.keys()) raise TypeError('Unexpected arguments: %r' % kwargs.keys())
if self.lock is True:
self.lock = Semaphore()
elif not self.lock:
self.lock = DummySemaphore()
if not hasattr(self.lock, '__enter__'):
raise TypeError('Expected a Semaphore or boolean, got %r' % type(self.lock))
if isinstance(fobj, (int, long)): if isinstance(fobj, (int, long)):
if not self._close: if not self._close:
# we cannot do this, since fdopen object will close the descriptor # we cannot do this, since fdopen object will close the descriptor
...@@ -226,6 +234,10 @@ class FileObjectThread(object): ...@@ -226,6 +234,10 @@ class FileObjectThread(object):
if self.threadpool is None: if self.threadpool is None:
self.threadpool = get_hub().threadpool self.threadpool = get_hub().threadpool
def _apply(self, func, args=None, kwargs=None):
with self.lock:
return self.threadpool.apply_e(BaseException, func, args, kwargs)
def close(self): def close(self):
fobj = self._fobj fobj = self._fobj
if fobj is None: if fobj is None:
...@@ -244,7 +256,7 @@ class FileObjectThread(object): ...@@ -244,7 +256,7 @@ class FileObjectThread(object):
fobj = self._fobj fobj = self._fobj
if fobj is None: if fobj is None:
raise FileObjectClosed raise FileObjectClosed
return self.threadpool.apply_e(BaseException, fobj.flush) return self._apply(fobj.flush)
def __repr__(self): def __repr__(self):
return '<%s _fobj=%r threadpool=%r>' % (self.__class__.__name__, self._fobj, self.threadpool) return '<%s _fobj=%r threadpool=%r>' % (self.__class__.__name__, self._fobj, self.threadpool)
...@@ -261,7 +273,7 @@ class FileObjectThread(object): ...@@ -261,7 +273,7 @@ class FileObjectThread(object):
fobj = self._fobj fobj = self._fobj
if fobj is None: if fobj is None:
raise FileObjectClosed raise FileObjectClosed
return self.threadpool.apply_e(BaseException, fobj.%s, args, kwargs)''' % (method, method) return self._apply(fobj.%s, args, kwargs)''' % (method, method)
def __iter__(self): def __iter__(self):
return self return self
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment