testClient.py 11.6 KB
Newer Older
1
#
2
# Copyright (C) 2009-2014  Nexedi SA
3 4 5 6 7 8 9 10 11 12 13 14
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

17
import os
18 19 20
import unittest
import transaction
import ZODB
Olivier Cros's avatar
Olivier Cros committed
21 22
import socket

23
from struct import pack
Olivier Cros's avatar
Olivier Cros committed
24
from neo.neoctl.neoctl import NeoCTL
25
from ZODB.FileStorage import FileStorage
26
from ZODB.POSException import ConflictError
27
from ZODB.tests.StorageTestBase import zodb_pickle
28
from persistent import Persistent
29
from .. import expectedFailure
30
from . import NEOCluster, NEOFunctionalTest
31

32 33 34 35 36 37 38 39 40 41 42 43 44
# Default depth of the Tree built by ClientTests.__populate and, by default,
# the depth ClientTests.__checkTree expects at the root of the tree.
TREE_SIZE = 6

class Tree(Persistent):
    """Persistent binary tree in which every node stores its own depth.

    A node built with a positive depth owns a ``right`` and a ``left``
    child, each built with ``depth - 1``; a node built with a depth of
    zero or less has no children.
    """

    def __init__(self, depth):
        self.depth = depth
        if depth > 0:
            # Children are created right first, then left.
            child_depth = depth - 1
            self.right = Tree(child_depth)
            self.left = Tree(child_depth)

45 46 47 48 49 50 51 52 53 54 55 56

# simple persistent counter; conflict resolution is added by the subclass below
class PCounter(Persistent):
    """Persistent integer counter that starts at zero."""

    # Class-level default read by value() and inc() until inc() assigns
    # the attribute on the instance.
    _value = 0

    def inc(self):
        """Increase the counter by one."""
        self._value = self._value + 1

    def value(self):
        """Return the current count."""
        return self._value

57 58 59

class PCounterWithResolution(PCounter):
    """PCounter that merges concurrent increments instead of conflicting."""

    def _p_resolveConflict(self, old, saved, new):
        """Merge two concurrent modifications of the same counter.

        Following ZODB's conflict resolution protocol, the three arguments
        are object states (dictionaries):

        old   -- state both modifications started from
        saved -- state already committed by the other transaction
        new   -- state the current transaction is trying to commit

        Returns ``new``, updated in place so that ``_value`` is the common
        starting value plus the increments made on each side.
        """
        # A counter that was never incremented has no '_value' entry in its
        # state: fall back to the class-level default in that case.
        default = PCounter._value
        base = old.get('_value', default)
        committed_delta = saved.get('_value', default) - base
        pending_delta = new.get('_value', default) - base
        # Adding 'saved' and 'new' directly would count 'base' twice.
        new['_value'] = base + committed_delta + pending_delta
        return new

64 65
class PObject(Persistent):
    """Persistent object with no attribute or method of its own."""
66 67 68 69 70 71

class ClientTests(NEOFunctionalTest):

    def setUp(self):
        """Create (without starting it) the cluster used by the tests."""
        NEOFunctionalTest.setUp(self)
        storage_names = ['test_neo1', 'test_neo2', 'test_neo3', 'test_neo4']
        cluster_options = dict(
            partitions=3,
            replicas=2,
            master_count=1,
            temp_dir=self.getTempDirectory(),
        )
        self.neo = NEOCluster(storage_names, **cluster_options)

79
    def _tearDown(self, success):
        """Stop the cluster, if there is one, then run the base teardown."""
        cluster = self.neo
        if cluster is not None:
            cluster.stop()
        NEOFunctionalTest._tearDown(self, success)
83

84 85 86 87 88 89 90 91 92 93
    def __setup(self):
        """Set up and start the cluster, then open a ZODB database on it.

        The database is stored in ``self.db`` for makeTransaction().
        """
        cluster = self.neo
        cluster.setupDB()
        cluster.start()
        cluster.expectClusterRunning()
        self.db = ZODB.DB(cluster.getZODBStorage())

    def makeTransaction(self):
        """Return a new ``(transaction manager, connection)`` pair.

        The connection is opened on ``self.db`` and bound to the freshly
        created transaction manager.
        """
        manager = transaction.TransactionManager()
        connection = self.db.open(transaction_manager=manager)
        return (manager, connection)
96

97
    def testConflictResolutionTriggered1(self):
        """ Check that ConflictError is raised on write conflict """
        self.__setup()
        # Commit a PCounter, which defines no _p_resolveConflict method.
        key = 'without_resolution'
        setup_txn, setup_conn = self.makeTransaction()
        setup_conn.root()[key] = PCounter()
        setup_txn.commit()

        # Load the same counter through two independent connections.
        first_txn, first_conn = self.makeTransaction()
        second_txn, second_conn = self.makeTransaction()
        first_counter = first_conn.root()[key]
        second_counter = second_conn.root()[key]
        for counter in (first_counter, second_counter):
            self.assertEqual(counter.value(), 0)

        # Modify both copies, then commit the first one.
        first_counter.inc()
        for _ in range(2):
            second_counter.inc()
        first_txn.commit()
        self.assertEqual(first_counter.value(), 1)
        self.assertEqual(second_counter.value(), 2)
        # The second commit must be rejected.
        self.assertRaises(ConflictError, second_txn.commit)

120 121 122 123
    def testIsolationAtZopeLevel(self):
        """ Check transaction isolation within zope connection """
        self.__setup()
        # Commit the initial content of the root object.
        init_txn, init_conn = self.makeTransaction()
        initial_root = init_conn.root()
        initial_root['item'] = 0
        initial_root['other'] = 'bla'
        init_txn.commit()

        writer_txn, writer = self.makeTransaction()
        reader_txn, reader = self.makeTransaction()
        # Makes the reader take a snapshot of database state
        reader.root()['other']
        writer.root()['item'] = 1
        writer_txn.commit()

        # Objects are loaded from the zope cache: the writer sees its own
        # change while the reader still sees the value it started with.
        for connection, expected in ((writer, 1), (reader, 0)):
            self.assertEqual(connection.root()['item'], expected)

    def testIsolationWithoutZopeCache(self):
        """ Check isolation with zope cache cleared """
        self.__setup()
        # Commit the initial content of the root object.
        init_txn, init_conn = self.makeTransaction()
        initial_root = init_conn.root()
        initial_root['item'] = 0
        initial_root['other'] = 'bla'
        init_txn.commit()

        writer_txn, writer = self.makeTransaction()
        reader_txn, reader = self.makeTransaction()
        # Makes the reader take a snapshot of database state
        reader.root()['other']
        writer.root()['item'] = 1
        writer_txn.commit()

        # Clear the zope cache of both connections to force re-asking NEO.
        for connection in (writer, reader):
            connection.cacheMinimize()
        for connection, expected in ((writer, 1), (reader, 0)):
            self.assertEqual(connection.root()['item'], expected)

158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
    def __checkTree(self, tree, depth=TREE_SIZE):
        """Recursively check that `tree` is a well-formed Tree.

        tree  -- the node to check
        depth -- the depth this node is expected to record

        Every node, including the depth-0 leaves, must be a Tree instance
        whose ``depth`` attribute matches its position. The recursion stops
        on the same condition as Tree.__init__, which gives no children to
        nodes whose depth is zero or less.
        """
        self.assertTrue(isinstance(tree, Tree))
        self.assertEqual(depth, tree.depth)
        # Test before decrementing: doing it the other way round stops one
        # level too early and leaves the deepest nodes unchecked.
        if depth <= 0:
            return
        depth -= 1
        self.__checkTree(tree.right, depth)
        self.__checkTree(tree.left, depth)

    def __getDataFS(self, reset=False):
        """Open a FileStorage on ``data.fs`` in the test temp directory.

        reset -- when true, remove an already existing ``data.fs`` first

        Returns a ``(db, storage)`` pair.
        """
        path = os.path.join(self.getTempDirectory(), 'data.fs')
        if reset:
            if os.path.exists(path):
                os.remove(path)
        file_storage = FileStorage(file_name=path)
        return (ZODB.DB(storage=file_storage), file_storage)

