Commit a4be5d2f authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a6cf5de9
......@@ -35,7 +35,7 @@ type OpenOptions struct {
NoCache bool // don't use cache for read/write operations; prefetch will be noop
}
// DriverOptions describes options for DriverOpener
// DriverOptions describes options for DriverOpener.
type DriverOptions struct {
ReadOnly bool // whether to open storage as read-only
......@@ -91,9 +91,10 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto
return nil, fmt.Errorf("zodb: URL scheme \"%s://\" not supported", u.Scheme)
}
drvWatchq := make(chan WatchEvent)
drvOpt := &DriverOptions{
ReadOnly: opt.ReadOnly,
// TODO watchq
Watchq: drvWatchq,
}
storDriver, err := opener(ctx, u, drvOpt)
......@@ -111,6 +112,11 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto
return &storage{
IStorageDriver: storDriver,
l1cache: cache,
drvWatchq: drvWatchq,
watchReq: make(chan watchRequest),
watchTab: make(map[chan WatchEvent]struct{}),
}, nil
}
......@@ -123,11 +129,19 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto
type storage struct {
IStorageDriver
l1cache *Cache // can be =nil, if opened with NoCache
}
// watcher
drvWatchq chan WatchEvent // watchq passed to driver
watchReq chan watchRequest // {Add,Del}Watch requests go here
watchTab map[chan WatchEvent]struct{} // registered watchers
}
// loading goes through cache - this way prefetching can work
// XXX Close - stop watching? (driver will close watchq in its own Close)
// XXX LastTid - report only LastTid for which cache is ready?
// or driver.LastTid(), then wait cache is ready?
func (s *storage) Load(ctx context.Context, xid Xid) (*mem.Buf, Tid, error) {
// XXX here: offload xid validation from cache and driver ?
// XXX here: offload wrapping err -> OpError{"load", err} ?
......@@ -143,3 +157,63 @@ func (s *storage) Prefetch(ctx context.Context, xid Xid) {
s.l1cache.Prefetch(ctx, xid)
}
}
// watcher
// watchRequest represents request to add/del a watch.
type watchRequest struct {
op watchOp // add or del
ack chan struct{} // when request processed
watchq chan WatchEvent // {Add,Del}Watch argument
}
type watchOp int
const (
addWatch watchOp = 0
delWatch watchOp = 1
)
func (s *storage) watcher() {
for {
select {
case req := <-s.watchReq:
switch req.op {
case addWatch:
s.watchTab[req.watchq] = struct{}{}
case delWatch:
delete(s.watchTab, req.watchq)
default:
panic("bad watch request op")
}
close(req.ack)
case event, ok := <-s.drvWatchq:
if !ok {
// XXX storage closed
}
// deliver event to all watchers
for watchq := range s.watchTab {
watchq <- event
}
}
}
}
func (s *storage) AddWatch(watchq chan WatchEvent) {
// XXX when already Closed?
ack := make(chan struct{})
s.watchReq <- watchRequest{addWatch, ack, watchq}
<-ack
}
func (s *storage) DelWatch(watchq chan WatchEvent) {
// XXX when already Closed?
ack := make(chan struct{})
s.watchReq <- watchRequest{delWatch, ack, watchq}
<-ack
}
......@@ -430,7 +430,7 @@ func TestWatch(t *testing.T) {
// https://github.com/pypa/setuptools/issues/510
//
// Since pkg_resources are used everywhere (e.g. in zodburi to find all
// uri resolvers) this import slowness becomes the major component to
// uri resolvers) this import slowness becomes the major part of time to
// run py `zodb commit`.
//
// if one day it is either fixed, or worked around, we could ↑ 10 to 100.
......
......@@ -332,12 +332,7 @@ type IStorage interface {
// additional to IStorageDriver
Prefetcher
// Watch returns new watcher over the storage.
//
// The watcher represents invalidation channel (notify about changes
// made to DB). XXX
//Watch() Watcher XXX -> Watch(watchq) ? (then how to unsubscribe)
Watcher
}
// Prefetcher provides functionality to prefetch objects.
......@@ -441,15 +436,25 @@ type WatchEvent struct {
}
// Watcher allows to be notified of changes to database.
//
// Watcher is safe to be used from multiple goroutines simultaneously.
type Watcher interface {
// Watch waits-for and returns next event corresponding to comitted transaction.
// AddWatch registers watchq to be notified of database changes.
//
// Whenever a new transaction is committed into the database,
// corresponding event will be sent to watchq.
//
// XXX queue overflow -> special error?
Watch(ctx context.Context) (WatchEvent, error) // XXX name -> Read? ReadEvent?
// Once registered, watchq must be read. Not doing so will stuck whole storage.
//
// Multiple AddWatch calls with the same watchq register watchq only once.
AddWatch(watchq chan WatchEvent)
// Close stops the watcher.
// err is always nil. XXX ok?
Close() error
// DelWatch unregisters watchq to be notified of database changes.
//
// After DelWatch call completes, no new events will be sent to watchq.
//
// DelWatch is noop if watchq was not registered.
DelWatch(watchq chan WatchEvent)
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment