Commit 791fd2da authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 81fe88df
...@@ -25,7 +25,8 @@ will be automatically started if `autostart=True` parameter is passed to join. ...@@ -25,7 +25,8 @@ will be automatically started if `autostart=True` parameter is passed to join.
It will also be automatically started by default unless It will also be automatically started by default unless
$WENDELIN_CORE_WCFS_AUTOSTART=no is specified in environment. $WENDELIN_CORE_WCFS_AUTOSTART=no is specified in environment.
XXX Conn. Conn represents connection to wcfs server obtained by join.
XXX
""" """
import os, sys, hashlib, tempfile, subprocess, time import os, sys, hashlib, tempfile, subprocess, time
...@@ -41,7 +42,7 @@ from ZODB.FileStorage import FileStorage ...@@ -41,7 +42,7 @@ from ZODB.FileStorage import FileStorage
# Conn represents connection to wcfs server. # Conn represents connection to wcfs server.
class Conn(object): class Conn(object):
# .mountpoint path to mountpoint # .mountpoint path to wcfs mountpoint
# ._fwcfs /.wcfs/zurl opened to keep the server from going away (at least cleanly) # ._fwcfs /.wcfs/zurl opened to keep the server from going away (at least cleanly)
def __init__(self, mountpoint, fwcfs): def __init__(self, mountpoint, fwcfs):
...@@ -98,7 +99,7 @@ def _default_autostart(): ...@@ -98,7 +99,7 @@ def _default_autostart():
# join connects to wcfs server for ZODB @ zurl. # join connects to wcfs server for ZODB @ zurl.
# #
# If wcfs for that service was already started, join connects to it. # If wcfs for that zurl was already started, join connects to it.
# Otherwise it starts wcfs for zurl if autostart is True. # Otherwise it starts wcfs for zurl if autostart is True.
# #
# join(zurl) -> Conn. # join(zurl) -> Conn.
......
...@@ -587,12 +587,12 @@ func (root *Root) zwatcher(ctx context.Context) (err error) { ...@@ -587,12 +587,12 @@ func (root *Root) zwatcher(ctx context.Context) (err error) {
traceWatch(">>>") traceWatch(">>>")
zwatchq := make(chan zodb.CommitEvent) zwatchq := make(chan zodb.Event)
at0 := root.zstor.AddWatch(zwatchq) // XXX -> to main thread to avoid race at0 := root.zstor.AddWatch(zwatchq) // XXX -> to main thread to avoid race
defer root.zstor.DelWatch(zwatchq) defer root.zstor.DelWatch(zwatchq)
_ = at0 // XXX _ = at0 // XXX
var zevent zodb.CommitEvent var zevent zodb.Event
var ok bool var ok bool
for { for {
...@@ -609,15 +609,28 @@ func (root *Root) zwatcher(ctx context.Context) (err error) { ...@@ -609,15 +609,28 @@ func (root *Root) zwatcher(ctx context.Context) (err error) {
return nil // closed XXX ok? return nil // closed XXX ok?
} }
traceWatch("zevent: %s", zevent)
} }
root.zδhandle1(zevent) traceWatch("zevent: %s", zevent)
var *zodb.EventCommit
switch zevent := zevent.(type) {
default:
return fmt.Errorf("unexpected event: %T", zevent)
case *zodb.EventError:
return zevent.Err
case *zodb.EventCommit:
= zevent
}
root.zδhandle1()
} }
} }
// zδhandle1 handles 1 change event from ZODB notification. // zδhandle1 handles 1 change event from ZODB notification.
func (root *Root) zδhandle1(zevent zodb.CommitEvent) { func (root *Root) zδhandle1(zδ *zodb.EventCommit) {
head := root.head head := root.head
// while we are invalidating OS cache, make sure that nothing, that // while we are invalidating OS cache, make sure that nothing, that
...@@ -662,8 +675,8 @@ retry: ...@@ -662,8 +675,8 @@ retry:
fmt.Printf("\n\n\n") fmt.Printf("\n\n\n")
// zevent = (tid↑, []oid) // zδ = (tid↑, []oid)
for _, oid := range zevent.Changev { for _, oid := range zδ.Changev {
// XXX zhead.Cache() lock/unlock + comment it is not really needed // XXX zhead.Cache() lock/unlock + comment it is not really needed
obj := zhead.Cache().Get(oid) obj := zhead.Cache().Get(oid)
if obj == nil { if obj == nil {
...@@ -772,7 +785,7 @@ retry: ...@@ -772,7 +785,7 @@ retry:
panic(err) // XXX panic(err) // XXX
} }
// resync .zhead to zevent.tid // resync .zhead to zδ.tid
// XXX -> Head.Resync() ? // XXX -> Head.Resync() ?
// 1. deactivate changed ZBigFile (we keep all ZBigFiles activated during whole txn) // 1. deactivate changed ZBigFile (we keep all ZBigFiles activated during whole txn)
...@@ -785,7 +798,7 @@ retry: ...@@ -785,7 +798,7 @@ retry:
// 2. abort old and resync to new txn/at // 2. abort old and resync to new txn/at
transaction.Current(zhead.txnCtx).Abort() transaction.Current(zhead.txnCtx).Abort()
txn, ctx := transaction.New(context.Background()) // XXX bg ok? txn, ctx := transaction.New(context.Background()) // XXX bg ok?
zhead.Resync(txn, zevent.Tid) zhead.Resync(txn, zδ.Tid)
zhead.txnCtx = ctx zhead.txnCtx = ctx
// 3. reactivate/restat invalidated ZBigFile // 3. reactivate/restat invalidated ZBigFile
...@@ -808,7 +821,7 @@ retry: ...@@ -808,7 +821,7 @@ retry:
// notify .wcfs/zhead // notify .wcfs/zhead
for sk := range gdebug.zheadSockTab { for sk := range gdebug.zheadSockTab {
_, err := fmt.Fprintf(sk, "%s\n", zevent.Tid) _, err := fmt.Fprintf(sk, "%s\n", zδ.Tid)
if err != nil { if err != nil {
log.Error(err) // XXX errctx, -> warning? log.Error(err) // XXX errctx, -> warning?
sk.Close() sk.Close()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment