Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
N
neoppod
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Carlos Ramos Carreño
neoppod
Commits
9f0f2afe
Commit
9f0f2afe
authored
Jan 18, 2018
by
Julien Muchembled
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
protocol: update packet docstrings
/reviewed-on
nexedi/neoppod!9
parent
f62f9bc9
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
215 additions
and
120 deletions
+215
-120
neo/lib/protocol.py
neo/lib/protocol.py
+213
-118
neo/storage/handlers/client.py
neo/storage/handlers/client.py
+2
-2
No files found.
neo/lib/protocol.py
View file @
9f0f2afe
...
@@ -647,7 +647,9 @@ class Error(Packet):
...
@@ -647,7 +647,9 @@ class Error(Packet):
"""
"""
Error is a special type of message, because this can be sent against
Error is a special type of message, because this can be sent against
any other message, even if such a message does not expect a reply
any other message, even if such a message does not expect a reply
usually. Any -> Any.
usually.
:nodes: * -> *
"""
"""
_fmt
=
PStruct
(
'error'
,
_fmt
=
PStruct
(
'error'
,
PNumber
(
'code'
),
PNumber
(
'code'
),
...
@@ -656,19 +658,25 @@ class Error(Packet):
...
@@ -656,19 +658,25 @@ class Error(Packet):
class
Ping
(
Packet
):
class
Ping
(
Packet
):
"""
"""
Check if a peer is still alive. Any -> Any.
Empty request used as network barrier.
:nodes: * -> *
"""
"""
_answer
=
PFEmpty
_answer
=
PFEmpty
class
CloseClient
(
Packet
):
class
CloseClient
(
Packet
):
"""
"""
Tell peer it can close the connection if it has finished with us. Any -> Any
Tell peer that it can close the connection if it has finished with us.
:nodes: * -> *
"""
"""
class
RequestIdentification
(
Packet
):
class
RequestIdentification
(
Packet
):
"""
"""
Request a node identification. This must be the first packet for any
Request a node identification. This must be the first packet for any
connection. Any -> Any.
connection.
:nodes: * -> *
"""
"""
poll_thread
=
True
poll_thread
=
True
...
@@ -690,7 +698,9 @@ class RequestIdentification(Packet):
...
@@ -690,7 +698,9 @@ class RequestIdentification(Packet):
class
PrimaryMaster
(
Packet
):
class
PrimaryMaster
(
Packet
):
"""
"""
Ask current primary master's uuid. CTL -> A.
Ask node identier of the current primary master.
:nodes: ctl -> A
"""
"""
_answer
=
PStruct
(
'answer_primary'
,
_answer
=
PStruct
(
'answer_primary'
,
PUUID
(
'primary_uuid'
),
PUUID
(
'primary_uuid'
),
...
@@ -698,7 +708,10 @@ class PrimaryMaster(Packet):
...
@@ -698,7 +708,10 @@ class PrimaryMaster(Packet):
class
NotPrimaryMaster
(
Packet
):
class
NotPrimaryMaster
(
Packet
):
"""
"""
Send list of known master nodes. SM -> Any.
Notify peer that I'm not the primary master. Attach any extra information
to help the peer joining the cluster.
:nodes: SM -> *
"""
"""
_fmt
=
PStruct
(
'not_primary_master'
,
_fmt
=
PStruct
(
'not_primary_master'
,
PSignedNull
(
'primary'
),
PSignedNull
(
'primary'
),
...
@@ -709,7 +722,10 @@ class NotPrimaryMaster(Packet):
...
@@ -709,7 +722,10 @@ class NotPrimaryMaster(Packet):
class
Recovery
(
Packet
):
class
Recovery
(
Packet
):
"""
"""
Ask all data needed by master to recover. PM -> S, S -> PM.
Ask storage nodes data needed by master to recover.
Reused by `neoctl print ids`.
:nodes: M -> S; ctl -> A -> M
"""
"""
_answer
=
PStruct
(
'answer_recovery'
,
_answer
=
PStruct
(
'answer_recovery'
,
PPTID
(
'ptid'
),
PPTID
(
'ptid'
),
...
@@ -720,7 +736,9 @@ class Recovery(Packet):
...
@@ -720,7 +736,9 @@ class Recovery(Packet):
class
LastIDs
(
Packet
):
class
LastIDs
(
Packet
):
"""
"""
Ask the last OID/TID so that a master can initialize its TransactionManager.
Ask the last OID/TID so that a master can initialize its TransactionManager.
PM -> S, S -> PM.
Reused by `neoctl print ids`.
:nodes: M -> S; ctl -> A -> M
"""
"""
_answer
=
PStruct
(
'answer_last_ids'
,
_answer
=
PStruct
(
'answer_last_ids'
,
POID
(
'last_oid'
),
POID
(
'last_oid'
),
...
@@ -729,8 +747,10 @@ class LastIDs(Packet):
...
@@ -729,8 +747,10 @@ class LastIDs(Packet):
class
PartitionTable
(
Packet
):
class
PartitionTable
(
Packet
):
"""
"""
Ask the full partition table. PM -> S.
Ask storage node the remaining data needed by master to recover.
Answer rows in a partition table. S -> PM.
This is also how the clients get the full partition table on connection.
:nodes: M -> S; C -> M
"""
"""
_answer
=
PStruct
(
'answer_partition_table'
,
_answer
=
PStruct
(
'answer_partition_table'
,
PPTID
(
'ptid'
),
PPTID
(
'ptid'
),
...
@@ -739,7 +759,9 @@ class PartitionTable(Packet):
...
@@ -739,7 +759,9 @@ class PartitionTable(Packet):
class
NotifyPartitionTable
(
Packet
):
class
NotifyPartitionTable
(
Packet
):
"""
"""
Send rows in a partition table to update other nodes. PM -> S, C.
Send the full partition table to admin/storage nodes on connection.
:nodes: M -> A, S
"""
"""
_fmt
=
PStruct
(
'send_partition_table'
,
_fmt
=
PStruct
(
'send_partition_table'
,
PPTID
(
'ptid'
),
PPTID
(
'ptid'
),
...
@@ -748,8 +770,9 @@ class NotifyPartitionTable(Packet):
...
@@ -748,8 +770,9 @@ class NotifyPartitionTable(Packet):
class
PartitionChanges
(
Packet
):
class
PartitionChanges
(
Packet
):
"""
"""
Notify a subset of a partition table. This is used to notify changes.
Notify about changes in the partition table.
PM -> S, C.
:nodes: M -> *
"""
"""
_fmt
=
PStruct
(
'notify_partition_changes'
,
_fmt
=
PStruct
(
'notify_partition_changes'
,
PPTID
(
'ptid'
),
PPTID
(
'ptid'
),
...
@@ -764,8 +787,10 @@ class PartitionChanges(Packet):
...
@@ -764,8 +787,10 @@ class PartitionChanges(Packet):
class
StartOperation
(
Packet
):
class
StartOperation
(
Packet
):
"""
"""
Tell a storage nodes to start an operation. Until a storage node receives
Tell a storage node to start operation. Before this message, it must only
this message, it must not serve client nodes. PM -> S.
communicate with the primary master.
:nodes: M -> S
"""
"""
_fmt
=
PStruct
(
'start_operation'
,
_fmt
=
PStruct
(
'start_operation'
,
# XXX: Is this boolean needed ? Maybe this
# XXX: Is this boolean needed ? Maybe this
...
@@ -775,14 +800,17 @@ class StartOperation(Packet):
...
@@ -775,14 +800,17 @@ class StartOperation(Packet):
class
StopOperation
(
Packet
):
class
StopOperation
(
Packet
):
"""
"""
Tell a storage node to stop an operation. Once a storage node receives
Notify that the cluster is not operational anymore. Any operation between
this message, it must not serve client nodes. PM -> S.
nodes must be aborted.
:nodes: M -> S, C
"""
"""
class
UnfinishedTransactions
(
Packet
):
class
UnfinishedTransactions
(
Packet
):
"""
"""
Ask unfinished transactions S -> PM.
Ask unfinished transactions, which will be replicated when they're finished.
Answer unfinished transactions PM -> S.
:nodes: S -> M
"""
"""
_fmt
=
PStruct
(
'ask_unfinished_transactions'
,
_fmt
=
PStruct
(
'ask_unfinished_transactions'
,
PList
(
'row_list'
,
PList
(
'row_list'
,
...
@@ -799,8 +827,10 @@ class UnfinishedTransactions(Packet):
...
@@ -799,8 +827,10 @@ class UnfinishedTransactions(Packet):
class
LockedTransactions
(
Packet
):
class
LockedTransactions
(
Packet
):
"""
"""
Ask locked transactions PM -> S.
Ask locked transactions to replay committed transactions that haven't been
Answer locked transactions S -> PM.
unlocked.
:nodes: M -> S
"""
"""
_answer
=
PStruct
(
'answer_locked_transactions'
,
_answer
=
PStruct
(
'answer_locked_transactions'
,
PDict
(
'tid_dict'
,
PDict
(
'tid_dict'
,
...
@@ -811,7 +841,10 @@ class LockedTransactions(Packet):
...
@@ -811,7 +841,10 @@ class LockedTransactions(Packet):
class
FinalTID
(
Packet
):
class
FinalTID
(
Packet
):
"""
"""
Return final tid if ttid has been committed. * -> S. C -> PM.
Return final tid if ttid has been committed, to recover from certain
failures during tpc_finish.
:nodes: M -> S; C -> M, S
"""
"""
_fmt
=
PStruct
(
'final_tid'
,
_fmt
=
PStruct
(
'final_tid'
,
PTID
(
'ttid'
),
PTID
(
'ttid'
),
...
@@ -823,7 +856,9 @@ class FinalTID(Packet):
...
@@ -823,7 +856,9 @@ class FinalTID(Packet):
class
ValidateTransaction
(
Packet
):
class
ValidateTransaction
(
Packet
):
"""
"""
Commit a transaction. PM -> S.
Do replay a committed transaction that was not unlocked.
:nodes: M -> S
"""
"""
_fmt
=
PStruct
(
'validate_transaction'
,
_fmt
=
PStruct
(
'validate_transaction'
,
PTID
(
'ttid'
),
PTID
(
'ttid'
),
...
@@ -832,8 +867,9 @@ class ValidateTransaction(Packet):
...
@@ -832,8 +867,9 @@ class ValidateTransaction(Packet):
class
BeginTransaction
(
Packet
):
class
BeginTransaction
(
Packet
):
"""
"""
Ask to begin a new transaction. C -> PM.
Ask to begin a new transaction. This maps to `tpc_begin`.
Answer when a transaction begin, give a TID if necessary. PM -> C.
:nodes: C -> M
"""
"""
_fmt
=
PStruct
(
'ask_begin_transaction'
,
_fmt
=
PStruct
(
'ask_begin_transaction'
,
PTID
(
'tid'
),
PTID
(
'tid'
),
...
@@ -845,8 +881,10 @@ class BeginTransaction(Packet):
...
@@ -845,8 +881,10 @@ class BeginTransaction(Packet):
class
FailedVote
(
Packet
):
class
FailedVote
(
Packet
):
"""
"""
Report storage nodes for which vote failed.
C -> M
Report storage nodes for which vote failed.
True is returned if it's still possible to finish the transaction.
True is returned if it's still possible to finish the transaction.
:nodes: C -> M
"""
"""
_fmt
=
PStruct
(
'failed_vote'
,
_fmt
=
PStruct
(
'failed_vote'
,
PTID
(
'tid'
),
PTID
(
'tid'
),
...
@@ -857,8 +895,10 @@ class FailedVote(Packet):
...
@@ -857,8 +895,10 @@ class FailedVote(Packet):
class
FinishTransaction
(
Packet
):
class
FinishTransaction
(
Packet
):
"""
"""
Finish a transaction. C -> PM.
Finish a transaction. Return the TID of the committed transaction.
Answer when a transaction is finished. PM -> C.
This maps to `tpc_finish`.
:nodes: C -> M
"""
"""
poll_thread
=
True
poll_thread
=
True
...
@@ -877,8 +917,9 @@ class FinishTransaction(Packet):
...
@@ -877,8 +917,9 @@ class FinishTransaction(Packet):
class
NotifyTransactionFinished
(
Packet
):
class
NotifyTransactionFinished
(
Packet
):
"""
"""
Notify that a transaction blocking a replication is now finished
Notify that a transaction blocking a replication is now finished.
M -> S
:nodes: M -> S
"""
"""
_fmt
=
PStruct
(
'notify_transaction_finished'
,
_fmt
=
PStruct
(
'notify_transaction_finished'
,
PTID
(
'ttid'
),
PTID
(
'ttid'
),
...
@@ -887,8 +928,9 @@ class NotifyTransactionFinished(Packet):
...
@@ -887,8 +928,9 @@ class NotifyTransactionFinished(Packet):
class
LockInformation
(
Packet
):
class
LockInformation
(
Packet
):
"""
"""
Lock information on a transaction. PM -> S.
Commit a transaction. The new data is read-locked.
Notify information on a transaction locked. S -> PM.
:nodes: M -> S
"""
"""
_fmt
=
PStruct
(
'ask_lock_informations'
,
_fmt
=
PStruct
(
'ask_lock_informations'
,
PTID
(
'ttid'
),
PTID
(
'ttid'
),
...
@@ -901,7 +943,10 @@ class LockInformation(Packet):
...
@@ -901,7 +943,10 @@ class LockInformation(Packet):
class
InvalidateObjects
(
Packet
):
class
InvalidateObjects
(
Packet
):
"""
"""
Invalidate objects. PM -> C.
Notify about a new transaction modifying objects,
invalidating client caches.
:nodes: M -> C
"""
"""
_fmt
=
PStruct
(
'ask_finish_transaction'
,
_fmt
=
PStruct
(
'ask_finish_transaction'
,
PTID
(
'tid'
),
PTID
(
'tid'
),
...
@@ -910,7 +955,10 @@ class InvalidateObjects(Packet):
...
@@ -910,7 +955,10 @@ class InvalidateObjects(Packet):
class
UnlockInformation
(
Packet
):
class
UnlockInformation
(
Packet
):
"""
"""
Unlock information on a transaction. PM -> S.
Notify about a successfully committed transaction. The new data can be
unlocked.
:nodes: M -> S
"""
"""
_fmt
=
PStruct
(
'notify_unlock_information'
,
_fmt
=
PStruct
(
'notify_unlock_information'
,
PTID
(
'ttid'
),
PTID
(
'ttid'
),
...
@@ -918,8 +966,9 @@ class UnlockInformation(Packet):
...
@@ -918,8 +966,9 @@ class UnlockInformation(Packet):
class
GenerateOIDs
(
Packet
):
class
GenerateOIDs
(
Packet
):
"""
"""
Ask new object IDs. C -> PM.
Ask new OIDs to create objects.
Answer new object IDs. PM -> C.
:nodes: C -> M
"""
"""
_fmt
=
PStruct
(
'ask_new_oids'
,
_fmt
=
PStruct
(
'ask_new_oids'
,
PNumber
(
'num_oids'
),
PNumber
(
'num_oids'
),
...
@@ -931,8 +980,10 @@ class GenerateOIDs(Packet):
...
@@ -931,8 +980,10 @@ class GenerateOIDs(Packet):
class
Deadlock
(
Packet
):
class
Deadlock
(
Packet
):
"""
"""
Ask master to generate a new TTID that will be used by the client
Ask master to generate a new TTID that will be used by the client to solve
to rebase a transaction. S -> PM -> C
a deadlock by rebasing the transaction on top of concurrent changes.
:nodes: S -> M -> C
"""
"""
_fmt
=
PStruct
(
'notify_deadlock'
,
_fmt
=
PStruct
(
'notify_deadlock'
,
PTID
(
'ttid'
),
PTID
(
'ttid'
),
...
@@ -941,7 +992,9 @@ class Deadlock(Packet):
...
@@ -941,7 +992,9 @@ class Deadlock(Packet):
class
RebaseTransaction
(
Packet
):
class
RebaseTransaction
(
Packet
):
"""
"""
Rebase transaction. C -> S.
Rebase a transaction to solve a deadlock.
:nodes: C -> S
"""
"""
_fmt
=
PStruct
(
'ask_rebase_transaction'
,
_fmt
=
PStruct
(
'ask_rebase_transaction'
,
PTID
(
'ttid'
),
PTID
(
'ttid'
),
...
@@ -954,7 +1007,9 @@ class RebaseTransaction(Packet):
...
@@ -954,7 +1007,9 @@ class RebaseTransaction(Packet):
class
RebaseObject
(
Packet
):
class
RebaseObject
(
Packet
):
"""
"""
Rebase object. C -> S.
Rebase an object change to solve a deadlock.
:nodes: C -> S
XXX: It is a request packet to simplify the implementation. For more
XXX: It is a request packet to simplify the implementation. For more
efficiency, this should be turned into a notification, and the
efficiency, this should be turned into a notification, and the
...
@@ -980,9 +1035,11 @@ class RebaseObject(Packet):
...
@@ -980,9 +1035,11 @@ class RebaseObject(Packet):
class
StoreObject
(
Packet
):
class
StoreObject
(
Packet
):
"""
"""
Ask to
store an object. Send an OID, an original serial, a current
Ask to
create/modify an object. This maps to `store`.
transaction ID, and data. C -> S.
As for IStorage, 'serial' is ZERO_TID for new objects.
As for IStorage, 'serial' is ZERO_TID for new objects.
:nodes: C -> S
"""
"""
_fmt
=
PStruct
(
'ask_store_object'
,
_fmt
=
PStruct
(
'ask_store_object'
,
POID
(
'oid'
),
POID
(
'oid'
),
...
@@ -1000,7 +1057,9 @@ class StoreObject(Packet):
...
@@ -1000,7 +1057,9 @@ class StoreObject(Packet):
class
AbortTransaction
(
Packet
):
class
AbortTransaction
(
Packet
):
"""
"""
Abort a transaction. C -> S and C -> PM -> S.
Abort a transaction. This maps to `tpc_abort`.
:nodes: C -> S; C -> M -> S
"""
"""
_fmt
=
PStruct
(
'abort_transaction'
,
_fmt
=
PStruct
(
'abort_transaction'
,
PTID
(
'tid'
),
PTID
(
'tid'
),
...
@@ -1009,8 +1068,9 @@ class AbortTransaction(Packet):
...
@@ -1009,8 +1068,9 @@ class AbortTransaction(Packet):
class
StoreTransaction
(
Packet
):
class
StoreTransaction
(
Packet
):
"""
"""
Ask to store a transaction. C -> S.
Ask to store a transaction. Implies vote.
Answer if transaction has been stored. S -> C.
:nodes: C -> S
"""
"""
_fmt
=
PStruct
(
'ask_store_transaction'
,
_fmt
=
PStruct
(
'ask_store_transaction'
,
PTID
(
'tid'
),
PTID
(
'tid'
),
...
@@ -1023,8 +1083,9 @@ class StoreTransaction(Packet):
...
@@ -1023,8 +1083,9 @@ class StoreTransaction(Packet):
class
VoteTransaction
(
Packet
):
class
VoteTransaction
(
Packet
):
"""
"""
Ask to store a transaction. C -> S.
Ask to vote a transaction.
Answer if transaction has been stored. S -> C.
:nodes: C -> S
"""
"""
_fmt
=
PStruct
(
'ask_vote_transaction'
,
_fmt
=
PStruct
(
'ask_vote_transaction'
,
PTID
(
'tid'
),
PTID
(
'tid'
),
...
@@ -1033,15 +1094,15 @@ class VoteTransaction(Packet):
...
@@ -1033,15 +1094,15 @@ class VoteTransaction(Packet):
class
GetObject
(
Packet
):
class
GetObject
(
Packet
):
"""
"""
Ask a stored object by its OID
and a serial or a TID if given. If a serial
Ask a stored object by its OID
, optionally at/before a specific tid.
is specified, the specified revision of an object will be returned. If
This maps to `load/loadBefore/loadSerial`.
a TID is specified, an object right before the TID will be returned. C -> S.
Answer the requested object. S -> C.
:nodes: C -> S
"""
"""
_fmt
=
PStruct
(
'ask_object'
,
_fmt
=
PStruct
(
'ask_object'
,
POID
(
'oid'
),
POID
(
'oid'
),
PTID
(
'
serial
'
),
PTID
(
'
at
'
),
PTID
(
'
tid
'
),
PTID
(
'
before
'
),
)
)
_answer
=
PStruct
(
'answer_object'
,
_answer
=
PStruct
(
'answer_object'
,
...
@@ -1057,8 +1118,9 @@ class GetObject(Packet):
...
@@ -1057,8 +1118,9 @@ class GetObject(Packet):
class
TIDList
(
Packet
):
class
TIDList
(
Packet
):
"""
"""
Ask for TIDs between a range of offsets. The order of TIDs is descending,
Ask for TIDs between a range of offsets. The order of TIDs is descending,
and the range is [first, last). C -> S.
and the range is [first, last). This maps to `undoLog`.
Answer the requested TIDs. S -> C.
:nodes: C -> S
"""
"""
_fmt
=
PStruct
(
'ask_tids'
,
_fmt
=
PStruct
(
'ask_tids'
,
PIndex
(
'first'
),
PIndex
(
'first'
),
...
@@ -1073,8 +1135,9 @@ class TIDList(Packet):
...
@@ -1073,8 +1135,9 @@ class TIDList(Packet):
class
TIDListFrom
(
Packet
):
class
TIDListFrom
(
Packet
):
"""
"""
Ask for length TIDs starting at min_tid. The order of TIDs is ascending.
Ask for length TIDs starting at min_tid. The order of TIDs is ascending.
C -> S.
Used by `iterator`.
Answer the requested TIDs. S -> C
:nodes: C -> S
"""
"""
_fmt
=
PStruct
(
'tid_list_from'
,
_fmt
=
PStruct
(
'tid_list_from'
,
PTID
(
'min_tid'
),
PTID
(
'min_tid'
),
...
@@ -1089,8 +1152,9 @@ class TIDListFrom(Packet):
...
@@ -1089,8 +1152,9 @@ class TIDListFrom(Packet):
class
TransactionInformation
(
Packet
):
class
TransactionInformation
(
Packet
):
"""
"""
Ask information about a transaction. Any -> S.
Ask for transaction metadata.
Answer information (user, description) about a transaction. S -> Any.
:nodes: C -> S
"""
"""
_fmt
=
PStruct
(
'ask_transaction_information'
,
_fmt
=
PStruct
(
'ask_transaction_information'
,
PTID
(
'tid'
),
PTID
(
'tid'
),
...
@@ -1108,8 +1172,9 @@ class TransactionInformation(Packet):
...
@@ -1108,8 +1172,9 @@ class TransactionInformation(Packet):
class
ObjectHistory
(
Packet
):
class
ObjectHistory
(
Packet
):
"""
"""
Ask history information for a given object. The order of serials is
Ask history information for a given object. The order of serials is
descending, and the range is [first, last]. C -> S.
descending, and the range is [first, last]. This maps to `history`.
Answer history information (serial, size) for an object. S -> C.
:nodes: C -> S
"""
"""
_fmt
=
PStruct
(
'ask_object_history'
,
_fmt
=
PStruct
(
'ask_object_history'
,
POID
(
'oid'
),
POID
(
'oid'
),
...
@@ -1124,9 +1189,9 @@ class ObjectHistory(Packet):
...
@@ -1124,9 +1189,9 @@ class ObjectHistory(Packet):
class
PartitionList
(
Packet
):
class
PartitionList
(
Packet
):
"""
"""
A
ll the following messages are for neoctl to admin node
A
sk information about partitions.
Ask information about partition
Answer information about partition
:nodes: ctl -> A
"""
"""
_fmt
=
PStruct
(
'ask_partition_list'
,
_fmt
=
PStruct
(
'ask_partition_list'
,
PNumber
(
'min_offset'
),
PNumber
(
'min_offset'
),
...
@@ -1141,8 +1206,9 @@ class PartitionList(Packet):
...
@@ -1141,8 +1206,9 @@ class PartitionList(Packet):
class
NodeList
(
Packet
):
class
NodeList
(
Packet
):
"""
"""
Ask information about nodes
Ask information about nodes.
Answer information about nodes
:nodes: ctl -> A
"""
"""
_fmt
=
PStruct
(
'ask_node_list'
,
_fmt
=
PStruct
(
'ask_node_list'
,
PFNodeType
,
PFNodeType
,
...
@@ -1154,7 +1220,9 @@ class NodeList(Packet):
...
@@ -1154,7 +1220,9 @@ class NodeList(Packet):
class
SetNodeState
(
Packet
):
class
SetNodeState
(
Packet
):
"""
"""
Set the node state
Change the state of a node.
:nodes: ctl -> A -> M
"""
"""
_fmt
=
PStruct
(
'set_node_state'
,
_fmt
=
PStruct
(
'set_node_state'
,
PUUID
(
'uuid'
),
PUUID
(
'uuid'
),
...
@@ -1165,7 +1233,10 @@ class SetNodeState(Packet):
...
@@ -1165,7 +1233,10 @@ class SetNodeState(Packet):
class
AddPendingNodes
(
Packet
):
class
AddPendingNodes
(
Packet
):
"""
"""
Ask the primary to include some pending node in the partition table
Mark given pending nodes as running, for future inclusion when tweaking
the partition table.
:nodes: ctl -> A -> M
"""
"""
_fmt
=
PStruct
(
'add_pending_nodes'
,
_fmt
=
PStruct
(
'add_pending_nodes'
,
PFUUIDList
,
PFUUIDList
,
...
@@ -1175,7 +1246,10 @@ class AddPendingNodes(Packet):
...
@@ -1175,7 +1246,10 @@ class AddPendingNodes(Packet):
class
TweakPartitionTable
(
Packet
):
class
TweakPartitionTable
(
Packet
):
"""
"""
Ask the primary to optimize the partition table. A -> PM.
Ask the master to balance the partition table, optionally excluding
specific nodes in anticipation of removing them.
:nodes: ctl -> A -> M
"""
"""
_fmt
=
PStruct
(
'tweak_partition_table'
,
_fmt
=
PStruct
(
'tweak_partition_table'
,
PFUUIDList
,
PFUUIDList
,
...
@@ -1185,7 +1259,9 @@ class TweakPartitionTable(Packet):
...
@@ -1185,7 +1259,9 @@ class TweakPartitionTable(Packet):
class
NotifyNodeInformation
(
Packet
):
class
NotifyNodeInformation
(
Packet
):
"""
"""
Notify information about one or more nodes. PM -> Any.
Notify information about one or more nodes.
:nodes: M -> *
"""
"""
_fmt
=
PStruct
(
'notify_node_informations'
,
_fmt
=
PStruct
(
'notify_node_informations'
,
PFloat
(
'id_timestamp'
),
PFloat
(
'id_timestamp'
),
...
@@ -1194,7 +1270,9 @@ class NotifyNodeInformation(Packet):
...
@@ -1194,7 +1270,9 @@ class NotifyNodeInformation(Packet):
class
SetClusterState
(
Packet
):
class
SetClusterState
(
Packet
):
"""
"""
Set the cluster state
Set the cluster state.
:nodes: ctl -> A -> M
"""
"""
_fmt
=
PStruct
(
'set_cluster_state'
,
_fmt
=
PStruct
(
'set_cluster_state'
,
PEnum
(
'state'
,
ClusterStates
),
PEnum
(
'state'
,
ClusterStates
),
...
@@ -1204,7 +1282,9 @@ class SetClusterState(Packet):
...
@@ -1204,7 +1282,9 @@ class SetClusterState(Packet):
class
Repair
(
Packet
):
class
Repair
(
Packet
):
"""
"""
Ask storage nodes to repair their databases. ctl -> A -> M
Ask storage nodes to repair their databases.
:nodes: ctl -> A -> M
"""
"""
_flags
=
map
(
PBoolean
,
(
'dry_run'
,
_flags
=
map
(
PBoolean
,
(
'dry_run'
,
# 'prune_orphan' (commented because it's the only option for the moment)
# 'prune_orphan' (commented because it's the only option for the moment)
...
@@ -1217,13 +1297,18 @@ class Repair(Packet):
...
@@ -1217,13 +1297,18 @@ class Repair(Packet):
class
RepairOne
(
Packet
):
class
RepairOne
(
Packet
):
"""
"""
See Repair. M -> S
Repair is translated to this message, asking a specific storage node to
repair its database.
:nodes: M -> S
"""
"""
_fmt
=
PStruct
(
'repair'
,
*
Repair
.
_flags
)
_fmt
=
PStruct
(
'repair'
,
*
Repair
.
_flags
)
class
ClusterInformation
(
Packet
):
class
ClusterInformation
(
Packet
):
"""
"""
Notify information about the cluster
Notify about a cluster state change.
:nodes: M -> *
"""
"""
_fmt
=
PStruct
(
'notify_cluster_information'
,
_fmt
=
PStruct
(
'notify_cluster_information'
,
PEnum
(
'state'
,
ClusterStates
),
PEnum
(
'state'
,
ClusterStates
),
...
@@ -1231,8 +1316,9 @@ class ClusterInformation(Packet):
...
@@ -1231,8 +1316,9 @@ class ClusterInformation(Packet):
class
ClusterState
(
Packet
):
class
ClusterState
(
Packet
):
"""
"""
Ask state of the cluster
Ask the state of the cluster
Answer state of the cluster
:nodes: ctl -> A; A -> M
"""
"""
_answer
=
PStruct
(
'answer_cluster_state'
,
_answer
=
PStruct
(
'answer_cluster_state'
,
...
@@ -1243,8 +1329,7 @@ class ObjectUndoSerial(Packet):
...
@@ -1243,8 +1329,7 @@ class ObjectUndoSerial(Packet):
"""
"""
Ask storage the serial where object data is when undoing given transaction,
Ask storage the serial where object data is when undoing given transaction,
for a list of OIDs.
for a list of OIDs.
C -> S
Answer serials at which object data is when undoing a given transaction.
object_tid_dict has the following format:
object_tid_dict has the following format:
key: oid
key: oid
value: 3-tuple
value: 3-tuple
...
@@ -1254,7 +1339,8 @@ class ObjectUndoSerial(Packet):
...
@@ -1254,7 +1339,8 @@ class ObjectUndoSerial(Packet):
Where undone data is (tid at which data is before given undo).
Where undone data is (tid at which data is before given undo).
is_current (bool)
is_current (bool)
If current_serial's data is current on storage.
If current_serial's data is current on storage.
S -> C
:nodes: C -> S
"""
"""
_fmt
=
PStruct
(
'ask_undo_transaction'
,
_fmt
=
PStruct
(
'ask_undo_transaction'
,
PTID
(
'tid'
),
PTID
(
'tid'
),
...
@@ -1276,12 +1362,11 @@ class ObjectUndoSerial(Packet):
...
@@ -1276,12 +1362,11 @@ class ObjectUndoSerial(Packet):
class
CheckCurrentSerial
(
Packet
):
class
CheckCurrentSerial
(
Packet
):
"""
"""
Verifies if given serial is current for object oid in the database, and
Check if given serial is current for the given oid, and lock it so that
take a write lock on it (so that this state is not altered until
this state is not altered until transaction ends.
transaction ends).
This maps to `checkCurrentSerialInTransaction`.
Answer to AskCheckCurrentSerial.
Same structure as AnswerStoreObject, to handle the same way, except there
:nodes: C -> S
is nothing to invalidate in any client's cache.
"""
"""
_fmt
=
PStruct
(
'ask_check_current_serial'
,
_fmt
=
PStruct
(
'ask_check_current_serial'
,
PTID
(
'tid'
),
PTID
(
'tid'
),
...
@@ -1294,11 +1379,8 @@ class CheckCurrentSerial(Packet):
...
@@ -1294,11 +1379,8 @@ class CheckCurrentSerial(Packet):
class
Pack
(
Packet
):
class
Pack
(
Packet
):
"""
"""
Request a pack at given TID.
Request a pack at given TID.
C -> M
M -> S
:nodes: C -> M -> S
Inform that packing it over.
S -> M
M -> C
"""
"""
_fmt
=
PStruct
(
'ask_pack'
,
_fmt
=
PStruct
(
'ask_pack'
,
PTID
(
'tid'
),
PTID
(
'tid'
),
...
@@ -1310,8 +1392,10 @@ class Pack(Packet):
...
@@ -1310,8 +1392,10 @@ class Pack(Packet):
class
CheckReplicas
(
Packet
):
class
CheckReplicas
(
Packet
):
"""
"""
ctl -> A
Ask the cluster to search for mismatches between replicas, metadata only,
A -> M
and optionally within a specific range. Reference nodes can be specified.
:nodes: ctl -> A -> M
"""
"""
_fmt
=
PStruct
(
'check_replicas'
,
_fmt
=
PStruct
(
'check_replicas'
,
PDict
(
'partition_dict'
,
PDict
(
'partition_dict'
,
...
@@ -1325,7 +1409,11 @@ class CheckReplicas(Packet):
...
@@ -1325,7 +1409,11 @@ class CheckReplicas(Packet):
class
CheckPartition
(
Packet
):
class
CheckPartition
(
Packet
):
"""
"""
M -> S
Ask a storage node to compare a partition with all other nodes.
Like for CheckReplicas, only metadata are checked, optionally within a
specific range. A reference node can be specified.
:nodes: M -> S
"""
"""
_fmt
=
PStruct
(
'check_partition'
,
_fmt
=
PStruct
(
'check_partition'
,
PNumber
(
'partition'
),
PNumber
(
'partition'
),
...
@@ -1342,11 +1430,8 @@ class CheckTIDRange(Packet):
...
@@ -1342,11 +1430,8 @@ class CheckTIDRange(Packet):
Ask some stats about a range of transactions.
Ask some stats about a range of transactions.
Used to know if there are differences between a replicating node and
Used to know if there are differences between a replicating node and
reference node.
reference node.
S -> S
Stats about a range of transactions.
:nodes: S -> S
Used to know if there are differences between a replicating node and
reference node.
S -> S
"""
"""
_fmt
=
PStruct
(
'ask_check_tid_range'
,
_fmt
=
PStruct
(
'ask_check_tid_range'
,
PNumber
(
'partition'
),
PNumber
(
'partition'
),
...
@@ -1366,11 +1451,8 @@ class CheckSerialRange(Packet):
...
@@ -1366,11 +1451,8 @@ class CheckSerialRange(Packet):
Ask some stats about a range of object history.
Ask some stats about a range of object history.
Used to know if there are differences between a replicating node and
Used to know if there are differences between a replicating node and
reference node.
reference node.
S -> S
Stats about a range of object history.
:nodes: S -> S
Used to know if there are differences between a replicating node and
reference node.
S -> S
"""
"""
_fmt
=
PStruct
(
'ask_check_serial_range'
,
_fmt
=
PStruct
(
'ask_check_serial_range'
,
PNumber
(
'partition'
),
PNumber
(
'partition'
),
...
@@ -1390,7 +1472,9 @@ class CheckSerialRange(Packet):
...
@@ -1390,7 +1472,9 @@ class CheckSerialRange(Packet):
class
PartitionCorrupted
(
Packet
):
class
PartitionCorrupted
(
Packet
):
"""
"""
S -> M
Notify that mismatches were found while check replicas for a partition.
:nodes: S -> M
"""
"""
_fmt
=
PStruct
(
'partition_corrupted'
,
_fmt
=
PStruct
(
'partition_corrupted'
,
PNumber
(
'partition'
),
PNumber
(
'partition'
),
...
@@ -1402,9 +1486,8 @@ class PartitionCorrupted(Packet):
...
@@ -1402,9 +1486,8 @@ class PartitionCorrupted(Packet):
class
LastTransaction
(
Packet
):
class
LastTransaction
(
Packet
):
"""
"""
Ask last committed TID.
Ask last committed TID.
C -> M
Answer last committed TID.
:nodes: C -> M; ctl -> A -> M
M -> C
"""
"""
poll_thread
=
True
poll_thread
=
True
...
@@ -1414,16 +1497,17 @@ class LastTransaction(Packet):
...
@@ -1414,16 +1497,17 @@ class LastTransaction(Packet):
class
NotifyReady
(
Packet
):
class
NotifyReady
(
Packet
):
"""
"""
Notify that node is ready to serve requests.
Notify that we're ready to serve requests.
S -> M
"""
pass
# replication
:nodes: S -> M
"""
class
FetchTransactions
(
Packet
):
class
FetchTransactions
(
Packet
):
"""
"""
S -> S
Ask a storage node to send all transaction data we don't have,
and reply with the list of transactions we should not have.
:nodes: S -> S
"""
"""
_fmt
=
PStruct
(
'ask_transaction_list'
,
_fmt
=
PStruct
(
'ask_transaction_list'
,
PNumber
(
'partition'
),
PNumber
(
'partition'
),
...
@@ -1440,7 +1524,9 @@ class FetchTransactions(Packet):
...
@@ -1440,7 +1524,9 @@ class FetchTransactions(Packet):
class
AddTransaction
(
Packet
):
class
AddTransaction
(
Packet
):
"""
"""
S -> S
Send metadata of a transaction to a node that do not have them.
:nodes: S -> S
"""
"""
nodelay
=
False
nodelay
=
False
...
@@ -1456,7 +1542,10 @@ class AddTransaction(Packet):
...
@@ -1456,7 +1542,10 @@ class AddTransaction(Packet):
class
FetchObjects
(
Packet
):
class
FetchObjects
(
Packet
):
"""
"""
S -> S
Ask a storage node to send object records we don't have,
and reply with the list of records we should not have.
:nodes: S -> S
"""
"""
_fmt
=
PStruct
(
'ask_object_list'
,
_fmt
=
PStruct
(
'ask_object_list'
,
PNumber
(
'partition'
),
PNumber
(
'partition'
),
...
@@ -1481,7 +1570,9 @@ class FetchObjects(Packet):
...
@@ -1481,7 +1570,9 @@ class FetchObjects(Packet):
class
AddObject
(
Packet
):
class
AddObject
(
Packet
):
"""
"""
S -> S
Send an object record to a node that do not have it.
:nodes: S -> S
"""
"""
nodelay
=
False
nodelay
=
False
...
@@ -1498,11 +1589,12 @@ class Replicate(Packet):
...
@@ -1498,11 +1589,12 @@ class Replicate(Packet):
"""
"""
Notify a storage node to replicate partitions up to given 'tid'
Notify a storage node to replicate partitions up to given 'tid'
and from given sources.
and from given sources.
M -> S
- upstream_name: replicate from an upstream cluster
- upstream_name: replicate from an upstream cluster
- address: address of the source storage node, or None if there's no new
- address: address of the source storage node, or None if there's no new
data up to 'tid' for the given partition
data up to 'tid' for the given partition
:nodes: M -> S
"""
"""
_fmt
=
PStruct
(
'replicate'
,
_fmt
=
PStruct
(
'replicate'
,
PTID
(
'tid'
),
PTID
(
'tid'
),
...
@@ -1517,7 +1609,8 @@ class ReplicationDone(Packet):
...
@@ -1517,7 +1609,8 @@ class ReplicationDone(Packet):
"""
"""
Notify the master node that a partition has been successfully replicated
Notify the master node that a partition has been successfully replicated
from a storage to another.
from a storage to another.
S -> M
:nodes: S -> M
"""
"""
_fmt
=
PStruct
(
'notify_replication_done'
,
_fmt
=
PStruct
(
'notify_replication_done'
,
PNumber
(
'offset'
),
PNumber
(
'offset'
),
...
@@ -1527,6 +1620,8 @@ class ReplicationDone(Packet):
...
@@ -1527,6 +1620,8 @@ class ReplicationDone(Packet):
class
Truncate
(
Packet
):
class
Truncate
(
Packet
):
"""
"""
Request DB to be truncated. Also used to leave backup mode.
Request DB to be truncated. Also used to leave backup mode.
:nodes: ctl -> A -> M; M -> S
"""
"""
_fmt
=
PStruct
(
'truncate'
,
_fmt
=
PStruct
(
'truncate'
,
PTID
(
'tid'
),
PTID
(
'tid'
),
...
...
neo/storage/handlers/client.py
View file @
9f0f2afe
...
@@ -42,11 +42,11 @@ class ClientOperationHandler(BaseHandler):
...
@@ -42,11 +42,11 @@ class ClientOperationHandler(BaseHandler):
# for read rpc
# for read rpc
return
self
.
app
.
tm
.
read_queue
return
self
.
app
.
tm
.
read_queue
def
askObject
(
self
,
conn
,
oid
,
serial
,
tid
):
def
askObject
(
self
,
conn
,
oid
,
at
,
before
):
app
=
self
.
app
app
=
self
.
app
if
app
.
tm
.
loadLocked
(
oid
):
if
app
.
tm
.
loadLocked
(
oid
):
raise
DelayEvent
raise
DelayEvent
o
=
app
.
dm
.
getObject
(
oid
,
serial
,
tid
)
o
=
app
.
dm
.
getObject
(
oid
,
at
,
before
)
try
:
try
:
serial
,
next_serial
,
compression
,
checksum
,
data
,
data_serial
=
o
serial
,
next_serial
,
compression
,
checksum
,
data
,
data_serial
=
o
except
TypeError
:
except
TypeError
:
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment