Commit a9345a98 authored by Kirill Smelkov's avatar Kirill Smelkov

sync.RWMutex: Teach it to downgrade from write-locked into read-locked state

Go version does not provide this, but the topic of sync.RWMutex
downgrading was raised up several times, at least

	https://github.com/golang/go/issues/4026
	https://github.com/golang/go/issues/23513
	https://groups.google.com/forum/#!topic/golang-nuts/MmIDUzl8HA0
	...

Atomic downgrading is often useful to avoid race window in between
Unlock and RLock and, as consequence, having the need to recheck things
after RLock.

We can put this complexity and logic into well-defined RWMutex primitive
instead of throwing it to be solved by every RWMutex user.
parent 1ad3c2d5
...@@ -49,6 +49,7 @@ cdef extern from "golang/sync.h" namespace "golang::sync" nogil: ...@@ -49,6 +49,7 @@ cdef extern from "golang/sync.h" namespace "golang::sync" nogil:
void Unlock() void Unlock()
void RLock() void RLock()
void RUnlock() void RUnlock()
void UnlockToRLock()
cppclass Once: cppclass Once:
void do "do_" (...) # ... = func<void()> void do "do_" (...) # ... = func<void()>
......
...@@ -100,6 +100,11 @@ cdef class PyRWMutex: ...@@ -100,6 +100,11 @@ cdef class PyRWMutex:
with nogil: with nogil:
rwmutex_runlock_pyexc(&pymu.mu) rwmutex_runlock_pyexc(&pymu.mu)
def UnlockToRLock(PyRWMutex pymu):
# NOTE nogil needed (see ^^^)
with nogil:
rwmutex_unlocktorlock_pyexc(&pymu.mu)
# with support (write by default) # with support (write by default)
__enter__ = Lock __enter__ = Lock
def __exit__(PyRWMutex pymu, exc_typ, exc_val, exc_tb): def __exit__(PyRWMutex pymu, exc_typ, exc_val, exc_tb):
...@@ -286,6 +291,8 @@ cdef nogil: ...@@ -286,6 +291,8 @@ cdef nogil:
mu.RLock() mu.RLock()
void rwmutex_runlock_pyexc(RWMutex *mu) except +topyexc: void rwmutex_runlock_pyexc(RWMutex *mu) except +topyexc:
mu.RUnlock() mu.RUnlock()
void rwmutex_unlocktorlock_pyexc(RWMutex *mu) except +topyexc:
mu.UnlockToRLock()
void waitgroup_done_pyexc(WaitGroup *wg) except +topyexc: void waitgroup_done_pyexc(WaitGroup *wg) except +topyexc:
wg.done() wg.done()
......
...@@ -114,6 +114,20 @@ void RWMutex::Unlock() { ...@@ -114,6 +114,20 @@ void RWMutex::Unlock() {
mu._g.unlock(); mu._g.unlock();
} }
void RWMutex::UnlockToRLock() {
RWMutex& mu = *this;
mu._g.lock();
if (!mu._write_active) {
mu._g.unlock();
panic("sync: UnlockToRLock of unlocked RWMutex");
}
mu._write_active = false;
mu._nread_active++;
mu._wakeup_all();
mu._g.unlock();
}
// Once // Once
Once::Once() { Once::Once() {
......
...@@ -121,6 +121,13 @@ public: ...@@ -121,6 +121,13 @@ public:
LIBGOLANG_API void RLock(); LIBGOLANG_API void RLock();
LIBGOLANG_API void RUnlock(); LIBGOLANG_API void RUnlock();
// UnlockToRLock atomically downgrades write-locked RWMutex into read-locked.
//
// NOTE opposite operation - atomic upgrade from read-locked into
// write-locked - is generally not possible due to deadlock if 2 threads
// try to upgrade at the same time.
LIBGOLANG_API void UnlockToRLock();
private: private:
void _wakeup_all(); void _wakeup_all();
......
...@@ -22,7 +22,7 @@ from __future__ import print_function, absolute_import ...@@ -22,7 +22,7 @@ from __future__ import print_function, absolute_import
from golang import go, chan, select, default from golang import go, chan, select, default
from golang import sync, context, time from golang import sync, context, time
from pytest import raises from pytest import raises, mark
from golang.golang_test import import_pyx_tests, panics from golang.golang_test import import_pyx_tests, panics
from golang.time_test import dt from golang.time_test import dt
from six.moves import range as xrange from six.moves import range as xrange
...@@ -89,13 +89,19 @@ def test_rwmutex(): ...@@ -89,13 +89,19 @@ def test_rwmutex():
done.recv() done.recv()
assert l == ['a', 'b'] assert l == ['a', 'b']
# verify Lock/Unlock vs RLock/RUnlock interaction.
# if unlock_via_downgrade=Y, Lock is released via UnlockToRLock + RUnlock.
@mark.parametrize('unlock_via_downgrade', [False, True])
def test_rwmutex_lock_vs_rlock(unlock_via_downgrade):
mu = sync.RWMutex()
# Lock vs RLock # Lock vs RLock
l = [] # accessed as R R R ... R W R R R ... R l = [] # accessed as R R R ... R W R R R ... R
Nr1 = 10 # Nreaders queued before W Nr1 = 10 # Nreaders queued before W
Nr2 = 15 # Nreaders queued after W Nr2 = 15 # Nreaders queued after W
mu.RLock() mu.RLock()
locked = chan(Nr1+1+Nr2) # main <- R|W: mu locked locked = chan(Nr1 + 1*3 + Nr2) # main <- R|W: mu locked
rcont = chan() # main -> R: continue rcont = chan() # main -> R: continue
def R(): # readers def R(): # readers
mu.RLock() mu.RLock()
locked.send(('R', len(l))) locked.send(('R', len(l)))
...@@ -114,7 +120,15 @@ def test_rwmutex(): ...@@ -114,7 +120,15 @@ def test_rwmutex():
time.sleep(Nr2*dt) # give R2 readers more chance to call mu.RLock and run first time.sleep(Nr2*dt) # give R2 readers more chance to call mu.RLock and run first
locked.send('W') locked.send('W')
l.append('a') l.append('a')
mu.Unlock() if not unlock_via_downgrade:
locked.send('_WUnlock')
mu.Unlock()
else:
locked.send('_WUnlockToRLock')
mu.UnlockToRLock()
time.sleep(Nr2*dt)
locked.send('_WRUnlock')
mu.RUnlock()
go(W) go(W)
# spawn more readers to verify that Lock has priority over RLock # spawn more readers to verify that Lock has priority over RLock
...@@ -136,8 +150,14 @@ def test_rwmutex(): ...@@ -136,8 +150,14 @@ def test_rwmutex():
# W must get the lock first and all R2 readers only after it # W must get the lock first and all R2 readers only after it
assert locked.recv() == 'W' assert locked.recv() == 'W'
if not unlock_via_downgrade:
assert locked.recv() == '_WUnlock'
else:
assert locked.recv() == '_WUnlockToRLock'
for i in range(Nr2): for i in range(Nr2):
assert locked.recv() == ('R', 1) assert locked.recv() == ('R', 1)
if unlock_via_downgrade:
assert locked.recv() == '_WRUnlock'
# verify that sema.acquire can be woken up by sema.release not from the same # verify that sema.acquire can be woken up by sema.release not from the same
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment