Commit 67b42fa7 authored by Kirill Smelkov's avatar Kirill Smelkov

zodbcommit: Don't forget to call tpc_abort on an error

Two-phase commit protocol assumes that after tpc_begin, it will be
either successful tpc_vote + tpc_finish, or tpc_abort. We were not
calling tpc_abort on an error, potentially leaving storage in "commit is
in progress" state on an error.

/reviewed-by @jerome
/reviewed-on nexedi/zodbtools!19
parent c59a54ca
# Copyright (C) 2018-2020 Nexedi SA and Contributors.
# Copyright (C) 2018-2021 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
......@@ -58,49 +58,56 @@ def zodbcommit(stor, at, txn):
before = p64(u64(at)+1)
stor.tpc_begin(txn)
for obj in txn.objv:
data = None # data do be committed - setup vvv
if isinstance(obj, zodbdump.ObjectCopy):
# NEO does not support restore, and even if stor supports restore,
# going that way requires to already know tid for transaction we are
# committing. -> we just imitate copy by actually copying data and
# letting the storage deduplicate it.
data, _, _ = stor.loadBefore(obj.oid, p64(u64(obj.copy_from)+1))
elif isinstance(obj, zodbdump.ObjectDelete):
data = None
def _():
for obj in txn.objv:
data = None # data do be committed - setup vvv
if isinstance(obj, zodbdump.ObjectCopy):
# NEO does not support restore, and even if stor supports restore,
# going that way requires to already know tid for transaction we are
# committing. -> we just imitate copy by actually copying data and
# letting the storage deduplicate it.
data, _, _ = stor.loadBefore(obj.oid, p64(u64(obj.copy_from)+1))
elif isinstance(obj, zodbdump.ObjectData):
elif isinstance(obj, zodbdump.ObjectDelete):
data = None
if isinstance(obj.data, zodbdump.HashOnly):
raise ValueError('cannot commit transaction with hashonly object')
elif isinstance(obj, zodbdump.ObjectData):
data = obj.data
if isinstance(obj.data, zodbdump.HashOnly):
raise ValueError('cannot commit transaction with hashonly object')
else:
panic('invalid object record: %r' % (obj,))
data = obj.data
else:
panic('invalid object record: %r' % (obj,))
# now we have the data.
# find out what is oid's serial as of <before state
try:
xdata = stor.loadBefore(obj.oid, before)
except POSKeyError:
serial_prev = z64
else:
if xdata is None:
# now we have the data.
# find out what is oid's serial as of <before state
try:
xdata = stor.loadBefore(obj.oid, before)
except POSKeyError:
serial_prev = z64
else:
_, serial_prev, _ = xdata
# store the object.
# if it will be ConflictError - we just fail and let the caller retry.
if data is None:
stor.deleteObject(obj.oid, serial_prev, txn)
else:
stor.store(obj.oid, serial_prev, data, '', txn)
if xdata is None:
serial_prev = z64
else:
_, serial_prev, _ = xdata
# store the object.
# if it will be ConflictError - we just fail and let the caller retry.
if data is None:
stor.deleteObject(obj.oid, serial_prev, txn)
else:
stor.store(obj.oid, serial_prev, data, '', txn)
stor.tpc_vote(txn)
try:
_()
stor.tpc_vote(txn)
except:
stor.tpc_abort(txn)
raise
# in ZODB >= 5 tpc_finish returns tid directly, but on ZODB 4 it
# does not do so. Since we still need to support ZODB 4, utilize tpc_finish
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment