Commit a806da23 authored by Jacob Vosmaer (GitLab)'s avatar Jacob Vosmaer (GitLab)

Merge branch '111-watchkey-fix' into 'master'

Fix for internal/redis.TestWatchKey{AlreadyChanged,MassiveParallel}

Closes #111

See merge request !136
parents e798a664 951f21c8
......@@ -36,153 +36,128 @@ func createUnsubscribeMessage(key string) []interface{} {
}
}
func countWatchers(key string) int {
keyWatcherMutex.Lock()
defer keyWatcherMutex.Unlock()
return len(keyWatcher[key])
}
func deleteWatchers(key string) {
keyWatcherMutex.Lock()
defer keyWatcherMutex.Unlock()
delete(keyWatcher, key)
}
// Forces a run of the `Process` loop against a mock PubSubConn.
func processMessages(numWatchers int, value string) {
psc := redigomock.NewConn()
// Setup the initial subscription message
psc.Command("SUBSCRIBE", keySubChannel).Expect(createSubscribeMessage(keySubChannel))
psc.Command("UNSUBSCRIBE", keySubChannel).Expect(createUnsubscribeMessage(keySubChannel))
psc.AddSubscriptionMessage(createSubscriptionMessage(keySubChannel, runnerKey+"="+value))
// Wait for all the `WatchKey` calls to be registered
for countWatchers(runnerKey) != numWatchers {
time.Sleep(time.Millisecond)
}
processInner(psc)
}
func TestWatchKeySeenChange(t *testing.T) {
mconn, td := setupMockPool()
conn, td := setupMockPool()
defer td()
go Process(false)
// Setup the initial subscription message
mconn.Command("SUBSCRIBE", keySubChannel).
Expect(createSubscribeMessage(keySubChannel))
mconn.Command("UNSUBSCRIBE", keySubChannel).
Expect(createUnsubscribeMessage(keySubChannel))
mconn.Command("GET", runnerKey).
Expect("something").
Expect("somethingelse")
mconn.ReceiveWait = true
mconn.AddSubscriptionMessage(createSubscriptionMessage(keySubChannel, runnerKey+"=somethingelse"))
// ACTUALLY Fill the buffers
go func(mconn *redigomock.Conn) {
mconn.ReceiveNow <- true
mconn.ReceiveNow <- true
mconn.ReceiveNow <- true
}(mconn)
val, err := WatchKey(runnerKey, "something", time.Duration(1*time.Second))
conn.Command("GET", runnerKey).Expect("something")
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
val, err := WatchKey(runnerKey, "something", time.Second)
assert.NoError(t, err, "Expected no error")
assert.Equal(t, WatchKeyStatusSeenChange, val, "Expected value to change")
wg.Done()
}()
processMessages(1, "somethingelse")
wg.Wait()
}
func TestWatchKeyNoChange(t *testing.T) {
mconn, td := setupMockPool()
conn, td := setupMockPool()
defer td()
go Process(false)
// Setup the initial subscription message
mconn.Command("SUBSCRIBE", keySubChannel).
Expect(createSubscribeMessage(keySubChannel))
mconn.Command("UNSUBSCRIBE", keySubChannel).
Expect(createUnsubscribeMessage(keySubChannel))
mconn.Command("GET", runnerKey).
Expect("something").
Expect("something")
mconn.ReceiveWait = true
mconn.AddSubscriptionMessage(createSubscriptionMessage(keySubChannel, runnerKey+"=something"))
// ACTUALLY Fill the buffers
go func(mconn *redigomock.Conn) {
mconn.ReceiveNow <- true
mconn.ReceiveNow <- true
mconn.ReceiveNow <- true
}(mconn)
val, err := WatchKey(runnerKey, "something", time.Duration(1*time.Second))
conn.Command("GET", runnerKey).Expect("something")
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
val, err := WatchKey(runnerKey, "something", time.Second)
assert.NoError(t, err, "Expected no error")
assert.Equal(t, WatchKeyStatusNoChange, val, "Expected notification without change to value")
wg.Done()
}()
processMessages(1, "something")
wg.Wait()
}
func TestWatchKeyTimeout(t *testing.T) {
mconn, td := setupMockPool()
conn, td := setupMockPool()
defer td()
go Process(false)
// Setup the initial subscription message
mconn.Command("SUBSCRIBE", keySubChannel).
Expect(createSubscribeMessage(keySubChannel))
mconn.Command("UNSUBSCRIBE", keySubChannel).
Expect(createUnsubscribeMessage(keySubChannel))
mconn.Command("GET", runnerKey).
Expect("something").
Expect("something")
mconn.ReceiveWait = true
// ACTUALLY Fill the buffers
go func(mconn *redigomock.Conn) {
mconn.ReceiveNow <- true
mconn.ReceiveNow <- true
mconn.ReceiveNow <- true
}(mconn)
val, err := WatchKey(runnerKey, "something", time.Duration(1*time.Second))
conn.Command("GET", runnerKey).Expect("something")
val, err := WatchKey(runnerKey, "something", time.Millisecond)
assert.NoError(t, err, "Expected no error")
assert.Equal(t, WatchKeyStatusTimeout, val, "Expected value to not change")
// Clean up watchers since Process isn't doing that for us (not running)
deleteWatchers(runnerKey)
}
func TestWatchKeyAlreadyChanged(t *testing.T) {
mconn, td := setupMockPool()
conn, td := setupMockPool()
defer td()
go Process(false)
// Setup the initial subscription message
mconn.Command("SUBSCRIBE", keySubChannel).
Expect(createSubscribeMessage(keySubChannel))
mconn.Command("UNSUBSCRIBE", keySubChannel).
Expect(createUnsubscribeMessage(keySubChannel))
mconn.Command("GET", runnerKey).
Expect("somethingelse").
Expect("somethingelse")
mconn.ReceiveWait = true
// ACTUALLY Fill the buffers
go func(mconn *redigomock.Conn) {
mconn.ReceiveNow <- true
mconn.ReceiveNow <- true
mconn.ReceiveNow <- true
}(mconn)
val, err := WatchKey(runnerKey, "something", time.Duration(1*time.Second))
conn.Command("GET", runnerKey).Expect("somethingelse")
val, err := WatchKey(runnerKey, "something", time.Second)
assert.NoError(t, err, "Expected no error")
assert.Equal(t, WatchKeyStatusAlreadyChanged, val, "Expected value to have already changed")
// Clean up watchers since Process isn't doing that for us (not running)
deleteWatchers(runnerKey)
}
func TestWatchKeyMassiveParallel(t *testing.T) {
mconn, td := setupMockPool()
func TestWatchKeyMassivelyParallel(t *testing.T) {
runTimes := 100 // 100 parallel watchers
conn, td := setupMockPool()
defer td()
go Process(false)
// Setup the initial subscription message
mconn.Command("SUBSCRIBE", keySubChannel).
Expect(createSubscribeMessage(keySubChannel))
mconn.Command("UNSUBSCRIBE", keySubChannel).
Expect(createUnsubscribeMessage(keySubChannel))
getCmd := mconn.Command("GET", runnerKey)
mconn.ReceiveWait = true
const runTimes = 100
wg := &sync.WaitGroup{}
wg.Add(runTimes)
getCmd := conn.Command("GET", runnerKey)
for i := 0; i < runTimes; i++ {
mconn.AddSubscriptionMessage(createSubscriptionMessage(keySubChannel, runnerKey+"=somethingelse"))
getCmd = getCmd.Expect("something")
}
wg := &sync.WaitGroup{}
// Race-conditions /o/ \o\
for i := 0; i < runTimes; i++ {
wg.Add(1)
go func(mconn *redigomock.Conn) {
defer wg.Done()
// ACTUALLY Fill the buffers
go func(mconn *redigomock.Conn) {
mconn.ReceiveNow <- true
}(mconn)
val, err := WatchKey(runnerKey, "something", time.Duration(1*time.Second))
go func() {
val, err := WatchKey(runnerKey, "something", time.Second)
assert.NoError(t, err, "Expected no error")
assert.Equal(t, WatchKeyStatusSeenChange, val, "Expected value to change")
}(mconn)
wg.Done()
}()
}
wg.Wait()
processMessages(runTimes, "somethingelse")
wg.Wait()
}
......@@ -149,8 +149,7 @@ func GetString(key string) (string, error) {
if conn == nil {
return "", fmt.Errorf("Not connected to redis")
}
defer func() {
conn.Close()
}()
defer conn.Close()
return redis.String(conn.Do("GET", key))
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment