StorageServer.py 8.56 KB
Newer Older
Jim Fulton's avatar
Jim Fulton committed
1 2 3 4 5 6

import asyncore, socket, string, sys, cPickle
from smac import smac
from ZODB import POSException
from ZODB.Transaction import Transaction
import traceback
Jim Fulton's avatar
Jim Fulton committed
7
from zLOG import LOG, INFO, ERROR
Jim Fulton's avatar
Jim Fulton committed
8

Jim Fulton's avatar
Jim Fulton committed
9
class StorageServerError(POSException.StorageError):
    """Storage error type declared by the ZEO storage server module.

    Adds nothing to POSException.StorageError; it exists only so that
    errors originating in this module can be told apart by class.
    """
Jim Fulton's avatar
Jim Fulton committed
10

Jim Fulton's avatar
Jim Fulton committed
11 12
def blather(*args):
    """Log the given strings, joined by single spaces, at INFO level.

    Every argument must already be a string; the joined text is sent to
    the 'ZEO Server' log subsystem.
    """
    message = string.join(args, ' ')
    LOG('ZEO Server', INFO, message)
Jim Fulton's avatar
Jim Fulton committed
13

Jim Fulton's avatar
Jim Fulton committed
14
class StorageServer(asyncore.dispatcher):
    """Listening socket that accepts ZEO client connections.

    Every accepted socket is wrapped in a Connection.  For each storage id
    the server keeps the list of connections registered for that storage,
    so that invalidations produced by one connection can be forwarded to
    the others.
    """

    def __init__(self, connection, storages):
        """Bind the listening socket and start listening.

        connection -- a (host, port) pair to bind to.
        storages -- mapping from storage id to storage object; it is only
            ever read through its get() method.
        """
        self.host, self.port = connection
        self.__storages = storages

        # storage id -> list of Connection objects registered for it
        self.__connections = {}
        self.__get_connections = self.__connections.get

        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind((self.host, self.port))
        self.listen(5)

    def register_connection(self, connection, storage_id):
        """Attach *connection* to the storage named *storage_id*.

        Returns (storage, storage_id).  If no storage is known under that
        id the connection is closed and (None, None) is returned.
        """
        storage = self.__storages.get(storage_id, None)
        if storage is None:
            connection.close()
            return None, None

        connections = self.__get_connections(storage_id, None)
        if connections is None:
            self.__connections[storage_id] = connections = []
        connections.append(connection)
        return storage, storage_id

    def unregister_connection(self, connection, storage_id):
        """Forget *connection* for *storage_id*; unknown ids are ignored."""
        connections = self.__get_connections(storage_id, None)
        if connections:
            # Build a fresh list instead of mutating in place, so a caller
            # that is iterating over the old list is not disturbed.
            remaining = []
            for c in connections:
                if c is not connection:
                    remaining.append(c)
            self.__connections[storage_id] = remaining

    def invalidate(self, connection, storage_id, invalidated,
                   dumps=cPickle.dumps):
        """Send *invalidated* to every other connection on *storage_id*.

        connection -- the originating connection; it is skipped.
        invalidated -- the object to pickle into the 'I' message.
        dumps -- pickling function (bound as a default for speed).
        """
        peers = self.__get_connections(storage_id, None)
        if not peers:
            # Nothing registered under this id: nobody to notify.
            return

        # The payload is the same for every peer, so pickle it once.
        message = 'I' + dumps(invalidated)
        for c in peers:
            if c is connection: continue
            c.message_output(message)

    # The listening socket never has data to write and has nothing to do
    # on read/connect events; it only accepts.
    def writable(self): return 0

    def handle_read(self): pass

    def readable(self): return 1

    def handle_connect(self): pass

    def handle_accept(self):
        """Accept one pending client and wrap it in a Connection."""
        try:
            accepted = self.accept()
        except socket.error:
            sys.stderr.write('warning: accept failed\n')
            # Without a socket there is nothing to wrap; falling through
            # would reference unbound names.
            return

        if accepted is None:
            # Later asyncore versions return None instead of raising when
            # no connection is actually ready.
            return

        sock, addr = accepted
        Connection(self, sock, addr)

    def log_info(self, message, type='info'):
        """Route asyncore log output to zLOG.

        A *type* of 'error' is logged as ERROR; anything else as INFO.
        """
        if type == 'error': level = ERROR
        else: level = INFO
        LOG('ZEO Server', level, message)

    log = log_info

Jim Fulton's avatar
Jim Fulton committed
84 85 86
storage_methods={}
for n in ('get_info', 'abortVersion', 'commitVersion', 'history',
          'load', 'modifiedInVersion', 'new_oid', 'pack', 'store',
Jim Fulton's avatar
Jim Fulton committed
87
          'tpc_abort', 'tpc_begin', 'tpc_begin_sync', 'tpc_finish', 'undo',
Jim Fulton's avatar
Jim Fulton committed
88 89 90
          'undoLog', 'versionEmpty',
          'zeoLoad', 'zeoVerify',
          ):