175 176 177 178 179 180 181 182
    def __populate(self, db, tree_size=TREE_SIZE):
        """Store a Tree in `db` through a series of commits and undos.

        db        -- the ZODB database to fill
        tree_size -- depth given to the Tree stored under root['trees']
        """
        if isinstance(db.storage, FileStorage):
            # With a FileStorage, transaction ids are base64-encoded before
            # being handed to db.undo().
            from base64 import b64encode as encode_tid
        else:
            def encode_tid(tid):
                return tid

        def undo(tid=None):
            # Undo the given transaction (the last one by default), then
            # commit the undo itself.
            if not tid:
                tid = db.lastTransaction()
            db.undo(encode_tid(tid))
            transaction.commit()

        connection = db.open()
        root = connection.root()
        root['trees'] = Tree(tree_size)
        # First commit: the tree without the left child of its right branch.
        node = root['trees'].right
        detached = node.left
        del node.left
        transaction.commit()
        # Second commit: the node is only flagged as changed.
        node._p_changed = 1
        transaction.commit()
        second_tid = db.lastTransaction()
        # Third commit: the detached child is put back.
        node.left = detached
        transaction.commit()
        undo()
        fourth_tid = db.lastTransaction()
        # None means "undo the last transaction" (see undo() above).
        for tid in (second_tid, None, fourth_tid, None, None):
            undo(tid)
        connection.close()

    def testImport(self):
        """Copy a populated FileStorage into NEO and check the result."""
        # source database
        source_db, source_storage = self.__getDataFS()
        self.__populate(source_db)

        # destination: a neo storage
        cluster = self.neo
        cluster.start()
        destination = cluster.getZODBStorage()

        # copy data fs to neo
        destination.copyTransactionsFrom(source_storage, verbose=0)

        # check neo content
        neo_db, neo_conn = cluster.getZODBConnection()
        imported_tree = neo_conn.root()['trees']
        self.__checkTree(imported_tree)

221
    def testExport(self):
        """Populate NEO, copy it into a FileStorage and check the result."""
        # source: a neo storage
        cluster = self.neo
        cluster.start()
        neo_db, neo_conn = cluster.getZODBConnection()
        self.__populate(neo_db)

        # copy neo to a fresh data fs
        target_db, target_storage = self.__getDataFS(reset=True)
        source_storage = cluster.getZODBStorage()
        target_storage.copyTransactionsFrom(source_storage)

        # check data fs content
        connection = target_db.open()
        exported_tree = connection.root()['trees']
        self.__checkTree(exported_tree)

239 240 241 242 243 244 245
    def testLockTimeout(self):
        """ Hold a lock on an object to block a second transaction """
        def test():
            cluster = self.neo = NEOCluster(['test_neo1'], replicas=0,
                temp_dir=self.getTempDirectory())
            neoctl = cluster.getNEOCTL()
            cluster.start()
            # BUG: The following 2 lines create 2 apps, i.e. 2 TCP
            #      connections to the storage, so there may be a race
            #      condition at network level and the waiter's store may be
            #      effective before the holder's store.
            holder_db, holder_conn = cluster.getZODBConnection()
            waiter_db, waiter_conn = cluster.getZODBConnection()
            holder, waiter = holder_conn._storage, waiter_conn._storage
            holder_txn = transaction.Transaction()
            waiter_txn = transaction.Transaction()
            for txn in (holder_txn, waiter_txn):
                txn.user = 'user'
                txn.description = 'desc'
            oid = holder.new_oid()
            rev = '\0' * 8
            data = zodb_pickle(PObject())
            waiter.tpc_begin(waiter_txn)
            holder.tpc_begin(holder_txn)
            holder.store(oid, rev, data, '', holder_txn)
            # this store will be delayed
            waiter.store(oid, rev, data, '', waiter_txn)
            # the vote will time out as the holder never releases the lock
            self.assertRaises(ConflictError, waiter.tpc_vote, waiter_txn)
        self.runWithTimeout(40, test)
266

