bcast.c 27.7 KB
Newer Older
Per Liden's avatar
Per Liden committed
1 2
/*
 * net/tipc/bcast.c: TIPC broadcast code
3
 *
4
 * Copyright (c) 2004-2006, 2014-2015, Ericsson AB
Per Liden's avatar
Per Liden committed
5
 * Copyright (c) 2004, Intel Corporation.
6
 * Copyright (c) 2005, 2010-2011, Wind River Systems
Per Liden's avatar
Per Liden committed
7 8
 * All rights reserved.
 *
Per Liden's avatar
Per Liden committed
9
 * Redistribution and use in source and binary forms, with or without
Per Liden's avatar
Per Liden committed
10 11
 * modification, are permitted provided that the following conditions are met:
 *
Per Liden's avatar
Per Liden committed
12 13 14 15 16 17 18 19
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
Per Liden's avatar
Per Liden committed
20
 *
Per Liden's avatar
Per Liden committed
21 22 23 24 25 26 27 28 29 30 31 32 33 34
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
Per Liden's avatar
Per Liden committed
35 36 37
 * POSSIBILITY OF SUCH DAMAGE.
 */

38 39
#include "socket.h"
#include "msg.h"
Per Liden's avatar
Per Liden committed
40
#include "bcast.h"
41
#include "name_distr.h"
42
#include "core.h"
Per Liden's avatar
Per Liden committed
43

44 45
#define	MAX_PKT_DEFAULT_MCAST	1500	/* bcast link max packet size (fixed) */
#define	BCLINK_WIN_DEFAULT	20	/* bcast link window size (default) */
Per Liden's avatar
Per Liden committed
46

47
const char tipc_bclink_name[] = "broadcast-link";
Per Liden's avatar
Per Liden committed
48

49 50 51
static void tipc_nmap_diff(struct tipc_node_map *nm_a,
			   struct tipc_node_map *nm_b,
			   struct tipc_node_map *nm_diff);
52 53
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);
Per Liden's avatar
Per Liden committed
54

55
static void tipc_bclink_lock(struct net *net)
56
{
57 58 59
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	spin_lock_bh(&tn->bclink->lock);
60 61
}

62
static void tipc_bclink_unlock(struct net *net)
63
{
64
	struct tipc_net *tn = net_generic(net, tipc_net_id);
65 66
	struct tipc_node *node = NULL;

67 68
	if (likely(!tn->bclink->flags)) {
		spin_unlock_bh(&tn->bclink->lock);
69 70 71
		return;
	}

72 73 74
	if (tn->bclink->flags & TIPC_BCLINK_RESET) {
		tn->bclink->flags &= ~TIPC_BCLINK_RESET;
		node = tipc_bclink_retransmit_to(net);
75
	}
76
	spin_unlock_bh(&tn->bclink->lock);
77 78 79 80 81

	if (node)
		tipc_link_reset_all(node);
}

82 83 84 85 86 87 88
void tipc_bclink_input(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq);
}

89 90 91 92 93
uint  tipc_bclink_get_mtu(void)
{
	return MAX_PKT_DEFAULT_MCAST;
}

94
void tipc_bclink_set_flags(struct net *net, unsigned int flags)
95
{
96 97 98
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tn->bclink->flags |= flags;
99 100
}

101
static u32 bcbuf_acks(struct sk_buff *buf)
Per Liden's avatar
Per Liden committed
102
{
103
	return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle;
Per Liden's avatar
Per Liden committed
104 105
}

106
static void bcbuf_set_acks(struct sk_buff *buf, u32 acks)
Per Liden's avatar
Per Liden committed
107
{
108
	TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks;
Per Liden's avatar
Per Liden committed
109 110
}

111
static void bcbuf_decr_acks(struct sk_buff *buf)
Per Liden's avatar
Per Liden committed
112 113 114 115
{
	bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
}

116
void tipc_bclink_add_node(struct net *net, u32 addr)
117
{
118 119 120 121 122
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_bclink_lock(net);
	tipc_nmap_add(&tn->bclink->bcast_nodes, addr);
	tipc_bclink_unlock(net);
123 124
}

125
void tipc_bclink_remove_node(struct net *net, u32 addr)
126
{
127 128 129 130 131
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_bclink_lock(net);
	tipc_nmap_remove(&tn->bclink->bcast_nodes, addr);
	tipc_bclink_unlock(net);
132
}
Per Liden's avatar
Per Liden committed
133

134
static void bclink_set_last_sent(struct net *net)
135
{
136 137 138
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;

139 140 141 142 143 144
	if (bcl->next_out)
		bcl->fsm_msg_cnt = mod(buf_seqno(bcl->next_out) - 1);
	else
		bcl->fsm_msg_cnt = mod(bcl->next_out_no - 1);
}

145
u32 tipc_bclink_get_last_sent(struct net *net)
146
{
147 148 149
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	return tn->bcl->fsm_msg_cnt;
150 151
}

152
static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
Per Liden's avatar
Per Liden committed
153
{
154 155
	node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
						seqno : node->bclink.last_sent;
Per Liden's avatar
Per Liden committed
156 157 158
}


159
/**
160 161
 * tipc_bclink_retransmit_to - get most recent node to request retransmission
 *
162
 * Called with bclink_lock locked
163
 */
164
struct tipc_node *tipc_bclink_retransmit_to(struct net *net)
165
{
166 167 168
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	return tn->bclink->retransmit_to;
169 170
}

171
/**
Per Liden's avatar
Per Liden committed
172 173 174
 * bclink_retransmit_pkt - retransmit broadcast packets
 * @after: sequence number of last packet to *not* retransmit
 * @to: sequence number of last packet to retransmit
175
 *
176
 * Called with bclink_lock locked
Per Liden's avatar
Per Liden committed
177
 */
178
static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
Per Liden's avatar
Per Liden committed
179
{
180
	struct sk_buff *skb;
181
	struct tipc_link *bcl = tn->bcl;
Per Liden's avatar
Per Liden committed
182

183
	skb_queue_walk(&bcl->outqueue, skb) {
184 185
		if (more(buf_seqno(skb), after)) {
			tipc_link_retransmit(bcl, skb, mod(to - after));
186
			break;
187
		}
188
	}
Per Liden's avatar
Per Liden committed
189 190
}

191 192 193 194 195
/**
 * tipc_bclink_wakeup_users - wake up pending users
 *
 * Called with no locks taken
 */
196
void tipc_bclink_wakeup_users(struct net *net)
197
{
198
	struct tipc_net *tn = net_generic(net, tipc_net_id);
199

200
	tipc_sk_rcv(net, &tn->bclink->link.wakeupq);
201 202
}

203
/**
204
 * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets
Per Liden's avatar
Per Liden committed
205 206
 * @n_ptr: node that sent acknowledgement info
 * @acked: broadcast sequence # that has been acknowledged
207
 *
208
 * Node is locked, bclink_lock unlocked.
Per Liden's avatar
Per Liden committed
209
 */
210
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
Per Liden's avatar
Per Liden committed
211
{
212
	struct sk_buff *skb, *tmp;
Per Liden's avatar
Per Liden committed
213 214
	struct sk_buff *next;
	unsigned int released = 0;
215 216
	struct net *net = n_ptr->net;
	struct tipc_net *tn = net_generic(net, tipc_net_id);
Per Liden's avatar
Per Liden committed
217

218
	tipc_bclink_lock(net);
219
	/* Bail out if tx queue is empty (no clean up is required) */
220
	skb = skb_peek(&tn->bcl->outqueue);
221
	if (!skb)
222 223 224 225 226 227 228 229 230
		goto exit;

	/* Determine which messages need to be acknowledged */
	if (acked == INVALID_LINK_SEQ) {
		/*
		 * Contact with specified node has been lost, so need to
		 * acknowledge sent messages only (if other nodes still exist)
		 * or both sent and unsent messages (otherwise)
		 */
231 232
		if (tn->bclink->bcast_nodes.count)
			acked = tn->bcl->fsm_msg_cnt;
233
		else
234
			acked = tn->bcl->next_out_no;
235 236 237 238 239
	} else {
		/*
		 * Bail out if specified sequence number does not correspond
		 * to a message that has been sent and not yet acknowledged
		 */
240
		if (less(acked, buf_seqno(skb)) ||
241
		    less(tn->bcl->fsm_msg_cnt, acked) ||
242 243 244 245 246
		    less_eq(acked, n_ptr->bclink.acked))
			goto exit;
	}

	/* Skip over packets that node has previously acknowledged */
247
	skb_queue_walk(&tn->bcl->outqueue, skb) {
248 249 250
		if (more(buf_seqno(skb), n_ptr->bclink.acked))
			break;
	}
Per Liden's avatar
Per Liden committed
251 252

	/* Update packets that node is now acknowledging */
253
	skb_queue_walk_from_safe(&tn->bcl->outqueue, skb, tmp) {
254 255
		if (more(buf_seqno(skb), acked))
			break;
Per Liden's avatar
Per Liden committed
256

257 258
		next = tipc_skb_queue_next(&tn->bcl->outqueue, skb);
		if (skb != tn->bcl->next_out) {
259 260 261
			bcbuf_decr_acks(skb);
		} else {
			bcbuf_set_acks(skb, 0);
262 263
			tn->bcl->next_out = next;
			bclink_set_last_sent(net);
264 265
		}

266
		if (bcbuf_acks(skb) == 0) {
267
			__skb_unlink(skb, &tn->bcl->outqueue);
268
			kfree_skb(skb);
Per Liden's avatar
Per Liden committed
269 270 271 272 273 274
			released = 1;
		}
	}
	n_ptr->bclink.acked = acked;

	/* Try resolving broadcast link congestion, if necessary */
275 276 277
	if (unlikely(tn->bcl->next_out)) {
		tipc_link_push_packets(tn->bcl);
		bclink_set_last_sent(net);
278
	}
279
	if (unlikely(released && !skb_queue_empty(&tn->bcl->wakeupq)))
280
		n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS;
281
exit:
282
	tipc_bclink_unlock(net);
Per Liden's avatar
Per Liden committed
283 284
}

285
/**
286
 * tipc_bclink_update_link_state - update broadcast link state
287
 *
Ying Xue's avatar
Ying Xue committed
288
 * RCU and node lock set
Per Liden's avatar
Per Liden committed
289
 */
290
void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
291
				   u32 last_sent)
Per Liden's avatar
Per Liden committed
292
{
293
	struct sk_buff *buf;
294
	struct net *net = n_ptr->net;
295
	struct tipc_net *tn = net_generic(net, tipc_net_id);
Per Liden's avatar
Per Liden committed
296

297 298 299
	/* Ignore "stale" link state info */
	if (less_eq(last_sent, n_ptr->bclink.last_in))
		return;
Per Liden's avatar
Per Liden committed
300

301 302 303 304 305 306 307 308 309 310 311 312 313 314 315
	/* Update link synchronization state; quit if in sync */
	bclink_update_last_sent(n_ptr, last_sent);

	if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
		return;

	/* Update out-of-sync state; quit if loss is still unconfirmed */
	if ((++n_ptr->bclink.oos_state) == 1) {
		if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
			return;
		n_ptr->bclink.oos_state++;
	}

	/* Don't NACK if one has been recently sent (or seen) */
	if (n_ptr->bclink.oos_state & 0x1)
Per Liden's avatar
Per Liden committed
316 317
		return;

318
	/* Send NACK */
319
	buf = tipc_buf_acquire(INT_H_SIZE);
Per Liden's avatar
Per Liden committed
320
	if (buf) {
321
		struct tipc_msg *msg = buf_msg(buf);
322 323
		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferred_queue);
		u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;
324

325
		tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG,
326
			      INT_H_SIZE, n_ptr->addr);
327
		msg_set_non_seq(msg, 1);
328
		msg_set_mc_netid(msg, tn->net_id);
329 330
		msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
		msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
331
		msg_set_bcgap_to(msg, to);
Per Liden's avatar
Per Liden committed
332

333
		tipc_bclink_lock(net);
334
		tipc_bearer_send(net, MAX_BEARERS, buf, NULL);
335 336
		tn->bcl->stats.sent_nacks++;
		tipc_bclink_unlock(net);
337
		kfree_skb(buf);
Per Liden's avatar
Per Liden committed
338

339
		n_ptr->bclink.oos_state++;
Per Liden's avatar
Per Liden committed
340 341 342
	}
}

343
/**
344
 * bclink_peek_nack - monitor retransmission requests sent by other nodes
Per Liden's avatar
Per Liden committed
345
 *
346 347
 * Delay any upcoming NACK by this node if another node has already
 * requested the first message this node is going to ask for.
Per Liden's avatar
Per Liden committed
348
 */
349
static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
Per Liden's avatar
Per Liden committed
350
{
351
	struct tipc_node *n_ptr = tipc_node_find(net, msg_destnode(msg));
Per Liden's avatar
Per Liden committed
352

353
	if (unlikely(!n_ptr))
Per Liden's avatar
Per Liden committed
354
		return;
355

356
	tipc_node_lock(n_ptr);
357

358
	if (n_ptr->bclink.recv_permitted &&
359 360 361 362
	    (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
	    (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
		n_ptr->bclink.oos_state = 2;

363
	tipc_node_unlock(n_ptr);
Per Liden's avatar
Per Liden committed
364 365
}

366
/* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster
367
 *                    and to identified node local sockets
368
 * @net: the applicable net namespace
369
 * @list: chain of buffers containing message
370 371 372
 * Consumes the buffer chain, except when returning -ELINKCONG
 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
 */
373
int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
374
{
375 376 377
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;
	struct tipc_bclink *bclink = tn->bclink;
378 379
	int rc = 0;
	int bc = 0;
380
	struct sk_buff *skb;
381 382
	struct sk_buff_head arrvq;
	struct sk_buff_head inputq;
383 384

	/* Prepare clone of message for local node */
385 386 387
	skb = tipc_msg_reassemble(list);
	if (unlikely(!skb)) {
		__skb_queue_purge(list);
388 389 390
		return -EHOSTUNREACH;
	}

391
	/* Broadcast to all nodes */
392
	if (likely(bclink)) {
393
		tipc_bclink_lock(net);
394
		if (likely(bclink->bcast_nodes.count)) {
395
			rc = __tipc_link_xmit(net, bcl, list);
396
			if (likely(!rc)) {
397 398
				u32 len = skb_queue_len(&bcl->outqueue);

399
				bclink_set_last_sent(net);
400
				bcl->stats.queue_sz_counts++;
401
				bcl->stats.accu_queue_sz += len;
402 403 404
			}
			bc = 1;
		}
405
		tipc_bclink_unlock(net);
406 407 408
	}

	if (unlikely(!bc))
409
		__skb_queue_purge(list);
410

411
	if (unlikely(rc)) {
412
		kfree_skb(skb);
413 414 415 416 417 418 419
		return rc;
	}
	/* Deliver message clone */
	__skb_queue_head_init(&arrvq);
	skb_queue_head_init(&inputq);
	__skb_queue_tail(&arrvq, skb);
	tipc_sk_mcast_rcv(net, &arrvq, &inputq);
420 421 422
	return rc;
}

423
/**
424 425
 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
 *
426
 * Called with both sending node's lock and bclink_lock taken.
427 428 429
 */
static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
{
430 431
	struct tipc_net *tn = net_generic(node->net, tipc_net_id);

432 433 434
	bclink_update_last_sent(node, seqno);
	node->bclink.last_in = seqno;
	node->bclink.oos_state = 0;
435
	tn->bcl->stats.recv_info++;
436 437 438 439 440

	/*
	 * Unicast an ACK periodically, ensuring that
	 * all nodes in the cluster don't ACK at the same time
	 */
441
	if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) {
442 443
		tipc_link_proto_xmit(node->active_links[node->addr & 1],
				     STATE_MSG, 0, 0, 0, 0, 0);
444
		tn->bcl->stats.sent_acks++;
445 446 447
	}
}

448
/**
449
 * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards
450
 *
Ying Xue's avatar
Ying Xue committed
451
 * RCU is locked, no other locks set
Per Liden's avatar
Per Liden committed
452
 */
453
void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
454
{
455
	struct tipc_net *tn = net_generic(net, tipc_net_id);
456
	struct tipc_link *bcl = tn->bcl;
Per Liden's avatar
Per Liden committed
457
	struct tipc_msg *msg = buf_msg(buf);
458
	struct tipc_node *node;
Per Liden's avatar
Per Liden committed
459 460
	u32 next_in;
	u32 seqno;
461
	int deferred = 0;
462 463
	int pos = 0;
	struct sk_buff *iskb;
464
	struct sk_buff_head *arrvq, *inputq;
Per Liden's avatar
Per Liden committed
465

466
	/* Screen out unwanted broadcast messages */
467
	if (msg_mc_netid(msg) != tn->net_id)
468 469
		goto exit;

470
	node = tipc_node_find(net, msg_prevnode(msg));
471 472 473 474
	if (unlikely(!node))
		goto exit;

	tipc_node_lock(node);
475
	if (unlikely(!node->bclink.recv_permitted))
476
		goto unlock;
Per Liden's avatar
Per Liden committed
477

478
	/* Handle broadcast protocol message */
Per Liden's avatar
Per Liden committed
479
	if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
480 481
		if (msg_type(msg) != STATE_MSG)
			goto unlock;
482
		if (msg_destnode(msg) == tn->own_addr) {
483 484
			tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
			tipc_node_unlock(node);
485
			tipc_bclink_lock(net);
Per Liden's avatar
Per Liden committed
486
			bcl->stats.recv_nacks++;
487 488
			tn->bclink->retransmit_to = node;
			bclink_retransmit_pkt(tn, msg_bcgap_after(msg),
Per Liden's avatar
Per Liden committed
489
					      msg_bcgap_to(msg));
490
			tipc_bclink_unlock(net);
Per Liden's avatar
Per Liden committed
491
		} else {
492
			tipc_node_unlock(node);
493
			bclink_peek_nack(net, msg);
Per Liden's avatar
Per Liden committed
494
		}
495
		goto exit;
Per Liden's avatar
Per Liden committed
496 497
	}

498
	/* Handle in-sequence broadcast message */
Per Liden's avatar
Per Liden committed
499
	seqno = msg_seqno(msg);
500
	next_in = mod(node->bclink.last_in + 1);
501 502
	arrvq = &tn->bclink->arrvq;
	inputq = &tn->bclink->inputq;
Per Liden's avatar
Per Liden committed
503 504

	if (likely(seqno == next_in)) {
505
receive:
506
		/* Deliver message to destination */
Per Liden's avatar
Per Liden committed
507
		if (likely(msg_isdata(msg))) {
508
			tipc_bclink_lock(net);
509
			bclink_accept_pkt(node, seqno);
510 511 512 513
			spin_lock_bh(&inputq->lock);
			__skb_queue_tail(arrvq, buf);
			spin_unlock_bh(&inputq->lock);
			node->action_flags |= TIPC_BCAST_MSG_EVT;
514
			tipc_bclink_unlock(net);
515
			tipc_node_unlock(node);
Per Liden's avatar
Per Liden committed
516
		} else if (msg_user(msg) == MSG_BUNDLER) {
517
			tipc_bclink_lock(net);
518
			bclink_accept_pkt(node, seqno);
Per Liden's avatar
Per Liden committed
519 520
			bcl->stats.recv_bundles++;
			bcl->stats.recv_bundled += msg_msgcnt(msg);
521 522 523 524 525 526 527
			pos = 0;
			while (tipc_msg_extract(buf, &iskb, &pos)) {
				spin_lock_bh(&inputq->lock);
				__skb_queue_tail(arrvq, iskb);
				spin_unlock_bh(&inputq->lock);
			}
			node->action_flags |= TIPC_BCAST_MSG_EVT;
528
			tipc_bclink_unlock(net);
529
			tipc_node_unlock(node);
Per Liden's avatar
Per Liden committed
530
		} else if (msg_user(msg) == MSG_FRAGMENTER) {
531 532
			tipc_buf_append(&node->bclink.reasm_buf, &buf);
			if (unlikely(!buf && !node->bclink.reasm_buf))
533
				goto unlock;
534
			tipc_bclink_lock(net);
535
			bclink_accept_pkt(node, seqno);
Per Liden's avatar
Per Liden committed
536
			bcl->stats.recv_fragments++;
537
			if (buf) {
Per Liden's avatar
Per Liden committed
538
				bcl->stats.recv_fragmented++;
539
				msg = buf_msg(buf);
540
				tipc_bclink_unlock(net);
541 542
				goto receive;
			}
543
			tipc_bclink_unlock(net);
544
			tipc_node_unlock(node);
Per Liden's avatar
Per Liden committed
545
		} else {
546
			tipc_bclink_lock(net);
547
			bclink_accept_pkt(node, seqno);
548
			tipc_bclink_unlock(net);
549
			tipc_node_unlock(node);
550
			kfree_skb(buf);
Per Liden's avatar
Per Liden committed
551
		}
552
		buf = NULL;
553 554

		/* Determine new synchronization state */
555
		tipc_node_lock(node);
556 557 558
		if (unlikely(!tipc_node_is_up(node)))
			goto unlock;

559
		if (node->bclink.last_in == node->bclink.last_sent)
560 561
			goto unlock;

562
		if (skb_queue_empty(&node->bclink.deferred_queue)) {
563 564 565 566
			node->bclink.oos_state = 1;
			goto unlock;
		}

567
		msg = buf_msg(skb_peek(&node->bclink.deferred_queue));
568 569 570 571 572 573
		seqno = msg_seqno(msg);
		next_in = mod(next_in + 1);
		if (seqno != next_in)
			goto unlock;

		/* Take in-sequence message from deferred queue & deliver it */
574
		buf = __skb_dequeue(&node->bclink.deferred_queue);
575 576 577 578 579
		goto receive;
	}

	/* Handle out-of-sequence broadcast message */
	if (less(next_in, seqno)) {
580
		deferred = tipc_link_defer_pkt(&node->bclink.deferred_queue,
581
					       buf);
582
		bclink_update_last_sent(node, seqno);
583
		buf = NULL;
584
	}
585

586
	tipc_bclink_lock(net);
587

588 589
	if (deferred)
		bcl->stats.deferred_recv++;
590 591
	else
		bcl->stats.duplicates++;
592

593
	tipc_bclink_unlock(net);
594

595
unlock:
596
	tipc_node_unlock(node);
597
exit:
598
	kfree_skb(buf);
Per Liden's avatar
Per Liden committed
599 600
}

601
u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
Per Liden's avatar
Per Liden committed
602
{
603
	return (n_ptr->bclink.recv_permitted &&
604
		(tipc_bclink_get_last_sent(n_ptr->net) != n_ptr->bclink.acked));
Per Liden's avatar
Per Liden committed
605 606 607 608
}


/**
609
 * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer
610
 *
611 612
 * Send packet over as many bearers as necessary to reach all nodes
 * that have joined the broadcast link.
613
 *
614 615
 * Returns 0 (packet sent successfully) under all circumstances,
 * since the broadcast link's pseudo-bearer never blocks
Per Liden's avatar
Per Liden committed
616
 */
617 618
static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
			      struct tipc_bearer *unused1,
Adrian Bunk's avatar
Adrian Bunk committed
619
			      struct tipc_media_addr *unused2)
Per Liden's avatar
Per Liden committed
620 621
{
	int bp_index;
622
	struct tipc_msg *msg = buf_msg(buf);
623
	struct tipc_net *tn = net_generic(net, tipc_net_id);
624 625
	struct tipc_bcbearer *bcbearer = tn->bcbearer;
	struct tipc_bclink *bclink = tn->bclink;
Per Liden's avatar
Per Liden committed
626

627
	/* Prepare broadcast link message for reliable transmission,
628 629 630 631
	 * if first time trying to send it;
	 * preparation is skipped for broadcast link protocol messages
	 * since they are sent in an unreliable manner and don't need it
	 */
Per Liden's avatar
Per Liden committed
632
	if (likely(!msg_non_seq(buf_msg(buf)))) {
633
		bcbuf_set_acks(buf, bclink->bcast_nodes.count);
634
		msg_set_non_seq(msg, 1);
635
		msg_set_mc_netid(msg, tn->net_id);
636
		tn->bcl->stats.sent_info++;
637

638
		if (WARN_ON(!bclink->bcast_nodes.count)) {
639 640 641
			dump_stack();
			return 0;
		}
Per Liden's avatar
Per Liden committed
642 643 644
	}

	/* Send buffer over bearers until all targets reached */
645
	bcbearer->remains = bclink->bcast_nodes;
Per Liden's avatar
Per Liden committed
646 647

	for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
648 649
		struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
		struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
650 651
		struct tipc_bearer *bp[2] = {p, s};
		struct tipc_bearer *b = bp[msg_link_selector(msg)];
652
		struct sk_buff *tbuf;
Per Liden's avatar
Per Liden committed
653 654

		if (!p)
655
			break; /* No more bearers to try */
656 657
		if (!b)
			b = p;
658
		tipc_nmap_diff(&bcbearer->remains, &b->nodes,
659
			       &bcbearer->remains_new);
660
		if (bcbearer->remains_new.count == bcbearer->remains.count)
661
			continue; /* Nothing added by bearer pair */
Per Liden's avatar
Per Liden committed
662

663 664
		if (bp_index == 0) {
			/* Use original buffer for first bearer */
665
			tipc_bearer_send(net, b->identity, buf, &b->bcast_addr);
666 667
		} else {
			/* Avoid concurrent buffer access */
668
			tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC);
669 670
			if (!tbuf)
				break;
671 672
			tipc_bearer_send(net, b->identity, tbuf,
					 &b->bcast_addr);
673 674
			kfree_skb(tbuf); /* Bearer keeps a clone */
		}
675
		if (bcbearer->remains_new.count == 0)
676
			break; /* All targets reached */
Per Liden's avatar
Per Liden committed
677

678
		bcbearer->remains = bcbearer->remains_new;
Per Liden's avatar
Per Liden committed
679
	}
680

681
	return 0;
Per Liden's avatar
Per Liden committed
682 683 684
}

/**
685
 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
Per Liden's avatar
Per Liden committed
686
 */
687 688
void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr,
			u32 node, bool action)
Per Liden's avatar
Per Liden committed
689
{
690
	struct tipc_net *tn = net_generic(net, tipc_net_id);
691
	struct tipc_bcbearer *bcbearer = tn->bcbearer;
692 693
	struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
	struct tipc_bcbearer_pair *bp_curr;
694
	struct tipc_bearer *b;
Per Liden's avatar
Per Liden committed
695 696 697
	int b_index;
	int pri;

698
	tipc_bclink_lock(net);
Per Liden's avatar
Per Liden committed
699

700 701 702 703 704
	if (action)
		tipc_nmap_add(nm_ptr, node);
	else
		tipc_nmap_remove(nm_ptr, node);

Per Liden's avatar
Per Liden committed
705 706 707
	/* Group bearers by priority (can assume max of two per priority) */
	memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));

708
	rcu_read_lock();
Per Liden's avatar
Per Liden committed
709
	for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
710
		b = rcu_dereference_rtnl(tn->bearer_list[b_index]);
711
		if (!b || !b->nodes.count)
Per Liden's avatar
Per Liden committed
712 713 714 715 716 717 718
			continue;

		if (!bp_temp[b->priority].primary)
			bp_temp[b->priority].primary = b;
		else
			bp_temp[b->priority].secondary = b;
	}
719
	rcu_read_unlock();
Per Liden's avatar
Per Liden committed
720 721 722 723 724

	/* Create array of bearer pairs for broadcasting */
	bp_curr = bcbearer->bpairs;
	memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs));

725
	for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) {
Per Liden's avatar
Per Liden committed
726 727 728 729 730 731 732

		if (!bp_temp[pri].primary)
			continue;

		bp_curr->primary = bp_temp[pri].primary;

		if (bp_temp[pri].secondary) {
733 734
			if (tipc_nmap_equal(&bp_temp[pri].primary->nodes,
					    &bp_temp[pri].secondary->nodes)) {
Per Liden's avatar
Per Liden committed
735 736 737 738 739 740 741 742 743 744
				bp_curr->secondary = bp_temp[pri].secondary;
			} else {
				bp_curr++;
				bp_curr->primary = bp_temp[pri].secondary;
			}
		}

		bp_curr++;
	}

745
	tipc_bclink_unlock(net);
Per Liden's avatar
Per Liden committed
746 747
}

748 749
static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb,
				      struct tipc_stats *stats)
750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798
{
	int i;
	struct nlattr *nest;

	struct nla_map {
		__u32 key;
		__u32 val;
	};

	struct nla_map map[] = {
		{TIPC_NLA_STATS_RX_INFO, stats->recv_info},
		{TIPC_NLA_STATS_RX_FRAGMENTS, stats->recv_fragments},
		{TIPC_NLA_STATS_RX_FRAGMENTED, stats->recv_fragmented},
		{TIPC_NLA_STATS_RX_BUNDLES, stats->recv_bundles},
		{TIPC_NLA_STATS_RX_BUNDLED, stats->recv_bundled},
		{TIPC_NLA_STATS_TX_INFO, stats->sent_info},
		{TIPC_NLA_STATS_TX_FRAGMENTS, stats->sent_fragments},
		{TIPC_NLA_STATS_TX_FRAGMENTED, stats->sent_fragmented},
		{TIPC_NLA_STATS_TX_BUNDLES, stats->sent_bundles},
		{TIPC_NLA_STATS_TX_BUNDLED, stats->sent_bundled},
		{TIPC_NLA_STATS_RX_NACKS, stats->recv_nacks},
		{TIPC_NLA_STATS_RX_DEFERRED, stats->deferred_recv},
		{TIPC_NLA_STATS_TX_NACKS, stats->sent_nacks},
		{TIPC_NLA_STATS_TX_ACKS, stats->sent_acks},
		{TIPC_NLA_STATS_RETRANSMITTED, stats->retransmitted},
		{TIPC_NLA_STATS_DUPLICATES, stats->duplicates},
		{TIPC_NLA_STATS_LINK_CONGS, stats->link_congs},
		{TIPC_NLA_STATS_MAX_QUEUE, stats->max_queue_sz},
		{TIPC_NLA_STATS_AVG_QUEUE, stats->queue_sz_counts ?
			(stats->accu_queue_sz / stats->queue_sz_counts) : 0}
	};

	nest = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
	if (!nest)
		return -EMSGSIZE;

	for (i = 0; i <  ARRAY_SIZE(map); i++)
		if (nla_put_u32(skb, map[i].key, map[i].val))
			goto msg_full;

	nla_nest_end(skb, nest);

	return 0;
msg_full:
	nla_nest_cancel(skb, nest);

	return -EMSGSIZE;
}

799
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
800 801 802 803 804
{
	int err;
	void *hdr;
	struct nlattr *attrs;
	struct nlattr *prop;
805 806
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;
807 808 809 810

	if (!bcl)
		return 0;

811
	tipc_bclink_lock(net);
812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845

	hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_v2_family,
			  NLM_F_MULTI, TIPC_NL_LINK_GET);
	if (!hdr)
		return -EMSGSIZE;

	attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
	if (!attrs)
		goto msg_full;

	/* The broadcast link is always up */
	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
		goto attr_msg_full;

	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_BROADCAST))
		goto attr_msg_full;
	if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, bcl->name))
		goto attr_msg_full;
	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, bcl->next_in_no))
		goto attr_msg_full;
	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, bcl->next_out_no))
		goto attr_msg_full;

	prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
	if (!prop)
		goto attr_msg_full;
	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->queue_limit[0]))
		goto prop_msg_full;
	nla_nest_end(msg->skb, prop);

	err = __tipc_nl_add_bc_link_stat(msg->skb, &bcl->stats);
	if (err)
		goto attr_msg_full;

846
	tipc_bclink_unlock(net);
847 848 849 850 851 852 853 854 855 856
	nla_nest_end(msg->skb, attrs);
	genlmsg_end(msg->skb, hdr);

	return 0;

prop_msg_full:
	nla_nest_cancel(msg->skb, prop);
attr_msg_full:
	nla_nest_cancel(msg->skb, attrs);
msg_full:
857
	tipc_bclink_unlock(net);
858 859 860 861
	genlmsg_cancel(msg->skb, hdr);

	return -EMSGSIZE;
}
Per Liden's avatar
Per Liden committed
862

863
int tipc_bclink_stats(struct net *net, char *buf, const u32 buf_size)
Per Liden's avatar
Per Liden committed
864
{
865 866
	int ret;
	struct tipc_stats *s;
867 868
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;
Per Liden's avatar
Per Liden committed
869 870 871 872

	if (!bcl)
		return 0;

873
	tipc_bclink_lock(net);
Per Liden's avatar
Per Liden committed
874

875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896
	s = &bcl->stats;

	ret = tipc_snprintf(buf, buf_size, "Link <%s>\n"
			    "  Window:%u packets\n",
			    bcl->name, bcl->queue_limit[0]);
	ret += tipc_snprintf(buf + ret, buf_size - ret,
			     "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
			     s->recv_info, s->recv_fragments,
			     s->recv_fragmented, s->recv_bundles,
			     s->recv_bundled);
	ret += tipc_snprintf(buf + ret, buf_size - ret,
			     "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
			     s->sent_info, s->sent_fragments,
			     s->sent_fragmented, s->sent_bundles,
			     s->sent_bundled);
	ret += tipc_snprintf(buf + ret, buf_size - ret,
			     "  RX naks:%u defs:%u dups:%u\n",
			     s->recv_nacks, s->deferred_recv, s->duplicates);
	ret += tipc_snprintf(buf + ret, buf_size - ret,
			     "  TX naks:%u acks:%u dups:%u\n",
			     s->sent_nacks, s->sent_acks, s->retransmitted);
	ret += tipc_snprintf(buf + ret, buf_size - ret,
897 898
			     "  Congestion link:%u  Send queue max:%u avg:%u\n",
			     s->link_congs, s->max_queue_sz,
899 900
			     s->queue_sz_counts ?
			     (s->accu_queue_sz / s->queue_sz_counts) : 0);
Per Liden's avatar
Per Liden committed
901

902
	tipc_bclink_unlock(net);
903
	return ret;
Per Liden's avatar
Per Liden committed
904 905
}

906
int tipc_bclink_reset_stats(struct net *net)
Per Liden's avatar
Per Liden committed
907
{
908 909 910
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;

Per Liden's avatar
Per Liden committed
911 912 913
	if (!bcl)
		return -ENOPROTOOPT;

914
	tipc_bclink_lock(net);
Per Liden's avatar
Per Liden committed
915
	memset(&bcl->stats, 0, sizeof(bcl->stats));
916
	tipc_bclink_unlock(net);
917
	return 0;
Per Liden's avatar
Per Liden committed
918 919
}

920
int tipc_bclink_set_queue_limits(struct net *net, u32 limit)
Per Liden's avatar
Per Liden committed
921
{
922 923 924
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;

Per Liden's avatar
Per Liden committed
925 926 927 928 929
	if (!bcl)
		return -ENOPROTOOPT;
	if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN))
		return -EINVAL;

930
	tipc_bclink_lock(net);
931
	tipc_link_set_queue_limits(bcl, limit);
932
	tipc_bclink_unlock(net);
933
	return 0;
Per Liden's avatar
Per Liden committed
934 935
}

936
int tipc_bclink_init(struct net *net)
Per Liden's avatar
Per Liden committed
937
{
938
	struct tipc_net *tn = net_generic(net, tipc_net_id);
939 940 941
	struct tipc_bcbearer *bcbearer;
	struct tipc_bclink *bclink;
	struct tipc_link *bcl;
942

943 944 945 946 947 948 949 950 951 952 953
	bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
	if (!bcbearer)
		return -ENOMEM;

	bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
	if (!bclink) {
		kfree(bcbearer);
		return -ENOMEM;
	}

	bcl = &bclink->link;
Per Liden's avatar
Per Liden committed
954
	bcbearer->bearer.media = &bcbearer->media;
955
	bcbearer->media.send_msg = tipc_bcbearer_send;
956
	sprintf(bcbearer->media.name, "tipc-broadcast");
Per Liden's avatar
Per Liden committed
957

958
	spin_lock_init(&bclink->lock);
959
	__skb_queue_head_init(&bcl->outqueue);
960
	__skb_queue_head_init(&bcl->deferred_queue);
961
	skb_queue_head_init(&bcl->wakeupq);
Per Liden's avatar
Per Liden committed
962
	bcl->next_out_no = 1;
963
	spin_lock_init(&bclink->node.lock);
964 965
	__skb_queue_head_init(&bclink->arrvq);
	skb_queue_head_init(&bclink->inputq);
Per Liden's avatar
Per Liden committed
966
	bcl->owner = &bclink->node;
967
	bcl->owner->net = net;
968
	bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
969
	tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
970
	bcl->bearer_id = MAX_BEARERS;
971
	rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcbearer->bearer);
Per Liden's avatar
Per Liden committed
972
	bcl->state = WORKING_WORKING;
973 974
	bcl->pmsg = (struct tipc_msg *)&bcl->proto_msg;
	msg_set_prevnode(bcl->pmsg, tn->own_addr);
975
	strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
976 977 978
	tn->bcbearer = bcbearer;
	tn->bclink = bclink;
	tn->bcl = bcl;
979
	return 0;
Per Liden's avatar
Per Liden committed
980 981
}

982
void tipc_bclink_stop(struct net *net)
Per Liden's avatar
Per Liden committed
983
{
984 985
	struct tipc_net *tn = net_generic(net, tipc_net_id);

986 987 988
	tipc_bclink_lock(net);
	tipc_link_purge_queues(tn->bcl);
	tipc_bclink_unlock(net);
989

990
	RCU_INIT_POINTER(tn->bearer_list[BCBEARER], NULL);
991
	synchronize_net();
992 993
	kfree(tn->bcbearer);
	kfree(tn->bclink);
Per Liden's avatar
Per Liden committed
994 995
}

996 997 998
/**
 * tipc_nmap_add - add a node to a node map
 */
999
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013
{
	int n = tipc_node(node);
	int w = n / WSIZE;
	u32 mask = (1 << (n % WSIZE));

	if ((nm_ptr->map[w] & mask) == 0) {
		nm_ptr->count++;
		nm_ptr->map[w] |= mask;
	}
}

/**
 * tipc_nmap_remove - remove a node from a node map
 */
1014
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031
{
	int n = tipc_node(node);
	int w = n / WSIZE;
	u32 mask = (1 << (n % WSIZE));

	if ((nm_ptr->map[w] & mask) != 0) {
		nm_ptr->map[w] &= ~mask;
		nm_ptr->count--;
	}
}

/**
 * tipc_nmap_diff - find differences between node maps
 * @nm_a: input node map A
 * @nm_b: input node map B
 * @nm_diff: output node map A-B (i.e. nodes of A that are not in B)
 */
1032 1033 1034
static void tipc_nmap_diff(struct tipc_node_map *nm_a,
			   struct tipc_node_map *nm_b,
			   struct tipc_node_map *nm_diff)
1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052
{
	int stop = ARRAY_SIZE(nm_a->map);
	int w;
	int b;
	u32 map;

	memset(nm_diff, 0, sizeof(*nm_diff));
	for (w = 0; w < stop; w++) {
		map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]);
		nm_diff->map[w] = map;
		if (map != 0) {
			for (b = 0 ; b < WSIZE; b++) {
				if (map & (1 << b))
					nm_diff->count++;
			}
		}
	}
}