Commit f7feaec1 authored by Benjamin Peterson's avatar Benjamin Peterson

revert r66114 for Jesse

parent 27cc8e1d
......@@ -97,6 +97,13 @@ def Manager():
m.start()
return m
def Pipe(duplex=True):
'''
Returns two connection object connected by a pipe
'''
from multiprocessing.connection import Pipe
return Pipe(duplex)
def cpu_count():
'''
Returns the number of CPUs in the system
......@@ -131,28 +138,134 @@ def freeze_support():
from multiprocessing.forking import freeze_support
freeze_support()
def get_logger():
'''
Return package logger -- if it does not already exist then it is created
'''
from multiprocessing.util import get_logger
return get_logger()
def log_to_stderr(level=None):
'''
Turn on logging and add a handler which prints to stderr
'''
from multiprocessing.util import log_to_stderr
return log_to_stderr(level)
def allow_connection_pickling():
'''
Install support for sending connections and sockets between processes
'''
from multiprocessing import reduction
# Alias some names from submodules in the package namespace
from multiprocessing.connection import Pipe
from multiprocessing.util import (get_logger, log_to_stderr)
#
# Definitions depending on native semaphores
#
# Alias some names from submodules in the package namespace
from multiprocessing.synchronize import (Lock, RLock, Condition, Event,
Semaphore, BoundedSemaphore)
from multiprocessing.queues import (Queue, JoinableQueue)
from multiprocessing.pool import Pool
from multiprocessing.sharedctypes import (RawValue, Value,
RawArray, Array)
def Lock():
'''
Returns a non-recursive lock object
'''
from multiprocessing.synchronize import Lock
return Lock()
def RLock():
'''
Returns a recursive lock object
'''
from multiprocessing.synchronize import RLock
return RLock()
def Condition(lock=None):
'''
Returns a condition object
'''
from multiprocessing.synchronize import Condition
return Condition(lock)
def Semaphore(value=1):
'''
Returns a semaphore object
'''
from multiprocessing.synchronize import Semaphore
return Semaphore(value)
def BoundedSemaphore(value=1):
'''
Returns a bounded semaphore object
'''
from multiprocessing.synchronize import BoundedSemaphore
return BoundedSemaphore(value)
def Event():
'''
Returns an event object
'''
from multiprocessing.synchronize import Event
return Event()
def Queue(maxsize=0):
'''
Returns a queue object
'''
from multiprocessing.queues import Queue
return Queue(maxsize)
def JoinableQueue(maxsize=0):
'''
Returns a queue object
'''
from multiprocessing.queues import JoinableQueue
return JoinableQueue(maxsize)
def Pool(processes=None, initializer=None, initargs=()):
'''
Returns a process pool object
'''
from multiprocessing.pool import Pool
return Pool(processes, initializer, initargs)
def RawValue(typecode_or_type, *args):
'''
Returns a shared object
'''
from multiprocessing.sharedctypes import RawValue
return RawValue(typecode_or_type, *args)
def RawArray(typecode_or_type, size_or_initializer):
'''
Returns a shared array
'''
from multiprocessing.sharedctypes import RawArray
return RawArray(typecode_or_type, size_or_initializer)
def Value(typecode_or_type, *args, **kwds):
'''
Returns a synchronized shared object
'''
from multiprocessing.sharedctypes import Value
return Value(typecode_or_type, *args, **kwds)
def Array(typecode_or_type, size_or_initializer, **kwds):
'''
Returns a synchronized shared array
'''
from multiprocessing.sharedctypes import Array
return Array(typecode_or_type, size_or_initializer, **kwds)
#
#
#
if sys.platform == 'win32':
from multiprocessing.forking import set_executable
def set_executable(executable):
'''
Sets the path to a python.exe or pythonw.exe binary used to run
child processes on Windows instead of sys.executable.
Useful for people embedding Python.
'''
from multiprocessing.forking import set_executable
set_executable(executable)
__all__ += ['set_executable']
......@@ -371,7 +371,13 @@ class Server(object):
self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
if ident not in self.id_to_refcount:
self.id_to_refcount[ident] = None
self.id_to_refcount[ident] = 0
# increment the reference count immediately, to avoid
# this object being garbage collected before a Proxy
# object for it can be created. The caller of create()
# is responsible for doing a decref once the Proxy object
# has been created.
self.incref(c, ident)
return ident, tuple(exposed)
finally:
self.mutex.release()
......@@ -393,11 +399,7 @@ class Server(object):
def incref(self, c, ident):
self.mutex.acquire()
try:
try:
self.id_to_refcount[ident] += 1
except TypeError:
assert self.id_to_refcount[ident] is None
self.id_to_refcount[ident] = 1
self.id_to_refcount[ident] += 1
finally:
self.mutex.release()
......@@ -634,6 +636,8 @@ class BaseManager(object):
token, self._serializer, manager=self,
authkey=self._authkey, exposed=exp
)
conn = self._Client(token.address, authkey=self._authkey)
dispatch(conn, None, 'decref', (token.id,))
return proxy
temp.__name__ = typeid
setattr(cls, typeid, temp)
......@@ -726,10 +730,13 @@ class BaseProxy(object):
elif kind == '#PROXY':
exposed, token = result
proxytype = self._manager._registry[token.typeid][-1]
return proxytype(
proxy = proxytype(
token, self._serializer, manager=self._manager,
authkey=self._authkey, exposed=exposed
)
conn = self._Client(token.address, authkey=self._authkey)
dispatch(conn, None, 'decref', (token.id,))
return proxy
raise convert_to_error(kind, result)
def _getvalue(self):
......
......@@ -63,7 +63,7 @@ def RawArray(typecode_or_type, size_or_initializer):
def Value(typecode_or_type, *args, **kwds):
'''
Return a synchronization wrapper for a RawValue
Return a synchronization wrapper for a Value
'''
lock = kwds.pop('lock', None)
if kwds:
......
......@@ -65,9 +65,7 @@ class SemLock(object):
#
class Semaphore(SemLock):
'''
A semaphore object
'''
def __init__(self, value=1):
SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX)
......@@ -86,9 +84,7 @@ class Semaphore(SemLock):
#
class BoundedSemaphore(Semaphore):
'''
A bounded semaphore object
'''
def __init__(self, value=1):
SemLock.__init__(self, SEMAPHORE, value, value)
......@@ -105,9 +101,7 @@ class BoundedSemaphore(Semaphore):
#
class Lock(SemLock):
'''
A non-recursive lock object
'''
def __init__(self):
SemLock.__init__(self, SEMAPHORE, 1, 1)
......@@ -132,9 +126,7 @@ class Lock(SemLock):
#
class RLock(SemLock):
'''
A recursive lock object
'''
def __init__(self):
SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1)
......@@ -160,9 +152,6 @@ class RLock(SemLock):
#
class Condition(object):
'''
A condition object
'''
def __init__(self, lock=None):
self._lock = lock or RLock()
......@@ -263,9 +252,7 @@ class Condition(object):
#
class Event(object):
'''
An event object
'''
def __init__(self):
self._cond = Condition(Lock())
self._flag = Semaphore(0)
......
......@@ -54,7 +54,7 @@ def sub_warning(msg, *args):
def get_logger():
'''
Return package logger -- if it does not already exist then it is created
Returns logger used by multiprocessing
'''
global _logger
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment