Commit 87c5178b authored by Julien Muchembled's avatar Julien Muchembled

Fix race when tweak touches partitions that are being reported as replicated

The bug could lead to data corruption (if a partition is wrongly marked as
UP_TO_DATE) or crashes (assertion failure on either the storage or the master).

The protocol is extended to handle the following scenario:

    S                                    M
    partition 0 outdated
      <-- UnfinishedTransactions ------>
    replication of partition 0 ...
    partition 1 outdated
      --- UnfinishedTransactions ...
    ... replication finished
      --- ReplicationDone ...
                                         tweak
      <-- partition 1 discarded --------
                                         tweak
      <-- partition 1 outdated ---------
          ... UnfinishedTransactions -->
          ... ReplicationDone --------->

The master can't simply mark all outdated cells as being updatable when it
receives an UnfinishedTransactions packet.
parent 64afd7d2
......@@ -837,6 +837,12 @@ class UnfinishedTransactions(Packet):
Ask unfinished transactions S -> PM.
Answer unfinished transactions PM -> S.
"""
_fmt = PStruct('ask_unfinished_transactions',
PList('row_list',
PNumber('offset'),
),
)
_answer = PStruct('answer_unfinished_transactions',
PTID('max_tid'),
PList('tid_list',
......
......@@ -15,9 +15,11 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.protocol import CellStates, ClusterStates, Packets, ProtocolError
from neo.lib.protocol import (CellStates, ClusterStates, Packets, ProtocolError,
uuid_str)
from neo.lib.exception import StoppedOperation
from neo.lib.pt import PartitionTableException
from neo.lib.util import dump
from . import BaseServiceHandler
......@@ -51,7 +53,7 @@ class StorageServiceHandler(BaseServiceHandler):
if app.packing is not None:
self.answerPack(conn, False)
def askUnfinishedTransactions(self, conn):
def askUnfinishedTransactions(self, conn, offset_list):
app = self.app
if app.backup_tid:
last_tid = app.pt.getBackupTid(min)
......@@ -64,6 +66,7 @@ class StorageServiceHandler(BaseServiceHandler):
pending_list = app.tm.registerForNotification(conn.getUUID())
p = Packets.AnswerUnfinishedTransactions(last_tid, pending_list)
conn.answer(p)
app.pt.updatable(conn.getUUID(), offset_list)
def notifyDeadlock(self, conn, *args):
self.app.tm.deadlock(conn.getUUID(), *args)
......@@ -84,20 +87,24 @@ class StorageServiceHandler(BaseServiceHandler):
def notifyReplicationDone(self, conn, offset, tid):
app = self.app
node = app.nm.getByUUID(conn.getUUID())
uuid = conn.getUUID()
node = app.nm.getByUUID(uuid)
if app.backup_tid:
cell_list = app.backup_app.notifyReplicationDone(node, offset, tid)
if not cell_list:
return
else:
# TODO: check tid (see NonReadableCell.__doc__)
try:
cell_list = self.app.pt.setUpToDate(node, offset)
if not cell_list:
raise ProtocolError('Non-outdated partition')
except PartitionTableException, e:
raise ProtocolError(str(e))
logging.debug("%s is up for offset %s", node, offset)
if not cell_list:
logging.info("ignored late notification that"
" %s has replicated partition %s up to %s",
uuid_str(uuid), offset, dump(tid))
return
logging.debug("%s is up for partition %s (tid=%s)",
uuid_str(uuid), offset, dump(tid))
self.app.broadcastPartitionChanges(cell_list)
def answerPack(self, conn, status):
......
......@@ -23,11 +23,18 @@ from neo.lib.protocol import CellStates, ZERO_TID
class Cell(neo.lib.pt.Cell):
replicating = ZERO_TID
updatable = False
def setState(self, state):
readable = self.isReadable()
super(Cell, self).setState(state)
if readable and not self.isReadable():
if self.isReadable():
return
try:
del self.updatable
except AttributeError:
pass
if readable:
try:
del self.backup_tid, self.replicating
except AttributeError:
......@@ -157,7 +164,7 @@ class PartitionTable(neo.lib.pt.PartitionTable):
# check the partition is assigned and known as outdated
for cell in self.getCellList(offset):
if cell.getUUID() == uuid:
if cell.isOutOfDate():
if cell.isOutOfDate() and cell.updatable:
break
return
else:
......@@ -300,6 +307,12 @@ class PartitionTable(neo.lib.pt.PartitionTable):
logging.warning(self._first_outdated_message)
return change_list
def updatable(self, uuid, offset_list):
for offset in offset_list:
for cell in self.partition_list[offset]:
if cell.getUUID() == uuid and not cell.isReadable():
cell.updatable = True
def iterNodeCell(self, node):
for offset, row in enumerate(self.partition_list):
for cell in row:
......
......@@ -108,8 +108,8 @@ class TransactionManager(EventQueue):
assert isdisjoint(self._replicated), (offset_list, self._replicated)
assert isdisjoint(map(self.getPartition, self._store_lock_dict)), (
offset_list, self._store_lock_dict)
self._app.master_conn.ask(Packets.AskUnfinishedTransactions(),
offset_list=offset_list)
p = Packets.AskUnfinishedTransactions(offset_list)
self._app.master_conn.ask(p, offset_list=offset_list)
def replicated(self, partition, tid):
# also called for readable cells in BACKINGUP state
......
......@@ -212,6 +212,9 @@ class MasterPartitionTableTests(NeoUnitTestBase):
self.assertEqual(self._pt_states(pt), self._pt_states(new_pt))
def update(self, pt, change_list=None):
offset_list = range(pt.np)
for node in pt.count_dict:
pt.updatable(node.getUUID(), offset_list)
if change_list is None:
for offset, row in enumerate(pt.partition_list):
for cell in list(row):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment