Commit 48433419 authored by David S. Miller's avatar David S. Miller

Merge branch 'strparser'

Tom Herbert says:

====================
strp: Stream parser for messages

This patch set introduces a utility for parsing application layer
protocol messages in a TCP stream. This is a generalization of the
mechanism implemented of Kernel Connection Multiplexor.

This patch set adapts KCM to use the strparser. We expect that kTLS
can use this mechanism also. RDS would probably be another candidate
to use a common stream parsing mechanism.

The API includes a context structure, a set of callbacks, utility
functions, and a data ready function. The callbacks include
a parse_msg function that is called to perform parsing (e.g.
BPF parsing in case of KCM), and a rcv_msg function that is called
when a full message has been completed.

For strparser we specify the return codes from the parser to allow
the backend to indicate that control of the socket should be
transferred back to userspace to handle some exceptions in the
stream: The return values are:

      >0 : indicates length of successfully parsed message
       0  : indicates more data must be received to parse the message
       -ESTRPIPE : current message should not be processed by the
          kernel, return control of the socket to userspace which
          can proceed to read the messages itself
       other < 0 : Error is parsing, give control back to userspace
          assuming that synchronization is lost and the stream
          is unrecoverable (application expected to close TCP socket)

There is one issue I haven't been able to fully resolve. If parse_msg
returns ESTRPIPE (wants control back to userspace) the parser may
already have consumed some bytes of the message. There is no way to
put bytes back into the TCP receive queue and tcp_read_sock does not
allow an easy way to peek messages. In lieu of a better solution, we
return ENODATA on the socket to indicate that the data stream is
unrecoverable (application needs to close socket). This condition
should only happen if an application layer message header is split
across two skbuffs and parsing just the first skbuff wasn't sufficient
to determine the that transfer to userspace is needed.

This patch set contains:

  - strparser implementation
  - changes to kcm to use strparser
  - strparser.txt documentation

v2:
  - Add copyright notice to C files
  - Remove GPL module license from strparser.c
  - Add report of rxpause

v3:
  - Restore GPL module license
  - Use EXPORT_SYMBOL_GPL

v4:
  - Removed unused function, changed another to be static as suggested
    by davem
  - Rewoked data_ready to be called from upper layer, no longer requires
    taking over socket data_ready callback as suggested by Lance Chao

Tested:
  - Ran a KCM thrash test for 24 hours. No behavioral or performance
    differences observed.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents d2d371ae adcce4d5
Stream Parser
-------------
The stream parser (strparser) is a utility that parses messages of an
application layer protocol running over a TCP connection. The stream
parser works in conjunction with an upper layer in the kernel to provide
kernel support for application layer messages. For instance, Kernel
Connection Multiplexor (KCM) uses the Stream Parser to parse messages
using a BPF program.
Interface
---------
The API includes a context structure, a set of callbacks, utility
functions, and a data_ready function. The callbacks include
a parse_msg function that is called to perform parsing (e.g.
BPF parsing in case of KCM), and a rcv_msg function that is called
when a full message has been completed.
A stream parser can be instantiated for a TCP connection. This is done
by:
strp_init(struct strparser *strp, struct sock *csk,
struct strp_callbacks *cb)
strp is a struct of type strparser that is allocated by the upper layer.
csk is the TCP socket associated with the stream parser. Callbacks are
called by the stream parser.
Callbacks
---------
There are four callbacks:
int (*parse_msg)(struct strparser *strp, struct sk_buff *skb);
parse_msg is called to determine the length of the next message
in the stream. The upper layer must implement this function. It
should parse the sk_buff as containing the headers for the
next application layer messages in the stream.
The skb->cb in the input skb is a struct strp_rx_msg. Only
the offset field is relevant in parse_msg and gives the offset
where the message starts in the skb.
The return values of this function are:
>0 : indicates length of successfully parsed message
0 : indicates more data must be received to parse the message
-ESTRPIPE : current message should not be processed by the
kernel, return control of the socket to userspace which
can proceed to read the messages itself
other < 0 : Error is parsing, give control back to userspace
assuming that synchronization is lost and the stream
is unrecoverable (application expected to close TCP socket)
In the case that an error is returned (return value is less than
zero) the stream parser will set the error on TCP socket and wake
it up. If parse_msg returned -ESTRPIPE and the stream parser had
previously read some bytes for the current message, then the error
set on the attached socket is ENODATA since the stream is
unrecoverable in that case.
void (*rcv_msg)(struct strparser *strp, struct sk_buff *skb);
rcv_msg is called when a full message has been received and
is queued. The callee must consume the sk_buff; it can
call strp_pause to prevent any further messages from being
received in rcv_msg (see strp_pause below). This callback
must be set.
The skb->cb in the input skb is a struct strp_rx_msg. This
struct contains two fields: offset and full_len. Offset is
where the message starts in the skb, and full_len is the
the length of the message. skb->len - offset may be greater
then full_len since strparser does not trim the skb.
int (*read_sock_done)(struct strparser *strp, int err);
read_sock_done is called when the stream parser is done reading
the TCP socket. The stream parser may read multiple messages
in a loop and this function allows cleanup to occur when existing
the loop. If the callback is not set (NULL in strp_init) a
default function is used.
void (*abort_parser)(struct strparser *strp, int err);
This function is called when stream parser encounters an error
in parsing. The default function stops the stream parser for the
TCP socket and sets the error in the socket. The default function
can be changed by setting the callback to non-NULL in strp_init.
Functions
---------
The upper layer calls strp_tcp_data_ready when data is ready on the lower
socket for strparser to process. This should be called from a data_ready
callback that is set on the socket.
strp_stop is called to completely stop stream parser operations. This
is called internally when the stream parser encounters an error, and
it is called from the upper layer when unattaching a TCP socket.
strp_done is called to unattach the stream parser from the TCP socket.
This must be called after the stream processor has be stopped.
strp_check_rcv is called to check for new messages on the socket. This
is normally called at initialization of the a stream parser instance
of after strp_unpause.
Statistics
----------
Various counters are kept for each stream parser for a TCP socket.
These are in the strp_stats structure. strp_aggr_stats is a convenience
structure for accumulating statistics for multiple stream parser
instances. save_strp_stats and aggregate_strp_stats are helper functions
to save and aggregate statistics.
Message assembly limits
-----------------------
The stream parser provide mechanisms to limit the resources consumed by
message assembly.
A timer is set when assembly starts for a new message. The message
timeout is taken from rcvtime for the associated TCP socket. If the
timer fires before assembly completes the stream parser is aborted
and the ETIMEDOUT error is set on the TCP socket.
Message length is limited to the receive buffer size of the associated
TCP socket. If the length returned by parse_msg is greater than
the socket buffer size then the stream parser is aborted with
EMSGSIZE error set on the TCP socket. Note that this makes the
maximum size of receive skbuffs for a socket with a stream parser
to be 2*sk_rcvbuf of the TCP socket.
......@@ -13,6 +13,7 @@
#include <linux/skbuff.h>
#include <net/sock.h>
#include <net/strparser.h>
#include <uapi/linux/kcm.h>
extern unsigned int kcm_net_id;
......@@ -21,16 +22,8 @@ extern unsigned int kcm_net_id;
#define KCM_STATS_INCR(stat) ((stat)++)
struct kcm_psock_stats {
unsigned long long rx_msgs;
unsigned long long rx_bytes;
unsigned long long tx_msgs;
unsigned long long tx_bytes;
unsigned int rx_aborts;
unsigned int rx_mem_fail;
unsigned int rx_need_more_hdr;
unsigned int rx_msg_too_big;
unsigned int rx_msg_timeouts;
unsigned int rx_bad_hdr_len;
unsigned long long reserved;
unsigned long long unreserved;
unsigned int tx_aborts;
......@@ -64,13 +57,6 @@ struct kcm_tx_msg {
struct sk_buff *last_skb;
};
struct kcm_rx_msg {
int full_len;
int accum_len;
int offset;
int early_eaten;
};
/* Socket structure for KCM client sockets */
struct kcm_sock {
struct sock sk;
......@@ -87,6 +73,7 @@ struct kcm_sock {
struct work_struct tx_work;
struct list_head wait_psock_list;
struct sk_buff *seq_skb;
u32 tx_stopped : 1;
/* Don't use bit fields here, these are set under different locks */
bool tx_wait;
......@@ -104,11 +91,11 @@ struct bpf_prog;
/* Structure for an attached lower socket */
struct kcm_psock {
struct sock *sk;
struct strparser strp;
struct kcm_mux *mux;
int index;
u32 tx_stopped : 1;
u32 rx_stopped : 1;
u32 done : 1;
u32 unattaching : 1;
......@@ -121,18 +108,12 @@ struct kcm_psock {
struct kcm_psock_stats stats;
/* Receive */
struct sk_buff *rx_skb_head;
struct sk_buff **rx_skb_nextp;
struct sk_buff *ready_rx_msg;
struct list_head psock_ready_list;
struct work_struct rx_work;
struct delayed_work rx_delayed_work;
struct bpf_prog *bpf_prog;
struct kcm_sock *rx_kcm;
unsigned long long saved_rx_bytes;
unsigned long long saved_rx_msgs;
struct timer_list rx_msg_timer;
unsigned int rx_need_bytes;
struct sk_buff *ready_rx_msg;
/* Transmit */
struct kcm_sock *tx_kcm;
......@@ -146,6 +127,7 @@ struct kcm_net {
struct mutex mutex;
struct kcm_psock_stats aggregate_psock_stats;
struct kcm_mux_stats aggregate_mux_stats;
struct strp_aggr_stats aggregate_strp_stats;
struct list_head mux_list;
int count;
};
......@@ -163,6 +145,7 @@ struct kcm_mux {
struct kcm_mux_stats stats;
struct kcm_psock_stats aggregate_psock_stats;
struct strp_aggr_stats aggregate_strp_stats;
/* Receive */
spinlock_t rx_lock ____cacheline_aligned_in_smp;
......@@ -190,14 +173,6 @@ static inline void aggregate_psock_stats(struct kcm_psock_stats *stats,
/* Save psock statistics in the mux when psock is being unattached. */
#define SAVE_PSOCK_STATS(_stat) (agg_stats->_stat += stats->_stat)
SAVE_PSOCK_STATS(rx_msgs);
SAVE_PSOCK_STATS(rx_bytes);
SAVE_PSOCK_STATS(rx_aborts);
SAVE_PSOCK_STATS(rx_mem_fail);
SAVE_PSOCK_STATS(rx_need_more_hdr);
SAVE_PSOCK_STATS(rx_msg_too_big);
SAVE_PSOCK_STATS(rx_msg_timeouts);
SAVE_PSOCK_STATS(rx_bad_hdr_len);
SAVE_PSOCK_STATS(tx_msgs);
SAVE_PSOCK_STATS(tx_bytes);
SAVE_PSOCK_STATS(reserved);
......
/*
* Stream Parser
*
* Copyright (c) 2016 Tom Herbert <tom@herbertland.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2
* as published by the Free Software Foundation.
*/
#ifndef __NET_STRPARSER_H_
#define __NET_STRPARSER_H_
#include <linux/skbuff.h>
#include <net/sock.h>
#define STRP_STATS_ADD(stat, count) ((stat) += (count))
#define STRP_STATS_INCR(stat) ((stat)++)
struct strp_stats {
unsigned long long rx_msgs;
unsigned long long rx_bytes;
unsigned int rx_mem_fail;
unsigned int rx_need_more_hdr;
unsigned int rx_msg_too_big;
unsigned int rx_msg_timeouts;
unsigned int rx_bad_hdr_len;
};
struct strp_aggr_stats {
unsigned long long rx_msgs;
unsigned long long rx_bytes;
unsigned int rx_mem_fail;
unsigned int rx_need_more_hdr;
unsigned int rx_msg_too_big;
unsigned int rx_msg_timeouts;
unsigned int rx_bad_hdr_len;
unsigned int rx_aborts;
unsigned int rx_interrupted;
unsigned int rx_unrecov_intr;
};
struct strparser;
/* Callbacks are called with lock held for the attached socket */
struct strp_callbacks {
int (*parse_msg)(struct strparser *strp, struct sk_buff *skb);
void (*rcv_msg)(struct strparser *strp, struct sk_buff *skb);
int (*read_sock_done)(struct strparser *strp, int err);
void (*abort_parser)(struct strparser *strp, int err);
};
struct strp_rx_msg {
int full_len;
int offset;
};
static inline struct strp_rx_msg *strp_rx_msg(struct sk_buff *skb)
{
return (struct strp_rx_msg *)((void *)skb->cb +
offsetof(struct qdisc_skb_cb, data));
}
/* Structure for an attached lower socket */
struct strparser {
struct sock *sk;
u32 rx_stopped : 1;
u32 rx_paused : 1;
u32 rx_aborted : 1;
u32 rx_interrupted : 1;
u32 rx_unrecov_intr : 1;
struct sk_buff **rx_skb_nextp;
struct timer_list rx_msg_timer;
struct sk_buff *rx_skb_head;
unsigned int rx_need_bytes;
struct delayed_work rx_delayed_work;
struct work_struct rx_work;
struct strp_stats stats;
struct strp_callbacks cb;
};
/* Must be called with lock held for attached socket */
static inline void strp_pause(struct strparser *strp)
{
strp->rx_paused = 1;
}
/* May be called without holding lock for attached socket */
static inline void strp_unpause(struct strparser *strp)
{
strp->rx_paused = 0;
}
static inline void save_strp_stats(struct strparser *strp,
struct strp_aggr_stats *agg_stats)
{
/* Save psock statistics in the mux when psock is being unattached. */
#define SAVE_PSOCK_STATS(_stat) (agg_stats->_stat += \
strp->stats._stat)
SAVE_PSOCK_STATS(rx_msgs);
SAVE_PSOCK_STATS(rx_bytes);
SAVE_PSOCK_STATS(rx_mem_fail);
SAVE_PSOCK_STATS(rx_need_more_hdr);
SAVE_PSOCK_STATS(rx_msg_too_big);
SAVE_PSOCK_STATS(rx_msg_timeouts);
SAVE_PSOCK_STATS(rx_bad_hdr_len);
#undef SAVE_PSOCK_STATS
if (strp->rx_aborted)
agg_stats->rx_aborts++;
if (strp->rx_interrupted)
agg_stats->rx_interrupted++;
if (strp->rx_unrecov_intr)
agg_stats->rx_unrecov_intr++;
}
static inline void aggregate_strp_stats(struct strp_aggr_stats *stats,
struct strp_aggr_stats *agg_stats)
{
#define SAVE_PSOCK_STATS(_stat) (agg_stats->_stat += stats->_stat)
SAVE_PSOCK_STATS(rx_msgs);
SAVE_PSOCK_STATS(rx_bytes);
SAVE_PSOCK_STATS(rx_mem_fail);
SAVE_PSOCK_STATS(rx_need_more_hdr);
SAVE_PSOCK_STATS(rx_msg_too_big);
SAVE_PSOCK_STATS(rx_msg_timeouts);
SAVE_PSOCK_STATS(rx_bad_hdr_len);
SAVE_PSOCK_STATS(rx_aborts);
SAVE_PSOCK_STATS(rx_interrupted);
SAVE_PSOCK_STATS(rx_unrecov_intr);
#undef SAVE_PSOCK_STATS
}
void strp_done(struct strparser *strp);
void strp_stop(struct strparser *strp);
void strp_check_rcv(struct strparser *strp);
int strp_init(struct strparser *strp, struct sock *csk,
struct strp_callbacks *cb);
void strp_tcp_data_ready(struct strparser *strp);
#endif /* __NET_STRPARSER_H_ */
......@@ -369,6 +369,7 @@ source "net/irda/Kconfig"
source "net/bluetooth/Kconfig"
source "net/rxrpc/Kconfig"
source "net/kcm/Kconfig"
source "net/strparser/Kconfig"
config FIB_RULES
bool
......
......@@ -35,6 +35,7 @@ obj-$(CONFIG_BT) += bluetooth/
obj-$(CONFIG_SUNRPC) += sunrpc/
obj-$(CONFIG_AF_RXRPC) += rxrpc/
obj-$(CONFIG_AF_KCM) += kcm/
obj-$(CONFIG_STREAM_PARSER) += strparser/
obj-$(CONFIG_ATM) += atm/
obj-$(CONFIG_L2TP) += l2tp/
obj-$(CONFIG_DECNET) += decnet/
......
......@@ -172,6 +172,5 @@ static void __exit ila_fini(void)
module_init(ila_init);
module_exit(ila_fini);
MODULE_ALIAS_RTNL_LWT(ILA);
MODULE_AUTHOR("Tom Herbert <tom@herbertland.com>");
MODULE_LICENSE("GPL");
......@@ -3,6 +3,7 @@ config AF_KCM
tristate "KCM sockets"
depends on INET
select BPF_SYSCALL
select STREAM_PARSER
---help---
KCM (Kernel Connection Multiplexor) sockets provide a method
for multiplexing messages of a message based application
......
......@@ -155,8 +155,8 @@ static void kcm_format_psock(struct kcm_psock *psock, struct seq_file *seq,
seq_printf(seq,
" psock-%-5u %-10llu %-16llu %-10llu %-16llu %-8d %-8d %-8d %-8d ",
psock->index,
psock->stats.rx_msgs,
psock->stats.rx_bytes,
psock->strp.stats.rx_msgs,
psock->strp.stats.rx_bytes,
psock->stats.tx_msgs,
psock->stats.tx_bytes,
psock->sk->sk_receive_queue.qlen,
......@@ -170,9 +170,12 @@ static void kcm_format_psock(struct kcm_psock *psock, struct seq_file *seq,
if (psock->tx_stopped)
seq_puts(seq, "TxStop ");
if (psock->rx_stopped)
if (psock->strp.rx_stopped)
seq_puts(seq, "RxStop ");
if (psock->strp.rx_paused)
seq_puts(seq, "RxPause ");
if (psock->tx_kcm)
seq_printf(seq, "Rsvd-%d ", psock->tx_kcm->index);
......@@ -275,6 +278,7 @@ static int kcm_stats_seq_show(struct seq_file *seq, void *v)
{
struct kcm_psock_stats psock_stats;
struct kcm_mux_stats mux_stats;
struct strp_aggr_stats strp_stats;
struct kcm_mux *mux;
struct kcm_psock *psock;
struct net *net = seq->private;
......@@ -282,20 +286,28 @@ static int kcm_stats_seq_show(struct seq_file *seq, void *v)
memset(&mux_stats, 0, sizeof(mux_stats));
memset(&psock_stats, 0, sizeof(psock_stats));
memset(&strp_stats, 0, sizeof(strp_stats));
mutex_lock(&knet->mutex);
aggregate_mux_stats(&knet->aggregate_mux_stats, &mux_stats);
aggregate_psock_stats(&knet->aggregate_psock_stats,
&psock_stats);
aggregate_strp_stats(&knet->aggregate_strp_stats,
&strp_stats);
list_for_each_entry_rcu(mux, &knet->mux_list, kcm_mux_list) {
spin_lock_bh(&mux->lock);
aggregate_mux_stats(&mux->stats, &mux_stats);
aggregate_psock_stats(&mux->aggregate_psock_stats,
&psock_stats);
list_for_each_entry(psock, &mux->psocks, psock_list)
aggregate_strp_stats(&mux->aggregate_strp_stats,
&strp_stats);
list_for_each_entry(psock, &mux->psocks, psock_list) {
aggregate_psock_stats(&psock->stats, &psock_stats);
save_strp_stats(&psock->strp, &strp_stats);
}
spin_unlock_bh(&mux->lock);
}
......@@ -328,7 +340,7 @@ static int kcm_stats_seq_show(struct seq_file *seq, void *v)
mux_stats.rx_ready_drops);
seq_printf(seq,
"%-8s %-10s %-16s %-10s %-16s %-10s %-10s %-10s %-10s %-10s %-10s %-10s %-10s %-10s\n",
"%-8s %-10s %-16s %-10s %-16s %-10s %-10s %-10s %-10s %-10s %-10s %-10s %-10s %-10s %-10s %-10s\n",
"Psock",
"RX-Msgs",
"RX-Bytes",
......@@ -337,6 +349,8 @@ static int kcm_stats_seq_show(struct seq_file *seq, void *v)
"Reserved",
"Unreserved",
"RX-Aborts",
"RX-Intr",
"RX-Unrecov",
"RX-MemFail",
"RX-NeedMor",
"RX-BadLen",
......@@ -345,20 +359,22 @@ static int kcm_stats_seq_show(struct seq_file *seq, void *v)
"TX-Aborts");
seq_printf(seq,
"%-8s %-10llu %-16llu %-10llu %-16llu %-10llu %-10llu %-10u %-10u %-10u %-10u %-10u %-10u %-10u\n",
"%-8s %-10llu %-16llu %-10llu %-16llu %-10llu %-10llu %-10u %-10u %-10u %-10u %-10u %-10u %-10u %-10u %-10u\n",
"",
psock_stats.rx_msgs,
psock_stats.rx_bytes,
strp_stats.rx_msgs,
strp_stats.rx_bytes,
psock_stats.tx_msgs,
psock_stats.tx_bytes,
psock_stats.reserved,
psock_stats.unreserved,
psock_stats.rx_aborts,
psock_stats.rx_mem_fail,
psock_stats.rx_need_more_hdr,
psock_stats.rx_bad_hdr_len,
psock_stats.rx_msg_too_big,
psock_stats.rx_msg_timeouts,
strp_stats.rx_aborts,
strp_stats.rx_interrupted,
strp_stats.rx_unrecov_intr,
strp_stats.rx_mem_fail,
strp_stats.rx_need_more_hdr,
strp_stats.rx_bad_hdr_len,
strp_stats.rx_msg_too_big,
strp_stats.rx_msg_timeouts,
psock_stats.tx_aborts);
return 0;
......
This diff is collapsed.
config STREAM_PARSER
tristate
default n
obj-$(CONFIG_STREAM_PARSER) += strparser.o
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment