diff options
Diffstat (limited to 'net/rds')
-rw-r--r-- | net/rds/af_rds.c | 9 | ||||
-rw-r--r-- | net/rds/bind.c | 3 | ||||
-rw-r--r-- | net/rds/connection.c | 33 | ||||
-rw-r--r-- | net/rds/ib.c | 24 | ||||
-rw-r--r-- | net/rds/ib.h | 6 | ||||
-rw-r--r-- | net/rds/ib_cm.c | 26 | ||||
-rw-r--r-- | net/rds/ib_rdma.c | 57 | ||||
-rw-r--r-- | net/rds/ib_recv.c | 82 | ||||
-rw-r--r-- | net/rds/ib_send.c | 13 | ||||
-rw-r--r-- | net/rds/iw.c | 12 | ||||
-rw-r--r-- | net/rds/iw_cm.c | 5 | ||||
-rw-r--r-- | net/rds/iw_rdma.c | 5 | ||||
-rw-r--r-- | net/rds/iw_send.c | 5 | ||||
-rw-r--r-- | net/rds/rdma.c | 9 | ||||
-rw-r--r-- | net/rds/rdma_transport.c | 15 | ||||
-rw-r--r-- | net/rds/rds.h | 24 | ||||
-rw-r--r-- | net/rds/send.c | 57 | ||||
-rw-r--r-- | net/rds/tcp.c | 165 | ||||
-rw-r--r-- | net/rds/tcp.h | 7 | ||||
-rw-r--r-- | net/rds/tcp_connect.c | 9 | ||||
-rw-r--r-- | net/rds/tcp_listen.c | 40 | ||||
-rw-r--r-- | net/rds/tcp_recv.c | 11 | ||||
-rw-r--r-- | net/rds/transport.c | 4 |
23 files changed, 456 insertions, 165 deletions
diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c index 896834cd3..a2f28a6d4 100644 --- a/net/rds/af_rds.c +++ b/net/rds/af_rds.c @@ -438,6 +438,14 @@ static const struct proto_ops rds_proto_ops = { .sendpage = sock_no_sendpage, }; +static void rds_sock_destruct(struct sock *sk) +{ + struct rds_sock *rs = rds_sk_to_rs(sk); + + WARN_ON((&rs->rs_item != rs->rs_item.next || + &rs->rs_item != rs->rs_item.prev)); +} + static int __rds_create(struct socket *sock, struct sock *sk, int protocol) { struct rds_sock *rs; @@ -445,6 +453,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol) sock_init_data(sock, sk); sock->ops = &rds_proto_ops; sk->sk_protocol = protocol; + sk->sk_destruct = rds_sock_destruct; rs = rds_sk_to_rs(sk); spin_lock_init(&rs->rs_lock); diff --git a/net/rds/bind.c b/net/rds/bind.c index 4ebd29c12..dd666fb9b 100644 --- a/net/rds/bind.c +++ b/net/rds/bind.c @@ -185,7 +185,8 @@ int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len) ret = 0; goto out; } - trans = rds_trans_get_preferred(sin->sin_addr.s_addr); + trans = rds_trans_get_preferred(sock_net(sock->sk), + sin->sin_addr.s_addr); if (!trans) { ret = -EADDRNOTAVAIL; rds_remove_bound(rs); diff --git a/net/rds/connection.c b/net/rds/connection.c index da6da57e5..49adeef80 100644 --- a/net/rds/connection.c +++ b/net/rds/connection.c @@ -70,7 +70,8 @@ static struct hlist_head *rds_conn_bucket(__be32 laddr, __be32 faddr) } while (0) /* rcu read lock must be held or the connection spinlock */ -static struct rds_connection *rds_conn_lookup(struct hlist_head *head, +static struct rds_connection *rds_conn_lookup(struct net *net, + struct hlist_head *head, __be32 laddr, __be32 faddr, struct rds_transport *trans) { @@ -78,7 +79,7 @@ static struct rds_connection *rds_conn_lookup(struct hlist_head *head, hlist_for_each_entry_rcu(conn, head, c_hash_node) { if (conn->c_faddr == faddr && conn->c_laddr == laddr && - conn->c_trans == trans) { + conn->c_trans == trans && net == rds_conn_net(conn)) { ret = conn; break; } @@ -117,7 +118,8 @@ static void rds_conn_reset(struct rds_connection *conn) * For now they are not garbage collected once they're created. They * are torn down as the module is removed, if ever. */ -static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr, +static struct rds_connection *__rds_conn_create(struct net *net, + __be32 laddr, __be32 faddr, struct rds_transport *trans, gfp_t gfp, int is_outgoing) { @@ -131,7 +133,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr, if (!is_outgoing && otrans->t_type == RDS_TRANS_TCP) goto new_conn; rcu_read_lock(); - conn = rds_conn_lookup(head, laddr, faddr, trans); + conn = rds_conn_lookup(net, head, laddr, faddr, trans); if (conn && conn->c_loopback && conn->c_trans != &rds_loop_transport && laddr == faddr && !is_outgoing) { /* This is a looped back IB connection, and we're @@ -157,6 +159,7 @@ new_conn: conn->c_faddr = faddr; spin_lock_init(&conn->c_lock); conn->c_next_tx_seq = 1; + rds_conn_net_set(conn, net); init_waitqueue_head(&conn->c_waitq); INIT_LIST_HEAD(&conn->c_send_queue); @@ -174,7 +177,7 @@ new_conn: * can bind to the destination address then we'd rather the messages * flow through loopback rather than either transport. */ - loop_trans = rds_trans_get_preferred(faddr); + loop_trans = rds_trans_get_preferred(net, faddr); if (loop_trans) { rds_trans_put(loop_trans); conn->c_loopback = 1; @@ -187,6 +190,12 @@ new_conn: } } + if (trans == NULL) { + kmem_cache_free(rds_conn_slab, conn); + conn = ERR_PTR(-ENODEV); + goto out; + } + conn->c_trans = trans; ret = trans->conn_alloc(conn, gfp); @@ -237,7 +246,7 @@ new_conn: if (!is_outgoing && otrans->t_type == RDS_TRANS_TCP) found = NULL; else - found = rds_conn_lookup(head, laddr, faddr, trans); + found = rds_conn_lookup(net, head, laddr, faddr, trans); if (found) { trans->conn_free(conn->c_transport_data); kmem_cache_free(rds_conn_slab, conn); @@ -260,17 +269,19 @@ out: return conn; } -struct rds_connection *rds_conn_create(__be32 laddr, __be32 faddr, +struct rds_connection *rds_conn_create(struct net *net, + __be32 laddr, __be32 faddr, struct rds_transport *trans, gfp_t gfp) { - return __rds_conn_create(laddr, faddr, trans, gfp, 0); + return __rds_conn_create(net, laddr, faddr, trans, gfp, 0); } EXPORT_SYMBOL_GPL(rds_conn_create); -struct rds_connection *rds_conn_create_outgoing(__be32 laddr, __be32 faddr, +struct rds_connection *rds_conn_create_outgoing(struct net *net, + __be32 laddr, __be32 faddr, struct rds_transport *trans, gfp_t gfp) { - return __rds_conn_create(laddr, faddr, trans, gfp, 1); + return __rds_conn_create(net, laddr, faddr, trans, gfp, 1); } EXPORT_SYMBOL_GPL(rds_conn_create_outgoing); @@ -297,6 +308,8 @@ void rds_conn_shutdown(struct rds_connection *conn) wait_event(conn->c_waitq, !test_bit(RDS_IN_XMIT, &conn->c_flags)); + wait_event(conn->c_waitq, + !test_bit(RDS_RECV_REFILL, &conn->c_flags)); conn->c_trans->conn_shutdown(conn); rds_conn_reset(conn); diff --git a/net/rds/ib.c b/net/rds/ib.c index ba2dffeff..2d3f2ab47 100644 --- a/net/rds/ib.c +++ b/net/rds/ib.c @@ -99,8 +99,6 @@ static void rds_ib_dev_free(struct work_struct *work) if (rds_ibdev->mr_pool) rds_ib_destroy_mr_pool(rds_ibdev->mr_pool); - if (rds_ibdev->mr) - ib_dereg_mr(rds_ibdev->mr); if (rds_ibdev->pd) ib_dealloc_pd(rds_ibdev->pd); @@ -164,12 +162,6 @@ static void rds_ib_add_one(struct ib_device *device) goto put_dev; } - rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd, IB_ACCESS_LOCAL_WRITE); - if (IS_ERR(rds_ibdev->mr)) { - rds_ibdev->mr = NULL; - goto put_dev; - } - rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev); if (IS_ERR(rds_ibdev->mr_pool)) { rds_ibdev->mr_pool = NULL; @@ -230,11 +222,10 @@ struct rds_ib_device *rds_ib_get_client_data(struct ib_device *device) * * This can be called at any time and can be racing with any other RDS path. */ -static void rds_ib_remove_one(struct ib_device *device) +static void rds_ib_remove_one(struct ib_device *device, void *client_data) { - struct rds_ib_device *rds_ibdev; + struct rds_ib_device *rds_ibdev = client_data; - rds_ibdev = ib_get_client_data(device, &rds_ib_client); if (!rds_ibdev) return; @@ -317,7 +308,7 @@ static void rds_ib_ic_info(struct socket *sock, unsigned int len, * allowed to influence which paths have priority. We could call userspace * asserting this policy "routing". */ -static int rds_ib_laddr_check(__be32 addr) +static int rds_ib_laddr_check(struct net *net, __be32 addr) { int ret; struct rdma_cm_id *cm_id; @@ -366,6 +357,7 @@ void rds_ib_exit(void) rds_ib_sysctl_exit(); rds_ib_recv_exit(); rds_trans_unregister(&rds_ib_transport); + rds_ib_fmr_exit(); } struct rds_transport rds_ib_transport = { @@ -401,10 +393,14 @@ int rds_ib_init(void) INIT_LIST_HEAD(&rds_ib_devices); - ret = ib_register_client(&rds_ib_client); + ret = rds_ib_fmr_init(); if (ret) goto out; + ret = ib_register_client(&rds_ib_client); + if (ret) + goto out_fmr_exit; + ret = rds_ib_sysctl_init(); if (ret) goto out_ibreg; @@ -427,6 +423,8 @@ out_sysctl: rds_ib_sysctl_exit(); out_ibreg: rds_ib_unregister_client(); +out_fmr_exit: + rds_ib_fmr_exit(); out: return ret; } diff --git a/net/rds/ib.h b/net/rds/ib.h index 86d88ec5d..aae60fda7 100644 --- a/net/rds/ib.h +++ b/net/rds/ib.h @@ -100,7 +100,6 @@ struct rds_ib_connection { /* alphabet soup, IBTA style */ struct rdma_cm_id *i_cm_id; struct ib_pd *i_pd; - struct ib_mr *i_mr; struct ib_cq *i_send_cq; struct ib_cq *i_recv_cq; @@ -173,7 +172,6 @@ struct rds_ib_device { struct list_head conn_list; struct ib_device *dev; struct ib_pd *pd; - struct ib_mr *mr; struct rds_ib_mr_pool *mr_pool; unsigned int fmr_max_remaps; unsigned int max_fmrs; @@ -313,6 +311,8 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents, void rds_ib_sync_mr(void *trans_private, int dir); void rds_ib_free_mr(void *trans_private, int invalidate); void rds_ib_flush_mrs(void); +int rds_ib_fmr_init(void); +void rds_ib_fmr_exit(void); /* ib_recv.c */ int rds_ib_recv_init(void); @@ -320,7 +320,7 @@ void rds_ib_recv_exit(void); int rds_ib_recv(struct rds_connection *conn); int rds_ib_recv_alloc_caches(struct rds_ib_connection *ic); void rds_ib_recv_free_caches(struct rds_ib_connection *ic); -void rds_ib_recv_refill(struct rds_connection *conn, int prefill); +void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp); void rds_ib_inc_free(struct rds_incoming *inc); int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iov_iter *to); void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context); diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c index 0da2a45b3..9043f5c04 100644 --- a/net/rds/ib_cm.c +++ b/net/rds/ib_cm.c @@ -135,7 +135,7 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even rds_ib_recv_init_ring(ic); /* Post receive buffers - as a side effect, this will update * the posted credit count. */ - rds_ib_recv_refill(conn, 1); + rds_ib_recv_refill(conn, 1, GFP_KERNEL); /* Tune RNR behavior */ rds_ib_tune_rnr(ic, &qp_attr); @@ -269,7 +269,6 @@ static int rds_ib_setup_qp(struct rds_connection *conn) /* Protection domain and memory range */ ic->i_pd = rds_ibdev->pd; - ic->i_mr = rds_ibdev->mr; cq_attr.cqe = ic->i_send_ring.w_nr + 1; ic->i_send_cq = ib_create_cq(dev, rds_ib_send_cq_comp_handler, @@ -375,7 +374,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn) rds_ib_recv_init_ack(ic); - rdsdebug("conn %p pd %p mr %p cq %p %p\n", conn, ic->i_pd, ic->i_mr, + rdsdebug("conn %p pd %p cq %p %p\n", conn, ic->i_pd, ic->i_send_cq, ic->i_recv_cq); out: @@ -448,8 +447,9 @@ int rds_ib_cm_handle_connect(struct rdma_cm_id *cm_id, (unsigned long long)be64_to_cpu(lguid), (unsigned long long)be64_to_cpu(fguid)); - conn = rds_conn_create(dp->dp_daddr, dp->dp_saddr, &rds_ib_transport, - GFP_KERNEL); + /* RDS/IB is not currently netns aware, thus init_net */ + conn = rds_conn_create(&init_net, dp->dp_daddr, dp->dp_saddr, + &rds_ib_transport, GFP_KERNEL); if (IS_ERR(conn)) { rdsdebug("rds_conn_create failed (%ld)\n", PTR_ERR(conn)); conn = NULL; @@ -639,6 +639,15 @@ void rds_ib_conn_shutdown(struct rds_connection *conn) (atomic_read(&ic->i_signaled_sends) == 0)); tasklet_kill(&ic->i_recv_tasklet); + /* first destroy the ib state that generates callbacks */ + if (ic->i_cm_id->qp) + rdma_destroy_qp(ic->i_cm_id); + if (ic->i_send_cq) + ib_destroy_cq(ic->i_send_cq); + if (ic->i_recv_cq) + ib_destroy_cq(ic->i_recv_cq); + + /* then free the resources that ib callbacks use */ if (ic->i_send_hdrs) ib_dma_free_coherent(dev, ic->i_send_ring.w_nr * @@ -662,12 +671,6 @@ void rds_ib_conn_shutdown(struct rds_connection *conn) if (ic->i_recvs) rds_ib_recv_clear_ring(ic); - if (ic->i_cm_id->qp) - rdma_destroy_qp(ic->i_cm_id); - if (ic->i_send_cq) - ib_destroy_cq(ic->i_send_cq); - if (ic->i_recv_cq) - ib_destroy_cq(ic->i_recv_cq); rdma_destroy_id(ic->i_cm_id); /* @@ -678,7 +681,6 @@ void rds_ib_conn_shutdown(struct rds_connection *conn) ic->i_cm_id = NULL; ic->i_pd = NULL; - ic->i_mr = NULL; ic->i_send_cq = NULL; ic->i_recv_cq = NULL; ic->i_send_hdrs = NULL; diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c index 657ba9f5d..251d1ce0b 100644 --- a/net/rds/ib_rdma.c +++ b/net/rds/ib_rdma.c @@ -83,6 +83,25 @@ struct rds_ib_mr_pool { struct ib_fmr_attr fmr_attr; }; +struct workqueue_struct *rds_ib_fmr_wq; + +int rds_ib_fmr_init(void) +{ + rds_ib_fmr_wq = create_workqueue("rds_fmr_flushd"); + if (!rds_ib_fmr_wq) + return -ENOMEM; + return 0; +} + +/* By the time this is called all the IB devices should have been torn down and + * had their pools freed. As each pool is freed its work struct is waited on, + * so the pool flushing work queue should be idle by the time we get here. + */ +void rds_ib_fmr_exit(void) +{ + destroy_workqueue(rds_ib_fmr_wq); +} + static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all, struct rds_ib_mr **); static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr); static void rds_ib_mr_pool_flush_worker(struct work_struct *work); @@ -151,12 +170,17 @@ int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr) struct rds_ib_device *rds_ibdev_old; rds_ibdev_old = rds_ib_get_device(ipaddr); - if (rds_ibdev_old) { + if (!rds_ibdev_old) + return rds_ib_add_ipaddr(rds_ibdev, ipaddr); + + if (rds_ibdev_old != rds_ibdev) { rds_ib_remove_ipaddr(rds_ibdev_old, ipaddr); rds_ib_dev_put(rds_ibdev_old); + return rds_ib_add_ipaddr(rds_ibdev, ipaddr); } + rds_ib_dev_put(rds_ibdev_old); - return rds_ib_add_ipaddr(rds_ibdev, ipaddr); + return 0; } void rds_ib_add_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn) @@ -336,8 +360,6 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev) goto out_no_cigar; } - memset(ibmr, 0, sizeof(*ibmr)); - ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd, (IB_ACCESS_LOCAL_WRITE | IB_ACCESS_REMOTE_READ | @@ -485,7 +507,7 @@ static void __rds_ib_teardown_mr(struct rds_ib_mr *ibmr) /* FIXME we need a way to tell a r/w MR * from a r/o MR */ - BUG_ON(irqs_disabled()); + WARN_ON(!page->mapping && irqs_disabled()); set_page_dirty(page); put_page(page); } @@ -523,11 +545,13 @@ static inline unsigned int rds_ib_flush_goal(struct rds_ib_mr_pool *pool, int fr /* * given an llist of mrs, put them all into the list_head for more processing */ -static void llist_append_to_list(struct llist_head *llist, struct list_head *list) +static unsigned int llist_append_to_list(struct llist_head *llist, + struct list_head *list) { struct rds_ib_mr *ibmr; struct llist_node *node; struct llist_node *next; + unsigned int count = 0; node = llist_del_all(llist); while (node) { @@ -535,7 +559,9 @@ static void llist_append_to_list(struct llist_head *llist, struct list_head *lis ibmr = llist_entry(node, struct rds_ib_mr, llnode); list_add_tail(&ibmr->unmap_list, list); node = next; + count++; } + return count; } /* @@ -576,7 +602,7 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, LIST_HEAD(unmap_list); LIST_HEAD(fmr_list); unsigned long unpinned = 0; - unsigned int nfreed = 0, ncleaned = 0, free_goal; + unsigned int nfreed = 0, dirty_to_clean = 0, free_goal; int ret = 0; rds_ib_stats_inc(s_ib_rdma_mr_pool_flush); @@ -618,8 +644,8 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, /* Get the list of all MRs to be dropped. Ordering matters - * we want to put drop_list ahead of free_list. */ - llist_append_to_list(&pool->drop_list, &unmap_list); - llist_append_to_list(&pool->free_list, &unmap_list); + dirty_to_clean = llist_append_to_list(&pool->drop_list, &unmap_list); + dirty_to_clean += llist_append_to_list(&pool->free_list, &unmap_list); if (free_all) llist_append_to_list(&pool->clean_list, &unmap_list); @@ -647,7 +673,6 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, kfree(ibmr); nfreed++; } - ncleaned++; } if (!list_empty(&unmap_list)) { @@ -673,7 +698,7 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, } atomic_sub(unpinned, &pool->free_pinned); - atomic_sub(ncleaned, &pool->dirty_count); + atomic_sub(dirty_to_clean, &pool->dirty_count); atomic_sub(nfreed, &pool->item_count); out: @@ -710,16 +735,18 @@ void rds_ib_free_mr(void *trans_private, int invalidate) /* If we've pinned too many pages, request a flush */ if (atomic_read(&pool->free_pinned) >= pool->max_free_pinned || - atomic_read(&pool->dirty_count) >= pool->max_items / 10) - schedule_delayed_work(&pool->flush_worker, 10); + atomic_read(&pool->dirty_count) >= pool->max_items / 5) + queue_delayed_work(rds_ib_fmr_wq, &pool->flush_worker, 10); if (invalidate) { if (likely(!in_interrupt())) { rds_ib_flush_mr_pool(pool, 0, NULL); } else { /* We get here if the user created a MR marked - * as use_once and invalidate at the same time. */ - schedule_delayed_work(&pool->flush_worker, 10); + * as use_once and invalidate at the same time. + */ + queue_delayed_work(rds_ib_fmr_wq, + &pool->flush_worker, 10); } } diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c index cac5b4506..f43831e41 100644 --- a/net/rds/ib_recv.c +++ b/net/rds/ib_recv.c @@ -62,12 +62,12 @@ void rds_ib_recv_init_ring(struct rds_ib_connection *ic) sge = &recv->r_sge[0]; sge->addr = ic->i_recv_hdrs_dma + (i * sizeof(struct rds_header)); sge->length = sizeof(struct rds_header); - sge->lkey = ic->i_mr->lkey; + sge->lkey = ic->i_pd->local_dma_lkey; sge = &recv->r_sge[1]; sge->addr = 0; sge->length = RDS_FRAG_SIZE; - sge->lkey = ic->i_mr->lkey; + sge->lkey = ic->i_pd->local_dma_lkey; } } @@ -297,7 +297,7 @@ static struct rds_page_frag *rds_ib_refill_one_frag(struct rds_ib_connection *ic } static int rds_ib_recv_refill_one(struct rds_connection *conn, - struct rds_ib_recv_work *recv, int prefill) + struct rds_ib_recv_work *recv, gfp_t gfp) { struct rds_ib_connection *ic = conn->c_transport_data; struct ib_sge *sge; @@ -305,7 +305,7 @@ static int rds_ib_recv_refill_one(struct rds_connection *conn, gfp_t slab_mask = GFP_NOWAIT; gfp_t page_mask = GFP_NOWAIT; - if (prefill) { + if (gfp & __GFP_WAIT) { slab_mask = GFP_KERNEL; page_mask = GFP_HIGHUSER; } @@ -347,6 +347,24 @@ out: return ret; } +static int acquire_refill(struct rds_connection *conn) +{ + return test_and_set_bit(RDS_RECV_REFILL, &conn->c_flags) == 0; +} + +static void release_refill(struct rds_connection *conn) +{ + clear_bit(RDS_RECV_REFILL, &conn->c_flags); + + /* We don't use wait_on_bit()/wake_up_bit() because our waking is in a + * hot path and finding waiters is very rare. We don't want to walk + * the system-wide hashed waitqueue buckets in the fast path only to + * almost never find waiters. + */ + if (waitqueue_active(&conn->c_waitq)) + wake_up_all(&conn->c_waitq); +} + /* * This tries to allocate and post unused work requests after making sure that * they have all the allocations they need to queue received fragments into @@ -354,15 +372,23 @@ out: * * -1 is returned if posting fails due to temporary resource exhaustion. */ -void rds_ib_recv_refill(struct rds_connection *conn, int prefill) +void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp) { struct rds_ib_connection *ic = conn->c_transport_data; struct rds_ib_recv_work *recv; struct ib_recv_wr *failed_wr; unsigned int posted = 0; int ret = 0; + bool can_wait = !!(gfp & __GFP_WAIT); u32 pos; + /* the goal here is to just make sure that someone, somewhere + * is posting buffers. If we can't get the refill lock, + * let them do their thing + */ + if (!acquire_refill(conn)) + return; + while ((prefill || rds_conn_up(conn)) && rds_ib_ring_alloc(&ic->i_recv_ring, 1, &pos)) { if (pos >= ic->i_recv_ring.w_nr) { @@ -372,7 +398,7 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill) } recv = &ic->i_recvs[pos]; - ret = rds_ib_recv_refill_one(conn, recv, prefill); + ret = rds_ib_recv_refill_one(conn, recv, gfp); if (ret) { break; } @@ -402,6 +428,24 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill) if (ret) rds_ib_ring_unalloc(&ic->i_recv_ring, 1); + + release_refill(conn); + + /* if we're called from the softirq handler, we'll be GFP_NOWAIT. + * in this case the ring being low is going to lead to more interrupts + * and we can safely let the softirq code take care of it unless the + * ring is completely empty. + * + * if we're called from krdsd, we'll be GFP_KERNEL. In this case + * we might have raced with the softirq code while we had the refill + * lock held. Use rds_ib_ring_low() instead of ring_empty to decide + * if we should requeue. + */ + if (rds_conn_up(conn) && + ((can_wait && rds_ib_ring_low(&ic->i_recv_ring)) || + rds_ib_ring_empty(&ic->i_recv_ring))) { + queue_delayed_work(rds_wq, &conn->c_recv_w, 1); + } } /* @@ -520,7 +564,7 @@ void rds_ib_recv_init_ack(struct rds_ib_connection *ic) sge->addr = ic->i_ack_dma; sge->length = sizeof(struct rds_header); - sge->lkey = ic->i_mr->lkey; + sge->lkey = ic->i_pd->local_dma_lkey; wr->sg_list = sge; wr->num_sge = 1; @@ -982,10 +1026,17 @@ static inline void rds_poll_cq(struct rds_ib_connection *ic, } /* - * It's very important that we only free this ring entry if we've truly - * freed the resources allocated to the entry. The refilling path can - * leak if we don't. + * rds_ib_process_recv() doesn't always consume the frag, and + * we might not have called it at all if the wc didn't indicate + * success. We already unmapped the frag's pages, though, and + * the following rds_ib_ring_free() call tells the refill path + * that it will not find an allocated frag here. Make sure we + * keep that promise by freeing a frag that's still on the ring. */ + if (recv->r_frag) { + rds_ib_frag_free(ic, recv->r_frag); + recv->r_frag = NULL; + } rds_ib_ring_free(&ic->i_recv_ring, 1); } } @@ -1016,7 +1067,7 @@ void rds_ib_recv_tasklet_fn(unsigned long data) rds_ib_stats_inc(s_ib_rx_ring_empty); if (rds_ib_ring_low(&ic->i_recv_ring)) - rds_ib_recv_refill(conn, 0); + rds_ib_recv_refill(conn, 0, GFP_NOWAIT); } int rds_ib_recv(struct rds_connection *conn) @@ -1025,8 +1076,10 @@ int rds_ib_recv(struct rds_connection *conn) int ret = 0; rdsdebug("conn %p\n", conn); - if (rds_conn_up(conn)) + if (rds_conn_up(conn)) { rds_ib_attempt_ack(ic); + rds_ib_recv_refill(conn, 0, GFP_KERNEL); + } return ret; } @@ -1049,9 +1102,10 @@ int rds_ib_recv_init(void) rds_ib_frag_slab = kmem_cache_create("rds_ib_frag", sizeof(struct rds_page_frag), 0, SLAB_HWCACHE_ALIGN, NULL); - if (!rds_ib_frag_slab) + if (!rds_ib_frag_slab) { kmem_cache_destroy(rds_ib_incoming_slab); - else + rds_ib_incoming_slab = NULL; + } else ret = 0; out: return ret; diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c index 5d0a704fa..4e8804708 100644 --- a/net/rds/ib_send.c +++ b/net/rds/ib_send.c @@ -202,9 +202,9 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic) sge = &send->s_sge[0]; sge->addr = ic->i_send_hdrs_dma + (i * sizeof(struct rds_header)); sge->length = sizeof(struct rds_header); - sge->lkey = ic->i_mr->lkey; + sge->lkey = ic->i_pd->local_dma_lkey; - send->s_sge[1].lkey = ic->i_mr->lkey; + send->s_sge[1].lkey = ic->i_pd->local_dma_lkey; } } @@ -709,6 +709,11 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, if (scat == &rm->data.op_sg[rm->data.op_count]) { prev->s_op = ic->i_data_op; prev->s_wr.send_flags |= IB_SEND_SOLICITED; + if (!(prev->s_wr.send_flags & IB_SEND_SIGNALED)) { + ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs; + prev->s_wr.send_flags |= IB_SEND_SIGNALED; + nr_sig++; + } ic->i_data_op = NULL; } @@ -813,7 +818,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op) /* Convert our struct scatterlist to struct ib_sge */ send->s_sge[0].addr = ib_sg_dma_address(ic->i_cm_id->device, op->op_sg); send->s_sge[0].length = ib_sg_dma_len(ic->i_cm_id->device, op->op_sg); - send->s_sge[0].lkey = ic->i_mr->lkey; + send->s_sge[0].lkey = ic->i_pd->local_dma_lkey; rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr, send->s_sge[0].addr, send->s_sge[0].length); @@ -927,7 +932,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op) send->s_sge[j].addr = ib_sg_dma_address(ic->i_cm_id->device, scat); send->s_sge[j].length = len; - send->s_sge[j].lkey = ic->i_mr->lkey; + send->s_sge[j].lkey = ic->i_pd->local_dma_lkey; sent += len; rdsdebug("ic %p sent %d remote_addr %llu\n", ic, sent, remote_addr); diff --git a/net/rds/iw.c b/net/rds/iw.c index 589935661..3df0295c6 100644 --- a/net/rds/iw.c +++ b/net/rds/iw.c @@ -125,12 +125,11 @@ free_attr: kfree(dev_attr); } -static void rds_iw_remove_one(struct ib_device *device) +static void rds_iw_remove_one(struct ib_device *device, void *client_data) { - struct rds_iw_device *rds_iwdev; + struct rds_iw_device *rds_iwdev = client_data; struct rds_iw_cm_id *i_cm_id, *next; - rds_iwdev = ib_get_client_data(device, &rds_iw_client); if (!rds_iwdev) return; @@ -149,10 +148,7 @@ static void rds_iw_remove_one(struct ib_device *device) if (rds_iwdev->mr) ib_dereg_mr(rds_iwdev->mr); - while (ib_dealloc_pd(rds_iwdev->pd)) { - rdsdebug("Failed to dealloc pd %p\n", rds_iwdev->pd); - msleep(1); - } + ib_dealloc_pd(rds_iwdev->pd); list_del(&rds_iwdev->list); kfree(rds_iwdev); @@ -218,7 +214,7 @@ static void rds_iw_ic_info(struct socket *sock, unsigned int len, * allowed to influence which paths have priority. We could call userspace * asserting this policy "routing". */ -static int rds_iw_laddr_check(__be32 addr) +static int rds_iw_laddr_check(struct net *net, __be32 addr) { int ret; struct rdma_cm_id *cm_id; diff --git a/net/rds/iw_cm.c b/net/rds/iw_cm.c index 8f486fa32..a6553a6fb 100644 --- a/net/rds/iw_cm.c +++ b/net/rds/iw_cm.c @@ -398,8 +398,9 @@ int rds_iw_cm_handle_connect(struct rdma_cm_id *cm_id, &dp->dp_saddr, &dp->dp_daddr, RDS_PROTOCOL_MAJOR(version), RDS_PROTOCOL_MINOR(version)); - conn = rds_conn_create(dp->dp_daddr, dp->dp_saddr, &rds_iw_transport, - GFP_KERNEL); + /* RDS/IW is not currently netns aware, thus init_net */ + conn = rds_conn_create(&init_net, dp->dp_daddr, dp->dp_saddr, + &rds_iw_transport, GFP_KERNEL); if (IS_ERR(conn)) { rdsdebug("rds_conn_create failed (%ld)\n", PTR_ERR(conn)); conn = NULL; diff --git a/net/rds/iw_rdma.c b/net/rds/iw_rdma.c index dba8d0864..6a8fbd6e6 100644 --- a/net/rds/iw_rdma.c +++ b/net/rds/iw_rdma.c @@ -667,11 +667,12 @@ static int rds_iw_init_fastreg(struct rds_iw_mr_pool *pool, struct ib_mr *mr; int err; - mr = ib_alloc_fast_reg_mr(rds_iwdev->pd, pool->max_message_size); + mr = ib_alloc_mr(rds_iwdev->pd, IB_MR_TYPE_MEM_REG, + pool->max_message_size); if (IS_ERR(mr)) { err = PTR_ERR(mr); - printk(KERN_WARNING "RDS/IW: ib_alloc_fast_reg_mr failed (err=%d)\n", err); + printk(KERN_WARNING "RDS/IW: ib_alloc_mr failed (err=%d)\n", err); return err; } diff --git a/net/rds/iw_send.c b/net/rds/iw_send.c index 334fe98c5..86152ec3b 100644 --- a/net/rds/iw_send.c +++ b/net/rds/iw_send.c @@ -153,9 +153,10 @@ void rds_iw_send_init_ring(struct rds_iw_connection *ic) sge->length = sizeof(struct rds_header); sge->lkey = 0; - send->s_mr = ib_alloc_fast_reg_mr(ic->i_pd, fastreg_message_size); + send->s_mr = ib_alloc_mr(ic->i_pd, IB_MR_TYPE_MEM_REG, + fastreg_message_size); if (IS_ERR(send->s_mr)) { - printk(KERN_WARNING "RDS/IW: ib_alloc_fast_reg_mr failed\n"); + printk(KERN_WARNING "RDS/IW: ib_alloc_mr failed\n"); break; } diff --git a/net/rds/rdma.c b/net/rds/rdma.c index 40084d843..4c93badea 100644 --- a/net/rds/rdma.c +++ b/net/rds/rdma.c @@ -435,9 +435,10 @@ void rds_rdma_unuse(struct rds_sock *rs, u32 r_key, int force) /* If the MR was marked as invalidate, this will * trigger an async flush. */ - if (zot_me) + if (zot_me) { rds_destroy_mr(mr); - rds_mr_put(mr); + rds_mr_put(mr); + } } void rds_rdma_free_op(struct rm_rdma_op *ro) @@ -451,7 +452,7 @@ void rds_rdma_free_op(struct rm_rdma_op *ro) * is the case for a RDMA_READ which copies from remote * to local memory */ if (!ro->op_write) { - BUG_ON(irqs_disabled()); + WARN_ON(!page->mapping && irqs_disabled()); set_page_dirty(page); } put_page(page); @@ -658,6 +659,8 @@ int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm, ret = rds_pin_pages(iov->addr, nr, pages, !op->op_write); if (ret < 0) goto out; + else + ret = 0; rdsdebug("RDS: nr_bytes %u nr %u iov->bytes %llu iov->addr %llx\n", nr_bytes, nr, iov->bytes, iov->addr); diff --git a/net/rds/rdma_transport.c b/net/rds/rdma_transport.c index 208240836..b9b40af53 100644 --- a/net/rds/rdma_transport.c +++ b/net/rds/rdma_transport.c @@ -34,6 +34,7 @@ #include <rdma/rdma_cm.h> #include "rdma_transport.h" +#include "ib.h" static struct rdma_cm_id *rds_rdma_listen_id; @@ -82,8 +83,18 @@ int rds_rdma_cm_event_handler(struct rdma_cm_id *cm_id, break; case RDMA_CM_EVENT_ROUTE_RESOLVED: - /* XXX worry about racing with listen acceptance */ - ret = trans->cm_initiate_connect(cm_id); + /* Connection could have been dropped so make sure the + * cm_id is valid before proceeding + */ + if (conn) { + struct rds_ib_connection *ibic; + + ibic = conn->c_transport_data; + if (ibic && ibic->i_cm_id == cm_id) + ret = trans->cm_initiate_connect(cm_id); + else + rds_conn_drop(conn); + } break; case RDMA_CM_EVENT_ESTABLISHED: diff --git a/net/rds/rds.h b/net/rds/rds.h index 2260c1e43..afb4048d0 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -80,6 +80,7 @@ enum { #define RDS_LL_SEND_FULL 0 #define RDS_RECONNECT_PENDING 1 #define RDS_IN_XMIT 2 +#define RDS_RECV_REFILL 3 struct rds_connection { struct hlist_node c_hash_node; @@ -128,8 +129,21 @@ struct rds_connection { /* Protocol version */ unsigned int c_version; + possible_net_t c_net; }; +static inline +struct net *rds_conn_net(struct rds_connection *conn) +{ + return read_pnet(&conn->c_net); +} + +static inline +void rds_conn_net_set(struct rds_connection *conn, struct net *net) +{ + write_pnet(&conn->c_net, net); +} + #define RDS_FLAG_CONG_BITMAP 0x01 #define RDS_FLAG_ACK_REQUIRED 0x02 #define RDS_FLAG_RETRANSMITTED 0x04 @@ -417,7 +431,7 @@ struct rds_transport { unsigned int t_prefer_loopback:1; unsigned int t_type; - int (*laddr_check)(__be32 addr); + int (*laddr_check)(struct net *net, __be32 addr); int (*conn_alloc)(struct rds_connection *conn, gfp_t gfp); void (*conn_free)(void *data); int (*conn_connect)(struct rds_connection *conn); @@ -608,9 +622,11 @@ struct rds_message *rds_cong_update_alloc(struct rds_connection *conn); /* conn.c */ int rds_conn_init(void); void rds_conn_exit(void); -struct rds_connection *rds_conn_create(__be32 laddr, __be32 faddr, +struct rds_connection *rds_conn_create(struct net *net, + __be32 laddr, __be32 faddr, struct rds_transport *trans, gfp_t gfp); -struct rds_connection *rds_conn_create_outgoing(__be32 laddr, __be32 faddr, +struct rds_connection *rds_conn_create_outgoing(struct net *net, + __be32 laddr, __be32 faddr, struct rds_transport *trans, gfp_t gfp); void rds_conn_shutdown(struct rds_connection *conn); void rds_conn_destroy(struct rds_connection *conn); @@ -795,7 +811,7 @@ void rds_connect_complete(struct rds_connection *conn); /* transport.c */ int rds_trans_register(struct rds_transport *trans); void rds_trans_unregister(struct rds_transport *trans); -struct rds_transport *rds_trans_get_preferred(__be32 addr); +struct rds_transport *rds_trans_get_preferred(struct net *net, __be32 addr); void rds_trans_put(struct rds_transport *trans); unsigned int rds_trans_stats_info_copy(struct rds_info_iterator *iter, unsigned int avail); diff --git a/net/rds/send.c b/net/rds/send.c index e9430f537..4df61a515 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -282,26 +282,34 @@ restart: /* The transport either sends the whole rdma or none of it */ if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) { rm->m_final_op = &rm->rdma; + /* The transport owns the mapped memory for now. + * You can't unmap it while it's on the send queue + */ + set_bit(RDS_MSG_MAPPED, &rm->m_flags); ret = conn->c_trans->xmit_rdma(conn, &rm->rdma); - if (ret) + if (ret) { + clear_bit(RDS_MSG_MAPPED, &rm->m_flags); + wake_up_interruptible(&rm->m_flush_wait); break; + } conn->c_xmit_rdma_sent = 1; - /* The transport owns the mapped memory for now. - * You can't unmap it while it's on the send queue */ - set_bit(RDS_MSG_MAPPED, &rm->m_flags); } if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { rm->m_final_op = &rm->atomic; + /* The transport owns the mapped memory for now. + * You can't unmap it while it's on the send queue + */ + set_bit(RDS_MSG_MAPPED, &rm->m_flags); ret = conn->c_trans->xmit_atomic(conn, &rm->atomic); - if (ret) + if (ret) { + clear_bit(RDS_MSG_MAPPED, &rm->m_flags); + wake_up_interruptible(&rm->m_flush_wait); break; + } conn->c_xmit_atomic_sent = 1; - /* The transport owns the mapped memory for now. - * You can't unmap it while it's on the send queue */ - set_bit(RDS_MSG_MAPPED, &rm->m_flags); } /* @@ -411,7 +419,8 @@ over_batch: */ if (ret == 0) { smp_mb(); - if (!list_empty(&conn->c_send_queue) && + if ((test_bit(0, &conn->c_map_queued) || + !list_empty(&conn->c_send_queue)) && send_gen == conn->c_send_gen) { rds_stats_inc(s_send_lock_queue_raced); goto restart; @@ -769,8 +778,22 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) while (!list_empty(&list)) { rm = list_entry(list.next, struct rds_message, m_sock_item); list_del_init(&rm->m_sock_item); - rds_message_wait(rm); + + /* just in case the code above skipped this message + * because RDS_MSG_ON_CONN wasn't set, run it again here + * taking m_rs_lock is the only thing that keeps us + * from racing with ack processing. + */ + spin_lock_irqsave(&rm->m_rs_lock, flags); + + spin_lock(&rs->rs_lock); + __rds_send_complete(rs, rm, RDS_RDMA_CANCELED); + spin_unlock(&rs->rs_lock); + + rm->m_rs = NULL; + spin_unlock_irqrestore(&rm->m_rs_lock, flags); + rds_message_put(rm); } } @@ -992,6 +1015,11 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) goto out; } + if (payload_len > rds_sk_sndbuf(rs)) { + ret = -EMSGSIZE; + goto out; + } + /* size of rm including all sgs */ ret = rds_rm_size(msg, payload_len); if (ret < 0) @@ -1023,7 +1051,8 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) if (rs->rs_conn && rs->rs_conn->c_faddr == daddr) conn = rs->rs_conn; else { - conn = rds_conn_create_outgoing(rs->rs_bound_addr, daddr, + conn = rds_conn_create_outgoing(sock_net(sock->sk), + rs->rs_bound_addr, daddr, rs->rs_transport, sock->sk->sk_allocation); if (IS_ERR(conn)) { @@ -1063,11 +1092,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) while (!rds_send_queue_rm(rs, conn, rm, rs->rs_bound_port, dport, &queued)) { rds_stats_inc(s_send_queue_full); - /* XXX make sure this is reasonable */ - if (payload_len > rds_sk_sndbuf(rs)) { - ret = -EMSGSIZE; - goto out; - } + if (nonblock) { ret = -EAGAIN; goto out; diff --git a/net/rds/tcp.c b/net/rds/tcp.c index edac9ef2b..c42b60bf4 100644 --- a/net/rds/tcp.c +++ b/net/rds/tcp.c @@ -35,6 +35,9 @@ #include <linux/in.h> #include <linux/module.h> #include <net/tcp.h> +#include <net/net_namespace.h> +#include <net/netns/generic.h> +#include <net/tcp.h> #include "rds.h" #include "tcp.h" @@ -189,9 +192,9 @@ out: spin_unlock_irqrestore(&rds_tcp_tc_list_lock, flags); } -static int rds_tcp_laddr_check(__be32 addr) +static int rds_tcp_laddr_check(struct net *net, __be32 addr) { - if (inet_addr_type(&init_net, addr) == RTN_LOCAL) + if (inet_addr_type(net, addr) == RTN_LOCAL) return 0; return -EADDRNOTAVAIL; } @@ -250,16 +253,7 @@ static void rds_tcp_destroy_conns(void) } } -static void rds_tcp_exit(void) -{ - rds_info_deregister_func(RDS_INFO_TCP_SOCKETS, rds_tcp_tc_info); - rds_tcp_listen_stop(); - rds_tcp_destroy_conns(); - rds_trans_unregister(&rds_tcp_transport); - rds_tcp_recv_exit(); - kmem_cache_destroy(rds_tcp_conn_slab); -} -module_exit(rds_tcp_exit); +static void rds_tcp_exit(void); struct rds_transport rds_tcp_transport = { .laddr_check = rds_tcp_laddr_check, @@ -281,6 +275,136 @@ struct rds_transport rds_tcp_transport = { .t_prefer_loopback = 1, }; +static int rds_tcp_netid; + +/* per-network namespace private data for this module */ +struct rds_tcp_net { + struct socket *rds_tcp_listen_sock; + struct work_struct rds_tcp_accept_w; +}; + +static void rds_tcp_accept_worker(struct work_struct *work) +{ + struct rds_tcp_net *rtn = container_of(work, + struct rds_tcp_net, + rds_tcp_accept_w); + + while (rds_tcp_accept_one(rtn->rds_tcp_listen_sock) == 0) + cond_resched(); +} + +void rds_tcp_accept_work(struct sock *sk) +{ + struct net *net = sock_net(sk); + struct rds_tcp_net *rtn = net_generic(net, rds_tcp_netid); + + queue_work(rds_wq, &rtn->rds_tcp_accept_w); +} + +static __net_init int rds_tcp_init_net(struct net *net) +{ + struct rds_tcp_net *rtn = net_generic(net, rds_tcp_netid); + + rtn->rds_tcp_listen_sock = rds_tcp_listen_init(net); + if (!rtn->rds_tcp_listen_sock) { + pr_warn("could not set up listen sock\n"); + return -EAFNOSUPPORT; + } + INIT_WORK(&rtn->rds_tcp_accept_w, rds_tcp_accept_worker); + return 0; +} + +static void __net_exit rds_tcp_exit_net(struct net *net) +{ + struct rds_tcp_net *rtn = net_generic(net, rds_tcp_netid); + + /* If rds_tcp_exit_net() is called as a result of netns deletion, + * the rds_tcp_kill_sock() device notifier would already have cleaned + * up the listen socket, thus there is no work to do in this function. + * + * If rds_tcp_exit_net() is called as a result of module unload, + * i.e., due to rds_tcp_exit() -> unregister_pernet_subsys(), then + * we do need to clean up the listen socket here. + */ + if (rtn->rds_tcp_listen_sock) { + rds_tcp_listen_stop(rtn->rds_tcp_listen_sock); + rtn->rds_tcp_listen_sock = NULL; + flush_work(&rtn->rds_tcp_accept_w); + } +} + +static struct pernet_operations rds_tcp_net_ops = { + .init = rds_tcp_init_net, + .exit = rds_tcp_exit_net, + .id = &rds_tcp_netid, + .size = sizeof(struct rds_tcp_net), +}; + +static void rds_tcp_kill_sock(struct net *net) +{ + struct rds_tcp_connection *tc, *_tc; + struct sock *sk; + LIST_HEAD(tmp_list); + struct rds_tcp_net *rtn = net_generic(net, rds_tcp_netid); + + rds_tcp_listen_stop(rtn->rds_tcp_listen_sock); + rtn->rds_tcp_listen_sock = NULL; + flush_work(&rtn->rds_tcp_accept_w); + spin_lock_irq(&rds_tcp_conn_lock); + list_for_each_entry_safe(tc, _tc, &rds_tcp_conn_list, t_tcp_node) { + struct net *c_net = read_pnet(&tc->conn->c_net); + + if (net != c_net || !tc->t_sock) + continue; + list_move_tail(&tc->t_tcp_node, &tmp_list); + } + spin_unlock_irq(&rds_tcp_conn_lock); + list_for_each_entry_safe(tc, _tc, &tmp_list, t_tcp_node) { + sk = tc->t_sock->sk; + sk->sk_prot->disconnect(sk, 0); + tcp_done(sk); + if (tc->conn->c_passive) + rds_conn_destroy(tc->conn->c_passive); + rds_conn_destroy(tc->conn); + } +} + +static int rds_tcp_dev_event(struct notifier_block *this, + unsigned long event, void *ptr) +{ + struct net_device *dev = netdev_notifier_info_to_dev(ptr); + + /* rds-tcp registers as a pernet subys, so the ->exit will only + * get invoked after network acitivity has quiesced. We need to + * clean up all sockets to quiesce network activity, and use + * the unregistration of the per-net loopback device as a trigger + * to start that cleanup. + */ + if (event == NETDEV_UNREGISTER_FINAL && + dev->ifindex == LOOPBACK_IFINDEX) + rds_tcp_kill_sock(dev_net(dev)); + + return NOTIFY_DONE; +} + +static struct notifier_block rds_tcp_dev_notifier = { + .notifier_call = rds_tcp_dev_event, + .priority = -10, /* must be called after other network notifiers */ +}; + +static void rds_tcp_exit(void) +{ + rds_info_deregister_func(RDS_INFO_TCP_SOCKETS, rds_tcp_tc_info); + unregister_pernet_subsys(&rds_tcp_net_ops); + if (unregister_netdevice_notifier(&rds_tcp_dev_notifier)) + pr_warn("could not unregister rds_tcp_dev_notifier\n"); + rds_tcp_destroy_conns(); + rds_trans_unregister(&rds_tcp_transport); + rds_tcp_recv_exit(); + kmem_cache_destroy(rds_tcp_conn_slab); +} +module_exit(rds_tcp_exit); + static int rds_tcp_init(void) { int ret; @@ -293,6 +417,16 @@ static int rds_tcp_init(void) goto out; } + ret = register_netdevice_notifier(&rds_tcp_dev_notifier); + if (ret) { + pr_warn("could not register rds_tcp_dev_notifier\n"); + goto out; + } + + ret = register_pernet_subsys(&rds_tcp_net_ops); + if (ret) + goto out_slab; + ret = rds_tcp_recv_init(); if (ret) goto out_slab; @@ -301,19 +435,14 @@ static int rds_tcp_init(void) if (ret) goto out_recv; - ret = rds_tcp_listen_init(); - if (ret) - goto out_register; - rds_info_register_func(RDS_INFO_TCP_SOCKETS, rds_tcp_tc_info); goto out; -out_register: - rds_trans_unregister(&rds_tcp_transport); out_recv: rds_tcp_recv_exit(); out_slab: + unregister_pernet_subsys(&rds_tcp_net_ops); kmem_cache_destroy(rds_tcp_conn_slab); out: return ret; diff --git a/net/rds/tcp.h b/net/rds/tcp.h index 0dbdd3716..64f873c0c 100644 --- a/net/rds/tcp.h +++ b/net/rds/tcp.h @@ -52,6 +52,7 @@ u32 rds_tcp_snd_nxt(struct rds_tcp_connection *tc); u32 rds_tcp_snd_una(struct rds_tcp_connection *tc); u64 rds_tcp_map_seq(struct rds_tcp_connection *tc, u32 seq); extern struct rds_transport rds_tcp_transport; +void rds_tcp_accept_work(struct sock *sk); /* tcp_connect.c */ int rds_tcp_conn_connect(struct rds_connection *conn); @@ -59,9 +60,11 @@ void rds_tcp_conn_shutdown(struct rds_connection *conn); void rds_tcp_state_change(struct sock *sk); /* tcp_listen.c */ -int rds_tcp_listen_init(void); -void rds_tcp_listen_stop(void); +struct socket *rds_tcp_listen_init(struct net *); +void rds_tcp_listen_stop(struct socket *); void rds_tcp_listen_data_ready(struct sock *sk); +int rds_tcp_accept_one(struct socket *sock); +int rds_tcp_keepalive(struct socket *sock); /* tcp_recv.c */ int rds_tcp_recv_init(void); diff --git a/net/rds/tcp_connect.c b/net/rds/tcp_connect.c index 973109c7b..5cb16875c 100644 --- a/net/rds/tcp_connect.c +++ b/net/rds/tcp_connect.c @@ -79,7 +79,8 @@ int rds_tcp_conn_connect(struct rds_connection *conn) struct sockaddr_in src, dest; int ret; - ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock); + ret = sock_create_kern(rds_conn_net(conn), PF_INET, + SOCK_STREAM, IPPROTO_TCP, &sock); if (ret < 0) goto out; @@ -111,10 +112,12 @@ int rds_tcp_conn_connect(struct rds_connection *conn) rdsdebug("connect to address %pI4 returned %d\n", &conn->c_faddr, ret); if (ret == -EINPROGRESS) ret = 0; - if (ret == 0) + if (ret == 0) { + rds_tcp_keepalive(sock); sock = NULL; - else + } else { rds_tcp_restore_callbacks(sock, conn->c_transport_data); + } out: if (sock) diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c index 0da49e344..444d78d0b 100644 --- a/net/rds/tcp_listen.c +++ b/net/rds/tcp_listen.c @@ -38,14 +38,7 @@ #include "rds.h" #include "tcp.h" -/* - * cheesy, but simple.. - */ -static void rds_tcp_accept_worker(struct work_struct *work); -static DECLARE_WORK(rds_tcp_listen_work, rds_tcp_accept_worker); -static struct socket *rds_tcp_listen_sock; - -static int rds_tcp_keepalive(struct socket *sock) +int rds_tcp_keepalive(struct socket *sock) { /* values below based on xs_udp_default_timeout */ int keepidle = 5; /* send a probe 'keepidle' secs after last data */ @@ -77,7 +70,7 @@ bail: return ret; } -static int rds_tcp_accept_one(struct socket *sock) +int rds_tcp_accept_one(struct socket *sock) { struct socket *new_sock = NULL; struct rds_connection *conn; @@ -85,8 +78,9 @@ static int rds_tcp_accept_one(struct socket *sock) struct inet_sock *inet; struct rds_tcp_connection *rs_tcp; - ret = sock_create_lite(sock->sk->sk_family, sock->sk->sk_type, - sock->sk->sk_protocol, &new_sock); + ret = sock_create_kern(sock_net(sock->sk), sock->sk->sk_family, + sock->sk->sk_type, sock->sk->sk_protocol, + &new_sock); if (ret) goto out; @@ -108,7 +102,8 @@ static int rds_tcp_accept_one(struct socket *sock) &inet->inet_saddr, ntohs(inet->inet_sport), &inet->inet_daddr, ntohs(inet->inet_dport)); - conn = rds_conn_create(inet->inet_saddr, inet->inet_daddr, + conn = rds_conn_create(sock_net(sock->sk), + inet->inet_saddr, inet->inet_daddr, &rds_tcp_transport, GFP_KERNEL); if (IS_ERR(conn)) { ret = PTR_ERR(conn); @@ -148,12 +143,6 @@ out: return ret; } -static void rds_tcp_accept_worker(struct work_struct *work) -{ - while (rds_tcp_accept_one(rds_tcp_listen_sock) == 0) - cond_resched(); -} - void rds_tcp_listen_data_ready(struct sock *sk) { void (*ready)(struct sock *sk); @@ -174,20 +163,20 @@ void rds_tcp_listen_data_ready(struct sock *sk) * socket */ if (sk->sk_state == TCP_LISTEN) - queue_work(rds_wq, &rds_tcp_listen_work); + rds_tcp_accept_work(sk); out: read_unlock(&sk->sk_callback_lock); ready(sk); } -int rds_tcp_listen_init(void) +struct socket *rds_tcp_listen_init(struct net *net) { struct sockaddr_in sin; struct socket *sock = NULL; int ret; - ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock); + ret = sock_create_kern(net, PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock); if (ret < 0) goto out; @@ -211,17 +200,15 @@ int rds_tcp_listen_init(void) if (ret < 0) goto out; - rds_tcp_listen_sock = sock; - sock = NULL; + return sock; out: if (sock) sock_release(sock); - return ret; + return NULL; } -void rds_tcp_listen_stop(void) +void rds_tcp_listen_stop(struct socket *sock) { - struct socket *sock = rds_tcp_listen_sock; struct sock *sk; if (!sock) @@ -242,5 +229,4 @@ void rds_tcp_listen_stop(void) /* wait for accepts to stop and close the socket */ flush_workqueue(rds_wq); sock_release(sock); - rds_tcp_listen_sock = NULL; } diff --git a/net/rds/tcp_recv.c b/net/rds/tcp_recv.c index fbc5ef88b..27a992154 100644 --- a/net/rds/tcp_recv.c +++ b/net/rds/tcp_recv.c @@ -214,8 +214,15 @@ static int rds_tcp_data_recv(read_descriptor_t *desc, struct sk_buff *skb, } to_copy = min(tc->t_tinc_data_rem, left); - pskb_pull(clone, offset); - pskb_trim(clone, to_copy); + if (!pskb_pull(clone, offset) || + pskb_trim(clone, to_copy)) { + pr_warn("rds_tcp_data_recv: pull/trim failed " + "left %zu data_rem %zu skb_len %d\n", + left, tc->t_tinc_data_rem, skb->len); + kfree_skb(clone); + desc->error = -ENOMEM; + goto out; + } skb_queue_tail(&tinc->ti_skb_list, clone); rdsdebug("skb %p data %p len %d off %u to_copy %zu -> " diff --git a/net/rds/transport.c b/net/rds/transport.c index 83498e1c7..f3afd1d60 100644 --- a/net/rds/transport.c +++ b/net/rds/transport.c @@ -77,7 +77,7 @@ void rds_trans_put(struct rds_transport *trans) module_put(trans->t_owner); } -struct rds_transport *rds_trans_get_preferred(__be32 addr) +struct rds_transport *rds_trans_get_preferred(struct net *net, __be32 addr) { struct rds_transport *ret = NULL; struct rds_transport *trans; @@ -90,7 +90,7 @@ struct rds_transport *rds_trans_get_preferred(__be32 addr) for (i = 0; i < RDS_TRANS_COUNT; i++) { trans = transports[i]; - if (trans && (trans->laddr_check(addr) == 0) && + if (trans && (trans->laddr_check(net, addr) == 0) && (!trans->t_owner || try_module_get(trans->t_owner))) { ret = trans; break; |