Commit 9d8e264f authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 5aae345c
......@@ -468,9 +468,11 @@ func (fs *FileStorage) Iterate(_ context.Context, tidMin, tidMax zodb.Tid) zodb.
//
// watcher is the only place that mutates index and txnh{Min,Max}.
// XXX ^^^ will change after commit is implemented.
func (fs *FileStorage) watcher(w *fsnotify.Watcher) {
//
// if errFirstRead is !nil, the error of reading first transaction header is sent to it.
func (fs *FileStorage) watcher(w *fsnotify.Watcher, errFirstRead chan<- error) {
defer w.Close() // XXX lclose
err := fs._watcher(w)
err := fs._watcher(w, errFirstRead)
// it is ok if we got read error due to file being closed
if e, _ := errors.Cause(err).(*os.PathError); e != nil && (e.Err == os.ErrClosed ||
// XXX it can also be internal.poll.ErrFileClosing
......@@ -486,8 +488,8 @@ func (fs *FileStorage) watcher(w *fsnotify.Watcher) {
log.Print(err)
}
// if watcher failed with e.g. IO error we no longer know what is real
// last_tid and which objects were modified in the IO error region.
// if watcher failed with e.g. IO error, we no longer know what is real
// last_tid and which objects were modified after it.
// -> storage operations have to fail from now on.
fs.shutdown(err)
......@@ -498,10 +500,16 @@ func (fs *FileStorage) watcher(w *fsnotify.Watcher) {
// _watcher serves watcher and returns either when fs is closed (ok), or when
// it hits any kind of non-recoverable error.
func (fs *FileStorage) _watcher(w *fsnotify.Watcher) (err error) {
func (fs *FileStorage) _watcher(w *fsnotify.Watcher, errFirstRead chan<- error) (err error) {
f := fs.file
idx := fs.index
defer xerr.Contextf(&err, "%s: watcher", f.Name())
defer func() {
if errFirstRead != nil {
errFirstRead <- err
errFirstRead = nil
}
}()
// loop checking f.size vs topPos
//
......@@ -606,6 +614,11 @@ mainloop:
// read ok - reset t₀(partial)
t0partial = time.Time{}
if errFirstRead != nil {
errFirstRead <- nil // ok
errFirstRead = nil
}
// XXX dup wrt Index.Update
// we could successfully read the transaction header. Try to see now,
......@@ -634,7 +647,7 @@ mainloop:
oidv = append(oidv, it.Datah.Oid)
}
// update index & txnh{MinMax}
// update index & txnh{Min,Max}
fs.mu.Lock()
idx.TopPos = it.Txnh.Pos + it.Txnh.Len
for oid, pos := range update {
......@@ -708,7 +721,7 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto
fs.file = f
defer func() {
if err != nil {
f.Close() // XXX -> lclose
fs.shutdown(err)
}
}()
......@@ -743,8 +756,13 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto
err = index.Update(ctx, fseq, -1, nil/*no progress; XXX log it? */)
}
// XXX cause=ErrUnexpectedEOF -> let watcher decide what was
// it: garbage or in-progress transaction
// it can be either garbage or in-progress transaction.
// defer to watcher to clarify this for us.
checkTailGarbage := false
if errors.Cause(err) == io.ErrUnexpectedEOF {
err = nil
checkTailGarbage = true
}
if err != nil {
return nil, err
}
......@@ -788,17 +806,31 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto
if err != nil {
return nil, err
}
defer func() {
if err != nil {
w.Close() // XXX lclose
}
}()
err = w.Add(f.Name())
if err != nil {
w.Close() // XXX lclose
return nil, err
}
go fs.watcher(w)
var errFirstRead chan error
if checkTailGarbage {
defer xerr.Contextf(&err, "open %s: checking whether it is garbage at @%d", path, index.TopPos)
errFirstRead = make(chan error, 1)
}
go fs.watcher(w, errFirstRead)
if checkTailGarbage {
select {
case <-ctx.Done():
return nil, ctx.Err()
case err = <-errFirstRead:
if err != nil {
return nil, err // it was garbage
}
}
}
return fs, nil
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment