From 791fd2dacdfe31ecad72192c6677af287cbb330b Mon Sep 17 00:00:00 2001 From: Kirill Smelkov <kirr@nexedi.com> Date: Thu, 21 Feb 2019 13:55:44 +0300 Subject: [PATCH] . --- wcfs/__init__.py | 7 ++++--- wcfs/wcfs.go | 33 +++++++++++++++++++++++---------- 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/wcfs/__init__.py b/wcfs/__init__.py index faed250..43e9c46 100644 --- a/wcfs/__init__.py +++ b/wcfs/__init__.py @@ -25,7 +25,8 @@ will be automatically started if `autostart=True` parameter is passed to join. It will also be automatically started by default unless $WENDELIN_CORE_WCFS_AUTOSTART=no is specified in environment. -XXX Conn. +Conn represents connection to wcfs server obtained by join. +XXX """ import os, sys, hashlib, tempfile, subprocess, time @@ -41,7 +42,7 @@ from ZODB.FileStorage import FileStorage # Conn represents connection to wcfs server. class Conn(object): - # .mountpoint path to mountpoint + # .mountpoint path to wcfs mountpoint # ._fwcfs /.wcfs/zurl opened to keep the server from going away (at least cleanly) def __init__(self, mountpoint, fwcfs): @@ -98,7 +99,7 @@ def _default_autostart(): # join connects to wcfs server for ZODB @ zurl. # -# If wcfs for that service was already started, join connects to it. +# If wcfs for that zurl was already started, join connects to it. # Otherwise it starts wcfs for zurl if autostart is True. # # join(zurl) -> Conn. diff --git a/wcfs/wcfs.go b/wcfs/wcfs.go index f7cf2a3..69a3e3a 100644 --- a/wcfs/wcfs.go +++ b/wcfs/wcfs.go @@ -587,12 +587,12 @@ func (root *Root) zwatcher(ctx context.Context) (err error) { traceWatch(">>>") - zwatchq := make(chan zodb.CommitEvent) + zwatchq := make(chan zodb.Event) at0 := root.zstor.AddWatch(zwatchq) // XXX -> to main thread to avoid race defer root.zstor.DelWatch(zwatchq) _ = at0 // XXX - var zevent zodb.CommitEvent + var zevent zodb.Event var ok bool for { @@ -609,15 +609,28 @@ func (root *Root) zwatcher(ctx context.Context) (err error) { return nil // closed XXX ok? } - traceWatch("zevent: %s", zevent) } - root.zδhandle1(zevent) + traceWatch("zevent: %s", zevent) + + var zδ *zodb.EventCommit + switch zevent := zevent.(type) { + default: + return fmt.Errorf("unexpected event: %T", zevent) + + case *zodb.EventError: + return zevent.Err + + case *zodb.EventCommit: + zδ = zevent + } + + root.zδhandle1(zδ) } } // zδhandle1 handles 1 change event from ZODB notification. -func (root *Root) zδhandle1(zevent zodb.CommitEvent) { +func (root *Root) zδhandle1(zδ *zodb.EventCommit) { head := root.head // while we are invalidating OS cache, make sure that nothing, that @@ -662,8 +675,8 @@ retry: fmt.Printf("\n\n\n") - // zevent = (tid↑, []oid) - for _, oid := range zevent.Changev { + // zδ = (tid↑, []oid) + for _, oid := range zδ.Changev { // XXX zhead.Cache() lock/unlock + comment it is not really needed obj := zhead.Cache().Get(oid) if obj == nil { @@ -772,7 +785,7 @@ retry: panic(err) // XXX } - // resync .zhead to zevent.tid + // resync .zhead to zδ.tid // XXX -> Head.Resync() ? // 1. deactivate changed ZBigFile (we keep all ZBigFiles activated during whole txn) @@ -785,7 +798,7 @@ retry: // 2. abort old and resync to new txn/at transaction.Current(zhead.txnCtx).Abort() txn, ctx := transaction.New(context.Background()) // XXX bg ok? - zhead.Resync(txn, zevent.Tid) + zhead.Resync(txn, zδ.Tid) zhead.txnCtx = ctx // 3. reactivate/restat invalidated ZBigFile @@ -808,7 +821,7 @@ retry: // notify .wcfs/zhead for sk := range gdebug.zheadSockTab { - _, err := fmt.Fprintf(sk, "%s\n", zevent.Tid) + _, err := fmt.Fprintf(sk, "%s\n", zδ.Tid) if err != nil { log.Error(err) // XXX errctx, -> warning? sk.Close() -- 2.30.9