##############################################################################
#
# Copyright (c) 2011 Nexedi SARL and Contributors. All Rights Reserved.
# Julien Muchembled <jm@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################
import random, weakref
from bisect import bisect
import neo.lib
from neo.lib.bootstrap import BootstrapManager
from neo.lib.connector import getConnectorHandler
from neo.lib.exception import PrimaryFailure
from neo.lib.node import NodeManager
from neo.lib.protocol import CellStates, ClusterStates, NodeTypes, Packets
from neo.lib.protocol import INVALID_TID, ZERO_TID
from neo.lib.util import add64, u64, dump
from .app import StateChangedException
from .pt import PartitionTable
from .handlers.backup import BackupHandler
"""
Backup algorithm
This implementation relies on normal storage replication.
Storage nodes that are specialised for backup are not in the same NEO cluster,
but are managed by another master in a different cluster.
When the cluster is in BACKINGUP state, its master acts like a client to the
master of the main cluster. It gets notified of new data thanks to invalidation,
and notifies in turn its storage nodes what/when to replicate.
Storages stay in UP_TO_DATE state, even if partitions are synchronized up to
different tids. Storage nodes remember they are in such state and when
switching into RUNNING state, the cluster cuts the DB at the last TID for which
we have all data.
Out of backup storage nodes assigned to a partition, one is chosen as primary
for that partition. It means only this node will fetch data from the upstream
cluster, to minimize bandwidth between clusters. Other replicas will
synchronize from the primary node.
There is no UUID conflict between the 2 clusters:
- Storage nodes connect anonymously to upstream.
- Master node receives a new from upstream master and uses it only when
communicating with it.
"""
class BackupApplication(object):
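    """Backup-specific part of the master application.

    It holds the client connection to the upstream master and decides,
    partition by partition, which storage node replicates what and up to
    which tid.
    """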

    pt = None

    def __init__(self, app, name, master_addresses, connector_name):
        self.app = weakref.proxy(app)
        self.name = name
        self.nm = NodeManager()
        self.connector_handler = getConnectorHandler(connector_name)
        for master_address in master_addresses:
            self.nm.createMaster(address=master_address)
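
    # The backup application shares the event manager (polling loop) of the
    # enclosing master application.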
    em = property(lambda self: self.app.em)

    def close(self):
        self.nm.close()
        del self.__dict__

    def log(self):
        self.nm.log()
        if self.pt is not None:
            self.pt.log()

    def provideService(self):
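        """Run as long as the cluster is in a backup state.

        The loop (re)connects to the upstream primary master as a client
        and polls until the cluster state is changed; if the upstream
        master fails, a new connection attempt is made. On leaving backup
        mode, storage nodes may be asked to truncate their database at the
        last fully replicated tid.
        """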
        neo.lib.logging.info('provide backup')
        poll = self.em.poll
        app = self.app
        pt = app.pt
        while True:
            app.changeClusterState(ClusterStates.STARTING_BACKUP)
            bootstrap = BootstrapManager(self, self.name, NodeTypes.CLIENT)
            # {offset -> node}
            self.primary_partition_dict = {}
            # [[tid]]
            self.tid_list = tuple([] for _ in xrange(pt.getPartitions()))
            try:
                node, conn, uuid, num_partitions, num_replicas = \
                    bootstrap.getPrimaryConnection(self.connector_handler)
                try:
                    app.changeClusterState(ClusterStates.BACKINGUP)
                    del bootstrap, node
                    if num_partitions != pt.getPartitions():
                        raise RuntimeError("inconsistent number of partitions")
                    self.pt = PartitionTable(num_partitions, num_replicas)
                    conn.setHandler(BackupHandler(self))
                    conn.ask(Packets.AskNodeInformation())
                    conn.ask(Packets.AskPartitionTable())
                    conn.ask(Packets.AskLastTransaction())
                    # debug variable to log how big 'tid_list' can be.
                    self.debug_tid_count = 0
                    while True:
                        poll(1)
                except PrimaryFailure, msg:
                    neo.lib.logging.error('upstream master is down: %s', msg)
                finally:
                    app.backup_tid = pt.getBackupTid()
                    try:
                        conn.close()
                    except PrimaryFailure:
                        pass
                    try:
                        del self.pt
                    except AttributeError:
                        pass
            except StateChangedException, e:
                app.changeClusterState(*e.args)
                last_tid = app.getLastTransaction()
                if last_tid < app.backup_tid:
                    neo.lib.logging.warning(
                        "Truncating at %s (last_tid was %s)",
                        dump(app.backup_tid), dump(last_tid))
                    p = Packets.AskTruncate(app.backup_tid)
                    connection_list = []
                    for node in app.nm.getStorageList(only_identified=True):
                        conn = node.getConnection()
                        conn.ask(p)
                        connection_list.append(conn)
                    for conn in connection_list:
                        while conn.isPending():
                            poll(1)
                    app.setLastTransaction(app.backup_tid)
                del app.backup_tid
                break
            finally:
                del self.primary_partition_dict, self.tid_list

    def nodeLost(self, node):
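        """Select a new primary storage node for each partition for which
        the lost node was primary, and restart replication accordingly.
        """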
        getCellList = self.app.pt.getCellList
        trigger_set = set()
        for offset, primary_node in self.primary_partition_dict.items():
            if primary_node is not node:
                continue
            cell_list = getCellList(offset, readable=True)
            cell = max(cell_list, key=lambda cell: cell.backup_tid)
            tid = cell.backup_tid
            self.primary_partition_dict[offset] = primary_node = cell.getNode()
            p = Packets.Replicate(tid, '', {offset: primary_node.getAddress()})
            for cell in cell_list:
                cell.replicating = tid
                if cell.backup_tid < tid:
                    neo.lib.logging.debug(
                        "ask %s to replicate partition %u up to %u from %r",
                        dump(cell.getUUID()), offset, u64(tid),
                        dump(primary_node.getUUID()))
                    cell.getNode().getConnection().notify(p)
            trigger_set.add(primary_node)
        for node in trigger_set:
            self.triggerBackup(node)

    def invalidatePartitions(self, tid, partition_set):
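        """Record that 'tid' touched the partitions in 'partition_set' and
        update 'backup_tid' of all cells that do not need replication; then
        wake up the storage nodes that do.
        """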
        app = self.app
        prev_tid = app.getLastTransaction()
        app.setLastTransaction(tid)
        pt = app.pt
        getByUUID = app.nm.getByUUID
        trigger_set = set()
        for offset in xrange(pt.getPartitions()):
            try:
                last_max_tid = self.tid_list[offset][-1]
            except IndexError:
                last_max_tid = INVALID_TID
            if offset in partition_set:
                self.tid_list[offset].append(tid)
                node_list = []
                for cell in pt.getCellList(offset, readable=True):
                    node = cell.getNode()
                    assert node.isConnected()
                    node_list.append(node)
                    if last_max_tid <= cell.backup_tid:
                        # This is the last time we can increase
                        # 'backup_tid' without replication.
                        neo.lib.logging.debug(
                            "partition %u: updating backup_tid of %r to %u",
                            offset, cell, u64(prev_tid))
                        cell.backup_tid = prev_tid
                assert node_list
                trigger_set.update(node_list)
                # Make sure we have a primary storage for this partition.
                if offset not in self.primary_partition_dict:
                    self.primary_partition_dict[offset] = \
                        random.choice(node_list)
            else:
                # Partition not touched, so increase 'backup_tid' of all
                # "up-to-date" replicas, without having to replicate.
                for cell in pt.getCellList(offset, readable=True):
                    if last_max_tid <= cell.backup_tid:
                        cell.backup_tid = tid
                        neo.lib.logging.debug(
                            "partition %u: updating backup_tid of %r to %u",
                            offset, cell, u64(tid))
        for node in trigger_set:
            self.triggerBackup(node)
        count = sum(map(len, self.tid_list))
        if self.debug_tid_count < count:
            neo.lib.logging.debug("Maximum number of tracked tids: %u", count)
            self.debug_tid_count = count

    def triggerBackup(self, node):
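        """Ask 'node' to replicate, up to the last known transaction, every
        partition for which it is primary and is lagging behind.
        """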
        tid_list = self.tid_list
        tid = self.app.getLastTransaction()
        replicate_list = []
        for offset, cell in self.app.pt.iterNodeCell(node):
            max_tid = tid_list[offset]
            if max_tid and self.primary_partition_dict[offset] is node and \
               max(cell.backup_tid, cell.replicating) < max_tid[-1]:
                cell.replicating = tid
                replicate_list.append(offset)
        if not replicate_list:
            return
        getByUUID = self.nm.getByUUID
        getCellList = self.pt.getCellList
        source_dict = {}
        address_set = set()
        for offset in replicate_list:
            cell_list = getCellList(offset, readable=True)
            random.shuffle(cell_list)
            assert cell_list, offset
            for cell in cell_list:
                addr = cell.getAddress()
                if addr in address_set:
                    break
            else:
                address_set.add(addr)
            source_dict[offset] = addr
            neo.lib.logging.debug(
                "ask %s to replicate partition %u up to %u from %r",
                dump(node.getUUID()), offset, u64(tid), addr)
        node.getConnection().notify(Packets.Replicate(
            tid, self.name, source_dict))

    def notifyReplicationDone(self, node, offset, tid):
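        """Called when 'node' has replicated partition 'offset' up to 'tid'.

        'tid' is extended up to just before the next transaction known to
        touch this partition, the cell's 'backup_tid' is updated, and
        further replication is triggered if the node is still late.
        """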
        app = self.app
        cell = app.pt.getCell(offset, node.getUUID())
        tid_list = self.tid_list[offset]
        if tid_list: # may be empty if the cell is out-of-date
                     # or if we're not fully initialized
            if tid < tid_list[0]:
                cell.replicating = tid
            else:
                try:
                    tid = add64(tid_list[bisect(tid_list, tid)], -1)
                except IndexError:
                    tid = app.getLastTransaction()
        neo.lib.logging.debug("partition %u: updating backup_tid of %r to %u",
                              offset, cell, u64(tid))
        cell.backup_tid = tid
        # Forget tids we won't need anymore.
        cell_list = app.pt.getCellList(offset, readable=True)
        del tid_list[:bisect(tid_list, min(x.backup_tid for x in cell_list))]
        primary_node = self.primary_partition_dict.get(offset)
        primary = primary_node is node
        result = None if primary else app.pt.setUpToDate(node, offset)
        if app.getClusterState() == ClusterStates.BACKINGUP:
            assert not cell.isOutOfDate()
            if result: # was out-of-date
                max_tid, = [x.backup_tid for x in cell_list
                            if x.getNode() is primary_node]
                if tid < max_tid:
                    cell.replicating = max_tid
                    neo.lib.logging.debug(
                        "ask %s to replicate partition %u up to %u from %r",
                        dump(node.getUUID()), offset, u64(max_tid),
                        dump(primary_node.getUUID()))
                    node.getConnection().notify(Packets.Replicate(max_tid,
                        '', {offset: primary_node.getAddress()}))
            else:
                self.triggerBackup(node)
                if primary:
                    # Notify secondary storages that they can replicate from
                    # primary ones, even if they are already replicating.
                    p = Packets.Replicate(tid, '', {offset: node.getAddress()})
                    for cell in cell_list:
                        if max(cell.backup_tid, cell.replicating) < tid:
                            cell.replicating = tid
                            neo.lib.logging.debug(
                                "ask %s to replicate partition %u up to %u"
                                " from %r", dump(cell.getUUID()), offset,
                                u64(tid), dump(node.getUUID()))
                            cell.getNode().getConnection().notify(p)
        return result