Commit 406de755 authored by Michael S. Tsirkin, committed by David S. Miller

ptr_ring: keep consumer_head valid at all times

The comment near __ptr_ring_peek says:

 * If ring is never resized, and if the pointer is merely
 * tested, there's no need to take the lock - see e.g.  __ptr_ring_empty.

but this was in fact never possible since consumer_head would sometimes
point outside the ring. Refactor the code so that it is always
pointing within the ring.

Fixes: c5ad119f ("net: sched: pfifo_fast use skb_array")
Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
Acked-by: John Fastabend <john.fastabend@gmail.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
parent 7ece54a6
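For context, the lockless empty test referred to above amounts to reading the slot at consumer_head without taking the consumer lock, so that read is only safe if consumer_head stays within [0, size) at all times. Below is a minimal userspace sketch of that idea, not the kernel code: struct ptr_ring_model, model_ring_empty and model_ring_discard_one (after the diff) are illustrative names, and the kernel's locking, READ_ONCE/WRITE_ONCE ordering and unlikely() annotations are omitted.

#include <stdbool.h>

/* Reduced, illustrative model of struct ptr_ring. */
struct ptr_ring_model {
	void **queue;		/* size pointer slots; NULL means "empty slot" */
	int size;
	int batch;		/* how many consumed entries to invalidate at once */
	int consumer_head;	/* next slot the consumer will look at */
	int consumer_tail;	/* first consumed slot not yet handed back */
};

/* Lockless empty test: dereferences queue[consumer_head] directly, so it
 * is only well defined while 0 <= consumer_head < size holds at all times.
 */
static bool model_ring_empty(const struct ptr_ring_model *r)
{
	if (r->size)
		return !r->queue[r->consumer_head];
	return true;
}

Before this patch, __ptr_ring_discard_one bumped r->consumer_head first and only reset it to 0 in a later step, so a concurrent lockless reader could briefly observe consumer_head == size and index one slot past the end of the ring; the refactoring below does all of the bookkeeping in a local variable and writes the shared index back exactly once, after the wrap.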
@@ -248,22 +248,28 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
 	/* Fundamentally, what we want to do is update consumer
 	 * index and zero out the entry so producer can reuse it.
 	 * Doing it naively at each consume would be as simple as:
-	 * r->queue[r->consumer++] = NULL;
-	 * if (unlikely(r->consumer >= r->size))
-	 *	r->consumer = 0;
+	 *       consumer = r->consumer;
+	 *       r->queue[consumer++] = NULL;
+	 *       if (unlikely(consumer >= r->size))
+	 *               consumer = 0;
+	 *       r->consumer = consumer;
 	 * but that is suboptimal when the ring is full as producer is writing
 	 * out new entries in the same cache line.  Defer these updates until a
 	 * batch of entries has been consumed.
 	 */
-	int head = r->consumer_head++;
+	/* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
+	 * to work correctly.
+	 */
+	int consumer_head = r->consumer_head;
+	int head = consumer_head++;
 
 	/* Once we have processed enough entries invalidate them in
 	 * the ring all at once so producer can reuse their space in the ring.
 	 * We also do this when we reach end of the ring - not mandatory
 	 * but helps keep the implementation simple.
 	 */
-	if (unlikely(r->consumer_head - r->consumer_tail >= r->batch ||
-		     r->consumer_head >= r->size)) {
+	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
+		     consumer_head >= r->size)) {
 		/* Zero out entries in the reverse order: this way we touch the
 		 * cache line that producer might currently be reading the last;
 		 * producer won't make progress and touch other cache lines
@@ -271,12 +277,13 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
 		 */
 		while (likely(head >= r->consumer_tail))
 			r->queue[head--] = NULL;
-		r->consumer_tail = r->consumer_head;
+		r->consumer_tail = consumer_head;
 	}
-	if (unlikely(r->consumer_head >= r->size)) {
-		r->consumer_head = 0;
+	if (unlikely(consumer_head >= r->size)) {
+		consumer_head = 0;
 		r->consumer_tail = 0;
 	}
+	r->consumer_head = consumer_head;
 }
 
 static inline void *__ptr_ring_consume(struct ptr_ring *r)
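Reduced to the same illustrative model introduced above (again a sketch, not the kernel code, with locking and the unlikely() hints dropped), the patched discard path looks like this; the point is that the only store to the shared index happens last, with a value that has already been wrapped back into range:

/* Simplified shape of the patched __ptr_ring_discard_one. */
static void model_ring_discard_one(struct ptr_ring_model *r)
{
	int consumer_head = r->consumer_head;
	int head = consumer_head++;

	/* Invalidate a whole batch (or the used tail of the ring) at once,
	 * zeroing in reverse order as the kernel code does.
	 */
	if (consumer_head - r->consumer_tail >= r->batch ||
	    consumer_head >= r->size) {
		while (head >= r->consumer_tail)
			r->queue[head--] = NULL;
		r->consumer_tail = consumer_head;
	}
	if (consumer_head >= r->size) {
		consumer_head = 0;
		r->consumer_tail = 0;
	}
	/* Single store to the shared index, always within [0, size). */
	r->consumer_head = consumer_head;
}

A concurrent model_ring_empty() now only ever sees either the old or the new in-range value of consumer_head, which is exactly the invariant the commit message asks for.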