Commit 485a2cfc authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 79f54dcb
...@@ -123,14 +123,14 @@ import ( ...@@ -123,14 +123,14 @@ import (
type Status int type Status int
const ( const (
Active Status = iota // transaction is in progress Active Status = iota // transaction is in progress
Committing // transaction commit started Committing // transaction commit started
Committed // transaction commit finished successfully Committed // transaction commit finished successfully
// XXX CommitFailed // transaction commit resulted in error // XXX CommitFailed // transaction commit resulted in error
Aborting // transaction abort started Aborting // transaction abort started
Aborted // transaction was aborted by user Aborted // transaction was aborted by user
// XXX AbortFailed ? // transaction abort resulted in error // XXX AbortFailed ? // transaction abort resulted in error
// XXX Doomed // transaction was doomed // XXX Doomed // transaction was doomed
) )
// Transaction represents a transaction. // Transaction represents a transaction.
...@@ -140,15 +140,14 @@ const ( ...@@ -140,15 +140,14 @@ const (
// Before completion, if there are changes to managed data, corresponding // Before completion, if there are changes to managed data, corresponding
// DataManager(s) must join the transaction to participate in the completion. // DataManager(s) must join the transaction to participate in the completion.
type Transaction interface { type Transaction interface {
User() string // user name associated with transaction User() string // user name associated with transaction
Description() string // description of transaction Description() string // description of transaction
// XXX map[string]interface{} (objects must be simple values serialized with pickle or json, not "instances") // XXX map[string]interface{} (objects must be simple values serialized with pickle or json, not "instances")
Extension() string Extension() string
// XXX +Note, SetUser, ... // XXX +Note, SetUser, ...
// Status returns current status of the transaction. // Status returns current status of the transaction.
Status() Status Status() Status
...@@ -160,17 +159,16 @@ type Transaction interface { ...@@ -160,17 +159,16 @@ type Transaction interface {
// Commit must not be called after transaction completion began. // Commit must not be called after transaction completion began.
Commit(ctx context.Context) error Commit(ctx context.Context) error
// Abort aborts the transaction. // Abort aborts the transaction.
// //
// Abort completes the transaction by executing Abort on all // Abort completes the transaction by executing Abort on all
// DataManagers associated with it. // DataManagers associated with it.
// //
// Abort must not be called after transaction completion began. // Abort must not be called after transaction completion began.
Abort() // XXX + ctx, error? Abort() // XXX + ctx, error?
// XXX + Doom? // XXX + Doom?
// ---- part for data managers & friends ---- // ---- part for data managers & friends ----
// XXX move to separate interface? // XXX move to separate interface?
...@@ -208,7 +206,6 @@ func Current(ctx context.Context) Transaction { ...@@ -208,7 +206,6 @@ func Current(ctx context.Context) Transaction {
return currentTxn(ctx) return currentTxn(ctx)
} }
// DataManager manages data and can transactionally persist it. // DataManager manages data and can transactionally persist it.
// //
// If DataManager is registered to transaction via Transaction.Join, it will // If DataManager is registered to transaction via Transaction.Join, it will
...@@ -222,12 +219,12 @@ type DataManager interface { ...@@ -222,12 +219,12 @@ type DataManager interface {
// if abort was caused by user requesting transaction abort. If // if abort was caused by user requesting transaction abort. If
// two-phase commit was started and transaction needs to be aborted due // two-phase commit was started and transaction needs to be aborted due
// to two-phase commit logic, TPCAbort will be called. // to two-phase commit logic, TPCAbort will be called.
Abort(txn Transaction) // XXX +ctx, error Abort(txn Transaction) // XXX +ctx, error
// TPCBegin should begin commit of a transaction, starting the two-phase commit. // TPCBegin should begin commit of a transaction, starting the two-phase commit.
TPCBegin(txn Transaction) // XXX +ctx, error ? TPCBegin(txn Transaction) // XXX +ctx, error ?
// Commit should commit modifications to managed data. // Commit should commit modifications to managed data.
// //
// It should save changes to be made persistent if the transaction // It should save changes to be made persistent if the transaction
// commits (if TPCFinish is called later). If TPCAbort is called // commits (if TPCFinish is called later). If TPCAbort is called
...@@ -238,15 +235,15 @@ type DataManager interface { ...@@ -238,15 +235,15 @@ type DataManager interface {
// changes persist when TPCFinish is called. // changes persist when TPCFinish is called.
Commit(ctx context.Context, txn Transaction) error Commit(ctx context.Context, txn Transaction) error
// TPCVote should verify that a data manager can commit the transaction. // TPCVote should verify that a data manager can commit the transaction.
// //
// This is the last chance for a data manager to vote 'no'. A data // This is the last chance for a data manager to vote 'no'. A data
// manager votes 'no' by returning an error. // manager votes 'no' by returning an error.
TPCVote(ctx context.Context, txn Transaction) error TPCVote(ctx context.Context, txn Transaction) error
// TPCFinish should indicate confirmation that the transaction is done. // TPCFinish should indicate confirmation that the transaction is done.
// //
// It should make all changes to data modified by this transaction persist. // It should make all changes to data modified by this transaction persist.
// //
// This should never fail. If this returns an error, the database is // This should never fail. If this returns an error, the database is
// not expected to maintain consistency; it's a serious error. // not expected to maintain consistency; it's a serious error.
...@@ -254,14 +251,13 @@ type DataManager interface { ...@@ -254,14 +251,13 @@ type DataManager interface {
// TPCAbort should Abort a transaction. // TPCAbort should Abort a transaction.
// //
// This is called by a transaction manager to end a two-phase commit on // This is called by a transaction manager to end a two-phase commit on
// the data manager. It should abandon all changes to data modified // the data manager. It should abandon all changes to data modified
// by this transaction. // by this transaction.
// //
// This should never fail. // This should never fail.
TPCAbort(ctx context.Context, txn Transaction) // XXX error? TPCAbort(ctx context.Context, txn Transaction) // XXX error?
// XXX better do without SortKey - with it it is assumed that // XXX better do without SortKey - with it it is assumed that
// datamanagers are processed serially. // datamanagers are processed serially.
// SortKey() string // SortKey() string
...@@ -295,7 +291,7 @@ func With(ctx context.Context, f func(context.Context) error) (ok bool, _ error) ...@@ -295,7 +291,7 @@ func With(ctx context.Context, f func(context.Context) error) (ok bool, _ error)
txn, ctx := New(ctx) txn, ctx := New(ctx)
err := f(ctx) err := f(ctx)
if err != nil { if err != nil {
txn.Abort() // XXX err txn.Abort() // XXX err
return false, err return false, err
} }
......
...@@ -28,13 +28,13 @@ import ( ...@@ -28,13 +28,13 @@ import (
// transaction implements Transaction. // transaction implements Transaction.
type transaction struct { type transaction struct {
mu sync.Mutex mu sync.Mutex
status Status status Status
datav []DataManager datav []DataManager
syncv []Synchronizer syncv []Synchronizer
// metadata // metadata
user string user string
description string description string
extension string // XXX extension string // XXX
} }
...@@ -137,7 +137,7 @@ func (txn *transaction) Abort() { ...@@ -137,7 +137,7 @@ func (txn *transaction) Abort() {
go func() { go func() {
defer wg.Done() defer wg.Done()
datav[i].Abort(txn) // XXX err? datav[i].Abort(txn) // XXX err?
}() }()
} }
wg.Wait() wg.Wait()
...@@ -145,7 +145,7 @@ func (txn *transaction) Abort() { ...@@ -145,7 +145,7 @@ func (txn *transaction) Abort() {
// XXX set txn status // XXX set txn status
txn.mu.Lock() txn.mu.Lock()
// assert .status == Aborting // assert .status == Aborting
txn.status = Aborted // XXX what if errBeforeCompletion? txn.status = Aborted // XXX what if errBeforeCompletion?
txn.mu.Unlock() txn.mu.Unlock()
// sync.AfterCompletion // sync.AfterCompletion
...@@ -192,16 +192,15 @@ func (txn *transaction) RegisterSync(sync Synchronizer) { ...@@ -192,16 +192,15 @@ func (txn *transaction) RegisterSync(sync Synchronizer) {
// must be called with .mu held. // must be called with .mu held.
func (txn *transaction) checkNotYetCompleting(who string) { func (txn *transaction) checkNotYetCompleting(who string) {
switch txn.status { switch txn.status {
case Active: // XXX + Doomed ? case Active: // XXX + Doomed ?
// ok // ok
default: default:
panic("transaction: " + who + ": transaction completion already began") panic("transaction: " + who + ": transaction completion already began")
} }
} }
// ---- meta ---- // ---- meta ----
func (txn *transaction) User() string { return txn.user } func (txn *transaction) User() string { return txn.user }
func (txn *transaction) Description() string { return txn.description } func (txn *transaction) Description() string { return txn.description }
func (txn *transaction) Extension() string { return txn.extension } func (txn *transaction) Extension() string { return txn.extension }
...@@ -44,7 +44,6 @@ func TestBasic(t *testing.T) { ...@@ -44,7 +44,6 @@ func TestBasic(t *testing.T) {
Current(ctx) Current(ctx)
}() }()
// New // New
txn, ctx := New(ctx) txn, ctx := New(ctx)
if txn_ := Current(ctx); txn_ != txn { if txn_ := Current(ctx); txn_ != txn {
...@@ -52,7 +51,7 @@ func TestBasic(t *testing.T) { ...@@ -52,7 +51,7 @@ func TestBasic(t *testing.T) {
} }
// New(!ø) -> panic // New(!ø) -> panic
func () { func() {
defer func() { defer func() {
r := recover() r := recover()
if r == nil { if r == nil {
...@@ -86,12 +85,12 @@ func (d *dmAbortOnly) Abort(txn Transaction) { ...@@ -86,12 +85,12 @@ func (d *dmAbortOnly) Abort(txn Transaction) {
atomic.AddInt32(&d.nabort, +1) atomic.AddInt32(&d.nabort, +1)
} }
func (d *dmAbortOnly) bug() { d.t.Fatal("must not be called on abort") } func (d *dmAbortOnly) bug() { d.t.Fatal("must not be called on abort") }
func (d *dmAbortOnly) TPCBegin(_ Transaction) { d.bug(); panic(0) } func (d *dmAbortOnly) TPCBegin(_ Transaction) { d.bug(); panic(0) }
func (d *dmAbortOnly) Commit(_ context.Context, _ Transaction) error { d.bug(); panic(0) } func (d *dmAbortOnly) Commit(_ context.Context, _ Transaction) error { d.bug(); panic(0) }
func (d *dmAbortOnly) TPCVote(_ context.Context, _ Transaction) error { d.bug(); panic(0) } func (d *dmAbortOnly) TPCVote(_ context.Context, _ Transaction) error { d.bug(); panic(0) }
func (d *dmAbortOnly) TPCFinish(_ context.Context, _ Transaction) error { d.bug(); panic(0) } func (d *dmAbortOnly) TPCFinish(_ context.Context, _ Transaction) error { d.bug(); panic(0) }
func (d *dmAbortOnly) TPCAbort(_ context.Context, _ Transaction) { d.bug(); panic(0) } func (d *dmAbortOnly) TPCAbort(_ context.Context, _ Transaction) { d.bug(); panic(0) }
func TestAbort(t *testing.T) { func TestAbort(t *testing.T) {
txn, ctx := New(context.Background()) txn, ctx := New(context.Background())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment