diff options
Diffstat (limited to 'net/sunrpc/xprtsock.c')
-rw-r--r-- | net/sunrpc/xprtsock.c | 274 |
1 files changed, 189 insertions, 85 deletions
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 1a85e0ed0..2ffaf6a79 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -360,8 +360,10 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i int flags = XS_SENDMSG_FLAGS; remainder -= len; - if (remainder != 0 || more) + if (more) flags |= MSG_MORE; + if (remainder != 0) + flags |= MSG_SENDPAGE_NOTLAST | MSG_MORE; err = do_sendpage(sock, *ppage, base, len, flags); if (remainder == 0 || err != len) break; @@ -396,7 +398,7 @@ static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, if (unlikely(!sock)) return -ENOTSOCK; - clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags); + clear_bit(SOCKWQ_ASYNC_NOSPACE, &sock->flags); if (base != 0) { addr = NULL; addrlen = 0; @@ -440,7 +442,7 @@ static void xs_nospace_callback(struct rpc_task *task) struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt); transport->inet->sk_write_pending--; - clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags); + clear_bit(SOCKWQ_ASYNC_NOSPACE, &transport->sock->flags); } /** @@ -465,7 +467,7 @@ static int xs_nospace(struct rpc_task *task) /* Don't race with disconnect */ if (xprt_connected(xprt)) { - if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) { + if (test_bit(SOCKWQ_ASYNC_NOSPACE, &transport->sock->flags)) { /* * Notify TCP that we're limited by the application * window size @@ -476,7 +478,7 @@ static int xs_nospace(struct rpc_task *task) xprt_wait_for_buffer_space(task, xs_nospace_callback); } } else { - clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags); + clear_bit(SOCKWQ_ASYNC_NOSPACE, &transport->sock->flags); ret = -ENOTCONN; } @@ -624,7 +626,7 @@ process_status: case -EPERM: /* When the server has died, an ICMP port unreachable message * prompts ECONNREFUSED. */ - clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags); + clear_bit(SOCKWQ_ASYNC_NOSPACE, &transport->sock->flags); } return status; @@ -713,7 +715,7 @@ static int xs_tcp_send_request(struct rpc_task *task) case -EADDRINUSE: case -ENOBUFS: case -EPIPE: - clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags); + clear_bit(SOCKWQ_ASYNC_NOSPACE, &transport->sock->flags); } return status; @@ -823,6 +825,7 @@ static void xs_reset_transport(struct sock_xprt *transport) kernel_sock_shutdown(sock, SHUT_RDWR); + mutex_lock(&transport->recv_mutex); write_lock_bh(&sk->sk_callback_lock); transport->inet = NULL; transport->sock = NULL; @@ -833,6 +836,7 @@ static void xs_reset_transport(struct sock_xprt *transport) xprt_clear_connected(xprt); write_unlock_bh(&sk->sk_callback_lock); xs_sock_reset_connection_flags(xprt); + mutex_unlock(&transport->recv_mutex); trace_rpc_socket_close(xprt, sock); sock_release(sock); @@ -886,6 +890,7 @@ static void xs_destroy(struct rpc_xprt *xprt) cancel_delayed_work_sync(&transport->connect_worker); xs_close(xprt); + cancel_work_sync(&transport->recv_worker); xs_xprt_free(xprt); module_put(THIS_MODULE); } @@ -906,44 +911,36 @@ static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb) } /** - * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets - * @sk: socket with data to read + * xs_local_data_read_skb + * @xprt: transport + * @sk: socket + * @skb: skbuff * * Currently this assumes we can read the whole reply in a single gulp. */ -static void xs_local_data_ready(struct sock *sk) +static void xs_local_data_read_skb(struct rpc_xprt *xprt, + struct sock *sk, + struct sk_buff *skb) { struct rpc_task *task; - struct rpc_xprt *xprt; struct rpc_rqst *rovr; - struct sk_buff *skb; - int err, repsize, copied; + int repsize, copied; u32 _xid; __be32 *xp; - read_lock_bh(&sk->sk_callback_lock); - dprintk("RPC: %s...\n", __func__); - xprt = xprt_from_sock(sk); - if (xprt == NULL) - goto out; - - skb = skb_recv_datagram(sk, 0, 1, &err); - if (skb == NULL) - goto out; - repsize = skb->len - sizeof(rpc_fraghdr); if (repsize < 4) { dprintk("RPC: impossible RPC reply size %d\n", repsize); - goto dropit; + return; } /* Copy the XID from the skb... */ xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid); if (xp == NULL) - goto dropit; + return; /* Look up and lock the request corresponding to the given XID */ - spin_lock(&xprt->transport_lock); + spin_lock_bh(&xprt->transport_lock); rovr = xprt_lookup_rqst(xprt, *xp); if (!rovr) goto out_unlock; @@ -961,50 +958,68 @@ static void xs_local_data_ready(struct sock *sk) xprt_complete_rqst(task, copied); out_unlock: - spin_unlock(&xprt->transport_lock); - dropit: - skb_free_datagram(sk, skb); - out: - read_unlock_bh(&sk->sk_callback_lock); + spin_unlock_bh(&xprt->transport_lock); +} + +static void xs_local_data_receive(struct sock_xprt *transport) +{ + struct sk_buff *skb; + struct sock *sk; + int err; + + mutex_lock(&transport->recv_mutex); + sk = transport->inet; + if (sk == NULL) + goto out; + for (;;) { + skb = skb_recv_datagram(sk, 0, 1, &err); + if (skb == NULL) + break; + xs_local_data_read_skb(&transport->xprt, sk, skb); + skb_free_datagram(sk, skb); + } +out: + mutex_unlock(&transport->recv_mutex); +} + +static void xs_local_data_receive_workfn(struct work_struct *work) +{ + struct sock_xprt *transport = + container_of(work, struct sock_xprt, recv_worker); + xs_local_data_receive(transport); } /** - * xs_udp_data_ready - "data ready" callback for UDP sockets - * @sk: socket with data to read + * xs_udp_data_read_skb - receive callback for UDP sockets + * @xprt: transport + * @sk: socket + * @skb: skbuff * */ -static void xs_udp_data_ready(struct sock *sk) +static void xs_udp_data_read_skb(struct rpc_xprt *xprt, + struct sock *sk, + struct sk_buff *skb) { struct rpc_task *task; - struct rpc_xprt *xprt; struct rpc_rqst *rovr; - struct sk_buff *skb; - int err, repsize, copied; + int repsize, copied; u32 _xid; __be32 *xp; - read_lock_bh(&sk->sk_callback_lock); - dprintk("RPC: xs_udp_data_ready...\n"); - if (!(xprt = xprt_from_sock(sk))) - goto out; - - if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL) - goto out; - repsize = skb->len - sizeof(struct udphdr); if (repsize < 4) { dprintk("RPC: impossible RPC reply size %d!\n", repsize); - goto dropit; + return; } /* Copy the XID from the skb... */ xp = skb_header_pointer(skb, sizeof(struct udphdr), sizeof(_xid), &_xid); if (xp == NULL) - goto dropit; + return; /* Look up and lock the request corresponding to the given XID */ - spin_lock(&xprt->transport_lock); + spin_lock_bh(&xprt->transport_lock); rovr = xprt_lookup_rqst(xprt, *xp); if (!rovr) goto out_unlock; @@ -1025,10 +1040,54 @@ static void xs_udp_data_ready(struct sock *sk) xprt_complete_rqst(task, copied); out_unlock: - spin_unlock(&xprt->transport_lock); - dropit: - skb_free_datagram(sk, skb); - out: + spin_unlock_bh(&xprt->transport_lock); +} + +static void xs_udp_data_receive(struct sock_xprt *transport) +{ + struct sk_buff *skb; + struct sock *sk; + int err; + + mutex_lock(&transport->recv_mutex); + sk = transport->inet; + if (sk == NULL) + goto out; + for (;;) { + skb = skb_recv_datagram(sk, 0, 1, &err); + if (skb == NULL) + break; + xs_udp_data_read_skb(&transport->xprt, sk, skb); + skb_free_datagram(sk, skb); + } +out: + mutex_unlock(&transport->recv_mutex); +} + +static void xs_udp_data_receive_workfn(struct work_struct *work) +{ + struct sock_xprt *transport = + container_of(work, struct sock_xprt, recv_worker); + xs_udp_data_receive(transport); +} + +/** + * xs_data_ready - "data ready" callback for UDP sockets + * @sk: socket with data to read + * + */ +static void xs_data_ready(struct sock *sk) +{ + struct rpc_xprt *xprt; + + read_lock_bh(&sk->sk_callback_lock); + dprintk("RPC: xs_data_ready...\n"); + xprt = xprt_from_sock(sk); + if (xprt != NULL) { + struct sock_xprt *transport = container_of(xprt, + struct sock_xprt, xprt); + queue_work(rpciod_workqueue, &transport->recv_worker); + } read_unlock_bh(&sk->sk_callback_lock); } @@ -1243,12 +1302,12 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid)); /* Find and lock the request corresponding to this xid */ - spin_lock(&xprt->transport_lock); + spin_lock_bh(&xprt->transport_lock); req = xprt_lookup_rqst(xprt, transport->tcp_xid); if (!req) { dprintk("RPC: XID %08x request not found!\n", ntohl(transport->tcp_xid)); - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); return -1; } @@ -1257,7 +1316,7 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) xprt_complete_rqst(req->rq_task, transport->tcp_copied); - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); return 0; } @@ -1277,10 +1336,10 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt, struct rpc_rqst *req; /* Look up and lock the request corresponding to the given XID */ - spin_lock(&xprt->transport_lock); + spin_lock_bh(&xprt->transport_lock); req = xprt_lookup_bc_request(xprt, transport->tcp_xid); if (req == NULL) { - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); printk(KERN_WARNING "Callback slot table overflowed\n"); xprt_force_disconnect(xprt); return -1; @@ -1291,7 +1350,7 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt, if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) xprt_complete_bc_request(req, transport->tcp_copied); - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); return 0; } @@ -1306,6 +1365,17 @@ static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, xs_tcp_read_reply(xprt, desc) : xs_tcp_read_callback(xprt, desc); } + +static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net) +{ + int ret; + + ret = svc_create_xprt(serv, "tcp-bc", net, PF_INET, 0, + SVC_SOCK_ANONYMOUS); + if (ret < 0) + return ret; + return 0; +} #else static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, struct xdr_skb_reader *desc) @@ -1391,6 +1461,44 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns return len - desc.count; } +static void xs_tcp_data_receive(struct sock_xprt *transport) +{ + struct rpc_xprt *xprt = &transport->xprt; + struct sock *sk; + read_descriptor_t rd_desc = { + .count = 2*1024*1024, + .arg.data = xprt, + }; + unsigned long total = 0; + int read = 0; + + mutex_lock(&transport->recv_mutex); + sk = transport->inet; + if (sk == NULL) + goto out; + + /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ + for (;;) { + lock_sock(sk); + read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); + release_sock(sk); + if (read <= 0) + break; + total += read; + rd_desc.count = 65536; + } +out: + mutex_unlock(&transport->recv_mutex); + trace_xs_tcp_data_ready(xprt, read, total); +} + +static void xs_tcp_data_receive_workfn(struct work_struct *work) +{ + struct sock_xprt *transport = + container_of(work, struct sock_xprt, recv_worker); + xs_tcp_data_receive(transport); +} + /** * xs_tcp_data_ready - "data ready" callback for TCP sockets * @sk: socket with data to read @@ -1398,34 +1506,24 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns */ static void xs_tcp_data_ready(struct sock *sk) { + struct sock_xprt *transport; struct rpc_xprt *xprt; - read_descriptor_t rd_desc; - int read; - unsigned long total = 0; dprintk("RPC: xs_tcp_data_ready...\n"); read_lock_bh(&sk->sk_callback_lock); - if (!(xprt = xprt_from_sock(sk))) { - read = 0; + if (!(xprt = xprt_from_sock(sk))) goto out; - } + transport = container_of(xprt, struct sock_xprt, xprt); + /* Any data means we had a useful conversation, so * the we don't need to delay the next reconnect */ if (xprt->reestablish_timeout) xprt->reestablish_timeout = 0; + queue_work(rpciod_workqueue, &transport->recv_worker); - /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ - rd_desc.arg.data = xprt; - do { - rd_desc.count = 65536; - read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); - if (read > 0) - total += read; - } while (read > 0); out: - trace_xs_tcp_data_ready(xprt, read, total); read_unlock_bh(&sk->sk_callback_lock); } @@ -1520,7 +1618,7 @@ static void xs_write_space(struct sock *sk) if (unlikely(!(xprt = xprt_from_sock(sk)))) return; - if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0) + if (test_and_clear_bit(SOCKWQ_ASYNC_NOSPACE, &sock->flags) == 0) return; xprt_write_space(xprt); @@ -1873,7 +1971,7 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt, xs_save_old_callbacks(transport, sk); sk->sk_user_data = xprt; - sk->sk_data_ready = xs_local_data_ready; + sk->sk_data_ready = xs_data_ready; sk->sk_write_space = xs_udp_write_space; sk->sk_error_report = xs_error_report; sk->sk_allocation = GFP_NOIO; @@ -2059,7 +2157,7 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) xs_save_old_callbacks(transport, sk); sk->sk_user_data = xprt; - sk->sk_data_ready = xs_udp_data_ready; + sk->sk_data_ready = xs_data_ready; sk->sk_write_space = xs_udp_write_space; sk->sk_allocation = GFP_NOIO; @@ -2472,7 +2570,7 @@ static int bc_send_request(struct rpc_task *task) { struct rpc_rqst *req = task->tk_rqstp; struct svc_xprt *xprt; - u32 len; + int len; dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid)); /* @@ -2580,6 +2678,12 @@ static struct rpc_xprt_ops xs_tcp_ops = { .enable_swap = xs_enable_swap, .disable_swap = xs_disable_swap, .inject_disconnect = xs_inject_disconnect, +#ifdef CONFIG_SUNRPC_BACKCHANNEL + .bc_setup = xprt_setup_bc, + .bc_up = xs_tcp_bc_up, + .bc_free_rqst = xprt_free_bc_rqst, + .bc_destroy = xprt_destroy_bc, +#endif }; /* @@ -2650,6 +2754,7 @@ static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args, } new = container_of(xprt, struct sock_xprt, xprt); + mutex_init(&new->recv_mutex); memcpy(&xprt->addr, args->dstaddr, args->addrlen); xprt->addrlen = args->addrlen; if (args->srcaddr) @@ -2703,6 +2808,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args) xprt->ops = &xs_local_ops; xprt->timeout = &xs_local_default_timeout; + INIT_WORK(&transport->recv_worker, xs_local_data_receive_workfn); INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket); @@ -2774,21 +2880,20 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args) xprt->timeout = &xs_udp_default_timeout; + INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn); + INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket); + switch (addr->sa_family) { case AF_INET: if (((struct sockaddr_in *)addr)->sin_port != htons(0)) xprt_set_bound(xprt); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_udp_setup_socket); xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP); break; case AF_INET6: if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) xprt_set_bound(xprt); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_udp_setup_socket); xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6); break; default: @@ -2853,21 +2958,20 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args) xprt->ops = &xs_tcp_ops; xprt->timeout = &xs_tcp_default_timeout; + INIT_WORK(&transport->recv_worker, xs_tcp_data_receive_workfn); + INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket); + switch (addr->sa_family) { case AF_INET: if (((struct sockaddr_in *)addr)->sin_port != htons(0)) xprt_set_bound(xprt); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_tcp_setup_socket); xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP); break; case AF_INET6: if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) xprt_set_bound(xprt); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_tcp_setup_socket); xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6); break; default: |