Commit 5167f79c authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 48eabaaa
......@@ -87,7 +87,7 @@ import (
type FileStorage struct {
file *os.File
// protects updates to index and to everything below - in other words
// protects updates to index and to txnh{Min,Max} - in other words
// to everything that depends on what current last transaction is.
mu sync.RWMutex
......@@ -97,6 +97,8 @@ type FileStorage struct {
// (both with .Len=0 & .Tid=0 if database is empty)
txnhMin TxnHeader
txnhMax TxnHeader
watchq chan XXX
}
// IStorageDriver
......@@ -449,6 +451,8 @@ func (fs *FileStorage) watch() (err error) {
f := fs.file
defer xerr.Contextf(&err, "%s: watch", f.Name())
// XXX race -> start watching before FileStorage is returned to user
// setup watcher to changes on f
w, err := fsnotify.NewWatcher()
if err != nil {
......@@ -461,19 +465,114 @@ func (fs *FileStorage) watch() (err error) {
return err
}
// loop checking f.size vs topPos vs posLastChecked (XXX)
//
// besides relying on notify we also check file periodically to avoid
// stalls due to e.g. OS notification errors.
toppos := fs.index.TopPos
var it *Iter
resetit := func() { // reset it to start from toppos with empty buffer
it = Iterate(seqReadAt(f), toppos, IterForward)
}
resetit()
tick := time.NewTicker(1*time.Second)
defer tick.Stop()
var tpartWrite time.Time
// loop checking f.size vs topPos vs posLastChecked (XXX)
for {
// XXX select here
fi, err := f.Stat()
if err != nil {
return err
}
fsize := fi.Size()
if fsize > toppos {
// XXX
switch {
case fsize == toppos:
continue // same as before
case fsize < toppos:
// XXX add pack support
return fmt.Errorf("file truncated (%d -> %d)", toppos, fsize)
}
// there is some data after toppos - try to read it. always create buffer
// anew, since there could be written and then aborted transactions.
err = it.NextTxn(LoadNoStrings)
if err != nil {
// transaction header could not be fully read.
//
// Even though FileStorage code always calls write with full txn
// header, the kernel could do the write in parts, e.g. if written
// region overlaps page boundary.
//
// we check for some time to distinguish in-progress write from just
// garbage.
if errors.Cause(err) == io.ErrUnexpectedEOF {
now := time.Now()
if tpartWrite.IsZero() {
tpartWrite = now
} else if now.Sub(tpartialStart) > 3*time.Second {
return err
}
} else {
// not ok - e.g. IO error
// EOF is ok - it is e.g. when transaction was aborted
if err != io.EOF {
return err
}
// EOF - reset t(partial)
tpartWrite = time.Time{}
}
// after an error (EOF, partial read) we have to retry with
// reset bufferring.
resetit()
continue
}
tpartWrite = time.Time{} // reset
// XXX dup wrt Index.Update
// we could successfully read the transaction header. Try to see now,
// whether it is finished transaction or not.
if it.Txnh.Status == zodb.TxnInprogress {
// not yet. we have to reset because transaction finish writes
// what we have already buffered.
resetit()
continue
}
// it is fully-committed transaction. scan its data records to update
// our index & notify watchers. There is no expected errors here.
var oidv []zodb.Oid
update := map[zodb.Oid]int64{}
for {
err = it.NextData()
if err != nil {
err = okEOF(err)
if err != nil {
return err
}
break
}
update[it.Datah.Oid] = it.Datah.Pos
oidv = append(oidv, it.Datah.Oid)
}
// update index
fs.mu.Lock()
index.TopPos = it.Txnh.Pos + it.Txnh.Len
for oid, pos := range update {
index.Set(oid, pos)
}
fs.mu.Unlock()
// XXX -> watchq
select {
// XXX handle quit
......@@ -491,11 +590,16 @@ func (fs *FileStorage) watch() (err error) {
// write, truncate, chown ...) -> it is time to check the file again.
continue
// TODO + time.After(30s) to avoid stalls due to e.g. OS notification errors
case <-tick.C:
// recheck the file periodically.
continue
}
}
}
//func (fs *FileStorage) readUpdates() {
//}
func (fs *FileStorage) Watch(ctx context.Context) (zodb.Tid, []zodb.Oid, error) {
panic("TODO")
}
......
......@@ -435,6 +435,8 @@ func TestWatch(t *testing.T) {
if objvWant := []zodb.Oid{0, i}; !(tid == at && reflect.DeepEqual(objv, objvWant)) {
t.Fatalf("watch:\nhave: %s %s\nwant: %s %s", tid, objv, at, objvWant)
}
checkLastTid(at)
}
err := fs.Close()
......@@ -443,5 +445,5 @@ func TestWatch(t *testing.T) {
}
_, _, err = fs.Watch(ctx)
// assert err = "closed"
// XXX assert err = "closed"
}
......@@ -469,7 +469,7 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64, pro
// do not update the index immediately so that in case of error
// in the middle of txn's data, index stays consistent and
// correct for topPos pointing to previous transaction.
update := map[zodb.Oid]int64{} // XXX malloc every time -> better reuse
update := map[zodb.Oid]int64{}
for {
err = it.NextData()
if err != nil {
......@@ -492,7 +492,7 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64, pro
// notify progress
if progress != nil {
pd.TxnIndexed++
progress(pd)
progress(pd) // XXX + update
}
}
......
......@@ -293,7 +293,7 @@ var haveZODBPy = false
var workRoot string
func TestMain(m *testing.M) {
// check whether we have zodb/py
// check whether we have zodb/py XXX + zodbtools
cmd := exec.Command("python2", "-c", "import ZODB")
err := cmd.Run()
if err == nil {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment