Diffstat (limited to 'net/rds')
-rw-r--r--  net/rds/af_rds.c         |  16
-rw-r--r--  net/rds/bind.c           | 124
-rw-r--r--  net/rds/connection.c     |  22
-rw-r--r--  net/rds/ib.c             |  49
-rw-r--r--  net/rds/ib.h             |  84
-rw-r--r--  net/rds/ib_cm.c          | 116
-rw-r--r--  net/rds/ib_rdma.c        | 116
-rw-r--r--  net/rds/ib_recv.c        | 140
-rw-r--r--  net/rds/ib_send.c        | 181
-rw-r--r--  net/rds/ib_stats.c       |  22
-rw-r--r--  net/rds/iw.c             |   2
-rw-r--r--  net/rds/iw.h             |   9
-rw-r--r--  net/rds/iw_cm.c          |   2
-rw-r--r--  net/rds/iw_rdma.c        | 135
-rw-r--r--  net/rds/iw_send.c        | 154
-rw-r--r--  net/rds/rdma_transport.c |   4
-rw-r--r--  net/rds/rds.h            |  10
-rw-r--r--  net/rds/send.c           |  19
-rw-r--r--  net/rds/tcp.c            |  16
-rw-r--r--  net/rds/tcp_listen.c     |  25
-rw-r--r--  net/rds/tcp_send.c       |   8
-rw-r--r--  net/rds/threads.c        |   2
22 files changed, 680 insertions(+), 576 deletions(-)
diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c
index a2f28a6d4..b5476aebd 100644
--- a/net/rds/af_rds.c
+++ b/net/rds/af_rds.c
@@ -72,13 +72,7 @@ static int rds_release(struct socket *sock)
 	rds_clear_recv_queue(rs);
 	rds_cong_remove_socket(rs);

-	/*
-	 * the binding lookup hash uses rcu, we need to
-	 * make sure we synchronize_rcu before we free our
-	 * entry
-	 */
 	rds_remove_bound(rs);
-	synchronize_rcu();
 	rds_send_drop_to(rs, NULL);
 	rds_rdma_drop_keys(rs);

@@ -579,6 +573,7 @@ static void rds_exit(void)
 	rds_threads_exit();
 	rds_stats_exit();
 	rds_page_exit();
+	rds_bind_lock_destroy();
 	rds_info_deregister_func(RDS_INFO_SOCKETS, rds_sock_info);
 	rds_info_deregister_func(RDS_INFO_RECV_MESSAGES, rds_sock_inc_info);
 }
@@ -588,9 +583,14 @@ static int rds_init(void)
 {
 	int ret;

-	ret = rds_conn_init();
+	ret = rds_bind_lock_init();
 	if (ret)
 		goto out;
+
+	ret = rds_conn_init();
+	if (ret)
+		goto out_bind;
+
 	ret = rds_threads_init();
 	if (ret)
 		goto out_conn;
@@ -624,6 +624,8 @@ out_conn:
 	rds_conn_exit();
 	rds_cong_exit();
 	rds_page_exit();
+out_bind:
+	rds_bind_lock_destroy();
 out:
 	return ret;
 }
diff --git a/net/rds/bind.c b/net/rds/bind.c
index dd666fb9b..b22ea9565 100644
--- a/net/rds/bind.c
+++ b/net/rds/bind.c
@@ -38,51 +38,16 @@
 #include <linux/ratelimit.h>
 #include "rds.h"

-#define BIND_HASH_SIZE 1024
-static struct hlist_head bind_hash_table[BIND_HASH_SIZE];
-static DEFINE_SPINLOCK(rds_bind_lock);
+static struct rhashtable bind_hash_table;

-static struct hlist_head *hash_to_bucket(__be32 addr, __be16 port)
-{
-	return bind_hash_table + (jhash_2words((u32)addr, (u32)port, 0) &
-				  (BIND_HASH_SIZE - 1));
-}
-
-static struct rds_sock *rds_bind_lookup(__be32 addr, __be16 port,
-					struct rds_sock *insert)
-{
-	struct rds_sock *rs;
-	struct hlist_head *head = hash_to_bucket(addr, port);
-	u64 cmp;
-	u64 needle = ((u64)be32_to_cpu(addr) << 32) | be16_to_cpu(port);
-
-	rcu_read_lock();
-	hlist_for_each_entry_rcu(rs, head, rs_bound_node) {
-		cmp = ((u64)be32_to_cpu(rs->rs_bound_addr) << 32) |
-		      be16_to_cpu(rs->rs_bound_port);
-
-		if (cmp == needle) {
-			rcu_read_unlock();
-			return rs;
-		}
-	}
-	rcu_read_unlock();
-
-	if (insert) {
-		/*
-		 * make sure our addr and port are set before
-		 * we are added to the list, other people
-		 * in rcu will find us as soon as the
-		 * hlist_add_head_rcu is done
-		 */
-		insert->rs_bound_addr = addr;
-		insert->rs_bound_port = port;
-		rds_sock_addref(insert);
-
-		hlist_add_head_rcu(&insert->rs_bound_node, head);
-	}
-	return NULL;
-}
+static struct rhashtable_params ht_parms = {
+	.nelem_hint = 768,
+	.key_len = sizeof(u64),
+	.key_offset = offsetof(struct rds_sock, rs_bound_key),
+	.head_offset = offsetof(struct rds_sock, rs_bound_node),
+	.max_size = 16384,
+	.min_size = 1024,
+};

 /*
  * Return the rds_sock bound at the given local address.
@@ -92,10 +57,10 @@ static struct rds_sock *rds_bind_lookup(__be32 addr, __be16 port,
  */
 struct rds_sock *rds_find_bound(__be32 addr, __be16 port)
 {
+	u64 key = ((u64)addr << 32) | port;
 	struct rds_sock *rs;

-	rs = rds_bind_lookup(addr, port, NULL);
-
+	rs = rhashtable_lookup_fast(&bind_hash_table, &key, ht_parms);
 	if (rs && !sock_flag(rds_rs_to_sk(rs), SOCK_DEAD))
 		rds_sock_addref(rs);
 	else
@@ -103,15 +68,16 @@ struct rds_sock *rds_find_bound(__be32 addr, __be16 port)

 	rdsdebug("returning rs %p for %pI4:%u\n", rs, &addr,
 		ntohs(port));
+
 	return rs;
 }

 /* returns -ve errno or +ve port */
 static int rds_add_bound(struct rds_sock *rs, __be32 addr, __be16 *port)
 {
-	unsigned long flags;
 	int ret = -EADDRINUSE;
 	u16 rover, last;
+	u64 key;

 	if (*port != 0) {
 		rover = be16_to_cpu(*port);
@@ -121,42 +87,49 @@ static int rds_add_bound(struct rds_sock *rs, __be32 addr, __be16 *port)
 		last = rover - 1;
 	}

-	spin_lock_irqsave(&rds_bind_lock, flags);
-
 	do {
 		if (rover == 0)
 			rover++;
-		if (!rds_bind_lookup(addr, cpu_to_be16(rover), rs)) {
+
+		key = ((u64)addr << 32) | cpu_to_be16(rover);
+		if (rhashtable_lookup_fast(&bind_hash_table, &key, ht_parms))
+			continue;
+
+		rs->rs_bound_key = key;
+		rs->rs_bound_addr = addr;
+		rs->rs_bound_port = cpu_to_be16(rover);
+		rs->rs_bound_node.next = NULL;
+		rds_sock_addref(rs);
+		if (!rhashtable_insert_fast(&bind_hash_table,
+					    &rs->rs_bound_node, ht_parms)) {
 			*port = rs->rs_bound_port;
 			ret = 0;
 			rdsdebug("rs %p binding to %pI4:%d\n",
 				rs, &addr, (int)ntohs(*port));
 			break;
+		} else {
+			rds_sock_put(rs);
+			ret = -ENOMEM;
+			break;
 		}
 	} while (rover++ != last);

-	spin_unlock_irqrestore(&rds_bind_lock, flags);
-
 	return ret;
 }

 void rds_remove_bound(struct rds_sock *rs)
 {
-	unsigned long flags;

-	spin_lock_irqsave(&rds_bind_lock, flags);
+	if (!rs->rs_bound_addr)
+		return;

-	if (rs->rs_bound_addr) {
-		rdsdebug("rs %p unbinding from %pI4:%d\n",
-			rs, &rs->rs_bound_addr,
-			ntohs(rs->rs_bound_port));
-
-		hlist_del_init_rcu(&rs->rs_bound_node);
-		rds_sock_put(rs);
-		rs->rs_bound_addr = 0;
-	}
+	rdsdebug("rs %p unbinding from %pI4:%d\n",
+		rs, &rs->rs_bound_addr,
+		ntohs(rs->rs_bound_port));

-	spin_unlock_irqrestore(&rds_bind_lock, flags);
+	rhashtable_remove_fast(&bind_hash_table, &rs->rs_bound_node, ht_parms);
+	rds_sock_put(rs);
+	rs->rs_bound_addr = 0;
 }

 int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len)
@@ -182,7 +155,14 @@ int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len)
 		goto out;

 	if (rs->rs_transport) { /* previously bound */
-		ret = 0;
+		trans = rs->rs_transport;
+		if (trans->laddr_check(sock_net(sock->sk),
+				       sin->sin_addr.s_addr) != 0) {
+			ret = -ENOPROTOOPT;
+			rds_remove_bound(rs);
+		} else {
+			ret = 0;
+		}
 		goto out;
 	}
 	trans = rds_trans_get_preferred(sock_net(sock->sk),
@@ -200,9 +180,15 @@ int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len)
 out:
 	release_sock(sk);
-
-	/* we might have called rds_remove_bound on error */
-	if (ret)
-		synchronize_rcu();
 	return ret;
 }
+
+void rds_bind_lock_destroy(void)
+{
+	rhashtable_destroy(&bind_hash_table);
+}
+
+int rds_bind_lock_init(void)
+{
+	return rhashtable_init(&bind_hash_table, &ht_parms);
+}
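The new bind table above is a resizable rhashtable keyed on the packed (address << 32 | port) value, replacing the fixed 1024-bucket hlist. A minimal sketch of the same pattern with hypothetical names, using only the documented <linux/rhashtable.h> API:

```c
#include <linux/rhashtable.h>

/* Hypothetical entry; RDS packs the key the same way into rs_bound_key. */
struct bound_entry {
	u64 key;			/* (addr << 32) | port */
	struct rhash_head node;
};

static const struct rhashtable_params bind_params = {
	.key_len     = sizeof(u64),
	.key_offset  = offsetof(struct bound_entry, key),
	.head_offset = offsetof(struct bound_entry, node),
};

static struct rhashtable bind_table;

static int bind_entry_insert(struct bound_entry *e, __be32 addr, __be16 port)
{
	e->key = ((u64)addr << 32) | port;
	/* returns -EEXIST if the key is already bound */
	return rhashtable_lookup_insert_fast(&bind_table, &e->node,
					     bind_params);
}
```

Note the patch performs a separate rhashtable_lookup_fast() followed by rhashtable_insert_fast(); rhashtable_lookup_insert_fast(), as in the sketch, does the duplicate check and insertion in one step, which closes the window between the two calls.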
diff --git a/net/rds/connection.c b/net/rds/connection.c
index 9b2de5e67..e3b118cae 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -128,10 +128,7 @@ static struct rds_connection *__rds_conn_create(struct net *net,
 	struct rds_transport *loop_trans;
 	unsigned long flags;
 	int ret;
-	struct rds_transport *otrans = trans;

-	if (!is_outgoing && otrans->t_type == RDS_TRANS_TCP)
-		goto new_conn;
 	rcu_read_lock();
 	conn = rds_conn_lookup(net, head, laddr, faddr, trans);
 	if (conn && conn->c_loopback && conn->c_trans != &rds_loop_transport &&
@@ -147,7 +144,6 @@ static struct rds_connection *__rds_conn_create(struct net *net,
 	if (conn)
 		goto out;

-new_conn:
 	conn = kmem_cache_zalloc(rds_conn_slab, gfp);
 	if (!conn) {
 		conn = ERR_PTR(-ENOMEM);
@@ -201,6 +197,7 @@ new_conn:
 	atomic_set(&conn->c_state, RDS_CONN_DOWN);
 	conn->c_send_gen = 0;
+	conn->c_outgoing = (is_outgoing ? 1 : 0);
 	conn->c_reconnect_jiffies = 0;
 	INIT_DELAYED_WORK(&conn->c_send_w, rds_send_worker);
 	INIT_DELAYED_WORK(&conn->c_recv_w, rds_recv_worker);
@@ -237,22 +234,13 @@ new_conn:
 		/* Creating normal conn */
 		struct rds_connection *found;

-		if (!is_outgoing && otrans->t_type == RDS_TRANS_TCP)
-			found = NULL;
-		else
-			found = rds_conn_lookup(net, head, laddr, faddr, trans);
+		found = rds_conn_lookup(net, head, laddr, faddr, trans);
 		if (found) {
 			trans->conn_free(conn->c_transport_data);
 			kmem_cache_free(rds_conn_slab, conn);
 			conn = found;
 		} else {
-			if ((is_outgoing && otrans->t_type == RDS_TRANS_TCP) ||
-			    (otrans->t_type != RDS_TRANS_TCP)) {
-				/* Only the active side should be added to
-				 * reconnect list for TCP.
-				 */
-				hlist_add_head_rcu(&conn->c_hash_node, head);
-			}
+			hlist_add_head_rcu(&conn->c_hash_node, head);
 			rds_cong_add_conn(conn);
 			rds_conn_count++;
 		}
@@ -331,7 +319,9 @@ void rds_conn_shutdown(struct rds_connection *conn)
 	rcu_read_lock();
 	if (!hlist_unhashed(&conn->c_hash_node)) {
 		rcu_read_unlock();
-		rds_queue_reconnect(conn);
+		if (conn->c_trans->t_type != RDS_TRANS_TCP ||
+		    conn->c_outgoing == 1)
+			rds_queue_reconnect(conn);
 	} else {
 		rcu_read_unlock();
 	}
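With the TCP special case gone, __rds_conn_create() is back to one uniform double-checked pattern: an RCU lookup on the fast path, then, after allocating a candidate, a second lookup under the bucket lock that decides whether to insert or to discard the candidate in favor of a racing winner. The shape of the idiom reduced to its essentials (hypothetical names; lookup(), alloc_obj() and free_obj() are stand-ins):

```c
static struct obj *get_or_create(struct hlist_head *head, u64 key)
{
	struct obj *cand, *found;

	rcu_read_lock();
	found = lookup(head, key);	/* fast path, no lock */
	rcu_read_unlock();
	if (found)
		return found;

	cand = alloc_obj(key);
	if (!cand)
		return ERR_PTR(-ENOMEM);

	spin_lock(&bucket_lock);
	found = lookup(head, key);	/* re-check under the lock */
	if (found) {
		free_obj(cand);		/* a racing creator won */
		cand = found;
	} else {
		hlist_add_head_rcu(&cand->node, head);
	}
	spin_unlock(&bucket_lock);
	return cand;
}
```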
diff --git a/net/rds/ib.c b/net/rds/ib.c
index 2d3f2ab47..f222885ac 100644
--- a/net/rds/ib.c
+++ b/net/rds/ib.c
@@ -43,14 +43,14 @@
 #include "rds.h"
 #include "ib.h"

-static unsigned int fmr_pool_size = RDS_FMR_POOL_SIZE;
-unsigned int fmr_message_size = RDS_FMR_SIZE + 1; /* +1 allows for unaligned MRs */
+unsigned int rds_ib_fmr_1m_pool_size = RDS_FMR_1M_POOL_SIZE;
+unsigned int rds_ib_fmr_8k_pool_size = RDS_FMR_8K_POOL_SIZE;
 unsigned int rds_ib_retry_count = RDS_IB_DEFAULT_RETRY_COUNT;

-module_param(fmr_pool_size, int, 0444);
-MODULE_PARM_DESC(fmr_pool_size, " Max number of fmr per HCA");
-module_param(fmr_message_size, int, 0444);
-MODULE_PARM_DESC(fmr_message_size, " Max size of a RDMA transfer");
+module_param(rds_ib_fmr_1m_pool_size, int, 0444);
+MODULE_PARM_DESC(rds_ib_fmr_1m_pool_size, " Max number of 1M fmr per HCA");
+module_param(rds_ib_fmr_8k_pool_size, int, 0444);
+MODULE_PARM_DESC(rds_ib_fmr_8k_pool_size, " Max number of 8K fmr per HCA");
 module_param(rds_ib_retry_count, int, 0444);
 MODULE_PARM_DESC(rds_ib_retry_count, " Number of hw retries before reporting an error");

@@ -97,8 +97,10 @@ static void rds_ib_dev_free(struct work_struct *work)
 	struct rds_ib_device *rds_ibdev = container_of(work,
 					struct rds_ib_device, free_work);

-	if (rds_ibdev->mr_pool)
-		rds_ib_destroy_mr_pool(rds_ibdev->mr_pool);
+	if (rds_ibdev->mr_8k_pool)
+		rds_ib_destroy_mr_pool(rds_ibdev->mr_8k_pool);
+	if (rds_ibdev->mr_1m_pool)
+		rds_ib_destroy_mr_pool(rds_ibdev->mr_1m_pool);
 	if (rds_ibdev->pd)
 		ib_dealloc_pd(rds_ibdev->pd);

@@ -148,9 +150,13 @@ static void rds_ib_add_one(struct ib_device *device)
 	rds_ibdev->max_sge = min(dev_attr->max_sge, RDS_IB_MAX_SGE);

 	rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr?: 32;
-	rds_ibdev->max_fmrs = dev_attr->max_fmr ?
-			min_t(unsigned int, dev_attr->max_fmr, fmr_pool_size) :
-			fmr_pool_size;
+	rds_ibdev->max_1m_fmrs = dev_attr->max_mr ?
+		min_t(unsigned int, (dev_attr->max_mr / 2),
+		      rds_ib_fmr_1m_pool_size) : rds_ib_fmr_1m_pool_size;
+
+	rds_ibdev->max_8k_fmrs = dev_attr->max_mr ?
+		min_t(unsigned int, ((dev_attr->max_mr / 2) * RDS_MR_8K_SCALE),
+		      rds_ib_fmr_8k_pool_size) : rds_ib_fmr_8k_pool_size;

 	rds_ibdev->max_initiator_depth = dev_attr->max_qp_init_rd_atom;
 	rds_ibdev->max_responder_resources = dev_attr->max_qp_rd_atom;
@@ -162,12 +168,25 @@ static void rds_ib_add_one(struct ib_device *device)
 		goto put_dev;
 	}

-	rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
-	if (IS_ERR(rds_ibdev->mr_pool)) {
-		rds_ibdev->mr_pool = NULL;
+	rds_ibdev->mr_1m_pool =
+		rds_ib_create_mr_pool(rds_ibdev, RDS_IB_MR_1M_POOL);
+	if (IS_ERR(rds_ibdev->mr_1m_pool)) {
+		rds_ibdev->mr_1m_pool = NULL;
 		goto put_dev;
 	}

+	rds_ibdev->mr_8k_pool =
+		rds_ib_create_mr_pool(rds_ibdev, RDS_IB_MR_8K_POOL);
+	if (IS_ERR(rds_ibdev->mr_8k_pool)) {
+		rds_ibdev->mr_8k_pool = NULL;
+		goto put_dev;
+	}
+
+	rdsdebug("RDS/IB: max_mr = %d, max_wrs = %d, max_sge = %d, fmr_max_remaps = %d, max_1m_fmrs = %d, max_8k_fmrs = %d\n",
+		 dev_attr->max_fmr, rds_ibdev->max_wrs, rds_ibdev->max_sge,
+		 rds_ibdev->fmr_max_remaps, rds_ibdev->max_1m_fmrs,
+		 rds_ibdev->max_8k_fmrs);
+
 	INIT_LIST_HEAD(&rds_ibdev->ipaddr_list);
 	INIT_LIST_HEAD(&rds_ibdev->conn_list);

@@ -317,7 +336,7 @@ static int rds_ib_laddr_check(struct net *net, __be32 addr)
 	/* Create a CMA ID and try to bind it. This catches both
	 * IB and iWARP capable NICs.
	 */
-	cm_id = rdma_create_id(NULL, NULL, RDMA_PS_TCP, IB_QPT_RC);
+	cm_id = rdma_create_id(&init_net, NULL, NULL, RDMA_PS_TCP, IB_QPT_RC);
 	if (IS_ERR(cm_id))
 		return PTR_ERR(cm_id);
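The new sizing splits the device's MR budget in half between the two pools: the 1M pool is capped at max_mr / 2 and the 8K pool at (max_mr / 2) * RDS_MR_8K_SCALE, where the scale constant (defined in ib.h below) works out to 256 / (2 + 1) = 85, i.e. one 1M FMR is worth roughly 85 small ones. A worked restatement of the clamp with the default constants inlined (a sketch, not the driver code):

```c
#include <linux/kernel.h>

/* Defaults from ib.h: 1M pool 8192/2 = 4096 entries,
 * 8K pool 85 * 4096 = 348160 entries. */
static unsigned int clamp_1m_fmrs(unsigned int dev_max_mr)
{
	unsigned int pool = 8192 / 2;		/* RDS_FMR_1M_POOL_SIZE */

	return dev_max_mr ? min(dev_max_mr / 2, pool) : pool;
}

static unsigned int clamp_8k_fmrs(unsigned int dev_max_mr)
{
	unsigned int scale = 256 / (2 + 1);	/* RDS_MR_8K_SCALE = 85 */
	unsigned int pool = scale * (8192 / 2);	/* 348160 */

	return dev_max_mr ? min((dev_max_mr / 2) * scale, pool) : pool;
}
```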
diff --git a/net/rds/ib.h b/net/rds/ib.h
index aae60fda7..b3fdebb57 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -9,8 +9,11 @@
 #include "rds.h"
 #include "rdma_transport.h"

-#define RDS_FMR_SIZE			256
-#define RDS_FMR_POOL_SIZE		8192
+#define RDS_FMR_1M_POOL_SIZE		(8192 / 2)
+#define RDS_FMR_1M_MSG_SIZE		256
+#define RDS_FMR_8K_MSG_SIZE		2
+#define RDS_MR_8K_SCALE			(256 / (RDS_FMR_8K_MSG_SIZE + 1))
+#define RDS_FMR_8K_POOL_SIZE		(RDS_MR_8K_SCALE * (8192 / 2))

 #define RDS_IB_MAX_SGE			8
 #define RDS_IB_RECV_SGE			2
@@ -24,6 +27,9 @@

 #define RDS_IB_RECYCLE_BATCH_COUNT	32

+#define RDS_IB_WC_MAX			32
+#define RDS_IB_SEND_OP			BIT_ULL(63)
+
 extern struct rw_semaphore rds_ib_devices_lock;
 extern struct list_head rds_ib_devices;

@@ -69,7 +75,11 @@ struct rds_ib_connect_private {

 struct rds_ib_send_work {
 	void			*s_op;
-	struct ib_send_wr	s_wr;
+	union {
+		struct ib_send_wr	s_wr;
+		struct ib_rdma_wr	s_rdma_wr;
+		struct ib_atomic_wr	s_atomic_wr;
+	};
 	struct ib_sge		s_sge[RDS_IB_MAX_SGE];
 	unsigned long		s_queued;
 };
@@ -89,6 +99,20 @@ struct rds_ib_work_ring {
 	atomic_t	w_free_ctr;
 };

+/* Rings are posted with all the allocations they'll need to queue the
+ * incoming message to the receiving socket so this can't fail.
+ * All fragments start with a header, so we can make sure we're not receiving
+ * garbage, and we can tell a small 8 byte fragment from an ACK frame.
+ */
+struct rds_ib_ack_state {
+	u64		ack_next;
+	u64		ack_recv;
+	unsigned int	ack_required:1;
+	unsigned int	ack_next_valid:1;
+	unsigned int	ack_recv_valid:1;
+};
+
+
 struct rds_ib_device;

 struct rds_ib_connection {
@@ -102,6 +126,12 @@ struct rds_ib_connection {
 	struct ib_pd		*i_pd;
 	struct ib_cq		*i_send_cq;
 	struct ib_cq		*i_recv_cq;
+	struct ib_wc		i_send_wc[RDS_IB_WC_MAX];
+	struct ib_wc		i_recv_wc[RDS_IB_WC_MAX];
+
+	/* interrupt handling */
+	struct tasklet_struct	i_send_tasklet;
+	struct tasklet_struct	i_recv_tasklet;

 	/* tx */
 	struct rds_ib_work_ring	i_send_ring;
@@ -112,7 +142,6 @@ struct rds_ib_connection {
 	atomic_t		i_signaled_sends;

 	/* rx */
-	struct tasklet_struct	i_recv_tasklet;
 	struct mutex		i_recv_mutex;
 	struct rds_ib_work_ring	i_recv_ring;
 	struct rds_ib_incoming	*i_ibinc;
@@ -164,6 +193,12 @@ struct rds_ib_connection {
 struct rds_ib_ipaddr {
 	struct list_head	list;
 	__be32			ipaddr;
+	struct rcu_head		rcu;
+};
+
+enum {
+	RDS_IB_MR_8K_POOL,
+	RDS_IB_MR_1M_POOL,
 };

 struct rds_ib_device {
@@ -172,9 +207,12 @@ struct rds_ib_device {
 	struct list_head	conn_list;
 	struct ib_device	*dev;
 	struct ib_pd		*pd;
-	struct rds_ib_mr_pool	*mr_pool;
-	unsigned int		fmr_max_remaps;
 	unsigned int		max_fmrs;
+	struct rds_ib_mr_pool	*mr_1m_pool;
+	struct rds_ib_mr_pool	*mr_8k_pool;
+	unsigned int		fmr_max_remaps;
+	unsigned int		max_8k_fmrs;
+	unsigned int		max_1m_fmrs;
 	int			max_sge;
 	unsigned int		max_wrs;
 	unsigned int		max_initiator_depth;
@@ -197,14 +235,14 @@ struct rds_ib_device {
 struct rds_ib_statistics {
 	uint64_t	s_ib_connect_raced;
 	uint64_t	s_ib_listen_closed_stale;
-	uint64_t	s_ib_tx_cq_call;
+	uint64_t	s_ib_evt_handler_call;
+	uint64_t	s_ib_tasklet_call;
 	uint64_t	s_ib_tx_cq_event;
 	uint64_t	s_ib_tx_ring_full;
 	uint64_t	s_ib_tx_throttle;
 	uint64_t	s_ib_tx_sg_mapping_failure;
 	uint64_t	s_ib_tx_stalled;
 	uint64_t	s_ib_tx_credit_updates;
-	uint64_t	s_ib_rx_cq_call;
 	uint64_t	s_ib_rx_cq_event;
 	uint64_t	s_ib_rx_ring_empty;
 	uint64_t	s_ib_rx_refill_from_cq;
@@ -216,12 +254,18 @@ struct rds_ib_statistics {
 	uint64_t	s_ib_ack_send_delayed;
 	uint64_t	s_ib_ack_send_piggybacked;
 	uint64_t	s_ib_ack_received;
-	uint64_t	s_ib_rdma_mr_alloc;
-	uint64_t	s_ib_rdma_mr_free;
-	uint64_t	s_ib_rdma_mr_used;
-	uint64_t	s_ib_rdma_mr_pool_flush;
-	uint64_t	s_ib_rdma_mr_pool_wait;
-	uint64_t	s_ib_rdma_mr_pool_depleted;
+	uint64_t	s_ib_rdma_mr_8k_alloc;
+	uint64_t	s_ib_rdma_mr_8k_free;
+	uint64_t	s_ib_rdma_mr_8k_used;
+	uint64_t	s_ib_rdma_mr_8k_pool_flush;
+	uint64_t	s_ib_rdma_mr_8k_pool_wait;
+	uint64_t	s_ib_rdma_mr_8k_pool_depleted;
+	uint64_t	s_ib_rdma_mr_1m_alloc;
+	uint64_t	s_ib_rdma_mr_1m_free;
+	uint64_t	s_ib_rdma_mr_1m_used;
+	uint64_t	s_ib_rdma_mr_1m_pool_flush;
+	uint64_t	s_ib_rdma_mr_1m_pool_wait;
+	uint64_t	s_ib_rdma_mr_1m_pool_depleted;
 	uint64_t	s_ib_atomic_cswp;
 	uint64_t	s_ib_atomic_fadd;
 };
@@ -273,7 +317,8 @@ struct rds_ib_device *rds_ib_get_client_data(struct ib_device *device);
 void rds_ib_dev_put(struct rds_ib_device *rds_ibdev);
 extern struct ib_client rds_ib_client;

-extern unsigned int fmr_message_size;
+extern unsigned int rds_ib_fmr_1m_pool_size;
+extern unsigned int rds_ib_fmr_8k_pool_size;
 extern unsigned int rds_ib_retry_count;

 extern spinlock_t ib_nodev_conns_lock;
@@ -303,7 +348,8 @@ int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr);
 void rds_ib_add_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn);
 void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn);
 void rds_ib_destroy_nodev_conns(void);
-struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *);
+struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_dev,
+					     int npages);
 void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_connection *iinfo);
 void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *);
 void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
@@ -323,7 +369,8 @@ void rds_ib_recv_free_caches(struct rds_ib_connection *ic);
 void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp);
 void rds_ib_inc_free(struct rds_incoming *inc);
 int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iov_iter *to);
-void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context);
+void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc,
+			     struct rds_ib_ack_state *state);
 void rds_ib_recv_tasklet_fn(unsigned long data);
 void rds_ib_recv_init_ring(struct rds_ib_connection *ic);
 void rds_ib_recv_clear_ring(struct rds_ib_connection *ic);
@@ -331,6 +378,7 @@ void rds_ib_recv_init_ack(struct rds_ib_connection *ic);
 void rds_ib_attempt_ack(struct rds_ib_connection *ic);
 void rds_ib_ack_send_complete(struct rds_ib_connection *ic);
 u64 rds_ib_piggyb_ack(struct rds_ib_connection *ic);
+void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required);

 /* ib_ring.c */
 void rds_ib_ring_init(struct rds_ib_work_ring *ring, u32 nr);
@@ -348,7 +396,7 @@ extern wait_queue_head_t rds_ib_ring_empty_wait;
 void rds_ib_xmit_complete(struct rds_connection *conn);
 int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 		unsigned int hdr_off, unsigned int sg, unsigned int off);
-void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context);
+void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc);
 void rds_ib_send_init_ring(struct rds_ib_connection *ic);
 void rds_ib_send_clear_ring(struct rds_ib_connection *ic);
 int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op);
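RDS_IB_SEND_OP is a tag bit: send work requests get bit 63 set in their wr_id so a shared completion-polling path (poll_cq() in ib_cm.c below) can route each completion to the send or receive handler and recover the ring index. A small sketch of the tag-and-strip idiom, with hypothetical helper names:

```c
#include <linux/bitops.h>

#define SEND_OP_TAG	BIT_ULL(63)	/* mirrors RDS_IB_SEND_OP */

static inline u64 tag_send_wr_id(u32 ring_idx)
{
	return (u64)ring_idx | SEND_OP_TAG;	/* set when posting a send */
}

static inline bool wr_id_is_send(u64 wr_id)
{
	return wr_id & SEND_OP_TAG;		/* dispatch test at completion */
}

static inline u32 wr_id_ring_index(u64 wr_id)
{
	return (u32)(wr_id & ~SEND_OP_TAG);	/* strip before ring lookup */
}
```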
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 9043f5c04..da5a7fb98 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -216,6 +216,96 @@ static void rds_ib_cq_event_handler(struct ib_event *event, void *data)
 		 event->event, ib_event_msg(event->event), data);
 }

+/* Plucking the oldest entry from the ring can be done concurrently with
+ * the thread refilling the ring. Each ring operation is protected by
+ * spinlocks and the transient state of refilling doesn't change the
+ * recording of which entry is oldest.
+ *
+ * This relies on IB only calling one cq comp_handler for each cq so that
+ * there will only be one caller of rds_recv_incoming() per RDS connection.
+ */
+static void rds_ib_cq_comp_handler_recv(struct ib_cq *cq, void *context)
+{
+	struct rds_connection *conn = context;
+	struct rds_ib_connection *ic = conn->c_transport_data;
+
+	rdsdebug("conn %p cq %p\n", conn, cq);
+
+	rds_ib_stats_inc(s_ib_evt_handler_call);
+
+	tasklet_schedule(&ic->i_recv_tasklet);
+}
+
+static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
+		    struct ib_wc *wcs,
+		    struct rds_ib_ack_state *ack_state)
+{
+	int nr;
+	int i;
+	struct ib_wc *wc;
+
+	while ((nr = ib_poll_cq(cq, RDS_IB_WC_MAX, wcs)) > 0) {
+		for (i = 0; i < nr; i++) {
+			wc = wcs + i;
+			rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
+				 (unsigned long long)wc->wr_id, wc->status,
+				 wc->byte_len, be32_to_cpu(wc->ex.imm_data));
+
+			if (wc->wr_id & RDS_IB_SEND_OP)
+				rds_ib_send_cqe_handler(ic, wc);
+			else
+				rds_ib_recv_cqe_handler(ic, wc, ack_state);
+		}
+	}
+}
+
+static void rds_ib_tasklet_fn_send(unsigned long data)
+{
+	struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
+	struct rds_connection *conn = ic->conn;
+	struct rds_ib_ack_state state;
+
+	rds_ib_stats_inc(s_ib_tasklet_call);
+
+	memset(&state, 0, sizeof(state));
+	poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state);
+	ib_req_notify_cq(ic->i_send_cq, IB_CQ_NEXT_COMP);
+	poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state);
+
+	if (rds_conn_up(conn) &&
+	    (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
+	    test_bit(0, &conn->c_map_queued)))
+		rds_send_xmit(ic->conn);
+}
+static void rds_ib_tasklet_fn_recv(unsigned long data)
+{
+	struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
+	struct rds_connection *conn = ic->conn;
+	struct rds_ib_device *rds_ibdev = ic->rds_ibdev;
+	struct rds_ib_ack_state state;
+
+	if (!rds_ibdev)
+		rds_conn_drop(conn);
+
+	rds_ib_stats_inc(s_ib_tasklet_call);
+
+	memset(&state, 0, sizeof(state));
+	poll_cq(ic, ic->i_recv_cq, ic->i_recv_wc, &state);
+	ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
+	poll_cq(ic, ic->i_recv_cq, ic->i_recv_wc, &state);
+
+	if (state.ack_next_valid)
+		rds_ib_set_ack(ic, state.ack_next, state.ack_required);
+	if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
+		rds_send_drop_acked(conn, state.ack_recv, NULL);
+		ic->i_ack_recv = state.ack_recv;
+	}
+
+	if (rds_conn_up(conn))
+		rds_ib_attempt_ack(ic);
+}
+
 static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
 {
 	struct rds_connection *conn = data;
@@ -238,6 +328,18 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
 	}
 }
+static void rds_ib_cq_comp_handler_send(struct ib_cq *cq, void *context)
+{
+	struct rds_connection *conn = context;
+	struct rds_ib_connection *ic = conn->c_transport_data;
+
+	rdsdebug("conn %p cq %p\n", conn, cq);
+
+	rds_ib_stats_inc(s_ib_evt_handler_call);
+
+	tasklet_schedule(&ic->i_send_tasklet);
+}
+
 /*
  * This needs to be very careful to not leave IS_ERR pointers around for
  * cleanup to trip over.
  */
@@ -271,7 +373,8 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
 	ic->i_pd = rds_ibdev->pd;

 	cq_attr.cqe = ic->i_send_ring.w_nr + 1;
-	ic->i_send_cq = ib_create_cq(dev, rds_ib_send_cq_comp_handler,
+
+	ic->i_send_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_send,
 				     rds_ib_cq_event_handler, conn,
 				     &cq_attr);
 	if (IS_ERR(ic->i_send_cq)) {
@@ -282,7 +385,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
 	}

 	cq_attr.cqe = ic->i_recv_ring.w_nr;
-	ic->i_recv_cq = ib_create_cq(dev, rds_ib_recv_cq_comp_handler,
+	ic->i_recv_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_recv,
 				     rds_ib_cq_event_handler, conn,
 				     &cq_attr);
 	if (IS_ERR(ic->i_recv_cq)) {
@@ -565,7 +668,7 @@ int rds_ib_conn_connect(struct rds_connection *conn)
 	/* XXX I wonder what affect the port space has */
 	/* delegate cm event handler to rdma_transport */
-	ic->i_cm_id = rdma_create_id(rds_rdma_cm_event_handler, conn,
+	ic->i_cm_id = rdma_create_id(&init_net, rds_rdma_cm_event_handler, conn,
 				     RDMA_PS_TCP, IB_QPT_RC);
 	if (IS_ERR(ic->i_cm_id)) {
 		ret = PTR_ERR(ic->i_cm_id);
@@ -637,6 +740,7 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
 		wait_event(rds_ib_ring_empty_wait,
 			   rds_ib_ring_empty(&ic->i_recv_ring) &&
 			   (atomic_read(&ic->i_signaled_sends) == 0));
+		tasklet_kill(&ic->i_send_tasklet);
 		tasklet_kill(&ic->i_recv_tasklet);

 		/* first destroy the ib state that generates callbacks */
@@ -743,8 +847,10 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 	}

 	INIT_LIST_HEAD(&ic->ib_node);
-	tasklet_init(&ic->i_recv_tasklet, rds_ib_recv_tasklet_fn,
-		     (unsigned long) ic);
+	tasklet_init(&ic->i_send_tasklet, rds_ib_tasklet_fn_send,
+		     (unsigned long)ic);
+	tasklet_init(&ic->i_recv_tasklet, rds_ib_tasklet_fn_recv,
+		     (unsigned long)ic);
 	mutex_init(&ic->i_recv_mutex);
 #ifndef KERNEL_HAS_ATOMIC64
 	spin_lock_init(&ic->i_ack_lock);
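The tasklets above follow the classic poll, re-arm, poll-again sequence: draining the CQ once, arming notification with ib_req_notify_cq(), then draining again closes the window where a completion lands between the last poll and the re-arm. A minimal sketch of the pattern (handle_wc() is a hypothetical dispatch stub; the verbs calls are the standard ib_verbs API):

```c
#include <rdma/ib_verbs.h>

static void handle_wc(struct ib_wc *wc)
{
	/* route to send/recv handler, e.g. on a wr_id tag bit */
}

static void drain_cq(struct ib_cq *cq, struct ib_wc *wcs, int batch)
{
	int n, i;

	while ((n = ib_poll_cq(cq, batch, wcs)) > 0)
		for (i = 0; i < n; i++)
			handle_wc(&wcs[i]);
}

static void cq_bottom_half(struct ib_cq *cq, struct ib_wc *wcs, int batch)
{
	drain_cq(cq, wcs, batch);		/* 1: drain what is there */
	ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);	/* 2: re-arm the interrupt */
	drain_cq(cq, wcs, batch);		/* 3: catch the race window */
}
```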
diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index 251d1ce0b..a2340748e 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -65,6 +65,7 @@ struct rds_ib_mr {
  * Our own little FMR pool
  */
 struct rds_ib_mr_pool {
+	unsigned int		pool_type;
 	struct mutex		flush_lock;	/* serialize fmr invalidate */
 	struct delayed_work	flush_worker;	/* flush worker */
@@ -83,7 +84,7 @@ struct rds_ib_mr_pool {
 	struct ib_fmr_attr	fmr_attr;
 };

-struct workqueue_struct *rds_ib_fmr_wq;
+static struct workqueue_struct *rds_ib_fmr_wq;

 int rds_ib_fmr_init(void)
 {
@@ -159,10 +160,8 @@ static void rds_ib_remove_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
 	}
 	spin_unlock_irq(&rds_ibdev->spinlock);

-	if (to_free) {
-		synchronize_rcu();
-		kfree(to_free);
-	}
+	if (to_free)
+		kfree_rcu(to_free, rcu);
 }

 int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
@@ -236,7 +235,8 @@ void rds_ib_destroy_nodev_conns(void)
 		rds_conn_destroy(ic->conn);
 }

-struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
+struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev,
+					     int pool_type)
 {
 	struct rds_ib_mr_pool *pool;

@@ -244,6 +244,7 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
 	if (!pool)
 		return ERR_PTR(-ENOMEM);

+	pool->pool_type = pool_type;
 	init_llist_head(&pool->free_list);
 	init_llist_head(&pool->drop_list);
 	init_llist_head(&pool->clean_list);
@@ -251,28 +252,30 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
 	init_waitqueue_head(&pool->flush_wait);
 	INIT_DELAYED_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);

-	pool->fmr_attr.max_pages = fmr_message_size;
+	if (pool_type == RDS_IB_MR_1M_POOL) {
+		/* +1 allows for unaligned MRs */
+		pool->fmr_attr.max_pages = RDS_FMR_1M_MSG_SIZE + 1;
+		pool->max_items = RDS_FMR_1M_POOL_SIZE;
+	} else {
+		/* pool_type == RDS_IB_MR_8K_POOL */
+		pool->fmr_attr.max_pages = RDS_FMR_8K_MSG_SIZE + 1;
+		pool->max_items = RDS_FMR_8K_POOL_SIZE;
+	}
+
+	pool->max_free_pinned = pool->max_items * pool->fmr_attr.max_pages / 4;
 	pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
 	pool->fmr_attr.page_shift = PAGE_SHIFT;
-	pool->max_free_pinned = rds_ibdev->max_fmrs * fmr_message_size / 4;
-
-	/* We never allow more than max_items MRs to be allocated.
-	 * When we exceed more than max_items_soft, we start freeing
-	 * items more aggressively.
-	 * Make sure that max_items > max_items_soft > max_items / 2
-	 */
 	pool->max_items_soft = rds_ibdev->max_fmrs * 3 / 4;
-	pool->max_items = rds_ibdev->max_fmrs;

 	return pool;
 }
 void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_connection *iinfo)
 {
-	struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
+	struct rds_ib_mr_pool *pool_1m = rds_ibdev->mr_1m_pool;

-	iinfo->rdma_mr_max = pool->max_items;
-	iinfo->rdma_mr_size = pool->fmr_attr.max_pages;
+	iinfo->rdma_mr_max = pool_1m->max_items;
+	iinfo->rdma_mr_size = pool_1m->fmr_attr.max_pages;
 }

 void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool)
@@ -314,14 +317,28 @@ static inline void wait_clean_list_grace(void)
 	}
 }

-static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
+static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev,
+					  int npages)
 {
-	struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
+	struct rds_ib_mr_pool *pool;
 	struct rds_ib_mr *ibmr = NULL;
 	int err = 0, iter = 0;

+	if (npages <= RDS_FMR_8K_MSG_SIZE)
+		pool = rds_ibdev->mr_8k_pool;
+	else
+		pool = rds_ibdev->mr_1m_pool;
+
 	if (atomic_read(&pool->dirty_count) >= pool->max_items / 10)
-		schedule_delayed_work(&pool->flush_worker, 10);
+		queue_delayed_work(rds_ib_fmr_wq, &pool->flush_worker, 10);
+
+	/* Switch pools if one of the pool is reaching upper limit */
+	if (atomic_read(&pool->dirty_count) >= pool->max_items * 9 / 10) {
+		if (pool->pool_type == RDS_IB_MR_8K_POOL)
+			pool = rds_ibdev->mr_1m_pool;
+		else
+			pool = rds_ibdev->mr_8k_pool;
+	}

 	while (1) {
 		ibmr = rds_ib_reuse_fmr(pool);
@@ -343,12 +360,18 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
 		atomic_dec(&pool->item_count);

 		if (++iter > 2) {
-			rds_ib_stats_inc(s_ib_rdma_mr_pool_depleted);
+			if (pool->pool_type == RDS_IB_MR_8K_POOL)
+				rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_depleted);
+			else
+				rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_depleted);
 			return ERR_PTR(-EAGAIN);
 		}

 		/* We do have some empty MRs. Flush them out. */
-		rds_ib_stats_inc(s_ib_rdma_mr_pool_wait);
+		if (pool->pool_type == RDS_IB_MR_8K_POOL)
+			rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_wait);
+		else
+			rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_wait);
 		rds_ib_flush_mr_pool(pool, 0, &ibmr);
 		if (ibmr)
 			return ibmr;
@@ -373,7 +396,12 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
 		goto out_no_cigar;
 	}

-	rds_ib_stats_inc(s_ib_rdma_mr_alloc);
+	ibmr->pool = pool;
+	if (pool->pool_type == RDS_IB_MR_8K_POOL)
+		rds_ib_stats_inc(s_ib_rdma_mr_8k_alloc);
+	else
+		rds_ib_stats_inc(s_ib_rdma_mr_1m_alloc);
+
 	return ibmr;

 out_no_cigar:
@@ -429,7 +457,7 @@ static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibm
 	}
 	page_cnt += len >> PAGE_SHIFT;

-	if (page_cnt > fmr_message_size)
+	if (page_cnt > ibmr->pool->fmr_attr.max_pages)
 		return -EINVAL;

 	dma_pages = kmalloc_node(sizeof(u64) * page_cnt, GFP_ATOMIC,
@@ -461,7 +489,10 @@ static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibm
 	ibmr->sg_dma_len = sg_dma_len;
 	ibmr->remap_count++;

-	rds_ib_stats_inc(s_ib_rdma_mr_used);
+	if (ibmr->pool->pool_type == RDS_IB_MR_8K_POOL)
+		rds_ib_stats_inc(s_ib_rdma_mr_8k_used);
+	else
+		rds_ib_stats_inc(s_ib_rdma_mr_1m_used);
 	ret = 0;

 out:
@@ -524,8 +555,7 @@ static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
 	__rds_ib_teardown_mr(ibmr);

 	if (pinned) {
-		struct rds_ib_device *rds_ibdev = ibmr->device;
-		struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
+		struct rds_ib_mr_pool *pool = ibmr->pool;

 		atomic_sub(pinned, &pool->free_pinned);
 	}
@@ -594,7 +624,7 @@ static void list_to_llist_nodes(struct rds_ib_mr_pool *pool,
  * to free as many MRs as needed to get back to this limit.
  */
 static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
-			        int free_all, struct rds_ib_mr **ibmr_ret)
+				int free_all, struct rds_ib_mr **ibmr_ret)
 {
 	struct rds_ib_mr *ibmr, *next;
 	struct llist_node *clean_nodes;
@@ -605,11 +635,14 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
 	unsigned int nfreed = 0, dirty_to_clean = 0, free_goal;
 	int ret = 0;

-	rds_ib_stats_inc(s_ib_rdma_mr_pool_flush);
+	if (pool->pool_type == RDS_IB_MR_8K_POOL)
+		rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_flush);
+	else
+		rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_flush);

 	if (ibmr_ret) {
 		DEFINE_WAIT(wait);
-		while(!mutex_trylock(&pool->flush_lock)) {
+		while (!mutex_trylock(&pool->flush_lock)) {
 			ibmr = rds_ib_reuse_fmr(pool);
 			if (ibmr) {
 				*ibmr_ret = ibmr;
@@ -666,8 +699,12 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
 	list_for_each_entry_safe(ibmr, next, &unmap_list, unmap_list) {
 		unpinned += ibmr->sg_len;
 		__rds_ib_teardown_mr(ibmr);
-		if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) {
-			rds_ib_stats_inc(s_ib_rdma_mr_free);
+		if (nfreed < free_goal ||
+		    ibmr->remap_count >= pool->fmr_attr.max_maps) {
+			if (ibmr->pool->pool_type == RDS_IB_MR_8K_POOL)
+				rds_ib_stats_inc(s_ib_rdma_mr_8k_free);
+			else
+				rds_ib_stats_inc(s_ib_rdma_mr_1m_free);
 			list_del(&ibmr->unmap_list);
 			ib_dealloc_fmr(ibmr->fmr);
 			kfree(ibmr);
@@ -719,8 +756,8 @@ static void rds_ib_mr_pool_flush_worker(struct work_struct *work)
 void rds_ib_free_mr(void *trans_private, int invalidate)
 {
 	struct rds_ib_mr *ibmr = trans_private;
+	struct rds_ib_mr_pool *pool = ibmr->pool;
 	struct rds_ib_device *rds_ibdev = ibmr->device;
-	struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;

 	rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len);

@@ -759,10 +796,11 @@ void rds_ib_flush_mrs(void)

 	down_read(&rds_ib_devices_lock);
 	list_for_each_entry(rds_ibdev, &rds_ib_devices, list) {
-		struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
+		if (rds_ibdev->mr_8k_pool)
+			rds_ib_flush_mr_pool(rds_ibdev->mr_8k_pool, 0, NULL);

-		if (pool)
-			rds_ib_flush_mr_pool(pool, 0, NULL);
+		if (rds_ibdev->mr_1m_pool)
+			rds_ib_flush_mr_pool(rds_ibdev->mr_1m_pool, 0, NULL);
 	}
 	up_read(&rds_ib_devices_lock);
 }
@@ -780,12 +818,12 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
 		goto out;
 	}

-	if (!rds_ibdev->mr_pool) {
+	if (!rds_ibdev->mr_8k_pool || !rds_ibdev->mr_1m_pool) {
 		ret = -ENODEV;
 		goto out;
 	}

-	ibmr = rds_ib_alloc_fmr(rds_ibdev);
+	ibmr = rds_ib_alloc_fmr(rds_ibdev, nents);
 	if (IS_ERR(ibmr)) {
 		rds_ib_dev_put(rds_ibdev);
 		return ibmr;
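The rds_ib_remove_ipaddr() change swaps a blocking synchronize_rcu() plus kfree() for kfree_rcu(), which queues the free to run after a grace period instead of stalling the caller; the only requirement is a struct rcu_head embedded in the object, which the ib.h hunk above adds to struct rds_ib_ipaddr. A minimal sketch of the idiom (hypothetical types):

```c
#include <linux/rcupdate.h>
#include <linux/slab.h>

struct ipaddr_entry {			/* stand-in for rds_ib_ipaddr */
	struct list_head list;
	__be32 ipaddr;
	struct rcu_head rcu;		/* required by kfree_rcu() */
};

static void remove_entry(struct ipaddr_entry *e)
{
	list_del_rcu(&e->list);		/* unlink under the writer's lock */
	kfree_rcu(e, rcu);		/* freed after all RCU readers finish */
}
```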
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c
index f43831e41..977fb8606 100644
--- a/net/rds/ib_recv.c
+++ b/net/rds/ib_recv.c
@@ -305,7 +305,7 @@ static int rds_ib_recv_refill_one(struct rds_connection *conn,
 	gfp_t slab_mask = GFP_NOWAIT;
 	gfp_t page_mask = GFP_NOWAIT;

-	if (gfp & __GFP_WAIT) {
+	if (gfp & __GFP_DIRECT_RECLAIM) {
 		slab_mask = GFP_KERNEL;
 		page_mask = GFP_HIGHUSER;
 	}
@@ -379,7 +379,7 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp)
 	struct ib_recv_wr *failed_wr;
 	unsigned int posted = 0;
 	int ret = 0;
-	bool can_wait = !!(gfp & __GFP_WAIT);
+	bool can_wait = !!(gfp & __GFP_DIRECT_RECLAIM);
 	u32 pos;

 	/* the goal here is to just make sure that someone, somewhere
@@ -596,8 +596,7 @@ void rds_ib_recv_init_ack(struct rds_ib_connection *ic)
  * wr_id and avoids working with the ring in that case.
  */
 #ifndef KERNEL_HAS_ATOMIC64
-static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
-				int ack_required)
+void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required)
 {
 	unsigned long flags;
@@ -622,8 +621,7 @@ static u64 rds_ib_get_ack(struct rds_ib_connection *ic)
 	return seq;
 }
 #else
-static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
-				int ack_required)
+void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required)
 {
 	atomic64_set(&ic->i_ack_next, seq);
 	if (ack_required) {
@@ -830,20 +828,6 @@ static void rds_ib_cong_recv(struct rds_connection *conn,
 	rds_cong_map_updated(map, uncongested);
 }

-/*
- * Rings are posted with all the allocations they'll need to queue the
- * incoming message to the receiving socket so this can't fail.
- * All fragments start with a header, so we can make sure we're not receiving
- * garbage, and we can tell a small 8 byte fragment from an ACK frame.
- */
-struct rds_ib_ack_state {
-	u64		ack_next;
-	u64		ack_recv;
-	unsigned int	ack_required:1;
-	unsigned int	ack_next_valid:1;
-	unsigned int	ack_recv_valid:1;
-};
-
 static void rds_ib_process_recv(struct rds_connection *conn,
 				struct rds_ib_recv_work *recv, u32 data_len,
 				struct rds_ib_ack_state *state)
@@ -969,96 +953,50 @@ static void rds_ib_process_recv(struct rds_connection *conn,
 	}
 }

-/*
- * Plucking the oldest entry from the ring can be done concurrently with
- * the thread refilling the ring. Each ring operation is protected by
- * spinlocks and the transient state of refilling doesn't change the
- * recording of which entry is oldest.
- *
- * This relies on IB only calling one cq comp_handler for each cq so that
- * there will only be one caller of rds_recv_incoming() per RDS connection.
- */
-void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context)
-{
-	struct rds_connection *conn = context;
-	struct rds_ib_connection *ic = conn->c_transport_data;
-
-	rdsdebug("conn %p cq %p\n", conn, cq);
-
-	rds_ib_stats_inc(s_ib_rx_cq_call);
-
-	tasklet_schedule(&ic->i_recv_tasklet);
-}
-
-static inline void rds_poll_cq(struct rds_ib_connection *ic,
-			       struct rds_ib_ack_state *state)
+void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic,
+			     struct ib_wc *wc,
+			     struct rds_ib_ack_state *state)
 {
 	struct rds_connection *conn = ic->conn;
-	struct ib_wc wc;
 	struct rds_ib_recv_work *recv;

-	while (ib_poll_cq(ic->i_recv_cq, 1, &wc) > 0) {
-		rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
-			 (unsigned long long)wc.wr_id, wc.status,
-			 ib_wc_status_msg(wc.status), wc.byte_len,
-			 be32_to_cpu(wc.ex.imm_data));
-		rds_ib_stats_inc(s_ib_rx_cq_event);
+	rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
+		 (unsigned long long)wc->wr_id, wc->status,
+		 ib_wc_status_msg(wc->status), wc->byte_len,
+		 be32_to_cpu(wc->ex.imm_data));

-		recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];
-
-		ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1, DMA_FROM_DEVICE);
-
-		/*
-		 * Also process recvs in connecting state because it is possible
-		 * to get a recv completion _before_ the rdmacm ESTABLISHED
-		 * event is processed.
-		 */
-		if (wc.status == IB_WC_SUCCESS) {
-			rds_ib_process_recv(conn, recv, wc.byte_len, state);
-		} else {
-			/* We expect errors as the qp is drained during shutdown */
-			if (rds_conn_up(conn) || rds_conn_connecting(conn))
-				rds_ib_conn_error(conn, "recv completion on %pI4 had "
-						  "status %u (%s), disconnecting and "
-						  "reconnecting\n", &conn->c_faddr,
-						  wc.status,
-						  ib_wc_status_msg(wc.status));
-		}
+	rds_ib_stats_inc(s_ib_rx_cq_event);
+	recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];
+	ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1,
+			DMA_FROM_DEVICE);

-		/*
-		 * rds_ib_process_recv() doesn't always consume the frag, and
-		 * we might not have called it at all if the wc didn't indicate
-		 * success. We already unmapped the frag's pages, though, and
-		 * the following rds_ib_ring_free() call tells the refill path
-		 * that it will not find an allocated frag here. Make sure we
-		 * keep that promise by freeing a frag that's still on the ring.
-		 */
-		if (recv->r_frag) {
-			rds_ib_frag_free(ic, recv->r_frag);
-			recv->r_frag = NULL;
-		}
-		rds_ib_ring_free(&ic->i_recv_ring, 1);
+	/* Also process recvs in connecting state because it is possible
+	 * to get a recv completion _before_ the rdmacm ESTABLISHED
+	 * event is processed.
+	 */
+	if (wc->status == IB_WC_SUCCESS) {
+		rds_ib_process_recv(conn, recv, wc->byte_len, state);
+	} else {
+		/* We expect errors as the qp is drained during shutdown */
+		if (rds_conn_up(conn) || rds_conn_connecting(conn))
+			rds_ib_conn_error(conn, "recv completion on %pI4 had status %u (%s), disconnecting and reconnecting\n",
+					  &conn->c_faddr,
+					  wc->status,
+					  ib_wc_status_msg(wc->status));
 	}
-}

-void rds_ib_recv_tasklet_fn(unsigned long data)
-{
-	struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
-	struct rds_connection *conn = ic->conn;
-	struct rds_ib_ack_state state = { 0, };
-
-	rds_poll_cq(ic, &state);
-	ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
-	rds_poll_cq(ic, &state);
-
-	if (state.ack_next_valid)
-		rds_ib_set_ack(ic, state.ack_next, state.ack_required);
-	if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
-		rds_send_drop_acked(conn, state.ack_recv, NULL);
-		ic->i_ack_recv = state.ack_recv;
+	/* rds_ib_process_recv() doesn't always consume the frag, and
+	 * we might not have called it at all if the wc didn't indicate
+	 * success. We already unmapped the frag's pages, though, and
+	 * the following rds_ib_ring_free() call tells the refill path
+	 * that it will not find an allocated frag here. Make sure we
+	 * keep that promise by freeing a frag that's still on the ring.
+	 */
+	if (recv->r_frag) {
+		rds_ib_frag_free(ic, recv->r_frag);
+		recv->r_frag = NULL;
 	}
-	if (rds_conn_up(conn))
-		rds_ib_attempt_ack(ic);
+	rds_ib_ring_free(&ic->i_recv_ring, 1);

 	/* If we ever end up with a really empty receive ring, we're
	 * in deep trouble, as the sender will definitely see RNR
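The gfp test tracks a mainline MM rename: __GFP_WAIT was split apart, and __GFP_DIRECT_RECLAIM is the bit that says the caller may block for reclaim, which is exactly what the refill path wants to know before upgrading to sleeping allocations. The idiom in isolation (a sketch of the check, not the RDS function):

```c
#include <linux/gfp.h>

static void pick_masks(gfp_t gfp, gfp_t *slab_mask, gfp_t *page_mask)
{
	*slab_mask = GFP_NOWAIT;
	*page_mask = GFP_NOWAIT;

	/* caller may sleep for direct reclaim: take the roomier masks */
	if (gfp & __GFP_DIRECT_RECLAIM) {
		*slab_mask = GFP_KERNEL;
		*page_mask = GFP_HIGHUSER;
	}
}
```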
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 4e8804708..eac30bf48 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -195,7 +195,7 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)

 		send->s_op = NULL;

-		send->s_wr.wr_id = i;
+		send->s_wr.wr_id = i | RDS_IB_SEND_OP;
 		send->s_wr.sg_list = send->s_sge;
 		send->s_wr.ex.imm_data = 0;

@@ -237,81 +237,73 @@ static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr)
  * unallocs the next free entry in the ring it doesn't alter which is
  * the next to be freed, which is what this is concerned with.
  */
-void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
+void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc)
 {
-	struct rds_connection *conn = context;
-	struct rds_ib_connection *ic = conn->c_transport_data;
 	struct rds_message *rm = NULL;
-	struct ib_wc wc;
+	struct rds_connection *conn = ic->conn;
 	struct rds_ib_send_work *send;
 	u32 completed;
 	u32 oldest;
 	u32 i = 0;
-	int ret;
 	int nr_sig = 0;

-	rdsdebug("cq %p conn %p\n", cq, conn);
-	rds_ib_stats_inc(s_ib_tx_cq_call);
-	ret = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
-	if (ret)
-		rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
-
-	while (ib_poll_cq(cq, 1, &wc) > 0) {
-		rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
-			 (unsigned long long)wc.wr_id, wc.status,
-			 ib_wc_status_msg(wc.status), wc.byte_len,
-			 be32_to_cpu(wc.ex.imm_data));
-		rds_ib_stats_inc(s_ib_tx_cq_event);
-
-		if (wc.wr_id == RDS_IB_ACK_WR_ID) {
-			if (time_after(jiffies, ic->i_ack_queued + HZ/2))
-				rds_ib_stats_inc(s_ib_tx_stalled);
-			rds_ib_ack_send_complete(ic);
-			continue;
-		}
-
-		oldest = rds_ib_ring_oldest(&ic->i_send_ring);
+	rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
+		 (unsigned long long)wc->wr_id, wc->status,
+		 ib_wc_status_msg(wc->status), wc->byte_len,
+		 be32_to_cpu(wc->ex.imm_data));
+	rds_ib_stats_inc(s_ib_tx_cq_event);

-		completed = rds_ib_ring_completed(&ic->i_send_ring, wc.wr_id, oldest);
+	if (wc->wr_id == RDS_IB_ACK_WR_ID) {
+		if (time_after(jiffies, ic->i_ack_queued + HZ / 2))
+			rds_ib_stats_inc(s_ib_tx_stalled);
+		rds_ib_ack_send_complete(ic);
+		return;
+	}

-		for (i = 0; i < completed; i++) {
-			send = &ic->i_sends[oldest];
-			if (send->s_wr.send_flags & IB_SEND_SIGNALED)
-				nr_sig++;
+	oldest = rds_ib_ring_oldest(&ic->i_send_ring);

-			rm = rds_ib_send_unmap_op(ic, send, wc.status);
+	completed = rds_ib_ring_completed(&ic->i_send_ring,
+					  (wc->wr_id & ~RDS_IB_SEND_OP),
+					  oldest);

-			if (time_after(jiffies, send->s_queued + HZ/2))
-				rds_ib_stats_inc(s_ib_tx_stalled);
+	for (i = 0; i < completed; i++) {
+		send = &ic->i_sends[oldest];
+		if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+			nr_sig++;

-			if (send->s_op) {
-				if (send->s_op == rm->m_final_op) {
-					/* If anyone waited for this message to get flushed out, wake
-					 * them up now */
-					rds_message_unmapped(rm);
-				}
-				rds_message_put(rm);
-				send->s_op = NULL;
-			}
+		rm = rds_ib_send_unmap_op(ic, send, wc->status);

-			oldest = (oldest + 1) % ic->i_send_ring.w_nr;
-		}
+		if (time_after(jiffies, send->s_queued + HZ / 2))
+			rds_ib_stats_inc(s_ib_tx_stalled);

-		rds_ib_ring_free(&ic->i_send_ring, completed);
-		rds_ib_sub_signaled(ic, nr_sig);
-		nr_sig = 0;
-
-		if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
-		    test_bit(0, &conn->c_map_queued))
-			queue_delayed_work(rds_wq, &conn->c_send_w, 0);
-
-		/* We expect errors as the qp is drained during shutdown */
-		if (wc.status != IB_WC_SUCCESS && rds_conn_up(conn)) {
-			rds_ib_conn_error(conn, "send completion on %pI4 had status "
-					  "%u (%s), disconnecting and reconnecting\n",
-					  &conn->c_faddr, wc.status,
-					  ib_wc_status_msg(wc.status));
+		if (send->s_op) {
+			if (send->s_op == rm->m_final_op) {
+				/* If anyone waited for this message to get
+				 * flushed out, wake them up now
+				 */
+				rds_message_unmapped(rm);
+			}
+			rds_message_put(rm);
+			send->s_op = NULL;
 		}
+
+		oldest = (oldest + 1) % ic->i_send_ring.w_nr;
+	}
+
+	rds_ib_ring_free(&ic->i_send_ring, completed);
+	rds_ib_sub_signaled(ic, nr_sig);
+	nr_sig = 0;
+	if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
+	    test_bit(0, &conn->c_map_queued))
+		queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+
+	/* We expect errors as the qp is drained during shutdown */
+	if (wc->status != IB_WC_SUCCESS && rds_conn_up(conn)) {
+		rds_ib_conn_error(conn, "send completion on %pI4 had status %u (%s), disconnecting and reconnecting\n",
+				  &conn->c_faddr, wc->status,
+				  ib_wc_status_msg(wc->status));
 	}
 }
@@ -785,23 +777,23 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
 	send->s_queued = jiffies;

 	if (op->op_type == RDS_ATOMIC_TYPE_CSWP) {
-		send->s_wr.opcode = IB_WR_MASKED_ATOMIC_CMP_AND_SWP;
-		send->s_wr.wr.atomic.compare_add = op->op_m_cswp.compare;
-		send->s_wr.wr.atomic.swap = op->op_m_cswp.swap;
-		send->s_wr.wr.atomic.compare_add_mask = op->op_m_cswp.compare_mask;
-		send->s_wr.wr.atomic.swap_mask = op->op_m_cswp.swap_mask;
+		send->s_atomic_wr.wr.opcode = IB_WR_MASKED_ATOMIC_CMP_AND_SWP;
+		send->s_atomic_wr.compare_add = op->op_m_cswp.compare;
+		send->s_atomic_wr.swap = op->op_m_cswp.swap;
+		send->s_atomic_wr.compare_add_mask = op->op_m_cswp.compare_mask;
+		send->s_atomic_wr.swap_mask = op->op_m_cswp.swap_mask;
 	} else { /* FADD */
-		send->s_wr.opcode = IB_WR_MASKED_ATOMIC_FETCH_AND_ADD;
-		send->s_wr.wr.atomic.compare_add = op->op_m_fadd.add;
-		send->s_wr.wr.atomic.swap = 0;
-		send->s_wr.wr.atomic.compare_add_mask = op->op_m_fadd.nocarry_mask;
-		send->s_wr.wr.atomic.swap_mask = 0;
+		send->s_atomic_wr.wr.opcode = IB_WR_MASKED_ATOMIC_FETCH_AND_ADD;
+		send->s_atomic_wr.compare_add = op->op_m_fadd.add;
+		send->s_atomic_wr.swap = 0;
+		send->s_atomic_wr.compare_add_mask = op->op_m_fadd.nocarry_mask;
+		send->s_atomic_wr.swap_mask = 0;
 	}
 	nr_sig = rds_ib_set_wr_signal_state(ic, send, op->op_notify);
-	send->s_wr.num_sge = 1;
-	send->s_wr.next = NULL;
-	send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
-	send->s_wr.wr.atomic.rkey = op->op_rkey;
+	send->s_atomic_wr.wr.num_sge = 1;
+	send->s_atomic_wr.wr.next = NULL;
+	send->s_atomic_wr.remote_addr = op->op_remote_addr;
+	send->s_atomic_wr.rkey = op->op_rkey;
 	send->s_op = op;
 	rds_message_addref(container_of(send->s_op, struct rds_message, atomic));
@@ -826,11 +818,11 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
 	if (nr_sig)
 		atomic_add(nr_sig, &ic->i_signaled_sends);

-	failed_wr = &send->s_wr;
-	ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
+	failed_wr = &send->s_atomic_wr.wr;
+	ret = ib_post_send(ic->i_cm_id->qp, &send->s_atomic_wr.wr, &failed_wr);
 	rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
-		 send, &send->s_wr, ret, failed_wr);
-	BUG_ON(failed_wr != &send->s_wr);
+		 send, &send->s_atomic_wr, ret, failed_wr);
+	BUG_ON(failed_wr != &send->s_atomic_wr.wr);
 	if (ret) {
 		printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 "
 		       "returned %d\n", &conn->c_faddr, ret);
@@ -839,9 +831,9 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
 		goto out;
 	}

-	if (unlikely(failed_wr != &send->s_wr)) {
+	if (unlikely(failed_wr != &send->s_atomic_wr.wr)) {
 		printk(KERN_WARNING "RDS/IB: atomic ib_post_send() rc=%d, but failed_wqe updated!\n", ret);
-		BUG_ON(failed_wr != &send->s_wr);
+		BUG_ON(failed_wr != &send->s_atomic_wr.wr);
 	}

 out:
@@ -912,22 +904,23 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
 		nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify);

 		send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
-		send->s_wr.wr.rdma.remote_addr = remote_addr;
-		send->s_wr.wr.rdma.rkey = op->op_rkey;
+		send->s_rdma_wr.remote_addr = remote_addr;
+		send->s_rdma_wr.rkey = op->op_rkey;

 		if (num_sge > max_sge) {
-			send->s_wr.num_sge = max_sge;
+			send->s_rdma_wr.wr.num_sge = max_sge;
 			num_sge -= max_sge;
 		} else {
-			send->s_wr.num_sge = num_sge;
+			send->s_rdma_wr.wr.num_sge = num_sge;
 		}

-		send->s_wr.next = NULL;
+		send->s_rdma_wr.wr.next = NULL;

 		if (prev)
-			prev->s_wr.next = &send->s_wr;
+			prev->s_rdma_wr.wr.next = &send->s_rdma_wr.wr;

-		for (j = 0; j < send->s_wr.num_sge && scat != &op->op_sg[op->op_count]; j++) {
+		for (j = 0; j < send->s_rdma_wr.wr.num_sge &&
+		     scat != &op->op_sg[op->op_count]; j++) {
 			len = ib_sg_dma_len(ic->i_cm_id->device, scat);
 			send->s_sge[j].addr =
 				 ib_sg_dma_address(ic->i_cm_id->device, scat);
@@ -942,7 +935,9 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
 		}

 		rdsdebug("send %p wr %p num_sge %u next %p\n", send,
-			&send->s_wr, send->s_wr.num_sge, send->s_wr.next);
+			&send->s_rdma_wr.wr,
+			send->s_rdma_wr.wr.num_sge,
+			send->s_rdma_wr.wr.next);

 		prev = send;
 		if (++send == &ic->i_sends[ic->i_send_ring.w_nr])
@@ -963,11 +958,11 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
 	if (nr_sig)
 		atomic_add(nr_sig, &ic->i_signaled_sends);

-	failed_wr = &first->s_wr;
-	ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
+	failed_wr = &first->s_rdma_wr.wr;
+	ret = ib_post_send(ic->i_cm_id->qp, &first->s_rdma_wr.wr, &failed_wr);
 	rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
-		 first, &first->s_wr, ret, failed_wr);
-	BUG_ON(failed_wr != &first->s_wr);
+		 first, &first->s_rdma_wr.wr, ret, failed_wr);
+	BUG_ON(failed_wr != &first->s_rdma_wr.wr);
 	if (ret) {
 		printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI4 "
 		       "returned %d\n", &conn->c_faddr, ret);
@@ -976,9 +971,9 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
 		goto out;
 	}

-	if (unlikely(failed_wr != &first->s_wr)) {
+	if (unlikely(failed_wr != &first->s_rdma_wr.wr)) {
 		printk(KERN_WARNING "RDS/IB: ib_post_send() rc=%d, but failed_wqe updated!\n", ret);
-		BUG_ON(failed_wr != &first->s_wr);
+		BUG_ON(failed_wr != &first->s_rdma_wr.wr);
 	}
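Underlying this churn is the verbs API change that split struct ib_send_wr into typed wrappers: the RDMA and atomic fields moved out of the old wr.rdma / wr.atomic unions into struct ib_rdma_wr and struct ib_atomic_wr, which embed the plain WR as their first member (hence the union in rds_ib_send_work above). Posting an RDMA WRITE under the new layout looks roughly like this (a sketch; qp, sge, rkey and remote_addr are assumed set up elsewhere):

```c
#include <rdma/ib_verbs.h>

static int post_rdma_write(struct ib_qp *qp, struct ib_sge *sge,
			   u64 remote_addr, u32 rkey)
{
	struct ib_rdma_wr wr = {};
	struct ib_send_wr *bad_wr;

	wr.wr.opcode = IB_WR_RDMA_WRITE;	/* generic fields live in wr.wr */
	wr.wr.sg_list = sge;
	wr.wr.num_sge = 1;
	wr.remote_addr = remote_addr;		/* typed fields on the wrapper */
	wr.rkey = rkey;

	return ib_post_send(qp, &wr.wr, &bad_wr);
}
```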
diff --git a/net/rds/ib_stats.c b/net/rds/ib_stats.c
index 2d5965d6e..d77e04473 100644
--- a/net/rds/ib_stats.c
+++ b/net/rds/ib_stats.c
@@ -42,14 +42,14 @@ DEFINE_PER_CPU_SHARED_ALIGNED(struct rds_ib_statistics, rds_ib_stats);
 static const char *const rds_ib_stat_names[] = {
 	"ib_connect_raced",
 	"ib_listen_closed_stale",
-	"ib_tx_cq_call",
+	"s_ib_evt_handler_call",
+	"ib_tasklet_call",
 	"ib_tx_cq_event",
 	"ib_tx_ring_full",
 	"ib_tx_throttle",
 	"ib_tx_sg_mapping_failure",
 	"ib_tx_stalled",
 	"ib_tx_credit_updates",
-	"ib_rx_cq_call",
 	"ib_rx_cq_event",
 	"ib_rx_ring_empty",
 	"ib_rx_refill_from_cq",
@@ -61,12 +61,18 @@ static const char *const rds_ib_stat_names[] = {
 	"ib_ack_send_delayed",
 	"ib_ack_send_piggybacked",
 	"ib_ack_received",
-	"ib_rdma_mr_alloc",
-	"ib_rdma_mr_free",
-	"ib_rdma_mr_used",
-	"ib_rdma_mr_pool_flush",
-	"ib_rdma_mr_pool_wait",
-	"ib_rdma_mr_pool_depleted",
+	"ib_rdma_mr_8k_alloc",
+	"ib_rdma_mr_8k_free",
+	"ib_rdma_mr_8k_used",
+	"ib_rdma_mr_8k_pool_flush",
+	"ib_rdma_mr_8k_pool_wait",
+	"ib_rdma_mr_8k_pool_depleted",
+	"ib_rdma_mr_1m_alloc",
+	"ib_rdma_mr_1m_free",
+	"ib_rdma_mr_1m_used",
+	"ib_rdma_mr_1m_pool_flush",
+	"ib_rdma_mr_1m_pool_wait",
+	"ib_rdma_mr_1m_pool_depleted",
 	"ib_atomic_cswp",
 	"ib_atomic_fadd",
 };
diff --git a/net/rds/iw.c b/net/rds/iw.c
index 3df0295c6..576f1825f 100644
--- a/net/rds/iw.c
+++ b/net/rds/iw.c
@@ -223,7 +223,7 @@ static int rds_iw_laddr_check(struct net *net, __be32 addr)
 	/* Create a CMA ID and try to bind it. This catches both
	 * IB and iWARP capable NICs.
	 */
-	cm_id = rdma_create_id(NULL, NULL, RDMA_PS_TCP, IB_QPT_RC);
+	cm_id = rdma_create_id(&init_net, NULL, NULL, RDMA_PS_TCP, IB_QPT_RC);
 	if (IS_ERR(cm_id))
 		return PTR_ERR(cm_id);
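Several hunks in this commit are mechanical fallout from rdma_create_id() growing a struct net * first argument, so CM IDs are created in an explicit network namespace; RDS passes &init_net everywhere. A typical call under the new signature (sketch with a hypothetical wrapper):

```c
#include <rdma/rdma_cm.h>

static struct rdma_cm_id *make_cm_id(rdma_cm_event_handler handler,
				     void *context)
{
	/* &init_net: RDS is not namespace-aware on the CM side here */
	return rdma_create_id(&init_net, handler, context,
			      RDMA_PS_TCP, IB_QPT_RC);
}
```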
diff --git a/net/rds/iw.h b/net/rds/iw.h
index cbe6674e3..5af01d175 100644
--- a/net/rds/iw.h
+++ b/net/rds/iw.h
@@ -74,10 +74,13 @@ struct rds_iw_send_work {
 	struct rm_rdma_op	*s_op;
 	struct rds_iw_mapping	*s_mapping;
 	struct ib_mr		*s_mr;
-	struct ib_fast_reg_page_list *s_page_list;
 	unsigned char		s_remap_count;

-	struct ib_send_wr	s_wr;
+	union {
+		struct ib_send_wr	s_send_wr;
+		struct ib_rdma_wr	s_rdma_wr;
+		struct ib_reg_wr	s_reg_wr;
+	};
 	struct ib_sge		s_sge[RDS_IW_MAX_SGE];
 	unsigned long		s_queued;
 };
@@ -195,7 +198,7 @@ struct rds_iw_device {

 /* Magic WR_ID for ACKs */
 #define RDS_IW_ACK_WR_ID	((u64)0xffffffffffffffffULL)
-#define RDS_IW_FAST_REG_WR_ID	((u64)0xefefefefefefefefULL)
+#define RDS_IW_REG_WR_ID	((u64)0xefefefefefefefefULL)
 #define RDS_IW_LOCAL_INV_WR_ID	((u64)0xdfdfdfdfdfdfdfdfULL)

 struct rds_iw_statistics {
diff --git a/net/rds/iw_cm.c b/net/rds/iw_cm.c
index a6553a6fb..aea4c911b 100644
--- a/net/rds/iw_cm.c
+++ b/net/rds/iw_cm.c
@@ -524,7 +524,7 @@ int rds_iw_conn_connect(struct rds_connection *conn)
 	/* XXX I wonder what affect the port space has */
 	/* delegate cm event handler to rdma_transport */
-	ic->i_cm_id = rdma_create_id(rds_rdma_cm_event_handler, conn,
+	ic->i_cm_id = rdma_create_id(&init_net, rds_rdma_cm_event_handler, conn,
 				     RDMA_PS_TCP, IB_QPT_RC);
 	if (IS_ERR(ic->i_cm_id)) {
 		ret = PTR_ERR(ic->i_cm_id);
diff --git a/net/rds/iw_rdma.c b/net/rds/iw_rdma.c
index 6a8fbd6e6..b09a40c1a 100644
--- a/net/rds/iw_rdma.c
+++ b/net/rds/iw_rdma.c
@@ -47,7 +47,6 @@ struct rds_iw_mr {
 	struct rdma_cm_id	*cm_id;

 	struct ib_mr	*mr;
-	struct ib_fast_reg_page_list *page_list;

 	struct rds_iw_mapping	mapping;
 	unsigned char		remap_count;
@@ -75,10 +74,10 @@ struct rds_iw_mr_pool {
 	int			max_pages;
 };

-static int rds_iw_flush_mr_pool(struct rds_iw_mr_pool *pool, int free_all);
+static void rds_iw_flush_mr_pool(struct rds_iw_mr_pool *pool, int free_all);
 static void rds_iw_mr_pool_flush_worker(struct work_struct *work);
-static int rds_iw_init_fastreg(struct rds_iw_mr_pool *pool, struct rds_iw_mr *ibmr);
-static int rds_iw_map_fastreg(struct rds_iw_mr_pool *pool,
+static int rds_iw_init_reg(struct rds_iw_mr_pool *pool, struct rds_iw_mr *ibmr);
+static int rds_iw_map_reg(struct rds_iw_mr_pool *pool,
 			  struct rds_iw_mr *ibmr,
 			  struct scatterlist *sg, unsigned int nents);
 static void rds_iw_free_fastreg(struct rds_iw_mr_pool *pool, struct rds_iw_mr *ibmr);
@@ -258,19 +257,18 @@ static void rds_iw_set_scatterlist(struct rds_iw_scatterlist *sg,
 	sg->bytes = 0;
 }

-static u64 *rds_iw_map_scatterlist(struct rds_iw_device *rds_iwdev,
-				   struct rds_iw_scatterlist *sg)
+static int rds_iw_map_scatterlist(struct rds_iw_device *rds_iwdev,
+				  struct rds_iw_scatterlist *sg)
 {
 	struct ib_device *dev = rds_iwdev->dev;
-	u64 *dma_pages = NULL;
-	int i, j, ret;
+	int i, ret;

 	WARN_ON(sg->dma_len);

 	sg->dma_len = ib_dma_map_sg(dev, sg->list, sg->len, DMA_BIDIRECTIONAL);
 	if (unlikely(!sg->dma_len)) {
 		printk(KERN_WARNING "RDS/IW: dma_map_sg failed!\n");
-		return ERR_PTR(-EBUSY);
+		return -EBUSY;
 	}

 	sg->bytes = 0;
@@ -303,31 +301,14 @@ static u64 *rds_iw_map_scatterlist(struct rds_iw_device *rds_iwdev,
 	if (sg->dma_npages > fastreg_message_size)
 		goto out_unmap;

-	dma_pages = kmalloc(sizeof(u64) * sg->dma_npages, GFP_ATOMIC);
-	if (!dma_pages) {
-		ret = -ENOMEM;
-		goto out_unmap;
-	}
-
-	for (i = j = 0; i < sg->dma_len; ++i) {
-		unsigned int dma_len = ib_sg_dma_len(dev, &sg->list[i]);
-		u64 dma_addr = ib_sg_dma_address(dev, &sg->list[i]);
-		u64 end_addr;
-
-		end_addr = dma_addr + dma_len;
-		dma_addr &= ~PAGE_MASK;
-		for (; dma_addr < end_addr; dma_addr += PAGE_SIZE)
-			dma_pages[j++] = dma_addr;
-		BUG_ON(j > sg->dma_npages);
-	}
-	return dma_pages;
+	return 0;

 out_unmap:
 	ib_dma_unmap_sg(rds_iwdev->dev, sg->list, sg->len, DMA_BIDIRECTIONAL);
 	sg->dma_len = 0;
-	kfree(dma_pages);
-	return ERR_PTR(ret);
+	return ret;
 }

@@ -440,7 +421,7 @@ static struct rds_iw_mr *rds_iw_alloc_mr(struct rds_iw_device *rds_iwdev)
 	INIT_LIST_HEAD(&ibmr->mapping.m_list);
 	ibmr->mapping.m_mr = ibmr;

-	err = rds_iw_init_fastreg(pool, ibmr);
+	err = rds_iw_init_reg(pool, ibmr);
 	if (err)
 		goto out_no_cigar;

@@ -479,14 +460,13 @@ void rds_iw_sync_mr(void *trans_private, int direction)
  * If the number of MRs allocated exceeds the limit, we also try
  * to free as many MRs as needed to get back to this limit.
  */
-static int rds_iw_flush_mr_pool(struct rds_iw_mr_pool *pool, int free_all)
+static void rds_iw_flush_mr_pool(struct rds_iw_mr_pool *pool, int free_all)
 {
 	struct rds_iw_mr *ibmr, *next;
 	LIST_HEAD(unmap_list);
 	LIST_HEAD(kill_list);
 	unsigned long flags;
 	unsigned int nfreed = 0, ncleaned = 0, unpinned = 0;
-	int ret = 0;

 	rds_iw_stats_inc(s_iw_rdma_mr_pool_flush);

@@ -538,7 +518,6 @@ static int rds_iw_flush_mr_pool(struct rds_iw_mr_pool *pool, int free_all)
 	atomic_sub(nfreed, &pool->item_count);

 	mutex_unlock(&pool->flush_lock);
-	return ret;
 }

 static void rds_iw_mr_pool_flush_worker(struct work_struct *work)
@@ -622,7 +601,7 @@ void *rds_iw_get_mr(struct scatterlist *sg, unsigned long nents,
 	ibmr->cm_id = cm_id;
 	ibmr->device = rds_iwdev;

-	ret = rds_iw_map_fastreg(rds_iwdev->mr_pool, ibmr, sg, nents);
+	ret = rds_iw_map_reg(rds_iwdev->mr_pool, ibmr, sg, nents);
 	if (ret == 0)
 		*key_ret = ibmr->mr->rkey;
 	else
@@ -638,7 +617,7 @@ out:
 }

 /*
- * iWARP fastreg handling
+ * iWARP reg handling
 *
 * The life cycle of a fastreg registration is a bit different from
 * FMRs.
@@ -650,7 +629,7 @@ out:
 * This creates a bit of a problem for us, as we do not have the destination
 * IP in GET_MR, so the connection must be setup prior to the GET_MR call for
 * RDMA to be correctly setup.  If a fastreg request is present, rds_iw_xmit
- * will try to queue a LOCAL_INV (if needed) and a FAST_REG_MR work request
+ * will try to queue a LOCAL_INV (if needed) and a REG_MR work request
 * before queuing the SEND. When completions for these arrive, they are
 * dispatched to the MR has a bit set showing that RDMa can be performed.
 *
diff --git a/net/rds/iw_send.c b/net/rds/iw_send.c
index 86152ec3b..e20bd503f 100644
--- a/net/rds/iw_send.c
+++ b/net/rds/iw_send.c
@@ -137,13 +137,13 @@ void rds_iw_send_init_ring(struct rds_iw_connection *ic)
 send->s_op = NULL;
 send->s_mapping = NULL;
- send->s_wr.next = NULL;
- send->s_wr.wr_id = i;
- send->s_wr.sg_list = send->s_sge;
- send->s_wr.num_sge = 1;
- send->s_wr.opcode = IB_WR_SEND;
- send->s_wr.send_flags = 0;
- send->s_wr.ex.imm_data = 0;
+ send->s_send_wr.next = NULL;
+ send->s_send_wr.wr_id = i;
+ send->s_send_wr.sg_list = send->s_sge;
+ send->s_send_wr.num_sge = 1;
+ send->s_send_wr.opcode = IB_WR_SEND;
+ send->s_send_wr.send_flags = 0;
+ send->s_send_wr.ex.imm_data = 0;
 sge = rds_iw_data_sge(ic, send->s_sge);
 sge->lkey = 0;
@@ -159,13 +159,6 @@ void rds_iw_send_init_ring(struct rds_iw_connection *ic)
	printk(KERN_WARNING "RDS/IW: ib_alloc_mr failed\n");
	break;
 }
-
- send->s_page_list = ib_alloc_fast_reg_page_list(
-	ic->i_cm_id->device, fastreg_message_size);
- if (IS_ERR(send->s_page_list)) {
-	printk(KERN_WARNING "RDS/IW: ib_alloc_fast_reg_page_list failed\n");
-	break;
- }
 }
}
@@ -177,9 +170,7 @@ void rds_iw_send_clear_ring(struct rds_iw_connection *ic)
 for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
 BUG_ON(!send->s_mr);
 ib_dereg_mr(send->s_mr);
- BUG_ON(!send->s_page_list);
- ib_free_fast_reg_page_list(send->s_page_list);
- if (send->s_wr.opcode == 0xdead)
+ if (send->s_send_wr.opcode == 0xdead)
	continue;
 if (send->s_rm)
	rds_iw_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
@@ -227,7 +218,7 @@ void rds_iw_send_cq_comp_handler(struct ib_cq *cq, void *context)
	continue;
 }
- if (wc.opcode == IB_WC_FAST_REG_MR && wc.wr_id == RDS_IW_FAST_REG_WR_ID) {
+ if (wc.opcode == IB_WC_REG_MR && wc.wr_id == RDS_IW_REG_WR_ID) {
	ic->i_fastreg_posted = 1;
	continue;
 }
@@ -247,12 +238,12 @@ void rds_iw_send_cq_comp_handler(struct ib_cq *cq, void *context)
 send = &ic->i_sends[oldest];
 /* In the error case, wc.opcode sometimes contains garbage */
- switch (send->s_wr.opcode) {
+ switch (send->s_send_wr.opcode) {
 case IB_WR_SEND:
	if (send->s_rm)
		rds_iw_send_unmap_rm(ic, send, wc.status);
	break;
- case IB_WR_FAST_REG_MR:
+ case IB_WR_REG_MR:
 case IB_WR_RDMA_WRITE:
 case IB_WR_RDMA_READ:
 case IB_WR_RDMA_READ_WITH_INV:
@@ -262,12 +253,12 @@ void rds_iw_send_cq_comp_handler(struct ib_cq *cq, void *context)
 default:
	printk_ratelimited(KERN_NOTICE
		"RDS/IW: %s: unexpected opcode 0x%x in WR!\n",
-		__func__, send->s_wr.opcode);
+		__func__, send->s_send_wr.opcode);
	break;
 }
- send->s_wr.opcode = 0xdead;
- send->s_wr.num_sge = 1;
+ send->s_send_wr.opcode = 0xdead;
+ send->s_send_wr.num_sge = 1;
 if (time_after(jiffies, send->s_queued + HZ/2))
	rds_iw_stats_inc(s_iw_tx_stalled);
@@ -455,10 +446,10 @@ rds_iw_xmit_populate_wr(struct rds_iw_connection *ic,
 WARN_ON(pos != send - ic->i_sends);
- send->s_wr.send_flags = send_flags;
- send->s_wr.opcode = IB_WR_SEND;
- send->s_wr.num_sge = 2;
- send->s_wr.next = NULL;
+ send->s_send_wr.send_flags = send_flags;
+ send->s_send_wr.opcode = IB_WR_SEND;
+ send->s_send_wr.num_sge = 2;
+ send->s_send_wr.next = NULL;
 send->s_queued = jiffies;
 send->s_op = NULL;
@@ -472,7 +463,7 @@ rds_iw_xmit_populate_wr(struct rds_iw_connection *ic,
 } else {
 /* We're sending a packet with no payload. There is only
  * one SGE */
- send->s_wr.num_sge = 1;
+ send->s_send_wr.num_sge = 1;
 sge = &send->s_sge[0];
 }
@@ -672,23 +663,23 @@ int rds_iw_xmit(struct rds_connection *conn, struct rds_message *rm,
 */
 if (ic->i_unsignaled_wrs-- == 0) {
	ic->i_unsignaled_wrs = rds_iw_sysctl_max_unsig_wrs;
-	send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
+	send->s_send_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
 }
 ic->i_unsignaled_bytes -= len;
 if (ic->i_unsignaled_bytes <= 0) {
	ic->i_unsignaled_bytes = rds_iw_sysctl_max_unsig_bytes;
-	send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
+	send->s_send_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
 }
 /*
  * Always signal the last one if we're stopping due to flow control.
  */
 if (flow_controlled && i == (work_alloc-1))
-	send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
+	send->s_send_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
 rdsdebug("send %p wr %p num_sge %u next %p\n", send,
-	 &send->s_wr, send->s_wr.num_sge, send->s_wr.next);
+	 &send->s_send_wr, send->s_send_wr.num_sge, send->s_send_wr.next);
 sent += len;
 rm->data.op_dmaoff += len;
@@ -722,7 +713,7 @@ add_header:
 }
 if (prev)
-	prev->s_wr.next = &send->s_wr;
+	prev->s_send_wr.next = &send->s_send_wr;
 prev = send;
 pos = (pos + 1) % ic->i_send_ring.w_nr;
@@ -736,7 +727,7 @@ add_header:
 /* if we finished the message then send completion owns it */
 if (scat == &rm->data.op_sg[rm->data.op_count]) {
	prev->s_rm = ic->i_rm;
-	prev->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
+	prev->s_send_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
	ic->i_rm = NULL;
 }
@@ -748,11 +739,11 @@ add_header:
	rds_iw_send_add_credits(conn, credit_alloc - i);
 /* XXX need to worry about failed_wr and partial sends. */
- failed_wr = &first->s_wr;
- ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
+ failed_wr = &first->s_send_wr;
+ ret = ib_post_send(ic->i_cm_id->qp, &first->s_send_wr, &failed_wr);
 rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
-	 first, &first->s_wr, ret, failed_wr);
- BUG_ON(failed_wr != &first->s_wr);
+	 first, &first->s_send_wr, ret, failed_wr);
+ BUG_ON(failed_wr != &first->s_send_wr);
 if (ret) {
	printk(KERN_WARNING "RDS/IW: ib_post_send to %pI4 "
	       "returned %d\n", &conn->c_faddr, ret);
@@ -770,24 +761,26 @@ out:
 return ret;
}
-static void rds_iw_build_send_fastreg(struct rds_iw_device *rds_iwdev, struct rds_iw_connection *ic, struct rds_iw_send_work *send, int nent, int len, u64 sg_addr)
+static int rds_iw_build_send_reg(struct rds_iw_send_work *send,
+				 struct scatterlist *sg,
+				 int sg_nents)
{
- BUG_ON(nent > send->s_page_list->max_page_list_len);
- /*
-  * Perform a WR for the fast_reg_mr. Each individual page
-  * in the sg list is added to the fast reg page list and placed
-  * inside the fast_reg_mr WR.
-  */
- send->s_wr.opcode = IB_WR_FAST_REG_MR;
- send->s_wr.wr.fast_reg.length = len;
- send->s_wr.wr.fast_reg.rkey = send->s_mr->rkey;
- send->s_wr.wr.fast_reg.page_list = send->s_page_list;
- send->s_wr.wr.fast_reg.page_list_len = nent;
- send->s_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
- send->s_wr.wr.fast_reg.access_flags = IB_ACCESS_REMOTE_WRITE;
- send->s_wr.wr.fast_reg.iova_start = sg_addr;
+ int n;
+
+ n = ib_map_mr_sg(send->s_mr, sg, sg_nents, PAGE_SIZE);
+ if (unlikely(n != sg_nents))
+	return n < 0 ? n : -EINVAL;
+
+ send->s_reg_wr.wr.opcode = IB_WR_REG_MR;
+ send->s_reg_wr.wr.wr_id = 0;
+ send->s_reg_wr.wr.num_sge = 0;
+ send->s_reg_wr.mr = send->s_mr;
+ send->s_reg_wr.key = send->s_mr->rkey;
+ send->s_reg_wr.access = IB_ACCESS_REMOTE_WRITE;
 ib_update_fast_reg_key(send->s_mr, send->s_remap_count++);
+
+ return 0;
}
 int rds_iw_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
@@ -808,6 +801,7 @@ int rds_iw_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
 int sent;
 int ret;
 int num_sge;
+ int sg_nents;
 rds_iwdev = ib_get_client_data(ic->i_cm_id->device, &rds_iw_client);
@@ -861,9 +855,10 @@ int rds_iw_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
 scat = &op->op_sg[0];
 sent = 0;
 num_sge = op->op_count;
+ sg_nents = 0;
 for (i = 0; i < work_alloc && scat != &op->op_sg[op->op_count]; i++) {
- send->s_wr.send_flags = 0;
+ send->s_rdma_wr.wr.send_flags = 0;
 send->s_queued = jiffies;
 /*
@@ -872,7 +867,7 @@ int rds_iw_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
 */
 if (ic->i_unsignaled_wrs-- == 0) {
	ic->i_unsignaled_wrs = rds_iw_sysctl_max_unsig_wrs;
-	send->s_wr.send_flags = IB_SEND_SIGNALED;
+	send->s_rdma_wr.wr.send_flags = IB_SEND_SIGNALED;
 }
 /* To avoid the need to have the plumbing to invalidate the fastreg_mr used
@@ -880,30 +875,31 @@ int rds_iw_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
  * IB_WR_RDMA_READ_WITH_INV will invalidate it after the read has completed.
  */
 if (op->op_write)
-	send->s_wr.opcode = IB_WR_RDMA_WRITE;
+	send->s_rdma_wr.wr.opcode = IB_WR_RDMA_WRITE;
 else
-	send->s_wr.opcode = IB_WR_RDMA_READ_WITH_INV;
+	send->s_rdma_wr.wr.opcode = IB_WR_RDMA_READ_WITH_INV;
- send->s_wr.wr.rdma.remote_addr = remote_addr;
- send->s_wr.wr.rdma.rkey = op->op_rkey;
+ send->s_rdma_wr.remote_addr = remote_addr;
+ send->s_rdma_wr.rkey = op->op_rkey;
 send->s_op = op;
 if (num_sge > rds_iwdev->max_sge) {
-	send->s_wr.num_sge = rds_iwdev->max_sge;
+	send->s_rdma_wr.wr.num_sge = rds_iwdev->max_sge;
	num_sge -= rds_iwdev->max_sge;
 } else
-	send->s_wr.num_sge = num_sge;
+	send->s_rdma_wr.wr.num_sge = num_sge;
- send->s_wr.next = NULL;
+ send->s_rdma_wr.wr.next = NULL;
 if (prev)
-	prev->s_wr.next = &send->s_wr;
+	prev->s_send_wr.next = &send->s_rdma_wr.wr;
- for (j = 0; j < send->s_wr.num_sge && scat != &op->op_sg[op->op_count]; j++) {
+ for (j = 0; j < send->s_rdma_wr.wr.num_sge &&
+      scat != &op->op_sg[op->op_count]; j++) {
	len = ib_sg_dma_len(ic->i_cm_id->device, scat);
-	if (send->s_wr.opcode == IB_WR_RDMA_READ_WITH_INV)
-		send->s_page_list->page_list[j] = ib_sg_dma_address(ic->i_cm_id->device, scat);
+	if (send->s_rdma_wr.wr.opcode == IB_WR_RDMA_READ_WITH_INV)
+		sg_nents++;
	else {
		send->s_sge[j].addr = ib_sg_dma_address(ic->i_cm_id->device, scat);
		send->s_sge[j].length = len;
@@ -917,15 +913,17 @@ int rds_iw_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
	scat++;
 }
- if (send->s_wr.opcode == IB_WR_RDMA_READ_WITH_INV) {
-	send->s_wr.num_sge = 1;
+ if (send->s_rdma_wr.wr.opcode == IB_WR_RDMA_READ_WITH_INV) {
+	send->s_rdma_wr.wr.num_sge = 1;
	send->s_sge[0].addr = conn->c_xmit_rm->m_rs->rs_user_addr;
	send->s_sge[0].length = conn->c_xmit_rm->m_rs->rs_user_bytes;
	send->s_sge[0].lkey = ic->i_sends[fr_pos].s_mr->lkey;
 }
 rdsdebug("send %p wr %p num_sge %u next %p\n", send,
-	 &send->s_wr, send->s_wr.num_sge, send->s_wr.next);
+	 &send->s_rdma_wr,
+	 send->s_rdma_wr.wr.num_sge,
+	 send->s_rdma_wr.wr.next);
 prev = send;
 if (++send == &ic->i_sends[ic->i_send_ring.w_nr])
@@ -934,7 +932,7 @@ int rds_iw_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
 /* if we finished the message then send completion owns it */
 if (scat == &op->op_sg[op->op_count])
-	first->s_wr.send_flags = IB_SEND_SIGNALED;
+	first->s_rdma_wr.wr.send_flags = IB_SEND_SIGNALED;
 if (i < work_alloc) {
	rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc - i);
@@ -948,16 +946,20 @@ int rds_iw_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
  * fastreg_mr (or possibly a dma_mr)
  */
 if (!op->op_write) {
-	rds_iw_build_send_fastreg(rds_iwdev, ic, &ic->i_sends[fr_pos],
-				  op->op_count, sent, conn->c_xmit_rm->m_rs->rs_user_addr);
+	ret = rds_iw_build_send_reg(&ic->i_sends[fr_pos],
+				    &op->op_sg[0], sg_nents);
+	if (ret) {
+		printk(KERN_WARNING "RDS/IW: failed to reg send mem\n");
+		goto out;
+	}
	work_alloc++;
 }
- failed_wr = &first->s_wr;
- ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
+ failed_wr = &first->s_rdma_wr.wr;
+ ret = ib_post_send(ic->i_cm_id->qp, &first->s_rdma_wr.wr, &failed_wr);
 rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
-	 first, &first->s_wr, ret, failed_wr);
- BUG_ON(failed_wr != &first->s_wr);
+	 first, &first->s_rdma_wr, ret, failed_wr);
+ BUG_ON(failed_wr != &first->s_rdma_wr.wr);
 if (ret) {
	printk(KERN_WARNING "RDS/IW: rdma ib_post_send to %pI4 "
	       "returned %d\n", &conn->c_faddr, ret);
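The other half of the iw_send.c churn comes from the verbs split of struct ib_send_wr: RDMA and registration requests now use wrapper types (struct ib_rdma_wr, struct ib_reg_wr) that embed the base WR as their leading .wr member, and iw.h above replaces the old s_wr field with an anonymous union of the three. A reduced model of why the chaining rewrites look the way they do; example_send_work is illustrative:

#include <rdma/ib_verbs.h>

/* One slot, three views: the union members overlap, and the base
 * struct ib_send_wr is the first member of every wrapper, so
 * s_send_wr aliases s_rdma_wr.wr and s_reg_wr.wr. */
struct example_send_work {
	union {
		struct ib_send_wr s_send_wr;
		struct ib_rdma_wr s_rdma_wr;
		struct ib_reg_wr  s_reg_wr;
	};
};

static void example_chain(struct example_send_work *prev,
			  struct example_send_work *next)
{
	/* Chaining and posting always go through the embedded base WR,
	 * whichever opcode the slot currently carries. */
	prev->s_send_wr.next = &next->s_rdma_wr.wr;
}

That aliasing is what makes mixed chains such as prev->s_send_wr.next = &send->s_rdma_wr.wr in rds_iw_xmit_rdma() above well defined.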
diff --git a/net/rds/rdma_transport.c b/net/rds/rdma_transport.c
index b9b40af53..9c1fed81b 100644
--- a/net/rds/rdma_transport.c
+++ b/net/rds/rdma_transport.c
@@ -142,8 +142,8 @@ static int rds_rdma_listen_init(void)
 struct rdma_cm_id *cm_id;
 int ret;
- cm_id = rdma_create_id(rds_rdma_cm_event_handler, NULL, RDMA_PS_TCP,
-		        IB_QPT_RC);
+ cm_id = rdma_create_id(&init_net, rds_rdma_cm_event_handler, NULL,
+		        RDMA_PS_TCP, IB_QPT_RC);
 if (IS_ERR(cm_id)) {
	ret = PTR_ERR(cm_id);
	printk(KERN_ERR "RDS/RDMA: failed to setup listener, "
diff --git a/net/rds/rds.h b/net/rds/rds.h
index afb4048d0..0e2797bdc 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -7,6 +7,7 @@
 #include <rdma/rdma_cm.h>
 #include <linux/mutex.h>
 #include <linux/rds.h>
+#include <linux/rhashtable.h>
 #include "info.h"
@@ -86,7 +87,9 @@ struct rds_connection {
 struct hlist_node	c_hash_node;
 __be32			c_laddr;
 __be32			c_faddr;
- unsigned int		c_loopback:1;
+ unsigned int		c_loopback:1,
+			c_outgoing:1,
+			c_pad_to_32:30;
 struct rds_connection	*c_passive;
 struct rds_cong_map	*c_lcong;
@@ -472,7 +475,8 @@ struct rds_sock {
 * bound_addr used for both incoming and outgoing, no INADDR_ANY
 * support.
 */
- struct hlist_node	rs_bound_node;
+ struct rhash_head	rs_bound_node;
+ u64			rs_bound_key;
 __be32			rs_bound_addr;
 __be32			rs_conn_addr;
 __be16			rs_bound_port;
@@ -603,6 +607,8 @@ extern wait_queue_head_t rds_poll_waitq;
 int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len);
 void rds_remove_bound(struct rds_sock *rs);
 struct rds_sock *rds_find_bound(__be32 addr, __be16 port);
+int rds_bind_lock_init(void);
+void rds_bind_lock_destroy(void);
 /* cong.c */
 int rds_cong_get_maps(struct rds_connection *conn);
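The rds.h hunks above are the data-structure side of the bind rework: rs_bound_node becomes a struct rhash_head, a flat u64 rs_bound_key is added as the hash key, and the new rds_bind_lock_init()/rds_bind_lock_destroy() hooks give the module init/exit paths a table to set up and tear down. A sketch of how such a table is typically driven; the example_* objects are illustrative stand-ins for the ones bind.c owns:

#include <linux/rhashtable.h>

static struct rhashtable example_bind_table;

static const struct rhashtable_params example_bind_params = {
	.key_len     = sizeof(u64),
	.key_offset  = offsetof(struct rds_sock, rs_bound_key),
	.head_offset = offsetof(struct rds_sock, rs_bound_node),
};

int example_bind_init(void)
{
	/* Presumed shape of rds_bind_lock_init(): bring up the table
	 * that rs_bound_node hangs off. */
	return rhashtable_init(&example_bind_table, &example_bind_params);
}

static struct rds_sock *example_find_bound(u64 key)
{
	/* The read side is lock-free; callers hold rcu_read_lock(). */
	return rhashtable_lookup_fast(&example_bind_table, &key,
				      example_bind_params);
}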
diff --git a/net/rds/send.c b/net/rds/send.c
index 859de6f32..c9cdb358e 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -38,6 +38,7 @@
 #include <linux/list.h>
 #include <linux/ratelimit.h>
 #include <linux/export.h>
+#include <linux/sizes.h>
 #include "rds.h"
@@ -51,7 +52,7 @@
 * it to 0 will restore the old behavior (where we looped until we had
 * drained the queue).
 */
-static int send_batch_count = 64;
+static int send_batch_count = SZ_1K;
 module_param(send_batch_count, int, 0444);
 MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
@@ -223,7 +224,7 @@ restart:
 * through a lot of messages, let's back off and see
 * if anyone else jumps in
 */
- if (batch_count >= 1024)
+ if (batch_count >= send_batch_count)
	goto over_batch;
 spin_lock_irqsave(&conn->c_lock, flags);
@@ -423,12 +424,15 @@ over_batch:
	!list_empty(&conn->c_send_queue)) &&
	send_gen == conn->c_send_gen) {
	rds_stats_inc(s_send_lock_queue_raced);
-	goto restart;
+	if (batch_count < send_batch_count)
+		goto restart;
+	queue_delayed_work(rds_wq, &conn->c_send_w, 1);
 }
}
out:
 return ret;
}
+EXPORT_SYMBOL_GPL(rds_send_xmit);
 static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
@@ -1122,8 +1126,9 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 */
 rds_stats_inc(s_send_queued);
- if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags))
-	rds_send_xmit(conn);
+ ret = rds_send_xmit(conn);
+ if (ret == -ENOMEM || ret == -EAGAIN)
+	queue_delayed_work(rds_wq, &conn->c_send_w, 1);
 rds_message_put(rm);
 return payload_len;
@@ -1179,8 +1184,8 @@ rds_send_pong(struct rds_connection *conn, __be16 dport)
 rds_stats_inc(s_send_queued);
 rds_stats_inc(s_send_pong);
- if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags))
-	queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+ /* schedule the send work on rds_wq */
+ queue_delayed_work(rds_wq, &conn->c_send_w, 1);
 rds_message_put(rm);
 return 0;
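Sketch of the new caller contract the send.c hunks establish: rds_send_xmit() is no longer fire-and-forget behind RDS_LL_SEND_FULL; callers inspect its return value and, on temporary exhaustion, park the connection on the workqueue for a jiffy instead of spinning. The names are the ones used in the hunks above:

static void example_try_xmit(struct rds_connection *conn)
{
	int ret = rds_send_xmit(conn);

	/* -ENOMEM and -EAGAIN mean "try again soon", so defer to the
	 * send worker rather than looping on this CPU. */
	if (ret == -ENOMEM || ret == -EAGAIN)
		queue_delayed_work(rds_wq, &conn->c_send_w, 1);
}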
diff --git a/net/rds/tcp.c b/net/rds/tcp.c
index c42b60bf4..9d6ddbacd 100644
--- a/net/rds/tcp.c
+++ b/net/rds/tcp.c
@@ -67,21 +67,13 @@ void rds_tcp_nonagle(struct socket *sock)
 set_fs(oldfs);
}
+/* All module specific customizations to the RDS-TCP socket should be done in
+ * rds_tcp_tune() and applied after socket creation. In general these
+ * customizations should be tunable via module_param()
+ */
 void rds_tcp_tune(struct socket *sock)
{
- struct sock *sk = sock->sk;
-
 rds_tcp_nonagle(sock);
-
- /*
-  * We're trying to saturate gigabit with the default,
-  * see svc_sock_setbufsize().
-  */
- lock_sock(sk);
- sk->sk_sndbuf = RDS_TCP_DEFAULT_BUFSIZE;
- sk->sk_rcvbuf = RDS_TCP_DEFAULT_BUFSIZE;
- sk->sk_userlocks |= SOCK_SNDBUF_LOCK|SOCK_RCVBUF_LOCK;
- release_sock(sk);
}
 u32 rds_tcp_snd_nxt(struct rds_tcp_connection *tc)
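rds_tcp_tune() loses its hard-wired RDS_TCP_DEFAULT_BUFSIZE override of sk_sndbuf/sk_rcvbuf, and the new comment says any future customization should arrive as a module parameter. A hypothetical example of what that would look like; rds_tcp_snd_buf below is made up for illustration and is not introduced by this patch:

#include <linux/module.h>
#include <net/sock.h>

/* Hypothetical load-time tunable, not part of this patch. */
static unsigned int rds_tcp_snd_buf;
module_param(rds_tcp_snd_buf, uint, 0444);
MODULE_PARM_DESC(rds_tcp_snd_buf, "fixed SO_SNDBUF for RDS-TCP sockets");

static void example_tcp_tune(struct socket *sock)
{
	struct sock *sk = sock->sk;

	rds_tcp_nonagle(sock);
	if (rds_tcp_snd_buf) {
		lock_sock(sk);
		sk->sk_sndbuf = rds_tcp_snd_buf;
		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
		release_sock(sk);
	}
}

Leaving the buffers alone by default lets TCP autotuning size them, which generally beats the old fixed value.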
diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c
index 444d78d0b..0936a4a32 100644
--- a/net/rds/tcp_listen.c
+++ b/net/rds/tcp_listen.c
@@ -110,28 +110,27 @@ int rds_tcp_accept_one(struct socket *sock)
	goto out;
 }
 /* An incoming SYN request came in, and TCP just accepted it.
-  * We always create a new conn for listen side of TCP, and do not
-  * add it to the c_hash_list.
  *
  * If the client reboots, this conn will need to be cleaned up.
  * rds_tcp_state_change() will do that cleanup
  */
 rs_tcp = (struct rds_tcp_connection *)conn->c_transport_data;
- WARN_ON(!rs_tcp || rs_tcp->t_sock);
-
- /*
-  * see the comment above rds_queue_delayed_reconnect()
-  */
- if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) {
-	if (rds_conn_state(conn) == RDS_CONN_UP)
-		rds_tcp_stats_inc(s_tcp_listen_closed_stale);
-	else
-		rds_tcp_stats_inc(s_tcp_connect_raced);
-	rds_conn_drop(conn);
+ if (rs_tcp->t_sock &&
+     ntohl(inet->inet_saddr) < ntohl(inet->inet_daddr)) {
+	struct sock *nsk = new_sock->sk;
+
+	nsk->sk_user_data = NULL;
+	nsk->sk_prot->disconnect(nsk, 0);
+	tcp_done(nsk);
+	new_sock = NULL;
	ret = 0;
	goto out;
+ } else if (rs_tcp->t_sock) {
+	rds_tcp_restore_callbacks(rs_tcp->t_sock, rs_tcp);
+	conn->c_outgoing = 0;
 }
+ rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING);
 rds_tcp_set_callbacks(new_sock, conn);
 rds_connect_complete(conn);
 new_sock = NULL;
diff --git a/net/rds/tcp_send.c b/net/rds/tcp_send.c
index 53b17ca0d..2894e6095 100644
--- a/net/rds/tcp_send.c
+++ b/net/rds/tcp_send.c
@@ -83,6 +83,7 @@ int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
 struct rds_tcp_connection *tc = conn->c_transport_data;
 int done = 0;
 int ret = 0;
+ int more;
 if (hdr_off == 0) {
 /*
@@ -116,12 +117,15 @@ int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
	goto out;
 }
+ more = rm->data.op_nents > 1 ? (MSG_MORE | MSG_SENDPAGE_NOTLAST) : 0;
 while (sg < rm->data.op_nents) {
+	int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more;
+
	ret = tc->t_sock->ops->sendpage(tc->t_sock,
					sg_page(&rm->data.op_sg[sg]),
					rm->data.op_sg[sg].offset + off,
					rm->data.op_sg[sg].length - off,
-					MSG_DONTWAIT|MSG_NOSIGNAL);
+					flags);
	rdsdebug("tcp sendpage %p:%u:%u ret %d\n", (void *)sg_page(&rm->data.op_sg[sg]),
		 rm->data.op_sg[sg].offset + off, rm->data.op_sg[sg].length - off,
		 ret);
@@ -134,6 +138,8 @@ int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
		off = 0;
		sg++;
	}
+	if (sg == rm->data.op_nents - 1)
+		more = 0;
 }
out:
diff --git a/net/rds/threads.c b/net/rds/threads.c
index dc2402e87..454aa6d23 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -162,7 +162,9 @@ void rds_send_worker(struct work_struct *work)
 int ret;
 if (rds_conn_state(conn) == RDS_CONN_UP) {
+	clear_bit(RDS_LL_SEND_FULL, &conn->c_flags);
	ret = rds_send_xmit(conn);
+	cond_resched();
	rdsdebug("conn %p ret %d\n", conn, ret);
	switch (ret) {
	case -EAGAIN:
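The tcp_send.c hunks flag every sendpage() fragment except the last with MSG_MORE | MSG_SENDPAGE_NOTLAST so the stack can coalesce the pages of one RDS message into full segments, and threads.c clears RDS_LL_SEND_FULL and adds a cond_resched() so the larger send batches stay preemptible. A reduced model of the flagging logic; example_send_frags() and its arguments are illustrative:

#include <linux/net.h>
#include <net/sock.h>

/* Send nr page fragments on sock; all but the last carry MSG_MORE and
 * MSG_SENDPAGE_NOTLAST. Partial-write handling is elided. */
static int example_send_frags(struct socket *sock, struct page **pages,
			      size_t *lens, int nr)
{
	int i;

	for (i = 0; i < nr; i++) {
		int flags = MSG_DONTWAIT | MSG_NOSIGNAL;
		ssize_t ret;

		if (i < nr - 1)
			flags |= MSG_MORE | MSG_SENDPAGE_NOTLAST;
		ret = sock->ops->sendpage(sock, pages[i], 0, lens[i], flags);
		if (ret < 0)
			return ret;
	}
	return 0;
}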