Commit 62603506 authored by Jon Grimm, committed by Sridhar Samudrala

[SCTP] Partial Data Delivery

Support pushing a partial record up to the application if we
are receiving pressure on rwnd.  The most common case is that
the sender is sending a record larger than our rwnd.  We send
as much as we can up the receive queue in hopes that a read
will occur and free up room in rwnd.

Other associations on the socket need to be held off until the partial
delivery condition is finally fulfilled (or ABORTed).  Additionally,
one must be careful to "do the right thing" with regard to
associations peeled off to new sockets, properly preserving or 
clearing the partial delivery state.
parent d8a30a59
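
As a rough sketch of the decision this patch adds to DATA processing
(illustration only; pd_decide, the enum, and the standalone types are
hypothetical names, not part of the patch):

#include <stdint.h>
#include <stddef.h>

enum pd_action { PD_DELIVER_ULP, PD_DELIVER_PD, PD_DISCARD };

/* Mirrors the rwnd-pressure logic added to sctp_sf_eat_data_6_2()
 * below.  A chunk with datalen >= rwnd additionally triggers an
 * attempt to start partial delivery (SCTP_CMD_CHUNK_PD).
 */
static enum pd_action pd_decide(uint32_t tsn, size_t datalen,
				uint32_t rwnd, int rwnd_over,
				uint32_t frag_point, uint32_t ctsn)
{
	if (rwnd_over || datalen > rwnd + frag_point) {
		/* No room at all; only the next expected TSN is worth
		 * partially delivering (or reneging) for.
		 */
		if (ctsn + 1 == tsn)
			return PD_DELIVER_PD;
		return PD_DISCARD;
	}
	/* Room exists; deliver through the normal ULP path. */
	return PD_DELIVER_ULP;
}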
......@@ -86,6 +86,7 @@ typedef enum {
SCTP_CMD_PURGE_OUTQUEUE, /* Purge all data waiting to be sent. */
SCTP_CMD_SETUP_T2, /* Hi-level, setup T2-shutdown parms. */
SCTP_CMD_RTO_PENDING, /* Set transport's rto_pending. */
SCTP_CMD_CHUNK_PD, /* Partial data delivery considerations. */
SCTP_CMD_LAST
} sctp_verb_t;
......
......@@ -124,7 +124,6 @@ typedef struct sctp_association sctp_association_t;
typedef struct sctp_packet sctp_packet_t;
typedef struct sctp_chunk sctp_chunk_t;
typedef struct sctp_bind_addr sctp_bind_addr_t;
typedef struct sctp_opt sctp_opt_t;
typedef struct sctp_endpoint_common sctp_endpoint_common_t;
#include <net/sctp/tsnmap.h>
......@@ -249,10 +248,10 @@ struct sctp_af {
int optname,
char *optval,
int *optlen);
struct dst_entry *(*get_dst) (sctp_association_t *asoc,
struct dst_entry *(*get_dst) (struct sctp_association *asoc,
union sctp_addr *daddr,
union sctp_addr *saddr);
void (*get_saddr) (sctp_association_t *asoc,
void (*get_saddr) (struct sctp_association *asoc,
struct dst_entry *dst,
union sctp_addr *daddr,
union sctp_addr *saddr);
......@@ -310,6 +309,9 @@ struct sctp_opt {
/* What kind of a socket is this? */
sctp_socket_type_t type;
/* PF_ family specific functions. */
struct sctp_pf *pf;
/* What is our base endpointer? */
sctp_endpoint_t *ep;
......@@ -323,7 +325,10 @@ struct sctp_opt {
__u32 autoclose;
__u8 nodelay;
__u8 disable_fragments;
struct sctp_pf *pf;
__u8 pd_mode;
/* Receive to here while partial delivery is in effect. */
struct sk_buff_head pd_lobby;
};
......
......@@ -103,6 +103,10 @@ struct sctp_ulpevent *sctp_ulpevent_make_shutdown_event(
__u16 flags,
int priority);
struct sctp_ulpevent *sctp_ulpevent_make_pdapi(
const struct sctp_association *asoc,
__u32 indication, int priority);
struct sctp_ulpevent *sctp_ulpevent_make_rcvmsg(struct sctp_association *asoc,
struct sctp_chunk *chunk,
int priority);
......@@ -111,19 +115,24 @@ void sctp_ulpevent_read_sndrcvinfo(const struct sctp_ulpevent *event,
struct msghdr *);
__u16 sctp_ulpevent_get_notification_type(const struct sctp_ulpevent *event);
/* Is this event type enabled? */
static inline int sctp_ulpevent_type_enabled(__u16 sn_type,
struct sctp_event_subscribe *mask)
{
char *amask = (char *) mask;
return amask[sn_type - SCTP_SN_TYPE_BASE];
}
/* Given an event subscription, is this event enabled? */
static inline int sctp_ulpevent_is_enabled(const struct sctp_ulpevent *event,
const struct sctp_event_subscribe *mask)
struct sctp_event_subscribe *mask)
{
const char *amask = (const char *) mask;
__u16 sn_type;
int enabled = 1;
if (sctp_ulpevent_is_notification(event)) {
sn_type = sctp_ulpevent_get_notification_type(event);
enabled = amask[sn_type - SCTP_SN_TYPE_BASE];
enabled = sctp_ulpevent_type_enabled(sn_type, mask);
}
return enabled;
}
......
......@@ -48,7 +48,8 @@
/* A structure to carry information to the ULP (e.g. Sockets API) */
struct sctp_ulpq {
int malloced;
char malloced;
char pd_mode;
sctp_association_t *asoc;
struct sk_buff_head reasm;
struct sk_buff_head lobby;
......@@ -60,13 +61,19 @@ struct sctp_ulpq *sctp_ulpq_init(struct sctp_ulpq *, sctp_association_t *);
void sctp_ulpq_free(struct sctp_ulpq *);
/* Add a new DATA chunk for processing. */
int sctp_ulpq_tail_data(struct sctp_ulpq *, sctp_chunk_t *chunk, int priority);
int sctp_ulpq_tail_data(struct sctp_ulpq *, struct sctp_chunk *, int);
/* Add a new event for propagation to the ULP. */
int sctp_ulpq_tail_event(struct sctp_ulpq *, struct sctp_ulpevent *ev);
/* Is the ulpqueue empty. */
int sctp_ulpqueue_is_empty(struct sctp_ulpq *);
/* Perform partial delivery. */
void sctp_ulpq_partial_delivery(struct sctp_ulpq *, struct sctp_chunk *, int);
/* Abort the partial delivery. */
void sctp_ulpq_abort_pd(struct sctp_ulpq *, int);
/* Clear the partial data delivery condition on this socket. */
int sctp_clear_pd(struct sock *sk);
#endif /* __sctp_ulpqueue_h__ */
......
......@@ -368,6 +368,7 @@ struct sctp_rcv_pdapi_event {
sctp_assoc_t pdapi_assoc_id;
};
enum { SCTP_PARTIAL_DELIVERY_ABORTED=0, };
/*
* Described in Section 7.3
......@@ -415,8 +416,8 @@ enum sctp_sn_type {
SCTP_SN_TYPE_BASE = (1<<15),
SCTP_ASSOC_CHANGE,
SCTP_PEER_ADDR_CHANGE,
SCTP_REMOTE_ERROR,
SCTP_SEND_FAILED,
SCTP_REMOTE_ERROR,
SCTP_SHUTDOWN_EVENT,
SCTP_PARTIAL_DELIVERY_EVENT,
SCTP_ADAPTION_INDICATION,
......
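An application opts into these notifications through the sockets-API
event mask. A hedged userspace sketch follows; the SCTP_EVENTS option
and the sctp_event_subscribe field names track the later sockets-API
drafts and may differ in this tree:

#include <string.h>
#include <sys/socket.h>
#include <netinet/sctp.h>	/* assumed location of the API definitions */

/* Subscribe to data I/O info and partial-delivery notifications. */
static int subscribe_pdapi(int fd)
{
	struct sctp_event_subscribe events;

	memset(&events, 0, sizeof(events));
	events.sctp_data_io_event = 1;
	events.sctp_partial_delivery_event = 1;
	return setsockopt(fd, IPPROTO_SCTP, SCTP_EVENTS,
			  &events, sizeof(events));
}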
......@@ -95,7 +95,7 @@ sctp_association_t *sctp_association_init(sctp_association_t *asoc,
sctp_scope_t scope,
int priority)
{
sctp_opt_t *sp;
struct sctp_opt *sp;
int i;
/* Retrieve the SCTP per socket area. */
......@@ -368,7 +368,7 @@ struct sctp_transport *sctp_assoc_add_peer(sctp_association_t *asoc,
int priority)
{
struct sctp_transport *peer;
sctp_opt_t *sp;
struct sctp_opt *sp;
unsigned short port;
/* AF_INET and AF_INET6 share common port field. */
......@@ -819,7 +819,7 @@ static void sctp_assoc_bh_rcv(sctp_association_t *asoc)
/* This routine moves an association from its old sk to a new sk. */
void sctp_assoc_migrate(sctp_association_t *assoc, struct sock *newsk)
{
sctp_opt_t *newsp = sctp_sk(newsk);
struct sctp_opt *newsp = sctp_sk(newsk);
/* Delete the association from the old endpoint's list of
* associations.
......@@ -996,7 +996,7 @@ void sctp_assoc_rwnd_increase(sctp_association_t *asoc, int len)
/* Send a window update SACK if the rwnd has increased by at least the
* minimum of the association's PMTU and half of the receive buffer.
* The algorithm used is similar to the one described in
* Section 4.2.3.3 of RFC 1122.
*/
if ((asoc->state == SCTP_STATE_ESTABLISHED) &&
......@@ -1006,9 +1006,9 @@ void sctp_assoc_rwnd_increase(sctp_association_t *asoc, int len)
SCTP_DEBUG_PRINTK("%s: Sending window update SACK- asoc: %p "
"rwnd: %u a_rwnd: %u\n",
__FUNCTION__, asoc, asoc->rwnd, asoc->a_rwnd);
sack = sctp_make_sack(asoc);
if (!sack)
return;
/* Update the last advertised rwnd value. */
asoc->a_rwnd = asoc->rwnd;
......@@ -1022,7 +1022,7 @@ void sctp_assoc_rwnd_increase(sctp_association_t *asoc, int len)
timer = &asoc->timers[SCTP_EVENT_TIMEOUT_SACK];
if (timer_pending(timer) && del_timer(timer))
sctp_association_put(asoc);
}
}
/* Decrease asoc's rwnd by len. */
......
......@@ -92,7 +92,7 @@ sctp_endpoint_t *sctp_endpoint_new(sctp_protocol_t *proto,
sctp_endpoint_t *sctp_endpoint_init(sctp_endpoint_t *ep, sctp_protocol_t *proto,
struct sock *sk, int priority)
{
sctp_opt_t *sp = sctp_sk(sk);
struct sctp_opt *sp = sctp_sk(sk);
memset(ep, 0, sizeof(sctp_endpoint_t));
/* Initialize the base structure. */
......
......@@ -598,6 +598,13 @@ int sctp_cmd_interpreter(sctp_event_t event_type, sctp_subtype_t subtype,
t->rto_pending = 1;
break;
case SCTP_CMD_CHUNK_PD:
/* Send a chunk to the sockets layer. */
sctp_ulpq_partial_delivery(&asoc->ulpq,
command->obj.ptr,
GFP_ATOMIC);
break;
default:
printk(KERN_WARNING "Impossible command: %u, %p\n",
command->verb, command->obj.ptr);
......@@ -1061,12 +1068,11 @@ static void sctp_cmd_assoc_failed(sctp_cmd_seq_t *commands,
break;
}
event = sctp_ulpevent_make_assoc_change(asoc,
0,
SCTP_COMM_LOST,
error, 0, 0,
GFP_ATOMIC);
/* Cancel any partial delivery in progress. */
sctp_ulpq_abort_pd(&asoc->ulpq, GFP_ATOMIC);
event = sctp_ulpevent_make_assoc_change(asoc, 0, SCTP_COMM_LOST,
error, 0, 0, GFP_ATOMIC);
if (event)
sctp_add_cmd_sf(commands, SCTP_CMD_EVENT_ULP,
SCTP_ULPEVENT(event));
......@@ -1141,7 +1147,7 @@ static void sctp_cmd_hb_timers_stop(sctp_cmd_seq_t *cmds,
if (del_timer(&t->hb_timer))
sctp_transport_put(t);
}
}
/* Helper function to update the heartbeat timer. */
static void sctp_cmd_hb_timer_update(sctp_cmd_seq_t *cmds,
......
......@@ -264,7 +264,7 @@ sctp_disposition_t sctp_sf_do_5_1B_init(const sctp_endpoint_t *ep,
if (sctp_assoc_set_bind_addr_from_ep(new_asoc, GFP_ATOMIC) < 0)
goto nomem_ack;
repl = sctp_make_init_ack(new_asoc, chunk, GFP_ATOMIC, len);
if (!repl)
goto nomem_ack;
......@@ -1540,11 +1540,11 @@ static sctp_disposition_t sctp_sf_do_dupcook_d(const sctp_endpoint_t *ep,
SCTP_ULPEVENT(ev));
}
sctp_add_cmd_sf(commands, SCTP_CMD_TRANSMIT, SCTP_NULL());
repl = sctp_make_cookie_ack(new_asoc, chunk);
if (!repl)
goto nomem;
sctp_add_cmd_sf(commands, SCTP_CMD_REPLY, SCTP_CHUNK(repl));
sctp_add_cmd_sf(commands, SCTP_CMD_TRANSMIT, SCTP_NULL());
......@@ -2241,6 +2241,7 @@ sctp_disposition_t sctp_sf_eat_data_6_2(const sctp_endpoint_t *ep,
sctp_datahdr_t *data_hdr;
sctp_chunk_t *err;
size_t datalen;
sctp_verb_t deliver;
int tmp;
__u32 tsn;
......@@ -2307,10 +2308,32 @@ sctp_disposition_t sctp_sf_eat_data_6_2(const sctp_endpoint_t *ep,
datalen = ntohs(chunk->chunk_hdr->length);
datalen -= sizeof(sctp_data_chunk_t);
deliver = SCTP_CMD_CHUNK_ULP;
/* Think about partial delivery. */
if ((datalen >= asoc->rwnd) && (!asoc->ulpq.pd_mode)) {
/* Even if we don't accept this chunk there is
* memory pressure.
*/
sctp_add_cmd_sf(commands, SCTP_CMD_CHUNK_PD, SCTP_NULL());
}
if (asoc->rwnd_over || (datalen > asoc->rwnd + asoc->frag_point)) {
SCTP_DEBUG_PRINTK("Discarding tsn: %u datalen: %Zd, "
"rwnd: %d\n", tsn, datalen, asoc->rwnd);
goto discard_force;
/* There is absolutely no room, but this is the most
* important tsn that we are waiting on, so try to
* partially deliver or renege to make room.
*/
if ((sctp_tsnmap_get_ctsn(&asoc->peer.tsn_map) + 1) == tsn) {
deliver = SCTP_CMD_CHUNK_PD;
} else {
SCTP_DEBUG_PRINTK("Discard tsn: %u len: %Zd, "
"rwnd: %d\n", tsn, datalen,
asoc->rwnd);
goto discard_force;
}
}
/*
......@@ -2335,10 +2358,11 @@ sctp_disposition_t sctp_sf_eat_data_6_2(const sctp_endpoint_t *ep,
return SCTP_DISPOSITION_CONSUME;
}
/* We are accepting this DATA chunk. */
/* Record the fact that we have received this TSN. */
sctp_add_cmd_sf(commands, SCTP_CMD_REPORT_TSN, SCTP_U32(tsn));
/* If definitely accepting the DATA chunk, record its TSN, otherwise
* wait for renege processing.
*/
if (deliver != SCTP_CMD_CHUNK_PD)
sctp_add_cmd_sf(commands, SCTP_CMD_REPORT_TSN, SCTP_U32(tsn));
/* RFC 2960 6.5 Stream Identifier and Stream Sequence Number
*
......@@ -2352,10 +2376,9 @@ sctp_disposition_t sctp_sf_eat_data_6_2(const sctp_endpoint_t *ep,
err = sctp_make_op_error(asoc, chunk, SCTP_ERROR_INV_STRM,
&data_hdr->stream,
sizeof(data_hdr->stream));
if (err) {
if (err)
sctp_add_cmd_sf(commands, SCTP_CMD_REPLY,
SCTP_CHUNK(err));
}
goto discard_noforce;
}
......@@ -2363,7 +2386,8 @@ sctp_disposition_t sctp_sf_eat_data_6_2(const sctp_endpoint_t *ep,
* SCTP_CMD_CHUNK_ULP cmd before the SCTP_CMD_GEN_SACK, as the SACK
* chunk needs the updated rwnd.
*/
sctp_add_cmd_sf(commands, SCTP_CMD_CHUNK_ULP, SCTP_CHUNK(chunk));
sctp_add_cmd_sf(commands, deliver, SCTP_CHUNK(chunk));
if (asoc->autoclose) {
sctp_add_cmd_sf(commands, SCTP_CMD_TIMER_RESTART,
SCTP_TO(SCTP_EVENT_TIMEOUT_AUTOCLOSE));
......
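From the reader's side, partial delivery shows up as recvmsg() returns
without MSG_EOR, and every read frees rwnd so the sender can finish the
record. A minimal sketch, assuming a hypothetical helper read_record()
with error handling trimmed and the buffer reused for each piece:

#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>

/* Drain one record; a real reader would consume or advance buf
 * between pieces instead of overwriting it.
 */
static ssize_t read_record(int fd, char *buf, size_t len)
{
	struct msghdr msg;
	struct iovec iov;
	ssize_t n, total = 0;

	do {
		iov.iov_base = buf;
		iov.iov_len = len;
		memset(&msg, 0, sizeof(msg));
		msg.msg_iov = &iov;
		msg.msg_iovlen = 1;
		n = recvmsg(fd, &msg, 0);
		if (n <= 0)
			return n;
		total += n;
	} while (!(msg.msg_flags & MSG_EOR));
	return total;
}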
......@@ -81,13 +81,13 @@
/* Forward declarations for internal helper functions. */
static int sctp_writeable(struct sock *sk);
static inline int sctp_wspace(sctp_association_t *asoc);
static inline int sctp_wspace(struct sctp_association *asoc);
static inline void sctp_set_owner_w(sctp_chunk_t *chunk);
static void sctp_wfree(struct sk_buff *skb);
static int sctp_wait_for_sndbuf(sctp_association_t *asoc, long *timeo_p,
static int sctp_wait_for_sndbuf(struct sctp_association *, long *timeo_p,
int msg_len);
static int sctp_wait_for_packet(struct sock * sk, int *err, long *timeo_p);
static int sctp_wait_for_connect(sctp_association_t *asoc, long *timeo_p);
static int sctp_wait_for_connect(struct sctp_association *, long *timeo_p);
static inline int sctp_verify_addr(struct sock *, union sctp_addr *, int);
static int sctp_bindx_add(struct sock *, struct sockaddr_storage *, int);
static int sctp_bindx_rem(struct sock *, struct sockaddr_storage *, int);
......@@ -158,7 +158,7 @@ static struct sctp_af *sctp_sockaddr_af(struct sctp_opt *opt,
/* Bind a local address either to an endpoint or to an association. */
SCTP_STATIC int sctp_do_bind(struct sock *sk, union sctp_addr *addr, int len)
{
sctp_opt_t *sp = sctp_sk(sk);
struct sctp_opt *sp = sctp_sk(sk);
sctp_endpoint_t *ep = sp->ep;
sctp_bind_addr_t *bp = &ep->base.bind_addr;
struct sctp_af *af;
......@@ -454,7 +454,7 @@ int sctp_bindx_add(struct sock *sk, struct sockaddr_storage *addrs, int addrcnt)
*/
int sctp_bindx_rem(struct sock *sk, struct sockaddr_storage *addrs, int addrcnt)
{
sctp_opt_t *sp = sctp_sk(sk);
struct sctp_opt *sp = sctp_sk(sk);
sctp_endpoint_t *ep = sp->ep;
int cnt;
sctp_bind_addr_t *bp = &ep->base.bind_addr;
......@@ -662,6 +662,7 @@ SCTP_STATIC void sctp_close(struct sock *sk, long timeout)
/* Clean up any skbs sitting on the receive queue. */
skb_queue_purge(&sk->receive_queue);
skb_queue_purge(&sctp_sk(sk)->pd_lobby);
/* This will run the backlog queue. */
sctp_release_sock(sk);
......@@ -714,7 +715,7 @@ SCTP_STATIC int sctp_msghdr_parse(const struct msghdr *, sctp_cmsgs_t *);
SCTP_STATIC int sctp_sendmsg(struct kiocb *iocb, struct sock *sk,
struct msghdr *msg, int msg_len)
{
sctp_opt_t *sp;
struct sctp_opt *sp;
sctp_endpoint_t *ep;
sctp_association_t *new_asoc=NULL, *asoc=NULL;
struct sctp_transport *transport;
......@@ -1117,7 +1118,7 @@ SCTP_STATIC int sctp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr
int len, int noblock, int flags, int *addr_len)
{
struct sctp_ulpevent *event = NULL;
sctp_opt_t *sp = sctp_sk(sk);
struct sctp_opt *sp = sctp_sk(sk);
struct sk_buff *skb;
int copied;
int err = 0;
......@@ -1176,7 +1177,6 @@ SCTP_STATIC int sctp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr
/* If skb's length exceeds the user's buffer, update the skb and
* push it back to the receive_queue so that the next call to
* recvmsg() will return the remaining data. Don't set MSG_EOR.
* Otherwise, set MSG_EOR indicating the end of a message.
*/
if (skb_len > copied) {
msg->msg_flags &= ~MSG_EOR;
......@@ -1184,6 +1184,7 @@ SCTP_STATIC int sctp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr
goto out_free;
sctp_skb_pull(skb, copied);
skb_queue_head(&sk->receive_queue, skb);
/* When only partial message is copied to the user, increase
* rwnd by that amount. If all the data in the skb is read,
* rwnd is updated when the skb's destructor is called via
......@@ -1191,9 +1192,11 @@ SCTP_STATIC int sctp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr
*/
sctp_assoc_rwnd_increase(event->asoc, copied);
goto out;
} else {
msg->msg_flags |= MSG_EOR;
}
} else if ((event->msg_flags & MSG_NOTIFICATION) ||
(event->msg_flags & MSG_EOR))
msg->msg_flags |= MSG_EOR;
else
msg->msg_flags &= ~MSG_EOR;
out_free:
sctp_ulpevent_free(event); /* Free the skb. */
......@@ -1231,7 +1234,7 @@ static inline int sctp_setsockopt_set_events(struct sock *sk, char *optval,
static inline int sctp_setsockopt_autoclose(struct sock *sk, char *optval,
int optlen)
{
sctp_opt_t *sp = sctp_sk(sk);
struct sctp_opt *sp = sctp_sk(sk);
/* Applicable to UDP-style socket only */
if (SCTP_SOCKET_TCP == sp->type)
......@@ -1353,7 +1356,7 @@ static inline int sctp_setsockopt_set_default_send_param(struct sock *sk,
asoc->defaults.timetolive = info.sinfo_timetolive;
return 0;
}
/* API 6.2 setsockopt(), getsockopt()
*
* Applications use setsockopt() and getsockopt() to set or retrieve
......@@ -1481,7 +1484,7 @@ SCTP_STATIC int sctp_setsockopt(struct sock *sk, int level, int optname,
SCTP_STATIC int sctp_connect(struct sock *sk, struct sockaddr *uaddr,
int addr_len)
{
sctp_opt_t *sp;
struct sctp_opt *sp;
sctp_endpoint_t *ep;
sctp_association_t *asoc;
struct sctp_transport *transport;
......@@ -1603,7 +1606,7 @@ SCTP_STATIC int sctp_init_sock(struct sock *sk)
{
sctp_endpoint_t *ep;
sctp_protocol_t *proto;
sctp_opt_t *sp;
struct sctp_opt *sp;
SCTP_DEBUG_PRINTK("sctp_init_sock(sk: %p)\n", sk);
......@@ -1632,7 +1635,7 @@ SCTP_STATIC int sctp_init_sock(struct sock *sk)
/* Initialize default RTO related parameters. These parameters can
* be modified with the SCTP_RTOINFO socket option.
* FIXME: This are not used yet.
* FIXME: These are not used yet.
*/
sp->rtoinfo.srto_initial = proto->rto_initial;
sp->rtoinfo.srto_max = proto->rto_max;
......@@ -1669,6 +1672,11 @@ SCTP_STATIC int sctp_init_sock(struct sock *sk)
*/
sp->autoclose = 0;
sp->pf = sctp_get_pf_specific(sk->family);
/* Control variables for partial data delivery. */
sp->pd_mode = 0;
skb_queue_head_init(&sp->pd_lobby);
/* Create a per socket endpoint structure. Even if we
* change the data structure relationships, this may still
* be useful for storing pre-connect address information.
......@@ -1823,8 +1831,8 @@ SCTP_STATIC int sctp_do_peeloff(sctp_association_t *assoc, struct socket **newso
struct sock *newsk;
struct socket *tmpsock;
sctp_endpoint_t *newep;
sctp_opt_t *oldsp = sctp_sk(oldsk);
sctp_opt_t *newsp;
struct sctp_opt *oldsp = sctp_sk(oldsk);
struct sctp_opt *newsp;
struct sk_buff *skb, *tmp;
struct sctp_ulpevent *event;
int err = 0;
......@@ -1867,6 +1875,43 @@ SCTP_STATIC int sctp_do_peeloff(sctp_association_t *assoc, struct socket **newso
}
}
/* Clean up any messages pending delivery due to partial
* delivery. Three cases:
* 1) No partial delivery; no work.
* 2) Peeling off partial delivery; keep pd_lobby in new pd_lobby.
* 3) Peeling off non-partial delivery; move pd_lobby to receive_queue.
*/
skb_queue_head_init(&newsp->pd_lobby);
sctp_sk(newsk)->pd_mode = assoc->ulpq.pd_mode;
if (sctp_sk(oldsk)->pd_mode) {
struct sk_buff_head *queue;
/* Decide which queue to move pd_lobby skbs to. */
if (assoc->ulpq.pd_mode) {
queue = &newsp->pd_lobby;
} else
queue = &newsk->receive_queue;
/* Walk through the pd_lobby, looking for skbs that
* need to be moved to the new socket.
*/
sctp_skb_for_each(skb, &oldsp->pd_lobby, tmp) {
event = sctp_skb2event(skb);
if (event->asoc == assoc) {
__skb_unlink(skb, skb->list);
__skb_queue_tail(queue, skb);
}
}
/* Clear up any skbs waiting for the partial
* delivery to finish.
*/
if (assoc->ulpq.pd_mode)
sctp_clear_pd(oldsk);
}
/* Set the type of socket to indicate that it is peeled off from the
* original socket.
*/
......@@ -2438,7 +2483,7 @@ static int sctp_get_port(struct sock *sk, unsigned short snum)
*/
SCTP_STATIC int sctp_seqpacket_listen(struct sock *sk, int backlog)
{
sctp_opt_t *sp = sctp_sk(sk);
struct sctp_opt *sp = sctp_sk(sk);
sctp_endpoint_t *ep = sp->ep;
/* Only UDP style sockets that are not peeled off are allowed to
......
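The pd_lobby hand-off above boils down to a queue choice. A condensed
kernel-context restatement (peeloff_dest is an illustrative helper, not
a function in the patch):

/* Case 2 of the comment block in sctp_do_peeloff(): an association
 * still in partial delivery carries its lobby skbs to the new
 * socket's pd_lobby.  Case 3: otherwise they become readable now.
 */
static struct sk_buff_head *peeloff_dest(struct sctp_ulpq *ulpq,
					 struct sctp_opt *newsp,
					 struct sock *newsk)
{
	if (ulpq->pd_mode)
		return &newsp->pd_lobby;
	return &newsk->receive_queue;
}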
......@@ -628,14 +628,12 @@ struct sctp_ulpevent *sctp_ulpevent_make_rcvmsg(sctp_association_t *asoc,
if (!event)
goto fail_init;
for (list = skb_shinfo(skb)->frag_list; list; list = list->next) {
/* Note: Not clearing the entire event struct as
* this is just a fragment of the real event. However,
* we still need to do rwnd accounting.
*/
/* Note: Not clearing the entire event struct as
* this is just a fragment of the real event. However,
* we still need to do rwnd accounting.
*/
for (list = skb_shinfo(skb)->frag_list; list; list = list->next)
sctp_ulpevent_set_owner_r(list, asoc);
}
info = (struct sctp_sndrcvinfo *) &event->sndrcvinfo;
......@@ -733,6 +731,64 @@ struct sctp_ulpevent *sctp_ulpevent_make_rcvmsg(sctp_association_t *asoc,
return NULL;
}
/* Create a partial delivery related event.
*
* 5.3.1.7 SCTP_PARTIAL_DELIVERY_EVENT
*
* When a receiver is engaged in a partial delivery of a
* message, this notification will be used to indicate
* various events.
*/
struct sctp_ulpevent *sctp_ulpevent_make_pdapi(
const sctp_association_t *asoc, __u32 indication, int priority)
{
struct sctp_ulpevent *event;
struct sctp_rcv_pdapi_event *pd;
struct sk_buff *skb;
event = sctp_ulpevent_new(sizeof(struct sctp_rcv_pdapi_event),
MSG_NOTIFICATION, priority);
if (!event)
goto fail;
skb = sctp_event2skb(event);
pd = (struct sctp_rcv_pdapi_event *)
skb_put(skb, sizeof(struct sctp_rcv_pdapi_event));
/* pdapi_type
* It should be SCTP_PARTIAL_DELIVERY_EVENT
*
* pdapi_flags: 16 bits (unsigned integer)
* Currently unused.
*/
pd->pdapi_type = SCTP_PARTIAL_DELIVERY_EVENT;
pd->pdapi_flags = 0;
/* pdapi_length: 32 bits (unsigned integer)
*
* This field is the total length of the notification data, including
* the notification header. It will generally be sizeof (struct
* sctp_rcv_pdapi_event).
*/
pd->pdapi_length = sizeof(struct sctp_rcv_pdapi_event);
/* pdapi_indication: 32 bits (unsigned integer)
*
* This field holds the indication being sent to the application.
*/
pd->pdapi_indication = indication;
/* pdapi_assoc_id: sizeof (sctp_assoc_t)
*
* The association id field, holds the identifier for the association.
*/
pd->pdapi_assoc_id = sctp_assoc2id(asoc);
return event;
fail:
return NULL;
}
/* Return the notification type, assuming this is a notification
* event.
*/
......
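To the application this event arrives as a MSG_NOTIFICATION read. A
hedged sketch of recognizing the abort indication, using only the
struct and constants from this patch (pd_was_aborted and the header
path are assumptions):

#include <sys/socket.h>
#include <net/sctp/user.h>	/* assumed path of the API header above */

/* Recognize an aborted partial delivery in a notification read. */
static int pd_was_aborted(struct msghdr *msg, void *buf)
{
	struct sctp_rcv_pdapi_event *pd = buf;

	return (msg->msg_flags & MSG_NOTIFICATION) &&
	       pd->pdapi_type == SCTP_PARTIAL_DELIVERY_EVENT &&
	       pd->pdapi_indication == SCTP_PARTIAL_DELIVERY_ABORTED;
}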
......@@ -84,6 +84,7 @@ struct sctp_ulpq *sctp_ulpq_init(struct sctp_ulpq *ulpq,
ulpq->asoc = asoc;
skb_queue_head_init(&ulpq->reasm);
skb_queue_head_init(&ulpq->lobby);
ulpq->pd_mode = 0;
ulpq->malloced = 0;
return ulpq;
......@@ -96,15 +97,16 @@ void sctp_ulpq_flush(struct sctp_ulpq *ulpq)
struct sk_buff *skb;
struct sctp_ulpevent *event;
while ((skb = skb_dequeue(&ulpq->lobby))) {
event = (struct sctp_ulpevent *) skb->cb;
while ((skb = __skb_dequeue(&ulpq->lobby))) {
event = sctp_skb2event(skb);
sctp_ulpevent_free(event);
}
while ((skb = skb_dequeue(&ulpq->reasm))) {
event = (struct sctp_ulpevent *) skb->cb;
while ((skb = __skb_dequeue(&ulpq->reasm))) {
event = sctp_skb2event(skb);
sctp_ulpevent_free(event);
}
}
/* Dispose of a ulpqueue. */
......@@ -117,7 +119,7 @@ void sctp_ulpq_free(struct sctp_ulpq *ulpq)
/* Process an incoming DATA chunk. */
int sctp_ulpq_tail_data(struct sctp_ulpq *ulpq, sctp_chunk_t *chunk,
int priority)
{
struct sk_buff_head temp;
sctp_data_chunk_t *hdr;
......@@ -125,12 +127,7 @@ int sctp_ulpq_tail_data(struct sctp_ulpq *ulpq, sctp_chunk_t *chunk,
hdr = (sctp_data_chunk_t *) chunk->chunk_hdr;
/* FIXME: Instead of event being the skb clone, we really should
* have a new skb based chunk structure that we can convert to
* an event. Temporarily, I'm carrying a few chunk fields in
* the event to allow reassembly. Its too painful to change
* everything at once. --jgrimm
*/
/* Create an event from the incoming chunk. */
event = sctp_ulpevent_make_rcvmsg(chunk->asoc, chunk, priority);
if (!event)
return -ENOMEM;
......@@ -139,10 +136,10 @@ int sctp_ulpq_tail_data(struct sctp_ulpq *ulpq, sctp_chunk_t *chunk,
event = sctp_ulpq_reasm(ulpq, event);
/* Do ordering if needed. */
if (event) {
if ((event) && (event->msg_flags & MSG_EOR)){
/* Create a temporary list to collect chunks on. */
skb_queue_head_init(&temp);
skb_queue_tail(&temp, sctp_event2skb(event));
__skb_queue_tail(&temp, sctp_event2skb(event));
event = sctp_ulpq_order(ulpq, event);
}
......@@ -154,10 +151,40 @@ int sctp_ulpq_tail_data(struct sctp_ulpq *ulpq, sctp_chunk_t *chunk,
return 0;
}
/* Clear the partial delivery mode for this socket. Note: This
* assumes that no association is currently in partial delivery mode.
*/
int sctp_clear_pd(struct sock *sk)
{
struct sctp_opt *sp;
sp = sctp_sk(sk);
sp->pd_mode = 0;
if (!skb_queue_empty(&sp->pd_lobby)) {
struct list_head *list;
sctp_skb_list_tail(&sp->pd_lobby, &sk->receive_queue);
list = (struct list_head *)&sctp_sk(sk)->pd_lobby;
INIT_LIST_HEAD(list);
return 1;
}
return 0;
}
/* Clear the pd_mode and restart any pending messages waiting for delivery. */
static int sctp_ulpq_clear_pd(struct sctp_ulpq *ulpq)
{
ulpq->pd_mode = 0;
return sctp_clear_pd(ulpq->asoc->base.sk);
}
/* Add a new event for propagation to the ULP. */
int sctp_ulpq_tail_event(struct sctp_ulpq *ulpq, struct sctp_ulpevent *event)
{
struct sock *sk = ulpq->asoc->base.sk;
struct sk_buff_head *queue;
int clear_pd = 0;
/* If the socket is just going to throw this away, do not
* even try to deliver it.
......@@ -169,16 +196,41 @@ int sctp_ulpq_tail_event(struct sctp_ulpq *ulpq, struct sctp_ulpevent *event)
if (!sctp_ulpevent_is_enabled(event, &sctp_sk(sk)->subscribe))
goto out_free;
/* If we are in partial delivery mode, post to the lobby until
* partial delivery is cleared, unless, of course, _this_
* association is the cause of the partial delivery.
*/
if (!sctp_sk(sk)->pd_mode) {
queue = &sk->receive_queue;
} else if (ulpq->pd_mode) {
if (event->msg_flags & MSG_NOTIFICATION)
queue = &sctp_sk(sk)->pd_lobby;
else {
clear_pd = event->msg_flags & MSG_EOR;
queue = &sk->receive_queue;
}
} else
queue = &sctp_sk(sk)->pd_lobby;
/* If we are harvesting multiple skbs they will be
* collected on a list.
*/
if (sctp_event2skb(event)->list)
sctp_skb_list_tail(sctp_event2skb(event)->list,
&sk->receive_queue);
sctp_skb_list_tail(sctp_event2skb(event)->list, queue);
else
skb_queue_tail(&sk->receive_queue, sctp_event2skb(event));
skb_queue_tail(queue, sctp_event2skb(event));
wake_up_interruptible(sk->sleep);
/* Did we just complete partial delivery and need to get
* rolling again? Move pending data to the receive
* queue.
*/
if (clear_pd)
sctp_ulpq_clear_pd(ulpq);
if (queue == &sk->receive_queue)
wake_up_interruptible(sk->sleep);
return 1;
out_free:
......@@ -192,7 +244,7 @@ int sctp_ulpq_tail_event(struct sctp_ulpq *ulpq, struct sctp_ulpevent *event)
/* 2nd Level Abstractions */
/* Helper function to store chunks that need to be reassembled. */
static inline void sctp_ulpq_store_reasm(struct sctp_ulpq *ulpq,
static inline void sctp_ulpq_store_reasm(struct sctp_ulpq *ulpq,
{
struct sk_buff *pos, *tmp;
......@@ -212,7 +264,7 @@ static inline void sctp_ulpq_store_reasm(struct sctp_ulpq *ulpq,
/* If the queue is empty, we have a different function to call. */
if (skb_peek(&ulpq->reasm))
__skb_insert(sctp_event2skb(event), pos->prev, pos,
&ulpq->reasm);
else
__skb_queue_tail(&ulpq->reasm, sctp_event2skb(event));
......@@ -233,7 +285,10 @@ static inline struct sctp_ulpevent *sctp_make_reassembled_event(struct sk_buff *
struct sk_buff *list = skb_shinfo(f_frag)->frag_list;
/* Store the pointer to the 2nd skb */
pos = f_frag->next;
if (f_frag == l_frag)
pos = NULL;
else
pos = f_frag->next;
/* Get the last skb in the f_frag's frag_list if present. */
for (last = list; list; last = list, list = list->next);
......@@ -248,7 +303,8 @@ static inline struct sctp_ulpevent *sctp_make_reassembled_event(struct sk_buff *
/* Remove the first fragment from the reassembly queue. */
__skb_unlink(f_frag, f_frag->list);
do {
while (pos) {
pnext = pos->next;
/* Update the len and data_len fields of the first fragment. */
......@@ -264,13 +320,14 @@ static inline struct sctp_ulpevent *sctp_make_reassembled_event(struct sk_buff *
pos->next = pnext;
pos = pnext;
} while (1);
};
event = (struct sctp_ulpevent *) f_frag->cb;
event = sctp_skb2event(f_frag);
return event;
}
/* Helper function to check if an incoming chunk has filled up the last
* missing fragment in a SCTP datagram and return the corresponding event.
*/
......@@ -282,7 +339,7 @@ static inline struct sctp_ulpevent *sctp_ulpq_retrieve_reassembled(struct sctp_u
__u32 ctsn, next_tsn;
struct sctp_ulpevent *retval = NULL;
/* Initialized to 0 just to avoid compiler warning message. Will
/* Initialized to 0 just to avoid compiler warning message. Will
* is set when we find the first fragment of a message.
*/
......@@ -298,7 +355,7 @@ static inline struct sctp_ulpevent *sctp_ulpq_retrieve_reassembled(struct sctp_u
* start the next pass when we find another first fragment.
*/
sctp_skb_for_each(pos, &ulpq->reasm, tmp) {
cevent = (struct sctp_ulpevent *) pos->cb;
cevent = sctp_skb2event(pos);
ctsn = cevent->sndrcvinfo.sinfo_tsn;
switch (cevent->msg_flags & SCTP_DATA_FRAG_MASK) {
......@@ -315,7 +372,7 @@ static inline struct sctp_ulpevent *sctp_ulpq_retrieve_reassembled(struct sctp_u
break;
case SCTP_DATA_LAST_FRAG:
if ((first_frag) && (ctsn == next_tsn))
if (first_frag && (ctsn == next_tsn))
retval = sctp_make_reassembled_event(
first_frag, pos);
else
......@@ -326,14 +383,78 @@ static inline struct sctp_ulpevent *sctp_ulpq_retrieve_reassembled(struct sctp_u
/* We have the reassembled event. There is no need to look
* further.
*/
if (retval)
if (retval) {
retval->msg_flags |= MSG_EOR;
break;
}
}
return retval;
}
/* Retrieve the next set of fragments of a partial message. */
static inline struct sctp_ulpevent *sctp_ulpq_retrieve_partial(struct sctp_ulpq *ulpq)
{
struct sk_buff *pos, *tmp, *last_frag, *first_frag;
struct sctp_ulpevent *cevent;
__u32 ctsn, next_tsn;
int is_last;
struct sctp_ulpevent *retval;
/* The chunks are held in the reasm queue sorted by TSN.
* Walk through the queue sequentially and look for the first
* sequence of fragmented chunks.
*/
if (skb_queue_empty(&ulpq->reasm))
return NULL;
last_frag = first_frag = NULL;
retval = NULL;
next_tsn = 0;
is_last = 0;
sctp_skb_for_each(pos, &ulpq->reasm, tmp) {
cevent = sctp_skb2event(pos);
ctsn = cevent->sndrcvinfo.sinfo_tsn;
switch (cevent->msg_flags & SCTP_DATA_FRAG_MASK) {
case SCTP_DATA_MIDDLE_FRAG:
if (!first_frag) {
first_frag = pos;
next_tsn = ctsn + 1;
last_frag = pos;
} else if (next_tsn == ctsn)
next_tsn++;
else
goto done;
break;
case SCTP_DATA_LAST_FRAG:
if (!first_frag)
first_frag = pos;
else if (ctsn != next_tsn)
goto done;
last_frag = pos;
is_last = 1;
goto done;
default:
return NULL;
};
}
/* We have the reassembled event. There is no need to look
* further.
*/
done:
retval = sctp_make_reassembled_event(first_frag, last_frag);
if (is_last)
retval->msg_flags |= MSG_EOR;
return retval;
}
/* Helper function to reassemble chunks. Hold chunks on the reasm queue that
* need reassembling.
*/
static inline struct sctp_ulpevent *sctp_ulpq_reasm(struct sctp_ulpq *ulpq,
......@@ -341,22 +462,86 @@ static inline struct sctp_ulpevent *sctp_ulpq_reasm(struct sctp_ulpq *ulpq,
{
struct sctp_ulpevent *retval = NULL;
/* FIXME: We should be using some new chunk structure here
* instead of carrying chunk fields in the event structure.
* This is temporary as it is too painful to change everything
* at once.
*/
/* Check if this is part of a fragmented message. */
if (SCTP_DATA_NOT_FRAG == (event->msg_flags & SCTP_DATA_FRAG_MASK))
if (SCTP_DATA_NOT_FRAG == (event->msg_flags & SCTP_DATA_FRAG_MASK)) {
event->msg_flags |= MSG_EOR;
return event;
}
sctp_ulpq_store_reasm(ulpq, event);
retval = sctp_ulpq_retrieve_reassembled(ulpq);
if (!ulpq->pd_mode)
retval = sctp_ulpq_retrieve_reassembled(ulpq);
else {
__u32 ctsn, ctsnap;
/* Do not even bother unless this is the next tsn to
* be delivered.
*/
ctsn = event->sndrcvinfo.sinfo_tsn;
ctsnap = sctp_tsnmap_get_ctsn(&ulpq->asoc->peer.tsn_map);
if (TSN_lte(ctsn, ctsnap))
retval = sctp_ulpq_retrieve_partial(ulpq);
}
return retval;
}
/* Retrieve the first part (sequential fragments) for partial delivery. */
static inline struct sctp_ulpevent *sctp_ulpq_retrieve_first(struct sctp_ulpq *ulpq)
{
struct sk_buff *pos, *tmp, *last_frag, *first_frag;
struct sctp_ulpevent *cevent;
__u32 ctsn, next_tsn;
struct sctp_ulpevent *retval;
/* The chunks are held in the reasm queue sorted by TSN.
* Walk through the queue sequentially and look for a sequence of
* fragmented chunks that start a datagram.
*/
if (skb_queue_empty(&ulpq->reasm))
return NULL;
last_frag = first_frag = NULL;
retval = NULL;
next_tsn = 0;
sctp_skb_for_each(pos, &ulpq->reasm, tmp) {
cevent = sctp_skb2event(pos);
ctsn = cevent->sndrcvinfo.sinfo_tsn;
switch (cevent->msg_flags & SCTP_DATA_FRAG_MASK) {
case SCTP_DATA_FIRST_FRAG:
if (!first_frag) {
first_frag = pos;
next_tsn = ctsn + 1;
last_frag = pos;
} else
goto done;
break;
case SCTP_DATA_MIDDLE_FRAG:
if (!first_frag)
return NULL;
if (ctsn == next_tsn) {
next_tsn++;
last_frag = pos;
} else
goto done;
break;
default:
return NULL;
};
}
/* We have the reassembled event. There is no need to look
* further.
*/
done:
retval = sctp_make_reassembled_event(first_frag, last_frag);
return retval;
}
/* Helper function to gather skbs that have possibly become
* ordered by an incoming chunk.
*/
......@@ -392,7 +577,7 @@ static inline void sctp_ulpq_retrieve_ordered(struct sctp_ulpq *ulpq,
/* Found it, so mark in the ssnmap. */
sctp_ssn_next(in, sid);
__skb_unlink(pos, pos->list);
/* Attach all gathered skbs to the event. */
......@@ -412,7 +597,6 @@ static inline void sctp_ulpq_store_ordered(struct sctp_ulpq *ulpq,
sid = event->sndrcvinfo.sinfo_stream;
ssn = event->sndrcvinfo.sinfo_ssn;
/* Find the right place in this list. We store them by
* stream ID and then by SSN.
*/
......@@ -429,7 +613,7 @@ static inline void sctp_ulpq_store_ordered(struct sctp_ulpq *ulpq,
/* If the queue is empty, we have a different function to call. */
if (skb_peek(&ulpq->lobby))
__skb_insert(sctp_event2skb(event), pos->prev, pos,
&ulpq->lobby);
else
__skb_queue_tail(&ulpq->lobby, sctp_event2skb(event));
......@@ -441,12 +625,6 @@ static inline struct sctp_ulpevent *sctp_ulpq_order(struct sctp_ulpq *ulpq,
__u16 sid, ssn;
struct sctp_stream *in;
/* FIXME: We should be using some new chunk structure here
* instead of carrying chunk fields in the event structure.
* This is temporary as it is too painful to change everything
* at once.
*/
/* Check if this message needs ordering. */
if (SCTP_DATA_UNORDERED & event->msg_flags)
return event;
......@@ -475,3 +653,54 @@ static inline struct sctp_ulpevent *sctp_ulpq_order(struct sctp_ulpq *ulpq,
return event;
}
/* Partially deliver the first message, as there is pressure on rwnd. */
void sctp_ulpq_partial_delivery(struct sctp_ulpq *ulpq,
struct sctp_chunk *chunk, int priority)
{
struct sctp_ulpevent *event;
/* Are we already in partial delivery mode? */
if (!sctp_sk(ulpq->asoc->base.sk)->pd_mode) {
/* Is partial delivery possible? */
event = sctp_ulpq_retrieve_first(ulpq);
/* Send event to the ULP. */
if (event) {
sctp_ulpq_tail_event(ulpq, event);
sctp_sk(ulpq->asoc->base.sk)->pd_mode = 1;
ulpq->pd_mode = 1;
return;
}
}
/* Assert: Either already in partial delivery mode or partial
* delivery wasn't possible, so now the only recourse is
* to renege. FIXME: Renege support starts here.
*/
}
/* Notify the application if an association is aborted and in
* partial delivery mode. Send up any pending received messages.
*/
void sctp_ulpq_abort_pd(struct sctp_ulpq *ulpq, int priority)
{
struct sctp_ulpevent *ev = NULL;
struct sock *sk;
if (!ulpq->pd_mode)
return;
sk = ulpq->asoc->base.sk;
if (sctp_ulpevent_type_enabled(SCTP_PARTIAL_DELIVERY_EVENT,
&sctp_sk(sk)->subscribe))
ev = sctp_ulpevent_make_pdapi(ulpq->asoc,
SCTP_PARTIAL_DELIVERY_ABORTED,
priority);
if (ev)
skb_queue_tail(&sk->receive_queue, sctp_event2skb(ev));
/* If there is data waiting, send it up the socket now. */
if (sctp_ulpq_clear_pd(ulpq) || ev)
wake_up_interruptible(sk->sleep);
}
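
To summarize the queue selection performed in sctp_ulpq_tail_event()
above, a kernel-context restatement (pd_queue is an illustrative
helper, not part of the patch):

/* Which queue an event lands on, and whether its arrival ends
 * partial delivery.
 */
static struct sk_buff_head *pd_queue(struct sock *sk,
				     struct sctp_ulpq *ulpq,
				     int msg_flags, int *clear_pd)
{
	struct sctp_opt *sp = sctp_sk(sk);

	*clear_pd = 0;
	if (!sp->pd_mode)
		return &sk->receive_queue;	/* no PD on this socket */
	if (!ulpq->pd_mode)
		return &sp->pd_lobby;		/* another assoc holds PD */
	if (msg_flags & MSG_NOTIFICATION)
		return &sp->pd_lobby;		/* hold notifications back */
	*clear_pd = !!(msg_flags & MSG_EOR);	/* final piece ends PD */
	return &sk->receive_queue;		/* PD data flows through */
}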