Commit 814a1fa8 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a8e61d2f
...@@ -65,8 +65,8 @@ type NodeLink struct { ...@@ -65,8 +65,8 @@ type NodeLink struct {
serveWg sync.WaitGroup // for serve{Send,Recv} serveWg sync.WaitGroup // for serve{Send,Recv}
acceptq chan *Conn // queue of incoming connections for Accept acceptq chan *Conn // queue of incoming connections for Accept
txq chan txReq // tx requests from Conns go via here // txq chan txReq // tx requests from Conns go via here
// (rx packets are routed to Conn.rxq) // // (rx packets are routed to Conn.rxq)
axdown chan struct{} // ready when accept is marked as no longer operational axdown chan struct{} // ready when accept is marked as no longer operational
axdown1 sync.Once // CloseAccept may be called severall times axdown1 sync.Once // CloseAccept may be called severall times
...@@ -95,16 +95,16 @@ type Conn struct { ...@@ -95,16 +95,16 @@ type Conn struct {
link *NodeLink link *NodeLink
connId uint32 connId uint32
rxq chan *PktBuf // received packets for this Conn go here rxq chan *PktBuf // received packets for this Conn go here
txerr chan error // transmit results for this Conn go back here // txerr chan error // transmit results for this Conn go back here
txdown chan struct{} // ready when Conn TX is marked as no longer operational // txdown chan struct{} // ready when Conn TX is marked as no longer operational
rxdown chan struct{} // ----//---- RX rxdown chan struct{} // ----//---- RX
txdownOnce sync.Once // tx shutdown may be called by both Close and nodelink.shutdown // txdownOnce sync.Once // tx shutdown may be called by both Close and nodelink.shutdown
rxdownOnce sync.Once // ----//---- rxdownOnce sync.Once // ----//----
rxerrOnce sync.Once // rx error is reported only once - then it is link down or closed rxerrOnce sync.Once // rx error is reported only once - then it is link down or closed
rxclosed int32 // whether CloseRecv was called rxclosed int32 // whether CloseRecv was called
txclosed int32 // whether CloseSend was called // txclosed int32 // whether CloseSend was called
errMsg *Error // error message for peer if rx is down errMsg *Error // error message for peer if rx is down
...@@ -176,7 +176,7 @@ func newNodeLink(conn net.Conn, role LinkRole) *NodeLink { ...@@ -176,7 +176,7 @@ func newNodeLink(conn net.Conn, role LinkRole) *NodeLink {
connTab: map[uint32]*Conn{}, connTab: map[uint32]*Conn{},
nextConnId: nextConnId, nextConnId: nextConnId,
acceptq: make(chan *Conn), // XXX +buf acceptq: make(chan *Conn), // XXX +buf
txq: make(chan txReq), //txq: make(chan txReq),
axdown: make(chan struct{}), axdown: make(chan struct{}),
down: make(chan struct{}), down: make(chan struct{}),
} }
...@@ -195,8 +195,8 @@ func (nl *NodeLink) newConn(connId uint32) *Conn { ...@@ -195,8 +195,8 @@ func (nl *NodeLink) newConn(connId uint32) *Conn {
c := &Conn{link: nl, c := &Conn{link: nl,
connId: connId, connId: connId,
rxq: make(chan *PktBuf, 1), // NOTE non-blocking - see serveRecv XXX +buf rxq: make(chan *PktBuf, 1), // NOTE non-blocking - see serveRecv XXX +buf
txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send //txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send
txdown: make(chan struct{}), //txdown: make(chan struct{}),
rxdown: make(chan struct{}), rxdown: make(chan struct{}),
} }
nl.connTab[connId] = c nl.connTab[connId] = c
...@@ -328,9 +328,9 @@ func (c *Conn) shutdown() { ...@@ -328,9 +328,9 @@ func (c *Conn) shutdown() {
} }
func (c *Conn) shutdownTX() { func (c *Conn) shutdownTX() {
c.txdownOnce.Do(func() { //c.txdownOnce.Do(func() {
close(c.txdown) // close(c.txdown)
}) //})
} }
// shutdownRX marks .rxq as no loner operational // shutdownRX marks .rxq as no loner operational
...@@ -389,7 +389,7 @@ func (c *Conn) Close() error { ...@@ -389,7 +389,7 @@ func (c *Conn) Close() error {
nl := c.link nl := c.link
c.closeOnce.Do(func() { c.closeOnce.Do(func() {
atomic.StoreInt32(&c.rxclosed, 1) atomic.StoreInt32(&c.rxclosed, 1)
atomic.StoreInt32(&c.txclosed, 1) //atomic.StoreInt32(&c.txclosed, 1)
c.shutdown() c.shutdown()
// adjust link.connTab // adjust link.connTab
...@@ -655,6 +655,7 @@ type txReq struct { ...@@ -655,6 +655,7 @@ type txReq struct {
errch chan error errch chan error
} }
/*
// errSendShutdown returns appropriate error when c.txdown is found ready in Send // errSendShutdown returns appropriate error when c.txdown is found ready in Send
func (c *Conn) errSendShutdown() error { func (c *Conn) errSendShutdown() error {
switch { switch {
...@@ -672,6 +673,7 @@ func (c *Conn) errSendShutdown() error { ...@@ -672,6 +673,7 @@ func (c *Conn) errSendShutdown() error {
return ErrLinkDown return ErrLinkDown
} }
} }
*/
// sendPkt sends raw packet via connection. // sendPkt sends raw packet via connection.
// //
...@@ -686,6 +688,7 @@ func (c *Conn) sendPkt(pkt *PktBuf) error { ...@@ -686,6 +688,7 @@ func (c *Conn) sendPkt(pkt *PktBuf) error {
// https://github.com/golang/go/blob/go1.9-3-g099336988b/src/net/net.go#L109 // https://github.com/golang/go/blob/go1.9-3-g099336988b/src/net/net.go#L109
// https://github.com/golang/go/blob/go1.9-3-g099336988b/src/internal/poll/fd_unix.go#L14 // https://github.com/golang/go/blob/go1.9-3-g099336988b/src/internal/poll/fd_unix.go#L14
///*
func (c *Conn) sendPkt2(pkt *PktBuf) error { func (c *Conn) sendPkt2(pkt *PktBuf) error {
// set pkt connId associated with this connection // set pkt connId associated with this connection
pkt.Header().ConnId = hton32(c.connId) pkt.Header().ConnId = hton32(c.connId)
...@@ -702,6 +705,7 @@ func (c *Conn) sendPkt2(pkt *PktBuf) error { ...@@ -702,6 +705,7 @@ func (c *Conn) sendPkt2(pkt *PktBuf) error {
return err return err
} }
//*/
/* /*
func (c *Conn) sendPkt2(pkt *PktBuf) error { func (c *Conn) sendPkt2(pkt *PktBuf) error {
...@@ -1216,7 +1220,8 @@ func (c *Conn) Recv() (Msg, error) { ...@@ -1216,7 +1220,8 @@ func (c *Conn) Recv() (Msg, error) {
return msg, nil return msg, nil
} }
// Send sends message // Send sends message.
//
// it encodes message into packet and sends it // it encodes message into packet and sends it
func (c *Conn) Send(msg Msg) error { func (c *Conn) Send(msg Msg) error {
traceConnSendPre(c, msg) traceConnSendPre(c, msg)
......
...@@ -77,7 +77,7 @@ Sgo() { ...@@ -77,7 +77,7 @@ Sgo() {
# -cpuprofile cpu.out # -cpuprofile cpu.out
# -trace trace.out # -trace trace.out
exec -a Sgo \ exec -a Sgo \
neo -log_dir=$log storage -cluster=$cluster -bind=$Sbind -masters=$Mbind "$@" & neo -trace trace.out -log_dir=$log storage -cluster=$cluster -bind=$Sbind -masters=$Mbind "$@" &
} }
......
...@@ -8,6 +8,9 @@ import ( ...@@ -8,6 +8,9 @@ import (
"crypto/sha1" "crypto/sha1"
"flag" "flag"
"fmt" "fmt"
"hash"
"hash/crc32"
"hash/adler32"
//"os" //"os"
"time" "time"
...@@ -100,12 +103,25 @@ func zsha1(ctx context.Context, url string, useprefetch bool) (err error) { ...@@ -100,12 +103,25 @@ func zsha1(ctx context.Context, url string, useprefetch bool) (err error) {
} }
before := lastTid + 1 // XXX overflow ? before := lastTid + 1 // XXX overflow ?
if false { if true {
defer profile.Start(profile.TraceProfile).Stop() //defer profile.Start(profile.TraceProfile).Stop()
defer profile.Start().Stop()
} }
for qqq := 0; qqq < 10; qqq++ {
tstart := time.Now() tstart := time.Now()
m := sha1.New() var m hash.Hash
hashName := "crc32"
switch hashName {
case "sha1":
m = sha1.New()
case "crc32":
m = crc32.NewIEEE()
case "adler32":
m = adler32.New()
default:
panic(0) // XXX
}
oid := zodb.Oid(0) oid := zodb.Oid(0)
nread := 0 nread := 0
...@@ -142,8 +158,9 @@ loop: ...@@ -142,8 +158,9 @@ loop:
if useprefetch { if useprefetch {
x += " +prefetch" x += " +prefetch"
} }
fmt.Printf("%x ; oid=0..%d nread=%d t=%s (%s / object) x=%s\n", fmt.Printf("%s:%x ; oid=0..%d nread=%d t=%s (%s / object) x=%s\n",
m.Sum(nil), oid-1, nread, δt, δt / time.Duration(oid), x) // XXX /oid cast ? hashName,m.Sum(nil), oid-1, nread, δt, δt / time.Duration(oid), x) // XXX /oid cast ?
}
return nil return nil
} }
...@@ -9,9 +9,38 @@ from ZODB.POSException import POSKeyError ...@@ -9,9 +9,38 @@ from ZODB.POSException import POSKeyError
from ZODB.utils import p64, u64 from ZODB.utils import p64, u64
import hashlib import hashlib
from zlib import crc32, adler32
import sys import sys
from time import time from time import time
# crc32 in hashlib interface
class CRC32Hasher:
name = "crc32"
def __init__(self):
self.h = crc32('')
def update(self, data):
self.h = crc32(data, self.h)
def hexdigest(self):
return '%x' % (self.h & 0xffffffff)
# adler32 in hashlib interface
class Adler32Hasher:
name = "adler32"
def __init__(self):
self.h = adler32('')
def update(self, data):
self.h = adler32(data, self.h)
def hexdigest(self):
return '%x' % (self.h & 0xffffffff)
def main(): def main():
url = sys.argv[1] url = sys.argv[1]
...@@ -19,30 +48,34 @@ def main(): ...@@ -19,30 +48,34 @@ def main():
last_tid = stor.lastTransaction() last_tid = stor.lastTransaction()
before = p64(u64(last_tid) + 1) before = p64(u64(last_tid) + 1)
tstart = time() for zzz in range(10):
m = hashlib.sha1() #m = hashlib.sha1()
m = CRC32Hasher()
#m = Adler32Hasher()
tstart = time()
oid = 0 oid = 0
nread = 0 nread = 0
while 1: while 1:
try: try:
data, serial, _ = stor.loadBefore(p64(oid), before) data, serial, _ = stor.loadBefore(p64(oid), before)
except POSKeyError: except POSKeyError:
break break
m.update(data) m.update(data)
#print('%s @%s\tsha1: %s' % (oid, u64(serial), m.hexdigest()), file=sys.stderr) #print('%s @%s\tsha1: %s' % (oid, u64(serial), m.hexdigest()), file=sys.stderr)
#print('\tdata: %s' % (data.encode('hex'),), file=sys.stderr) #print('\tdata: %s' % (data.encode('hex'),), file=sys.stderr)
nread += len(data) nread += len(data)
oid += 1 oid += 1
tend = time() tend = time()
dt = tend - tstart dt = tend - tstart
print('%s ; oid=0..%d nread=%d t=%.3fs (%.1fμs / object) x=zsha1.py' % \ print('%s:%s ; oid=0..%d nread=%d t=%.3fs (%.1fμs / object) x=zsha1.py' % \
(m.hexdigest(), oid-1, nread, dt, dt * 1E6 / oid)) (m.name, m.hexdigest(), oid-1, nread, dt, dt * 1E6 / oid))
if __name__ == '__main__': if __name__ == '__main__':
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment