Commit d2c0cef3 authored by Kirill Smelkov's avatar Kirill Smelkov

X neo: Draft support for Load

parent 2ab737be
No related merge requests found
...@@ -45,6 +45,7 @@ func (c *Client) Close() error { ...@@ -45,6 +45,7 @@ func (c *Client) Close() error {
} }
func (c *Client) LastTid() (zodb.Tid, error) { func (c *Client) LastTid() (zodb.Tid, error) {
// FIXME do not use global conn (see comment in openClientByURL)
// XXX open new conn for this particular req/reply ? // XXX open new conn for this particular req/reply ?
err := EncodeAndSend(c.storConn, &LastTransaction{}) err := EncodeAndSend(c.storConn, &LastTransaction{})
if err != nil { if err != nil {
...@@ -70,7 +71,46 @@ func (c *Client) LastTid() (zodb.Tid, error) { ...@@ -70,7 +71,46 @@ func (c *Client) LastTid() (zodb.Tid, error) {
} }
func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
panic("TODO") // XXX // FIXME do not use global conn (see comment in openClientByURL)
req := GetObject{Oid: xid.Oid}
if xid.TidBefore {
req.Serial = INVALID_TID
req.Tid = xid.Tid
} else {
req.Serial = xid.Tid
req.Tid = INVALID_TID
}
err = EncodeAndSend(c.storConn, &req)
if err != nil {
return nil, 0, err // XXX err context
}
reply, err := RecvAndDecode(c.storConn)
if err != nil {
// XXX err context (e.g. peer resetting connection -> currently only EOF)
return nil, 0, err
}
switch reply := reply.(type) {
case *Error:
return nil, 0, reply // XXX err context
default:
// XXX more error context ?
return nil, 0, fmt.Errorf("protocol error: unexpected reply: %T", reply)
case *AnswerGetObject:
data = reply.Data
tid = reply.Serial
// TODO reply.Checksum - check sha1
// TODO reply.Compression - decompress
// reply.NextSerial
// reply.DataSerial
return data, tid, nil
}
} }
func (c *Client) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator { func (c *Client) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
...@@ -98,8 +138,14 @@ func openClientByURL(ctx context.Context, u *url.URL) (zodb.IStorage, error) { ...@@ -98,8 +138,14 @@ func openClientByURL(ctx context.Context, u *url.URL) (zodb.IStorage, error) {
} }
// identification passed // identification passed
// XXX only one conn is not appropriate for multiple goroutines/threads
// asking storage in parallel. At the same time creating new conn for
// every request is ok? -> not so good to create new goroutine per 1 object read
// XXX -> server could reuse goroutines -> so not so bad ?
conn, err := storLink.NewConn() conn, err := storLink.NewConn()
if err != nil { if err != nil {
storLink.Close() // XXX err
return nil, err // XXX err ctx ? return nil, err // XXX err ctx ?
} }
......
...@@ -140,7 +140,7 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) { ...@@ -140,7 +140,7 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) {
reply = &Error{Code: 0, Message: err.Error()} // XXX Code reply = &Error{Code: 0, Message: err.Error()} // XXX Code
} else { } else {
reply = &AnswerGetObject{ reply = &AnswerGetObject{
Oid: xid.Oid, Oid: xid.Oid,
Serial: tid, Serial: tid,
Compression: false, Compression: false,
......
...@@ -739,7 +739,7 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) ...@@ -739,7 +739,7 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error)
// lookup in index position of oid data record within latest transaction who changed this oid // lookup in index position of oid data record within latest transaction who changed this oid
dataPos, ok := fs.index.Get(xid.Oid) dataPos, ok := fs.index.Get(xid.Oid)
if !ok { if !ok {
return nil, zodb.Tid(0), &zodb.ErrOidMissing{Oid: xid.Oid} return nil, 0, &zodb.ErrOidMissing{Oid: xid.Oid}
} }
// FIXME zodb.TidMax is only 7fff... tid from outside can be ffff... // FIXME zodb.TidMax is only 7fff... tid from outside can be ffff...
...@@ -760,13 +760,13 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) ...@@ -760,13 +760,13 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error)
err = &ErrXidLoad{xid, err} err = &ErrXidLoad{xid, err}
} }
return nil, zodb.Tid(0), err return nil, 0, err
} }
} }
// found dh.Tid < tidBefore; check it really satisfies xid.XTid // found dh.Tid < tidBefore; check it really satisfies xid.XTid
if !xid.XTid.TidBefore && dh.Tid != xid.XTid.Tid { if !xid.XTid.TidBefore && dh.Tid != xid.XTid.Tid {
return nil, zodb.Tid(0), &zodb.ErrXidMissing{Xid: xid} return nil, 0, &zodb.ErrXidMissing{Xid: xid}
} }
// even if we will scan back via backpointers, the tid returned should // even if we will scan back via backpointers, the tid returned should
...@@ -776,12 +776,12 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) ...@@ -776,12 +776,12 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error)
// TODO data -> slab // TODO data -> slab
err = dh.LoadData(fs.file, &data) err = dh.LoadData(fs.file, &data)
if err != nil { if err != nil {
return nil, zodb.Tid(0), &ErrXidLoad{xid, err} return nil, 0, &ErrXidLoad{xid, err}
} }
if data == nil { if data == nil {
// data was deleted // data was deleted
// XXX or allow this and return via data=nil ? // XXX or allow this and return via data=nil ?
return nil, zodb.Tid(0), &zodb.ErrXidMissing{Xid: xid} return nil, 0, &zodb.ErrXidMissing{Xid: xid}
} }
return data, tid, nil return data, tid, nil
......
...@@ -143,6 +143,7 @@ type IStorage interface { ...@@ -143,6 +143,7 @@ type IStorage interface {
// LoadSerial and LoadBefore generalized into 1 Load (see Xid for details) // LoadSerial and LoadBefore generalized into 1 Load (see Xid for details)
// TODO data []byte -> something allocated from slab ? // TODO data []byte -> something allocated from slab ?
// XXX currently deleted data is returned as data=nil -- is it ok? // XXX currently deleted data is returned as data=nil -- is it ok?
// TODO specify error when data not found
Load(xid Xid) (data []byte, tid Tid, err error) // XXX -> StorageRecordInformation ? Load(xid Xid) (data []byte, tid Tid, err error) // XXX -> StorageRecordInformation ?
// -> Prefetch(xid Xid) ... // -> Prefetch(xid Xid) ...
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment