summaryrefslogtreecommitdiff
path: root/net/tipc
diff options
context:
space:
mode:
authorAndré Fabian Silva Delgado <emulatorman@parabola.nu>2016-09-11 04:34:46 -0300
committerAndré Fabian Silva Delgado <emulatorman@parabola.nu>2016-09-11 04:34:46 -0300
commit863981e96738983919de841ec669e157e6bdaeb0 (patch)
treed6d89a12e7eb8017837c057935a2271290907f76 /net/tipc
parent8dec7c70575785729a6a9e6719a955e9c545bcab (diff)
Linux-libre 4.7.1-gnupck-4.7.1-gnu
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/bearer.c116
-rw-r--r--net/tipc/bearer.h16
-rw-r--r--net/tipc/core.c8
-rw-r--r--net/tipc/discover.c7
-rw-r--r--net/tipc/discover.h2
-rw-r--r--net/tipc/link.c60
-rw-r--r--net/tipc/link.h2
-rw-r--r--net/tipc/msg.c6
-rw-r--r--net/tipc/msg.h40
-rw-r--r--net/tipc/netlink_compat.c114
-rw-r--r--net/tipc/node.c48
-rw-r--r--net/tipc/node.h6
-rw-r--r--net/tipc/server.c27
-rw-r--r--net/tipc/server.h4
-rw-r--r--net/tipc/socket.c200
-rw-r--r--net/tipc/socket.h17
-rw-r--r--net/tipc/subscr.c7
17 files changed, 462 insertions, 218 deletions
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index 27a540621..a597708ae 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -205,6 +205,7 @@ static int tipc_enable_bearer(struct net *net, const char *name,
struct tipc_bearer *b;
struct tipc_media *m;
struct tipc_bearer_names b_names;
+ struct sk_buff *skb;
char addr_string[16];
u32 bearer_id;
u32 with_this_prio;
@@ -301,7 +302,7 @@ restart:
b->net_plane = bearer_id + 'A';
b->priority = priority;
- res = tipc_disc_create(net, b, &b->bcast_addr);
+ res = tipc_disc_create(net, b, &b->bcast_addr, &skb);
if (res) {
bearer_disable(net, b);
pr_warn("Bearer <%s> rejected, discovery object creation failed\n",
@@ -310,7 +311,8 @@ restart:
}
rcu_assign_pointer(tn->bearer_list[bearer_id], b);
-
+ if (skb)
+ tipc_bearer_xmit_skb(net, bearer_id, skb, &b->bcast_addr);
pr_info("Enabled bearer <%s>, discovery domain %s, priority %u\n",
name,
tipc_addr_string_fill(addr_string, disc_domain), priority);
@@ -328,6 +330,21 @@ static int tipc_reset_bearer(struct net *net, struct tipc_bearer *b)
return 0;
}
+/* tipc_bearer_reset_all - reset all links on all bearers
+ */
+void tipc_bearer_reset_all(struct net *net)
+{
+ struct tipc_net *tn = tipc_net(net);
+ struct tipc_bearer *b;
+ int i;
+
+ for (i = 0; i < MAX_BEARERS; i++) {
+ b = rcu_dereference_rtnl(tn->bearer_list[i]);
+ if (b)
+ tipc_reset_bearer(net, b);
+ }
+}
+
/**
* bearer_disable
*
@@ -335,23 +352,16 @@ static int tipc_reset_bearer(struct net *net, struct tipc_bearer *b)
*/
static void bearer_disable(struct net *net, struct tipc_bearer *b)
{
- struct tipc_net *tn = net_generic(net, tipc_net_id);
- u32 i;
+ struct tipc_net *tn = tipc_net(net);
+ int bearer_id = b->identity;
pr_info("Disabling bearer <%s>\n", b->name);
b->media->disable_media(b);
-
- tipc_node_delete_links(net, b->identity);
+ tipc_node_delete_links(net, bearer_id);
RCU_INIT_POINTER(b->media_ptr, NULL);
if (b->link_req)
tipc_disc_delete(b->link_req);
-
- for (i = 0; i < MAX_BEARERS; i++) {
- if (b == rtnl_dereference(tn->bearer_list[i])) {
- RCU_INIT_POINTER(tn->bearer_list[i], NULL);
- break;
- }
- }
+ RCU_INIT_POINTER(tn->bearer_list[bearer_id], NULL);
kfree_rcu(b, rcu);
}
@@ -394,7 +404,7 @@ void tipc_disable_l2_media(struct tipc_bearer *b)
/**
* tipc_l2_send_msg - send a TIPC packet out over an L2 interface
- * @buf: the packet to be sent
+ * @skb: the packet to be sent
* @b: the bearer through which the packet is to be sent
* @dest: peer destination address
*/
@@ -403,17 +413,21 @@ int tipc_l2_send_msg(struct net *net, struct sk_buff *skb,
{
struct net_device *dev;
int delta;
+ void *tipc_ptr;
dev = (struct net_device *)rcu_dereference_rtnl(b->media_ptr);
if (!dev)
return 0;
+ /* Send RESET message even if bearer is detached from device */
+ tipc_ptr = rcu_dereference_rtnl(dev->tipc_ptr);
+ if (unlikely(!tipc_ptr && !msg_is_reset(buf_msg(skb))))
+ goto drop;
+
delta = dev->hard_header_len - skb_headroom(skb);
if ((delta > 0) &&
- pskb_expand_head(skb, SKB_DATA_ALIGN(delta), 0, GFP_ATOMIC)) {
- kfree_skb(skb);
- return 0;
- }
+ pskb_expand_head(skb, SKB_DATA_ALIGN(delta), 0, GFP_ATOMIC))
+ goto drop;
skb_reset_network_header(skb);
skb->dev = dev;
@@ -422,6 +436,9 @@ int tipc_l2_send_msg(struct net *net, struct sk_buff *skb,
dev->dev_addr, skb->len);
dev_queue_xmit(skb);
return 0;
+drop:
+ kfree_skb(skb);
+ return 0;
}
int tipc_bearer_mtu(struct net *net, u32 bearer_id)
@@ -450,6 +467,8 @@ void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id,
b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
if (likely(b))
b->media->send_msg(net, skb, b, dest);
+ else
+ kfree_skb(skb);
rcu_read_unlock();
}
@@ -468,11 +487,11 @@ void tipc_bearer_xmit(struct net *net, u32 bearer_id,
rcu_read_lock();
b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
- if (likely(b)) {
- skb_queue_walk_safe(xmitq, skb, tmp) {
- __skb_dequeue(xmitq);
- b->media->send_msg(net, skb, b, dst);
- }
+ if (unlikely(!b))
+ __skb_queue_purge(xmitq);
+ skb_queue_walk_safe(xmitq, skb, tmp) {
+ __skb_dequeue(xmitq);
+ b->media->send_msg(net, skb, b, dst);
}
rcu_read_unlock();
}
@@ -490,14 +509,14 @@ void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id,
rcu_read_lock();
b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
- if (likely(b)) {
- skb_queue_walk_safe(xmitq, skb, tmp) {
- hdr = buf_msg(skb);
- msg_set_non_seq(hdr, 1);
- msg_set_mc_netid(hdr, net_id);
- __skb_dequeue(xmitq);
- b->media->send_msg(net, skb, b, &b->bcast_addr);
- }
+ if (unlikely(!b))
+ __skb_queue_purge(xmitq);
+ skb_queue_walk_safe(xmitq, skb, tmp) {
+ hdr = buf_msg(skb);
+ msg_set_non_seq(hdr, 1);
+ msg_set_mc_netid(hdr, net_id);
+ __skb_dequeue(xmitq);
+ b->media->send_msg(net, skb, b, &b->bcast_addr);
}
rcu_read_unlock();
}
@@ -513,24 +532,21 @@ void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id,
* ignores packets sent using interface multicast, and traffic sent to other
* nodes (which can happen if interface is running in promiscuous mode).
*/
-static int tipc_l2_rcv_msg(struct sk_buff *buf, struct net_device *dev,
+static int tipc_l2_rcv_msg(struct sk_buff *skb, struct net_device *dev,
struct packet_type *pt, struct net_device *orig_dev)
{
struct tipc_bearer *b;
rcu_read_lock();
b = rcu_dereference_rtnl(dev->tipc_ptr);
- if (likely(b)) {
- if (likely(buf->pkt_type <= PACKET_BROADCAST)) {
- buf->next = NULL;
- tipc_rcv(dev_net(dev), buf, b);
- rcu_read_unlock();
- return NET_RX_SUCCESS;
- }
+ if (likely(b && (skb->pkt_type <= PACKET_BROADCAST))) {
+ skb->next = NULL;
+ tipc_rcv(dev_net(dev), skb, b);
+ rcu_read_unlock();
+ return NET_RX_SUCCESS;
}
rcu_read_unlock();
-
- kfree_skb(buf);
+ kfree_skb(skb);
return NET_RX_DROP;
}
@@ -548,9 +564,18 @@ static int tipc_l2_device_event(struct notifier_block *nb, unsigned long evt,
{
struct net_device *dev = netdev_notifier_info_to_dev(ptr);
struct net *net = dev_net(dev);
+ struct tipc_net *tn = tipc_net(net);
struct tipc_bearer *b;
+ int i;
b = rtnl_dereference(dev->tipc_ptr);
+ if (!b) {
+ for (i = 0; i < MAX_BEARERS; b = NULL, i++) {
+ b = rtnl_dereference(tn->bearer_list[i]);
+ if (b && (b->media_ptr == dev))
+ break;
+ }
+ }
if (!b)
return NOTIFY_DONE;
@@ -560,13 +585,20 @@ static int tipc_l2_device_event(struct notifier_block *nb, unsigned long evt,
case NETDEV_CHANGE:
if (netif_carrier_ok(dev))
break;
+ case NETDEV_UP:
+ rcu_assign_pointer(dev->tipc_ptr, b);
+ break;
case NETDEV_GOING_DOWN:
+ RCU_INIT_POINTER(dev->tipc_ptr, NULL);
+ synchronize_net();
+ tipc_reset_bearer(net, b);
+ break;
case NETDEV_CHANGEMTU:
tipc_reset_bearer(net, b);
break;
case NETDEV_CHANGEADDR:
b->media->raw2addr(b, &b->addr,
- (char *)dev->dev_addr);
+ (char *)dev->dev_addr);
tipc_reset_bearer(net, b);
break;
case NETDEV_UNREGISTER:
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index e31820516..60e49c3be 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -42,8 +42,6 @@
#include <net/genetlink.h>
#define MAX_MEDIA 3
-#define MAX_NODES 4096
-#define WSIZE 32
/* Identifiers associated with TIPC message header media address info
* - address info field is 32 bytes long
@@ -62,16 +60,6 @@
#define TIPC_MEDIA_TYPE_UDP 3
/**
- * struct tipc_node_map - set of node identifiers
- * @count: # of nodes in set
- * @map: bitmap of node identifiers that are in the set
- */
-struct tipc_node_map {
- u32 count;
- u32 map[MAX_NODES / WSIZE];
-};
-
-/**
* struct tipc_media_addr - destination address used by TIPC bearers
* @value: address info (format defined by media)
* @media_id: TIPC media type identifier
@@ -142,7 +130,6 @@ struct tipc_media {
* @identity: array index of this bearer within TIPC bearer array
* @link_req: ptr to (optional) structure making periodic link setup requests
* @net_plane: network plane ('A' through 'H') currently associated with bearer
- * @nodes: indicates which nodes in cluster can be reached through bearer
*
* Note: media-specific code is responsible for initialization of the fields
* indicated below when a bearer is enabled; TIPC's generic bearer code takes
@@ -163,8 +150,6 @@ struct tipc_bearer {
u32 identity;
struct tipc_link_req *link_req;
char net_plane;
- int node_cnt;
- struct tipc_node_map nodes;
};
struct tipc_bearer_names {
@@ -213,6 +198,7 @@ void tipc_bearer_add_dest(struct net *net, u32 bearer_id, u32 dest);
void tipc_bearer_remove_dest(struct net *net, u32 bearer_id, u32 dest);
struct tipc_bearer *tipc_bearer_find(struct net *net, const char *name);
struct tipc_media *tipc_media_find(const char *name);
+void tipc_bearer_reset_all(struct net *net);
int tipc_bearer_setup(void);
void tipc_bearer_cleanup(void);
void tipc_bearer_stop(struct net *net);
diff --git a/net/tipc/core.c b/net/tipc/core.c
index e2bdb07a4..fe1b062c4 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -112,11 +112,9 @@ static int __init tipc_init(void)
pr_info("Activated (version " TIPC_MOD_VER ")\n");
- sysctl_tipc_rmem[0] = TIPC_CONN_OVERLOAD_LIMIT >> 4 <<
- TIPC_LOW_IMPORTANCE;
- sysctl_tipc_rmem[1] = TIPC_CONN_OVERLOAD_LIMIT >> 4 <<
- TIPC_CRITICAL_IMPORTANCE;
- sysctl_tipc_rmem[2] = TIPC_CONN_OVERLOAD_LIMIT;
+ sysctl_tipc_rmem[0] = RCVBUF_MIN;
+ sysctl_tipc_rmem[1] = RCVBUF_DEF;
+ sysctl_tipc_rmem[2] = RCVBUF_MAX;
err = tipc_netlink_start();
if (err)
diff --git a/net/tipc/discover.c b/net/tipc/discover.c
index f1e738e80..ad9d477cc 100644
--- a/net/tipc/discover.c
+++ b/net/tipc/discover.c
@@ -268,10 +268,9 @@ exit:
* Returns 0 if successful, otherwise -errno.
*/
int tipc_disc_create(struct net *net, struct tipc_bearer *b,
- struct tipc_media_addr *dest)
+ struct tipc_media_addr *dest, struct sk_buff **skb)
{
struct tipc_link_req *req;
- struct sk_buff *skb;
req = kmalloc(sizeof(*req), GFP_ATOMIC);
if (!req)
@@ -293,9 +292,7 @@ int tipc_disc_create(struct net *net, struct tipc_bearer *b,
setup_timer(&req->timer, disc_timeout, (unsigned long)req);
mod_timer(&req->timer, jiffies + req->timer_intv);
b->link_req = req;
- skb = skb_clone(req->buf, GFP_ATOMIC);
- if (skb)
- tipc_bearer_xmit_skb(net, req->bearer_id, skb, &req->dest);
+ *skb = skb_clone(req->buf, GFP_ATOMIC);
return 0;
}
diff --git a/net/tipc/discover.h b/net/tipc/discover.h
index c9b12770c..b80a33538 100644
--- a/net/tipc/discover.h
+++ b/net/tipc/discover.h
@@ -40,7 +40,7 @@
struct tipc_link_req;
int tipc_disc_create(struct net *net, struct tipc_bearer *b_ptr,
- struct tipc_media_addr *dest);
+ struct tipc_media_addr *dest, struct sk_buff **skb);
void tipc_disc_delete(struct tipc_link_req *req);
void tipc_disc_reset(struct net *net, struct tipc_bearer *b_ptr);
void tipc_disc_add_dest(struct tipc_link_req *req);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 7d2bb3e70..7d89f8713 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -140,6 +140,7 @@ struct tipc_link {
char if_name[TIPC_MAX_IF_NAME];
u32 priority;
char net_plane;
+ u16 rst_cnt;
/* Failover/synch */
u16 drop_point;
@@ -348,6 +349,8 @@ void tipc_link_remove_bc_peer(struct tipc_link *snd_l,
u16 ack = snd_l->snd_nxt - 1;
snd_l->ackers--;
+ rcv_l->bc_peer_is_up = true;
+ rcv_l->state = LINK_ESTABLISHED;
tipc_link_bc_ack_rcv(rcv_l, ack, xmitq);
tipc_link_reset(rcv_l);
rcv_l->state = LINK_RESET;
@@ -701,40 +704,35 @@ static void link_profile_stats(struct tipc_link *l)
/* tipc_link_timeout - perform periodic task as instructed from node timeout
*/
-/* tipc_link_timeout - perform periodic task as instructed from node timeout
- */
int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
{
+ int mtyp = 0;
int rc = 0;
- int mtyp = STATE_MSG;
- bool xmit = false;
- bool prb = false;
+ bool state = false;
+ bool probe = false;
+ bool setup = false;
u16 bc_snt = l->bc_sndlink->snd_nxt - 1;
u16 bc_acked = l->bc_rcvlink->acked;
- bool bc_up = link_is_up(l->bc_rcvlink);
link_profile_stats(l);
switch (l->state) {
case LINK_ESTABLISHED:
case LINK_SYNCHING:
- if (!l->silent_intv_cnt) {
- if (bc_up && (bc_acked != bc_snt))
- xmit = true;
- } else if (l->silent_intv_cnt <= l->abort_limit) {
- xmit = true;
- prb = true;
- } else {
- rc |= tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
- }
+ if (l->silent_intv_cnt > l->abort_limit)
+ return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
+ mtyp = STATE_MSG;
+ state = bc_acked != bc_snt;
+ probe = l->silent_intv_cnt;
l->silent_intv_cnt++;
break;
case LINK_RESET:
- xmit = true;
+ setup = l->rst_cnt++ <= 4;
+ setup |= !(l->rst_cnt % 16);
mtyp = RESET_MSG;
break;
case LINK_ESTABLISHING:
- xmit = true;
+ setup = true;
mtyp = ACTIVATE_MSG;
break;
case LINK_PEER_RESET:
@@ -745,8 +743,8 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
break;
}
- if (xmit)
- tipc_link_build_proto_msg(l, mtyp, prb, 0, 0, 0, xmitq);
+ if (state || probe || setup)
+ tipc_link_build_proto_msg(l, mtyp, probe, 0, 0, 0, xmitq);
return rc;
}
@@ -833,6 +831,7 @@ void tipc_link_reset(struct tipc_link *l)
l->rcv_nxt = 1;
l->acked = 0;
l->silent_intv_cnt = 0;
+ l->rst_cnt = 0;
l->stats.recv_info = 0;
l->stale_count = 0;
l->bc_peer_is_up = false;
@@ -1110,12 +1109,12 @@ static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
return released;
}
-/* tipc_link_build_ack_msg: prepare link acknowledge message for transmission
+/* tipc_link_build_state_msg: prepare link state message for transmission
*
* Note that sending of broadcast ack is coordinated among nodes, to reduce
* risk of ack storms towards the sender
*/
-int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
+int tipc_link_build_state_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
{
if (!l)
return 0;
@@ -1140,11 +1139,17 @@ int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
{
int mtyp = RESET_MSG;
+ struct sk_buff *skb;
if (l->state == LINK_ESTABLISHING)
mtyp = ACTIVATE_MSG;
tipc_link_build_proto_msg(l, mtyp, 0, 0, 0, 0, xmitq);
+
+ /* Inform peer that this endpoint is going down if applicable */
+ skb = skb_peek_tail(xmitq);
+ if (skb && (l->state == LINK_RESET))
+ msg_set_peer_stopping(buf_msg(skb), 1);
}
/* tipc_link_build_nack_msg: prepare link nack message for transmission
@@ -1219,7 +1224,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
if (!tipc_data_input(l, skb, l->inputq))
rc |= tipc_link_input(l, skb, l->inputq);
if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN))
- rc |= tipc_link_build_ack_msg(l, xmitq);
+ rc |= tipc_link_build_state_msg(l, xmitq);
if (unlikely(rc & ~TIPC_LINK_SND_BC_ACK))
break;
} while ((skb = __skb_dequeue(defq)));
@@ -1411,7 +1416,9 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
l->priority = peers_prio;
/* ACTIVATE_MSG serves as PEER_RESET if link is already down */
- if ((mtyp == RESET_MSG) || !link_is_up(l))
+ if (msg_peer_stopping(hdr))
+ rc = tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
+ else if ((mtyp == RESET_MSG) || !link_is_up(l))
rc = tipc_link_fsm_evt(l, LINK_PEER_RESET_EVT);
/* ACTIVATE_MSG takes up link if it was already locally reset */
@@ -1554,7 +1561,12 @@ void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
if (!msg_peer_node_is_up(hdr))
return;
- l->bc_peer_is_up = true;
+ /* Open when peer ackowledges our bcast init msg (pkt #1) */
+ if (msg_ack(hdr))
+ l->bc_peer_is_up = true;
+
+ if (!l->bc_peer_is_up)
+ return;
/* Ignore if peers_snd_nxt goes beyond receive window */
if (more(peers_snd_nxt, l->rcv_nxt + l->window))
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 6a94175ee..d7e9d42fc 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -123,7 +123,7 @@ int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]);
int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq);
int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
struct sk_buff_head *xmitq);
-int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq);
+int tipc_link_build_state_msg(struct tipc_link *l, struct sk_buff_head *xmitq);
void tipc_link_add_bc_peer(struct tipc_link *snd_l,
struct tipc_link *uc_l,
struct sk_buff_head *xmitq);
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 8740930f0..17201aa84 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -41,6 +41,8 @@
#include "name_table.h"
#define MAX_FORWARD_SIZE 1024
+#define BUF_HEADROOM (LL_MAX_HEADER + 48)
+#define BUF_TAILROOM 16
static unsigned int align(unsigned int i)
{
@@ -505,6 +507,10 @@ bool tipc_msg_reverse(u32 own_node, struct sk_buff **skb, int err)
msg_set_hdr_sz(hdr, BASIC_H_SIZE);
}
+ if (skb_cloned(_skb) &&
+ pskb_expand_head(_skb, BUF_HEADROOM, BUF_TAILROOM, GFP_KERNEL))
+ goto exit;
+
/* Now reverse the concerned fields */
msg_set_errcode(hdr, err);
msg_set_origport(hdr, msg_destport(&ohdr));
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 55778a0ae..7cf52fb39 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -94,17 +94,6 @@ struct plist;
#define TIPC_MEDIA_INFO_OFFSET 5
-/**
- * TIPC message buffer code
- *
- * TIPC message buffer headroom reserves space for the worst-case
- * link-level device header (in case the message is sent off-node).
- *
- * Note: Headroom should be a multiple of 4 to ensure the TIPC header fields
- * are word aligned for quicker access
- */
-#define BUF_HEADROOM (LL_MAX_HEADER + 48)
-
struct tipc_skb_cb {
void *handle;
struct sk_buff *tail;
@@ -715,6 +704,16 @@ static inline void msg_set_redundant_link(struct tipc_msg *m, u32 r)
msg_set_bits(m, 5, 12, 0x1, r);
}
+static inline u32 msg_peer_stopping(struct tipc_msg *m)
+{
+ return msg_bits(m, 5, 13, 0x1);
+}
+
+static inline void msg_set_peer_stopping(struct tipc_msg *m, u32 s)
+{
+ msg_set_bits(m, 5, 13, 0x1, s);
+}
+
static inline char *msg_media_addr(struct tipc_msg *m)
{
return (char *)&m->hdr[TIPC_MEDIA_INFO_OFFSET];
@@ -733,16 +732,26 @@ static inline void msg_set_msgcnt(struct tipc_msg *m, u16 n)
msg_set_bits(m, 9, 16, 0xffff, n);
}
-static inline u32 msg_bcast_tag(struct tipc_msg *m)
+static inline u32 msg_conn_ack(struct tipc_msg *m)
{
return msg_bits(m, 9, 16, 0xffff);
}
-static inline void msg_set_bcast_tag(struct tipc_msg *m, u32 n)
+static inline void msg_set_conn_ack(struct tipc_msg *m, u32 n)
{
msg_set_bits(m, 9, 16, 0xffff, n);
}
+static inline u32 msg_adv_win(struct tipc_msg *m)
+{
+ return msg_bits(m, 9, 0, 0xffff);
+}
+
+static inline void msg_set_adv_win(struct tipc_msg *m, u32 n)
+{
+ msg_set_bits(m, 9, 0, 0xffff, n);
+}
+
static inline u32 msg_max_pkt(struct tipc_msg *m)
{
return msg_bits(m, 9, 16, 0xffff) * 4;
@@ -779,6 +788,11 @@ static inline bool msg_peer_node_is_up(struct tipc_msg *m)
return msg_redundant_link(m);
}
+static inline bool msg_is_reset(struct tipc_msg *hdr)
+{
+ return (msg_user(hdr) == LINK_PROTOCOL) && (msg_type(hdr) == RESET_MSG);
+}
+
struct sk_buff *tipc_buf_acquire(u32 size);
bool tipc_msg_validate(struct sk_buff *skb);
bool tipc_msg_reverse(u32 own_addr, struct sk_buff **skb, int err);
diff --git a/net/tipc/netlink_compat.c b/net/tipc/netlink_compat.c
index 4dfc5c14f..1fd464764 100644
--- a/net/tipc/netlink_compat.c
+++ b/net/tipc/netlink_compat.c
@@ -346,9 +346,15 @@ static int tipc_nl_compat_bearer_dump(struct tipc_nl_compat_msg *msg,
struct nlattr **attrs)
{
struct nlattr *bearer[TIPC_NLA_BEARER_MAX + 1];
+ int err;
+
+ if (!attrs[TIPC_NLA_BEARER])
+ return -EINVAL;
- nla_parse_nested(bearer, TIPC_NLA_BEARER_MAX, attrs[TIPC_NLA_BEARER],
- NULL);
+ err = nla_parse_nested(bearer, TIPC_NLA_BEARER_MAX,
+ attrs[TIPC_NLA_BEARER], NULL);
+ if (err)
+ return err;
return tipc_add_tlv(msg->rep, TIPC_TLV_BEARER_NAME,
nla_data(bearer[TIPC_NLA_BEARER_NAME]),
@@ -460,14 +466,31 @@ static int tipc_nl_compat_link_stat_dump(struct tipc_nl_compat_msg *msg,
struct nlattr *link[TIPC_NLA_LINK_MAX + 1];
struct nlattr *prop[TIPC_NLA_PROP_MAX + 1];
struct nlattr *stats[TIPC_NLA_STATS_MAX + 1];
+ int err;
- nla_parse_nested(link, TIPC_NLA_LINK_MAX, attrs[TIPC_NLA_LINK], NULL);
+ if (!attrs[TIPC_NLA_LINK])
+ return -EINVAL;
- nla_parse_nested(prop, TIPC_NLA_PROP_MAX, link[TIPC_NLA_LINK_PROP],
- NULL);
+ err = nla_parse_nested(link, TIPC_NLA_LINK_MAX, attrs[TIPC_NLA_LINK],
+ NULL);
+ if (err)
+ return err;
+
+ if (!link[TIPC_NLA_LINK_PROP])
+ return -EINVAL;
- nla_parse_nested(stats, TIPC_NLA_STATS_MAX, link[TIPC_NLA_LINK_STATS],
- NULL);
+ err = nla_parse_nested(prop, TIPC_NLA_PROP_MAX,
+ link[TIPC_NLA_LINK_PROP], NULL);
+ if (err)
+ return err;
+
+ if (!link[TIPC_NLA_LINK_STATS])
+ return -EINVAL;
+
+ err = nla_parse_nested(stats, TIPC_NLA_STATS_MAX,
+ link[TIPC_NLA_LINK_STATS], NULL);
+ if (err)
+ return err;
name = (char *)TLV_DATA(msg->req);
if (strcmp(name, nla_data(link[TIPC_NLA_LINK_NAME])) != 0)
@@ -569,12 +592,20 @@ static int tipc_nl_compat_link_dump(struct tipc_nl_compat_msg *msg,
{
struct nlattr *link[TIPC_NLA_LINK_MAX + 1];
struct tipc_link_info link_info;
+ int err;
- nla_parse_nested(link, TIPC_NLA_LINK_MAX, attrs[TIPC_NLA_LINK], NULL);
+ if (!attrs[TIPC_NLA_LINK])
+ return -EINVAL;
+
+ err = nla_parse_nested(link, TIPC_NLA_LINK_MAX, attrs[TIPC_NLA_LINK],
+ NULL);
+ if (err)
+ return err;
link_info.dest = nla_get_flag(link[TIPC_NLA_LINK_DEST]);
link_info.up = htonl(nla_get_flag(link[TIPC_NLA_LINK_UP]));
- strcpy(link_info.str, nla_data(link[TIPC_NLA_LINK_NAME]));
+ nla_strlcpy(link_info.str, link[TIPC_NLA_LINK_NAME],
+ TIPC_MAX_LINK_NAME);
return tipc_add_tlv(msg->rep, TIPC_TLV_LINK_INFO,
&link_info, sizeof(link_info));
@@ -758,12 +789,23 @@ static int tipc_nl_compat_name_table_dump(struct tipc_nl_compat_msg *msg,
u32 node, depth, type, lowbound, upbound;
static const char * const scope_str[] = {"", " zone", " cluster",
" node"};
+ int err;
- nla_parse_nested(nt, TIPC_NLA_NAME_TABLE_MAX,
- attrs[TIPC_NLA_NAME_TABLE], NULL);
+ if (!attrs[TIPC_NLA_NAME_TABLE])
+ return -EINVAL;
- nla_parse_nested(publ, TIPC_NLA_PUBL_MAX, nt[TIPC_NLA_NAME_TABLE_PUBL],
- NULL);
+ err = nla_parse_nested(nt, TIPC_NLA_NAME_TABLE_MAX,
+ attrs[TIPC_NLA_NAME_TABLE], NULL);
+ if (err)
+ return err;
+
+ if (!nt[TIPC_NLA_NAME_TABLE_PUBL])
+ return -EINVAL;
+
+ err = nla_parse_nested(publ, TIPC_NLA_PUBL_MAX,
+ nt[TIPC_NLA_NAME_TABLE_PUBL], NULL);
+ if (err)
+ return err;
ntq = (struct tipc_name_table_query *)TLV_DATA(msg->req);
@@ -815,8 +857,15 @@ static int __tipc_nl_compat_publ_dump(struct tipc_nl_compat_msg *msg,
{
u32 type, lower, upper;
struct nlattr *publ[TIPC_NLA_PUBL_MAX + 1];
+ int err;
- nla_parse_nested(publ, TIPC_NLA_PUBL_MAX, attrs[TIPC_NLA_PUBL], NULL);
+ if (!attrs[TIPC_NLA_PUBL])
+ return -EINVAL;
+
+ err = nla_parse_nested(publ, TIPC_NLA_PUBL_MAX, attrs[TIPC_NLA_PUBL],
+ NULL);
+ if (err)
+ return err;
type = nla_get_u32(publ[TIPC_NLA_PUBL_TYPE]);
lower = nla_get_u32(publ[TIPC_NLA_PUBL_LOWER]);
@@ -876,7 +925,13 @@ static int tipc_nl_compat_sk_dump(struct tipc_nl_compat_msg *msg,
u32 sock_ref;
struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
- nla_parse_nested(sock, TIPC_NLA_SOCK_MAX, attrs[TIPC_NLA_SOCK], NULL);
+ if (!attrs[TIPC_NLA_SOCK])
+ return -EINVAL;
+
+ err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX, attrs[TIPC_NLA_SOCK],
+ NULL);
+ if (err)
+ return err;
sock_ref = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
tipc_tlv_sprintf(msg->rep, "%u:", sock_ref);
@@ -917,9 +972,15 @@ static int tipc_nl_compat_media_dump(struct tipc_nl_compat_msg *msg,
struct nlattr **attrs)
{
struct nlattr *media[TIPC_NLA_MEDIA_MAX + 1];
+ int err;
+
+ if (!attrs[TIPC_NLA_MEDIA])
+ return -EINVAL;
- nla_parse_nested(media, TIPC_NLA_MEDIA_MAX, attrs[TIPC_NLA_MEDIA],
- NULL);
+ err = nla_parse_nested(media, TIPC_NLA_MEDIA_MAX, attrs[TIPC_NLA_MEDIA],
+ NULL);
+ if (err)
+ return err;
return tipc_add_tlv(msg->rep, TIPC_TLV_MEDIA_NAME,
nla_data(media[TIPC_NLA_MEDIA_NAME]),
@@ -931,8 +992,15 @@ static int tipc_nl_compat_node_dump(struct tipc_nl_compat_msg *msg,
{
struct tipc_node_info node_info;
struct nlattr *node[TIPC_NLA_NODE_MAX + 1];
+ int err;
- nla_parse_nested(node, TIPC_NLA_NODE_MAX, attrs[TIPC_NLA_NODE], NULL);
+ if (!attrs[TIPC_NLA_NODE])
+ return -EINVAL;
+
+ err = nla_parse_nested(node, TIPC_NLA_NODE_MAX, attrs[TIPC_NLA_NODE],
+ NULL);
+ if (err)
+ return err;
node_info.addr = htonl(nla_get_u32(node[TIPC_NLA_NODE_ADDR]));
node_info.up = htonl(nla_get_flag(node[TIPC_NLA_NODE_UP]));
@@ -971,8 +1039,16 @@ static int tipc_nl_compat_net_dump(struct tipc_nl_compat_msg *msg,
{
__be32 id;
struct nlattr *net[TIPC_NLA_NET_MAX + 1];
+ int err;
+
+ if (!attrs[TIPC_NLA_NET])
+ return -EINVAL;
+
+ err = nla_parse_nested(net, TIPC_NLA_NET_MAX, attrs[TIPC_NLA_NET],
+ NULL);
+ if (err)
+ return err;
- nla_parse_nested(net, TIPC_NLA_NET_MAX, attrs[TIPC_NLA_NET], NULL);
id = htonl(nla_get_u32(net[TIPC_NLA_NET_ID]));
return tipc_add_tlv(msg->rep, TIPC_TLV_UNSIGNED, &id, sizeof(id));
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 9aaa1bc56..23d476184 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1,7 +1,7 @@
/*
* net/tipc/node.c: TIPC node management routines
*
- * Copyright (c) 2000-2006, 2012-2015, Ericsson AB
+ * Copyright (c) 2000-2006, 2012-2016, Ericsson AB
* Copyright (c) 2005-2006, 2010-2014, Wind River Systems
* All rights reserved.
*
@@ -191,6 +191,20 @@ int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel)
tipc_node_put(n);
return mtu;
}
+
+u16 tipc_node_get_capabilities(struct net *net, u32 addr)
+{
+ struct tipc_node *n;
+ u16 caps;
+
+ n = tipc_node_find(net, addr);
+ if (unlikely(!n))
+ return TIPC_NODE_CAPABILITIES;
+ caps = n->capabilities;
+ tipc_node_put(n);
+ return caps;
+}
+
/*
* A trivial power-of-two bitmask technique is used for speed, since this
* operation is done for every incoming TIPC packet. The number of hash table
@@ -304,8 +318,11 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
spin_lock_bh(&tn->node_list_lock);
n = tipc_node_find(net, addr);
- if (n)
+ if (n) {
+ /* Same node may come back with new capabilities */
+ n->capabilities = capabilities;
goto exit;
+ }
n = kzalloc(sizeof(*n), GFP_ATOMIC);
if (!n) {
pr_warn("Node creation failed, no memory\n");
@@ -525,7 +542,7 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
struct tipc_link *ol = node_active_link(n, 0);
struct tipc_link *nl = n->links[bearer_id].link;
- if (!nl)
+ if (!nl || tipc_link_is_up(nl))
return;
tipc_link_fsm_evt(nl, LINK_ESTABLISH_EVT);
@@ -545,12 +562,16 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
pr_debug("Established link <%s> on network plane %c\n",
tipc_link_name(nl), tipc_link_plane(nl));
+ /* Ensure that a STATE message goes first */
+ tipc_link_build_state_msg(nl, xmitq);
+
/* First link? => give it both slots */
if (!ol) {
*slot0 = bearer_id;
*slot1 = bearer_id;
tipc_node_fsm_evt(n, SELF_ESTABL_CONTACT_EVT);
n->action_flags |= TIPC_NOTIFY_NODE_UP;
+ tipc_link_set_active(nl, true);
tipc_bcast_add_peer(n->net, nl, xmitq);
return;
}
@@ -581,8 +602,12 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
static void tipc_node_link_up(struct tipc_node *n, int bearer_id,
struct sk_buff_head *xmitq)
{
+ struct tipc_media_addr *maddr;
+
tipc_node_write_lock(n);
__tipc_node_link_up(n, bearer_id, xmitq);
+ maddr = &n->links[bearer_id].maddr;
+ tipc_bearer_xmit(n->net, bearer_id, xmitq, maddr);
tipc_node_write_unlock(n);
}
@@ -1272,14 +1297,10 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
rc = tipc_bcast_rcv(net, be->link, skb);
- /* Broadcast link reset may happen at reassembly failure */
- if (rc & TIPC_LINK_DOWN_EVT)
- tipc_node_reset_links(n);
-
/* Broadcast ACKs are sent on a unicast link */
if (rc & TIPC_LINK_SND_BC_ACK) {
tipc_node_read_lock(n);
- tipc_link_build_ack_msg(le->link, &xmitq);
+ tipc_link_build_state_msg(le->link, &xmitq);
tipc_node_read_unlock(n);
}
@@ -1295,6 +1316,17 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
spin_unlock_bh(&be->inputq2.lock);
tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2);
}
+
+ if (rc & TIPC_LINK_DOWN_EVT) {
+ /* Reception reassembly failure => reset all links to peer */
+ if (!tipc_link_is_up(be->link))
+ tipc_node_reset_links(n);
+
+ /* Retransmission failure => reset all links to all peers */
+ if (!tipc_link_is_up(tipc_bc_sndlink(net)))
+ tipc_bearer_reset_all(net);
+ }
+
tipc_node_put(n);
}
diff --git a/net/tipc/node.h b/net/tipc/node.h
index f39d9d06e..8264b3d97 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -45,10 +45,11 @@
/* Optional capabilities supported by this code version
*/
enum {
- TIPC_BCAST_SYNCH = (1 << 1)
+ TIPC_BCAST_SYNCH = (1 << 1),
+ TIPC_BLOCK_FLOWCTL = (2 << 1)
};
-#define TIPC_NODE_CAPABILITIES TIPC_BCAST_SYNCH
+#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | TIPC_BLOCK_FLOWCTL)
#define INVALID_BEARER_ID -1
void tipc_node_stop(struct net *net);
@@ -70,6 +71,7 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb);
int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel);
+u16 tipc_node_get_capabilities(struct net *net, u32 addr);
int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb);
int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb);
int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct genl_info *info);
diff --git a/net/tipc/server.c b/net/tipc/server.c
index 2446bfbaa..272d20a79 100644
--- a/net/tipc/server.c
+++ b/net/tipc/server.c
@@ -86,6 +86,7 @@ struct outqueue_entry {
static void tipc_recv_work(struct work_struct *work);
static void tipc_send_work(struct work_struct *work);
static void tipc_clean_outqueues(struct tipc_conn *con);
+static void tipc_sock_release(struct tipc_conn *con);
static void tipc_conn_kref_release(struct kref *kref)
{
@@ -102,6 +103,7 @@ static void tipc_conn_kref_release(struct kref *kref)
}
saddr->scope = -TIPC_NODE_SCOPE;
kernel_bind(sock, (struct sockaddr *)saddr, sizeof(*saddr));
+ tipc_sock_release(con);
sock_release(sock);
con->sock = NULL;
}
@@ -136,28 +138,28 @@ static void sock_data_ready(struct sock *sk)
{
struct tipc_conn *con;
- read_lock(&sk->sk_callback_lock);
+ read_lock_bh(&sk->sk_callback_lock);
con = sock2con(sk);
if (con && test_bit(CF_CONNECTED, &con->flags)) {
conn_get(con);
if (!queue_work(con->server->rcv_wq, &con->rwork))
conn_put(con);
}
- read_unlock(&sk->sk_callback_lock);
+ read_unlock_bh(&sk->sk_callback_lock);
}
static void sock_write_space(struct sock *sk)
{
struct tipc_conn *con;
- read_lock(&sk->sk_callback_lock);
+ read_lock_bh(&sk->sk_callback_lock);
con = sock2con(sk);
if (con && test_bit(CF_CONNECTED, &con->flags)) {
conn_get(con);
if (!queue_work(con->server->send_wq, &con->swork))
conn_put(con);
}
- read_unlock(&sk->sk_callback_lock);
+ read_unlock_bh(&sk->sk_callback_lock);
}
static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con)
@@ -184,26 +186,31 @@ static void tipc_unregister_callbacks(struct tipc_conn *con)
write_unlock_bh(&sk->sk_callback_lock);
}
+static void tipc_sock_release(struct tipc_conn *con)
+{
+ struct tipc_server *s = con->server;
+
+ if (con->conid)
+ s->tipc_conn_release(con->conid, con->usr_data);
+
+ tipc_unregister_callbacks(con);
+}
+
static void tipc_close_conn(struct tipc_conn *con)
{
struct tipc_server *s = con->server;
if (test_and_clear_bit(CF_CONNECTED, &con->flags)) {
- if (con->conid)
- s->tipc_conn_shutdown(con->conid, con->usr_data);
spin_lock_bh(&s->idr_lock);
idr_remove(&s->conn_idr, con->conid);
s->idr_in_use--;
spin_unlock_bh(&s->idr_lock);
- tipc_unregister_callbacks(con);
-
/* We shouldn't flush pending works as we may be in the
* thread. In fact the races with pending rx/tx work structs
* are harmless for us here as we have already deleted this
- * connection from server connection list and set
- * sk->sk_user_data to 0 before releasing connection object.
+ * connection from server connection list.
*/
kernel_sock_shutdown(con->sock, SHUT_RDWR);
diff --git a/net/tipc/server.h b/net/tipc/server.h
index 9015faedb..34f8055af 100644
--- a/net/tipc/server.h
+++ b/net/tipc/server.h
@@ -53,7 +53,7 @@
* @send_wq: send workqueue
* @max_rcvbuf_size: maximum permitted receive message length
* @tipc_conn_new: callback will be called when new connection is incoming
- * @tipc_conn_shutdown: callback will be called when connection is shut down
+ * @tipc_conn_release: callback will be called before releasing the connection
* @tipc_conn_recvmsg: callback will be called when message arrives
* @saddr: TIPC server address
* @name: server name
@@ -70,7 +70,7 @@ struct tipc_server {
struct workqueue_struct *send_wq;
int max_rcvbuf_size;
void *(*tipc_conn_new)(int conid);
- void (*tipc_conn_shutdown)(int conid, void *usr_data);
+ void (*tipc_conn_release)(int conid, void *usr_data);
void (*tipc_conn_recvmsg)(struct net *net, int conid,
struct sockaddr_tipc *addr, void *usr_data,
void *buf, size_t len);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 5f80d3fa9..c49b8df43 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -96,8 +96,11 @@ struct tipc_sock {
uint conn_timeout;
atomic_t dupl_rcvcnt;
bool link_cong;
- uint sent_unacked;
- uint rcv_unacked;
+ u16 snt_unacked;
+ u16 snd_win;
+ u16 peer_caps;
+ u16 rcv_unacked;
+ u16 rcv_win;
struct sockaddr_tipc remote;
struct rhash_head node;
struct rcu_head rcu;
@@ -227,9 +230,29 @@ static struct tipc_sock *tipc_sk(const struct sock *sk)
return container_of(sk, struct tipc_sock, sk);
}
-static int tsk_conn_cong(struct tipc_sock *tsk)
+static bool tsk_conn_cong(struct tipc_sock *tsk)
{
- return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
+ return tsk->snt_unacked >= tsk->snd_win;
+}
+
+/* tsk_blocks(): translate a buffer size in bytes to number of
+ * advertisable blocks, taking into account the ratio truesize(len)/len
+ * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
+ */
+static u16 tsk_adv_blocks(int len)
+{
+ return len / FLOWCTL_BLK_SZ / 4;
+}
+
+/* tsk_inc(): increment counter for sent or received data
+ * - If block based flow control is not supported by peer we
+ * fall back to message based ditto, incrementing the counter
+ */
+static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
+{
+ if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
+ return ((msglen / FLOWCTL_BLK_SZ) + 1);
+ return 1;
}
/**
@@ -366,7 +389,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
sock->state = state;
sock_init_data(sock, sk);
if (tipc_sk_insert(tsk)) {
- pr_warn("Socket create failed; port numbrer exhausted\n");
+ pr_warn("Socket create failed; port number exhausted\n");
return -EINVAL;
}
msg_set_origport(msg, tsk->portid);
@@ -377,9 +400,12 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
sk->sk_write_space = tipc_write_space;
sk->sk_destruct = tipc_sock_destruct;
tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
- tsk->sent_unacked = 0;
atomic_set(&tsk->dupl_rcvcnt, 0);
+ /* Start out with safe limits until we receive an advertised window */
+ tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
+ tsk->rcv_win = tsk->snd_win;
+
if (sock->state == SS_READY) {
tsk_set_unreturnable(tsk, true);
if (sock->type == SOCK_DGRAM)
@@ -770,12 +796,14 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
* @tsk: receiving socket
* @skb: pointer to message buffer.
*/
-static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
+static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
+ struct sk_buff_head *xmitq)
{
struct sock *sk = &tsk->sk;
+ u32 onode = tsk_own_node(tsk);
struct tipc_msg *hdr = buf_msg(skb);
int mtyp = msg_type(hdr);
- int conn_cong;
+ bool conn_cong;
/* Ignore if connection cannot be validated: */
if (!tsk_peer_msg(tsk, hdr))
@@ -785,11 +813,14 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
if (mtyp == CONN_PROBE) {
msg_set_type(hdr, CONN_PROBE_REPLY);
- tipc_sk_respond(sk, skb, TIPC_OK);
+ if (tipc_msg_reverse(onode, &skb, TIPC_OK))
+ __skb_queue_tail(xmitq, skb);
return;
} else if (mtyp == CONN_ACK) {
conn_cong = tsk_conn_cong(tsk);
- tsk->sent_unacked -= msg_msgcnt(hdr);
+ tsk->snt_unacked -= msg_conn_ack(hdr);
+ if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
+ tsk->snd_win = msg_adv_win(hdr);
if (conn_cong)
sk->sk_write_space(sk);
} else if (mtyp != CONN_PROBE_REPLY) {
@@ -1020,12 +1051,14 @@ static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
u32 dnode;
uint mtu, send, sent = 0;
struct iov_iter save;
+ int hlen = MIN_H_SIZE;
/* Handle implied connection establishment */
if (unlikely(dest)) {
rc = __tipc_sendmsg(sock, m, dsz);
+ hlen = msg_hdr_sz(mhdr);
if (dsz && (dsz == rc))
- tsk->sent_unacked = 1;
+ tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
return rc;
}
if (dsz > (uint)INT_MAX)
@@ -1054,7 +1087,7 @@ next:
if (likely(!tsk_conn_cong(tsk))) {
rc = tipc_node_xmit(net, &pktchain, dnode, portid);
if (likely(!rc)) {
- tsk->sent_unacked++;
+ tsk->snt_unacked += tsk_inc(tsk, send + hlen);
sent += send;
if (sent == dsz)
return dsz;
@@ -1118,6 +1151,13 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
+ tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
+ if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
+ return;
+
+ /* Fall back to message based flow control */
+ tsk->rcv_win = FLOWCTL_MSG_WIN;
+ tsk->snd_win = FLOWCTL_MSG_WIN;
}
/**
@@ -1214,7 +1254,7 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
return 0;
}
-static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
+static void tipc_sk_send_ack(struct tipc_sock *tsk)
{
struct net *net = sock_net(&tsk->sk);
struct sk_buff *skb = NULL;
@@ -1230,7 +1270,14 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
if (!skb)
return;
msg = buf_msg(skb);
- msg_set_msgcnt(msg, ack);
+ msg_set_conn_ack(msg, tsk->rcv_unacked);
+ tsk->rcv_unacked = 0;
+
+ /* Adjust to and advertize the correct window limit */
+ if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
+ tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
+ msg_set_adv_win(msg, tsk->rcv_win);
+ }
tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
}
@@ -1288,7 +1335,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
long timeo;
unsigned int sz;
u32 err;
- int res;
+ int res, hlen;
/* Catch invalid receive requests */
if (unlikely(!buf_len))
@@ -1313,6 +1360,7 @@ restart:
buf = skb_peek(&sk->sk_receive_queue);
msg = buf_msg(buf);
sz = msg_data_sz(msg);
+ hlen = msg_hdr_sz(msg);
err = msg_errcode(msg);
/* Discard an empty non-errored message & try again */
@@ -1335,7 +1383,7 @@ restart:
sz = buf_len;
m->msg_flags |= MSG_TRUNC;
}
- res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m, sz);
+ res = skb_copy_datagram_msg(buf, hlen, m, sz);
if (res)
goto exit;
res = sz;
@@ -1347,15 +1395,15 @@ restart:
res = -ECONNRESET;
}
- /* Consume received message (optional) */
- if (likely(!(flags & MSG_PEEK))) {
- if ((sock->state != SS_READY) &&
- (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
- tipc_sk_send_ack(tsk, tsk->rcv_unacked);
- tsk->rcv_unacked = 0;
- }
- tsk_advance_rx_queue(sk);
+ if (unlikely(flags & MSG_PEEK))
+ goto exit;
+
+ if (likely(sock->state != SS_READY)) {
+ tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
+ if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
+ tipc_sk_send_ack(tsk);
}
+ tsk_advance_rx_queue(sk);
exit:
release_sock(sk);
return res;
@@ -1384,7 +1432,7 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
int sz_to_copy, target, needed;
int sz_copied = 0;
u32 err;
- int res = 0;
+ int res = 0, hlen;
/* Catch invalid receive attempts */
if (unlikely(!buf_len))
@@ -1410,6 +1458,7 @@ restart:
buf = skb_peek(&sk->sk_receive_queue);
msg = buf_msg(buf);
sz = msg_data_sz(msg);
+ hlen = msg_hdr_sz(msg);
err = msg_errcode(msg);
/* Discard an empty non-errored message & try again */
@@ -1434,8 +1483,7 @@ restart:
needed = (buf_len - sz_copied);
sz_to_copy = (sz <= needed) ? sz : needed;
- res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) + offset,
- m, sz_to_copy);
+ res = skb_copy_datagram_msg(buf, hlen + offset, m, sz_to_copy);
if (res)
goto exit;
@@ -1457,20 +1505,18 @@ restart:
res = -ECONNRESET;
}
- /* Consume received message (optional) */
- if (likely(!(flags & MSG_PEEK))) {
- if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
- tipc_sk_send_ack(tsk, tsk->rcv_unacked);
- tsk->rcv_unacked = 0;
- }
- tsk_advance_rx_queue(sk);
- }
+ if (unlikely(flags & MSG_PEEK))
+ goto exit;
+
+ tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
+ if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
+ tipc_sk_send_ack(tsk);
+ tsk_advance_rx_queue(sk);
/* Loop around if more data is required */
if ((sz_copied < buf_len) && /* didn't get all requested data */
(!skb_queue_empty(&sk->sk_receive_queue) ||
(sz_copied < target)) && /* and more is ready or required */
- (!(flags & MSG_PEEK)) && /* and aren't just peeking at data */
(!err)) /* and haven't reached a FIN */
goto restart;
@@ -1602,30 +1648,33 @@ static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
/**
* rcvbuf_limit - get proper overload limit of socket receive queue
* @sk: socket
- * @buf: message
+ * @skb: message
*
- * For all connection oriented messages, irrespective of importance,
- * the default overload value (i.e. 67MB) is set as limit.
+ * For connection oriented messages, irrespective of importance,
+ * default queue limit is 2 MB.
*
- * For all connectionless messages, by default new queue limits are
- * as belows:
+ * For connectionless messages, queue limits are based on message
+ * importance as follows:
*
- * TIPC_LOW_IMPORTANCE (4 MB)
- * TIPC_MEDIUM_IMPORTANCE (8 MB)
- * TIPC_HIGH_IMPORTANCE (16 MB)
- * TIPC_CRITICAL_IMPORTANCE (32 MB)
+ * TIPC_LOW_IMPORTANCE (2 MB)
+ * TIPC_MEDIUM_IMPORTANCE (4 MB)
+ * TIPC_HIGH_IMPORTANCE (8 MB)
+ * TIPC_CRITICAL_IMPORTANCE (16 MB)
*
* Returns overload limit according to corresponding message importance
*/
-static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
+static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
{
- struct tipc_msg *msg = buf_msg(buf);
+ struct tipc_sock *tsk = tipc_sk(sk);
+ struct tipc_msg *hdr = buf_msg(skb);
- if (msg_connected(msg))
- return sysctl_tipc_rmem[2];
+ if (unlikely(!msg_connected(hdr)))
+ return sk->sk_rcvbuf << msg_importance(hdr);
- return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
- msg_importance(msg);
+ if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
+ return sk->sk_rcvbuf;
+
+ return FLOWCTL_MSG_LIM;
}
/**
@@ -1640,7 +1689,8 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
*
* Returns true if message was added to socket receive queue, otherwise false
*/
-static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
+static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
+ struct sk_buff_head *xmitq)
{
struct socket *sock = sk->sk_socket;
struct tipc_sock *tsk = tipc_sk(sk);
@@ -1650,7 +1700,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
int usr = msg_user(hdr);
if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
- tipc_sk_proto_rcv(tsk, skb);
+ tipc_sk_proto_rcv(tsk, skb, xmitq);
return false;
}
@@ -1693,7 +1743,8 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
return true;
reject:
- tipc_sk_respond(sk, skb, err);
+ if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err))
+ __skb_queue_tail(xmitq, skb);
return false;
}
@@ -1709,9 +1760,24 @@ reject:
static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
{
unsigned int truesize = skb->truesize;
+ struct sk_buff_head xmitq;
+ u32 dnode, selector;
+
+ __skb_queue_head_init(&xmitq);
- if (likely(filter_rcv(sk, skb)))
+ if (likely(filter_rcv(sk, skb, &xmitq))) {
atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);
+ return 0;
+ }
+
+ if (skb_queue_empty(&xmitq))
+ return 0;
+
+ /* Send response/rejected message */
+ skb = __skb_dequeue(&xmitq);
+ dnode = msg_destnode(buf_msg(skb));
+ selector = msg_origport(buf_msg(skb));
+ tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
return 0;
}
@@ -1725,12 +1791,13 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
* Caller must hold socket lock
*/
static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
- u32 dport)
+ u32 dport, struct sk_buff_head *xmitq)
{
+ unsigned long time_limit = jiffies + 2;
+ struct sk_buff *skb;
unsigned int lim;
atomic_t *dcnt;
- struct sk_buff *skb;
- unsigned long time_limit = jiffies + 2;
+ u32 onode;
while (skb_queue_len(inputq)) {
if (unlikely(time_after_eq(jiffies, time_limit)))
@@ -1742,20 +1809,22 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
/* Add message directly to receive queue if possible */
if (!sock_owned_by_user(sk)) {
- filter_rcv(sk, skb);
+ filter_rcv(sk, skb, xmitq);
continue;
}
/* Try backlog, compensating for double-counted bytes */
dcnt = &tipc_sk(sk)->dupl_rcvcnt;
- if (sk->sk_backlog.len)
+ if (!sk->sk_backlog.len)
atomic_set(dcnt, 0);
lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
if (likely(!sk_add_backlog(sk, skb, lim)))
continue;
/* Overload => reject message back to sender */
- tipc_sk_respond(sk, skb, TIPC_ERR_OVERLOAD);
+ onode = tipc_own_addr(sock_net(sk));
+ if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD))
+ __skb_queue_tail(xmitq, skb);
break;
}
}
@@ -1768,12 +1837,14 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
*/
void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
{
+ struct sk_buff_head xmitq;
u32 dnode, dport = 0;
int err;
struct tipc_sock *tsk;
struct sock *sk;
struct sk_buff *skb;
+ __skb_queue_head_init(&xmitq);
while (skb_queue_len(inputq)) {
dport = tipc_skb_peek_port(inputq, dport);
tsk = tipc_sk_lookup(net, dport);
@@ -1781,9 +1852,14 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
if (likely(tsk)) {
sk = &tsk->sk;
if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
- tipc_sk_enqueue(inputq, sk, dport);
+ tipc_sk_enqueue(inputq, sk, dport, &xmitq);
spin_unlock_bh(&sk->sk_lock.slock);
}
+ /* Send pending response/rejected messages, if any */
+ while ((skb = __skb_dequeue(&xmitq))) {
+ dnode = msg_destnode(buf_msg(skb));
+ tipc_node_xmit_skb(net, skb, dnode, dport);
+ }
sock_put(sk);
continue;
}
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index 4241f2206..06fb5944c 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -1,6 +1,6 @@
/* net/tipc/socket.h: Include file for TIPC socket code
*
- * Copyright (c) 2014-2015, Ericsson AB
+ * Copyright (c) 2014-2016, Ericsson AB
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -38,10 +38,17 @@
#include <net/sock.h>
#include <net/genetlink.h>
-#define TIPC_CONNACK_INTV 256
-#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2)
-#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \
- SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
+/* Compatibility values for deprecated message based flow control */
+#define FLOWCTL_MSG_WIN 512
+#define FLOWCTL_MSG_LIM ((FLOWCTL_MSG_WIN * 2 + 1) * SKB_TRUESIZE(MAX_MSG_SIZE))
+
+#define FLOWCTL_BLK_SZ 1024
+
+/* Socket receive buffer sizes */
+#define RCVBUF_MIN (FLOWCTL_BLK_SZ * 512)
+#define RCVBUF_DEF (FLOWCTL_BLK_SZ * 1024 * 2)
+#define RCVBUF_MAX (FLOWCTL_BLK_SZ * 1024 * 16)
+
int tipc_socket_init(void);
void tipc_socket_stop(void);
void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index e6cb386fb..0dd02244e 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -302,7 +302,7 @@ static void tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s,
}
/* Handle one termination request for the subscriber */
-static void tipc_subscrb_shutdown_cb(int conid, void *usr_data)
+static void tipc_subscrb_release_cb(int conid, void *usr_data)
{
tipc_subscrb_delete((struct tipc_subscriber *)usr_data);
}
@@ -326,8 +326,7 @@ static void tipc_subscrb_rcv_cb(struct net *net, int conid,
return tipc_subscrp_cancel(s, subscriber);
}
- if (s)
- tipc_subscrp_subscribe(net, s, subscriber, swap);
+ tipc_subscrp_subscribe(net, s, subscriber, swap);
}
/* Handle one request to establish a new subscriber */
@@ -365,7 +364,7 @@ int tipc_topsrv_start(struct net *net)
topsrv->max_rcvbuf_size = sizeof(struct tipc_subscr);
topsrv->tipc_conn_recvmsg = tipc_subscrb_rcv_cb;
topsrv->tipc_conn_new = tipc_subscrb_connect_cb;
- topsrv->tipc_conn_shutdown = tipc_subscrb_shutdown_cb;
+ topsrv->tipc_conn_release = tipc_subscrb_release_cb;
strncpy(topsrv->name, name, strlen(name) + 1);
tn->topsrv = topsrv;