summaryrefslogtreecommitdiff
path: root/net/tipc/socket.c
diff options
context:
space:
mode:
authorAndré Fabian Silva Delgado <emulatorman@parabola.nu>2016-09-11 04:34:46 -0300
committerAndré Fabian Silva Delgado <emulatorman@parabola.nu>2016-09-11 04:34:46 -0300
commit863981e96738983919de841ec669e157e6bdaeb0 (patch)
treed6d89a12e7eb8017837c057935a2271290907f76 /net/tipc/socket.c
parent8dec7c70575785729a6a9e6719a955e9c545bcab (diff)
Linux-libre 4.7.1-gnupck-4.7.1-gnu
Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r--net/tipc/socket.c200
1 files changed, 138 insertions, 62 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 5f80d3fa9..c49b8df43 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -96,8 +96,11 @@ struct tipc_sock {
uint conn_timeout;
atomic_t dupl_rcvcnt;
bool link_cong;
- uint sent_unacked;
- uint rcv_unacked;
+ u16 snt_unacked;
+ u16 snd_win;
+ u16 peer_caps;
+ u16 rcv_unacked;
+ u16 rcv_win;
struct sockaddr_tipc remote;
struct rhash_head node;
struct rcu_head rcu;
@@ -227,9 +230,29 @@ static struct tipc_sock *tipc_sk(const struct sock *sk)
return container_of(sk, struct tipc_sock, sk);
}
-static int tsk_conn_cong(struct tipc_sock *tsk)
+static bool tsk_conn_cong(struct tipc_sock *tsk)
{
- return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
+ return tsk->snt_unacked >= tsk->snd_win;
+}
+
+/* tsk_blocks(): translate a buffer size in bytes to number of
+ * advertisable blocks, taking into account the ratio truesize(len)/len
+ * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
+ */
+static u16 tsk_adv_blocks(int len)
+{
+ return len / FLOWCTL_BLK_SZ / 4;
+}
+
+/* tsk_inc(): increment counter for sent or received data
+ * - If block based flow control is not supported by peer we
+ * fall back to message based ditto, incrementing the counter
+ */
+static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
+{
+ if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
+ return ((msglen / FLOWCTL_BLK_SZ) + 1);
+ return 1;
}
/**
@@ -366,7 +389,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
sock->state = state;
sock_init_data(sock, sk);
if (tipc_sk_insert(tsk)) {
- pr_warn("Socket create failed; port numbrer exhausted\n");
+ pr_warn("Socket create failed; port number exhausted\n");
return -EINVAL;
}
msg_set_origport(msg, tsk->portid);
@@ -377,9 +400,12 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
sk->sk_write_space = tipc_write_space;
sk->sk_destruct = tipc_sock_destruct;
tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
- tsk->sent_unacked = 0;
atomic_set(&tsk->dupl_rcvcnt, 0);
+ /* Start out with safe limits until we receive an advertised window */
+ tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
+ tsk->rcv_win = tsk->snd_win;
+
if (sock->state == SS_READY) {
tsk_set_unreturnable(tsk, true);
if (sock->type == SOCK_DGRAM)
@@ -770,12 +796,14 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
* @tsk: receiving socket
* @skb: pointer to message buffer.
*/
-static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
+static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
+ struct sk_buff_head *xmitq)
{
struct sock *sk = &tsk->sk;
+ u32 onode = tsk_own_node(tsk);
struct tipc_msg *hdr = buf_msg(skb);
int mtyp = msg_type(hdr);
- int conn_cong;
+ bool conn_cong;
/* Ignore if connection cannot be validated: */
if (!tsk_peer_msg(tsk, hdr))
@@ -785,11 +813,14 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
if (mtyp == CONN_PROBE) {
msg_set_type(hdr, CONN_PROBE_REPLY);
- tipc_sk_respond(sk, skb, TIPC_OK);
+ if (tipc_msg_reverse(onode, &skb, TIPC_OK))
+ __skb_queue_tail(xmitq, skb);
return;
} else if (mtyp == CONN_ACK) {
conn_cong = tsk_conn_cong(tsk);
- tsk->sent_unacked -= msg_msgcnt(hdr);
+ tsk->snt_unacked -= msg_conn_ack(hdr);
+ if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
+ tsk->snd_win = msg_adv_win(hdr);
if (conn_cong)
sk->sk_write_space(sk);
} else if (mtyp != CONN_PROBE_REPLY) {
@@ -1020,12 +1051,14 @@ static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
u32 dnode;
uint mtu, send, sent = 0;
struct iov_iter save;
+ int hlen = MIN_H_SIZE;
/* Handle implied connection establishment */
if (unlikely(dest)) {
rc = __tipc_sendmsg(sock, m, dsz);
+ hlen = msg_hdr_sz(mhdr);
if (dsz && (dsz == rc))
- tsk->sent_unacked = 1;
+ tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
return rc;
}
if (dsz > (uint)INT_MAX)
@@ -1054,7 +1087,7 @@ next:
if (likely(!tsk_conn_cong(tsk))) {
rc = tipc_node_xmit(net, &pktchain, dnode, portid);
if (likely(!rc)) {
- tsk->sent_unacked++;
+ tsk->snt_unacked += tsk_inc(tsk, send + hlen);
sent += send;
if (sent == dsz)
return dsz;
@@ -1118,6 +1151,13 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
+ tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
+ if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
+ return;
+
+ /* Fall back to message based flow control */
+ tsk->rcv_win = FLOWCTL_MSG_WIN;
+ tsk->snd_win = FLOWCTL_MSG_WIN;
}
/**
@@ -1214,7 +1254,7 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
return 0;
}
-static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
+static void tipc_sk_send_ack(struct tipc_sock *tsk)
{
struct net *net = sock_net(&tsk->sk);
struct sk_buff *skb = NULL;
@@ -1230,7 +1270,14 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
if (!skb)
return;
msg = buf_msg(skb);
- msg_set_msgcnt(msg, ack);
+ msg_set_conn_ack(msg, tsk->rcv_unacked);
+ tsk->rcv_unacked = 0;
+
+ /* Adjust to and advertize the correct window limit */
+ if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
+ tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
+ msg_set_adv_win(msg, tsk->rcv_win);
+ }
tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
}
@@ -1288,7 +1335,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
long timeo;
unsigned int sz;
u32 err;
- int res;
+ int res, hlen;
/* Catch invalid receive requests */
if (unlikely(!buf_len))
@@ -1313,6 +1360,7 @@ restart:
buf = skb_peek(&sk->sk_receive_queue);
msg = buf_msg(buf);
sz = msg_data_sz(msg);
+ hlen = msg_hdr_sz(msg);
err = msg_errcode(msg);
/* Discard an empty non-errored message & try again */
@@ -1335,7 +1383,7 @@ restart:
sz = buf_len;
m->msg_flags |= MSG_TRUNC;
}
- res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m, sz);
+ res = skb_copy_datagram_msg(buf, hlen, m, sz);
if (res)
goto exit;
res = sz;
@@ -1347,15 +1395,15 @@ restart:
res = -ECONNRESET;
}
- /* Consume received message (optional) */
- if (likely(!(flags & MSG_PEEK))) {
- if ((sock->state != SS_READY) &&
- (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
- tipc_sk_send_ack(tsk, tsk->rcv_unacked);
- tsk->rcv_unacked = 0;
- }
- tsk_advance_rx_queue(sk);
+ if (unlikely(flags & MSG_PEEK))
+ goto exit;
+
+ if (likely(sock->state != SS_READY)) {
+ tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
+ if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
+ tipc_sk_send_ack(tsk);
}
+ tsk_advance_rx_queue(sk);
exit:
release_sock(sk);
return res;
@@ -1384,7 +1432,7 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
int sz_to_copy, target, needed;
int sz_copied = 0;
u32 err;
- int res = 0;
+ int res = 0, hlen;
/* Catch invalid receive attempts */
if (unlikely(!buf_len))
@@ -1410,6 +1458,7 @@ restart:
buf = skb_peek(&sk->sk_receive_queue);
msg = buf_msg(buf);
sz = msg_data_sz(msg);
+ hlen = msg_hdr_sz(msg);
err = msg_errcode(msg);
/* Discard an empty non-errored message & try again */
@@ -1434,8 +1483,7 @@ restart:
needed = (buf_len - sz_copied);
sz_to_copy = (sz <= needed) ? sz : needed;
- res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) + offset,
- m, sz_to_copy);
+ res = skb_copy_datagram_msg(buf, hlen + offset, m, sz_to_copy);
if (res)
goto exit;
@@ -1457,20 +1505,18 @@ restart:
res = -ECONNRESET;
}
- /* Consume received message (optional) */
- if (likely(!(flags & MSG_PEEK))) {
- if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
- tipc_sk_send_ack(tsk, tsk->rcv_unacked);
- tsk->rcv_unacked = 0;
- }
- tsk_advance_rx_queue(sk);
- }
+ if (unlikely(flags & MSG_PEEK))
+ goto exit;
+
+ tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
+ if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
+ tipc_sk_send_ack(tsk);
+ tsk_advance_rx_queue(sk);
/* Loop around if more data is required */
if ((sz_copied < buf_len) && /* didn't get all requested data */
(!skb_queue_empty(&sk->sk_receive_queue) ||
(sz_copied < target)) && /* and more is ready or required */
- (!(flags & MSG_PEEK)) && /* and aren't just peeking at data */
(!err)) /* and haven't reached a FIN */
goto restart;
@@ -1602,30 +1648,33 @@ static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
/**
* rcvbuf_limit - get proper overload limit of socket receive queue
* @sk: socket
- * @buf: message
+ * @skb: message
*
- * For all connection oriented messages, irrespective of importance,
- * the default overload value (i.e. 67MB) is set as limit.
+ * For connection oriented messages, irrespective of importance,
+ * default queue limit is 2 MB.
*
- * For all connectionless messages, by default new queue limits are
- * as belows:
+ * For connectionless messages, queue limits are based on message
+ * importance as follows:
*
- * TIPC_LOW_IMPORTANCE (4 MB)
- * TIPC_MEDIUM_IMPORTANCE (8 MB)
- * TIPC_HIGH_IMPORTANCE (16 MB)
- * TIPC_CRITICAL_IMPORTANCE (32 MB)
+ * TIPC_LOW_IMPORTANCE (2 MB)
+ * TIPC_MEDIUM_IMPORTANCE (4 MB)
+ * TIPC_HIGH_IMPORTANCE (8 MB)
+ * TIPC_CRITICAL_IMPORTANCE (16 MB)
*
* Returns overload limit according to corresponding message importance
*/
-static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
+static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
{
- struct tipc_msg *msg = buf_msg(buf);
+ struct tipc_sock *tsk = tipc_sk(sk);
+ struct tipc_msg *hdr = buf_msg(skb);
- if (msg_connected(msg))
- return sysctl_tipc_rmem[2];
+ if (unlikely(!msg_connected(hdr)))
+ return sk->sk_rcvbuf << msg_importance(hdr);
- return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
- msg_importance(msg);
+ if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
+ return sk->sk_rcvbuf;
+
+ return FLOWCTL_MSG_LIM;
}
/**
@@ -1640,7 +1689,8 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
*
* Returns true if message was added to socket receive queue, otherwise false
*/
-static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
+static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
+ struct sk_buff_head *xmitq)
{
struct socket *sock = sk->sk_socket;
struct tipc_sock *tsk = tipc_sk(sk);
@@ -1650,7 +1700,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
int usr = msg_user(hdr);
if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
- tipc_sk_proto_rcv(tsk, skb);
+ tipc_sk_proto_rcv(tsk, skb, xmitq);
return false;
}
@@ -1693,7 +1743,8 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
return true;
reject:
- tipc_sk_respond(sk, skb, err);
+ if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err))
+ __skb_queue_tail(xmitq, skb);
return false;
}
@@ -1709,9 +1760,24 @@ reject:
static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
{
unsigned int truesize = skb->truesize;
+ struct sk_buff_head xmitq;
+ u32 dnode, selector;
+
+ __skb_queue_head_init(&xmitq);
- if (likely(filter_rcv(sk, skb)))
+ if (likely(filter_rcv(sk, skb, &xmitq))) {
atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);
+ return 0;
+ }
+
+ if (skb_queue_empty(&xmitq))
+ return 0;
+
+ /* Send response/rejected message */
+ skb = __skb_dequeue(&xmitq);
+ dnode = msg_destnode(buf_msg(skb));
+ selector = msg_origport(buf_msg(skb));
+ tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
return 0;
}
@@ -1725,12 +1791,13 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
* Caller must hold socket lock
*/
static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
- u32 dport)
+ u32 dport, struct sk_buff_head *xmitq)
{
+ unsigned long time_limit = jiffies + 2;
+ struct sk_buff *skb;
unsigned int lim;
atomic_t *dcnt;
- struct sk_buff *skb;
- unsigned long time_limit = jiffies + 2;
+ u32 onode;
while (skb_queue_len(inputq)) {
if (unlikely(time_after_eq(jiffies, time_limit)))
@@ -1742,20 +1809,22 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
/* Add message directly to receive queue if possible */
if (!sock_owned_by_user(sk)) {
- filter_rcv(sk, skb);
+ filter_rcv(sk, skb, xmitq);
continue;
}
/* Try backlog, compensating for double-counted bytes */
dcnt = &tipc_sk(sk)->dupl_rcvcnt;
- if (sk->sk_backlog.len)
+ if (!sk->sk_backlog.len)
atomic_set(dcnt, 0);
lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
if (likely(!sk_add_backlog(sk, skb, lim)))
continue;
/* Overload => reject message back to sender */
- tipc_sk_respond(sk, skb, TIPC_ERR_OVERLOAD);
+ onode = tipc_own_addr(sock_net(sk));
+ if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD))
+ __skb_queue_tail(xmitq, skb);
break;
}
}
@@ -1768,12 +1837,14 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
*/
void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
{
+ struct sk_buff_head xmitq;
u32 dnode, dport = 0;
int err;
struct tipc_sock *tsk;
struct sock *sk;
struct sk_buff *skb;
+ __skb_queue_head_init(&xmitq);
while (skb_queue_len(inputq)) {
dport = tipc_skb_peek_port(inputq, dport);
tsk = tipc_sk_lookup(net, dport);
@@ -1781,9 +1852,14 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
if (likely(tsk)) {
sk = &tsk->sk;
if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
- tipc_sk_enqueue(inputq, sk, dport);
+ tipc_sk_enqueue(inputq, sk, dport, &xmitq);
spin_unlock_bh(&sk->sk_lock.slock);
}
+ /* Send pending response/rejected messages, if any */
+ while ((skb = __skb_dequeue(&xmitq))) {
+ dnode = msg_destnode(buf_msg(skb));
+ tipc_node_xmit_skb(net, skb, dnode, dport);
+ }
sock_put(sk);
continue;
}