Commit ae387e60 authored by Albertas Agejevas's avatar Albertas Agejevas

Porting to Python 3: version strings are binary, other compat things.

parent 2d9b3e6c
...@@ -29,6 +29,8 @@ import tempfile ...@@ -29,6 +29,8 @@ import tempfile
import threading import threading
import time import time
import weakref import weakref
from binascii import hexlify
import zc.lockfile import zc.lockfile
import ZEO.interfaces import ZEO.interfaces
import ZODB import ZODB
...@@ -634,10 +636,10 @@ class ClientStorage(object): ...@@ -634,10 +636,10 @@ class ClientStorage(object):
stub = self.StorageServerStubClass(conn) stub = self.StorageServerStubClass(conn)
if self._client_label and conn.peer_protocol_version >= "Z310": if self._client_label and conn.peer_protocol_version >= b"Z310":
stub.set_client_label(self._client_label) stub.set_client_label(self._client_label)
if conn.peer_protocol_version < "Z3101": if conn.peer_protocol_version < b"Z3101":
logger.warning("Old server doesn't suppport " logger.warning("Old server doesn't suppport "
"checkCurrentSerialInTransaction") "checkCurrentSerialInTransaction")
self.checkCurrentSerialInTransaction = lambda *args: None self.checkCurrentSerialInTransaction = lambda *args: None
...@@ -1332,7 +1334,7 @@ class ClientStorage(object): ...@@ -1332,7 +1334,7 @@ class ClientStorage(object):
self._pickler = Pickler(self._tfile, 1) self._pickler = Pickler(self._tfile, 1)
self._pickler.fast = 1 # Don't use the memo self._pickler.fast = 1 # Don't use the memo
if self._connection.peer_protocol_version < 'Z309': if self._connection.peer_protocol_version < b'Z309':
client = ClientStorage308Adapter(self) client = ClientStorage308Adapter(self)
else: else:
client = self client = self
...@@ -1557,7 +1559,7 @@ class TransactionIterator(object): ...@@ -1557,7 +1559,7 @@ class TransactionIterator(object):
def __iter__(self): def __iter__(self):
return self return self
def next(self): def __next__(self):
if self._ended: if self._ended:
raise StopIteration() raise StopIteration()
...@@ -1575,6 +1577,8 @@ class TransactionIterator(object): ...@@ -1575,6 +1577,8 @@ class TransactionIterator(object):
return ClientStorageTransactionInformation( return ClientStorageTransactionInformation(
self._storage, self, *tx_data) self._storage, self, *tx_data)
next = __next__
class ClientStorageTransactionInformation(ZODB.BaseStorage.TransactionRecord): class ClientStorageTransactionInformation(ZODB.BaseStorage.TransactionRecord):
...@@ -1607,7 +1611,7 @@ class RecordIterator(object): ...@@ -1607,7 +1611,7 @@ class RecordIterator(object):
def __iter__(self): def __iter__(self):
return self return self
def next(self): def __next__(self):
if self._completed: if self._completed:
# We finished iteration once already and the server can't know # We finished iteration once already and the server can't know
# about the iteration anymore. # about the iteration anymore.
...@@ -1620,6 +1624,8 @@ class RecordIterator(object): ...@@ -1620,6 +1624,8 @@ class RecordIterator(object):
raise StopIteration() raise StopIteration()
return ZODB.BaseStorage.DataRecord(*item) return ZODB.BaseStorage.DataRecord(*item)
next = __next__
class ClientStorage308Adapter: class ClientStorage308Adapter:
...@@ -1647,7 +1653,8 @@ class BlobCacheLayout(object): ...@@ -1647,7 +1653,8 @@ class BlobCacheLayout(object):
base, rem = divmod(utils.u64(oid), self.size) base, rem = divmod(utils.u64(oid), self.size)
return os.path.join( return os.path.join(
str(rem), str(rem),
"%s.%s%s" % (base, tid.encode('hex'), ZODB.blob.BLOB_SUFFIX) "%s.%s%s" % (base, hexlify(tid).decode('ascii'),
ZODB.blob.BLOB_SUFFIX)
) )
def _accessed(filename): def _accessed(filename):
......
...@@ -308,7 +308,7 @@ class StorageServer: ...@@ -308,7 +308,7 @@ class StorageServer:
class StorageServer308(StorageServer): class StorageServer308(StorageServer):
def __init__(self, rpc): def __init__(self, rpc):
if rpc.peer_protocol_version == 'Z200': if rpc.peer_protocol_version == b'Z200':
self.lastTransaction = lambda: z64 self.lastTransaction = lambda: z64
self.getInvalidations = lambda tid: None self.getInvalidations = lambda tid: None
self.getAuthProtocol = lambda: None self.getAuthProtocol = lambda: None
...@@ -386,7 +386,7 @@ def stub(client, connection): ...@@ -386,7 +386,7 @@ def stub(client, connection):
raise ValueError("Timeout waiting for protocol handshake") raise ValueError("Timeout waiting for protocol handshake")
time.sleep(0.1) time.sleep(0.1)
if connection.peer_protocol_version < 'Z309': if connection.peer_protocol_version < b'Z309':
return StorageServer308(connection) return StorageServer308(connection)
return StorageServer(connection) return StorageServer(connection)
......
...@@ -142,7 +142,7 @@ class TBIterator(object): ...@@ -142,7 +142,7 @@ class TBIterator(object):
def __iter__(self): def __iter__(self):
return self return self
def next(self): def __next__(self):
"""Return next tuple of data or None if EOF""" """Return next tuple of data or None if EOF"""
if self.count == 0: if self.count == 0:
self.file.seek(0) self.file.seek(0)
...@@ -151,3 +151,4 @@ class TBIterator(object): ...@@ -151,3 +151,4 @@ class TBIterator(object):
oid_ver_data = self.unpickler.load() oid_ver_data = self.unpickler.load()
self.count -= 1 self.count -= 1
return oid_ver_data return oid_ver_data
next = __next__
...@@ -17,7 +17,7 @@ Let's start a Z308 server ...@@ -17,7 +17,7 @@ Let's start a Z308 server
... ''' ... '''
>>> addr, admin = start_server( >>> addr, admin = start_server(
... storage_conf, dict(invalidation_queue_size=5), protocol='Z308') ... storage_conf, dict(invalidation_queue_size=5), protocol=b'Z308')
A current client should be able to connect to a old server: A current client should be able to connect to a old server:
...@@ -25,7 +25,7 @@ A current client should be able to connect to a old server: ...@@ -25,7 +25,7 @@ A current client should be able to connect to a old server:
>>> db = ZEO.DB(addr, client='client', blob_dir='blobs') >>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
>>> wait_connected(db.storage) >>> wait_connected(db.storage)
>>> db.storage._connection.peer_protocol_version >>> db.storage._connection.peer_protocol_version
'Z308' b'Z308'
>>> conn = db.open() >>> conn = db.open()
>>> conn.root().x = 0 >>> conn.root().x = 0
...@@ -105,11 +105,11 @@ Note that we'll have to pull some hijinks: ...@@ -105,11 +105,11 @@ Note that we'll have to pull some hijinks:
>>> import ZEO.zrpc.connection >>> import ZEO.zrpc.connection
>>> old_current_protocol = ZEO.zrpc.connection.Connection.current_protocol >>> old_current_protocol = ZEO.zrpc.connection.Connection.current_protocol
>>> ZEO.zrpc.connection.Connection.current_protocol = 'Z308' >>> ZEO.zrpc.connection.Connection.current_protocol = b'Z308'
>>> db = ZEO.DB(addr, client='client', blob_dir='blobs') >>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
>>> db.storage._connection.peer_protocol_version >>> db.storage._connection.peer_protocol_version
'Z308' b'Z308'
>>> wait_connected(db.storage) >>> wait_connected(db.storage)
>>> conn = db.open() >>> conn = db.open()
>>> conn.root().x = 0 >>> conn.root().x = 0
......
...@@ -114,7 +114,7 @@ class MonitorTests(ZEO.tests.testMonitor.MonitorTests): ...@@ -114,7 +114,7 @@ class MonitorTests(ZEO.tests.testMonitor.MonitorTests):
def check_connection_management_with_old_client(self): def check_connection_management_with_old_client(self):
# Check that connection management works even when using an # Check that connection management works even when using an
# older protcool that requires a connection adapter. # older protcool that requires a connection adapter.
test_protocol = "Z303" test_protocol = b"Z303"
current_protocol = ZEO.zrpc.connection.Connection.current_protocol current_protocol = ZEO.zrpc.connection.Connection.current_protocol
ZEO.zrpc.connection.Connection.current_protocol = test_protocol ZEO.zrpc.connection.Connection.current_protocol = test_protocol
ZEO.zrpc.connection.Connection.servers_we_can_talk_to.append( ZEO.zrpc.connection.Connection.servers_we_can_talk_to.append(
......
...@@ -443,7 +443,7 @@ ZEOStorage as closed and see if trying to get a lock cleans it up: ...@@ -443,7 +443,7 @@ ZEOStorage as closed and see if trying to get a lock cleans it up:
ZEO.StorageServer DEBUG ZEO.StorageServer DEBUG
(test-addr-1) ('1') lock: transactions waiting: 0 (test-addr-1) ('1') lock: transactions waiting: 0
ZEO.StorageServer BLATHER ZEO.StorageServer BLATHER
(test-addr-1) Preparing to commit transaction: 1 objects, 36 bytes (test-addr-1) Preparing to commit transaction: 1 objects, ... bytes
1 callAsync serialnos ... 1 callAsync serialnos ...
>>> zs1.connection = None >>> zs1.connection = None
...@@ -456,7 +456,7 @@ ZEOStorage as closed and see if trying to get a lock cleans it up: ...@@ -456,7 +456,7 @@ ZEOStorage as closed and see if trying to get a lock cleans it up:
ZEO.StorageServer DEBUG ZEO.StorageServer DEBUG
(test-addr-2) ('1') lock: transactions waiting: 0 (test-addr-2) ('1') lock: transactions waiting: 0
ZEO.StorageServer BLATHER ZEO.StorageServer BLATHER
(test-addr-2) Preparing to commit transaction: 1 objects, 36 bytes (test-addr-2) Preparing to commit transaction: 1 objects, ... bytes
2 callAsync serialnos ... 2 callAsync serialnos ...
>>> zs1.txnlog.close() >>> zs1.txnlog.close()
...@@ -475,6 +475,8 @@ def test_suite(): ...@@ -475,6 +475,8 @@ def test_suite():
(re.compile('\d+/test-addr'), ''), (re.compile('\d+/test-addr'), ''),
(re.compile("'lock_time': \d+.\d+"), 'lock_time'), (re.compile("'lock_time': \d+.\d+"), 'lock_time'),
(re.compile(r"'start': '[^\n]+'"), 'start'), (re.compile(r"'start': '[^\n]+'"), 'start'),
(re.compile('ZODB.POSException.StorageTransactionError'),
'StorageTransactionError'),
]), ]),
), ),
)) ))
......
...@@ -112,6 +112,7 @@ def client_loop(map): ...@@ -112,6 +112,7 @@ def client_loop(map):
'A ZEO client loop failed.', 'A ZEO client loop failed.',
exc_info=sys.exc_info()) exc_info=sys.exc_info())
except: except:
pass pass
for fd, obj in map.items(): for fd, obj in map.items():
...@@ -212,7 +213,7 @@ class ConnectionManager(object): ...@@ -212,7 +213,7 @@ class ConnectionManager(object):
log("CM.close(): self.thread.join() timed out", log("CM.close(): self.thread.join() timed out",
level=logging.WARNING) level=logging.WARNING)
for fd, obj in self.map.items(): for fd, obj in list(self.map.items()):
if obj is not self.trigger: if obj is not self.trigger:
try: try:
obj.close() obj.close()
......
...@@ -243,16 +243,16 @@ class Connection(smac.SizedMessageAsyncConnection, object): ...@@ -243,16 +243,16 @@ class Connection(smac.SizedMessageAsyncConnection, object):
# Protocol variables: # Protocol variables:
# Our preferred protocol. # Our preferred protocol.
current_protocol = "Z3101" current_protocol = b"Z3101"
# If we're a client, an exhaustive list of the server protocols we # If we're a client, an exhaustive list of the server protocols we
# can accept. # can accept.
servers_we_can_talk_to = ["Z308", "Z309", "Z310", current_protocol] servers_we_can_talk_to = [b"Z308", b"Z309", b"Z310", current_protocol]
# If we're a server, an exhaustive list of the client protocols we # If we're a server, an exhaustive list of the client protocols we
# can accept. # can accept.
clients_we_can_talk_to = [ clients_we_can_talk_to = [
"Z200", "Z201", "Z303", "Z308", "Z309", "Z310", current_protocol] b"Z200", b"Z201", b"Z303", b"Z308", b"Z309", b"Z310", current_protocol]
# This is pretty excruciating. Details: # This is pretty excruciating. Details:
# #
...@@ -278,7 +278,7 @@ class Connection(smac.SizedMessageAsyncConnection, object): ...@@ -278,7 +278,7 @@ class Connection(smac.SizedMessageAsyncConnection, object):
# Exception types that should not be logged: # Exception types that should not be logged:
unlogged_exception_types = () unlogged_exception_types = ()
# Client constructor passes 'C' for tag, server constructor 'S'. This # Client constructor passes b'C' for tag, server constructor b'S'. This
# is used in log messages, and to determine whether we can speak with # is used in log messages, and to determine whether we can speak with
# our peer. # our peer.
def __init__(self, sock, addr, obj, tag, map=None): def __init__(self, sock, addr, obj, tag, map=None):
...@@ -290,9 +290,9 @@ class Connection(smac.SizedMessageAsyncConnection, object): ...@@ -290,9 +290,9 @@ class Connection(smac.SizedMessageAsyncConnection, object):
self.closed = False self.closed = False
self.peer_protocol_version = None # set in recv_handshake() self.peer_protocol_version = None # set in recv_handshake()
assert tag in "CS" assert tag in b"CS"
self.tag = tag self.tag = tag
self.logger = logging.getLogger('ZEO.zrpc.Connection(%c)' % tag) self.logger = logging.getLogger('ZEO.zrpc.Connection(%r)' % tag)
if isinstance(addr, tuple): if isinstance(addr, tuple):
self.log_label = "(%s:%d) " % addr self.log_label = "(%s:%d) " % addr
else: else:
...@@ -393,10 +393,10 @@ class Connection(smac.SizedMessageAsyncConnection, object): ...@@ -393,10 +393,10 @@ class Connection(smac.SizedMessageAsyncConnection, object):
del self.message_input # uncover normal-case message_input() del self.message_input # uncover normal-case message_input()
self.peer_protocol_version = proto self.peer_protocol_version = proto
if self.tag == 'C': if self.tag == b'C':
good_protos = self.servers_we_can_talk_to good_protos = self.servers_we_can_talk_to
else: else:
assert self.tag == 'S' assert self.tag == b'S'
good_protos = self.clients_we_can_talk_to good_protos = self.clients_we_can_talk_to
if proto in good_protos: if proto in good_protos:
...@@ -604,7 +604,7 @@ class ManagedServerConnection(Connection): ...@@ -604,7 +604,7 @@ class ManagedServerConnection(Connection):
def __init__(self, sock, addr, obj, mgr): def __init__(self, sock, addr, obj, mgr):
self.mgr = mgr self.mgr = mgr
map = {} map = {}
Connection.__init__(self, sock, addr, obj, 'S', map=map) Connection.__init__(self, sock, addr, obj, b'S', map=map)
self.decode = ZEO.zrpc.marshal.server_decode self.decode = ZEO.zrpc.marshal.server_decode
...@@ -699,7 +699,7 @@ class ManagedClientConnection(Connection): ...@@ -699,7 +699,7 @@ class ManagedClientConnection(Connection):
self.replies_cond = threading.Condition() self.replies_cond = threading.Condition()
self.replies = {} self.replies = {}
self.__super_init(sock, addr, None, tag='C', map=mgr.map) self.__super_init(sock, addr, None, tag=b'C', map=mgr.map)
self.trigger = mgr.trigger self.trigger = mgr.trigger
self.call_from_thread = self.trigger.pull_trigger self.call_from_thread = self.trigger.pull_trigger
self.call_from_thread() self.call_from_thread()
......
...@@ -182,7 +182,7 @@ class SizedMessageAsyncConnection(asyncore.dispatcher): ...@@ -182,7 +182,7 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
if msg_size > input_len: if msg_size > input_len:
if inp is None: if inp is None:
self.__inp = d self.__inp = d
elif type(self.__inp) is str: elif isinstance(self.__inp, six.binary_type):
self.__inp = [self.__inp, d] self.__inp = [self.__inp, d]
else: else:
self.__inp.append(d) self.__inp.append(d)
...@@ -190,7 +190,7 @@ class SizedMessageAsyncConnection(asyncore.dispatcher): ...@@ -190,7 +190,7 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
return # keep waiting for more input return # keep waiting for more input
# load all previous input and d into single string inp # load all previous input and d into single string inp
if isinstance(inp, str): if isinstance(inp, six.binary_type):
inp = inp + d inp = inp + d
elif inp is None: elif inp is None:
inp = d inp = d
...@@ -265,7 +265,7 @@ class SizedMessageAsyncConnection(asyncore.dispatcher): ...@@ -265,7 +265,7 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
size = sum((len(s) for s in output)) size = sum((len(s) for s in output))
while (size <= SEND_SIZE) and messages: while (size <= SEND_SIZE) and messages:
message = messages[0] message = messages[0]
if message.__class__ is six.binary_type: if isinstance(message, six.binary_type):
size += self.__message_output(messages.pop(0), output) size += self.__message_output(messages.pop(0), output)
elif message is _close_marker: elif message is _close_marker:
del messages[:] del messages[:]
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment