Commit d3780906 authored by Julien Muchembled's avatar Julien Muchembled

client: use a class instead of a simple dict to hold transaction information

parent 97e57031
......@@ -20,8 +20,7 @@ from random import shuffle
import heapq
import time
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
from ZODB.POSException import ReadConflictError
from ZODB.POSException import UndoError, ConflictError, ReadConflictError
from . import OLD_ZODB
if OLD_ZODB:
from ZODB.ConflictResolution import ResolvedSerial
......@@ -31,14 +30,15 @@ from neo.lib import logging
from neo.lib.protocol import NodeTypes, Packets, \
INVALID_PARTITION, MAX_TID, ZERO_HASH, ZERO_TID
from neo.lib.util import makeChecksum, dump
from neo.lib.locking import Empty, Lock, SimpleQueue
from neo.lib.locking import Empty, Lock
from neo.lib.connection import MTClientConnection, ConnectionClosed
from .exception import NEOStorageError, NEOStorageCreationUndoneError
from .exception import NEOStorageNotFoundError
from .handlers import storage, master
from neo.lib.threaded_app import ThreadedApplication
from .cache import ClientCache
from .pool import ConnectionPool, InvolvedNodeDict
from .pool import ConnectionPool
from .transactions import TransactionContainer
from neo.lib.util import p64, u64, parseMasterList
CHECKED_SERIAL = object()
......@@ -52,44 +52,6 @@ if SignalHandler:
SignalHandler.registerHandler(signal.SIGUSR2, logging.reopen)
class TransactionContainer(dict):
# IDEA: Drop this container and use the new set_data/data API on
# transactions (requires transaction >= 1.6).
def pop(self, txn):
return dict.pop(self, id(txn), None)
def get(self, txn):
try:
return self[id(txn)]
except KeyError:
raise StorageTransactionError("unknown transaction %r" % txn)
def new(self, txn):
key = id(txn)
if key in self:
raise StorageTransactionError("commit of transaction %r"
" already started" % txn)
context = self[key] = {
'queue': SimpleQueue(),
'txn': txn,
'ttid': None,
# data being stored
'data_dict': {},
'data_size': 0,
# data stored: this will go to the cache on tpc_finish
'cache_dict': {},
'cache_size': 0,
# conflicts to resolve
'conflict_dict': {}, # {oid: (base_serial, serial)}
# resolved conflicts
'resolved_dict': {}, # {oid: serial}
# status: 0 -> check only, 1 -> store, 2 -> failed
'involved_nodes': InvolvedNodeDict(), # {node_id: status}
}
return context
class Application(ThreadedApplication):
"""The client node application."""
......@@ -177,14 +139,14 @@ class Application(ThreadedApplication):
Just like _waitAnyMessage, but for per-transaction exchanges, rather
than per-thread.
"""
queue = txn_context['queue']
queue = txn_context.queue
self.setHandlerData(txn_context)
try:
self._waitAnyMessage(queue, block=block)
finally:
# Don't leave access to thread context, even if a raise happens.
self.setHandlerData(None)
if txn_context['conflict_dict']:
if txn_context.conflict_dict:
self._handleConflicts(txn_context)
def _askStorage(self, conn, packet, **kw):
......@@ -403,8 +365,8 @@ class Application(ThreadedApplication):
if answer_ttid is None:
raise NEOStorageError('tpc_begin failed')
assert tid in (None, answer_ttid), (tid, answer_ttid)
txn_context['Storage'] = storage
txn_context['ttid'] = answer_ttid
txn_context.Storage = storage
txn_context.ttid = answer_ttid
def store(self, oid, serial, data, version, transaction):
"""Store object."""
......@@ -414,7 +376,7 @@ class Application(ThreadedApplication):
self._store(self._txn_container.get(transaction), oid, serial, data)
def _store(self, txn_context, oid, serial, data, data_serial=None):
ttid = txn_context['ttid']
ttid = txn_context.ttid
if data is None:
# This is some undo: either a no-data object (undoing object
# creation) or a back-pointer to an earlier revision (going back to
......@@ -436,24 +398,22 @@ class Application(ThreadedApplication):
compression = 0
compressed_data = data
checksum = makeChecksum(compressed_data)
txn_context['data_size'] += size
txn_context.data_size += size
# Store object in tmp cache
txn_context['data_dict'][oid] = data
queue = txn_context['queue']
txn_context.data_dict[oid] = data
packet = Packets.AskStoreObject(oid, serial, compression,
checksum, compressed_data, data_serial, ttid)
for ask in self.cp.iterateForWrite(oid, txn_context['involved_nodes']):
ask(packet, queue=queue, oid=oid, serial=serial)
txn_context.write(self, packet, oid, oid=oid, serial=serial)
while txn_context['data_size'] >= self._cache._max_size:
while txn_context.data_size >= self._cache._max_size:
self._waitAnyTransactionMessage(txn_context)
self._waitAnyTransactionMessage(txn_context, False)
def _handleConflicts(self, txn_context):
data_dict = txn_context['data_dict']
pop_conflict = txn_context['conflict_dict'].popitem
resolved_dict = txn_context['resolved_dict']
tryToResolveConflict = txn_context['Storage'].tryToResolveConflict
data_dict = txn_context.data_dict
pop_conflict = txn_context.conflict_dict.popitem
resolved_dict = txn_context.resolved_dict
tryToResolveConflict = txn_context.Storage.tryToResolveConflict
while 1:
# We iterate over conflict_dict, and clear it,
# because new items may be added by calls to _store.
......@@ -473,7 +433,7 @@ class Application(ThreadedApplication):
data = data_dict[oid]
except KeyError:
# succesfully stored on another storage node
data = txn_context['cache_dict'][oid]
data = txn_context.cache_dict[oid]
else:
# Storage refused us from taking object lock, to avoid a
# possible deadlock. TID is actually used for some kind of
......@@ -493,7 +453,7 @@ class Application(ThreadedApplication):
serial))
# TODO: data can be None if a conflict happens during undo
if data:
txn_context['data_size'] -= len(data)
txn_context.data_size -= len(data)
if self.last_tid < conflict_serial:
self.sync() # possible late invalidation (very rare)
try:
......@@ -526,28 +486,27 @@ class Application(ThreadedApplication):
_waitAnyMessage(queue)
def waitStoreResponses(self, txn_context):
queue = txn_context['queue']
queue = txn_context.queue
pending = self.dispatcher.pending
_waitAnyTransactionMessage = self._waitAnyTransactionMessage
while pending(queue):
_waitAnyTransactionMessage(txn_context)
if txn_context['data_dict']:
if txn_context.data_dict:
raise NEOStorageError('could not store/check all oids')
def tpc_vote(self, transaction):
"""Store current transaction."""
txn_context = self._txn_container.get(transaction)
self.waitStoreResponses(txn_context)
ttid = txn_context['ttid']
ttid = txn_context.ttid
packet = Packets.AskStoreTransaction(ttid, str(transaction.user),
str(transaction.description), dumps(transaction._extension),
txn_context['cache_dict'])
queue = txn_context['queue']
involved_nodes = txn_context['involved_nodes']
txn_context.cache_dict)
queue = txn_context.queue
involved_nodes = txn_context.involved_nodes
# Ask in parallel all involved storage nodes to commit object metadata.
# Nodes that store the transaction metadata get a special packet.
trans_nodes = [ask(packet, queue=queue)
for ask in self.cp.iterateForWrite(ttid, involved_nodes)]
trans_nodes = txn_context.write(self, packet, ttid)
packet = Packets.AskVoteTransaction(ttid)
for uuid, status in involved_nodes.iteritems():
if status == 1 and uuid not in trans_nodes:
......@@ -555,10 +514,13 @@ class Application(ThreadedApplication):
if node is not None:
conn = self.cp.getConnForNode(node)
if conn is not None:
involved_nodes.ask(conn)(packet, queue=queue)
try:
conn.ask(packet, queue=queue)
continue
except ConnectionClosed:
pass
involved_nodes[uuid] = 2
self.waitResponses(queue)
self.waitResponses(txn_context.queue)
# If there are failed nodes, ask the master whether they can be
# disconnected while keeping the cluster operational. If possible,
# this will happen during tpc_finish.
......@@ -570,19 +532,19 @@ class Application(ThreadedApplication):
self._askPrimary(Packets.FailedVote(ttid, failed))
except ConnectionClosed:
pass
txn_context['voted'] = None
txn_context.voted = True
# We must not go further if connection to master was lost since
# tpc_begin, to lower the probability of failing during tpc_finish.
# IDEA: We can improve in 2 opposite directions:
# - In the case of big transactions, it would be useful to
# also detect failures earlier.
# - If possible, recover from master failure.
if 'error' in txn_context:
raise NEOStorageError(txn_context['error'])
if txn_context.error:
raise NEOStorageError(txn_context.error)
if OLD_ZODB:
return [(oid, ResolvedSerial)
for oid in txn_context['resolved_dict']]
return txn_context['resolved_dict']
for oid in txn_context.resolved_dict]
return txn_context.resolved_dict
def tpc_abort(self, transaction):
"""Abort current transaction."""
......@@ -595,14 +557,14 @@ class Application(ThreadedApplication):
pass
else:
try:
notify(Packets.AbortTransaction(txn_context['ttid'],
txn_context['involved_nodes']))
notify(Packets.AbortTransaction(txn_context.ttid,
txn_context.involved_nodes))
except ConnectionClosed:
pass
# We don't need to flush queue, as it won't be reused by future
# transactions (deleted on next line & indexed by transaction object
# instance).
self.dispatcher.forget_queue(txn_context['queue'], flush_queue=False)
self.dispatcher.forget_queue(txn_context.queue, flush_queue=False)
def tpc_finish(self, transaction, f=None):
"""Finish current transaction
......@@ -622,19 +584,19 @@ class Application(ThreadedApplication):
if any failure happens.
"""
txn_container = self._txn_container
if 'voted' not in txn_container.get(transaction):
if not txn_container.get(transaction).voted:
self.tpc_vote(transaction)
checked_list = []
self._load_lock_acquire()
try:
# Call finish on master
txn_context = txn_container.pop(transaction)
cache_dict = txn_context['cache_dict']
cache_dict = txn_context.cache_dict
checked_list = [oid for oid, data in cache_dict.iteritems()
if data is CHECKED_SERIAL]
for oid in checked_list:
del cache_dict[oid]
ttid = txn_context['ttid']
ttid = txn_context.ttid
p = Packets.AskFinishTransaction(ttid, cache_dict, checked_list)
try:
tid = self._askPrimary(p, cache_dict=cache_dict, callback=f)
......@@ -691,7 +653,7 @@ class Application(ThreadedApplication):
getCellSortKey = self.cp.getCellSortKey
getConnForCell = self.cp.getConnForCell
queue = self._thread_container.queue
ttid = txn_context['ttid']
ttid = txn_context.ttid
undo_object_tid_dict = {}
snapshot_tid = p64(u64(self.last_tid) + 1)
for partition, oid_list in partition_oid_dict.iteritems():
......@@ -733,7 +695,7 @@ class Application(ThreadedApplication):
'conflict')
# Resolve conflict
try:
data = txn_context['Storage'].tryToResolveConflict(
data = txn_context.Storage.tryToResolveConflict(
oid, current_serial, undone_tid, undo_data, data)
except ConflictError:
raise UndoError('Some data were modified by a later ' \
......@@ -907,16 +869,12 @@ class Application(ThreadedApplication):
self._txn_container.get(transaction), oid, serial)
def _checkCurrentSerialInTransaction(self, txn_context, oid, serial):
ttid = txn_context['ttid']
# Placeholders
queue = txn_context['queue']
ttid = txn_context.ttid
# ZODB.Connection performs calls 'checkCurrentSerialInTransaction'
# after stores, and skips oids that have been successfully stored.
assert oid not in txn_context['cache_dict'], (oid, txn_context)
txn_context['data_dict'].setdefault(oid, CHECKED_SERIAL)
involved_nodes = txn_context['involved_nodes']
assert oid not in txn_context.cache_dict, (oid, txn_context)
txn_context.data_dict.setdefault(oid, CHECKED_SERIAL)
packet = Packets.AskCheckCurrentSerial(ttid, serial, oid)
for ask in self.cp.iterateForWrite(oid, involved_nodes, 0):
ask(packet, queue=queue, oid=oid, serial=serial)
txn_context.write(self, packet, oid, 0, oid=oid, serial=serial)
self._waitAnyTransactionMessage(txn_context, False)
......@@ -147,7 +147,7 @@ class PrimaryNotificationsHandler(MTEventHandler):
logging.critical(msg)
app.master_conn = None
for txn_context in app.txn_contexts():
txn_context['error'] = msg
txn_context.error = msg
try:
del app.pt
except AttributeError:
......
......@@ -22,7 +22,7 @@ from neo.lib.util import dump
from neo.lib.exception import NodeNotReady
from neo.lib.handler import MTEventHandler
from . import AnswerBaseHandler
from ..pool import InvolvedNodeDict
from ..transactions import Transaction
from ..exception import NEOStorageError, NEOStorageNotFoundError
from ..exception import NEOStorageDoesNotExistError
......@@ -82,26 +82,26 @@ class StorageAnswersHandler(AnswerBaseHandler):
if conflict != MAX_TID:
# If this conflict is not already resolved, mark it for
# resolution.
if conflict <= txn_context['resolved_dict'].get(oid, ''):
if conflict <= txn_context.resolved_dict.get(oid, ''):
return
txn_context['conflict_dict'][oid] = serial, conflict
txn_context.conflict_dict[oid] = serial, conflict
else:
try:
data = txn_context['data_dict'].pop(oid)
data = txn_context.data_dict.pop(oid)
except KeyError: # replica, or multiple undo
return
if type(data) is str:
size = len(data)
txn_context['data_size'] -= size
size += txn_context['cache_size']
txn_context.data_size -= size
size += txn_context.cache_size
if size < self.app._cache._max_size:
txn_context['cache_size'] = size
txn_context.cache_size = size
else:
# Do not cache data past cache max size, as it
# would just flush it on tpc_finish. This also
# prevents memory errors for big transactions.
data = None
txn_context['cache_dict'][oid] = data
txn_context.cache_dict[oid] = data
answerCheckCurrentSerial = answerStoreObject
......@@ -112,11 +112,8 @@ class StorageAnswersHandler(AnswerBaseHandler):
def connectionClosed(self, conn):
txn_context = self.app.getHandlerData()
# XXX: A 'Transaction' class would be cleaner.
if type(txn_context) is dict:
involved_nodes = txn_context.get('involved_nodes')
if type(involved_nodes) is InvolvedNodeDict:
involved_nodes[conn.getUUID()] = 2
if type(txn_context) is Transaction:
txn_context.involved_nodes[conn.getUUID()] = 2
super(StorageAnswersHandler, self).connectionClosed(conn)
def answerTIDsFrom(self, conn, tid_list):
......
......@@ -37,23 +37,6 @@ CELL_GOOD = 0
CELL_FAILED = 1
class InvolvedNodeDict(dict):
# Keys are node ids instead of Node objects because a node may disappear
# from the cluster. In any case, we always have to check if the id is
# still known by the NodeManager.
def ask(self, conn):
def ask(*args, **kw):
try:
conn.ask(*args, **kw)
except ConnectionClosed:
self[conn.getUUID()] = 2
else:
self.fail = 0
return conn.getUUID()
return ask
class ConnectionPool(object):
"""This class manages a pool of connections to storage nodes."""
......@@ -135,26 +118,6 @@ class ConnectionPool(object):
if self.app.master_conn is None:
raise NEOPrimaryMasterLost
def iterateForWrite(self, object_id, involved, store=1):
pt = self.app.pt
involved.fail = 1
for cell in pt.getCellList(pt.getPartition(object_id)):
node = cell.getNode()
uuid = node.getUUID()
status = involved.setdefault(uuid, store)
if status < store:
involved[uuid] = store
elif status > 1:
continue
conn = self.getConnForNode(node)
if conn is None:
involved[uuid] = 2
else:
yield involved.ask(conn)
if involved.fail:
raise NEOStorageError(
'no storage available for write to partition %s' % object_id)
def getConnForNode(self, node):
"""Return a locked connection object to a given node
If no connection exists, create a new one"""
......
#
# Copyright (C) 2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ZODB.POSException import StorageTransactionError
from neo.lib.connection import ConnectionClosed
from neo.lib.locking import SimpleQueue
from .exception import NEOStorageError
class Transaction(object):
cache_size = 0 # size of data in cache_dict
data_size = 0 # size of data in data_dict
error = None
voted = False
ttid = None # XXX: useless, except for testBackupReadOnlyAccess
def __init__(self, txn):
self.queue = SimpleQueue()
self.txn = txn
# data being stored
self.data_dict = {}
# data stored: this will go to the cache on tpc_finish
self.cache_dict = {}
# conflicts to resolve
self.conflict_dict = {} # {oid: (base_serial, serial)}
# resolved conflicts
self.resolved_dict = {} # {oid: serial}
# Keys are node ids instead of Node objects because a node may
# disappear from the cluster. In any case, we always have to check
# if the id is still known by the NodeManager.
# status: 0 -> check only, 1 -> store, 2 -> failed
self.involved_nodes = {} # {node_id: status}
def write(self, app, packet, object_id, store=1, **kw):
uuid_list = []
pt = app.pt
involved = self.involved_nodes
object_id = pt.getPartition(object_id)
for cell in pt.getCellList(object_id):
node = cell.getNode()
uuid = node.getUUID()
status = involved.setdefault(uuid, store)
if status < store:
involved[uuid] = store
elif status > 1:
continue
conn = app.cp.getConnForNode(node)
if conn is not None:
try:
conn.ask(packet, queue=self.queue, **kw)
uuid_list.append(uuid)
continue
except ConnectionClosed:
pass
involved[uuid] = 2
if uuid_list:
return uuid_list
raise NEOStorageError(
'no storage available for write to partition %s' % object_id)
class TransactionContainer(dict):
# IDEA: Drop this container and use the new set_data/data API on
# transactions (requires transaction >= 1.6).
def pop(self, txn):
return dict.pop(self, id(txn), None)
def get(self, txn):
try:
return self[id(txn)]
except KeyError:
raise StorageTransactionError("unknown transaction %r" % txn)
def new(self, txn):
key = id(txn)
if key in self:
raise StorageTransactionError("commit of transaction %r"
" already started" % txn)
context = self[key] = Transaction(txn)
return context
......@@ -70,7 +70,7 @@ class ClientApplicationTests(NeoUnitTestBase):
def _begin(self, app, txn, tid):
txn_context = app._txn_container.new(txn)
txn_context['ttid'] = tid
txn_context.ttid = tid
return txn_context
def getApp(self, master_nodes=None, name='test', **kw):
......
......@@ -370,7 +370,7 @@ class Test(NEOThreadedTest):
resolved = []
last = lambda txn: txn._extension['last'] # BBB
def _handleConflicts(orig, txn_context):
resolved.append(last(txn_context['txn']))
resolved.append(last(txn_context.txn))
orig(txn_context)
def tpc_vote(orig, transaction):
(l3 if last(transaction) else l2)()
......@@ -969,7 +969,7 @@ class Test(NEOThreadedTest):
txn = transaction.Transaction()
client.tpc_begin(None, txn)
txn_context = client._txn_container.get(txn)
txn_context['ttid'] = add64(txn_context['ttid'], 1)
txn_context.ttid = add64(txn_context.ttid, 1)
self.assertRaises(POSException.StorageError,
client.tpc_finish, txn)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment