/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -   When a process places a call, it allocates a request slot if
 *      one is available. Otherwise, it sleeps on the backlog queue
 *      (xprt_reserve).
 *  -   Next, the caller puts together the RPC message, stuffs it into
 *      the request struct, and calls xprt_call().
 *  -   xprt_call transmits the message and installs the caller on the
 *      socket's wait list. At the same time, it installs a timer that
 *      is run after the packet's timeout has expired.
 *  -   When a packet arrives, the data_ready handler walks the list of
 *      pending requests for that socket. If a matching XID is found, the
 *      caller is woken up, and the timer removed.
 *  -   When no reply arrives within the timeout interval, the timer is
 *      fired by the kernel and runs xprt_timer(). It either adjusts the
 *      timeout values (minor timeout) or wakes up the caller with a status
 *      of -ETIMEDOUT.
 *  -   When the caller receives a notification from RPC that a reply arrived,
 *      it should release the RPC slot, and process the reply.
 *      If the call timed out, it may choose to retry the operation by
 *      adjusting the initial timeout value, and simply calling rpc_call
 *      again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP NFS related read + write fixes
 *   (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 *  Rewrite of large parts of the code in order to stabilize the TCP behaviour.
 *  Fix behaviour when socket buffer is full.
 *   (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 */
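
/*
 * Illustrative sketch (not part of the original file): roughly how a
 * synchronous caller drives the interface described above.  The names
 * rpc_encode/rpc_wait are shorthand here; the real call path lives in
 * net/sunrpc/clnt.c and net/sunrpc/sched.c.
 *
 *	xprt_reserve(task);	// grab a request slot, or sleep on the backlog
 *	rpc_encode(task);	// marshal the call into req->rq_snd_buf
 *	xprt_transmit(task);	// send it and queue the request on xprt->recv
 *	rpc_wait(task);		// sleep on xprt->pending until data_ready
 *				// matches our XID, or xprt_timer() fires
 *	xprt_release(task);	// free the slot and wake the backlog queue
 */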

#define __KERNEL_SYSCALLS__

#include <linux/version.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/capability.h>
#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/udp.h>
#include <linux/unistd.h>
#include <linux/sunrpc/clnt.h>
#include <linux/file.h>

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>

#include <asm/uaccess.h>

/*
 * Local variables
 */

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY        RPCDBG_XPRT
#endif

#define XPRT_MAX_BACKOFF        (8)

/*
 * Local functions
 */
static void     xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static void     do_xprt_transmit(struct rpc_task *);
static inline void      do_xprt_reserve(struct rpc_task *);
static void     xprt_disconnect(struct rpc_xprt *);
static void     xprt_connect_status(struct rpc_task *task);
static struct socket *xprt_create_socket(int, struct rpc_timeout *, int);
static int      xprt_bind_socket(struct rpc_xprt *, struct socket *);
static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);

#ifdef RPC_DEBUG_DATA
/*
 * Print the buffer contents (first 128 bytes only--just enough for
 * diropres return).
 */
static void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
        u8      *buf = (u8 *) packet;
        int     j;

        dprintk("RPC:      %s\n", msg);
        for (j = 0; j < count && j < 128; j += 4) {
                if (!(j & 31)) {
                        if (j)
                                dprintk("\n");
                        dprintk("0x%04x ", j);
                }
                dprintk("%02x%02x%02x%02x ",
                        buf[j], buf[j+1], buf[j+2], buf[j+3]);
        }
        dprintk("\n");
}
#else
static inline void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
        /* NOP */
}
#endif

/*
 * Look up RPC transport given an INET socket
 */
static inline struct rpc_xprt *
xprt_from_sock(struct sock *sk)
{
        return (struct rpc_xprt *) sk->user_data;
}

/*
 * Serialize write access to sockets, in order to prevent different
 * requests from interfering with each other.
 * Also prevents TCP socket connections from colliding with writes.
 */
static int
__xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        if (!xprt->snd_task) {
                if (xprt->nocong || __xprt_get_cong(xprt, task)) {
                        xprt->snd_task = task;
                        if (req) {
                                req->rq_bytes_sent = 0;
                                req->rq_ntrans++;
                        }
                }
        }
        if (xprt->snd_task != task) {
                dprintk("RPC: %4d TCP write queue full\n", task->tk_pid);
                task->tk_timeout = 0;
                task->tk_status = -EAGAIN;
                if (req && req->rq_ntrans)
                        rpc_sleep_on(&xprt->resend, task, NULL, NULL);
                else
                        rpc_sleep_on(&xprt->sending, task, NULL, NULL);
        }
        return xprt->snd_task == task;
}

static inline int
xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        int retval;
        spin_lock_bh(&xprt->sock_lock);
        retval = __xprt_lock_write(xprt, task);
        spin_unlock_bh(&xprt->sock_lock);
        return retval;
}

static void
__xprt_lock_write_next(struct rpc_xprt *xprt)
{
        struct rpc_task *task;

        if (xprt->snd_task)
                return;
        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
                if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
                        return;
                task = rpc_wake_up_next(&xprt->sending);
                if (!task)
                        return;
        }
        if (xprt->nocong || __xprt_get_cong(xprt, task)) {
                struct rpc_rqst *req = task->tk_rqstp;
                xprt->snd_task = task;
                if (req) {
                        req->rq_bytes_sent = 0;
                        req->rq_ntrans++;
                }
        }
}

/*
 * Releases the socket for use by other requests.
 */
static void
__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task)
                xprt->snd_task = NULL;
        __xprt_lock_write_next(xprt);
}

static inline void
xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        spin_lock_bh(&xprt->sock_lock);
        __xprt_release_write(xprt, task);
        spin_unlock_bh(&xprt->sock_lock);
}

/*
 * Write data to socket.
 */
static inline int
xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        struct socket   *sock = xprt->sock;
        struct msghdr   msg;
        struct xdr_buf  *xdr = &req->rq_snd_buf;
        struct iovec    niv[MAX_IOVEC];
        unsigned int    niov, slen, skip;
        mm_segment_t    oldfs;
        int             result;

        if (!sock)
                return -ENOTCONN;

        xprt_pktdump("packet data:",
                                req->rq_svec->iov_base,
                                req->rq_svec->iov_len);

        /* Don't repeat bytes */
        skip = req->rq_bytes_sent;
        slen = xdr->len - skip;
        oldfs = get_fs(); set_fs(get_ds());
        do {
                unsigned int slen_part, n;

                niov = xdr_kmap(niv, xdr, skip);
                if (!niov) {
                        result = -EAGAIN;
                        break;
                }

                msg.msg_flags   = MSG_DONTWAIT|MSG_NOSIGNAL;
                msg.msg_iov     = niv;
                msg.msg_iovlen  = niov;
                msg.msg_name    = (struct sockaddr *) &xprt->addr;
                msg.msg_namelen = sizeof(xprt->addr);
                msg.msg_control = NULL;
                msg.msg_controllen = 0;

                slen_part = 0;
                for (n = 0; n < niov; n++)
                        slen_part += niv[n].iov_len;

                clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
                result = sock_sendmsg(sock, &msg, slen_part);

                xdr_kunmap(xdr, skip, niov);

                skip += slen_part;
                slen -= slen_part;
        } while (result >= 0 && slen);
        set_fs(oldfs);

        dprintk("RPC:      xprt_sendmsg(%d) = %d\n", slen, result);

        if (result >= 0)
                return result;

        switch (result) {
        case -ECONNREFUSED:
                /* When the server has died, an ICMP port unreachable message
                 * prompts ECONNREFUSED.
                 */
        case -EAGAIN:
                break;
        case -ECONNRESET:
        case -ENOTCONN:
        case -EPIPE:
                /* connection broken */
                if (xprt->stream)
                        result = -ENOTCONN;
                break;
        default:
                printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
        }
        return result;
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (req->rq_cong)
                return 1;
        dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n",
                        task->tk_pid, xprt->cong, xprt->cwnd);
        if (RPCXPRT_CONGESTED(xprt))
                return 0;
        req->rq_cong = 1;
        xprt->cong += RPC_CWNDSCALE;
        return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        if (!req->rq_cong)
                return;
        req->rq_cong = 0;
        xprt->cong -= RPC_CWNDSCALE;
        __xprt_lock_write_next(xprt);
}

/*
 * Adjust RPC congestion window
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
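 *
 * Note on the arithmetic below: while replies arrive and the window is
 * full (cwnd <= cong), cwnd grows by roughly
 * RPC_CWNDSCALE * RPC_CWNDSCALE / cwnd per reply, i.e. about one request
 * slot per window of replies (additive increase); on a major timeout the
 * window is halved, but never drops below one slot (multiplicative
 * decrease).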
 */
static void
xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{
        unsigned long   cwnd;

        cwnd = xprt->cwnd;
        if (result >= 0 && cwnd <= xprt->cong) {
                /* The (cwnd >> 1) term makes sure
                 * the result gets rounded properly. */
                cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
                if (cwnd > RPC_MAXCWND)
                        cwnd = RPC_MAXCWND;
                __xprt_lock_write_next(xprt);
        } else if (result == -ETIMEDOUT) {
                cwnd >>= 1;
                if (cwnd < RPC_CWNDSCALE)
                        cwnd = RPC_CWNDSCALE;
        }
        dprintk("RPC:      cong %ld, cwnd was %ld, now %ld\n",
                        xprt->cong, xprt->cwnd, cwnd);
        xprt->cwnd = cwnd;
}

/*
 * Adjust timeout values etc for next retransmit
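 *
 * While retries remain, to_current is bumped, doubling if to_exponential
 * is set, otherwise by to_increment, and clamped to to_maxval.  Once the
 * retries are used up, the base to_initval itself is grown the same way
 * and the function returns zero so that the caller can give up.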
 */
int
xprt_adjust_timeout(struct rpc_timeout *to)
{
        if (to->to_retries > 0) {
                if (to->to_exponential)
                        to->to_current <<= 1;
                else
                        to->to_current += to->to_increment;
                if (to->to_maxval && to->to_current >= to->to_maxval)
                        to->to_current = to->to_maxval;
        } else {
                if (to->to_exponential)
                        to->to_initval <<= 1;
                else
                        to->to_initval += to->to_increment;
                if (to->to_maxval && to->to_initval >= to->to_maxval)
                        to->to_initval = to->to_maxval;
                to->to_current = to->to_initval;
        }

        if (!to->to_current) {
                printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");
                to->to_current = 5 * HZ;
        }
        pprintk("RPC: %lu %s\n", jiffies,
                        to->to_retries? "retrans" : "timeout");
        return to->to_retries-- > 0;
}

/*
 * Close down a transport socket
 */
static void
xprt_close(struct rpc_xprt *xprt)
{
        struct socket   *sock = xprt->sock;
        struct sock     *sk = xprt->inet;

        if (!sk)
                return;

        write_lock_bh(&sk->callback_lock);
        xprt->inet = NULL;
        xprt->sock = NULL;

        sk->user_data    = NULL;
        sk->data_ready   = xprt->old_data_ready;
        sk->state_change = xprt->old_state_change;
        sk->write_space  = xprt->old_write_space;
        write_unlock_bh(&sk->callback_lock);

        xprt_disconnect(xprt);
        sk->no_check     = 0;

        sock_release(sock);
}

/*
 * Mark a transport as disconnected
 */
static void
xprt_disconnect(struct rpc_xprt *xprt)
{
        dprintk("RPC:      disconnected transport %p\n", xprt);
        spin_lock_bh(&xprt->sock_lock);
        xprt_clear_connected(xprt);
        rpc_wake_up_status(&xprt->pending, -ENOTCONN);
        spin_unlock_bh(&xprt->sock_lock);
}

/*
 * Reconnect a broken TCP connection.
 */
void
xprt_connect(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct socket   *sock = xprt->sock;
        struct sock     *inet;
        int             status;

        dprintk("RPC: %4d xprt_connect %p connected %d\n",
                                task->tk_pid, xprt, xprt_connected(xprt));
        if (xprt->shutdown)
                return;

        if (!xprt->addr.sin_port) {
                task->tk_status = -EIO;
                return;
        }

        if (!xprt_lock_write(xprt, task))
                return;
        if (xprt_connected(xprt))
                goto out_write;

        if (task->tk_rqstp)
                task->tk_rqstp->rq_bytes_sent = 0;

        xprt_close(xprt);
        /* Create an unconnected socket */
        sock = xprt_create_socket(xprt->prot, &xprt->timeout, xprt->resvport);
        if (!sock) {
                /* couldn't create socket or bind to reserved port;
                 * this is likely a permanent error, so cause an abort */
                task->tk_status = -EIO;
                goto out_write;
        }
        xprt_bind_socket(xprt, sock);

        if (!xprt->stream)
                goto out_write;

        inet = sock->sk;

        /* Now connect it asynchronously. */
        dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
        status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
                                sizeof(xprt->addr), O_NONBLOCK);
        dprintk("RPC: %4d connect status %d connected %d\n",
                task->tk_pid, status, xprt_connected(xprt));

        if (status >= 0)
                return;

        switch (status) {
        case -EALREADY:
        case -EINPROGRESS:
                /* Protect against TCP socket state changes */
                lock_sock(inet);
                if (inet->state != TCP_ESTABLISHED) {
                        dprintk("RPC: %4d  waiting for connection\n",
                                        task->tk_pid);
                        task->tk_timeout = RPC_CONNECT_TIMEOUT;
                        /* if the socket is already closing, delay briefly */
                        if ((1<<inet->state) & ~(TCPF_SYN_SENT|TCPF_SYN_RECV))
                                task->tk_timeout = RPC_REESTABLISH_TIMEOUT;
                        rpc_sleep_on(&xprt->pending, task, xprt_connect_status,
                                        NULL);
                }
                release_sock(inet);
                break;
        case -ECONNREFUSED:
        case -ECONNRESET:
        case -ENOTCONN:
                if (!task->tk_client->cl_softrtry) {
                        rpc_delay(task, RPC_REESTABLISH_TIMEOUT);
                        task->tk_status = -ENOTCONN;
                        break;
                }
        default:
                /* Report myriad other possible returns.  If this file
                 * system is soft mounted, just error out, like Solaris.  */
                if (task->tk_client->cl_softrtry) {
                        printk(KERN_WARNING
                                        "RPC: error %d connecting to server %s, exiting\n",
                                        -status, task->tk_client->cl_server);
                        task->tk_status = -EIO;
                        goto out_write;
                }
                printk(KERN_WARNING "RPC: error %d connecting to server %s\n",
                                -status, task->tk_client->cl_server);
                /* This will prevent anybody else from connecting */
                rpc_delay(task, RPC_REESTABLISH_TIMEOUT);
                task->tk_status = status;
                break;
        }
        return;
 out_write:
        xprt_release_write(xprt, task);
}

/*
 * We arrive here when awoken from waiting on connection establishment.
 */
static void
xprt_connect_status(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        if (task->tk_status >= 0) {
                dprintk("RPC: %4d xprt_connect_status: connection established\n",
                                task->tk_pid);
                return;
        }

        /* if soft mounted, cause this RPC to fail */
        if (task->tk_client->cl_softrtry)
                task->tk_status = -EIO;

        switch (task->tk_status) {
        case -ENOTCONN:
                rpc_delay(task, RPC_REESTABLISH_TIMEOUT);
                return;
        case -ETIMEDOUT:
                dprintk("RPC: %4d xprt_connect_status: timed out\n",
                                task->tk_pid);
                break;
        default:
                printk(KERN_ERR "RPC: error %d connecting to server %s\n",
                                -task->tk_status, task->tk_client->cl_server);
        }
        xprt_release_write(xprt, task);
}

/*
 * Look up the RPC request corresponding to a reply, and then lock it.
 */
static inline struct rpc_rqst *
xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
        struct list_head *pos;
        struct rpc_rqst *req = NULL;

        list_for_each(pos, &xprt->recv) {
                struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
                if (entry->rq_xid == xid) {
                        req = entry;
                        break;
                }
        }
        return req;
}

/*
 * Complete reply received.
 * The TCP code relies on us to remove the request from xprt->pending.
 */
static void
xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{
        struct rpc_task *task = req->rq_task;
        struct rpc_clnt *clnt = task->tk_client;

        /* Adjust congestion window */
        if (!xprt->nocong) {
                int timer = rpcproc_timer(clnt, task->tk_msg.rpc_proc);
                xprt_adjust_cwnd(xprt, copied);
                __xprt_put_cong(xprt, req);
                if (req->rq_ntrans == 1) {
                        if (timer)
                                rpc_update_rtt(&clnt->cl_rtt, timer, (long)jiffies - req->rq_xtime);
                }
                rpc_set_timeo(&clnt->cl_rtt, timer, req->rq_ntrans - 1);
        }

#ifdef RPC_PROFILE
        /* Profile only reads for now */
        if (copied > 1024) {
                static unsigned long    nextstat = 0;
                static unsigned long    pkt_rtt = 0, pkt_len = 0, pkt_cnt = 0;

                pkt_cnt++;
                pkt_len += req->rq_slen + copied;
                pkt_rtt += jiffies - req->rq_xtime;
                if (time_before(nextstat, jiffies)) {
                        printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
                        printk("RPC: %ld %ld %ld %ld stat\n",
                                        jiffies, pkt_cnt, pkt_len, pkt_rtt);
                        pkt_rtt = pkt_len = pkt_cnt = 0;
                        nextstat = jiffies + 5 * HZ;
                }
        }
#endif

        dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
        req->rq_received = copied;
        list_del_init(&req->rq_list);

        /* ... and wake up the process. */
        rpc_wake_up_task(task);
        return;
}

static size_t
skb_read_bits(skb_reader_t *desc, void *to, size_t len)
{
        if (len > desc->count)
                len = desc->count;
        skb_copy_bits(desc->skb, desc->offset, to, len);
        desc->count -= len;
        desc->offset += len;
        return len;
}

static size_t
skb_read_and_csum_bits(skb_reader_t *desc, void *to, size_t len)
{
        unsigned int csum2, pos;

        if (len > desc->count)
                len = desc->count;
        pos = desc->offset;
        csum2 = skb_copy_and_csum_bits(desc->skb, pos, to, len, 0);
        desc->csum = csum_block_add(desc->csum, csum2, pos);
        desc->count -= len;
        desc->offset += len;
        return len;
}

/*
 * We have set things up such that we perform the checksum of the UDP
 * packet in parallel with the copies into the RPC client iovec.  -DaveM
 */
static int
csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
{
        skb_reader_t desc;

        desc.skb = skb;
        desc.offset = sizeof(struct udphdr);
        desc.count = skb->len - desc.offset;

        if (skb->ip_summed == CHECKSUM_UNNECESSARY)
                goto no_checksum;

        desc.csum = csum_partial(skb->data, desc.offset, skb->csum);
        xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_and_csum_bits);
        if (desc.offset != skb->len) {
                unsigned int csum2;
                csum2 = skb_checksum(skb, desc.offset, skb->len - desc.offset, 0);
                desc.csum = csum_block_add(desc.csum, csum2, desc.offset);
        }
        if ((unsigned short)csum_fold(desc.csum))
                return -1;
        return 0;
no_checksum:
        xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_bits);
        return 0;
}

/*
 * Input handler for RPC replies. Called from a bottom half and hence
 * atomic.
 */
static void
udp_data_ready(struct sock *sk, int len)
{
        struct rpc_task *task;
        struct rpc_xprt *xprt;
        struct rpc_rqst *rovr;
        struct sk_buff  *skb;
        int             err, repsize, copied;

        read_lock(&sk->callback_lock);
        dprintk("RPC:      udp_data_ready...\n");
        if (sk->dead || !(xprt = xprt_from_sock(sk))) {
                printk("RPC:      udp_data_ready request not found!\n");
                goto out;
        }

        dprintk("RPC:      udp_data_ready client %p\n", xprt);

        if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
                goto out;

        if (xprt->shutdown)
                goto dropit;

        repsize = skb->len - sizeof(struct udphdr);
        if (repsize < 4) {
                printk("RPC: impossible RPC reply size %d!\n", repsize);
                goto dropit;
        }

        /* Look up and lock the request corresponding to the given XID */
        spin_lock(&xprt->sock_lock);
        rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + sizeof(struct udphdr)));
        if (!rovr)
                goto out_unlock;
        task = rovr->rq_task;

        dprintk("RPC: %4d received reply\n", task->tk_pid);
        xprt_pktdump("packet data:",
                     (u32 *) (skb->h.raw+sizeof(struct udphdr)), repsize);

        if ((copied = rovr->rq_private_buf.len) > repsize)
                copied = repsize;

        /* Suck it into the iovec, verify checksum if not done by hw. */
        if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb))
                goto out_unlock;

        /* Something worked... */
        dst_confirm(skb->dst);

        xprt_complete_rqst(xprt, rovr, copied);

 out_unlock:
        spin_unlock(&xprt->sock_lock);
 dropit:
        skb_free_datagram(sk, skb);
 out:
        if (sk->sleep && waitqueue_active(sk->sleep))
                wake_up_interruptible(sk->sleep);
        read_unlock(&sk->callback_lock);
}

/*
 * Copy from an skb into memory and shrink the skb.
 */
static inline size_t
tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
{
        if (len > desc->count)
                len = desc->count;
        skb_copy_bits(desc->skb, desc->offset, p, len);
        desc->offset += len;
        desc->count -= len;
        return len;
}

/*
 * TCP read fragment marker
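 *
 * Each RPC-over-TCP record is prefixed by a 4-byte marker (RPC record
 * marking): the top bit flags the last fragment of a record, the low
 * 31 bits give the fragment length.  The XPRT_LAST_FRAG test and the
 * 0x7fffffff mask below decode exactly that.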
 */
static inline void
tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
{
        size_t len, used;
        char *p;

        p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
        len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
        used = tcp_copy_data(desc, p, len);
        xprt->tcp_offset += used;
        if (used != len)
                return;
        xprt->tcp_reclen = ntohl(xprt->tcp_recm);
        if (xprt->tcp_reclen & 0x80000000)
                xprt->tcp_flags |= XPRT_LAST_FRAG;
        else
                xprt->tcp_flags &= ~XPRT_LAST_FRAG;
        xprt->tcp_reclen &= 0x7fffffff;
        xprt->tcp_flags &= ~XPRT_COPY_RECM;
        xprt->tcp_offset = 0;
        /* Sanity check of the record length */
        if (xprt->tcp_reclen < 4) {
                printk(KERN_ERR "RPC: Invalid TCP record fragment length\n");
                xprt_disconnect(xprt);
        }
        dprintk("RPC:      reading TCP record fragment of length %d\n",
                        xprt->tcp_reclen);
}

static void
tcp_check_recm(struct rpc_xprt *xprt)
{
        if (xprt->tcp_offset == xprt->tcp_reclen) {
                xprt->tcp_flags |= XPRT_COPY_RECM;
                xprt->tcp_offset = 0;
                if (xprt->tcp_flags & XPRT_LAST_FRAG) {
                        xprt->tcp_flags &= ~XPRT_COPY_DATA;
                        xprt->tcp_flags |= XPRT_COPY_XID;
                        xprt->tcp_copied = 0;
                }
        }
}

/*
 * TCP read xid
 */
static inline void
tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
{
        size_t len, used;
        char *p;

        len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
        dprintk("RPC:      reading XID (%Zu bytes)\n", len);
        p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
        used = tcp_copy_data(desc, p, len);
        xprt->tcp_offset += used;
        if (used != len)
                return;
        xprt->tcp_flags &= ~XPRT_COPY_XID;
        xprt->tcp_flags |= XPRT_COPY_DATA;
        xprt->tcp_copied = 4;
        dprintk("RPC:      reading reply for XID %08x\n", xprt->tcp_xid);
        tcp_check_recm(xprt);
}

/*
 * TCP read and complete request
 */
static inline void
tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
{
        struct rpc_rqst *req;
        struct xdr_buf *rcvbuf;
        size_t len;

        /* Find and lock the request corresponding to this xid */
        spin_lock(&xprt->sock_lock);
        req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
        if (!req) {
                xprt->tcp_flags &= ~XPRT_COPY_DATA;
                dprintk("RPC:      XID %08x request not found!\n",
                                xprt->tcp_xid);
                spin_unlock(&xprt->sock_lock);
                return;
        }

        rcvbuf = &req->rq_private_buf;
        len = desc->count;
        if (len > xprt->tcp_reclen - xprt->tcp_offset) {
                skb_reader_t my_desc;

                len = xprt->tcp_reclen - xprt->tcp_offset;
                memcpy(&my_desc, desc, sizeof(my_desc));
                my_desc.count = len;
                xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
                                          &my_desc, tcp_copy_data);
                desc->count -= len;
                desc->offset += len;
        } else
                xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
                                          desc, tcp_copy_data);
        xprt->tcp_copied += len;
        xprt->tcp_offset += len;

        if (xprt->tcp_copied == req->rq_private_buf.len)
                xprt->tcp_flags &= ~XPRT_COPY_DATA;
        else if (xprt->tcp_offset == xprt->tcp_reclen) {
                if (xprt->tcp_flags & XPRT_LAST_FRAG)
                        xprt->tcp_flags &= ~XPRT_COPY_DATA;
        }

        if (!(xprt->tcp_flags & XPRT_COPY_DATA)) {
                dprintk("RPC: %4d received reply complete\n",
                                req->rq_task->tk_pid);
                xprt_complete_rqst(xprt, req, xprt->tcp_copied);
        }
        spin_unlock(&xprt->sock_lock);
        tcp_check_recm(xprt);
}

/*
 * TCP discard extra bytes from a short read
 */
static inline void
tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
{
        size_t len;

        len = xprt->tcp_reclen - xprt->tcp_offset;
        if (len > desc->count)
                len = desc->count;
        desc->count -= len;
        desc->offset += len;
        xprt->tcp_offset += len;
        tcp_check_recm(xprt);
}

/*
 * TCP record receive routine
 * We first have to grab the record marker, then the XID, then the data.
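 * The xprt->tcp_flags bits drive the state machine: XPRT_COPY_RECM while
 * the 4-byte marker is being assembled, XPRT_COPY_XID while the XID is
 * read, XPRT_COPY_DATA while the reply body is copied into the request's
 * receive buffer; whatever is left of a fragment after that is discarded.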
 */
static int
tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
                unsigned int offset, size_t len)
{
        struct rpc_xprt *xprt = (struct rpc_xprt *)rd_desc->buf;
        skb_reader_t desc = { skb, offset, len };

        dprintk("RPC:      tcp_data_recv\n");
        do {
                /* Read in a new fragment marker if necessary */
                /* Can we ever really expect to get completely empty fragments? */
                if (xprt->tcp_flags & XPRT_COPY_RECM) {
                        tcp_read_fraghdr(xprt, &desc);
                        continue;
                }
                /* Read in the xid if necessary */
                if (xprt->tcp_flags & XPRT_COPY_XID) {
                        tcp_read_xid(xprt, &desc);
                        continue;
                }
                /* Read in the request data */
                if (xprt->tcp_flags & XPRT_COPY_DATA) {
                        tcp_read_request(xprt, &desc);
                        continue;
                }
                /* Skip over any trailing bytes on short reads */
                tcp_read_discard(xprt, &desc);
        } while (desc.count);
        dprintk("RPC:      tcp_data_recv done\n");
        return len - desc.count;
}

static void tcp_data_ready(struct sock *sk, int bytes)
{
        struct rpc_xprt *xprt;
        read_descriptor_t rd_desc;

        read_lock(&sk->callback_lock);
        dprintk("RPC:      tcp_data_ready...\n");
        if (!(xprt = xprt_from_sock(sk))) {
                printk("RPC:      tcp_data_ready socket info not found!\n");
                goto out;
        }
        if (xprt->shutdown)
                goto out;

        /* We use rd_desc to pass struct xprt to tcp_data_recv */
        rd_desc.buf = (char *)xprt;
        rd_desc.count = 65536;
        tcp_read_sock(sk, &rd_desc, tcp_data_recv);
out:
        read_unlock(&sk->callback_lock);
}

static void
tcp_state_change(struct sock *sk)
{
        struct rpc_xprt *xprt;

        read_lock(&sk->callback_lock);
        if (!(xprt = xprt_from_sock(sk)))
                goto out;
        dprintk("RPC:      tcp_state_change client %p...\n", xprt);
        dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
                                sk->state, xprt_connected(xprt),
                                sk->dead, sk->zapped);

        switch (sk->state) {
        case TCP_ESTABLISHED:
                if (xprt_test_and_set_connected(xprt))
                        break;

                /* Reset TCP record info */
                xprt->tcp_offset = 0;
                xprt->tcp_reclen = 0;
                xprt->tcp_copied = 0;
                xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;

                spin_lock_bh(&xprt->sock_lock);
                if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
                        rpc_wake_up_task(xprt->snd_task);
                spin_unlock_bh(&xprt->sock_lock);
                break;
        case TCP_SYN_SENT:
        case TCP_SYN_RECV:
                break;
        default:
                xprt_disconnect(xprt);
                break;
        }
 out:
        if (sk->sleep && waitqueue_active(sk->sleep))
                wake_up_interruptible_all(sk->sleep);
        read_unlock(&sk->callback_lock);
}

/*
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing sock_sendmsg
 * with a bunch of small requests.
 */
static void
xprt_write_space(struct sock *sk)
{
        struct rpc_xprt *xprt;
        struct socket   *sock;

        read_lock(&sk->callback_lock);
        if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->socket))
                goto out;
        if (xprt->shutdown)
                goto out;

        /* Wait until we have enough socket memory */
        if (xprt->stream) {
                /* from net/ipv4/tcp.c:tcp_write_space */
                if (tcp_wspace(sk) < tcp_min_write_space(sk))
                        goto out;
        } else {
                /* from net/core/sock.c:sock_def_write_space */
                if (!sock_writeable(sk))
                        goto out;
        }

        if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags))
                goto out;

        spin_lock_bh(&xprt->sock_lock);
        if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
                rpc_wake_up_task(xprt->snd_task);
        spin_unlock_bh(&xprt->sock_lock);
        if (sk->sleep && waitqueue_active(sk->sleep))
                wake_up_interruptible(sk->sleep);
out:
        read_unlock(&sk->callback_lock);
}

/*
 * RPC receive timeout handler.
 */
static void
xprt_timer(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        spin_lock(&xprt->sock_lock);
        if (req->rq_received)
                goto out;

        xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT);
        __xprt_put_cong(xprt, req);

        dprintk("RPC: %4d xprt_timer (%s request)\n",
                task->tk_pid, req ? "pending" : "backlogged");

        task->tk_status  = -ETIMEDOUT;
out:
        task->tk_timeout = 0;
        rpc_wake_up_task(task);
        spin_unlock(&xprt->sock_lock);
}

/*
 * Place the actual RPC call.
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void
xprt_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid,
                                *(u32 *)(req->rq_svec[0].iov_base));

        if (xprt->shutdown)
                task->tk_status = -EIO;

        if (task->tk_status < 0)
                return;

        if (task->tk_rpcwait)
                rpc_remove_wait_queue(task);

        /* set up everything as needed. */
        /* Write the record marker */
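        /* The first four bytes of the send buffer are reserved for this
         * marker: the length of the rest of the request, with the high
         * bit set because the whole request always goes out as a single,
         * last fragment. */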
        if (xprt->stream) {
                u32     *marker = req->rq_svec[0].iov_base;

                *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
        }

        spin_lock_bh(&xprt->sock_lock);
        if (req->rq_received != 0 && !req->rq_bytes_sent)
                goto out_notrans;

        if (!__xprt_lock_write(xprt, task))
                goto out_notrans;

        if (!xprt_connected(xprt)) {
                task->tk_status = -ENOTCONN;
                goto out_notrans;
        }

        if (list_empty(&req->rq_list)) {
                /* Update the softirq receive buffer */
                memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
                                sizeof(req->rq_private_buf));
                list_add_tail(&req->rq_list, &xprt->recv);
        }
        spin_unlock_bh(&xprt->sock_lock);

        do_xprt_transmit(task);
        return;
out_notrans:
        spin_unlock_bh(&xprt->sock_lock);
}

static void
do_xprt_transmit(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int status, retry = 0;

        /* Continue transmitting the packet/record. We must be careful
         * to cope with writespace callbacks arriving _after_ we have
         * called xprt_sendmsg().
         */
        while (1) {
                req->rq_xtime = jiffies;
                status = xprt_sendmsg(xprt, req);

                if (status < 0)
                        break;

                if (xprt->stream) {
                        req->rq_bytes_sent += status;

                        /* If we've sent the entire packet, immediately
                         * reset the count of bytes sent. */
                        if (req->rq_bytes_sent >= req->rq_slen) {
                                req->rq_bytes_sent = 0;
                                goto out_receive;
                        }
                } else {
                        if (status >= req->rq_slen)
                                goto out_receive;
                        status = -EAGAIN;
                        break;
                }

                dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
                                task->tk_pid, req->rq_slen - req->rq_bytes_sent,
                                req->rq_slen);

                status = -EAGAIN;
                if (retry++ > 50)
                        break;
        }

        /* If we're doing a resend and have received a reply already,
         * then exit early.
         * Note, though, that we can't do this if we've already started
         * resending down a TCP stream.
         */
        task->tk_status = status;

        switch (status) {
        case -EAGAIN:
                if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
                        /* Protect against races with xprt_write_space */
                        spin_lock_bh(&xprt->sock_lock);
                        /* Don't race with disconnect */
                        if (!xprt_connected(xprt))
                                task->tk_status = -ENOTCONN;
                        else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) {
                                task->tk_timeout = req->rq_timeout.to_current;
                                rpc_sleep_on(&xprt->pending, task, NULL, NULL);
                        }
                        spin_unlock_bh(&xprt->sock_lock);
                        return;
                }
                /* Keep holding the socket if it is blocked */
                rpc_delay(task, HZ>>4);
                return;
        case -ECONNREFUSED:
                task->tk_timeout = RPC_REESTABLISH_TIMEOUT;
                rpc_sleep_on(&xprt->sending, task, NULL, NULL);
        case -ENOTCONN:
                return;
        default:
                if (xprt->stream)
                        xprt_disconnect(xprt);
        }
        xprt_release_write(xprt, task);
        return;
 out_receive:
        dprintk("RPC: %4d xmit complete\n", task->tk_pid);
        spin_lock_bh(&xprt->sock_lock);
        /* Set the task's receive timeout value */
        if (!xprt->nocong) {
                int timer = rpcproc_timer(clnt, task->tk_msg.rpc_proc);
                task->tk_timeout = rpc_calc_rto(&clnt->cl_rtt, timer);
                task->tk_timeout <<= rpc_ntimeo(&clnt->cl_rtt, timer);
                task->tk_timeout <<= clnt->cl_timeout.to_retries
                        - req->rq_timeout.to_retries;
                if (task->tk_timeout > req->rq_timeout.to_maxval)
                        task->tk_timeout = req->rq_timeout.to_maxval;
        } else
                task->tk_timeout = req->rq_timeout.to_current;
        /* Don't race with disconnect */
        if (!xprt_connected(xprt))
                task->tk_status = -ENOTCONN;
        else if (!req->rq_received)
                rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
        __xprt_release_write(xprt, task);
        spin_unlock_bh(&xprt->sock_lock);
}

/*
 * Reserve an RPC call slot.
 */
static inline void
do_xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        task->tk_status = 0;
        if (task->tk_rqstp)
                return;
        if (xprt->free) {
                struct rpc_rqst *req = xprt->free;
                xprt->free = req->rq_next;
                req->rq_next = NULL;
                task->tk_rqstp = req;
                xprt_request_init(task, xprt);
                return;
        }
        dprintk("RPC:      waiting for request slot\n");
        task->tk_status = -EAGAIN;
        task->tk_timeout = 0;
        rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
}

void
xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        task->tk_status = -EIO;
        if (!xprt->shutdown) {
                spin_lock(&xprt->xprt_lock);
                do_xprt_reserve(task);
                spin_unlock(&xprt->xprt_lock);
        }
}

/*
 * Allocate a 'unique' XID
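 *
 * The counter is seeded from CURRENT_TIME on first use, so XIDs are
 * unlikely to repeat across client reboots, and is then simply
 * incremented once per request.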
 */
static u32
xprt_alloc_xid(void)
{
        static spinlock_t xid_lock = SPIN_LOCK_UNLOCKED;
        static int need_init = 1;
        static u32 xid;
        u32 ret;

        spin_lock(&xid_lock);
        if (unlikely(need_init)) {
                xid = CURRENT_TIME << 12;
                need_init = 0;
        }
        ret = xid++;
        spin_unlock(&xid_lock);
        return ret;
}

/*
 * Initialize RPC request
 */
static void
xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
        struct rpc_rqst *req = task->tk_rqstp;

        req->rq_timeout = xprt->timeout;
        req->rq_task    = task;
        req->rq_xprt    = xprt;
        req->rq_xid     = xprt_alloc_xid();
        INIT_LIST_HEAD(&req->rq_list);
        dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
                        req, req->rq_xid);
}

/*
 * Release an RPC call slot
 */
void
xprt_release(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req;

        if (!(req = task->tk_rqstp))
                return;
        spin_lock_bh(&xprt->sock_lock);
        __xprt_release_write(xprt, task);
        __xprt_put_cong(xprt, req);
        if (!list_empty(&req->rq_list))
                list_del(&req->rq_list);
        spin_unlock_bh(&xprt->sock_lock);
        task->tk_rqstp = NULL;
        memset(req, 0, sizeof(*req));   /* mark unused */

        dprintk("RPC: %4d release request %p\n", task->tk_pid, req);

        spin_lock(&xprt->xprt_lock);
        req->rq_next = xprt->free;
        xprt->free   = req;

        xprt_clear_backlog(xprt);
        spin_unlock(&xprt->xprt_lock);
}

/*
 * Set default timeout parameters
 */
void
xprt_default_timeout(struct rpc_timeout *to, int proto)
{
        if (proto == IPPROTO_UDP)
                xprt_set_timeout(to, 5,  5 * HZ);
        else
                xprt_set_timeout(to, 5, 60 * HZ);
}

/*
 * Set constant timeout
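 *
 * Worked example: the UDP default above, xprt_set_timeout(to, 5, 5 * HZ),
 * yields an initial and per-retry increment of 5 seconds, a 25 second
 * cap, 5 retries, and linear (non-exponential) backoff.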
 */
void
xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
{
        to->to_current   =
        to->to_initval   =
        to->to_increment = incr;
        to->to_maxval    = incr * retr;
        to->to_retries   = retr;
        to->to_exponential = 0;
}

/*
 * Initialize an RPC client
 */
static struct rpc_xprt *
xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
{
        struct rpc_xprt *xprt;
        struct rpc_rqst *req;
        int             i;

        dprintk("RPC:      setting up %s transport...\n",
                                proto == IPPROTO_UDP? "UDP" : "TCP");

        if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
                return NULL;
        memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */

        xprt->addr = *ap;
        xprt->prot = proto;
        xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
        if (xprt->stream) {
                xprt->cwnd = RPC_MAXCWND;
                xprt->nocong = 1;
        } else
                xprt->cwnd = RPC_INITCWND;
        spin_lock_init(&xprt->sock_lock);
        spin_lock_init(&xprt->xprt_lock);
        init_waitqueue_head(&xprt->cong_wait);

        INIT_LIST_HEAD(&xprt->recv);

        /* Set timeout parameters */
        if (to) {
                xprt->timeout = *to;
                xprt->timeout.to_current = to->to_initval;
        } else
                xprt_default_timeout(&xprt->timeout, xprt->prot);

        INIT_RPC_WAITQ(&xprt->pending, "xprt_pending");
        INIT_RPC_WAITQ(&xprt->sending, "xprt_sending");
        INIT_RPC_WAITQ(&xprt->resend, "xprt_resend");
        INIT_RPC_WAITQ(&xprt->backlog, "xprt_backlog");

        /* initialize free list */
        for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
                req->rq_next = req + 1;
        req->rq_next = NULL;
        xprt->free = xprt->slot;

        /* Check whether we want to use a reserved port */
        xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;

        dprintk("RPC:      created transport %p\n", xprt);

        return xprt;
}

/*
 * Bind to a reserved port
 */
static inline int
xprt_bindresvport(struct socket *sock)
{
        struct sockaddr_in myaddr;
        int             err, port;
        kernel_cap_t saved_cap = current->cap_effective;

        /* Override capabilities.
         * They were checked in xprt_create_proto i.e. at mount time
         */
        cap_raise (current->cap_effective, CAP_NET_BIND_SERVICE);

        memset(&myaddr, 0, sizeof(myaddr));
        myaddr.sin_family = AF_INET;
        port = 800;
        do {
                myaddr.sin_port = htons(port);
                err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
                                                sizeof(myaddr));
        } while (err == -EADDRINUSE && --port > 0);
        current->cap_effective = saved_cap;

        if (err < 0)
                printk("RPC: Can't bind to reserved port (%d).\n", -err);

        return err;
}

static int
xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
{
        struct sock     *sk = sock->sk;

        if (xprt->inet)
                return -EBUSY;

        write_lock_bh(&sk->callback_lock);
        sk->user_data = xprt;
        xprt->old_data_ready = sk->data_ready;
        xprt->old_state_change = sk->state_change;
        xprt->old_write_space = sk->write_space;
        if (xprt->prot == IPPROTO_UDP) {
                sk->data_ready = udp_data_ready;
                sk->no_check = UDP_CSUM_NORCV;
                xprt_set_connected(xprt);
        } else {
                struct tcp_opt *tp = &(sk->tp_pinfo.af_tcp);
                tp->nonagle = 1;        /* disable Nagle's algorithm */
                sk->data_ready = tcp_data_ready;
                sk->state_change = tcp_state_change;
                xprt_clear_connected(xprt);
        }
        sk->write_space = xprt_write_space;

        /* Reset to new socket */
        xprt->sock = sock;
        xprt->inet = sk;
        write_unlock_bh(&sk->callback_lock);

        return 0;
}

/*
 * Set socket buffer length
 */
void
xprt_sock_setbufsize(struct rpc_xprt *xprt)
{
        struct sock *sk = xprt->inet;

        if (xprt->stream)
                return;
        if (xprt->rcvsize) {
                sk->userlocks |= SOCK_RCVBUF_LOCK;
                sk->rcvbuf = xprt->rcvsize * RPC_MAXCONG * 2;
        }
        if (xprt->sndsize) {
                sk->userlocks |= SOCK_SNDBUF_LOCK;
                sk->sndbuf = xprt->sndsize * RPC_MAXCONG * 2;
                sk->write_space(sk);
        }
}

/*
 * Create a client socket given the protocol and peer address.
 */
static struct socket *
xprt_create_socket(int proto, struct rpc_timeout *to, int resvport)
{
        struct socket   *sock;
        int             type, err;

        dprintk("RPC:      xprt_create_socket(%s %d)\n",
                           (proto == IPPROTO_UDP)? "udp" : "tcp", proto);

        type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;

        if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {
                printk("RPC: can't create socket (%d).\n", -err);
                goto failed;
        }

        /* bind to a reserved port */
        if (resvport && xprt_bindresvport(sock) < 0)
                goto failed;

        return sock;

failed:
        sock_release(sock);
        return NULL;
}

/*
 * Create an RPC client transport given the protocol and peer address.
 */
struct rpc_xprt *
xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
        struct rpc_xprt *xprt;

        xprt = xprt_setup(proto, sap, to);
        if (!xprt)
                goto out_bad;

        dprintk("RPC:      xprt_create_proto created xprt %p\n", xprt);
        return xprt;
out_bad:
        dprintk("RPC:      xprt_create_proto failed\n");
        if (xprt)
                kfree(xprt);
        return NULL;
}

/*
 * Prepare for transport shutdown.
 */
void
xprt_shutdown(struct rpc_xprt *xprt)
{
        xprt->shutdown = 1;
        rpc_wake_up(&xprt->sending);
        rpc_wake_up(&xprt->resend);
        rpc_wake_up(&xprt->pending);
        rpc_wake_up(&xprt->backlog);
        if (waitqueue_active(&xprt->cong_wait))
                wake_up(&xprt->cong_wait);
}

/*
 * Clear the xprt backlog queue
 */
int
xprt_clear_backlog(struct rpc_xprt *xprt)
{
        rpc_wake_up_next(&xprt->backlog);
        if (waitqueue_active(&xprt->cong_wait))
                wake_up(&xprt->cong_wait);
        return 1;
}

/*
 * Destroy an RPC transport, killing off all requests.
 */
int
xprt_destroy(struct rpc_xprt *xprt)
{
        dprintk("RPC:      destroying transport %p\n", xprt);
        xprt_shutdown(xprt);
        xprt_close(xprt);
        kfree(xprt);

        return 0;
}