Commit 35615994 authored by David S. Miller's avatar David S. Miller

Merge branch 'socket-sendmsg-zerocopy'

Willem de Bruijn says:

====================
socket sendmsg MSG_ZEROCOPY

Introduce zerocopy socket send flag MSG_ZEROCOPY. This extends the
shared page support (SKBTX_SHARED_FRAG) from sendpage to sendmsg.
Implement the feature for TCP initially, as large writes benefit
most.

On a send call with MSG_ZEROCOPY, the kernel pins user pages and
links these directly into the skbuff frags[] array.

Each send call with MSG_ZEROCOPY that transmits data will eventually
queue a completion notification on the error queue: a per-socket u32
incremented on each such call. A request may have to revert to copy
to succeed, for instance when a device cannot support scatter-gather
IO. In that case a flag is passed along to notify that the operation
succeeded without zerocopy optimization.

The implementation extends the existing zerocopy infra for tuntap,
vhost and xen with features needed for TCP, notably reference
counting to handle cloning on retransmit and GSO.

For more details, see also the netdev 2.1 paper and presentation at
https://netdevconf.org/2.1/session.html?debruijn

Changelog:

  v3 -> v4:
    - dropped UDP, RAW and PF_PACKET for now
        Without loopback support, datagrams are usually smaller than
        the ~8KB size threshold needed to benefit from zerocopy.
    - style: a few reverse chrismas tree
    - minor: SO_ZEROCOPY returns ENOTSUPP on unsupported protocols
    - minor: squashed SO_EE_CODE_ZEROCOPY_COPIED patch
    - minor: rebased on top of net-next with kmap_atomic fix

  v2 -> v3:
    - fix rebase conflict: SO_ZEROCOPY 59 -> 60

  v1 -> v2:
    - fix (kbuild-bot): do not remove uarg until patch 5
    - fix (kbuild-bot): move zerocopy_sg_from_iter doc with function
    - fix: remove unused extern in header file

  RFCv2 -> v1:
    - patch 2
        - review comment: in skb_copy_ubufs, always allocate order-0
            page, also when replacing compound source pages.
    - patch 3
        - fix: always queue completion notification on MSG_ZEROCOPY,
	    also if revert to copy.
	- fix: on syscall abort, correctly revert notification state
	- minor: skip queue notification on SOCK_DEAD
	- minor: replace BUG_ON with WARN_ON in recoverable error
    - patch 4
        - new: add socket option SOCK_ZEROCOPY.
	    only honor MSG_ZEROCOPY if set, ignore for legacy apps.
    - patch 5
        - fix: clear zerocopy state on skb_linearize
    - patch 6
        - fix: only coalesce if prev errqueue elem is zerocopy
	- minor: try coalescing with list tail instead of head
        - minor: merge bytelen limit patch
    - patch 7
        - new: signal when data had to be copied
    - patch 8 (tcp)
        - optimize: avoid setting PSH bit when exceeding max frags.
	    that limits GRO on the client. do not goto new_segment.
	- fix: fail on MSG_ZEROCOPY | MSG_FASTOPEN
	- minor: do not wait for memory: does not work for optmem
	- minor: simplify alloc
    - patch 9 (udp)
        - new: add PF_INET6
        - fix: attach zerocopy notification even if revert to copy
	- minor: simplify alloc size arithmetic
    - patch 10 (raw hdrinc)
        - new: add PF_INET6
    - patch 11 (pf_packet)
        - minor: simplify slightly
    - patch 12
        - new msg_zerocopy regression test: use veth pair to test
	    all protocols: ipv4/ipv6/packet, tcp/udp/raw, cork
	    all relevant ethtool settings: rx off, sg off
	    all relevant packet lengths: 0, <MAX_HEADER, max size

  RFC -> RFCv2:
    - review comment: do not loop skb with zerocopy frags onto rx:
          add skb_orphan_frags_rx to orphan even refcounted frags
	  call this in __netif_receive_skb_core, deliver_skb and tun:
	  same as commit 1080e512 ("net: orphan frags on receive")
    - fix: hold an explicit sk reference on each notification skb.
          previously relied on the reference (or wmem) held by the
	  data skb that would trigger notification, but this breaks
	  on skb_orphan.
    - fix: when aborting a send, do not inc the zerocopy counter
          this caused gaps in the notification chain
    - fix: in packet with SOCK_DGRAM, pull ll headers before calling
          zerocopy_sg_from_iter
    - fix: if sock_zerocopy_realloc does not allow coalescing,
          do not fail, just allocate a new ubuf
    - fix: in tcp, check return value of second allocation attempt
    - chg: allocate notification skbs from optmem
          to avoid affecting tcp write queue accounting (TSQ)
    - chg: limit #locked pages (ulimit) per user instead of per process
    - chg: grow notification ids from 16 to 32 bit
      - pass range [lo, hi] through 32 bit fields ee_info and ee_data
    - chg: rebased to davem-net-next on top of v4.10-rc7
    - add: limit notification coalescing
          sharing ubufs limits overhead, but delays notification until
	  the last packet is released, possibly unbounded. Add a cap.
    - tests: add snd_zerocopy_lo pf_packet test
    - tests: two bugfixes (add do_flush_tcp, ++sent not only in debug)

Limitations / Known Issues:
    - TCP may build slightly smaller than max TSO packets due to
      exceeding MAX_SKB_FRAGS frags when zerocopy pages are unaligned.
    - All SKBTX_SHARED_FRAG may require additional __skb_linearize or
      skb_copy_ubufs calls in u32, skb_find_text, similar to
      skb_checksum_help.

Notification skbuffs are allocated from optmem. For sockets that
cannot effectively coalesce notifications, the optmem max may need
to be increased to avoid hitting -ENOBUFS:

  sysctl -w net.core.optmem_max=1048576

In application load, copy avoidance shows a roughly 5% systemwide
reduction in cycles when streaming large flows and a 4-8% reduction in
wall clock time on early tensorflow test workloads.

For the single-machine veth tests to succeed, loopback support has to
be temporarily enabled by making skb_orphan_frags_rx map to
skb_orphan_frags.

* Performance

The below table shows cycles reported by perf for a netperf process
sending a single 10 Gbps TCP_STREAM. The first three columns show
Mcycles spent in the netperf process context. The second three columns
show time spent systemwide (-a -C A,B) on the two cpus that run the
process and interrupt handler. Reported is the median of at least 3
runs. std is a standard netperf, zc uses zerocopy and % is the ratio.
Netperf is pinned to cpu 2, network interrupts to cpu3, rps and rfs
are disabled and the kernel is booted with idle=halt.

NETPERF=./netperf -t TCP_STREAM -H $host -T 2 -l 30 -- -m $size

perf stat -e cycles $NETPERF
perf stat -C 2,3 -a -e cycles $NETPERF

        --process cycles--      ----cpu cycles----
           std      zc   %      std         zc   %
4K      27,609  11,217  41      49,217  39,175  79
16K     21,370   3,823  18      43,540  29,213  67
64K     20,557   2,312  11      42,189  26,910  64
256K    21,110   2,134  10      43,006  27,104  63
1M      20,987   1,610   8      42,759  25,931  61

Perf record indicates the main source of these differences. Process
cycles only at 1M writes (perf record; perf report -n):

std:
Samples: 42K of event 'cycles', Event count (approx.): 21258597313
 79.41%         33884  netperf  [kernel.kallsyms]  [k] copy_user_generic_string
  3.27%          1396  netperf  [kernel.kallsyms]  [k] tcp_sendmsg
  1.66%           694  netperf  [kernel.kallsyms]  [k] get_page_from_freelist
  0.79%           325  netperf  [kernel.kallsyms]  [k] tcp_ack
  0.43%           188  netperf  [kernel.kallsyms]  [k] __alloc_skb

zc:
Samples: 1K of event 'cycles', Event count (approx.): 1439509124
 30.36%           584  netperf.zerocop  [kernel.kallsyms]  [k] gup_pte_range
 14.63%           284  netperf.zerocop  [kernel.kallsyms]  [k] __zerocopy_sg_from_iter
  8.03%           159  netperf.zerocop  [kernel.kallsyms]  [k] skb_zerocopy_add_frags_iter
  4.84%            96  netperf.zerocop  [kernel.kallsyms]  [k] __alloc_skb
  3.10%            60  netperf.zerocop  [kernel.kallsyms]  [k] kmem_cache_alloc_node

* Safety

The number of pages that can be pinned on behalf of a user with
MSG_ZEROCOPY is bound by the locked memory ulimit.

While the kernel holds process memory pinned, a process cannot safely
reuse those pages for other purposes. Packets looped onto the receive
stack and queued to a socket can be held indefinitely. Avoid unbounded
notification latency by restricting user pages to egress paths only.
skb_orphan_frags_rx() will create a private copy of pages even for
refcounted packets when these are looped, as did skb_orphan_frags for
the original tun zerocopy implementation.

Pages are not remapped read-only. Processes can modify packet contents
while packets are in flight in the kernel path. Bytes on which kernel
control flow depends (headers) are copied to avoid TOCTTOU attacks.
Datapath integrity does not otherwise depend on payload, with three
exceptions: checksums, optional sk_filter/tc u32/.. and device +
driver logic. The effect of wrong checksums is limited to the
misbehaving process. TC filters that access contents may have to be
excluded by adding an skb_orphan_frags_rx.

Processes can also safely avoid OOM conditions by bounding the number
of bytes passed with MSG_ZEROCOPY and by removing shared pages after
transmission from their own memory map.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents 84b7187c 07b65c5b
......@@ -109,4 +109,6 @@
#define SO_PEERGROUPS 59
#define SO_ZEROCOPY 60
#endif /* _UAPI_ASM_SOCKET_H */
......@@ -102,5 +102,7 @@
#define SO_PEERGROUPS 59
#define SO_ZEROCOPY 60
#endif /* _ASM_SOCKET_H */
......@@ -111,4 +111,6 @@
#define SO_PEERGROUPS 59
#define SO_ZEROCOPY 60
#endif /* _ASM_IA64_SOCKET_H */
......@@ -102,4 +102,6 @@
#define SO_PEERGROUPS 59
#define SO_ZEROCOPY 60
#endif /* _ASM_M32R_SOCKET_H */
......@@ -120,4 +120,6 @@
#define SO_PEERGROUPS 59
#define SO_ZEROCOPY 60
#endif /* _UAPI_ASM_SOCKET_H */
......@@ -102,4 +102,6 @@
#define SO_PEERGROUPS 59
#define SO_ZEROCOPY 60
#endif /* _ASM_SOCKET_H */
......@@ -101,4 +101,6 @@
#define SO_PEERGROUPS 0x4034
#define SO_ZEROCOPY 0x4035
#endif /* _UAPI_ASM_SOCKET_H */
......@@ -108,4 +108,6 @@
#define SO_PEERGROUPS 59
#define SO_ZEROCOPY 60
#endif /* _ASM_SOCKET_H */
......@@ -98,6 +98,8 @@
#define SO_PEERGROUPS 0x003d
#define SO_ZEROCOPY 0x003e
/* Security levels - as per NRL IPv6 - don't actually do anything */
#define SO_SECURITY_AUTHENTICATION 0x5001
#define SO_SECURITY_ENCRYPTION_TRANSPORT 0x5002
......
......@@ -113,4 +113,6 @@
#define SO_PEERGROUPS 59
#define SO_ZEROCOPY 60
#endif /* _XTENSA_SOCKET_H */
......@@ -892,7 +892,7 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
sk_filter(tfile->socket.sk, skb))
goto drop;
if (unlikely(skb_orphan_frags(skb, GFP_ATOMIC)))
if (unlikely(skb_orphan_frags_rx(skb, GFP_ATOMIC)))
goto drop;
skb_tx_timestamp(skb);
......
......@@ -533,6 +533,7 @@ static void handle_tx(struct vhost_net *net)
ubuf->callback = vhost_zerocopy_callback;
ubuf->ctx = nvq->ubufs;
ubuf->desc = nvq->upend_idx;
atomic_set(&ubuf->refcnt, 1);
msg.msg_control = ubuf;
msg.msg_controllen = sizeof(ubuf);
ubufs = nvq->ubufs;
......
......@@ -36,7 +36,8 @@ struct user_struct {
struct hlist_node uidhash_node;
kuid_t uid;
#if defined(CONFIG_PERF_EVENTS) || defined(CONFIG_BPF_SYSCALL)
#if defined(CONFIG_PERF_EVENTS) || defined(CONFIG_BPF_SYSCALL) || \
defined(CONFIG_NET)
atomic_long_t locked_vm;
#endif
};
......
......@@ -429,6 +429,7 @@ enum {
SKBTX_SCHED_TSTAMP = 1 << 6,
};
#define SKBTX_ZEROCOPY_FRAG (SKBTX_DEV_ZEROCOPY | SKBTX_SHARED_FRAG)
#define SKBTX_ANY_SW_TSTAMP (SKBTX_SW_TSTAMP | \
SKBTX_SCHED_TSTAMP)
#define SKBTX_ANY_TSTAMP (SKBTX_HW_TSTAMP | SKBTX_ANY_SW_TSTAMP)
......@@ -443,10 +444,46 @@ enum {
*/
struct ubuf_info {
void (*callback)(struct ubuf_info *, bool zerocopy_success);
void *ctx;
union {
struct {
unsigned long desc;
void *ctx;
};
struct {
u32 id;
u16 len;
u16 zerocopy:1;
u32 bytelen;
};
};
atomic_t refcnt;
struct mmpin {
struct user_struct *user;
unsigned int num_pg;
} mmp;
};
#define skb_uarg(SKB) ((struct ubuf_info *)(skb_shinfo(SKB)->destructor_arg))
struct ubuf_info *sock_zerocopy_alloc(struct sock *sk, size_t size);
struct ubuf_info *sock_zerocopy_realloc(struct sock *sk, size_t size,
struct ubuf_info *uarg);
static inline void sock_zerocopy_get(struct ubuf_info *uarg)
{
atomic_inc(&uarg->refcnt);
}
void sock_zerocopy_put(struct ubuf_info *uarg);
void sock_zerocopy_put_abort(struct ubuf_info *uarg);
void sock_zerocopy_callback(struct ubuf_info *uarg, bool success);
int skb_zerocopy_iter_stream(struct sock *sk, struct sk_buff *skb,
struct msghdr *msg, int len,
struct ubuf_info *uarg);
/* This data is invariant across clones and lives at
* the end of the header data, ie. at skb->end.
*/
......@@ -1214,6 +1251,45 @@ static inline struct skb_shared_hwtstamps *skb_hwtstamps(struct sk_buff *skb)
return &skb_shinfo(skb)->hwtstamps;
}
static inline struct ubuf_info *skb_zcopy(struct sk_buff *skb)
{
bool is_zcopy = skb && skb_shinfo(skb)->tx_flags & SKBTX_DEV_ZEROCOPY;
return is_zcopy ? skb_uarg(skb) : NULL;
}
static inline void skb_zcopy_set(struct sk_buff *skb, struct ubuf_info *uarg)
{
if (skb && uarg && !skb_zcopy(skb)) {
sock_zerocopy_get(uarg);
skb_shinfo(skb)->destructor_arg = uarg;
skb_shinfo(skb)->tx_flags |= SKBTX_ZEROCOPY_FRAG;
}
}
/* Release a reference on a zerocopy structure */
static inline void skb_zcopy_clear(struct sk_buff *skb, bool zerocopy)
{
struct ubuf_info *uarg = skb_zcopy(skb);
if (uarg) {
uarg->zerocopy = uarg->zerocopy && zerocopy;
sock_zerocopy_put(uarg);
skb_shinfo(skb)->tx_flags &= ~SKBTX_ZEROCOPY_FRAG;
}
}
/* Abort a zerocopy operation and revert zckey on error in send syscall */
static inline void skb_zcopy_abort(struct sk_buff *skb)
{
struct ubuf_info *uarg = skb_zcopy(skb);
if (uarg) {
sock_zerocopy_put_abort(uarg);
skb_shinfo(skb)->tx_flags &= ~SKBTX_ZEROCOPY_FRAG;
}
}
/**
* skb_queue_empty - check if a queue is empty
* @list: queue head
......@@ -1796,13 +1872,18 @@ static inline unsigned int skb_headlen(const struct sk_buff *skb)
return skb->len - skb->data_len;
}
static inline unsigned int skb_pagelen(const struct sk_buff *skb)
static inline unsigned int __skb_pagelen(const struct sk_buff *skb)
{
unsigned int i, len = 0;
for (i = skb_shinfo(skb)->nr_frags - 1; (int)i >= 0; i--)
len += skb_frag_size(&skb_shinfo(skb)->frags[i]);
return len + skb_headlen(skb);
return len;
}
static inline unsigned int skb_pagelen(const struct sk_buff *skb)
{
return skb_headlen(skb) + __skb_pagelen(skb);
}
/**
......@@ -2447,7 +2528,17 @@ static inline void skb_orphan(struct sk_buff *skb)
*/
static inline int skb_orphan_frags(struct sk_buff *skb, gfp_t gfp_mask)
{
if (likely(!(skb_shinfo(skb)->tx_flags & SKBTX_DEV_ZEROCOPY)))
if (likely(!skb_zcopy(skb)))
return 0;
if (skb_uarg(skb)->callback == sock_zerocopy_callback)
return 0;
return skb_copy_ubufs(skb, gfp_mask);
}
/* Frags must be orphaned, even if refcounted, if skb might loop to rx path */
static inline int skb_orphan_frags_rx(struct sk_buff *skb, gfp_t gfp_mask)
{
if (likely(!skb_zcopy(skb)))
return 0;
return skb_copy_ubufs(skb, gfp_mask);
}
......@@ -2879,6 +2970,8 @@ static inline int skb_add_data(struct sk_buff *skb,
static inline bool skb_can_coalesce(struct sk_buff *skb, int i,
const struct page *page, int off)
{
if (skb_zcopy(skb))
return false;
if (i) {
const struct skb_frag_struct *frag = &skb_shinfo(skb)->frags[i - 1];
......
......@@ -287,6 +287,7 @@ struct ucred {
#define MSG_BATCH 0x40000 /* sendmmsg(): more messages coming */
#define MSG_EOF MSG_FIN
#define MSG_ZEROCOPY 0x4000000 /* Use user data in kernel path */
#define MSG_FASTOPEN 0x20000000 /* Send data in TCP SYN */
#define MSG_CMSG_CLOEXEC 0x40000000 /* Set close_on_exec for file
descriptor received through
......
......@@ -294,6 +294,7 @@ struct sock_common {
* @sk_stamp: time stamp of last packet received
* @sk_tsflags: SO_TIMESTAMPING socket options
* @sk_tskey: counter to disambiguate concurrent tstamp requests
* @sk_zckey: counter to order MSG_ZEROCOPY notifications
* @sk_socket: Identd and reporting IO signals
* @sk_user_data: RPC layer private data
* @sk_frag: cached page frag
......@@ -462,6 +463,7 @@ struct sock {
u16 sk_tsflags;
u8 sk_shutdown;
u32 sk_tskey;
atomic_t sk_zckey;
struct socket *sk_socket;
void *sk_user_data;
#ifdef CONFIG_SECURITY
......@@ -1531,6 +1533,8 @@ struct sk_buff *sock_wmalloc(struct sock *sk, unsigned long size, int force,
gfp_t priority);
void __sock_wfree(struct sk_buff *skb);
void sock_wfree(struct sk_buff *skb);
struct sk_buff *sock_omalloc(struct sock *sk, unsigned long size,
gfp_t priority);
void skb_orphan_partial(struct sk_buff *skb);
void sock_rfree(struct sk_buff *skb);
void sock_efree(struct sk_buff *skb);
......
......@@ -104,4 +104,6 @@
#define SO_PEERGROUPS 59
#define SO_ZEROCOPY 60
#endif /* __ASM_GENERIC_SOCKET_H */
......@@ -18,10 +18,13 @@ struct sock_extended_err {
#define SO_EE_ORIGIN_ICMP 2
#define SO_EE_ORIGIN_ICMP6 3
#define SO_EE_ORIGIN_TXSTATUS 4
#define SO_EE_ORIGIN_ZEROCOPY 5
#define SO_EE_ORIGIN_TIMESTAMPING SO_EE_ORIGIN_TXSTATUS
#define SO_EE_OFFENDER(ee) ((struct sockaddr*)((ee)+1))
#define SO_EE_CODE_ZEROCOPY_COPIED 1
/**
* struct scm_timestamping - timestamps exposed through cmsg
*
......
......@@ -573,27 +573,12 @@ int skb_copy_datagram_from_iter(struct sk_buff *skb, int offset,
}
EXPORT_SYMBOL(skb_copy_datagram_from_iter);
/**
* zerocopy_sg_from_iter - Build a zerocopy datagram from an iov_iter
* @skb: buffer to copy
* @from: the source to copy from
*
* The function will first copy up to headlen, and then pin the userspace
* pages and build frags through them.
*
* Returns 0, -EFAULT or -EMSGSIZE.
*/
int zerocopy_sg_from_iter(struct sk_buff *skb, struct iov_iter *from)
int __zerocopy_sg_from_iter(struct sock *sk, struct sk_buff *skb,
struct iov_iter *from, size_t length)
{
int len = iov_iter_count(from);
int copy = min_t(int, skb_headlen(skb), len);
int frag = 0;
int frag = skb_shinfo(skb)->nr_frags;
/* copy up to skb headlen */
if (skb_copy_datagram_from_iter(skb, 0, from, copy))
return -EFAULT;
while (iov_iter_count(from)) {
while (length && iov_iter_count(from)) {
struct page *pages[MAX_SKB_FRAGS];
size_t start;
ssize_t copied;
......@@ -603,18 +588,24 @@ int zerocopy_sg_from_iter(struct sk_buff *skb, struct iov_iter *from)
if (frag == MAX_SKB_FRAGS)
return -EMSGSIZE;
copied = iov_iter_get_pages(from, pages, ~0U,
copied = iov_iter_get_pages(from, pages, length,
MAX_SKB_FRAGS - frag, &start);
if (copied < 0)
return -EFAULT;
iov_iter_advance(from, copied);
length -= copied;
truesize = PAGE_ALIGN(copied + start);
skb->data_len += copied;
skb->len += copied;
skb->truesize += truesize;
if (sk && sk->sk_type == SOCK_STREAM) {
sk->sk_wmem_queued += truesize;
sk_mem_charge(sk, truesize);
} else {
refcount_add(truesize, &skb->sk->sk_wmem_alloc);
}
while (copied) {
int size = min_t(int, copied, PAGE_SIZE - start);
skb_fill_page_desc(skb, frag++, pages[n], start, size);
......@@ -625,6 +616,28 @@ int zerocopy_sg_from_iter(struct sk_buff *skb, struct iov_iter *from)
}
return 0;
}
EXPORT_SYMBOL(__zerocopy_sg_from_iter);
/**
* zerocopy_sg_from_iter - Build a zerocopy datagram from an iov_iter
* @skb: buffer to copy
* @from: the source to copy from
*
* The function will first copy up to headlen, and then pin the userspace
* pages and build frags through them.
*
* Returns 0, -EFAULT or -EMSGSIZE.
*/
int zerocopy_sg_from_iter(struct sk_buff *skb, struct iov_iter *from)
{
int copy = min_t(int, skb_headlen(skb), iov_iter_count(from));
/* copy up to skb headlen */
if (skb_copy_datagram_from_iter(skb, 0, from, copy))
return -EFAULT;
return __zerocopy_sg_from_iter(NULL, skb, from, ~0U);
}
EXPORT_SYMBOL(zerocopy_sg_from_iter);
static int skb_copy_and_csum_datagram(const struct sk_buff *skb, int offset,
......
......@@ -1853,7 +1853,7 @@ static inline int deliver_skb(struct sk_buff *skb,
struct packet_type *pt_prev,
struct net_device *orig_dev)
{
if (unlikely(skb_orphan_frags(skb, GFP_ATOMIC)))
if (unlikely(skb_orphan_frags_rx(skb, GFP_ATOMIC)))
return -ENOMEM;
refcount_inc(&skb->users);
return pt_prev->func(skb, skb->dev, pt_prev, orig_dev);
......@@ -4412,7 +4412,7 @@ static int __netif_receive_skb_core(struct sk_buff *skb, bool pfmemalloc)
}
if (pt_prev) {
if (unlikely(skb_orphan_frags(skb, GFP_ATOMIC)))
if (unlikely(skb_orphan_frags_rx(skb, GFP_ATOMIC)))
goto drop;
else
ret = pt_prev->func(skb, skb->dev, pt_prev, orig_dev);
......
......@@ -567,21 +567,10 @@ static void skb_release_data(struct sk_buff *skb)
for (i = 0; i < shinfo->nr_frags; i++)
__skb_frag_unref(&shinfo->frags[i]);
/*
* If skb buf is from userspace, we need to notify the caller
* the lower device DMA has done;
*/
if (shinfo->tx_flags & SKBTX_DEV_ZEROCOPY) {
struct ubuf_info *uarg;
uarg = shinfo->destructor_arg;
if (uarg->callback)
uarg->callback(uarg, true);
}
if (shinfo->frag_list)
kfree_skb_list(shinfo->frag_list);
skb_zcopy_clear(skb, true);
skb_free_head(skb);
}
......@@ -695,14 +684,7 @@ EXPORT_SYMBOL(kfree_skb_list);
*/
void skb_tx_error(struct sk_buff *skb)
{
if (skb_shinfo(skb)->tx_flags & SKBTX_DEV_ZEROCOPY) {
struct ubuf_info *uarg;
uarg = skb_shinfo(skb)->destructor_arg;
if (uarg->callback)
uarg->callback(uarg, false);
skb_shinfo(skb)->tx_flags &= ~SKBTX_DEV_ZEROCOPY;
}
skb_zcopy_clear(skb, true);
}
EXPORT_SYMBOL(skb_tx_error);
......@@ -915,6 +897,273 @@ struct sk_buff *skb_morph(struct sk_buff *dst, struct sk_buff *src)
}
EXPORT_SYMBOL_GPL(skb_morph);
static int mm_account_pinned_pages(struct mmpin *mmp, size_t size)
{
unsigned long max_pg, num_pg, new_pg, old_pg;
struct user_struct *user;
if (capable(CAP_IPC_LOCK) || !size)
return 0;
num_pg = (size >> PAGE_SHIFT) + 2; /* worst case */
max_pg = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
user = mmp->user ? : current_user();
do {
old_pg = atomic_long_read(&user->locked_vm);
new_pg = old_pg + num_pg;
if (new_pg > max_pg)
return -ENOBUFS;
} while (atomic_long_cmpxchg(&user->locked_vm, old_pg, new_pg) !=
old_pg);
if (!mmp->user) {
mmp->user = get_uid(user);
mmp->num_pg = num_pg;
} else {
mmp->num_pg += num_pg;
}
return 0;
}
static void mm_unaccount_pinned_pages(struct mmpin *mmp)
{
if (mmp->user) {
atomic_long_sub(mmp->num_pg, &mmp->user->locked_vm);
free_uid(mmp->user);
}
}
struct ubuf_info *sock_zerocopy_alloc(struct sock *sk, size_t size)
{
struct ubuf_info *uarg;
struct sk_buff *skb;
WARN_ON_ONCE(!in_task());
if (!sock_flag(sk, SOCK_ZEROCOPY))
return NULL;
skb = sock_omalloc(sk, 0, GFP_KERNEL);
if (!skb)
return NULL;
BUILD_BUG_ON(sizeof(*uarg) > sizeof(skb->cb));
uarg = (void *)skb->cb;
uarg->mmp.user = NULL;
if (mm_account_pinned_pages(&uarg->mmp, size)) {
kfree_skb(skb);
return NULL;
}
uarg->callback = sock_zerocopy_callback;
uarg->id = ((u32)atomic_inc_return(&sk->sk_zckey)) - 1;
uarg->len = 1;
uarg->bytelen = size;
uarg->zerocopy = 1;
atomic_set(&uarg->refcnt, 0);
sock_hold(sk);
return uarg;
}
EXPORT_SYMBOL_GPL(sock_zerocopy_alloc);
static inline struct sk_buff *skb_from_uarg(struct ubuf_info *uarg)
{
return container_of((void *)uarg, struct sk_buff, cb);
}
struct ubuf_info *sock_zerocopy_realloc(struct sock *sk, size_t size,
struct ubuf_info *uarg)
{
if (uarg) {
const u32 byte_limit = 1 << 19; /* limit to a few TSO */
u32 bytelen, next;
/* realloc only when socket is locked (TCP, UDP cork),
* so uarg->len and sk_zckey access is serialized
*/
if (!sock_owned_by_user(sk)) {
WARN_ON_ONCE(1);
return NULL;
}
bytelen = uarg->bytelen + size;
if (uarg->len == USHRT_MAX - 1 || bytelen > byte_limit) {
/* TCP can create new skb to attach new uarg */
if (sk->sk_type == SOCK_STREAM)
goto new_alloc;
return NULL;
}
next = (u32)atomic_read(&sk->sk_zckey);
if ((u32)(uarg->id + uarg->len) == next) {
if (mm_account_pinned_pages(&uarg->mmp, size))
return NULL;
uarg->len++;
uarg->bytelen = bytelen;
atomic_set(&sk->sk_zckey, ++next);
return uarg;
}
}
new_alloc:
return sock_zerocopy_alloc(sk, size);
}
EXPORT_SYMBOL_GPL(sock_zerocopy_realloc);
static bool skb_zerocopy_notify_extend(struct sk_buff *skb, u32 lo, u16 len)
{
struct sock_exterr_skb *serr = SKB_EXT_ERR(skb);
u32 old_lo, old_hi;
u64 sum_len;
old_lo = serr->ee.ee_info;
old_hi = serr->ee.ee_data;
sum_len = old_hi - old_lo + 1ULL + len;
if (sum_len >= (1ULL << 32))
return false;
if (lo != old_hi + 1)
return false;
serr->ee.ee_data += len;
return true;
}
void sock_zerocopy_callback(struct ubuf_info *uarg, bool success)
{
struct sk_buff *tail, *skb = skb_from_uarg(uarg);
struct sock_exterr_skb *serr;
struct sock *sk = skb->sk;
struct sk_buff_head *q;
unsigned long flags;
u32 lo, hi;
u16 len;
/* if !len, there was only 1 call, and it was aborted
* so do not queue a completion notification
*/
if (!uarg->len || sock_flag(sk, SOCK_DEAD))
goto release;
len = uarg->len;
lo = uarg->id;
hi = uarg->id + len - 1;
serr = SKB_EXT_ERR(skb);
memset(serr, 0, sizeof(*serr));
serr->ee.ee_errno = 0;
serr->ee.ee_origin = SO_EE_ORIGIN_ZEROCOPY;
serr->ee.ee_data = hi;
serr->ee.ee_info = lo;
if (!success)
serr->ee.ee_code |= SO_EE_CODE_ZEROCOPY_COPIED;
q = &sk->sk_error_queue;
spin_lock_irqsave(&q->lock, flags);
tail = skb_peek_tail(q);
if (!tail || SKB_EXT_ERR(tail)->ee.ee_origin != SO_EE_ORIGIN_ZEROCOPY ||
!skb_zerocopy_notify_extend(tail, lo, len)) {
__skb_queue_tail(q, skb);
skb = NULL;
}
spin_unlock_irqrestore(&q->lock, flags);
sk->sk_error_report(sk);
release:
consume_skb(skb);
sock_put(sk);
}
EXPORT_SYMBOL_GPL(sock_zerocopy_callback);
void sock_zerocopy_put(struct ubuf_info *uarg)
{
if (uarg && atomic_dec_and_test(&uarg->refcnt)) {
mm_unaccount_pinned_pages(&uarg->mmp);
if (uarg->callback)
uarg->callback(uarg, uarg->zerocopy);
else
consume_skb(skb_from_uarg(uarg));
}
}
EXPORT_SYMBOL_GPL(sock_zerocopy_put);
void sock_zerocopy_put_abort(struct ubuf_info *uarg)
{
if (uarg) {
struct sock *sk = skb_from_uarg(uarg)->sk;
atomic_dec(&sk->sk_zckey);
uarg->len--;
/* sock_zerocopy_put expects a ref. Most sockets take one per
* skb, which is zero on abort. tcp_sendmsg holds one extra, to
* avoid an skb send inside the main loop triggering uarg free.
*/
if (sk->sk_type != SOCK_STREAM)
atomic_inc(&uarg->refcnt);
sock_zerocopy_put(uarg);
}
}
EXPORT_SYMBOL_GPL(sock_zerocopy_put_abort);
extern int __zerocopy_sg_from_iter(struct sock *sk, struct sk_buff *skb,
struct iov_iter *from, size_t length);
int skb_zerocopy_iter_stream(struct sock *sk, struct sk_buff *skb,
struct msghdr *msg, int len,
struct ubuf_info *uarg)
{
struct ubuf_info *orig_uarg = skb_zcopy(skb);
struct iov_iter orig_iter = msg->msg_iter;
int err, orig_len = skb->len;
/* An skb can only point to one uarg. This edge case happens when
* TCP appends to an skb, but zerocopy_realloc triggered a new alloc.
*/
if (orig_uarg && uarg != orig_uarg)
return -EEXIST;
err = __zerocopy_sg_from_iter(sk, skb, &msg->msg_iter, len);
if (err == -EFAULT || (err == -EMSGSIZE && skb->len == orig_len)) {
/* Streams do not free skb on error. Reset to prev state. */
msg->msg_iter = orig_iter;
___pskb_trim(skb, orig_len);
return err;
}
skb_zcopy_set(skb, uarg);
return skb->len - orig_len;
}
EXPORT_SYMBOL_GPL(skb_zerocopy_iter_stream);
static int skb_zerocopy_clone(struct sk_buff *nskb, struct sk_buff *orig,
gfp_t gfp_mask)
{
if (skb_zcopy(orig)) {
if (skb_zcopy(nskb)) {
/* !gfp_mask callers are verified to !skb_zcopy(nskb) */
if (!gfp_mask) {
WARN_ON_ONCE(1);
return -ENOMEM;
}
if (skb_uarg(nskb) == skb_uarg(orig))
return 0;
if (skb_copy_ubufs(nskb, GFP_ATOMIC))
return -EIO;
}
skb_zcopy_set(nskb, skb_uarg(orig));
}
return 0;
}
/**
* skb_copy_ubufs - copy userspace skb frags buffers to kernel
* @skb: the skb to modify
......@@ -932,17 +1181,19 @@ EXPORT_SYMBOL_GPL(skb_morph);
*/
int skb_copy_ubufs(struct sk_buff *skb, gfp_t gfp_mask)
{
int i;
int num_frags = skb_shinfo(skb)->nr_frags;
struct page *page, *head = NULL;
struct ubuf_info *uarg = skb_shinfo(skb)->destructor_arg;
int i, new_frags;
u32 d_off;
for (i = 0; i < num_frags; i++) {
skb_frag_t *f = &skb_shinfo(skb)->frags[i];
u32 p_off, p_len, copied;
struct page *p;
u8 *vaddr;
if (!num_frags)
return 0;
if (skb_shared(skb) || skb_unclone(skb, gfp_mask))
return -EINVAL;
new_frags = (__skb_pagelen(skb) + PAGE_SIZE - 1) >> PAGE_SHIFT;
for (i = 0; i < new_frags; i++) {
page = alloc_page(gfp_mask);
if (!page) {
while (head) {
......@@ -952,33 +1203,51 @@ int skb_copy_ubufs(struct sk_buff *skb, gfp_t gfp_mask)
}
return -ENOMEM;
}
set_page_private(page, (unsigned long)head);
head = page;
}
page = head;
d_off = 0;
for (i = 0; i < num_frags; i++) {
skb_frag_t *f = &skb_shinfo(skb)->frags[i];
u32 p_off, p_len, copied;
struct page *p;
u8 *vaddr;
skb_frag_foreach_page(f, f->page_offset, skb_frag_size(f),
p, p_off, p_len, copied) {
u32 copy, done = 0;
vaddr = kmap_atomic(p);
memcpy(page_address(page) + copied, vaddr + p_off,
p_len);
while (done < p_len) {
if (d_off == PAGE_SIZE) {
d_off = 0;
page = (struct page *)page_private(page);
}
copy = min_t(u32, PAGE_SIZE - d_off, p_len - done);
memcpy(page_address(page) + d_off,
vaddr + p_off + done, copy);
done += copy;
d_off += copy;
}
kunmap_atomic(vaddr);
}
set_page_private(page, (unsigned long)head);
head = page;
}
/* skb frags release userspace buffers */
for (i = 0; i < num_frags; i++)
skb_frag_unref(skb, i);
uarg->callback(uarg, false);
/* skb frags point to kernel buffers */
for (i = num_frags - 1; i >= 0; i--) {
__skb_fill_page_desc(skb, i, head, 0,
skb_shinfo(skb)->frags[i].size);
for (i = 0; i < new_frags - 1; i++) {
__skb_fill_page_desc(skb, i, head, 0, PAGE_SIZE);
head = (struct page *)page_private(head);
}
__skb_fill_page_desc(skb, new_frags - 1, head, 0, d_off);
skb_shinfo(skb)->nr_frags = new_frags;
skb_shinfo(skb)->tx_flags &= ~SKBTX_DEV_ZEROCOPY;
skb_zcopy_clear(skb, false);
return 0;
}
EXPORT_SYMBOL_GPL(skb_copy_ubufs);
......@@ -1139,7 +1408,8 @@ struct sk_buff *__pskb_copy_fclone(struct sk_buff *skb, int headroom,
if (skb_shinfo(skb)->nr_frags) {
int i;
if (skb_orphan_frags(skb, gfp_mask)) {
if (skb_orphan_frags(skb, gfp_mask) ||
skb_zerocopy_clone(n, skb, gfp_mask)) {
kfree_skb(n);
n = NULL;
goto out;
......@@ -1216,9 +1486,10 @@ int pskb_expand_head(struct sk_buff *skb, int nhead, int ntail,
* be since all we did is relocate the values
*/
if (skb_cloned(skb)) {
/* copy this zero copy skb frags */
if (skb_orphan_frags(skb, gfp_mask))
goto nofrags;
if (skb_zcopy(skb))
atomic_inc(&skb_uarg(skb)->refcnt);
for (i = 0; i < skb_shinfo(skb)->nr_frags; i++)
skb_frag_ref(skb, i);
......@@ -1713,6 +1984,9 @@ void *__pskb_pull_tail(struct sk_buff *skb, int delta)
skb->tail += delta;
skb->data_len -= delta;
if (!skb->data_len)
skb_zcopy_clear(skb, false);
return skb_tail_pointer(skb);
}
EXPORT_SYMBOL(__pskb_pull_tail);
......@@ -2468,6 +2742,7 @@ skb_zerocopy(struct sk_buff *to, struct sk_buff *from, int len, int hlen)
skb_tx_error(from);
return -ENOMEM;
}
skb_zerocopy_clone(to, from, GFP_ATOMIC);
for (i = 0; i < skb_shinfo(from)->nr_frags; i++) {
if (!len)
......@@ -2765,6 +3040,7 @@ void skb_split(struct sk_buff *skb, struct sk_buff *skb1, const u32 len)
skb_shinfo(skb1)->tx_flags |= skb_shinfo(skb)->tx_flags &
SKBTX_SHARED_FRAG;
skb_zerocopy_clone(skb1, skb, 0);
if (len < pos) /* Split line is inside header. */
skb_split_inside_header(skb, skb1, len, pos);
else /* Second chunk has no header, nothing to copy. */
......@@ -2808,6 +3084,8 @@ int skb_shift(struct sk_buff *tgt, struct sk_buff *skb, int shiftlen)
if (skb_headlen(skb))
return 0;
if (skb_zcopy(tgt) || skb_zcopy(skb))
return 0;
todo = shiftlen;
from = 0;
......@@ -3381,6 +3659,8 @@ struct sk_buff *skb_segment(struct sk_buff *head_skb,
skb_shinfo(nskb)->tx_flags |= skb_shinfo(head_skb)->tx_flags &
SKBTX_SHARED_FRAG;
if (skb_zerocopy_clone(nskb, head_skb, GFP_ATOMIC))
goto err;
while (pos < offset + len) {
if (i >= nfrags) {
......@@ -4504,6 +4784,8 @@ bool skb_try_coalesce(struct sk_buff *to, struct sk_buff *from,
if (skb_has_frag_list(to) || skb_has_frag_list(from))
return false;
if (skb_zcopy(to) || skb_zcopy(from))
return false;
if (skb_headlen(from) != 0) {
struct page *page;
......
......@@ -1055,6 +1055,20 @@ int sock_setsockopt(struct socket *sock, int level, int optname,
if (val == 1)
dst_negative_advice(sk);
break;
case SO_ZEROCOPY:
if (sk->sk_family != PF_INET && sk->sk_family != PF_INET6)
ret = -ENOTSUPP;
else if (sk->sk_protocol != IPPROTO_TCP)
ret = -ENOTSUPP;
else if (sk->sk_state != TCP_CLOSE)
ret = -EBUSY;
else if (val < 0 || val > 1)
ret = -EINVAL;
else
sock_valbool_flag(sk, SOCK_ZEROCOPY, valbool);
break;
default:
ret = -ENOPROTOOPT;
break;
......@@ -1383,6 +1397,10 @@ int sock_getsockopt(struct socket *sock, int level, int optname,
v.val64 = sock_gen_cookie(sk);
break;
case SO_ZEROCOPY:
v.val = sock_flag(sk, SOCK_ZEROCOPY);
break;
default:
/* We implement the SO_SNDLOWAT etc to not be settable
* (1003.1g 7).
......@@ -1670,6 +1688,7 @@ struct sock *sk_clone_lock(const struct sock *sk, const gfp_t priority)
atomic_set(&newsk->sk_drops, 0);
newsk->sk_send_head = NULL;
newsk->sk_userlocks = sk->sk_userlocks & ~SOCK_BINDPORT_LOCK;
atomic_set(&newsk->sk_zckey, 0);
sock_reset_flag(newsk, SOCK_DONE);
......@@ -1923,6 +1942,33 @@ struct sk_buff *sock_wmalloc(struct sock *sk, unsigned long size, int force,
}
EXPORT_SYMBOL(sock_wmalloc);
static void sock_ofree(struct sk_buff *skb)
{
struct sock *sk = skb->sk;
atomic_sub(skb->truesize, &sk->sk_omem_alloc);
}
struct sk_buff *sock_omalloc(struct sock *sk, unsigned long size,
gfp_t priority)
{
struct sk_buff *skb;
/* small safe race: SKB_TRUESIZE may differ from final skb->truesize */
if (atomic_read(&sk->sk_omem_alloc) + SKB_TRUESIZE(size) >
sysctl_optmem_max)
return NULL;
skb = alloc_skb(size, priority);
if (!skb)
return NULL;
atomic_add(skb->truesize, &sk->sk_omem_alloc);
skb->sk = sk;
skb->destructor = sock_ofree;
return skb;
}
/*
* Allocate a memory block from the socket's option memory buffer.
*/
......@@ -2695,6 +2741,7 @@ void sock_init_data(struct socket *sock, struct sock *sk)
sk->sk_sndtimeo = MAX_SCHEDULE_TIMEOUT;
sk->sk_stamp = SK_DEFAULT_STAMP;
atomic_set(&sk->sk_zckey, 0);
#ifdef CONFIG_NET_RX_BUSY_POLL
sk->sk_napi_id = 0;
......
......@@ -1165,6 +1165,7 @@ static int tcp_sendmsg_fastopen(struct sock *sk, struct msghdr *msg,
int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size)
{
struct tcp_sock *tp = tcp_sk(sk);
struct ubuf_info *uarg = NULL;
struct sk_buff *skb;
struct sockcm_cookie sockc;
int flags, err, copied = 0;
......@@ -1174,6 +1175,26 @@ int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size)
long timeo;
flags = msg->msg_flags;
if (flags & MSG_ZEROCOPY && size) {
if (sk->sk_state != TCP_ESTABLISHED) {
err = -EINVAL;
goto out_err;
}
skb = tcp_send_head(sk) ? tcp_write_queue_tail(sk) : NULL;
uarg = sock_zerocopy_realloc(sk, size, skb_zcopy(skb));
if (!uarg) {
err = -ENOBUFS;
goto out_err;
}
/* skb may be freed in main loop, keep extra ref on uarg */
sock_zerocopy_get(uarg);
if (!(sk_check_csum_caps(sk) && sk->sk_route_caps & NETIF_F_SG))
uarg->zerocopy = 0;
}
if (unlikely(flags & MSG_FASTOPEN || inet_sk(sk)->defer_connect)) {
err = tcp_sendmsg_fastopen(sk, msg, &copied_syn, size);
if (err == -EINPROGRESS && copied_syn > 0)
......@@ -1297,7 +1318,7 @@ int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size)
err = skb_add_data_nocache(sk, skb, &msg->msg_iter, copy);
if (err)
goto do_fault;
} else {
} else if (!uarg || !uarg->zerocopy) {
bool merge = true;
int i = skb_shinfo(skb)->nr_frags;
struct page_frag *pfrag = sk_page_frag(sk);
......@@ -1335,6 +1356,13 @@ int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size)
page_ref_inc(pfrag->page);
}
pfrag->offset += copy;
} else {
err = skb_zerocopy_iter_stream(sk, skb, msg, copy, uarg);
if (err == -EMSGSIZE || err == -EEXIST)
goto new_segment;
if (err < 0)
goto do_error;
copy = err;
}
if (!copied)
......@@ -1381,6 +1409,7 @@ int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size)
tcp_push(sk, flags, mss_now, tp->nonagle, size_goal);
}
out_nopush:
sock_zerocopy_put(uarg);
return copied + copied_syn;
do_fault:
......@@ -1397,6 +1426,7 @@ int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size)
if (copied + copied_syn)
goto out;
out_err:
sock_zerocopy_put_abort(uarg);
err = sk_stream_error(sk, flags, err);
/* make sure we wake any epoll edge trigger waiter */
if (unlikely(skb_queue_len(&sk->sk_write_queue) == 0 &&
......
msg_zerocopy
socket
psock_fanout
psock_tpacket
......
......@@ -7,7 +7,7 @@ TEST_PROGS := run_netsocktests run_afpackettests test_bpf.sh netdevice.sh
TEST_GEN_FILES = socket
TEST_GEN_FILES += psock_fanout psock_tpacket
TEST_GEN_FILES += reuseport_bpf reuseport_bpf_cpu reuseport_bpf_numa
TEST_GEN_FILES += reuseport_dualstack
TEST_GEN_FILES += reuseport_dualstack msg_zerocopy
include ../lib.mk
......
/* Evaluate MSG_ZEROCOPY
*
* Send traffic between two processes over one of the supported
* protocols and modes:
*
* PF_INET/PF_INET6
* - SOCK_STREAM
* - SOCK_DGRAM
* - SOCK_DGRAM with UDP_CORK
* - SOCK_RAW
* - SOCK_RAW with IP_HDRINCL
*
* PF_PACKET
* - SOCK_DGRAM
* - SOCK_RAW
*
* Start this program on two connected hosts, one in send mode and
* the other with option '-r' to put it in receiver mode.
*
* If zerocopy mode ('-z') is enabled, the sender will verify that
* the kernel queues completions on the error queue for all zerocopy
* transfers.
*/
#define _GNU_SOURCE
#include <arpa/inet.h>
#include <error.h>
#include <errno.h>
#include <limits.h>
#include <linux/errqueue.h>
#include <linux/if_packet.h>
#include <linux/ipv6.h>
#include <linux/socket.h>
#include <linux/sockios.h>
#include <net/ethernet.h>
#include <net/if.h>
#include <netinet/ip.h>
#include <netinet/ip6.h>
#include <netinet/tcp.h>
#include <netinet/udp.h>
#include <poll.h>
#include <sched.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#ifndef SO_EE_ORIGIN_ZEROCOPY
#define SO_EE_ORIGIN_ZEROCOPY SO_EE_ORIGIN_UPAGE
#endif
#ifndef SO_ZEROCOPY
#define SO_ZEROCOPY 59
#endif
#ifndef SO_EE_CODE_ZEROCOPY_COPIED
#define SO_EE_CODE_ZEROCOPY_COPIED 1
#endif
#ifndef MSG_ZEROCOPY
#define MSG_ZEROCOPY 0x4000000
#endif
static int cfg_cork;
static bool cfg_cork_mixed;
static int cfg_cpu = -1; /* default: pin to last cpu */
static int cfg_family = PF_UNSPEC;
static int cfg_ifindex = 1;
static int cfg_payload_len;
static int cfg_port = 8000;
static bool cfg_rx;
static int cfg_runtime_ms = 4200;
static int cfg_verbose;
static int cfg_waittime_ms = 500;
static bool cfg_zerocopy;
static socklen_t cfg_alen;
static struct sockaddr_storage cfg_dst_addr;
static struct sockaddr_storage cfg_src_addr;
static char payload[IP_MAXPACKET];
static long packets, bytes, completions, expected_completions;
static int zerocopied = -1;
static uint32_t next_completion;
static unsigned long gettimeofday_ms(void)
{
struct timeval tv;
gettimeofday(&tv, NULL);
return (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
}
static uint16_t get_ip_csum(const uint16_t *start, int num_words)
{
unsigned long sum = 0;
int i;
for (i = 0; i < num_words; i++)
sum += start[i];
while (sum >> 16)
sum = (sum & 0xFFFF) + (sum >> 16);
return ~sum;
}
static int do_setcpu(int cpu)
{
cpu_set_t mask;
CPU_ZERO(&mask);
CPU_SET(cpu, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(1, 0, "setaffinity %d", cpu);
if (cfg_verbose)
fprintf(stderr, "cpu: %u\n", cpu);
return 0;
}
static void do_setsockopt(int fd, int level, int optname, int val)
{
if (setsockopt(fd, level, optname, &val, sizeof(val)))
error(1, errno, "setsockopt %d.%d: %d", level, optname, val);
}
static int do_poll(int fd, int events)
{
struct pollfd pfd;
int ret;
pfd.events = events;
pfd.revents = 0;
pfd.fd = fd;
ret = poll(&pfd, 1, cfg_waittime_ms);
if (ret == -1)
error(1, errno, "poll");
return ret && (pfd.revents & events);
}
static int do_accept(int fd)
{
int fda = fd;
fd = accept(fda, NULL, NULL);
if (fd == -1)
error(1, errno, "accept");
if (close(fda))
error(1, errno, "close listen sock");
return fd;
}
static bool do_sendmsg(int fd, struct msghdr *msg, bool do_zerocopy)
{
int ret, len, i, flags;
len = 0;
for (i = 0; i < msg->msg_iovlen; i++)
len += msg->msg_iov[i].iov_len;
flags = MSG_DONTWAIT;
if (do_zerocopy)
flags |= MSG_ZEROCOPY;
ret = sendmsg(fd, msg, flags);
if (ret == -1 && errno == EAGAIN)
return false;
if (ret == -1)
error(1, errno, "send");
if (cfg_verbose && ret != len)
fprintf(stderr, "send: ret=%u != %u\n", ret, len);
if (len) {
packets++;
bytes += ret;
if (do_zerocopy && ret)
expected_completions++;
}
return true;
}
static void do_sendmsg_corked(int fd, struct msghdr *msg)
{
bool do_zerocopy = cfg_zerocopy;
int i, payload_len, extra_len;
/* split up the packet. for non-multiple, make first buffer longer */
payload_len = cfg_payload_len / cfg_cork;
extra_len = cfg_payload_len - (cfg_cork * payload_len);
do_setsockopt(fd, IPPROTO_UDP, UDP_CORK, 1);
for (i = 0; i < cfg_cork; i++) {
/* in mixed-frags mode, alternate zerocopy and copy frags
* start with non-zerocopy, to ensure attach later works
*/
if (cfg_cork_mixed)
do_zerocopy = (i & 1);
msg->msg_iov[0].iov_len = payload_len + extra_len;
extra_len = 0;
do_sendmsg(fd, msg, do_zerocopy);
}
do_setsockopt(fd, IPPROTO_UDP, UDP_CORK, 0);
}
static int setup_iph(struct iphdr *iph, uint16_t payload_len)
{
struct sockaddr_in *daddr = (void *) &cfg_dst_addr;
struct sockaddr_in *saddr = (void *) &cfg_src_addr;
memset(iph, 0, sizeof(*iph));
iph->version = 4;
iph->tos = 0;
iph->ihl = 5;
iph->ttl = 2;
iph->saddr = saddr->sin_addr.s_addr;
iph->daddr = daddr->sin_addr.s_addr;
iph->protocol = IPPROTO_EGP;
iph->tot_len = htons(sizeof(*iph) + payload_len);
iph->check = get_ip_csum((void *) iph, iph->ihl << 1);
return sizeof(*iph);
}
static int setup_ip6h(struct ipv6hdr *ip6h, uint16_t payload_len)
{
struct sockaddr_in6 *daddr = (void *) &cfg_dst_addr;
struct sockaddr_in6 *saddr = (void *) &cfg_src_addr;
memset(ip6h, 0, sizeof(*ip6h));
ip6h->version = 6;
ip6h->payload_len = htons(payload_len);
ip6h->nexthdr = IPPROTO_EGP;
ip6h->hop_limit = 2;
ip6h->saddr = saddr->sin6_addr;
ip6h->daddr = daddr->sin6_addr;
return sizeof(*ip6h);
}
static void setup_sockaddr(int domain, const char *str_addr, void *sockaddr)
{
struct sockaddr_in6 *addr6 = (void *) sockaddr;
struct sockaddr_in *addr4 = (void *) sockaddr;
switch (domain) {
case PF_INET:
addr4->sin_family = AF_INET;
addr4->sin_port = htons(cfg_port);
if (inet_pton(AF_INET, str_addr, &(addr4->sin_addr)) != 1)
error(1, 0, "ipv4 parse error: %s", str_addr);
break;
case PF_INET6:
addr6->sin6_family = AF_INET6;
addr6->sin6_port = htons(cfg_port);
if (inet_pton(AF_INET6, str_addr, &(addr6->sin6_addr)) != 1)
error(1, 0, "ipv6 parse error: %s", str_addr);
break;
default:
error(1, 0, "illegal domain");
}
}
static int do_setup_tx(int domain, int type, int protocol)
{
int fd;
fd = socket(domain, type, protocol);
if (fd == -1)
error(1, errno, "socket t");
do_setsockopt(fd, SOL_SOCKET, SO_SNDBUF, 1 << 21);
if (cfg_zerocopy)
do_setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, 1);
if (domain != PF_PACKET)
if (connect(fd, (void *) &cfg_dst_addr, cfg_alen))
error(1, errno, "connect");
return fd;
}
static bool do_recv_completion(int fd)
{
struct sock_extended_err *serr;
struct msghdr msg = {};
struct cmsghdr *cm;
uint32_t hi, lo, range;
int ret, zerocopy;
char control[100];
msg.msg_control = control;
msg.msg_controllen = sizeof(control);
ret = recvmsg(fd, &msg, MSG_ERRQUEUE);
if (ret == -1 && errno == EAGAIN)
return false;
if (ret == -1)
error(1, errno, "recvmsg notification");
if (msg.msg_flags & MSG_CTRUNC)
error(1, errno, "recvmsg notification: truncated");
cm = CMSG_FIRSTHDR(&msg);
if (!cm)
error(1, 0, "cmsg: no cmsg");
if (!((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
(cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR) ||
(cm->cmsg_level == SOL_PACKET && cm->cmsg_type == PACKET_TX_TIMESTAMP)))
error(1, 0, "serr: wrong type: %d.%d",
cm->cmsg_level, cm->cmsg_type);
serr = (void *) CMSG_DATA(cm);
if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY)
error(1, 0, "serr: wrong origin: %u", serr->ee_origin);
if (serr->ee_errno != 0)
error(1, 0, "serr: wrong error code: %u", serr->ee_errno);
hi = serr->ee_data;
lo = serr->ee_info;
range = hi - lo + 1;
/* Detect notification gaps. These should not happen often, if at all.
* Gaps can occur due to drops, reordering and retransmissions.
*/
if (lo != next_completion)
fprintf(stderr, "gap: %u..%u does not append to %u\n",
lo, hi, next_completion);
next_completion = hi + 1;
zerocopy = !(serr->ee_code & SO_EE_CODE_ZEROCOPY_COPIED);
if (zerocopied == -1)
zerocopied = zerocopy;
else if (zerocopied != zerocopy) {
fprintf(stderr, "serr: inconsistent\n");
zerocopied = zerocopy;
}
if (cfg_verbose >= 2)
fprintf(stderr, "completed: %u (h=%u l=%u)\n",
range, hi, lo);
completions += range;
return true;
}
/* Read all outstanding messages on the errqueue */
static void do_recv_completions(int fd)
{
while (do_recv_completion(fd)) {}
}
/* Wait for all remaining completions on the errqueue */
static void do_recv_remaining_completions(int fd)
{
int64_t tstop = gettimeofday_ms() + cfg_waittime_ms;
while (completions < expected_completions &&
gettimeofday_ms() < tstop) {
if (do_poll(fd, POLLERR))
do_recv_completions(fd);
}
if (completions < expected_completions)
error(1, 0, "missing notifications: %lu < %lu\n",
completions, expected_completions);
}
static void do_tx(int domain, int type, int protocol)
{
struct iovec iov[3] = { {0} };
struct sockaddr_ll laddr;
struct msghdr msg = {0};
struct ethhdr eth;
union {
struct ipv6hdr ip6h;
struct iphdr iph;
} nh;
uint64_t tstop;
int fd;
fd = do_setup_tx(domain, type, protocol);
if (domain == PF_PACKET) {
uint16_t proto = cfg_family == PF_INET ? ETH_P_IP : ETH_P_IPV6;
/* sock_raw passes ll header as data */
if (type == SOCK_RAW) {
memset(eth.h_dest, 0x06, ETH_ALEN);
memset(eth.h_source, 0x02, ETH_ALEN);
eth.h_proto = htons(proto);
iov[0].iov_base = &eth;
iov[0].iov_len = sizeof(eth);
msg.msg_iovlen++;
}
/* both sock_raw and sock_dgram expect name */
memset(&laddr, 0, sizeof(laddr));
laddr.sll_family = AF_PACKET;
laddr.sll_ifindex = cfg_ifindex;
laddr.sll_protocol = htons(proto);
laddr.sll_halen = ETH_ALEN;
memset(laddr.sll_addr, 0x06, ETH_ALEN);
msg.msg_name = &laddr;
msg.msg_namelen = sizeof(laddr);
}
/* packet and raw sockets with hdrincl must pass network header */
if (domain == PF_PACKET || protocol == IPPROTO_RAW) {
if (cfg_family == PF_INET)
iov[1].iov_len = setup_iph(&nh.iph, cfg_payload_len);
else
iov[1].iov_len = setup_ip6h(&nh.ip6h, cfg_payload_len);
iov[1].iov_base = (void *) &nh;
msg.msg_iovlen++;
}
iov[2].iov_base = payload;
iov[2].iov_len = cfg_payload_len;
msg.msg_iovlen++;
msg.msg_iov = &iov[3 - msg.msg_iovlen];
tstop = gettimeofday_ms() + cfg_runtime_ms;
do {
if (cfg_cork)
do_sendmsg_corked(fd, &msg);
else
do_sendmsg(fd, &msg, cfg_zerocopy);
while (!do_poll(fd, POLLOUT)) {
if (cfg_zerocopy)
do_recv_completions(fd);
}
} while (gettimeofday_ms() < tstop);
if (cfg_zerocopy)
do_recv_remaining_completions(fd);
if (close(fd))
error(1, errno, "close");
fprintf(stderr, "tx=%lu (%lu MB) txc=%lu zc=%c\n",
packets, bytes >> 20, completions,
zerocopied == 1 ? 'y' : 'n');
}
static int do_setup_rx(int domain, int type, int protocol)
{
int fd;
/* If tx over PF_PACKET, rx over PF_INET(6)/SOCK_RAW,
* to recv the only copy of the packet, not a clone
*/
if (domain == PF_PACKET)
error(1, 0, "Use PF_INET/SOCK_RAW to read");
if (type == SOCK_RAW && protocol == IPPROTO_RAW)
error(1, 0, "IPPROTO_RAW: not supported on Rx");
fd = socket(domain, type, protocol);
if (fd == -1)
error(1, errno, "socket r");
do_setsockopt(fd, SOL_SOCKET, SO_RCVBUF, 1 << 21);
do_setsockopt(fd, SOL_SOCKET, SO_RCVLOWAT, 1 << 16);
do_setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, 1);
if (bind(fd, (void *) &cfg_dst_addr, cfg_alen))
error(1, errno, "bind");
if (type == SOCK_STREAM) {
if (listen(fd, 1))
error(1, errno, "listen");
fd = do_accept(fd);
}
return fd;
}
/* Flush all outstanding bytes for the tcp receive queue */
static void do_flush_tcp(int fd)
{
int ret;
/* MSG_TRUNC flushes up to len bytes */
ret = recv(fd, NULL, 1 << 21, MSG_TRUNC | MSG_DONTWAIT);
if (ret == -1 && errno == EAGAIN)
return;
if (ret == -1)
error(1, errno, "flush");
if (!ret)
return;
packets++;
bytes += ret;
}
/* Flush all outstanding datagrams. Verify first few bytes of each. */
static void do_flush_datagram(int fd, int type)
{
int ret, off = 0;
char buf[64];
/* MSG_TRUNC will return full datagram length */
ret = recv(fd, buf, sizeof(buf), MSG_DONTWAIT | MSG_TRUNC);
if (ret == -1 && errno == EAGAIN)
return;
/* raw ipv4 return with header, raw ipv6 without */
if (cfg_family == PF_INET && type == SOCK_RAW) {
off += sizeof(struct iphdr);
ret -= sizeof(struct iphdr);
}
if (ret == -1)
error(1, errno, "recv");
if (ret != cfg_payload_len)
error(1, 0, "recv: ret=%u != %u", ret, cfg_payload_len);
if (ret > sizeof(buf) - off)
ret = sizeof(buf) - off;
if (memcmp(buf + off, payload, ret))
error(1, 0, "recv: data mismatch");
packets++;
bytes += cfg_payload_len;
}
static void do_rx(int domain, int type, int protocol)
{
uint64_t tstop;
int fd;
fd = do_setup_rx(domain, type, protocol);
tstop = gettimeofday_ms() + cfg_runtime_ms;
do {
if (type == SOCK_STREAM)
do_flush_tcp(fd);
else
do_flush_datagram(fd, type);
do_poll(fd, POLLIN);
} while (gettimeofday_ms() < tstop);
if (close(fd))
error(1, errno, "close");
fprintf(stderr, "rx=%lu (%lu MB)\n", packets, bytes >> 20);
}
static void do_test(int domain, int type, int protocol)
{
int i;
if (cfg_cork && (domain == PF_PACKET || type != SOCK_DGRAM))
error(1, 0, "can only cork udp sockets");
do_setcpu(cfg_cpu);
for (i = 0; i < IP_MAXPACKET; i++)
payload[i] = 'a' + (i % 26);
if (cfg_rx)
do_rx(domain, type, protocol);
else
do_tx(domain, type, protocol);
}
static void usage(const char *filepath)
{
error(1, 0, "Usage: %s [options] <test>", filepath);
}
static void parse_opts(int argc, char **argv)
{
const int max_payload_len = sizeof(payload) -
sizeof(struct ipv6hdr) -
sizeof(struct tcphdr) -
40 /* max tcp options */;
int c;
cfg_payload_len = max_payload_len;
while ((c = getopt(argc, argv, "46c:C:D:i:mp:rs:S:t:vz")) != -1) {
switch (c) {
case '4':
if (cfg_family != PF_UNSPEC)
error(1, 0, "Pass one of -4 or -6");
cfg_family = PF_INET;
cfg_alen = sizeof(struct sockaddr_in);
break;
case '6':
if (cfg_family != PF_UNSPEC)
error(1, 0, "Pass one of -4 or -6");
cfg_family = PF_INET6;
cfg_alen = sizeof(struct sockaddr_in6);
break;
case 'c':
cfg_cork = strtol(optarg, NULL, 0);
break;
case 'C':
cfg_cpu = strtol(optarg, NULL, 0);
break;
case 'D':
setup_sockaddr(cfg_family, optarg, &cfg_dst_addr);
break;
case 'i':
cfg_ifindex = if_nametoindex(optarg);
if (cfg_ifindex == 0)
error(1, errno, "invalid iface: %s", optarg);
break;
case 'm':
cfg_cork_mixed = true;
break;
case 'p':
cfg_port = htons(strtoul(optarg, NULL, 0));
break;
case 'r':
cfg_rx = true;
break;
case 's':
cfg_payload_len = strtoul(optarg, NULL, 0);
break;
case 'S':
setup_sockaddr(cfg_family, optarg, &cfg_src_addr);
break;
case 't':
cfg_runtime_ms = 200 + strtoul(optarg, NULL, 10) * 1000;
break;
case 'v':
cfg_verbose++;
break;
case 'z':
cfg_zerocopy = true;
break;
}
}
if (cfg_payload_len > max_payload_len)
error(1, 0, "-s: payload exceeds max (%d)", max_payload_len);
if (cfg_cork_mixed && (!cfg_zerocopy || !cfg_cork))
error(1, 0, "-m: cork_mixed requires corking and zerocopy");
if (optind != argc - 1)
usage(argv[0]);
}
int main(int argc, char **argv)
{
const char *cfg_test;
parse_opts(argc, argv);
cfg_test = argv[argc - 1];
if (!strcmp(cfg_test, "packet"))
do_test(PF_PACKET, SOCK_RAW, 0);
else if (!strcmp(cfg_test, "packet_dgram"))
do_test(PF_PACKET, SOCK_DGRAM, 0);
else if (!strcmp(cfg_test, "raw"))
do_test(cfg_family, SOCK_RAW, IPPROTO_EGP);
else if (!strcmp(cfg_test, "raw_hdrincl"))
do_test(cfg_family, SOCK_RAW, IPPROTO_RAW);
else if (!strcmp(cfg_test, "tcp"))
do_test(cfg_family, SOCK_STREAM, 0);
else if (!strcmp(cfg_test, "udp"))
do_test(cfg_family, SOCK_DGRAM, 0);
else
error(1, 0, "unknown cfg_test %s", cfg_test);
return 0;
}
#!/bin/bash
#
# Send data between two processes across namespaces
# Run twice: once without and once with zerocopy
set -e
readonly DEV="veth0"
readonly DEV_MTU=65535
readonly BIN="./msg_zerocopy"
readonly RAND="$(mktemp -u XXXXXX)"
readonly NSPREFIX="ns-${RAND}"
readonly NS1="${NSPREFIX}1"
readonly NS2="${NSPREFIX}2"
readonly SADDR4='192.168.1.1'
readonly DADDR4='192.168.1.2'
readonly SADDR6='fd::1'
readonly DADDR6='fd::2'
readonly path_sysctl_mem="net.core.optmem_max"
# Argument parsing
if [[ "$#" -lt "2" ]]; then
echo "Usage: $0 [4|6] [tcp|udp|raw|raw_hdrincl|packet|packet_dgram] <args>"
exit 1
fi
readonly IP="$1"
shift
readonly TXMODE="$1"
shift
readonly EXTRA_ARGS="$@"
# Argument parsing: configure addresses
if [[ "${IP}" == "4" ]]; then
readonly SADDR="${SADDR4}"
readonly DADDR="${DADDR4}"
elif [[ "${IP}" == "6" ]]; then
readonly SADDR="${SADDR6}"
readonly DADDR="${DADDR6}"
else
echo "Invalid IP version ${IP}"
exit 1
fi
# Argument parsing: select receive mode
#
# This differs from send mode for
# - packet: use raw recv, because packet receives skb clones
# - raw_hdrinc: use raw recv, because hdrincl is a tx-only option
case "${TXMODE}" in
'packet' | 'packet_dgram' | 'raw_hdrincl')
RXMODE='raw'
;;
*)
RXMODE="${TXMODE}"
;;
esac
# Start of state changes: install cleanup handler
save_sysctl_mem="$(sysctl -n ${path_sysctl_mem})"
cleanup() {
ip netns del "${NS2}"
ip netns del "${NS1}"
sysctl -w -q "${path_sysctl_mem}=${save_sysctl_mem}"
}
trap cleanup EXIT
# Configure system settings
sysctl -w -q "${path_sysctl_mem}=1000000"
# Create virtual ethernet pair between network namespaces
ip netns add "${NS1}"
ip netns add "${NS2}"
ip link add "${DEV}" mtu "${DEV_MTU}" netns "${NS1}" type veth \
peer name "${DEV}" mtu "${DEV_MTU}" netns "${NS2}"
# Bring the devices up
ip -netns "${NS1}" link set "${DEV}" up
ip -netns "${NS2}" link set "${DEV}" up
# Set fixed MAC addresses on the devices
ip -netns "${NS1}" link set dev "${DEV}" address 02:02:02:02:02:02
ip -netns "${NS2}" link set dev "${DEV}" address 06:06:06:06:06:06
# Add fixed IP addresses to the devices
ip -netns "${NS1}" addr add 192.168.1.1/24 dev "${DEV}"
ip -netns "${NS2}" addr add 192.168.1.2/24 dev "${DEV}"
ip -netns "${NS1}" addr add fd::1/64 dev "${DEV}" nodad
ip -netns "${NS2}" addr add fd::2/64 dev "${DEV}" nodad
# Optionally disable sg or csum offload to test edge cases
# ip netns exec "${NS1}" ethtool -K "${DEV}" sg off
do_test() {
local readonly ARGS="$1"
echo "ipv${IP} ${TXMODE} ${ARGS}"
ip netns exec "${NS2}" "${BIN}" "-${IP}" -i "${DEV}" -t 2 -C 2 -S "${SADDR}" -D "${DADDR}" ${ARGS} -r "${RXMODE}" &
sleep 0.2
ip netns exec "${NS1}" "${BIN}" "-${IP}" -i "${DEV}" -t 1 -C 3 -S "${SADDR}" -D "${DADDR}" ${ARGS} "${TXMODE}"
wait
}
do_test "${EXTRA_ARGS}"
do_test "-z ${EXTRA_ARGS}"
echo ok
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment