/*
 * net/tipc/bcast.c: TIPC broadcast code
 *
 * Copyright (c) 2004-2006, 2014-2015, Ericsson AB
 * Copyright (c) 2004, Intel Corporation.
 * Copyright (c) 2005, 2010-2011, Wind River Systems
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

38 39
#include "socket.h"
#include "msg.h"
Per Liden's avatar
Per Liden committed
40
#include "bcast.h"
41
#include "name_distr.h"
42
#include "core.h"
Per Liden's avatar
Per Liden committed
43

44 45
#define	MAX_PKT_DEFAULT_MCAST	1500	/* bcast link max packet size (fixed) */
#define	BCLINK_WIN_DEFAULT	20	/* bcast link window size (default) */
Per Liden's avatar
Per Liden committed
46

47
const char tipc_bclink_name[] = "broadcast-link";
Per Liden's avatar
Per Liden committed
48

49 50 51
static void tipc_nmap_diff(struct tipc_node_map *nm_a,
			   struct tipc_node_map *nm_b,
			   struct tipc_node_map *nm_diff);
52 53
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);
Per Liden's avatar
Per Liden committed
54

55
static void tipc_bclink_lock(struct net *net)
56
{
57 58 59
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	spin_lock_bh(&tn->bclink->lock);
60 61
}

62
static void tipc_bclink_unlock(struct net *net)
63
{
64
	struct tipc_net *tn = net_generic(net, tipc_net_id);
65

66
	spin_unlock_bh(&tn->bclink->lock);
67 68
}

69 70 71 72 73 74 75
void tipc_bclink_input(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq);
}

76 77 78 79 80
/**
 * tipc_bclink_get_mtu - report the broadcast link's maximum packet size
 *
 * Returns the fixed value MAX_PKT_DEFAULT_MCAST.
 */
uint tipc_bclink_get_mtu(void)
{
	uint mtu = MAX_PKT_DEFAULT_MCAST;

	return mtu;
}

81
static u32 bcbuf_acks(struct sk_buff *buf)
Per Liden's avatar
Per Liden committed
82
{
83
	return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle;
Per Liden's avatar
Per Liden committed
84 85
}

86
/**
 * bcbuf_set_acks - store an acknowledgement counter in a buffer
 * @buf: buffer whose control block receives the counter
 * @acks: new counter value
 *
 * The value is kept in the pointer-sized 'handle' field of the buffer's
 * TIPC control block; read it back with bcbuf_acks().
 */
static void bcbuf_set_acks(struct sk_buff *buf, u32 acks)
{
	TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks;
}

91
/**
 * bcbuf_decr_acks - decrement a buffer's acknowledgement counter by one
 * @buf: buffer whose counter is decremented
 *
 * No underflow check is made; the counter is an unsigned 32-bit value.
 */
static void bcbuf_decr_acks(struct sk_buff *buf)
{
	bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
}

96
/**
 * tipc_bclink_add_node - add a node to the broadcast link's node map
 * @net: the applicable net namespace
 * @addr: address of the node to add
 *
 * Takes and releases the bclink lock around the map update.
 */
void tipc_bclink_add_node(struct net *net, u32 addr)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_bclink_lock(net);
	tipc_nmap_add(&tn->bclink->bcast_nodes, addr);
	tipc_bclink_unlock(net);
}

105
/**
 * tipc_bclink_remove_node - remove a node from the broadcast link's node map
 * @net: the applicable net namespace
 * @addr: address of the node to remove
 *
 * Takes and releases the bclink lock around the map update.
 */
void tipc_bclink_remove_node(struct net *net, u32 addr)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_bclink_lock(net);
	tipc_nmap_remove(&tn->bclink->bcast_nodes, addr);
	tipc_bclink_unlock(net);
}
Per Liden's avatar
Per Liden committed
113

114
static void bclink_set_last_sent(struct net *net)
115
{
116 117 118
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;

119
	bcl->silent_intv_cnt = mod(bcl->snd_nxt - 1);
120 121
}

122
u32 tipc_bclink_get_last_sent(struct net *net)
123
{
124 125
	struct tipc_net *tn = net_generic(net, tipc_net_id);

126
	return tn->bcl->silent_intv_cnt;
127 128
}

129
/**
 * bclink_update_last_sent - advance a node's last-sent sequence number
 * @node: node whose broadcast state is updated
 * @seqno: candidate sequence number
 *
 * Replaces node->bclink.last_sent with @seqno when less_eq() reports that
 * the current value does not exceed it; otherwise leaves it unchanged.
 */
static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
{
	node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
						seqno : node->bclink.last_sent;
}

135
/**
136 137
 * tipc_bclink_retransmit_to - get most recent node to request retransmission
 *
138
 * Called with bclink_lock locked
139
 */
140
struct tipc_node *tipc_bclink_retransmit_to(struct net *net)
141
{
142 143 144
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	return tn->bclink->retransmit_to;
145 146
}

147
/**
Per Liden's avatar
Per Liden committed
148 149 150
 * bclink_retransmit_pkt - retransmit broadcast packets
 * @after: sequence number of last packet to *not* retransmit
 * @to: sequence number of last packet to retransmit
151
 *
152
 * Called with bclink_lock locked
Per Liden's avatar
Per Liden committed
153
 */
154
static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
Per Liden's avatar
Per Liden committed
155
{
156
	struct sk_buff *skb;
157
	struct tipc_link *bcl = tn->bcl;
Per Liden's avatar
Per Liden committed
158

Jon Paul Maloy's avatar
Jon Paul Maloy committed
159
	skb_queue_walk(&bcl->transmq, skb) {
160 161
		if (more(buf_seqno(skb), after)) {
			tipc_link_retransmit(bcl, skb, mod(to - after));
162
			break;
163
		}
164
	}
Per Liden's avatar
Per Liden committed
165 166
}

167 168 169 170 171
/**
 * tipc_bclink_wakeup_users - wake up pending users
 *
 * Called with no locks taken
 */
172
void tipc_bclink_wakeup_users(struct net *net)
173
{
174
	struct tipc_net *tn = net_generic(net, tipc_net_id);
175

176
	tipc_sk_rcv(net, &tn->bclink->link.wakeupq);
177 178
}

179
/**
180
 * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets
Per Liden's avatar
Per Liden committed
181 182
 * @n_ptr: node that sent acknowledgement info
 * @acked: broadcast sequence # that has been acknowledged
183
 *
184
 * Node is locked, bclink_lock unlocked.
Per Liden's avatar
Per Liden committed
185
 */
186
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
Per Liden's avatar
Per Liden committed
187
{
188
	struct sk_buff *skb, *tmp;
Per Liden's avatar
Per Liden committed
189
	unsigned int released = 0;
190 191
	struct net *net = n_ptr->net;
	struct tipc_net *tn = net_generic(net, tipc_net_id);
Per Liden's avatar
Per Liden committed
192

193 194 195
	if (unlikely(!n_ptr->bclink.recv_permitted))
		return;

196
	tipc_bclink_lock(net);
197

198
	/* Bail out if tx queue is empty (no clean up is required) */
Jon Paul Maloy's avatar
Jon Paul Maloy committed
199
	skb = skb_peek(&tn->bcl->transmq);
200
	if (!skb)
201 202 203 204 205 206 207 208 209
		goto exit;

	/* Determine which messages need to be acknowledged */
	if (acked == INVALID_LINK_SEQ) {
		/*
		 * Contact with specified node has been lost, so need to
		 * acknowledge sent messages only (if other nodes still exist)
		 * or both sent and unsent messages (otherwise)
		 */
210
		if (tn->bclink->bcast_nodes.count)
211
			acked = tn->bcl->silent_intv_cnt;
212
		else
213
			acked = tn->bcl->snd_nxt;
214 215 216 217 218
	} else {
		/*
		 * Bail out if specified sequence number does not correspond
		 * to a message that has been sent and not yet acknowledged
		 */
219
		if (less(acked, buf_seqno(skb)) ||
220
		    less(tn->bcl->silent_intv_cnt, acked) ||
221 222 223 224 225
		    less_eq(acked, n_ptr->bclink.acked))
			goto exit;
	}

	/* Skip over packets that node has previously acknowledged */
Jon Paul Maloy's avatar
Jon Paul Maloy committed
226
	skb_queue_walk(&tn->bcl->transmq, skb) {
227 228 229
		if (more(buf_seqno(skb), n_ptr->bclink.acked))
			break;
	}
Per Liden's avatar
Per Liden committed
230 231

	/* Update packets that node is now acknowledging */
Jon Paul Maloy's avatar
Jon Paul Maloy committed
232
	skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) {
233 234
		if (more(buf_seqno(skb), acked))
			break;
Jon Paul Maloy's avatar
Jon Paul Maloy committed
235 236
		bcbuf_decr_acks(skb);
		bclink_set_last_sent(net);
237
		if (bcbuf_acks(skb) == 0) {
Jon Paul Maloy's avatar
Jon Paul Maloy committed
238
			__skb_unlink(skb, &tn->bcl->transmq);
239
			kfree_skb(skb);
Per Liden's avatar
Per Liden committed
240 241 242 243 244 245
			released = 1;
		}
	}
	n_ptr->bclink.acked = acked;

	/* Try resolving broadcast link congestion, if necessary */
Jon Paul Maloy's avatar
Jon Paul Maloy committed
246
	if (unlikely(skb_peek(&tn->bcl->backlogq))) {
247 248
		tipc_link_push_packets(tn->bcl);
		bclink_set_last_sent(net);
249
	}
250
	if (unlikely(released && !skb_queue_empty(&tn->bcl->wakeupq)))
251
		n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS;
252
exit:
253
	tipc_bclink_unlock(net);
Per Liden's avatar
Per Liden committed
254 255
}

256
/**
257
 * tipc_bclink_update_link_state - update broadcast link state
258
 *
Ying Xue's avatar
Ying Xue committed
259
 * RCU and node lock set
Per Liden's avatar
Per Liden committed
260
 */
261
void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
262
				   u32 last_sent)
Per Liden's avatar
Per Liden committed
263
{
264
	struct sk_buff *buf;
265
	struct net *net = n_ptr->net;
266
	struct tipc_net *tn = net_generic(net, tipc_net_id);
Per Liden's avatar
Per Liden committed
267

268 269 270
	/* Ignore "stale" link state info */
	if (less_eq(last_sent, n_ptr->bclink.last_in))
		return;
Per Liden's avatar
Per Liden committed
271

272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
	/* Update link synchronization state; quit if in sync */
	bclink_update_last_sent(n_ptr, last_sent);

	if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
		return;

	/* Update out-of-sync state; quit if loss is still unconfirmed */
	if ((++n_ptr->bclink.oos_state) == 1) {
		if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
			return;
		n_ptr->bclink.oos_state++;
	}

	/* Don't NACK if one has been recently sent (or seen) */
	if (n_ptr->bclink.oos_state & 0x1)
Per Liden's avatar
Per Liden committed
287 288
		return;

289
	/* Send NACK */
290
	buf = tipc_buf_acquire(INT_H_SIZE);
Per Liden's avatar
Per Liden committed
291
	if (buf) {
292
		struct tipc_msg *msg = buf_msg(buf);
Jon Paul Maloy's avatar
Jon Paul Maloy committed
293
		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferdq);
294
		u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;
295

296
		tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG,
297
			      INT_H_SIZE, n_ptr->addr);
298
		msg_set_non_seq(msg, 1);
299
		msg_set_mc_netid(msg, tn->net_id);
300 301
		msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
		msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
302
		msg_set_bcgap_to(msg, to);
Per Liden's avatar
Per Liden committed
303

304
		tipc_bclink_lock(net);
305
		tipc_bearer_send(net, MAX_BEARERS, buf, NULL);
306 307
		tn->bcl->stats.sent_nacks++;
		tipc_bclink_unlock(net);
308
		kfree_skb(buf);
Per Liden's avatar
Per Liden committed
309

310
		n_ptr->bclink.oos_state++;
Per Liden's avatar
Per Liden committed
311 312 313
	}
}

314
/**
315
 * bclink_peek_nack - monitor retransmission requests sent by other nodes
Per Liden's avatar
Per Liden committed
316
 *
317 318
 * Delay any upcoming NACK by this node if another node has already
 * requested the first message this node is going to ask for.
Per Liden's avatar
Per Liden committed
319
 */
320
static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
Per Liden's avatar
Per Liden committed
321
{
322
	struct tipc_node *n_ptr = tipc_node_find(net, msg_destnode(msg));
Per Liden's avatar
Per Liden committed
323

324
	if (unlikely(!n_ptr))
Per Liden's avatar
Per Liden committed
325
		return;
326

327
	tipc_node_lock(n_ptr);
328
	if (n_ptr->bclink.recv_permitted &&
329 330 331
	    (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
	    (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
		n_ptr->bclink.oos_state = 2;
332
	tipc_node_unlock(n_ptr);
333
	tipc_node_put(n_ptr);
Per Liden's avatar
Per Liden committed
334 335
}

336
/* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster
337
 *                    and to identified node local sockets
338
 * @net: the applicable net namespace
339
 * @list: chain of buffers containing message
340 341 342
 * Consumes the buffer chain, except when returning -ELINKCONG
 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
 */
343
int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
344
{
345 346 347
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;
	struct tipc_bclink *bclink = tn->bclink;
348 349
	int rc = 0;
	int bc = 0;
350
	struct sk_buff *skb;
351 352
	struct sk_buff_head arrvq;
	struct sk_buff_head inputq;
353 354

	/* Prepare clone of message for local node */
355 356 357
	skb = tipc_msg_reassemble(list);
	if (unlikely(!skb)) {
		__skb_queue_purge(list);
358 359
		return -EHOSTUNREACH;
	}
360
	/* Broadcast to all nodes */
361
	if (likely(bclink)) {
362
		tipc_bclink_lock(net);
363
		if (likely(bclink->bcast_nodes.count)) {
364
			rc = __tipc_link_xmit(net, bcl, list);
365
			if (likely(!rc)) {
Jon Paul Maloy's avatar
Jon Paul Maloy committed
366
				u32 len = skb_queue_len(&bcl->transmq);
367

368
				bclink_set_last_sent(net);
369
				bcl->stats.queue_sz_counts++;
370
				bcl->stats.accu_queue_sz += len;
371 372 373
			}
			bc = 1;
		}
374
		tipc_bclink_unlock(net);
375 376 377
	}

	if (unlikely(!bc))
378
		__skb_queue_purge(list);
379

380
	if (unlikely(rc)) {
381
		kfree_skb(skb);
382 383 384 385 386 387 388
		return rc;
	}
	/* Deliver message clone */
	__skb_queue_head_init(&arrvq);
	skb_queue_head_init(&inputq);
	__skb_queue_tail(&arrvq, skb);
	tipc_sk_mcast_rcv(net, &arrvq, &inputq);
389 390 391
	return rc;
}

392
/**
 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
 * @node: node that sent the packet
 * @seqno: sequence number of the accepted packet
 *
 * Called with both sending node's lock and bclink_lock taken.
 */
static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
{
	struct tipc_net *tn = net_generic(node->net, tipc_net_id);

	bclink_update_last_sent(node, seqno);
	node->bclink.last_in = seqno;
	node->bclink.oos_state = 0;
	tn->bcl->stats.recv_info++;

	/*
	 * Unicast an ACK periodically, ensuring that
	 * all nodes in the cluster don't ACK at the same time
	 */
	if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) {
		tipc_link_proto_xmit(node->active_links[node->addr & 1],
				     STATE_MSG, 0, 0, 0, 0);
		tn->bcl->stats.sent_acks++;
	}
}

417
/**
418
 * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards
419
 *
Ying Xue's avatar
Ying Xue committed
420
 * RCU is locked, no other locks set
Per Liden's avatar
Per Liden committed
421
 */
422
void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
423
{
424
	struct tipc_net *tn = net_generic(net, tipc_net_id);
425
	struct tipc_link *bcl = tn->bcl;
Per Liden's avatar
Per Liden committed
426
	struct tipc_msg *msg = buf_msg(buf);
427
	struct tipc_node *node;
Per Liden's avatar
Per Liden committed
428 429
	u32 next_in;
	u32 seqno;
430
	int deferred = 0;
431 432
	int pos = 0;
	struct sk_buff *iskb;
433
	struct sk_buff_head *arrvq, *inputq;
Per Liden's avatar
Per Liden committed
434

435
	/* Screen out unwanted broadcast messages */
436
	if (msg_mc_netid(msg) != tn->net_id)
437 438
		goto exit;

439
	node = tipc_node_find(net, msg_prevnode(msg));
440 441 442 443
	if (unlikely(!node))
		goto exit;

	tipc_node_lock(node);
444
	if (unlikely(!node->bclink.recv_permitted))
445
		goto unlock;
Per Liden's avatar
Per Liden committed
446

447
	/* Handle broadcast protocol message */
Per Liden's avatar
Per Liden committed
448
	if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
449 450
		if (msg_type(msg) != STATE_MSG)
			goto unlock;
451
		if (msg_destnode(msg) == tn->own_addr) {
452
			tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
453
			tipc_bclink_lock(net);
Per Liden's avatar
Per Liden committed
454
			bcl->stats.recv_nacks++;
455 456
			tn->bclink->retransmit_to = node;
			bclink_retransmit_pkt(tn, msg_bcgap_after(msg),
Per Liden's avatar
Per Liden committed
457
					      msg_bcgap_to(msg));
458
			tipc_bclink_unlock(net);
459
			tipc_node_unlock(node);
Per Liden's avatar
Per Liden committed
460
		} else {
461
			tipc_node_unlock(node);
462
			bclink_peek_nack(net, msg);
Per Liden's avatar
Per Liden committed
463
		}
464
		tipc_node_put(node);
465
		goto exit;
Per Liden's avatar
Per Liden committed
466 467
	}

468
	/* Handle in-sequence broadcast message */
Per Liden's avatar
Per Liden committed
469
	seqno = msg_seqno(msg);
470
	next_in = mod(node->bclink.last_in + 1);
471 472
	arrvq = &tn->bclink->arrvq;
	inputq = &tn->bclink->inputq;
Per Liden's avatar
Per Liden committed
473 474

	if (likely(seqno == next_in)) {
475
receive:
476
		/* Deliver message to destination */
Per Liden's avatar
Per Liden committed
477
		if (likely(msg_isdata(msg))) {
478
			tipc_bclink_lock(net);
479
			bclink_accept_pkt(node, seqno);
480 481 482 483
			spin_lock_bh(&inputq->lock);
			__skb_queue_tail(arrvq, buf);
			spin_unlock_bh(&inputq->lock);
			node->action_flags |= TIPC_BCAST_MSG_EVT;
484
			tipc_bclink_unlock(net);
485
			tipc_node_unlock(node);
Per Liden's avatar
Per Liden committed
486
		} else if (msg_user(msg) == MSG_BUNDLER) {
487
			tipc_bclink_lock(net);
488
			bclink_accept_pkt(node, seqno);
Per Liden's avatar
Per Liden committed
489 490
			bcl->stats.recv_bundles++;
			bcl->stats.recv_bundled += msg_msgcnt(msg);
491 492 493 494 495 496 497
			pos = 0;
			while (tipc_msg_extract(buf, &iskb, &pos)) {
				spin_lock_bh(&inputq->lock);
				__skb_queue_tail(arrvq, iskb);
				spin_unlock_bh(&inputq->lock);
			}
			node->action_flags |= TIPC_BCAST_MSG_EVT;
498
			tipc_bclink_unlock(net);
499
			tipc_node_unlock(node);
Per Liden's avatar
Per Liden committed
500
		} else if (msg_user(msg) == MSG_FRAGMENTER) {
501
			tipc_bclink_lock(net);
502
			bclink_accept_pkt(node, seqno);
503 504 505 506 507
			tipc_buf_append(&node->bclink.reasm_buf, &buf);
			if (unlikely(!buf && !node->bclink.reasm_buf)) {
				tipc_bclink_unlock(net);
				goto unlock;
			}
Per Liden's avatar
Per Liden committed
508
			bcl->stats.recv_fragments++;
509
			if (buf) {
Per Liden's avatar
Per Liden committed
510
				bcl->stats.recv_fragmented++;
511
				msg = buf_msg(buf);
512
				tipc_bclink_unlock(net);
513 514
				goto receive;
			}
515
			tipc_bclink_unlock(net);
516
			tipc_node_unlock(node);
Per Liden's avatar
Per Liden committed
517
		} else {
518
			tipc_bclink_lock(net);
519
			bclink_accept_pkt(node, seqno);
520
			tipc_bclink_unlock(net);
521
			tipc_node_unlock(node);
522
			kfree_skb(buf);
Per Liden's avatar
Per Liden committed
523
		}
524
		buf = NULL;
525 526

		/* Determine new synchronization state */
527
		tipc_node_lock(node);
528 529 530
		if (unlikely(!tipc_node_is_up(node)))
			goto unlock;

531
		if (node->bclink.last_in == node->bclink.last_sent)
532 533
			goto unlock;

Jon Paul Maloy's avatar
Jon Paul Maloy committed
534
		if (skb_queue_empty(&node->bclink.deferdq)) {
535 536 537 538
			node->bclink.oos_state = 1;
			goto unlock;
		}

Jon Paul Maloy's avatar
Jon Paul Maloy committed
539
		msg = buf_msg(skb_peek(&node->bclink.deferdq));
540 541 542 543 544 545
		seqno = msg_seqno(msg);
		next_in = mod(next_in + 1);
		if (seqno != next_in)
			goto unlock;

		/* Take in-sequence message from deferred queue & deliver it */
Jon Paul Maloy's avatar
Jon Paul Maloy committed
546
		buf = __skb_dequeue(&node->bclink.deferdq);
547 548 549 550 551
		goto receive;
	}

	/* Handle out-of-sequence broadcast message */
	if (less(next_in, seqno)) {
Jon Paul Maloy's avatar
Jon Paul Maloy committed
552
		deferred = tipc_link_defer_pkt(&node->bclink.deferdq,
553
					       buf);
554
		bclink_update_last_sent(node, seqno);
555
		buf = NULL;
556
	}
557

558
	tipc_bclink_lock(net);
559

560 561
	if (deferred)
		bcl->stats.deferred_recv++;
562 563
	else
		bcl->stats.duplicates++;
564

565
	tipc_bclink_unlock(net);
566

567
unlock:
568
	tipc_node_unlock(node);
569
	tipc_node_put(node);
570
exit:
571
	kfree_skb(buf);
Per Liden's avatar
Per Liden committed
572 573
}

574
u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
Per Liden's avatar
Per Liden committed
575
{
576
	return (n_ptr->bclink.recv_permitted &&
577
		(tipc_bclink_get_last_sent(n_ptr->net) != n_ptr->bclink.acked));
Per Liden's avatar
Per Liden committed
578 579 580 581
}


/**
582
 * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer
583
 *
584 585
 * Send packet over as many bearers as necessary to reach all nodes
 * that have joined the broadcast link.
586
 *
587 588
 * Returns 0 (packet sent successfully) under all circumstances,
 * since the broadcast link's pseudo-bearer never blocks
Per Liden's avatar
Per Liden committed
589
 */
590 591
static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
			      struct tipc_bearer *unused1,
Adrian Bunk's avatar
Adrian Bunk committed
592
			      struct tipc_media_addr *unused2)
Per Liden's avatar
Per Liden committed
593 594
{
	int bp_index;
595
	struct tipc_msg *msg = buf_msg(buf);
596
	struct tipc_net *tn = net_generic(net, tipc_net_id);
597 598
	struct tipc_bcbearer *bcbearer = tn->bcbearer;
	struct tipc_bclink *bclink = tn->bclink;
Per Liden's avatar
Per Liden committed
599

600
	/* Prepare broadcast link message for reliable transmission,
601 602 603 604
	 * if first time trying to send it;
	 * preparation is skipped for broadcast link protocol messages
	 * since they are sent in an unreliable manner and don't need it
	 */
Per Liden's avatar
Per Liden committed
605
	if (likely(!msg_non_seq(buf_msg(buf)))) {
606
		bcbuf_set_acks(buf, bclink->bcast_nodes.count);
607
		msg_set_non_seq(msg, 1);
608
		msg_set_mc_netid(msg, tn->net_id);
609
		tn->bcl->stats.sent_info++;
610
		if (WARN_ON(!bclink->bcast_nodes.count)) {
611 612 613
			dump_stack();
			return 0;
		}
Per Liden's avatar
Per Liden committed
614 615 616
	}

	/* Send buffer over bearers until all targets reached */
617
	bcbearer->remains = bclink->bcast_nodes;
Per Liden's avatar
Per Liden committed
618 619

	for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
620 621
		struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
		struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
622 623
		struct tipc_bearer *bp[2] = {p, s};
		struct tipc_bearer *b = bp[msg_link_selector(msg)];
624
		struct sk_buff *tbuf;
Per Liden's avatar
Per Liden committed
625 626

		if (!p)
627
			break; /* No more bearers to try */
628 629
		if (!b)
			b = p;
630
		tipc_nmap_diff(&bcbearer->remains, &b->nodes,
631
			       &bcbearer->remains_new);
632
		if (bcbearer->remains_new.count == bcbearer->remains.count)
633
			continue; /* Nothing added by bearer pair */
Per Liden's avatar
Per Liden committed
634

635 636
		if (bp_index == 0) {
			/* Use original buffer for first bearer */
637
			tipc_bearer_send(net, b->identity, buf, &b->bcast_addr);
638 639
		} else {
			/* Avoid concurrent buffer access */
640
			tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC);
641 642
			if (!tbuf)
				break;
643 644
			tipc_bearer_send(net, b->identity, tbuf,
					 &b->bcast_addr);
645 646
			kfree_skb(tbuf); /* Bearer keeps a clone */
		}
647
		if (bcbearer->remains_new.count == 0)
648
			break; /* All targets reached */
Per Liden's avatar
Per Liden committed
649

650
		bcbearer->remains = bcbearer->remains_new;
Per Liden's avatar
Per Liden committed
651
	}
652

653
	return 0;
Per Liden's avatar
Per Liden committed
654 655 656
}

/**
657
 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
Per Liden's avatar
Per Liden committed
658
 */
659 660
void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr,
			u32 node, bool action)
Per Liden's avatar
Per Liden committed
661
{
662
	struct tipc_net *tn = net_generic(net, tipc_net_id);
663
	struct tipc_bcbearer *bcbearer = tn->bcbearer;
664 665
	struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
	struct tipc_bcbearer_pair *bp_curr;
666
	struct tipc_bearer *b;
Per Liden's avatar
Per Liden committed
667 668 669
	int b_index;
	int pri;

670
	tipc_bclink_lock(net);
Per Liden's avatar
Per Liden committed
671

672 673 674 675 676
	if (action)
		tipc_nmap_add(nm_ptr, node);
	else
		tipc_nmap_remove(nm_ptr, node);

Per Liden's avatar
Per Liden committed
677 678 679
	/* Group bearers by priority (can assume max of two per priority) */
	memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));

680
	rcu_read_lock();
Per Liden's avatar
Per Liden committed
681
	for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
682
		b = rcu_dereference_rtnl(tn->bearer_list[b_index]);
683
		if (!b || !b->nodes.count)
Per Liden's avatar
Per Liden committed
684 685 686 687 688 689 690
			continue;

		if (!bp_temp[b->priority].primary)
			bp_temp[b->priority].primary = b;
		else
			bp_temp[b->priority].secondary = b;
	}
691
	rcu_read_unlock();
Per Liden's avatar
Per Liden committed
692 693 694 695 696

	/* Create array of bearer pairs for broadcasting */
	bp_curr = bcbearer->bpairs;
	memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs));

697
	for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) {
Per Liden's avatar
Per Liden committed
698 699 700 701 702 703 704

		if (!bp_temp[pri].primary)
			continue;

		bp_curr->primary = bp_temp[pri].primary;

		if (bp_temp[pri].secondary) {
705 706
			if (tipc_nmap_equal(&bp_temp[pri].primary->nodes,
					    &bp_temp[pri].secondary->nodes)) {
Per Liden's avatar
Per Liden committed
707 708 709 710 711 712 713 714 715 716
				bp_curr->secondary = bp_temp[pri].secondary;
			} else {
				bp_curr++;
				bp_curr->primary = bp_temp[pri].secondary;
			}
		}

		bp_curr++;
	}

717
	tipc_bclink_unlock(net);
Per Liden's avatar
Per Liden committed
718 719
}

720 721
static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb,
				      struct tipc_stats *stats)
722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770
{
	int i;
	struct nlattr *nest;

	struct nla_map {
		__u32 key;
		__u32 val;
	};

	struct nla_map map[] = {
		{TIPC_NLA_STATS_RX_INFO, stats->recv_info},
		{TIPC_NLA_STATS_RX_FRAGMENTS, stats->recv_fragments},
		{TIPC_NLA_STATS_RX_FRAGMENTED, stats->recv_fragmented},
		{TIPC_NLA_STATS_RX_BUNDLES, stats->recv_bundles},
		{TIPC_NLA_STATS_RX_BUNDLED, stats->recv_bundled},
		{TIPC_NLA_STATS_TX_INFO, stats->sent_info},
		{TIPC_NLA_STATS_TX_FRAGMENTS, stats->sent_fragments},
		{TIPC_NLA_STATS_TX_FRAGMENTED, stats->sent_fragmented},
		{TIPC_NLA_STATS_TX_BUNDLES, stats->sent_bundles},
		{TIPC_NLA_STATS_TX_BUNDLED, stats->sent_bundled},
		{TIPC_NLA_STATS_RX_NACKS, stats->recv_nacks},
		{TIPC_NLA_STATS_RX_DEFERRED, stats->deferred_recv},
		{TIPC_NLA_STATS_TX_NACKS, stats->sent_nacks},
		{TIPC_NLA_STATS_TX_ACKS, stats->sent_acks},
		{TIPC_NLA_STATS_RETRANSMITTED, stats->retransmitted},
		{TIPC_NLA_STATS_DUPLICATES, stats->duplicates},
		{TIPC_NLA_STATS_LINK_CONGS, stats->link_congs},
		{TIPC_NLA_STATS_MAX_QUEUE, stats->max_queue_sz},
		{TIPC_NLA_STATS_AVG_QUEUE, stats->queue_sz_counts ?
			(stats->accu_queue_sz / stats->queue_sz_counts) : 0}
	};

	nest = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
	if (!nest)
		return -EMSGSIZE;

	for (i = 0; i <  ARRAY_SIZE(map); i++)
		if (nla_put_u32(skb, map[i].key, map[i].val))
			goto msg_full;

	nla_nest_end(skb, nest);

	return 0;
msg_full:
	nla_nest_cancel(skb, nest);

	return -EMSGSIZE;
}

771
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
772 773 774 775 776
{
	int err;
	void *hdr;
	struct nlattr *attrs;
	struct nlattr *prop;
777 778
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;
779 780 781 782

	if (!bcl)
		return 0;

783
	tipc_bclink_lock(net);
784

785
	hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801
			  NLM_F_MULTI, TIPC_NL_LINK_GET);
	if (!hdr)
		return -EMSGSIZE;

	attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
	if (!attrs)
		goto msg_full;

	/* The broadcast link is always up */
	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
		goto attr_msg_full;

	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_BROADCAST))
		goto attr_msg_full;
	if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, bcl->name))
		goto attr_msg_full;
802
	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, bcl->rcv_nxt))
803
		goto attr_msg_full;
804
	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, bcl->snd_nxt))
805 806 807 808 809
		goto attr_msg_full;

	prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
	if (!prop)
		goto attr_msg_full;
810
	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->window))
811 812 813 814 815 816 817
		goto prop_msg_full;
	nla_nest_end(msg->skb, prop);

	err = __tipc_nl_add_bc_link_stat(msg->skb, &bcl->stats);
	if (err)
		goto attr_msg_full;

818
	tipc_bclink_unlock(net);
819 820 821 822 823 824 825 826 827 828
	nla_nest_end(msg->skb, attrs);
	genlmsg_end(msg->skb, hdr);

	return 0;

prop_msg_full:
	nla_nest_cancel(msg->skb, prop);
attr_msg_full:
	nla_nest_cancel(msg->skb, attrs);
msg_full:
829
	tipc_bclink_unlock(net);
830 831 832 833
	genlmsg_cancel(msg->skb, hdr);

	return -EMSGSIZE;
}
Per Liden's avatar
Per Liden committed
834

835
int tipc_bclink_reset_stats(struct net *net)
Per Liden's avatar
Per Liden committed
836
{
837 838 839
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;

Per Liden's avatar
Per Liden committed
840 841 842
	if (!bcl)
		return -ENOPROTOOPT;

843
	tipc_bclink_lock(net);
Per Liden's avatar
Per Liden committed
844
	memset(&bcl->stats, 0, sizeof(bcl->stats));
845
	tipc_bclink_unlock(net);
846
	return 0;
Per Liden's avatar
Per Liden committed
847 848
}

849
/**
 * tipc_bclink_set_queue_limits - set the broadcast link window
 * @net: the applicable net namespace
 * @limit: new window; must lie in [TIPC_MIN_LINK_WIN, TIPC_MAX_LINK_WIN]
 *
 * Returns 0 on success, -ENOPROTOOPT if no broadcast link exists,
 * -EINVAL if @limit is out of range.
 */
int tipc_bclink_set_queue_limits(struct net *net, u32 limit)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;

	if (!bcl)
		return -ENOPROTOOPT;
	if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN))
		return -EINVAL;

	tipc_bclink_lock(net);
	tipc_link_set_queue_limits(bcl, limit);
	tipc_bclink_unlock(net);
	return 0;
}

865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885
/**
 * tipc_nl_bc_link_set - apply netlink-supplied broadcast link properties
 * @net: the applicable net namespace
 * @attrs: parsed link attributes; TIPC_NLA_LINK_PROP must be present
 *
 * Only the window property (TIPC_NLA_PROP_WIN) is handled.
 *
 * Returns -EINVAL if the property nest is missing, the parser's error if
 * parsing fails, -EOPNOTSUPP if no window property is given, otherwise the
 * result of tipc_bclink_set_queue_limits().
 */
int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[])
{
	struct nlattr *props[TIPC_NLA_PROP_MAX + 1];
	struct nlattr *link_prop = attrs[TIPC_NLA_LINK_PROP];
	int rc;

	if (!link_prop)
		return -EINVAL;

	rc = tipc_nl_parse_link_prop(link_prop, props);
	if (rc)
		return rc;

	if (!props[TIPC_NLA_PROP_WIN])
		return -EOPNOTSUPP;

	return tipc_bclink_set_queue_limits(net,
					    nla_get_u32(props[TIPC_NLA_PROP_WIN]));
}

886
int tipc_bclink_init(struct net *net)
Per Liden's avatar
Per Liden committed
887
{
888
	struct tipc_net *tn = net_generic(net, tipc_net_id);
889 890 891
	struct tipc_bcbearer *bcbearer;
	struct tipc_bclink *bclink;
	struct tipc_link *bcl;
892

893 894 895 896 897 898 899 900 901 902 903
	bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
	if (!bcbearer)
		return -ENOMEM;

	bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
	if (!bclink) {
		kfree(bcbearer);
		return -ENOMEM;
	}

	bcl = &bclink->link;
Per Liden's avatar
Per Liden committed
904
	bcbearer->bearer.media = &bcbearer->media;
905
	bcbearer->media.send_msg = tipc_bcbearer_send;
906
	sprintf(bcbearer->media.name, "tipc-broadcast");
Per Liden's avatar
Per Liden committed
907

908
	spin_lock_init(&bclink->lock);
Jon Paul Maloy's avatar
Jon Paul Maloy committed
909 910 911
	__skb_queue_head_init(&bcl->transmq);
	__skb_queue_head_init(&bcl->backlogq);
	__skb_queue_head_init(&bcl->deferdq);
912
	skb_queue_head_init(&bcl->wakeupq);
913
	bcl->snd_nxt = 1;
914
	spin_lock_init(&bclink->node.lock);
915 916
	__skb_queue_head_init(&bclink->arrvq);
	skb_queue_head_init(&bclink->inputq);
Per Liden's avatar
Per Liden committed
917
	bcl->owner = &bclink->node;
918
	bcl->owner->net = net;
919
	bcl->mtu = MAX_PKT_DEFAULT_MCAST;
920
	tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
921
	bcl->bearer_id = MAX_BEARERS;
922
	rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcbearer->bearer);
Per Liden's avatar
Per Liden committed
923
	bcl->state = WORKING_WORKING;
924 925
	bcl->pmsg = (struct tipc_msg *)&bcl->proto_msg;
	msg_set_prevnode(bcl->pmsg, tn->own_addr);
926
	strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
927 928 929
	tn->bcbearer = bcbearer;
	tn->bclink = bclink;
	tn->bcl = bcl;
930
	return 0;
Per Liden's avatar
Per Liden committed
931 932
}

933
void tipc_bclink_stop(struct net *net)
Per Liden's avatar
Per Liden committed
934
{
935 936
	struct tipc_net *tn = net_generic(net, tipc_net_id);

937 938 939
	tipc_bclink_lock(net);
	tipc_link_purge_queues(tn->bcl);
	tipc_bclink_unlock(net);
940

941
	RCU_INIT_POINTER(tn->bearer_list[BCBEARER], NULL);
942
	synchronize_net();
943 944
	kfree(tn->bcbearer);
	kfree(tn->bclink);
Per Liden's avatar
Per Liden committed
945 946
}

947 948 949
/**
 * tipc_nmap_add - add a node to a node map
 */
950
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
951 952 953 954 955 956 957 958 959 960 961 962 963 964
{
	int n = tipc_node(node);
	int w = n / WSIZE;
	u32 mask = (1 << (n % WSIZE));

	if ((nm_ptr->map[w] & mask) == 0) {
		nm_ptr->count++;
		nm_ptr->map[w] |= mask;
	}
}

/**
 * tipc_nmap_remove - remove a node from a node map
 */
965
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982
{
	int n = tipc_node(node);
	int w = n / WSIZE;
	u32 mask = (1 << (n % WSIZE));

	if ((nm_ptr->map[w] & mask) != 0) {
		nm_ptr->map[w] &= ~mask;
		nm_ptr->count--;
	}
}

/**
 * tipc_nmap_diff - find differences between node maps
 * @nm_a: input node map A
 * @nm_b: input node map B
 * @nm_diff: output node map A-B (i.e. nodes of A that are not in B)
 */
983 984 985
static void tipc_nmap_diff(struct tipc_node_map *nm_a,
			   struct tipc_node_map *nm_b,
			   struct tipc_node_map *nm_diff)
986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003
{
	int stop = ARRAY_SIZE(nm_a->map);
	int w;
	int b;
	u32 map;

	memset(nm_diff, 0, sizeof(*nm_diff));
	for (w = 0; w < stop; w++) {
		map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]);
		nm_diff->map[w] = map;
		if (map != 0) {
			for (b = 0 ; b < WSIZE; b++) {
				if (map & (1 << b))
					nm_diff->count++;
			}
		}
	}
}