Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Z
ZEO
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
ZEO
Commits
8d9b2578
Commit
8d9b2578
authored
Mar 23, 2010
by
Jim Fulton
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Fixed a threading bug in the StorageServerDB implementation that cause
the fan-out to fail.
parent
7d721c31
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
65 additions
and
27 deletions
+65
-27
src/CHANGES.txt
src/CHANGES.txt
+6
-0
src/ZEO/StorageServer.py
src/ZEO/StorageServer.py
+0
-7
src/ZEO/tests/zeo-fan-out.test
src/ZEO/tests/zeo-fan-out.test
+59
-20
No files found.
src/CHANGES.txt
View file @
8d9b2578
...
@@ -5,6 +5,12 @@
...
@@ -5,6 +5,12 @@
3.10.0a2 (2010-??-??)
3.10.0a2 (2010-??-??)
=====================
=====================
Bugs Fixed
----------
- When using using a ClientStorage in a Storage server, there was a
threading bug that caused clients to get disconnected.
New Features
New Features
------------
------------
...
...
src/ZEO/StorageServer.py
View file @
8d9b2578
...
@@ -786,13 +786,6 @@ class StorageServerDB:
...
@@ -786,13 +786,6 @@ class StorageServerDB:
raise
StorageServerError
(
"Versions aren't supported."
)
raise
StorageServerError
(
"Versions aren't supported."
)
storage_id
=
self
.
storage_id
storage_id
=
self
.
storage_id
self
.
server
.
invalidate
(
None
,
storage_id
,
tid
,
oids
)
self
.
server
.
invalidate
(
None
,
storage_id
,
tid
,
oids
)
for
zeo_server
in
self
.
server
.
connections
.
get
(
storage_id
,
())[:]:
try
:
zeo_server
.
connection
.
poll
()
except
ZEO
.
zrpc
.
error
.
DisconnectedError
:
pass
else
:
break
# We only need to pull one :)
def
invalidateCache
(
self
):
def
invalidateCache
(
self
):
self
.
server
.
_invalidateCache
(
self
.
storage_id
)
self
.
server
.
_invalidateCache
(
self
.
storage_id
)
...
...
src/ZEO/tests/zeo-fan-out.test
View file @
8d9b2578
...
@@ -10,25 +10,29 @@ ZEO servers for us and another one that picks ports.
...
@@ -10,25 +10,29 @@ ZEO servers for us and another one that picks ports.
We'll start the first server:
We'll start the first server:
>>> (_, port0), adminaddr0 = start_server(
>>> (_, port0), adminaddr0 = start_server(
... '<filestorage>\npath fs\n</filestorage>', keep=1)
... '<filestorage>\npath fs\n
blob-dir blobs\n
</filestorage>', keep=1)
Then we'll start 2 others that use this one:
Then we'll start 2 others that use this one:
>>> addr1, _ = start_server('<zeoclient>\nserver %s\n</zeoclient>' % port0)
>>> addr1, _ = start_server(
>>> addr2, _ = start_server('<zeoclient>\nserver %s\n</zeoclient>' % port0)
... '<zeoclient>\nserver %s\nblob-dir b1\n</zeoclient>' % port0)
>>> addr2, _ = start_server(
... '<zeoclient>\nserver %s\nblob-dir b2\n</zeoclient>' % port0)
Now, let's create some client storages that connect to these:
Now, let's create some client storages that connect to these:
>>> import
ZEO
, transaction
>>> import
os, ZEO, ZODB.blob, ZODB.POSException
, transaction
>>> db1 = ZEO.DB(addr1)
>>> db0 = ZEO.DB(port0, blob_dir='cb0')
>>> db1 = ZEO.DB(addr1, blob_dir='cb1')
>>> tm1 = transaction.TransactionManager()
>>> tm1 = transaction.TransactionManager()
>>> c1 = db1.open(transaction_manager=tm1)
>>> c1 = db1.open(transaction_manager=tm1)
>>> r1 = c1.root()
>>> r1 = c1.root()
>>> r1
>>> r1
{}
{}
>>> db2 = ZEO.DB(addr2)
>>> db2 = ZEO.DB(addr2
, blob_dir='cb2'
)
>>> tm2 = transaction.TransactionManager()
>>> tm2 = transaction.TransactionManager()
>>> c2 = db2.open(transaction_manager=tm2)
>>> c2 = db2.open(transaction_manager=tm2)
>>> r2 = c2.root()
>>> r2 = c2.root()
...
@@ -43,7 +47,12 @@ If we update c1, we'll eventually see the change in c2:
...
@@ -43,7 +47,12 @@ If we update c1, we'll eventually see the change in c2:
>>> r1[1].v = 1000
>>> r1[1].v = 1000
>>> r1[2] = persistent.mapping.PersistentMapping()
>>> r1[2] = persistent.mapping.PersistentMapping()
>>> r1[2].v = -1000
>>> r1[2].v = -1000
>>> r1[3] = ZODB.blob.Blob('x'*4111222)
>>> for i in range(1000, 2000):
... r1[i] = persistent.mapping.PersistentMapping()
... r1[i].v = 0
>>> tm1.commit()
>>> tm1.commit()
>>> blob_id = r1[3]._p_oid, r1[1]._p_serial
>>> import time
>>> import time
>>> for i in range(100):
>>> for i in range(100):
...
@@ -51,7 +60,9 @@ If we update c1, we'll eventually see the change in c2:
...
@@ -51,7 +60,9 @@ If we update c1, we'll eventually see the change in c2:
... if 1 in r2:
... if 1 in r2:
... break
... break
... time.sleep(0.01)
... time.sleep(0.01)
>>> tm2.abort()
>>> r2[1].v
>>> r2[1].v
1000
1000
...
@@ -61,33 +72,60 @@ If we update c1, we'll eventually see the change in c2:
...
@@ -61,33 +72,60 @@ If we update c1, we'll eventually see the change in c2:
Now, let's see if we can break it. :)
Now, let's see if we can break it. :)
>>> def f():
>>> def f():
... c = db1.open(transaction.TransactionManager())
... r = c.root()
... i = 0
... while i < 100:
... r[1].v -= 1
... r[2].v += 1
... try:
... c.transaction_manager.commit()
... i += 1
... except ZODB.POSException.ConflictError:
... c.transaction_manager.abort()
... c.close()
>>> def g():
... c = db0.open(transaction.TransactionManager())
... r = c.root()
... for i in range(100):
... for i in range(100):
... r1[1].v -= 1
... for j in range(1000, 2000):
... r1[2].v += 1
... r[j].v += 1
... tm1.commit()
... c.transaction_manager.commit()
... time.sleep(0.01)
... c.close()
>>> import threading
>>> import threading
>>> thread = threading.Thread(target=f)
>>> threadf = threading.Thread(target=f)
>>> thread.start()
>>> threadg = threading.Thread(target=f)
>>> threadf.start()
>>> for i in range(1000):
>>> threadg.start()
>>> s2 = db2.storage
>>> start_time = time.time()
>>> while time.time() - start_time < 999:
... t = tm2.begin()
... t = tm2.begin()
... if r2[1].v + r2[2].v:
... if r2[1].v + r2[2].v:
... print 'oops', r2[1], r2[2]
... print 'oops', r2[1], r2[2]
... if r
1
[1].v == 900:
... if r
2
[1].v == 900:
... break # we caught up
... break # we caught up
... time.sleep(0.01)
... path = s2.fshelper.getBlobFilename(*blob_id)
... if os.path.exists(path):
... ZODB.blob.remove_committed(path)
... s2._server.sendBlob(*blob_id)
... else: print 'Dang'
>>> threadf.join()
>>> threadg.join()
>>> thread.join()
If we shutdown and restart the source server, the variables will be
If we shutdown and restart the source server, the variables will be
invalidated:
invalidated:
>>> stop_server(adminaddr0)
>>> stop_server(adminaddr0)
>>> _ = start_server('<filestorage 1>\npath fs\n</filestorage>\n',
>>> _ = start_server('<filestorage 1>\npath fs\n</filestorage>\n',
... port=port0)
... port=port0)
>>> for i in range(1000):
>>> for i in range(1000):
... c1.sync()
... c1.sync()
... c2.sync()
... c2.sync()
...
@@ -109,5 +147,6 @@ invalidated:
...
@@ -109,5 +147,6 @@ invalidated:
Cleanup:
Cleanup:
>>> db0.close()
>>> db1.close()
>>> db1.close()
>>> db2.close()
>>> db2.close()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment