Commit 7f035440 authored by Tres Seaver

Merge pull request #12 from NextThought/pypy-support

Add support for PyPy.
parents 96b1702a b53a1025
......@@ -8,3 +8,4 @@ develop-eggs
eggs
parts
testing.log
.dir-locals.el
language: python
sudo: false
python:
- 2.6
- 2.7
- 3.2
- 3.3
- 3.4
- pypy
before_install:
# Workaround for a permissions issue with Travis virtual machine images
# that breaks Python's multiprocessing:
# https://github.com/travis-ci/travis-cookbooks/issues/155
- sudo rm -rf /dev/shm
- sudo ln -s /run/shm /dev/shm
install:
- virtualenv env
- env/bin/pip install -U setuptools distribute
- env/bin/python bootstrap.py
- pip install -U setuptools distribute
- python bootstrap.py
- bin/buildout
script:
- bin/test -v1 -j99
......
Changelog
=========
4.2.0 (unreleased)
------------------
- Add support for PyPy.
4.1.0 (2015-01-06)
------------------
......
......@@ -6,7 +6,7 @@ parts =
versions = versions
[versions]
zdaemon = 4.0.0a1
[test]
recipe = zc.recipe.testrunner
......
......@@ -13,7 +13,7 @@
##############################################################################
"""Setup
"""
version = '4.1.0'
version = '4.2.0.dev0'
from setuptools import setup, find_packages
import os
import sys
......@@ -34,6 +34,7 @@ Programming Language :: Python :: 3.2
Programming Language :: Python :: 3.3
Programming Language :: Python :: 3.4
Programming Language :: Python :: Implementation :: CPython
Programming Language :: Python :: Implementation :: PyPy
Topic :: Database
Topic :: Software Development :: Libraries :: Python Modules
Operating System :: Microsoft :: Windows
......@@ -115,10 +116,10 @@ setup(name="ZEO",
tests_require = tests_require,
extras_require = dict(test=tests_require),
install_requires = [
'ZODB >= 4.0.0b2',
'ZODB >= 4.2.0b1',
'six',
'transaction',
'persistent',
'persistent >= 4.1.0',
'zc.lockfile',
'ZConfig',
'zdaemon',
......
......@@ -19,6 +19,7 @@ ClientStorage -- the main class, implementing the Storage API
"""
import BTrees.IOBTree
import gc
import logging
import os
import re
......@@ -1543,7 +1544,21 @@ class ClientStorage(object):
self._iterator_ids.clear()
return
# Recall that self._iterators is a WeakValueDictionary. Under
# non-refcounted implementations like PyPy, this means that
# unreachable iterators (and their IDs) may still be in this
# map for some arbitrary period of time (until the next
# garbage collection occurs.) This is fine: the server
# supports being asked to GC the same iterator ID more than
# once. Iterator ids can be reused, but only after a server
# restart, after which we had already been called with
# `disconnected` True and so had cleared out our map anyway,
# plus we simply replace whatever is in the map if we get a
# duplicate id---and duplicates at that point would be dead
# objects waiting to be cleaned up. So there's never any risk
# of confusing TransactionIterator objects that are in use.
iids = self._iterator_ids - set(self._iterators)
self._iterators._last_gc = time.time() # let tests know we've been called
if iids:
try:
self._server.iterator_gc(list(iids))
......
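The comment block above leans on a real semantic difference between implementations: CPython's reference counting drops an entry from a WeakValueDictionary the moment the last strong reference dies, while PyPy's collector may leave it visible until a GC pass runs. A minimal sketch of that behavior, using a hypothetical Iterator class in place of TransactionIterator:

```python
import gc
import weakref

class Iterator(object):
    """Hypothetical stand-in for ZEO's TransactionIterator."""

it = Iterator()
iterators = weakref.WeakValueDictionary({1: it})
del it            # on CPython the entry vanishes immediately...
gc.collect()      # ...on PyPy it may take an explicit collection
assert 1 not in iterators
```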
......@@ -20,6 +20,7 @@ TODO: Need some basic access control-- a declaration of the methods
exported for invocation by the server.
"""
import asyncore
import codecs
import itertools
import logging
import os
......@@ -495,7 +496,7 @@ class ZEOStorage:
self.storage.tpc_abort(self.transaction)
self._clear_transaction()
if delay is not None:
delay.error()
delay.error(sys.exc_info())
else:
raise
else:
......@@ -687,7 +688,8 @@ class ZEOStorage:
if PY3:
pickler = Pickler(BytesIO(), 3)
else:
pickler = Pickler()
# The pure-python version requires at least one argument (PyPy)
pickler = Pickler(0)
pickler.fast = 1
try:
pickler.dump(error)
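For reference, CPython 2's cPickle.Pickler can be instantiated with no arguments (it allocates an internal buffer), while the pure-Python pickler that PyPy supplies insists on at least one argument, hence Pickler(0). A portable sketch of the same idea, writing through an explicit buffer instead:

```python
import pickle
from io import BytesIO

buf = BytesIO()
pickler = pickle.Pickler(buf, 0)   # explicit buffer and protocol 0
pickler.fast = 1                   # as in the diff: skip the memo
pickler.dump(ValueError('example error'))
print(pickle.loads(buf.getvalue()))
```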
......@@ -1308,8 +1310,12 @@ class StorageServer:
status['connections'] = len(status['connections'])
status['waiting'] = len(self._waiting[storage_id])
status['timeout-thread-is-alive'] = self.timeouts[storage_id].isAlive()
status['last-transaction'] = (
self.storages[storage_id].lastTransaction().encode('hex'))
last_transaction = self.storages[storage_id].lastTransaction()
last_transaction_hex = codecs.encode(last_transaction, 'hex_codec')
if PY3:
# doctests and maybe clients expect a str, not bytes
last_transaction_hex = str(last_transaction_hex, 'ascii')
status['last-transaction'] = last_transaction_hex
return status
def ruok(self):
......@@ -1631,4 +1637,3 @@ class Serving(ServerEvent):
class Closed(ServerEvent):
pass
......@@ -14,9 +14,11 @@
"""Python versions compatiblity
"""
import sys
import platform
PY3 = sys.version_info[0] >= 3
PY32 = sys.version_info[:2] == (3, 2)
PYPY = getattr(platform, 'python_implementation', lambda: None)() == 'PyPy'
if PY3:
from pickle import Pickler, Unpickler as _Unpickler, dump, dumps, loads
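The getattr dance in the PYPY line presumably guards against interpreters whose platform module predates python_implementation(); where the function exists, it simply reports the implementation name:

```python
import platform

# 'CPython', 'PyPy', 'Jython', or 'IronPython'
print(platform.python_implementation())

PYPY = getattr(platform, 'python_implementation', lambda: None)() == 'PyPy'
```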
......@@ -55,4 +57,3 @@ try:
from cStringIO import StringIO
except:
from io import StringIO
......@@ -71,14 +71,20 @@ class Database:
def save(self, fd=None):
filename = self.filename
needs_closed = False
if not fd:
fd = open(filename, 'w')
needs_closed = True
try:
if self.realm:
print("realm", self.realm, file=fd)
for username in sorted(self._users.keys()):
print("%s: %s" % (username, self._users[username]), file=fd)
finally:
if needs_closed:
fd.close()
def load(self):
filename = self.filename
......@@ -88,7 +94,7 @@ class Database:
if not os.path.exists(filename):
return
fd = open(filename)
with open(filename) as fd:
L = fd.readlines()
if not L:
......
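The rewritten save() opens the file only when the caller did not pass a file object, and closes it only in that case. The same conditional-ownership pattern can be phrased as a context manager; maybe_open below is a hypothetical helper, not part of ZEO:

```python
import contextlib

@contextlib.contextmanager
def maybe_open(fd, filename, mode='w'):
    # hypothetical: open filename only if no file object was supplied,
    # and close it only if we were the ones who opened it
    if not fd:
        with open(filename, mode) as f:
            yield f
    else:
        yield fd
```

save() could then wrap its print loop in `with maybe_open(fd, self.filename) as fd:` and drop the needs_closed bookkeeping.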
......@@ -36,6 +36,7 @@ import ZODB.fsIndex
import zc.lockfile
from ZODB.utils import p64, u64, z64
import six
from ._compat import PYPY
logger = logging.getLogger("ZEO.cache")
......@@ -130,21 +131,23 @@ allocated_record_overhead = 43
# to the end of the file that the new object can't fit in one
# contiguous chunk, currentofs is reset to ZEC_HEADER_SIZE first.
class locked(object):
def __init__(self, func):
self.func = func
def __get__(self, inst, class_):
if inst is None:
return self
def call(*args, **kw):
def locked(func):
def _locked_wrapper(inst, *args, **kwargs):
inst._lock.acquire()
try:
return self.func(inst, *args, **kw)
return func(inst, *args, **kwargs)
finally:
inst._lock.release()
return call
return _locked_wrapper
# Under PyPy, the available dict specializations perform significantly
# better (faster) than the pure-Python BTree implementation. They may
# use less memory too. And we don't require any of the special BTree features...
_current_index_type = ZODB.fsIndex.fsIndex if not PYPY else dict
_noncurrent_index_type = BTrees.LOBTree.LOBTree if not PYPY else dict
# ...except at this leaf level
_noncurrent_bucket_type = BTrees.LLBTree.LLBucket
class ClientCache(object):
"""A simple in-memory cache."""
......@@ -173,13 +176,13 @@ class ClientCache(object):
self._len = 0
# {oid -> pos}
self.current = ZODB.fsIndex.fsIndex()
self.current = _current_index_type()
# {oid -> {tid->pos}}
# Note that caches in the wild seem to have very little non-current
# data, so this would seem to have little impact on memory consumption.
# I wonder if we even need to store non-current data in the cache.
self.noncurrent = BTrees.LOBTree.LOBTree()
self.noncurrent = _noncurrent_index_type()
# tid for the most recent transaction we know about. This is also
# stored near the start of the file.
......@@ -276,8 +279,8 @@ class ClientCache(object):
# Remember the location of the largest free block. That seems a
# decent place to start currentofs.
self.current = ZODB.fsIndex.fsIndex()
self.noncurrent = BTrees.LOBTree.LOBTree()
self.current = _current_index_type()
self.noncurrent = _noncurrent_index_type()
l = 0
last = ofs = ZEC_HEADER_SIZE
first_free_offset = 0
......@@ -369,7 +372,7 @@ class ClientCache(object):
def _set_noncurrent(self, oid, tid, ofs):
noncurrent_for_oid = self.noncurrent.get(u64(oid))
if noncurrent_for_oid is None:
noncurrent_for_oid = BTrees.LLBTree.LLBucket()
noncurrent_for_oid = _noncurrent_bucket_type()
self.noncurrent[u64(oid)] = noncurrent_for_oid
noncurrent_for_oid[u64(tid)] = ofs
......
......@@ -73,14 +73,12 @@ def check(addr, output_metrics, status, per):
s.connect(addr)
except socket.error as err:
return error("Can't connect %s" % err)
fp = s.makefile()
fp.write('\x00\x00\x00\x04ruok')
fp.flush()
proto = fp.read(struct.unpack(">I", fp.read(4))[0])
datas = fp.read(struct.unpack(">I", fp.read(4))[0])
fp.close()
s.sendall(b'\x00\x00\x00\x04ruok')
proto = s.recv(struct.unpack(">I", s.recv(4))[0])
datas = s.recv(struct.unpack(">I", s.recv(4))[0])
s.close()
data = json.loads(datas)
data = json.loads(datas.decode("ascii"))
if not data:
return warn("No storages")
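Both the old file-object code and the new socket code speak the same framing: a 4-byte big-endian length followed by that many bytes of payload. One caveat with the recv() version is that recv(n) may legally return fewer than n bytes on a busy connection; a stricter reader would loop. A sketch, where recv_exactly is a hypothetical helper:

```python
import struct

def recv_exactly(sock, n):
    # hypothetical helper: accumulate until exactly n bytes arrive,
    # since sock.recv(n) may return a short read
    buf = b''
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise EOFError('connection closed mid-frame')
        buf += chunk
    return buf

def recv_frame(sock):
    size, = struct.unpack('>I', recv_exactly(sock, 4))
    return recv_exactly(sock, size)
```

With it, the two reads become `proto = recv_frame(s)` and `datas = recv_frame(s)`.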
......@@ -88,7 +86,7 @@ def check(addr, output_metrics, status, per):
messages = []
level = 0
if output_metrics:
for storage_id, sdata in data.items():
for storage_id, sdata in sorted(data.items()):
for name in nodiff_names:
new_metric(metrics, storage_id, name, sdata[name])
......@@ -100,7 +98,7 @@ def check(addr, output_metrics, status, per):
with open(status) as f: # Read previous
old = json.loads(f.read())
dt /= per_times[per]
for storage_id, sdata in data.items():
for storage_id, sdata in sorted(data.items()):
sdata['sameple-time'] = now
if storage_id in old:
sold = old[storage_id]
......@@ -110,7 +108,7 @@ def check(addr, output_metrics, status, per):
with open(status, 'w') as f: # save current
f.write(json.dumps(data))
for storage_id, sdata in data.items():
for storage_id, sdata in sorted(data.items()):
if sdata['last-transaction'] == NO_TRANSACTION:
messages.append("Empty storage %r" % storage_id)
level = max(level, 1)
......
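The repeated change from data.items() to sorted(data.items()) makes iteration order deterministic: plain dict order differs between CPython and PyPy (and historically between runs), which is exactly what the doctest below checks against. A two-storage illustration:

```python
data = {'second': {'waiting': 0}, 'first': {'waiting': 0}}
for storage_id, sdata in sorted(data.items()):
    print(storage_id, sdata['waiting'])
# first 0
# second 0
```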
......@@ -118,34 +118,34 @@ prefixes metrics with a storage id.
... """)
>>> saddr = ':'.join(map(str, addr)) # (host, port) -> host:port
>>> nagios([saddr, '-m', '-sstatus'])
Empty storage u'second'|second:active_txns=0
Empty storage u'first'
| second:connections=0
second:waiting=0
first:active_txns=0
first:connections=0
Empty storage u'first'|first:active_txns=0
Empty storage u'second'
| first:connections=0
first:waiting=0
second:active_txns=0
second:connections=0
second:waiting=0
1
>>> nagios([saddr, '-m', '-sstatus'])
Empty storage u'second'|second:active_txns=0
Empty storage u'first'
| second:connections=0
second:waiting=0
first:active_txns=0
first:connections=0
Empty storage u'first'|first:active_txns=0
Empty storage u'second'
| first:connections=0
first:waiting=0
second:aborts=0.0
second:commits=0.0
second:conflicts=0.0
second:conflicts_resolved=0.0
second:loads=0.0
second:stores=0.0
second:active_txns=0
second:connections=0
second:waiting=0
first:aborts=0.0
first:commits=0.0
first:conflicts=0.0
first:conflicts_resolved=0.0
first:loads=0.0
first:loads=42.42
first:stores=0.0
second:aborts=0.0
second:commits=0.0
second:conflicts=0.0
second:conflicts_resolved=0.0
second:loads=42.42
second:stores=0.0
1
>>> stop()
......@@ -619,8 +619,9 @@ class InvqTests(CommonSetupTearDown):
perstorage = self.openClientStorage(cache="test")
forker.wait_until(
(lambda : perstorage.verify_result == "quick verification"),
onfail=(lambda : None))
func=(lambda : perstorage.verify_result == "quick verification"),
timeout=60,
label="perstorage.verify_result to be quick verification")
self.assertEqual(perstorage.verify_result, "quick verification")
self.assertEqual(perstorage._server._last_invals,
......
......@@ -15,10 +15,41 @@
import transaction
import six
import gc
class IterationTests:
def _assertIteratorIdsEmpty(self):
# Account for the need to run a GC collection
# under non-refcounted implementations like PyPy
# for storage._iterator_gc to fully do its job.
# First, confirm that it ran
self.assertTrue(self._storage._iterators._last_gc > 0)
gc_enabled = gc.isenabled()
gc.disable() # make sure there's no race conditions cleaning out the weak refs
try:
self.assertEquals(0, len(self._storage._iterator_ids))
except AssertionError:
# Ok, we have ids. That should also mean that the
# weak dictionary has the same length.
self.assertEqual(len(self._storage._iterators), len(self._storage._iterator_ids))
# Now if we do a collection and re-ask for iterator_gc
# everything goes away as expected.
gc.enable()
gc.collect()
gc.collect() # sometimes PyPy needs it twice to clear weak refs
self._storage._iterator_gc()
self.assertEqual(len(self._storage._iterators), len(self._storage._iterator_ids))
self.assertEquals(0, len(self._storage._iterator_ids))
finally:
if gc_enabled:
gc.enable()
else:
gc.disable()
def checkIteratorGCProtocol(self):
# Test garbage collection on protocol level.
server = self._storage._server
......@@ -78,8 +109,9 @@ class IterationTests:
# GC happens at the transaction boundary. After that, both the storage
# and the server have forgotten the iterator.
self._storage._iterators._last_gc = -1
self._dostore()
self.assertEquals(0, len(self._storage._iterator_ids))
self._assertIteratorIdsEmpty()
self.assertRaises(KeyError, self._storage._server.iterator_next, iid)
def checkIteratorGCStorageTPCAborting(self):
......@@ -93,9 +125,10 @@ class IterationTests:
iid = list(self._storage._iterator_ids)[0]
t = transaction.Transaction()
self._storage._iterators._last_gc = -1
self._storage.tpc_begin(t)
self._storage.tpc_abort(t)
self.assertEquals(0, len(self._storage._iterator_ids))
self._assertIteratorIdsEmpty()
self.assertRaises(KeyError, self._storage._server.iterator_next, iid)
def checkIteratorGCStorageDisconnect(self):
......
......@@ -82,7 +82,8 @@ dynamic port using ZConfig, you'd use a hostname by itself. In this
case, ZConfig passes None as the port.
>>> import ZEO.runzeo
>>> r = open('conf', 'w').write("""
>>> with open('conf', 'w') as f:
... _ = f.write("""
... <zeo>
... address 127.0.0.1
... </zeo>
......@@ -103,4 +104,3 @@ case, ZConfig passes None as the port.
.. cleanup
>>> ZODB.event.notify = old_notify
......@@ -185,7 +185,7 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
s.close()
else:
logging.debug('boo hoo')
raise
raise RuntimeError("Failed to start server")
return addr, adminaddr, pid, tmpfile
......
......@@ -34,7 +34,8 @@ A current client should be able to connect to a old server:
2
>>> conn.root()['blob1'] = ZODB.blob.Blob()
>>> r = conn.root()['blob1'].open('w').write(b'blob data 1')
>>> with conn.root()['blob1'].open('w') as f:
... r = f.write(b'blob data 1')
>>> transaction.commit()
>>> db2 = ZEO.DB(addr, blob_dir='server-blobs', shared_blob_dir=True)
......@@ -44,7 +45,8 @@ A current client should be able to connect to a old server:
... conn2.root().x += 1
... transaction.commit()
>>> conn2.root()['blob2'] = ZODB.blob.Blob()
>>> r = conn2.root()['blob2'].open('w').write(b'blob data 2')
>>> with conn2.root()['blob2'].open('w') as f:
... r = f.write(b'blob data 2')
>>> transaction.commit()
>>> @wait_until("Get the new data")
......@@ -76,9 +78,11 @@ A current client should be able to connect to a old server:
>>> conn.root().x
17
>>> conn.root()['blob1'].open().read()
>>> with conn.root()['blob1'].open() as f:
... f.read()
b'blob data 1'
>>> conn.root()['blob2'].open().read()
>>> with conn.root()['blob2'].open() as f:
... f.read()
b'blob data 2'
Note that when taking to a 3.8 server, iteration won't work:
......@@ -118,7 +122,8 @@ Note that we'll have to pull some hijinks:
2
>>> conn.root()['blob1'] = ZODB.blob.Blob()
>>> r = conn.root()['blob1'].open('w').write(b'blob data 1')
>>> with conn.root()['blob1'].open('w') as f:
... r = f.write(b'blob data 1')
>>> transaction.commit()
>>> db2 = ZEO.DB(addr, blob_dir='server-blobs', shared_blob_dir=True)
......@@ -128,7 +133,8 @@ Note that we'll have to pull some hijinks:
... conn2.root().x += 1
... transaction.commit()
>>> conn2.root()['blob2'] = ZODB.blob.Blob()
>>> r = conn2.root()['blob2'].open('w').write(b'blob data 2')
>>> with conn2.root()['blob2'].open('w') as f:
... r = f.write(b'blob data 2')
>>> transaction.commit()
......@@ -161,9 +167,11 @@ Note that we'll have to pull some hijinks:
>>> conn.root().x
17
>>> conn.root()['blob1'].open().read()
>>> with conn.root()['blob1'].open() as f:
... f.read()
b'blob data 1'
>>> conn.root()['blob2'].open().read()
>>> with conn.root()['blob2'].open() as f:
... f.read()
b'blob data 2'
Make some old protocol calls:
......
......@@ -556,17 +556,23 @@ class ZRPCConnectionTests(ZEO.tests.ConnectionTests.CommonSetupTearDown):
'history() raised exception: history() takes at most '
'3 arguments (5 given)'
)
py32_msg = (
'history() raised exception: history() takes at most '
'3 positional arguments (5 given)'
)
py3_msg = (
'history() raised exception: history() takes '
'from 2 to 3 positional arguments but 5 were given'
)
for level, message, kw in log:
if message.endswith(py2_msg) or message.endswith(py3_msg):
if (message.endswith(py2_msg) or
message.endswith(py32_msg) or
message.endswith(py3_msg)):
self.assertEqual(level,logging.ERROR)
self.assertEqual(kw,{'exc_info':True})
break
else:
self.fail("error not in log")
self.fail("error not in log %s" % log)
# cleanup
del conn.logger.log
......@@ -1328,10 +1334,13 @@ def test_ruok():
>>> import json, socket, struct
>>> s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
>>> s.connect(addr)
>>> _ = s.send(struct.pack(">I", 4)+"ruok")
>>> writer = s.makefile(mode='wb')
>>> _ = writer.write(struct.pack(">I", 4)+b"ruok")
>>> writer.close()
>>> proto = s.recv(struct.unpack(">I", s.recv(4))[0])
>>> pprint.pprint(json.loads(s.recv(struct.unpack(">I", s.recv(4))[0])))
{u'1': {u'aborts': 0,
>>> data = json.loads(s.recv(struct.unpack(">I", s.recv(4))[0]).decode("ascii"))
>>> pprint.pprint(data['1'])
{u'aborts': 0,
u'active_txns': 0,
u'commits': 1,
u'conflicts': 0,
......@@ -1344,7 +1353,7 @@ def test_ruok():
u'stores': 1,
u'timeout-thread-is-alive': True,
u'verifying_clients': 0,
u'waiting': 0}}
u'waiting': 0}
>>> db.close(); s.close()
"""
......@@ -1792,6 +1801,9 @@ def test_suite():
(re.compile("ZEO.Exceptions.ClientStorageError"), "ClientStorageError"),
(re.compile(r"\[Errno \d+\]"), '[Errno N]'),
(re.compile(r"loads=\d+\.\d+"), 'loads=42.42'),
# Python 3 drops the u prefix
(re.compile("u('.*?')"), r"\1"),
(re.compile('u(".*?")'), r"\1")
]
if not PY3:
patterns.append((re.compile("^'(blob[^']*)'"), r"b'\1'"))
......
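The two added normalizer patterns strip Python 2's u repr prefix so a single doctest expectation can serve both major versions; for instance:

```python
import re

pat = re.compile("u('.*?')")
print(pat.sub(r"\1", "{u'aborts': 0, u'active_txns': 0}"))
# {'aborts': 0, 'active_txns': 0}
```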
......@@ -16,7 +16,8 @@ It is an error not to specify any storages:
... from io import StringIO
>>> stderr = sys.stderr
>>> r = open('config', 'w').write("""
>>> with open('config', 'w') as f:
... _ = f.write("""
... <zeo>
... address 8100
... </zeo>
......@@ -37,7 +38,8 @@ It is an error not to specify any storages:
But we can specify a storage without a name:
>>> r = open('config', 'w').write("""
>>> with open('config', 'w') as f:
... _ = f.write("""
... <zeo>
... address 8100
... </zeo>
......@@ -52,7 +54,8 @@ But we can specify a storage without a name:
We can't have multiple unnamed storages:
>>> sys.stderr = StringIO()
>>> r = open('config', 'w').write("""
>>> with open('config', 'w') as f:
... _ = f.write("""
... <zeo>
... address 8100
... </zeo>
......@@ -74,7 +77,8 @@ We can't have multiple unnamed storages:
Or an unnamed storage and one named '1':
>>> sys.stderr = StringIO()
>>> r = open('config', 'w').write("""
>>> with open('config', 'w') as f:
... _ = f.write("""
... <zeo>
... address 8100
... </zeo>
......@@ -95,7 +99,8 @@ Or an unnamed storage and one named '1':
But we can have multiple storages:
>>> r = open('config', 'w').write("""
>>> with open('config', 'w') as f:
... _ = f.write("""
... <zeo>
... address 8100
... </zeo>
......@@ -112,7 +117,8 @@ But we can have multiple storages:
As long as the names are unique:
>>> sys.stderr = StringIO()
>>> r = open('config', 'w').write("""
>>> with open('config', 'w') as f:
... _ = f.write("""
... <zeo>
... address 8100
... </zeo>
......
......@@ -52,7 +52,8 @@ Now, let's write some data:
>>> conn = db.open()
>>> for i in range(1, 101):
... conn.root()[i] = ZODB.blob.Blob()
... w = conn.root()[i].open('w').write((chr(i)*100).encode('ascii'))
... with conn.root()[i].open('w') as f:
... w = f.write((chr(i)*100).encode('ascii'))
>>> transaction.commit()
We've committed 10000 bytes of data, but our target size is 3000. We
......@@ -85,19 +86,22 @@ necessary, but the cache size will remain not much bigger than the
target:
>>> for i in range(1, 101):
... data = conn.root()[i].open().read()
... with conn.root()[i].open() as f:
... data = f.read()
... if data != (chr(i)*100).encode('ascii'):
... print('bad data', repr(chr(i)), repr(data))
>>> wait_until("size is reduced", check, 99, onfail)
>>> for i in range(1, 101):
... data = conn.root()[i].open().read()
... with conn.root()[i].open() as f:
... data = f.read()
... if data != (chr(i)*100).encode('ascii'):
... print('bad data', repr(chr(i)), repr(data))
>>> for i in range(1, 101):
... data = conn.root()[i].open('c').read()
... with conn.root()[i].open('c') as f:
... data = f.read()
... if data != (chr(i)*100).encode('ascii'):
... print('bad data', repr(chr(i)), repr(data))
......@@ -114,11 +118,13 @@ provoke problems:
... for i in range(300):
... time.sleep(0)
... i = random.randint(1, 100)
... data = conn.root()[i].open().read()
... with conn.root()[i].open() as f:
... data = f.read()
... if data != (chr(i)*100).encode('ascii'):
... print('bad data', repr(chr(i)), repr(data))
... i = random.randint(1, 100)
... data = conn.root()[i].open('c').read()
... with conn.root()[i].open('c') as f:
... data = f.read()
... if data != (chr(i)*100).encode('ascii'):
... print('bad data', repr(chr(i)), repr(data))
... db.close()
......@@ -143,4 +149,3 @@ provoke problems:
>>> db.close()
>>> ZEO.ClientStorage.BlobCacheLayout.size = orig_blob_cache_layout_size
......@@ -52,8 +52,11 @@ def client_loop(map):
try:
r, w, e = select.select(r, w, e, client_timeout())
except select.error as err:
if err[0] != errno.EINTR:
if err[0] == errno.EBADF:
# Python >= 3.3 makes select.error an alias of OSError,
# which is not subscriptable but does have the 'errno' attribute
err_errno = getattr(err, 'errno', None) or err[0]
if err_errno != errno.EINTR:
if err_errno == errno.EBADF:
# If a connection is closed while we are
# calling select on it, we can get a bad
......
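Since Python 3.3, select.error is an alias of OSError, which exposes .errno but cannot be indexed; earlier versions raised a tuple-like exception read as err[0]. The getattr-or-subscript expression in the diff covers both, as a standalone sketch shows:

```python
import errno

def extract_errno(err):
    # OSError (Python >= 3.3) carries .errno; the old tuple-style
    # select.error is indexed as err[0]
    return getattr(err, 'errno', None) or err[0]

print(extract_errno(OSError(errno.EBADF, 'Bad file descriptor')))  # 9
```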
......@@ -632,8 +632,8 @@ class ManagedServerConnection(Connection):
self.message_output(self.current_protocol)
def recv_handshake(self, proto):
if proto == 'ruok':
self.message_output(json.dumps(self.mgr.ruok()))
if proto == b'ruok':
self.message_output(json.dumps(self.mgr.ruok()).encode("ascii"))
self.poll()
Connection.close(self)
else:
......
......@@ -13,7 +13,7 @@
##############################################################################
import logging
from ZEO._compat import Unpickler, Pickler, BytesIO, PY3
from ZEO._compat import Unpickler, Pickler, BytesIO, PY3, PYPY
from ZEO.zrpc.error import ZRPCError
from ZEO.zrpc.log import log, short_repr
......@@ -41,12 +41,23 @@ def encode(*args): # args: (msgid, flags, name, args)
else:
pickler = Pickler(1)
pickler.fast = 1
return pickler.dump(args, 1)
# Only CPython's cPickle supports dumping
# and returning in one operation:
# return pickler.dump(args, 1)
# For PyPy we must return the value; fortunately this
# works the same on CPython and is no more expensive
pickler.dump(args)
return pickler.getvalue()
if PY3:
# XXX: Py3: Needs optimization.
fast_encode = encode
elif PYPY:
# can't use the python-2 branch, need a new pickler
# every time, getvalue() only works once
fast_encode = encode
else:
def fast_encode():
# Only use in cases where you *know* the data contains only basic
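The CPython 2 shortcut being abandoned here is cPickle-specific: dump(args, 1) returned the pickled string directly from a reusable pickler. PyPy's pure-Python pickler has no such mode, and (per the comment above) getvalue() can be consumed only once, so each call builds a fresh pickler. A portable sketch over an explicit buffer:

```python
import pickle
from io import BytesIO

def encode(*args):
    # a fresh buffer and pickler per call works identically on
    # CPython and PyPy, minus the CPython 2 fast path
    buf = BytesIO()
    pickler = pickle.Pickler(buf, 1)
    pickler.fast = 1
    pickler.dump(args)
    return buf.getvalue()

print(pickle.loads(encode(1, 0, 'ruok', ())))   # (1, 0, 'ruok', ())
```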
......@@ -63,7 +74,10 @@ def decode(msg):
"""Decodes msg and returns its parts"""
unpickler = Unpickler(BytesIO(msg))
unpickler.find_global = find_global
try:
unpickler.find_class = find_global # PyPy, zodbpickle, the non-c-accelerated version
except AttributeError:
pass
try:
return unpickler.load() # msgid, flags, name, args
except:
......@@ -75,6 +89,10 @@ def server_decode(msg):
"""Decodes msg and returns its parts"""
unpickler = Unpickler(BytesIO(msg))
unpickler.find_global = server_find_global
try:
unpickler.find_class = server_find_global # PyPy, zodbpickle, the non-c-accelerated version
except AttributeError:
pass
try:
return unpickler.load() # msgid, flags, name, args
......
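The guarded assignment is needed because the hook's name differs by implementation: C-accelerated unpicklers honor a find_global attribute, while the pure-Python Unpickler (PyPy, zodbpickle) routes global lookups through find_class, and some implementations refuse the assignment outright, hence the AttributeError guard. The documented, portable variant of the same hook is to subclass:

```python
import pickle
from io import BytesIO

class RestrictedUnpickler(pickle.Unpickler):
    # overriding find_class is the documented way to filter globals;
    # the diff assigns the attribute instead, guarded by try/except
    def find_class(self, module, name):
        raise pickle.UnpicklingError('global %s.%s forbidden' % (module, name))

data = pickle.dumps((1, 0, 'ruok', ()))           # builtins only, no globals
print(RestrictedUnpickler(BytesIO(data)).load())  # (1, 0, 'ruok', ())
```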
......@@ -167,7 +167,10 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
try:
d = self.recv(8192)
except socket.error as err:
if err[0] in expected_socket_read_errors:
# Python >= 3.3 makes select.error an alias of OSError,
# which is not subscriptable but does have the 'errno' attribute
err_errno = getattr(err, 'errno', None) or err[0]
if err_errno in expected_socket_read_errors:
return
raise
if not d:
......@@ -268,6 +271,9 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
if isinstance(message, six.binary_type):
size += self.__message_output(messages.pop(0), output)
elif isinstance(message, six.text_type):
# XXX This can silently lead to data loss and client hangs
# if asserts aren't enabled. Encountered this under Python3
# and 'ruok' protocol
assert False, "Got a unicode message: %s" % repr(message)
elif message is _close_marker:
del messages[:]
......@@ -291,7 +297,10 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
# Fix for https://bugs.launchpad.net/zodb/+bug/182833
# ensure the above mentioned "output" invariant
output.insert(0, v)
if err[0] in expected_socket_write_errors:
# Python >= 3.3 makes select.error an alias of OSError,
# which is not subscriptable but does have the 'errno' attribute
err_errno = getattr(err, 'errno', None) or err[0]
if err_errno in expected_socket_write_errors:
break # we couldn't write anything
raise
......
[tox]
envlist =
py26,py27,py32,py33,py34,simple
py26,py27,py32,py33,py34,pypy,simple
[testenv]
commands =
......@@ -9,11 +9,11 @@ commands =
# Only run functional tests if unit tests pass.
zope-testrunner -f --test-path=src --auto-color --auto-progress
deps =
ZODB >= 4.0.0b2
ZODB >= 4.2.0b1
random2
ZConfig
manuel
persistent
persistent >= 4.1.0
transaction
zc.lockfile
zdaemon
......