Commit 09433847 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 0eba9861
......@@ -432,8 +432,8 @@ package main
// zheadMu > Watch.atMu
//
// WatchLink.byfileMu > Watch.atMu
// BigFile.watchMu > Watch.atMu
// WatchLink.byfileMu > BigFileDir.fileMu
// BigFile.watchMu > Watch.atMu
import (
"bufio"
......@@ -605,7 +605,7 @@ type WatchNode struct {
idNext int32 // ID for next opened WatchLink
}
// /head/watch handle - served by WatchLink.
// /head/watch open - served by WatchLink.
type WatchLink struct {
sk *FileSock // IO channel to client
id int32 // ID of this /head/watch handle (for debug log)
......@@ -615,7 +615,6 @@ type WatchLink struct {
//
// both already established, and watches being initialized in-progress are registered here.
// (see setupWatch)
// XXX byfile -> wlink-global watchMu ?
byfileMu sync.Mutex // zheadMu.W | zheadMu.R + byfileMu (XXX recheck)
byfile map[zodb.Oid]*Watch // {} foid -> Watch
......@@ -1509,14 +1508,17 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
headAt, headAt.Time().Sub(at.Time().Time))
}
wlink.byfileMu.Lock()
// if watch was already established - we need to update it
w := wlink.byfile[foid] // XXX locking
w := wlink.byfile[foid]
if w == nil {
// watch was not previously established - set it up anew
bfdir.fileMu.Lock()
f := bfdir.fileTab[foid]
bfdir.fileMu.Unlock()
if f == nil {
wlink.byfileMu.Unlock()
// by "invalidation protocol" watch is setup after data file was opened
return fmt.Errorf("file not yet known to wcfs or is not a ZBigFile")
}
......@@ -1534,9 +1536,10 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// at="-" (InvalidTid) means "remove the watch"
if at == zodb.InvalidTid {
delete(wlink.byfile, foid) // XXX locking
delete(wlink.byfile, foid)
delete(f.watchTab, w)
f.watchMu.Unlock()
wlink.byfileMu.Unlock()
return nil
}
......@@ -1551,6 +1554,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
if !(at >= w.at) {
w.atMu.Unlock()
f.watchMu.Unlock()
wlink.byfileMu.Unlock()
return fmt.Errorf("going back in history is forbidden")
}
......@@ -1590,13 +1594,11 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
//
// - also: there won't be simultaneous READs that would need to be
// unpinned, because we update w.at to requested at early.
//
// XXX register only if watch was created anew, not updated?
w.at = at
// NOTE registering f.watchTab[w] and wlink.byfile[foid] = w must come together.
f.watchTab[w] = struct{}{}
wlink.byfile[foid] = w // XXX locking
wlink.byfile[foid] = w
f.watchMu.Unlock()
wlink.byfileMu.Unlock()
// XXX defer -> unregister watch if error?
......@@ -1715,12 +1717,14 @@ func (wlink *WatchLink) _serve() (err error) {
}
// unregister all watches created on this wlink
for _, w := range wlink.byfile { // XXX locking
wlink.byfileMu.Lock()
for _, w := range wlink.byfile {
w.file.watchMu.Lock()
delete(w.file.watchTab, w)
w.file.watchMu.Unlock()
}
wlink.byfile = nil
wlink.byfileMu.Unlock()
// write to peer if it was logical error on client side
// then .sk.tx to wakeup rx on client side
......
......@@ -2057,6 +2057,7 @@ def procwait_(ctx, proc): # -> ok
raise
return True
# xdefer is like defer, but makes sure exception raised before deferred
# function is called is not lost.
#
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment