2 * linux/net/sunrpc/xprt.c
4 * This is a generic RPC call interface supporting congestion avoidance,
5 * and asynchronous calls.
7 * The interface works like this:
9 * - When a process places a call, it allocates a request slot if
10 * one is available. Otherwise, it sleeps on the backlog queue
12 * - Next, the caller puts together the RPC message, stuffs it into
13 * the request struct, and calls xprt_call().
14 * - xprt_call transmits the message and installs the caller on the
15 * socket's wait list. At the same time, it installs a timer that
16 * is run after the packet's timeout has expired.
17 * - When a packet arrives, the data_ready handler walks the list of
18 * pending requests for that socket. If a matching XID is found, the
19 * caller is woken up, and the timer removed.
20 * - When no reply arrives within the timeout interval, the timer is
21 * fired by the kernel and runs xprt_timer(). It either adjusts the
22 * timeout values (minor timeout) or wakes up the caller with a status
24 * - When the caller receives a notification from RPC that a reply arrived,
25 * it should release the RPC slot, and process the reply.
26 * If the call timed out, it may choose to retry the operation by
27 * adjusting the initial timeout value, and simply calling rpc_call
30 * Support for async RPC is done through a set of RPC-specific scheduling
31 * primitives that `transparently' work for processes as well as async
32 * tasks that rely on callbacks.
34 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
36 * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
37 * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
38 * TCP NFS related read + write fixes
39 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
41 * Rewrite of large parts of the code in order to stabilize TCP stuff.
42 * Fix behaviour when socket buffer is full.
43 * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
46 #define __KERNEL_SYSCALLS__
48 #include <linux/version.h>
49 #include <linux/types.h>
50 #include <linux/slab.h>
51 #include <linux/capability.h>
52 #include <linux/sched.h>
53 #include <linux/errno.h>
54 #include <linux/socket.h>
56 #include <linux/net.h>
58 #include <linux/udp.h>
59 #include <linux/unistd.h>
60 #include <linux/sunrpc/clnt.h>
61 #include <linux/file.h>
64 #include <net/checksum.h>
68 #include <asm/uaccess.h>
75 # undef RPC_DEBUG_DATA
76 # define RPCDBG_FACILITY RPCDBG_XPRT
79 #define XPRT_MAX_BACKOFF (8)
84 static void xprt_request_init(struct rpc_task *, struct rpc_xprt *);
85 static void do_xprt_transmit(struct rpc_task *);
86 static inline void do_xprt_reserve(struct rpc_task *);
87 static void xprt_disconnect(struct rpc_xprt *);
88 static void xprt_connect_status(struct rpc_task *task);
89 static struct socket *xprt_create_socket(int, struct rpc_timeout *, int);
90 static int xprt_bind_socket(struct rpc_xprt *, struct socket *);
91 static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
95 * Print the buffer contents (first 128 bytes only--just enough for
/*
 * Debug helper: hex-dump the leading bytes of an RPC packet via dprintk(),
 * four bytes per group, capped at 128 bytes total.
 */
99 xprt_pktdump(char *msg, u32 *packet, unsigned int count)
101 u8 *buf = (u8 *) packet;
104 dprintk("RPC: %s\n", msg);
/* Walk a word (4 bytes) at a time; 'j' doubles as the printed offset. */
105 for (j = 0; j < count && j < 128; j += 4) {
109 dprintk("0x%04x ", j);
111 dprintk("%02x%02x%02x%02x ",
112 buf[j], buf[j+1], buf[j+2], buf[j+3]);
/* No-op variant below — presumably compiled when RPC_DEBUG_DATA is not set
 * (the #if/#else lines are not visible in this chunk — TODO confirm). */
118 xprt_pktdump(char *msg, u32 *packet, unsigned int count)
125 * Look up RPC transport given an INET socket
127 static inline struct rpc_xprt *
128 xprt_from_sock(struct sock *sk)
/* The transport pointer is stashed in sk->user_data by xprt_bind_socket(). */
130 return (struct rpc_xprt *) sk->user_data;
134 * Serialize write access to sockets, in order to prevent different
135 * requests from interfering with each other.
136 * Also prevents TCP socket connections from colliding with writes.
/*
 * Try to make 'task' the transport's sender (xprt->snd_task).
 * Caller must hold xprt->sock_lock (see xprt_lock_write).
 * Returns nonzero iff this task now owns the send lock; otherwise the
 * task is queued on the resend or sending waitqueue with -EAGAIN.
 */
139 __xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
141 struct rpc_rqst *req = task->tk_rqstp;
142 if (!xprt->snd_task) {
/* Congestion-limited transports must also grab a congestion slot. */
143 if (xprt->nocong || __xprt_get_cong(xprt, task)) {
144 xprt->snd_task = task;
146 req->rq_bytes_sent = 0;
151 if (xprt->snd_task != task) {
152 dprintk("RPC: %4d TCP write queue full\n", task->tk_pid);
153 task->tk_timeout = 0;
154 task->tk_status = -EAGAIN;
/* Requests already transmitted once wait on 'resend' so they get
 * priority over never-sent requests on 'sending'. */
155 if (req && req->rq_ntrans)
156 rpc_sleep_on(&xprt->resend, task, NULL, NULL);
158 rpc_sleep_on(&xprt->sending, task, NULL, NULL);
160 return xprt->snd_task == task;
/* Locked wrapper around __xprt_lock_write(): takes sock_lock (BH-safe). */
164 xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
167 spin_lock_bh(&xprt->sock_lock);
168 retval = __xprt_lock_write(xprt, task);
169 spin_unlock_bh(&xprt->sock_lock);
/*
 * Hand the send lock to the next waiting task, preferring the 'resend'
 * queue over 'sending'. Caller holds xprt->sock_lock.
 */
174 __xprt_lock_write_next(struct rpc_xprt *xprt)
176 struct rpc_task *task;
180 task = rpc_wake_up_next(&xprt->resend);
/* Don't hand out the lock to new senders while the window is full. */
182 if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
184 task = rpc_wake_up_next(&xprt->sending);
188 if (xprt->nocong || __xprt_get_cong(xprt, task)) {
189 struct rpc_rqst *req = task->tk_rqstp;
190 xprt->snd_task = task;
192 req->rq_bytes_sent = 0;
199 * Releases the socket for use by other requests.
/* Caller holds xprt->sock_lock; wakes the next queued sender. */
202 __xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
204 if (xprt->snd_task == task)
205 xprt->snd_task = NULL;
206 __xprt_lock_write_next(xprt);
/* Locked wrapper around __xprt_release_write(). */
210 xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
212 spin_lock_bh(&xprt->sock_lock);
213 __xprt_release_write(xprt, task);
214 spin_unlock_bh(&xprt->sock_lock);
218 * Write data to socket.
/*
 * Transmit (part of) req->rq_snd_buf on the transport socket, resuming
 * at the rq_bytes_sent offset so already-sent bytes are not repeated.
 * Returns the sendmsg() result (bytes sent or negative errno).
 */
221 xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
223 struct socket *sock = xprt->sock;
225 struct xdr_buf *xdr = &req->rq_snd_buf;
226 struct iovec niv[MAX_IOVEC];
227 unsigned int niov, slen, skip;
234 xprt_pktdump("packet data:",
235 req->rq_svec->iov_base,
236 req->rq_svec->iov_len);
238 /* Don't repeat bytes that were already sent in a previous pass */
239 skip = req->rq_bytes_sent;
240 slen = xdr->len - skip;
/* Sending from kernel buffers: temporarily widen the address limit. */
241 oldfs = get_fs(); set_fs(get_ds());
243 unsigned int slen_part, n;
/* Map the xdr buffer pages into an iovec for this pass. */
245 niv = xdr_kmap(niv, xdr, skip);
251 msg.msg_flags = MSG_DONTWAIT|MSG_NOSIGNAL;
253 msg.msg_iovlen = niov;
254 msg.msg_name = (struct sockaddr *) &xprt->addr;
255 msg.msg_namelen = sizeof(xprt->addr);
256 msg.msg_control = NULL;
257 msg.msg_controllen = 0;
/* slen_part = total bytes covered by this pass's iovec. */
260 for (n = 0; n < niov; n++)
261 slen_part += niv[n].iov_len;
263 clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
264 result = sock_sendmsg(sock, &msg, slen_part);
266 xdr_kunmap(xdr, skip, niov);
270 } while (result >= 0 && slen);
273 dprintk("RPC: xprt_sendmsg(%d) = %d\n", slen, result);
280 /* When the server has died, an ICMP port unreachable message
281 * prompts ECONNREFUSED.
288 /* connection broken */
293 printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
299 * Van Jacobson congestion avoidance. Check if the congestion window
300 * overflowed. Put the task to sleep if this is the case.
/*
 * Reserve a congestion slot for this request. Fails (returns early)
 * when the window is already full; otherwise charges RPC_CWNDSCALE
 * to xprt->cong. Caller holds xprt->sock_lock.
 */
303 __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
305 struct rpc_rqst *req = task->tk_rqstp;
309 dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n",
310 task->tk_pid, xprt->cong, xprt->cwnd);
311 if (RPCXPRT_CONGESTED(xprt))
314 xprt->cong += RPC_CWNDSCALE;
319 * Adjust the congestion window, and wake up the next task
320 * that has been sleeping due to congestion
/* Release this request's congestion slot and let the next sender run. */
323 __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
328 xprt->cong -= RPC_CWNDSCALE;
329 __xprt_lock_write_next(xprt);
333 * Adjust RPC congestion window
334 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
/*
 * Grow the window additively on success (when the window was fully
 * utilized), halve-style shrink on timeout — classic AIMD behaviour.
 */
337 xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
342 if (result >= 0 && cwnd <= xprt->cong) {
343 /* The (cwnd >> 1) term makes sure
344 * the result gets rounded properly. */
345 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
346 if (cwnd > RPC_MAXCWND)
348 __xprt_lock_write_next(xprt);
349 } else if (result == -ETIMEDOUT) {
/* Never let the window drop below one full request slot. */
351 if (cwnd < RPC_CWNDSCALE)
352 cwnd = RPC_CWNDSCALE;
354 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
355 xprt->cong, xprt->cwnd, cwnd);
360 * Adjust timeout values etc for next retransmit
/*
 * Advance 'to' to its next retransmit interval: exponential backoff
 * (<<= 1) or linear (+= to_increment), clamped at to_maxval.
 * Once retries are exhausted, the *initial* value is escalated instead
 * and to_current is reset from it.
 * Returns nonzero while retries remain (note: to_retries is
 * post-decremented in the return statement).
 */
363 xprt_adjust_timeout(struct rpc_timeout *to)
365 if (to->to_retries > 0) {
366 if (to->to_exponential)
367 to->to_current <<= 1;
369 to->to_current += to->to_increment;
370 if (to->to_maxval && to->to_current >= to->to_maxval)
371 to->to_current = to->to_maxval;
373 if (to->to_exponential)
374 to->to_initval <<= 1;
376 to->to_initval += to->to_increment;
377 if (to->to_maxval && to->to_initval >= to->to_maxval)
378 to->to_initval = to->to_maxval;
379 to->to_current = to->to_initval;
/* Defensive: a zero timeout would rearm immediately — force 5s. */
382 if (!to->to_current) {
383 printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");
384 to->to_current = 5 * HZ;
386 pprintk("RPC: %lu %s\n", jiffies,
387 to->to_retries? "retrans" : "timeout");
388 return to->to_retries-- > 0;
392 * Close down a transport socket
/*
 * Detach the transport from its socket: restore the socket's original
 * callbacks under callback_lock so softirq handlers can no longer
 * reach the xprt, then mark the transport disconnected.
 */
395 xprt_close(struct rpc_xprt *xprt)
397 struct socket *sock = xprt->sock;
398 struct sock *sk = xprt->inet;
403 write_lock_bh(&sk->callback_lock);
407 sk->user_data = NULL;
408 sk->data_ready = xprt->old_data_ready;
409 sk->state_change = xprt->old_state_change;
410 sk->write_space = xprt->old_write_space;
411 write_unlock_bh(&sk->callback_lock);
413 xprt_disconnect(xprt);
420 * Mark a transport as disconnected
/* Clears the connected flag and fails all pending waiters with -ENOTCONN. */
423 xprt_disconnect(struct rpc_xprt *xprt)
425 dprintk("RPC: disconnected transport %p\n", xprt);
426 spin_lock_bh(&xprt->sock_lock);
427 xprt_clear_connected(xprt);
428 rpc_wake_up_status(&xprt->pending, -ENOTCONN);
429 spin_unlock_bh(&xprt->sock_lock);
433 * Reconnect a broken TCP connection.
/*
 * (Re)establish the transport connection for 'task'. Creates a fresh
 * socket, starts a non-blocking connect, and either returns with the
 * connection up or puts the task to sleep on xprt->pending until
 * tcp_state_change()/xprt_connect_status() wakes it.
 */
437 xprt_connect(struct rpc_task *task)
439 struct rpc_xprt *xprt = task->tk_xprt;
440 struct socket *sock = xprt->sock;
444 dprintk("RPC: %4d xprt_connect %p connected %d\n",
445 task->tk_pid, xprt, xprt_connected(xprt));
/* No destination port means the server was never located. */
449 if (!xprt->addr.sin_port) {
450 task->tk_status = -EIO;
/* Only one task may drive the connect; others queue behind it. */
454 if (!xprt_lock_write(xprt, task))
456 if (xprt_connected(xprt))
460 task->tk_rqstp->rq_bytes_sent = 0;
463 /* Create an unconnected socket */
464 sock = xprt_create_socket(xprt->prot, &xprt->timeout, xprt->resvport);
466 /* couldn't create socket or bind to reserved port;
467 * this is likely a permanent error, so cause an abort */
468 task->tk_status = -EIO;
471 xprt_bind_socket(xprt, sock);
478 /* Now connect it asynchronously. */
479 dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
480 status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
481 sizeof(xprt->addr), O_NONBLOCK);
482 dprintk("RPC: %4d connect status %d connected %d\n",
483 task->tk_pid, status, xprt_connected(xprt));
491 /* Protect against TCP socket state changes */
493 if (inet->state != TCP_ESTABLISHED) {
494 dprintk("RPC: %4d waiting for connection\n",
496 task->tk_timeout = RPC_CONNECT_TIMEOUT;
497 /* if the socket is already closing, delay briefly */
498 if ((1<<inet->state) & ~(TCPF_SYN_SENT|TCPF_SYN_RECV))
499 task->tk_timeout = RPC_REESTABLISH_TIMEOUT;
500 rpc_sleep_on(&xprt->pending, task, xprt_connect_status,
/* Hard mounts: back off and retry the connect indefinitely. */
508 if (!task->tk_client->cl_softrtry) {
509 rpc_delay(task, RPC_REESTABLISH_TIMEOUT);
510 task->tk_status = -ENOTCONN;
514 /* Report myriad other possible returns. If this file
515 * system is soft mounted, just error out, like Solaris. */
516 if (task->tk_client->cl_softrtry) {
518 "RPC: error %d connecting to server %s, exiting\n",
519 -status, task->tk_client->cl_server);
520 task->tk_status = -EIO;
523 printk(KERN_WARNING "RPC: error %d connecting to server %s\n",
524 -status, task->tk_client->cl_server);
525 /* This will prevent anybody else from connecting */
526 rpc_delay(task, RPC_REESTABLISH_TIMEOUT);
527 task->tk_status = status;
532 xprt_release_write(xprt, task);
536 * We arrive here when awoken from waiting on connection establishment.
/*
 * Completion callback for xprt_connect()'s sleep on xprt->pending:
 * translate the wakeup status and drop the transport write lock.
 */
539 xprt_connect_status(struct rpc_task *task)
541 struct rpc_xprt *xprt = task->tk_xprt;
543 if (task->tk_status >= 0) {
544 dprintk("RPC: %4d xprt_connect_status: connection established\n",
549 /* if soft mounted, cause this RPC to fail */
550 if (task->tk_client->cl_softrtry)
551 task->tk_status = -EIO;
553 switch (task->tk_status) {
555 rpc_delay(task, RPC_REESTABLISH_TIMEOUT);
558 dprintk("RPC: %4d xprt_connect_status: timed out\n",
562 printk(KERN_ERR "RPC: error %d connecting to server %s\n",
563 -task->tk_status, task->tk_client->cl_server);
565 xprt_release_write(xprt, task);
569 * Look up the RPC request corresponding to a reply, and then lock it.
/*
 * Linear scan of xprt->recv for the request whose XID matches the
 * reply. Caller must hold xprt->sock_lock. Returns NULL if no
 * outstanding request carries this XID.
 */
571 static inline struct rpc_rqst *
572 xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
574 struct list_head *pos;
575 struct rpc_rqst *req = NULL;
577 list_for_each(pos, &xprt->recv) {
578 struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
579 if (entry->rq_xid == xid) {
588 * Complete reply received.
589 * The TCP code relies on us to remove the request from xprt->pending.
/*
 * Finish a request whose reply has arrived: update congestion/RTT
 * bookkeeping, record the number of bytes copied, unlink the request
 * from the receive list and wake the owning task.
 * Caller holds xprt->sock_lock.
 */
592 xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
594 struct rpc_task *task = req->rq_task;
595 struct rpc_clnt *clnt = task->tk_client;
597 /* Adjust congestion window */
599 int timer = rpcproc_timer(clnt, task->tk_msg.rpc_proc);
600 xprt_adjust_cwnd(xprt, copied);
601 __xprt_put_cong(xprt, req);
/* Only first transmissions give an unambiguous RTT sample (Karn's rule). */
602 if (req->rq_ntrans == 1) {
604 rpc_update_rtt(&clnt->cl_rtt, timer, (long)jiffies - req->rq_xtime);
606 rpc_set_timeo(&clnt->cl_rtt, timer, req->rq_ntrans - 1);
610 /* Profile only reads for now */
612 static unsigned long nextstat = 0;
613 static unsigned long pkt_rtt = 0, pkt_len = 0, pkt_cnt = 0;
616 pkt_len += req->rq_slen + copied;
617 pkt_rtt += jiffies - req->rq_xtime;
/* Dump and reset the accumulated stats roughly every 5 seconds. */
618 if (time_before(nextstat, jiffies)) {
619 printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
620 printk("RPC: %ld %ld %ld %ld stat\n",
621 jiffies, pkt_cnt, pkt_len, pkt_rtt);
622 pkt_rtt = pkt_len = pkt_cnt = 0;
623 nextstat = jiffies + 5 * HZ;
628 dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
629 req->rq_received = copied;
630 list_del_init(&req->rq_list);
632 /* ... and wake up the process. */
633 rpc_wake_up_task(task);
/* Copy up to desc->count bytes out of an skb, without checksumming. */
638 skb_read_bits(skb_reader_t *desc, void *to, size_t len)
640 if (len > desc->count)
642 skb_copy_bits(desc->skb, desc->offset, to, len);
/*
 * Like skb_read_bits(), but folds the copied bytes' partial checksum
 * into desc->csum so UDP verification can run in parallel with the copy.
 */
649 skb_read_and_csum_bits(skb_reader_t *desc, void *to, size_t len)
651 unsigned int csum2, pos;
653 if (len > desc->count)
656 csum2 = skb_copy_and_csum_bits(desc->skb, pos, to, len, 0);
657 desc->csum = csum_block_add(desc->csum, csum2, pos);
664 * We have set things up such that we perform the checksum of the UDP
665 * packet in parallel with the copies into the RPC client iovec. -DaveM
/*
 * Copy a UDP datagram's payload into the xdr buffer, verifying the UDP
 * checksum during the copy unless hardware already did
 * (CHECKSUM_UNNECESSARY). Any uncopied tail is still checksummed.
 */
668 csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
673 desc.offset = sizeof(struct udphdr);
674 desc.count = skb->len - desc.offset;
676 if (skb->ip_summed == CHECKSUM_UNNECESSARY)
/* Seed the checksum with the UDP header that we skip over. */
679 desc.csum = csum_partial(skb->data, desc.offset, skb->csum);
680 xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_and_csum_bits);
681 if (desc.offset != skb->len) {
683 csum2 = skb_checksum(skb, desc.offset, skb->len - desc.offset, 0);
684 desc.csum = csum_block_add(desc.csum, csum2, desc.offset);
/* Non-zero folded checksum means the datagram is corrupt. */
686 if ((unsigned short)csum_fold(desc.csum))
690 xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_bits);
695 * Input handler for RPC replies. Called from a bottom half and hence
/*
 * sk->data_ready callback for UDP transports: pull the datagram, match
 * its XID to a pending request, copy/verify the payload and complete
 * the request. Runs in softirq context under sk->callback_lock.
 */
699 udp_data_ready(struct sock *sk, int len)
701 struct rpc_task *task;
702 struct rpc_xprt *xprt;
703 struct rpc_rqst *rovr;
705 int err, repsize, copied;
707 read_lock(&sk->callback_lock);
708 dprintk("RPC: udp_data_ready...\n");
709 if (sk->dead || !(xprt = xprt_from_sock(sk))) {
710 printk("RPC: udp_data_ready request not found!\n");
714 dprintk("RPC: udp_data_ready client %p\n", xprt);
716 if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
722 repsize = skb->len - sizeof(struct udphdr);
724 printk("RPC: impossible RPC reply size %d!\n", repsize);
728 /* Look up and lock the request corresponding to the given XID */
729 spin_lock(&xprt->sock_lock);
/* The XID is the first 32-bit word after the UDP header. */
730 rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + sizeof(struct udphdr)));
733 task = rovr->rq_task;
735 dprintk("RPC: %4d received reply\n", task->tk_pid);
736 xprt_pktdump("packet data:",
737 (u32 *) (skb->h.raw+sizeof(struct udphdr)), repsize);
/* Never copy more than the receive buffer can hold. */
739 if ((copied = rovr->rq_private_buf.len) > repsize)
742 /* Suck it into the iovec, verify checksum if not done by hw. */
743 if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb))
746 /* Something worked... */
747 dst_confirm(skb->dst);
749 xprt_complete_rqst(xprt, rovr, copied);
752 spin_unlock(&xprt->sock_lock);
754 skb_free_datagram(sk, skb);
756 if (sk->sleep && waitqueue_active(sk->sleep))
757 wake_up_interruptible(sk->sleep);
758 read_unlock(&sk->callback_lock);
762 * Copy from an skb into memory and shrink the skb.
/* Bounded copy helper used by the TCP record parser below. */
765 tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
767 if (len > desc->count)
769 skb_copy_bits(desc->skb, desc->offset, p, len);
776 * TCP read fragment marker
/*
 * Accumulate the 4-byte RPC record marker (may arrive split across
 * reads; tcp_offset tracks progress). Once complete, decode it: the
 * top bit flags the last fragment, the low 31 bits are the length.
 */
779 tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
784 p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
785 len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
786 used = tcp_copy_data(desc, p, len);
787 xprt->tcp_offset += used;
790 xprt->tcp_reclen = ntohl(xprt->tcp_recm);
791 if (xprt->tcp_reclen & 0x80000000)
792 xprt->tcp_flags |= XPRT_LAST_FRAG;
794 xprt->tcp_flags &= ~XPRT_LAST_FRAG;
795 xprt->tcp_reclen &= 0x7fffffff;
796 xprt->tcp_flags &= ~XPRT_COPY_RECM;
797 xprt->tcp_offset = 0;
798 /* Sanity check of the record length */
/* A fragment shorter than the 4-byte XID cannot be valid — drop the
 * connection rather than desynchronize the record stream. */
799 if (xprt->tcp_reclen < 4) {
800 printk(KERN_ERR "RPC: Invalid TCP record fragment length\n");
801 xprt_disconnect(xprt);
803 dprintk("RPC: reading TCP record fragment of length %d\n",
/*
 * Called after consuming bytes of the current fragment: when the
 * fragment is exhausted, rearm the parser to read the next record
 * marker, and if this was the final fragment, expect a fresh XID.
 */
808 tcp_check_recm(struct rpc_xprt *xprt)
810 if (xprt->tcp_offset == xprt->tcp_reclen) {
811 xprt->tcp_flags |= XPRT_COPY_RECM;
812 xprt->tcp_offset = 0;
813 if (xprt->tcp_flags & XPRT_LAST_FRAG) {
814 xprt->tcp_flags &= ~XPRT_COPY_DATA;
815 xprt->tcp_flags |= XPRT_COPY_XID;
816 xprt->tcp_copied = 0;
/*
 * Accumulate the reply XID (may arrive split across reads). Once
 * complete, switch the parser to data-copy mode; tcp_copied starts at
 * 4 because the XID itself counts toward the copied reply bytes.
 */
825 tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
830 len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
831 dprintk("RPC: reading XID (%Zu bytes)\n", len);
832 p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
833 used = tcp_copy_data(desc, p, len);
834 xprt->tcp_offset += used;
837 xprt->tcp_flags &= ~XPRT_COPY_XID;
838 xprt->tcp_flags |= XPRT_COPY_DATA;
839 xprt->tcp_copied = 4;
840 dprintk("RPC: reading reply for XID %08x\n", xprt->tcp_xid);
841 tcp_check_recm(xprt);
845 * TCP read and complete request
/*
 * Copy reply data for the XID parsed by tcp_read_xid() into the
 * matching request's private receive buffer, and complete the request
 * once either the buffer is full or the last fragment ends.
 */
848 tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
850 struct rpc_rqst *req;
851 struct xdr_buf *rcvbuf;
854 /* Find and lock the request corresponding to this xid */
855 spin_lock(&xprt->sock_lock);
856 req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
/* Unknown XID: nothing to copy into — skip the rest of the data. */
858 xprt->tcp_flags &= ~XPRT_COPY_DATA;
859 dprintk("RPC: XID %08x request not found!\n",
861 spin_unlock(&xprt->sock_lock);
865 rcvbuf = &req->rq_private_buf;
/* Clamp the copy to the current fragment: use a private descriptor so
 * bytes belonging to the next record are not consumed here. */
867 if (len > xprt->tcp_reclen - xprt->tcp_offset) {
868 skb_reader_t my_desc;
870 len = xprt->tcp_reclen - xprt->tcp_offset;
871 memcpy(&my_desc, desc, sizeof(my_desc));
873 xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
874 &my_desc, tcp_copy_data);
878 xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
879 desc, tcp_copy_data);
880 xprt->tcp_copied += len;
881 xprt->tcp_offset += len;
883 if (xprt->tcp_copied == req->rq_private_buf.len)
884 xprt->tcp_flags &= ~XPRT_COPY_DATA;
885 else if (xprt->tcp_offset == xprt->tcp_reclen) {
886 if (xprt->tcp_flags & XPRT_LAST_FRAG)
887 xprt->tcp_flags &= ~XPRT_COPY_DATA;
890 if (!(xprt->tcp_flags & XPRT_COPY_DATA)) {
891 dprintk("RPC: %4d received reply complete\n",
892 req->rq_task->tk_pid);
893 xprt_complete_rqst(xprt, req, xprt->tcp_copied);
895 spin_unlock(&xprt->sock_lock);
896 tcp_check_recm(xprt);
900 * TCP discard extra bytes from a short read
/* Swallow fragment bytes nobody wants (e.g. reply longer than the
 * receive buffer), advancing the parser state without copying. */
903 tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
907 len = xprt->tcp_reclen - xprt->tcp_offset;
908 if (len > desc->count)
912 xprt->tcp_offset += len;
913 tcp_check_recm(xprt);
917 * TCP record receive routine
918 * We first have to grab the record marker, then the XID, then the data.
/*
 * tcp_read_sock() actor: drive the record-parser state machine
 * (marker -> XID -> data -> discard) over the bytes of this skb
 * until the descriptor is drained. Returns the number consumed.
 */
921 tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
922 unsigned int offset, size_t len)
924 struct rpc_xprt *xprt = (struct rpc_xprt *)rd_desc->buf;
925 skb_reader_t desc = { skb, offset, len };
927 dprintk("RPC: tcp_data_recv\n");
929 /* Read in a new fragment marker if necessary */
930 /* Can we ever really expect to get completely empty fragments? */
931 if (xprt->tcp_flags & XPRT_COPY_RECM) {
932 tcp_read_fraghdr(xprt, &desc);
935 /* Read in the xid if necessary */
936 if (xprt->tcp_flags & XPRT_COPY_XID) {
937 tcp_read_xid(xprt, &desc);
940 /* Read in the request data */
941 if (xprt->tcp_flags & XPRT_COPY_DATA) {
942 tcp_read_request(xprt, &desc);
945 /* Skip over any trailing bytes on short reads */
946 tcp_read_discard(xprt, &desc);
947 } while (desc.count);
948 dprintk("RPC: tcp_data_recv done\n");
949 return len - desc.count;
/*
 * sk->data_ready callback for TCP transports: feed all queued socket
 * data to tcp_data_recv() via tcp_read_sock(). Softirq context.
 */
952 static void tcp_data_ready(struct sock *sk, int bytes)
954 struct rpc_xprt *xprt;
955 read_descriptor_t rd_desc;
957 read_lock(&sk->callback_lock);
958 dprintk("RPC: tcp_data_ready...\n");
959 if (!(xprt = xprt_from_sock(sk))) {
960 printk("RPC: tcp_data_ready socket info not found!\n");
966 /* We use rd_desc to pass struct xprt to tcp_data_recv */
967 rd_desc.buf = (char *)xprt;
968 rd_desc.count = 65536;
969 tcp_read_sock(sk, &rd_desc, tcp_data_recv);
971 read_unlock(&sk->callback_lock);
/*
 * sk->state_change callback for TCP transports: on ESTABLISHED, mark
 * the transport connected, reset the record parser and wake the task
 * waiting in xprt_connect(); on other transitions, disconnect.
 */
975 tcp_state_change(struct sock *sk)
977 struct rpc_xprt *xprt;
979 read_lock(&sk->callback_lock);
980 if (!(xprt = xprt_from_sock(sk)))
982 dprintk("RPC: tcp_state_change client %p...\n", xprt);
983 dprintk("RPC: state %x conn %d dead %d zapped %d\n",
984 sk->state, xprt_connected(xprt),
985 sk->dead, sk->zapped);
988 case TCP_ESTABLISHED:
/* Already marked connected — nothing further to initialize. */
989 if (xprt_test_and_set_connected(xprt))
992 /* Reset TCP record info */
993 xprt->tcp_offset = 0;
994 xprt->tcp_reclen = 0;
995 xprt->tcp_copied = 0;
996 xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;
998 spin_lock_bh(&xprt->sock_lock);
/* Wake the connecting task sleeping on xprt->pending, if any. */
999 if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
1000 rpc_wake_up_task(xprt->snd_task);
1001 spin_unlock_bh(&xprt->sock_lock);
1007 xprt_disconnect(xprt);
1011 if (sk->sleep && waitqueue_active(sk->sleep))
1012 wake_up_interruptible_all(sk->sleep);
1013 read_unlock(&sk->callback_lock);
1017 * Called when more output buffer space is available for this socket.
1018 * We try not to wake our writers until they can make "significant"
1019 * progress, otherwise we'll waste resources thrashing sock_sendmsg
1020 * with a bunch of small requests.
/* sk->write_space callback shared by UDP and TCP transports. */
1023 xprt_write_space(struct sock *sk)
1025 struct rpc_xprt *xprt;
1026 struct socket *sock;
1028 read_lock(&sk->callback_lock);
1029 if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->socket))
1034 /* Wait until we have enough socket memory */
1036 /* from net/ipv4/tcp.c:tcp_write_space */
1037 if (tcp_wspace(sk) < tcp_min_write_space(sk))
1040 /* from net/core/sock.c:sock_def_write_space */
1041 if (!sock_writeable(sk))
/* Only wake writers that actually registered for space (SOCK_NOSPACE). */
1045 if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags))
1048 spin_lock_bh(&xprt->sock_lock);
1049 if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
1050 rpc_wake_up_task(xprt->snd_task);
1051 spin_unlock_bh(&xprt->sock_lock);
1052 if (sk->sleep && waitqueue_active(sk->sleep))
1053 wake_up_interruptible(sk->sleep);
1055 read_unlock(&sk->callback_lock);
1059 * RPC receive timeout handler.
/*
 * Fires when no reply arrived in time: shrink the congestion window,
 * release the congestion slot and wake the task with -ETIMEDOUT.
 * If the reply raced in first (rq_received set), do nothing.
 */
1062 xprt_timer(struct rpc_task *task)
1064 struct rpc_rqst *req = task->tk_rqstp;
1065 struct rpc_xprt *xprt = req->rq_xprt;
1067 spin_lock(&xprt->sock_lock);
1068 if (req->rq_received)
1071 xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT);
1072 __xprt_put_cong(xprt, req);
1074 dprintk("RPC: %4d xprt_timer (%s request)\n",
1075 task->tk_pid, req ? "pending" : "backlogged");
1077 task->tk_status = -ETIMEDOUT;
1079 task->tk_timeout = 0;
1080 rpc_wake_up_task(task);
1081 spin_unlock(&xprt->sock_lock);
1085 * Place the actual RPC call.
1086 * We have to copy the iovec because sendmsg fiddles with its contents.
/*
 * Entry point for sending a request: acquire the transport write lock,
 * hook the request onto the receive list (so the reply handlers can
 * find it by XID) and hand off to do_xprt_transmit().
 */
1089 xprt_transmit(struct rpc_task *task)
1091 struct rpc_rqst *req = task->tk_rqstp;
1092 struct rpc_xprt *xprt = req->rq_xprt;
1094 dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid,
1095 *(u32 *)(req->rq_svec[0].iov_base));
1098 task->tk_status = -EIO;
1100 if (task->tk_status < 0)
1103 if (task->tk_rpcwait)
1104 rpc_remove_wait_queue(task);
1106 /* set up everything as needed. */
1107 /* Write the record marker */
/* TCP record marker: high bit = last fragment, low 31 bits = length
 * of the record excluding the marker itself. */
1109 u32 *marker = req->rq_svec[0].iov_base;
1111 *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
1114 spin_lock_bh(&xprt->sock_lock);
/* Reply already in and nothing partially sent — no need to transmit. */
1115 if (req->rq_received != 0 && !req->rq_bytes_sent)
1118 if (!__xprt_lock_write(xprt, task))
1121 if (!xprt_connected(xprt)) {
1122 task->tk_status = -ENOTCONN;
1126 if (list_empty(&req->rq_list)) {
1127 /* Update the softirq receive buffer */
1128 memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
1129 sizeof(req->rq_private_buf));
1130 list_add_tail(&req->rq_list, &xprt->recv);
1132 spin_unlock_bh(&xprt->sock_lock);
1134 do_xprt_transmit(task);
1137 spin_unlock_bh(&xprt->sock_lock);
/*
 * Push the request bytes out via xprt_sendmsg(), looping until the
 * whole buffer is sent or the socket backs up. On partial sends the
 * task sleeps until write space returns; on success the receive
 * timeout is armed and the task sleeps on xprt->pending for the reply.
 */
1141 do_xprt_transmit(struct rpc_task *task)
1143 struct rpc_clnt *clnt = task->tk_client;
1144 struct rpc_rqst *req = task->tk_rqstp;
1145 struct rpc_xprt *xprt = req->rq_xprt;
1146 int status, retry = 0;
1149 /* Continue transmitting the packet/record. We must be careful
1150 * to cope with writespace callbacks arriving _after_ we have
1151 * called xprt_sendmsg().
/* Timestamp the transmission for RTT estimation (see xprt_complete_rqst). */
1154 req->rq_xtime = jiffies;
1155 status = xprt_sendmsg(xprt, req);
1161 req->rq_bytes_sent += status;
1163 /* If we've sent the entire packet, immediately
1164 * reset the count of bytes sent. */
1165 if (req->rq_bytes_sent >= req->rq_slen) {
1166 req->rq_bytes_sent = 0;
1170 if (status >= req->rq_slen)
1176 dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
1177 task->tk_pid, req->rq_slen - req->rq_bytes_sent,
1185 /* If we're doing a resend and have received a reply already,
1187 * Note, though, that we can't do this if we've already started
1188 * resending down a TCP stream.
1190 task->tk_status = status;
1194 if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
1195 /* Protect against races with xprt_write_space */
1196 spin_lock_bh(&xprt->sock_lock);
1197 /* Don't race with disconnect */
1198 if (!xprt_connected(xprt))
1199 task->tk_status = -ENOTCONN;
1200 else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) {
1201 task->tk_timeout = req->rq_timeout.to_current;
1202 rpc_sleep_on(&xprt->pending, task, NULL, NULL);
1204 spin_unlock_bh(&xprt->sock_lock);
1207 /* Keep holding the socket if it is blocked */
1208 rpc_delay(task, HZ>>4);
1211 task->tk_timeout = RPC_REESTABLISH_TIMEOUT;
1212 rpc_sleep_on(&xprt->sending, task, NULL, NULL);
1217 xprt_disconnect(xprt);
1219 xprt_release_write(xprt, task);
1222 dprintk("RPC: %4d xmit complete\n", task->tk_pid);
1223 spin_lock_bh(&xprt->sock_lock);
1224 /* Set the task's receive timeout value */
1225 if (!xprt->nocong) {
/* RTT-estimated timeout, backed off per prior retransmission and
 * clamped to the request's configured maximum. */
1226 int timer = rpcproc_timer(clnt, task->tk_msg.rpc_proc);
1227 task->tk_timeout = rpc_calc_rto(&clnt->cl_rtt, timer);
1228 task->tk_timeout <<= rpc_ntimeo(&clnt->cl_rtt, timer);
1229 task->tk_timeout <<= clnt->cl_timeout.to_retries
1230 - req->rq_timeout.to_retries;
1231 if (task->tk_timeout > req->rq_timeout.to_maxval)
1232 task->tk_timeout = req->rq_timeout.to_maxval;
1234 task->tk_timeout = req->rq_timeout.to_current;
1235 /* Don't race with disconnect */
1236 if (!xprt_connected(xprt))
1237 task->tk_status = -ENOTCONN;
1238 else if (!req->rq_received)
1239 rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
1240 __xprt_release_write(xprt, task);
1241 spin_unlock_bh(&xprt->sock_lock);
1245 * Reserve an RPC call slot.
/*
 * Pop a request slot off xprt->free for this task, or queue the task
 * on the backlog with -EAGAIN if none are available.
 * Caller holds xprt->xprt_lock (see xprt_reserve).
 */
1248 do_xprt_reserve(struct rpc_task *task)
1250 struct rpc_xprt *xprt = task->tk_xprt;
1252 task->tk_status = 0;
1256 struct rpc_rqst *req = xprt->free;
1257 xprt->free = req->rq_next;
1258 req->rq_next = NULL;
1259 task->tk_rqstp = req;
1260 xprt_request_init(task, xprt);
1263 dprintk("RPC: waiting for request slot\n");
1264 task->tk_status = -EAGAIN;
1265 task->tk_timeout = 0;
1266 rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
/* Locked wrapper around do_xprt_reserve(); fails with -EIO if the
 * transport is shutting down. */
1270 xprt_reserve(struct rpc_task *task)
1272 struct rpc_xprt *xprt = task->tk_xprt;
1274 task->tk_status = -EIO;
1275 if (!xprt->shutdown) {
1276 spin_lock(&xprt->xprt_lock);
1277 do_xprt_reserve(task);
1278 spin_unlock(&xprt->xprt_lock);
1283 * Allocate a 'unique' XID
/*
 * Hand out XIDs from a global counter, lazily seeded from the clock on
 * first use so XIDs differ across boots. Serialized by a local lock.
 */
1286 xprt_alloc_xid(void)
1288 static spinlock_t xid_lock = SPIN_LOCK_UNLOCKED;
1289 static int need_init = 1;
1293 spin_lock(&xid_lock);
1294 if (unlikely(need_init)) {
1295 xid = CURRENT_TIME << 12;
1299 spin_unlock(&xid_lock);
1304 * Initialize RPC request
/* Set up a freshly reserved slot: timeout, back-pointers, XID, list head. */
1307 xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
1309 struct rpc_rqst *req = task->tk_rqstp;
1311 req->rq_timeout = xprt->timeout;
1312 req->rq_task = task;
1313 req->rq_xprt = xprt;
1314 req->rq_xid = xprt_alloc_xid();
1315 INIT_LIST_HEAD(&req->rq_list);
1316 dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
1321 * Release an RPC call slot
/*
 * Tear down the task's request: drop the send lock and congestion
 * slot, unlink from the receive list, zero the slot and return it to
 * the free list, waking one backlogged task.
 */
1324 xprt_release(struct rpc_task *task)
1326 struct rpc_xprt *xprt = task->tk_xprt;
1327 struct rpc_rqst *req;
1329 if (!(req = task->tk_rqstp))
1331 spin_lock_bh(&xprt->sock_lock);
1332 __xprt_release_write(xprt, task);
1333 __xprt_put_cong(xprt, req);
1334 if (!list_empty(&req->rq_list))
1335 list_del(&req->rq_list);
1336 spin_unlock_bh(&xprt->sock_lock);
1337 task->tk_rqstp = NULL;
1338 memset(req, 0, sizeof(*req)); /* mark unused */
1340 dprintk("RPC: %4d release request %p\n", task->tk_pid, req);
1342 spin_lock(&xprt->xprt_lock);
1343 req->rq_next = xprt->free;
1346 xprt_clear_backlog(xprt);
1347 spin_unlock(&xprt->xprt_lock);
1351 * Set default timeout parameters
/* Defaults: 5 retries at 5s for UDP, 5 retries at 60s for TCP. */
1354 xprt_default_timeout(struct rpc_timeout *to, int proto)
1356 if (proto == IPPROTO_UDP)
1357 xprt_set_timeout(to, 5, 5 * HZ);
1359 xprt_set_timeout(to, 5, 60 * HZ);
1363 * Set constant timeout
1366 xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
1370 to->to_increment = incr;
1371 to->to_maxval = incr * retr;
1372 to->to_retries = retr;
1373 to->to_exponential = 0;
1377 * Initialize an RPC client
/*
 * Allocate and initialize a transport for the given protocol, peer
 * address and timeout: locks, waitqueues, congestion window, and the
 * free-list of RPC_MAXREQS request slots. Returns NULL on allocation
 * failure.
 */
1379 static struct rpc_xprt *
1380 xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
1382 struct rpc_xprt *xprt;
1383 struct rpc_rqst *req;
1386 dprintk("RPC: setting up %s transport...\n",
1387 proto == IPPROTO_UDP? "UDP" : "TCP");
1389 if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
1391 memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */
1395 xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
/* Stream transports are not congestion-limited; datagram ones start
 * with the initial window. */
1397 xprt->cwnd = RPC_MAXCWND;
1400 xprt->cwnd = RPC_INITCWND;
1401 spin_lock_init(&xprt->sock_lock);
1402 spin_lock_init(&xprt->xprt_lock);
1403 init_waitqueue_head(&xprt->cong_wait);
1405 INIT_LIST_HEAD(&xprt->recv);
1407 /* Set timeout parameters */
1409 xprt->timeout = *to;
1410 xprt->timeout.to_current = to->to_initval;
1412 xprt_default_timeout(&xprt->timeout, xprt->prot);
1414 INIT_RPC_WAITQ(&xprt->pending, "xprt_pending");
1415 INIT_RPC_WAITQ(&xprt->sending, "xprt_sending");
1416 INIT_RPC_WAITQ(&xprt->resend, "xprt_resend");
1417 INIT_RPC_WAITQ(&xprt->backlog, "xprt_backlog");
1419 /* initialize free list */
/* Chain the slot array into a singly-linked free list via rq_next. */
1420 for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
1421 req->rq_next = req + 1;
1422 req->rq_next = NULL;
1423 xprt->free = xprt->slot;
1425 /* Check whether we want to use a reserved port */
1426 xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
1428 dprintk("RPC: created transport %p\n", xprt);
1434 * Bind to a reserved port
1437 xprt_bindresvport(struct socket *sock)
1439 struct sockaddr_in myaddr;
1441 kernel_cap_t saved_cap = current->cap_effective;
1443 /* Override capabilities.
1444 * They were checked in xprt_create_proto i.e. at mount time
1446 cap_raise (current->cap_effective, CAP_NET_BIND_SERVICE);
1448 memset(&myaddr, 0, sizeof(myaddr));
1449 myaddr.sin_family = AF_INET;
1452 myaddr.sin_port = htons(port);
1453 err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
1455 } while (err == -EADDRINUSE && --port > 0);
1456 current->cap_effective = saved_cap;
1459 printk("RPC: Can't bind to reserved port (%d).\n", -err);
/*
 * Attach a freshly created socket to the transport: save the socket's
 * original callbacks, install the RPC handlers (udp_* or tcp_*), and
 * set the initial connected state (UDP is "connected" immediately;
 * TCP is not until the handshake completes). Nagle is disabled on TCP.
 */
1465 xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
1467 struct sock *sk = sock->sk;
1472 write_lock_bh(&sk->callback_lock);
1473 sk->user_data = xprt;
1474 xprt->old_data_ready = sk->data_ready;
1475 xprt->old_state_change = sk->state_change;
1476 xprt->old_write_space = sk->write_space;
1477 if (xprt->prot == IPPROTO_UDP) {
1478 sk->data_ready = udp_data_ready;
1479 sk->no_check = UDP_CSUM_NORCV;
1480 xprt_set_connected(xprt);
1482 struct tcp_opt *tp = &(sk->tp_pinfo.af_tcp);
1483 tp->nonagle = 1; /* disable Nagle's algorithm */
1484 sk->data_ready = tcp_data_ready;
1485 sk->state_change = tcp_state_change;
1486 xprt_clear_connected(xprt);
1488 sk->write_space = xprt_write_space;
1490 /* Reset to new socket */
1493 write_unlock_bh(&sk->callback_lock);
1499 * Set socket buffer length
1502 xprt_sock_setbufsize(struct rpc_xprt *xprt)
1504 struct sock *sk = xprt->inet;
1508 if (xprt->rcvsize) {
1509 sk->userlocks |= SOCK_RCVBUF_LOCK;
1510 sk->rcvbuf = xprt->rcvsize * RPC_MAXCONG * 2;
1512 if (xprt->sndsize) {
1513 sk->userlocks |= SOCK_SNDBUF_LOCK;
1514 sk->sndbuf = xprt->sndsize * RPC_MAXCONG * 2;
1515 sk->write_space(sk);
1520 * Create a client socket given the protocol and peer address.
1522 static struct socket *
1523 xprt_create_socket(int proto, struct rpc_timeout *to, int resvport)
1525 struct socket *sock;
1528 dprintk("RPC: xprt_create_socket(%s %d)\n",
1529 (proto == IPPROTO_UDP)? "udp" : "tcp", proto);
1531 type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
1533 if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {
1534 printk("RPC: can't create socket (%d).\n", -err);
1538 /* bind to a reserved port */
1539 if (resvport && xprt_bindresvport(sock) < 0)
1550 * Create an RPC client transport given the protocol and peer address.
1553 xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
1555 struct rpc_xprt *xprt;
1557 xprt = xprt_setup(proto, sap, to);
1561 dprintk("RPC: xprt_create_proto created xprt %p\n", xprt);
1564 dprintk("RPC: xprt_create_proto failed\n");
1571 * Prepare for transport shutdown.
1574 xprt_shutdown(struct rpc_xprt *xprt)
1577 rpc_wake_up(&xprt->sending);
1578 rpc_wake_up(&xprt->resend);
1579 rpc_wake_up(&xprt->pending);
1580 rpc_wake_up(&xprt->backlog);
1581 if (waitqueue_active(&xprt->cong_wait))
1582 wake_up(&xprt->cong_wait);
1586 * Clear the xprt backlog queue
1589 xprt_clear_backlog(struct rpc_xprt *xprt) {
1590 rpc_wake_up_next(&xprt->backlog);
1591 if (waitqueue_active(&xprt->cong_wait))
1592 wake_up(&xprt->cong_wait);
1597 * Destroy an RPC transport, killing off all requests.
1600 xprt_destroy(struct rpc_xprt *xprt)
1602 dprintk("RPC: destroying transport %p\n", xprt);
1603 xprt_shutdown(xprt);