Olivier Cros's avatar
Olivier Cros committed
267 268
    def testIPv6Client(self):
        """ Test the connectivity of an IPv6 connection for neo client """

        def test():
            """
            Start a one-storage cluster on IPv6 and open two connections
            """
            cluster_options = dict(
                replicas=0,
                temp_dir=self.getTempDirectory(),
                address_type=socket.AF_INET6,
            )
            self.neo = NEOCluster(['test_neo1'], **cluster_options)
            neoctl = NeoCTL(('::1', 0))
            self.neo.start()
            first_db, first_conn = self.neo.getZODBConnection()
            second_db, second_conn = self.neo.getZODBConnection()

        self.runWithTimeout(40, test)
283

284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321
    def testDelayedLocksCancelled(self):
        """
            Hold a lock on an object, try to get another lock on the same
            object to delay it. Then cancel the second transaction and check
            that the lock is not held when the first transaction ends
        """
        def test():
            cluster = self.neo = NEOCluster(['test_neo1'], replicas=0,
                temp_dir=self.getTempDirectory())
            neoctl = cluster.getNEOCTL()
            cluster.start()
            owner_db, owner_conn = cluster.getZODBConnection()
            other_db, other_conn = cluster.getZODBConnection()
            owner, other = owner_conn._storage, other_conn._storage
            owner_txn = transaction.Transaction()
            other_txn = transaction.Transaction()
            for txn in (owner_txn, other_txn):
                txn.user = 'user'
                txn.description = 'desc'
            oid = owner.new_oid()
            rev = '\0' * 8
            data = zodb_pickle(PObject())
            owner.tpc_begin(owner_txn)
            other.tpc_begin(other_txn)
            # the owner transaction gets the lock
            owner.store(oid, rev, data, '', owner_txn)
            # the other store is delayed
            other.store(oid, rev, data, '', other_txn)
            # cancel the other transaction, should cancel its store too
            other.tpc_abort(other_txn)
            # finish the owner transaction, should release the lock
            owner.tpc_vote(owner_txn)
            owner.tpc_finish(owner_txn)

            # A third client must now be able to write the same object.
            third_db, third_conn = self.neo.getZODBConnection()
            third = third_conn._storage
            third_txn = transaction.Transaction()
            third_txn.user = 'user'
            third_txn.description = 'desc'
            third.tpc_begin(third_txn)
            # retrieve the last revision
            last_data, last_serial = third.load(oid, '')
            # try to store again, should not be delayed
            third.store(oid, last_serial, last_data, '', third_txn)
            # the vote should not time out
            third.tpc_vote(third_txn)
            third.tpc_finish(third_txn)
        self.runWithTimeout(10, test)
329

330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354
    def testGreaterOIDSaved(self):
        """
            Store an object with an OID greater than the last generated by the
            master. This OID must be intercepted at commit, used for next OID
            generations and persistently saved on storage nodes.
        """
        cluster = self.neo = NEOCluster(['test_neo1'], replicas=0,
            temp_dir=self.getTempDirectory())
        neoctl = cluster.getNEOCTL()
        cluster.start()
        db, connection = cluster.getZODBConnection()
        storage = connection._storage
        txn = transaction.Transaction()
        rev = '\0' * 8
        data = zodb_pickle(PObject())
        # OID chosen by the test instead of being requested with new_oid()
        my_oid = pack('!Q', 100000)

        # store an object with this OID
        storage.tpc_begin(txn)
        storage.store(my_oid, rev, data, '', txn)
        storage.tpc_vote(txn)
        storage.tpc_finish(txn)

        # request an oid, should be greater than mine
        next_oid = storage.new_oid()
        self.assertTrue(next_oid > my_oid)

355 356 357 358 359 360
def test_suite():
    """Return a test suite holding every test method of ClientTests."""
    # unittest.makeSuite() is deprecated since Python 3.11 and removed in
    # Python 3.13; the default loader collects the same 'test*' methods.
    return unittest.defaultTestLoader.loadTestsFromTestCase(ClientTests)

# When the file is executed as a script, run the suite built by test_suite().
if __name__ == "__main__":
    unittest.main(defaultTest="test_suite")