Jim Fulton's avatar
Jim Fulton committed
91 92 93
    storage_methods[n]=1
storage_method=storage_methods.has_key

Jim Fulton's avatar
Jim Fulton committed
94
_noreturn=[]
Jim Fulton's avatar
Jim Fulton committed
95 96 97 98 99 100 101 102 103
class Connection(smac):
    """Server-side end of one client connection.

    The first message received names the storage to use.  Every later
    message is a pickled sequence (method_name, arg, ...) naming one of
    the methods listed in storage_methods.  The reply is a one-character
    tag followed by a pickle: 'R' for a result, 'E' for an error.
    Invalidations are pushed as 'I' messages.

    Callbacks for connections waiting on the storage's commit lock are
    queued on the storage object itself as (function, args) pairs.
    """

    _transaction = None
    __storage = __storage_id = None

    def __init__(self, server, sock, addr):
        """Wrap the accepted socket *sock* on behalf of *server*."""
        smac.__init__(self, sock, addr)
        self.__server = server
        # (oid, serial, version) triples stored in the current transaction
        self.__invalidated = []
        self.__closed = None

    def close(self):
        """Abort any transaction this connection holds, then shut down."""
        t = self._transaction
        if (t is not None and self.__storage is not None and
            self.__storage._transaction is t):
            self.tpc_abort(t.id)

        self.__server.unregister_connection(self, self.__storage_id)
        self.__closed = 1
        smac.close(self)

    def _brief(self, value, limit=60):
        """Return repr(value), truncated to *limit* characters for logs."""
        text = repr(value)
        if len(text) > limit: text = text[:limit] + ' ...'
        return text

    def _waiting_queue(self):
        """Return the storage's list of queued (function, args) callbacks.

        The list lives on the storage object and is created on first use.
        """
        storage = self.__storage
        try:
            return storage.__waiting
        except AttributeError:
            waiting = storage.__waiting = []
            return waiting

    def _resume_waiting(self):
        """Run queued callbacks until one returns a true value.

        A true result means that callback has taken over the storage
        lock, so later waiters must keep waiting.
        """
        waiting = self._waiting_queue()
        while waiting:
            f, args = waiting.pop(0)
            if apply(f, args): break

    def _start_transaction(self, id, user, description):
        """Create this connection's transaction and begin it on the storage."""
        self._transaction = t = Transaction()
        t.id = id
        t.user = user
        t.description = description
        self.__storage.tpc_begin(t)
        self.__invalidated = []

    def message_input(self, message):
        """Handle one complete message received from the client."""
        if __debug__:
            blather('message_input', self._brief(message))

        if self.__storage is None:
            # The first message is the storage id.  If it is unknown the
            # server closes this connection and hands back (None, None).
            self.__storage, self.__storage_id = (
                self.__server.register_connection(self, message))
            return

        rt = 'R'
        try:
            # NOTE(review): this unpickles data received from the network.
            # Unpickling untrusted input can execute arbitrary code; the
            # server should only be reachable by trusted clients.
            args = cPickle.loads(message)
            name, args = args[0], args[1:]
            if __debug__:
                blather('call: %s%s' % (name, self._brief(tuple(args))))

            if storage_method(name):
                # Methods defined on the connection take precedence over
                # the storage's own methods of the same name.
                if hasattr(self, name):
                    r = apply(getattr(self, name), args)
                else:
                    r = apply(getattr(self.__storage, name), args)
                if r is _noreturn: return
            else:
                # This used to be `raise 'Invalid Method Name', name`.
                # String exceptions are rejected by later interpreters,
                # so build the very same error reply without raising.
                LOG('ZEO Server', ERROR,
                    'Invalid Method Name: %s' % repr(name))
                r = 'Invalid Method Name', name
                rt = 'E'
        except:
            # Deliberately catches everything: any failure is reported to
            # the client rather than tearing down the server loop.
            LOG('ZEO Server', ERROR, 'error', error=sys.exc_info())
            t, r = sys.exc_info()[:2]
            # NOTE(review): presumably sends instance exceptions as-is and
            # everything else as a (type, value) pair -- confirm against
            # the client's error handling.
            if type(r) is not type(self): r = t, r
            rt = 'E'

        if __debug__:
            blather('%s: %s' % (rt, self._brief(r)))

        r = cPickle.dumps(r, 1)
        self.message_output(rt + r)

    def get_info(self):
        """Return a dictionary describing the storage."""
        storage = self.__storage
        return {
            'length': len(storage),
            'size': storage.getSize(),
            'name': storage.getName(),
            'supportsUndo': storage.supportsUndo(),
            'supportsVersions': storage.supportsVersions(),
            }

    def zeoLoad(self, oid):
        """Load both the version and non-version data for *oid*.

        Returns (p, s, v, pv, sv): the non-version result of
        storage.load(oid, ''), the version the object is modified in (if
        any), and the result of loading it in that version (None, None
        when it is not modified in a version).
        """
        storage = self.__storage
        v = storage.modifiedInVersion(oid)
        if v: pv, sv = storage.load(oid, v)
        else: pv = sv = None
        p, s = storage.load(oid, '')
        return p, s, v, pv, sv

    def zeoVerify(self, oid, s, sv,
                  dumps=cPickle.dumps):
        """Send an invalidation if the client's serials for *oid* are stale.

        s and sv are the client's non-version and version serials.  No
        reply is ever sent for this call; at most one 'I' message is.
        """
        # Best effort: an object that cannot be loaded is silently
        # skipped rather than reported.
        try: p, current_s, v, pv, current_sv = self.zeoLoad(oid)
        except: return _noreturn
        p = pv = None # free the pickles
        if current_s != s:
            self.message_output('I' + dumps(((oid, current_s, ''),)))
        elif current_sv != sv:
            self.message_output('I' + dumps(((oid, current_sv, v),)))

        return _noreturn

    def store(self, oid, serial, data, version, id):
        """Store *data* for *oid* in the transaction identified by *id*.

        Raises POSException.StorageTransactionError if *id* is not this
        connection's current transaction.  Returns the storage's result.
        """
        t = self._transaction
        if t is None or id != t.id:
            raise POSException.StorageTransactionError(self, id)
        newserial = self.__storage.store(oid, serial, data, version, t)
        if serial != '\0\0\0\0\0\0\0\0':
            # append() takes exactly one argument; the triple must be
            # passed as a tuple.
            self.__invalidated.append((oid, serial, version))
        return newserial

    def tpc_abort(self, id):
        """Abort transaction *id*; ignored if it is not the current one."""
        t = self._transaction
        if t is None or id != t.id: return
        self.__storage.tpc_abort(t)

        self._resume_waiting()

        self._transaction = None
        self.__invalidated = []

    def unlock(self):
        """Tell a client that was refused by tpc_begin to try again."""
        if self.__closed: return
        self.message_output('UN.')

    def tpc_begin(self, id, user, description, ext):
        """Begin transaction *id* without waiting.

        Returns 1 if the storage is busy with another transaction; in
        that case unlock() is queued so the client is notified later.
        Returns None on success or if *id* is already in progress.
        """
        t = self._transaction
        if t is not None and id == t.id: return
        if self.__storage._transaction is not None:
            self._waiting_queue().append((self.unlock, ()))
            return 1 # Return a flag indicating a lock condition.

        self._start_transaction(id, user, description)

    def tpc_begin_sync(self, id, user, description, ext):
        """Begin transaction *id*, queueing the request if the storage is busy.

        The normal reply is suppressed; 'RN.' is sent by try_again_sync
        once the transaction has actually begun.
        """
        if self.__closed: return
        t = self._transaction
        if t is not None and id == t.id: return
        if self.__storage._transaction is None:
            self.try_again_sync(id, user, description, ext)
        else:
            self._waiting_queue().append(
                (self.try_again_sync, (id, user, description, ext)))

        return _noreturn

    def try_again_sync(self, id, user, description, ext):
        """Begin the transaction if the storage is free and send 'RN.'.

        Returns a true value to stop the waiting queue from being drained
        further, or 0 to let the next waiter run.
        """
        if self.__closed:
            # The client went away while queued.  Taking the storage lock
            # now would leave it held forever, so pass the turn on.
            return 0

        if self.__storage._transaction is None:
            self._start_transaction(id, user, description)
            self.message_output('RN.')

        return 1

    def tpc_finish(self, id, user, description, ext):
        """Commit transaction *id* and broadcast its invalidations.

        Ignored if *id* is not this connection's current transaction.
        """
        t = self._transaction
        # Same guard as tpc_abort: without the None test a stray
        # tpc_finish would fail on t.id.
        if t is None or id != t.id: return
        t.user = user
        t.description = description
        t.ext = ext

        self.__storage.tpc_finish(t)

        self._resume_waiting()

        self._transaction = None
        self.__server.invalidate(self, self.__storage_id, self.__invalidated)
        self.__invalidated = []
        

if __name__=='__main__':
    # Usage: StorageServer.py <file-storage-path> <port>
    import ZODB.FileStorage
    name, port = sys.argv[1:3]
    blather(name, port)
    # A numeric port means "listen on all interfaces".
    # NOTE(review): a non-numeric port is left as a string, but
    # StorageServer currently unpacks its argument as (host, port).
    try: port='',string.atoi(port)
    except ValueError: pass
    # StorageServer looks storages up with storages.get(storage_id), so
    # it needs a mapping, not a bare storage object.
    # NOTE(review): '1' is assumed to be the storage id clients send as
    # their first message -- confirm against the client code.
    StorageServer(port, {'1': ZODB.FileStorage.FileStorage(name)})
    asyncore.loop()