/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -   When a process places a call, it allocates a request slot if
 *      one is available. Otherwise, it sleeps on the backlog queue
 *      (xprt_reserve).
 *  -   Next, the caller puts together the RPC message, stuffs it into
 *      the request struct, and calls xprt_call().
 *  -   xprt_call transmits the message and installs the caller on the
 *      socket's wait list. At the same time, it installs a timer that
 *      is run after the packet's timeout has expired.
 *  -   When a packet arrives, the data_ready handler walks the list of
 *      pending requests for that socket. If a matching XID is found, the
 *      caller is woken up, and the timer removed.
 *  -   When no reply arrives within the timeout interval, the timer is
 *      fired by the kernel and runs xprt_timer(). It either adjusts the
 *      timeout values (minor timeout) or wakes up the caller with a status
 *      of -ETIMEDOUT.
 *  -   When the caller receives a notification from RPC that a reply arrived,
 *      it should release the RPC slot, and process the reply.
 *      If the call timed out, it may choose to retry the operation by
 *      adjusting the initial timeout value, and simply calling rpc_call
 *      again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP NFS related read + write fixes
 *   (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 *  Rewrite of large parts of the code in order to stabilize TCP stuff.
 *  Fix behaviour when socket buffer is full.
 *   (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 */
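
/*
 * Illustrative usage sketch of the flow described above, as seen from
 * the generic client layer (a hedged sketch, assuming the rpc_call_sync()
 * entry point and struct rpc_message from linux/sunrpc/clnt.h; procedure
 * number and argument types are placeholders, error handling elided):
 *
 *      struct rpc_message msg = { NFSPROC_GETATTR, &arg, &res, NULL };
 *      int status;
 *
 *      status = rpc_call_sync(clnt, &msg, 0);
 *
 * Internally this reserves a request slot (xprt_reserve), encodes the
 * call, transmits it (xprt_transmit) and sleeps on xprt->pending until
 * the data_ready handler matches the reply XID and wakes the task.
 */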

#define __KERNEL_SYSCALLS__

#include <linux/version.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/capability.h>
#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/udp.h>
#include <linux/unistd.h>
#include <linux/sunrpc/clnt.h>
#include <linux/file.h>

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>

#include <asm/uaccess.h>

/*
 * Local variables
 */

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY        RPCDBG_XPRT
#endif

#define XPRT_MAX_BACKOFF        (8)

/*
 * Local functions
 */
static void     xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static void     do_xprt_transmit(struct rpc_task *);
static void     xprt_reserve_status(struct rpc_task *task);
static void     xprt_disconnect(struct rpc_xprt *);
static void     xprt_reconn_status(struct rpc_task *task);
static struct socket *xprt_create_socket(int, struct rpc_timeout *);
static int      xprt_bind_socket(struct rpc_xprt *, struct socket *);
static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);

#ifdef RPC_DEBUG_DATA
/*
 * Print the buffer contents (first 128 bytes only--just enough for
 * diropres return).
 */
static void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
        u8      *buf = (u8 *) packet;
        int     j;

        dprintk("RPC:      %s\n", msg);
        for (j = 0; j < count && j < 128; j += 4) {
                if (!(j & 31)) {
                        if (j)
                                dprintk("\n");
                        dprintk("0x%04x ", j);
                }
                dprintk("%02x%02x%02x%02x ",
                        buf[j], buf[j+1], buf[j+2], buf[j+3]);
        }
        dprintk("\n");
}
#else
static inline void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
        /* NOP */
}
#endif

/*
 * Look up RPC transport given an INET socket
 */
static inline struct rpc_xprt *
xprt_from_sock(struct sock *sk)
{
        return (struct rpc_xprt *) sk->user_data;
}

/*
 * Serialize write access to sockets, in order to prevent different
 * requests from interfering with each other.
 * Also prevents TCP socket reconnections from colliding with writes.
 */
static int
__xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (!xprt->snd_task) {
                if (xprt->nocong || __xprt_get_cong(xprt, task))
                        xprt->snd_task = task;
        }
        if (xprt->snd_task != task) {
                dprintk("RPC: %4d TCP write queue full\n", task->tk_pid);
                task->tk_timeout = 0;
                task->tk_status = -EAGAIN;
                if (task->tk_rqstp && task->tk_rqstp->rq_nresend)
                        rpc_sleep_on(&xprt->resend, task, NULL, NULL);
                else
                        rpc_sleep_on(&xprt->sending, task, NULL, NULL);
        }
        return xprt->snd_task == task;
}

static inline int
xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        int retval;
        spin_lock_bh(&xprt->sock_lock);
        retval = __xprt_lock_write(xprt, task);
        spin_unlock_bh(&xprt->sock_lock);
        return retval;
}

static void
__xprt_lock_write_next(struct rpc_xprt *xprt)
{
        struct rpc_task *task;

        if (xprt->snd_task)
                return;
        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
                if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
                        return;
                task = rpc_wake_up_next(&xprt->sending);
                if (!task)
                        return;
        }
        if (xprt->nocong || __xprt_get_cong(xprt, task))
                xprt->snd_task = task;
}

/*
 * Releases the socket for use by other requests.
 */
static void
__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task)
                xprt->snd_task = NULL;
        __xprt_lock_write_next(xprt);
}

static inline void
xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        spin_lock_bh(&xprt->sock_lock);
        __xprt_release_write(xprt, task);
        spin_unlock_bh(&xprt->sock_lock);
}
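
/*
 * Usage sketch for the write lock above (illustrative): callers pair
 * xprt_lock_write() with xprt_release_write(), e.g.
 *
 *      if (!xprt_lock_write(xprt, task))
 *              return;         -- task now queued on sending/resend
 *      ... transmit via xprt->sock ...
 *      xprt_release_write(xprt, task);
 *
 * A task that loses the race has tk_status set to -EAGAIN and is woken
 * by __xprt_lock_write_next() once the current holder lets go.
 */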

/*
 * Write data to socket.
 */
static inline int
xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        struct socket   *sock = xprt->sock;
        struct msghdr   msg;
        struct xdr_buf  *xdr = &req->rq_snd_buf;
        struct iovec    niv[MAX_IOVEC];
        unsigned int    niov, slen, skip;
        mm_segment_t    oldfs;
        int             result;

        if (!sock)
                return -ENOTCONN;

        xprt_pktdump("packet data:",
                                req->rq_svec->iov_base,
                                req->rq_svec->iov_len);

        /* Don't repeat bytes */
        skip = req->rq_bytes_sent;
        slen = xdr->len - skip;
        niov = xdr_kmap(niv, xdr, skip);

        msg.msg_flags   = MSG_DONTWAIT|MSG_NOSIGNAL;
        msg.msg_iov     = niv;
        msg.msg_iovlen  = niov;
        msg.msg_name    = (struct sockaddr *) &xprt->addr;
        msg.msg_namelen = sizeof(xprt->addr);
        msg.msg_control = NULL;
        msg.msg_controllen = 0;

        oldfs = get_fs(); set_fs(get_ds());
        clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
        result = sock_sendmsg(sock, &msg, slen);
        set_fs(oldfs);

        xdr_kunmap(xdr, skip);

        dprintk("RPC:      xprt_sendmsg(%d) = %d\n", slen, result);

        if (result >= 0)
                return result;

        switch (result) {
        case -ECONNREFUSED:
                /* When the server has died, an ICMP port unreachable message
                 * prompts ECONNREFUSED.
                 */
        case -EAGAIN:
                break;
        case -ENOTCONN:
        case -EPIPE:
                /* connection broken */
                if (xprt->stream)
                        result = -ENOTCONN;
                break;
        default:
                printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
        }
        return result;
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (req->rq_cong)
                return 1;
        dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n",
                        task->tk_pid, xprt->cong, xprt->cwnd);
        if (RPCXPRT_CONGESTED(xprt))
                return 0;
        req->rq_cong = 1;
        xprt->cong += RPC_CWNDSCALE;
        return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        if (!req->rq_cong)
                return;
        req->rq_cong = 0;
        xprt->cong -= RPC_CWNDSCALE;
        __xprt_lock_write_next(xprt);
}

/*
 * Adjust RPC congestion window
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
static void
xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{
        unsigned long   cwnd;

        cwnd = xprt->cwnd;
        if (result >= 0 && cwnd <= xprt->cong) {
                /* The (cwnd >> 1) term makes sure
                 * the result gets rounded properly. */
                cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
                if (cwnd > RPC_MAXCWND)
                        cwnd = RPC_MAXCWND;
                __xprt_lock_write_next(xprt);
        } else if (result == -ETIMEDOUT) {
                cwnd >>= 1;
                if (cwnd < RPC_CWNDSCALE)
                        cwnd = RPC_CWNDSCALE;
        }
        dprintk("RPC:      cong %ld, cwnd was %ld, now %ld\n",
                        xprt->cong, xprt->cwnd, cwnd);
        xprt->cwnd = cwnd;
}
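
/*
 * Worked example of the update above (assuming RPC_CWNDSCALE is 256,
 * i.e. 1 << RPC_CWNDSHIFT): with cwnd = 1024, a timely reply gives
 *
 *      cwnd += (256 * 256 + 512) / 1024 == 64
 *
 * roughly one RPC_CWNDSCALE slot gained per window's worth of replies
 * (additive increase), while -ETIMEDOUT halves cwnd (multiplicative
 * decrease) down to a floor of one slot.
 */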

/*
 * Adjust timeout values etc for next retransmit
 */
int
xprt_adjust_timeout(struct rpc_timeout *to)
{
        if (to->to_retries > 0) {
                if (to->to_exponential)
                        to->to_current <<= 1;
                else
                        to->to_current += to->to_increment;
                if (to->to_maxval && to->to_current >= to->to_maxval)
                        to->to_current = to->to_maxval;
        } else {
                if (to->to_exponential)
                        to->to_initval <<= 1;
                else
                        to->to_initval += to->to_increment;
                if (to->to_maxval && to->to_initval >= to->to_maxval)
                        to->to_initval = to->to_maxval;
                to->to_current = to->to_initval;
        }

        if (!to->to_current) {
                printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");
                to->to_current = 5 * HZ;
        }
        pprintk("RPC: %lu %s\n", jiffies,
                        to->to_retries? "retrans" : "timeout");
        return to->to_retries-- > 0;
}
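
/*
 * Example (with the UDP defaults from xprt_default_timeout() below,
 * i.e. xprt_set_timeout(to, 5, 5 * HZ)): to_exponential is 0, so each
 * call adds to_increment, giving retransmits at 5s, 10s, 15s, ...,
 * capped at to_maxval (25s).  Once to_retries is exhausted, to_initval
 * itself is bumped and to_current restarts from the new base.
 */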

/*
 * Close down a transport socket
 */
static void
xprt_close(struct rpc_xprt *xprt)
{
        struct socket   *sock = xprt->sock;
        struct sock     *sk = xprt->inet;

        if (!sk)
                return;

        xprt->inet = NULL;
        xprt->sock = NULL;

        sk->user_data    = NULL;
        sk->data_ready   = xprt->old_data_ready;
        sk->state_change = xprt->old_state_change;
        sk->write_space  = xprt->old_write_space;

        xprt_disconnect(xprt);
        sk->no_check     = 0;

        sock_release(sock);
        /*
         *      TCP no longer needs rpciod here - other users may,
         *      but rpciod keeps track of that itself, not us.
         */
        if (xprt->stream)
                rpciod_down();
}

/*
 * Mark a transport as disconnected
 */
static void
xprt_disconnect(struct rpc_xprt *xprt)
{
        dprintk("RPC:      disconnected transport %p\n", xprt);
        xprt_clear_connected(xprt);
        rpc_wake_up_status(&xprt->pending, -ENOTCONN);
}

/*
 * Reconnect a broken TCP connection.
 *
 * Note: This cannot collide with the TCP reads, as both run from rpciod
 */
void
xprt_reconnect(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct socket   *sock = xprt->sock;
        struct sock     *inet;
        int             status;

        dprintk("RPC: %4d xprt_reconnect %p connected %d\n",
                                task->tk_pid, xprt, xprt_connected(xprt));
        if (xprt->shutdown)
                return;

        if (!xprt->stream)
                return;

        if (!xprt->addr.sin_port) {
                task->tk_status = -EIO;
                return;
        }

        if (!xprt_lock_write(xprt, task))
                return;
        if (xprt_connected(xprt))
                goto out_write;

        if (sock && sock->state != SS_UNCONNECTED)
                xprt_close(xprt);
        status = -ENOTCONN;
        if (!(inet = xprt->inet)) {
                /* Create an unconnected socket */
                if (!(sock = xprt_create_socket(xprt->prot, &xprt->timeout)))
                        goto defer;
                xprt_bind_socket(xprt, sock);
                inet = sock->sk;
        }

        /* Now connect it asynchronously. */
        dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
        status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
                                sizeof(xprt->addr), O_NONBLOCK);

        if (status < 0) {
                switch (status) {
                case -EALREADY:
                case -EINPROGRESS:
                        status = 0;
                        break;
                case -EISCONN:
                case -EPIPE:
                        status = 0;
                        xprt_close(xprt);
                        goto defer;
                default:
                        printk("RPC: TCP connect error %d!\n", -status);
                        xprt_close(xprt);
                        goto defer;
                }

                /* Protect against TCP socket state changes */
                lock_sock(inet);
                dprintk("RPC: %4d connect status %d connected %d\n",
                                task->tk_pid, status, xprt_connected(xprt));

                if (inet->state != TCP_ESTABLISHED) {
                        task->tk_timeout = xprt->timeout.to_maxval;
                        /* if the socket is already closing, delay 5 secs */
                        if ((1<<inet->state) & ~(TCPF_SYN_SENT|TCPF_SYN_RECV))
                                task->tk_timeout = 5*HZ;
                        rpc_sleep_on(&xprt->pending, task, xprt_reconn_status, NULL);
                        release_sock(inet);
                        return;
                }
                release_sock(inet);
        }
defer:
        if (status < 0) {
                rpc_delay(task, 5*HZ);
                task->tk_status = -ENOTCONN;
        }
 out_write:
        xprt_release_write(xprt, task);
}

/*
 * Reconnect timeout. We just mark the transport as not being in the
 * process of reconnecting, and leave the rest to the upper layers.
 */
static void
xprt_reconn_status(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        dprintk("RPC: %4d xprt_reconn_timeout %d\n",
                                task->tk_pid, task->tk_status);

        xprt_release_write(xprt, task);
}

/*
 * Look up the RPC request corresponding to a reply, and then lock it.
 */
static inline struct rpc_rqst *
xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
        struct list_head *pos;
        struct rpc_rqst *req = NULL;

        list_for_each(pos, &xprt->recv) {
                struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
                if (entry->rq_xid == xid) {
                        req = entry;
                        break;
                }
        }
        return req;
}

/*
 * Complete reply received.
 * The TCP code relies on us to remove the request from xprt->pending.
 */
static void
xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{
        struct rpc_task *task = req->rq_task;
        struct rpc_clnt *clnt = task->tk_client;

        /* Adjust congestion window */
        if (!xprt->nocong) {
                xprt_adjust_cwnd(xprt, copied);
                __xprt_put_cong(xprt, req);
                if (!req->rq_nresend) {
                        int timer = rpcproc_timer(clnt, task->tk_msg.rpc_proc);
                        if (timer)
                                rpc_update_rtt(&clnt->cl_rtt, timer, (long)jiffies - req->rq_xtime);
                }
                rpc_clear_timeo(&clnt->cl_rtt);
        }

#ifdef RPC_PROFILE
        /* Profile only reads for now */
        if (copied > 1024) {
                static unsigned long    nextstat = 0;
                static unsigned long    pkt_rtt = 0, pkt_len = 0, pkt_cnt = 0;

                pkt_cnt++;
                pkt_len += req->rq_slen + copied;
                pkt_rtt += jiffies - req->rq_xtime;
                if (time_before(nextstat, jiffies)) {
                        printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
                        printk("RPC: %ld %ld %ld %ld stat\n",
                                        jiffies, pkt_cnt, pkt_len, pkt_rtt);
                        pkt_rtt = pkt_len = pkt_cnt = 0;
                        nextstat = jiffies + 5 * HZ;
                }
        }
#endif

        dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
        req->rq_received = copied;
        list_del_init(&req->rq_list);

        /* ... and wake up the process. */
        rpc_wake_up_task(task);
        return;
}

static size_t
skb_read_bits(skb_reader_t *desc, void *to, size_t len)
{
        if (len > desc->count)
                len = desc->count;
        skb_copy_bits(desc->skb, desc->offset, to, len);
        desc->count -= len;
        desc->offset += len;
        return len;
}

static size_t
skb_read_and_csum_bits(skb_reader_t *desc, void *to, size_t len)
{
        unsigned int csum2, pos;

        if (len > desc->count)
                len = desc->count;
        pos = desc->offset;
        csum2 = skb_copy_and_csum_bits(desc->skb, pos, to, len, 0);
        desc->csum = csum_block_add(desc->csum, csum2, pos);
        desc->count -= len;
        desc->offset += len;
        return len;
}

/*
 * We have set things up such that we perform the checksum of the UDP
 * packet in parallel with the copies into the RPC client iovec.  -DaveM
 */
static int
csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
{
        skb_reader_t desc;

        desc.skb = skb;
        desc.offset = sizeof(struct udphdr);
        desc.count = skb->len - desc.offset;

        if (skb->ip_summed == CHECKSUM_UNNECESSARY)
                goto no_checksum;

        desc.csum = csum_partial(skb->data, desc.offset, skb->csum);
        xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_and_csum_bits);
        if (desc.offset != skb->len) {
                unsigned int csum2;
                csum2 = skb_checksum(skb, desc.offset, skb->len - desc.offset, 0);
                desc.csum = csum_block_add(desc.csum, csum2, desc.offset);
        }
        if ((unsigned short)csum_fold(desc.csum))
                return -1;
        return 0;
no_checksum:
        xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_bits);
        return 0;
}

/*
 * Input handler for RPC replies. Called from a bottom half and hence
 * atomic.
 */
static void
udp_data_ready(struct sock *sk, int len)
{
        struct rpc_task *task;
        struct rpc_xprt *xprt;
        struct rpc_rqst *rovr;
        struct sk_buff  *skb;
        int             err, repsize, copied;

        dprintk("RPC:      udp_data_ready...\n");
        if (!(xprt = xprt_from_sock(sk))) {
                printk("RPC:      udp_data_ready request not found!\n");
                goto out;
        }

        dprintk("RPC:      udp_data_ready client %p\n", xprt);

        if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
                goto out;

        if (xprt->shutdown)
                goto dropit;

        repsize = skb->len - sizeof(struct udphdr);
        if (repsize < 4) {
                printk("RPC: impossible RPC reply size %d!\n", repsize);
                goto dropit;
        }

        /* Look up and lock the request corresponding to the given XID */
        spin_lock(&xprt->sock_lock);
        rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + sizeof(struct udphdr)));
        if (!rovr)
                goto out_unlock;
        task = rovr->rq_task;

        dprintk("RPC: %4d received reply\n", task->tk_pid);
        xprt_pktdump("packet data:",
                     (u32 *) (skb->h.raw+sizeof(struct udphdr)), repsize);

        if ((copied = rovr->rq_rlen) > repsize)
                copied = repsize;

        /* Suck it into the iovec, verify checksum if not done by hw. */
        if (csum_partial_copy_to_xdr(&rovr->rq_rcv_buf, skb))
                goto out_unlock;

        /* Something worked... */
        dst_confirm(skb->dst);

        xprt_complete_rqst(xprt, rovr, copied);

 out_unlock:
        spin_unlock(&xprt->sock_lock);
 dropit:
        skb_free_datagram(sk, skb);
 out:
        if (sk->sleep && waitqueue_active(sk->sleep))
                wake_up_interruptible(sk->sleep);
}

/*
 * Copy from an skb into memory and shrink the skb.
 */
static inline size_t
tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
{
        if (len > desc->count)
                len = desc->count;
        skb_copy_bits(desc->skb, desc->offset, p, len);
        desc->offset += len;
        desc->count -= len;
        return len;
}

/*
 * TCP read fragment marker
 */
static inline void
tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
{
        size_t len, used;
        char *p;

        p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
        len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
        used = tcp_copy_data(desc, p, len);
        xprt->tcp_offset += used;
        if (used != len)
                return;
        xprt->tcp_reclen = ntohl(xprt->tcp_recm);
        if (xprt->tcp_reclen & 0x80000000)
                xprt->tcp_flags |= XPRT_LAST_FRAG;
        else
                xprt->tcp_flags &= ~XPRT_LAST_FRAG;
        xprt->tcp_reclen &= 0x7fffffff;
        xprt->tcp_flags &= ~XPRT_COPY_RECM;
        xprt->tcp_offset = 0;
        /* Sanity check of the record length */
        if (xprt->tcp_reclen < 4) {
                printk(KERN_ERR "RPC: Invalid TCP record fragment length\n");
                xprt_disconnect(xprt);
        }
        dprintk("RPC:      reading TCP record fragment of length %d\n",
                        xprt->tcp_reclen);
}
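
/*
 * For reference, the record marker parsed above is the 4-byte
 * big-endian word defined by RFC 1831 record marking for RPC over TCP:
 *
 *       31                              0
 *      +---+-----------------------------+
 *      | L |     fragment length         |
 *      +---+-----------------------------+
 *
 * The top bit L flags the last fragment of a record, hence the masks
 * 0x80000000 (XPRT_LAST_FRAG) and 0x7fffffff (length) above.
 */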

static void
tcp_check_recm(struct rpc_xprt *xprt)
{
        if (xprt->tcp_offset == xprt->tcp_reclen) {
                xprt->tcp_flags |= XPRT_COPY_RECM;
                xprt->tcp_offset = 0;
                if (xprt->tcp_flags & XPRT_LAST_FRAG) {
                        xprt->tcp_flags &= ~XPRT_COPY_DATA;
                        xprt->tcp_flags |= XPRT_COPY_XID;
                        xprt->tcp_copied = 0;
                }
        }
}

/*
 * TCP read xid
 */
static inline void
tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
{
        size_t len, used;
        char *p;

        len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
        dprintk("RPC:      reading XID (%Zu bytes)\n", len);
        p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
        used = tcp_copy_data(desc, p, len);
        xprt->tcp_offset += used;
        if (used != len)
                return;
        xprt->tcp_flags &= ~XPRT_COPY_XID;
        xprt->tcp_flags |= XPRT_COPY_DATA;
        xprt->tcp_copied = 4;
        dprintk("RPC:      reading reply for XID %08x\n", xprt->tcp_xid);
        tcp_check_recm(xprt);
}

/*
 * TCP read and complete request
 */
static inline void
tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
{
        struct rpc_rqst *req;
        struct xdr_buf *rcvbuf;
        size_t len;

        /* Find and lock the request corresponding to this xid */
        spin_lock(&xprt->sock_lock);
        req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
        if (!req) {
                xprt->tcp_flags &= ~XPRT_COPY_DATA;
                dprintk("RPC:      XID %08x request not found!\n",
                                xprt->tcp_xid);
                spin_unlock(&xprt->sock_lock);
                return;
        }

        rcvbuf = &req->rq_rcv_buf;
        len = desc->count;
        if (len > xprt->tcp_reclen - xprt->tcp_offset) {
                skb_reader_t my_desc;

                len = xprt->tcp_reclen - xprt->tcp_offset;
                memcpy(&my_desc, desc, sizeof(my_desc));
                my_desc.count = len;
                xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
                                          &my_desc, tcp_copy_data);
                desc->count -= len;
                desc->offset += len;
        } else
                xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
                                          desc, tcp_copy_data);
        xprt->tcp_copied += len;
        xprt->tcp_offset += len;

        if (xprt->tcp_copied == req->rq_rlen)
                xprt->tcp_flags &= ~XPRT_COPY_DATA;
        else if (xprt->tcp_offset == xprt->tcp_reclen) {
                if (xprt->tcp_flags & XPRT_LAST_FRAG)
                        xprt->tcp_flags &= ~XPRT_COPY_DATA;
        }

        if (!(xprt->tcp_flags & XPRT_COPY_DATA)) {
                dprintk("RPC: %4d received reply complete\n",
                                req->rq_task->tk_pid);
                xprt_complete_rqst(xprt, req, xprt->tcp_copied);
        }
        spin_unlock(&xprt->sock_lock);
        tcp_check_recm(xprt);
}

/*
 * TCP discard extra bytes from a short read
 */
static inline void
tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
{
        size_t len;

        len = xprt->tcp_reclen - xprt->tcp_offset;
        if (len > desc->count)
                len = desc->count;
        desc->count -= len;
        desc->offset += len;
        xprt->tcp_offset += len;
        tcp_check_recm(xprt);
}

/*
 * TCP record receive routine
 * We first have to grab the record marker, then the XID, then the data.
 */
static int
tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
                unsigned int offset, size_t len)
{
        struct rpc_xprt *xprt = (struct rpc_xprt *)rd_desc->buf;
        skb_reader_t desc = { skb, offset, len };

        dprintk("RPC:      tcp_data_recv\n");
        do {
                /* Read in a new fragment marker if necessary */
                /* Can we ever really expect to get completely empty fragments? */
                if (xprt->tcp_flags & XPRT_COPY_RECM) {
                        tcp_read_fraghdr(xprt, &desc);
                        continue;
                }
                /* Read in the xid if necessary */
                if (xprt->tcp_flags & XPRT_COPY_XID) {
                        tcp_read_xid(xprt, &desc);
                        continue;
                }
                /* Read in the request data */
                if (xprt->tcp_flags & XPRT_COPY_DATA) {
                        tcp_read_request(xprt, &desc);
                        continue;
                }
                /* Skip over any trailing bytes on short reads */
                tcp_read_discard(xprt, &desc);
        } while (desc.count && xprt_connected(xprt));
        dprintk("RPC:      tcp_data_recv done\n");
        return len - desc.count;
}
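
/*
 * The loop above amounts to a small state machine driven by
 * xprt->tcp_flags (sketch):
 *
 *      COPY_RECM --> COPY_XID --> COPY_DATA --> discard trailing bytes
 *          ^                                             |
 *          +---------------------------------------------+
 *
 * tcp_check_recm() closes the cycle whenever tcp_offset catches up
 * with tcp_reclen, re-arming COPY_RECM (and COPY_XID at the end of the
 * last fragment of a record).
 */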

static void tcp_data_ready(struct sock *sk, int bytes)
{
        struct rpc_xprt *xprt;
        read_descriptor_t rd_desc;

        dprintk("RPC:      tcp_data_ready...\n");
        if (!(xprt = xprt_from_sock(sk))) {
                printk("RPC:      tcp_data_ready socket info not found!\n");
                return;
        }
        if (xprt->shutdown)
                return;

        /* We use rd_desc to pass struct xprt to tcp_data_recv */
        rd_desc.buf = (char *)xprt;
        rd_desc.count = 65536;
        tcp_read_sock(sk, &rd_desc, tcp_data_recv);
}

static void
tcp_state_change(struct sock *sk)
{
        struct rpc_xprt *xprt;

        if (!(xprt = xprt_from_sock(sk)))
                goto out;
        dprintk("RPC:      tcp_state_change client %p...\n", xprt);
        dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
                                sk->state, xprt_connected(xprt),
                                sk->dead, sk->zapped);

        switch (sk->state) {
        case TCP_ESTABLISHED:
                if (xprt_test_and_set_connected(xprt))
                        break;

                /* Reset TCP record info */
                xprt->tcp_offset = 0;
                xprt->tcp_reclen = 0;
                xprt->tcp_copied = 0;
                xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;

                spin_lock(&xprt->sock_lock);
                if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
                        rpc_wake_up_task(xprt->snd_task);
                spin_unlock(&xprt->sock_lock);
                break;
        case TCP_SYN_SENT:
        case TCP_SYN_RECV:
                break;
        default:
                xprt_disconnect(xprt);
                break;
        }
 out:
        if (sk->sleep && waitqueue_active(sk->sleep))
                wake_up_interruptible_all(sk->sleep);
}

/*
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing sock_sendmsg
 * with a bunch of small requests.
 */
static void
xprt_write_space(struct sock *sk)
{
        struct rpc_xprt *xprt;
        struct socket   *sock;

        if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->socket))
                return;
        if (xprt->shutdown)
                return;

        /* Wait until we have enough socket memory */
        if (xprt->stream) {
                /* from net/ipv4/tcp.c:tcp_write_space */
                if (tcp_wspace(sk) < tcp_min_write_space(sk))
                        return;
        } else {
                /* from net/core/sock.c:sock_def_write_space */
                if (!sock_writeable(sk))
                        return;
        }

        if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags))
                return;

        spin_lock_bh(&xprt->sock_lock);
        if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
                rpc_wake_up_task(xprt->snd_task);
        spin_unlock_bh(&xprt->sock_lock);
        if (sk->sleep && waitqueue_active(sk->sleep))
                wake_up_interruptible(sk->sleep);
}

/*
 * Exponential backoff for UDP retries
 */
static inline int
xprt_expbackoff(struct rpc_task *task, struct rpc_rqst *req)
{
        int backoff;

        req->rq_ntimeo++;
        backoff = min(rpc_ntimeo(&task->tk_client->cl_rtt), XPRT_MAX_BACKOFF);
        if (req->rq_ntimeo < (1 << backoff))
                return 1;
        return 0;
}
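
/*
 * Example: if rpc_ntimeo() reports 3 earlier major timeouts, the first
 * 7 minor timeouts merely re-arm the timer and only the 8th (1 << 3)
 * lets xprt_timer() deliver -ETIMEDOUT; the exponent is capped at
 * XPRT_MAX_BACKOFF, i.e. at most 1 << 8 = 256 timer expiries.
 */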

/*
 * RPC receive timeout handler.
 */
static void
xprt_timer(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        spin_lock(&xprt->sock_lock);
        if (req->rq_received)
                goto out;

        if (!xprt->nocong) {
                if (xprt_expbackoff(task, req)) {
                        rpc_add_timer(task, xprt_timer);
                        goto out_unlock;
                }
                rpc_inc_timeo(&task->tk_client->cl_rtt);
                xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT);
        }
        req->rq_nresend++;

        dprintk("RPC: %4d xprt_timer (%s request)\n",
                task->tk_pid, req ? "pending" : "backlogged");

        task->tk_status  = -ETIMEDOUT;
out:
        task->tk_timeout = 0;
        rpc_wake_up_task(task);
out_unlock:
        spin_unlock(&xprt->sock_lock);
}

/*
 * Place the actual RPC call.
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void
xprt_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid,
                                *(u32 *)(req->rq_svec[0].iov_base));

        if (xprt->shutdown)
                task->tk_status = -EIO;

        if (!xprt_connected(xprt))
                task->tk_status = -ENOTCONN;

        if (task->tk_status < 0)
                return;

        if (task->tk_rpcwait)
                rpc_remove_wait_queue(task);

        /* set up everything as needed. */
        /* Write the record marker */
        if (xprt->stream) {
                u32     *marker = req->rq_svec[0].iov_base;

                *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
        }

        spin_lock_bh(&xprt->sock_lock);
        if (!__xprt_lock_write(xprt, task)) {
                spin_unlock_bh(&xprt->sock_lock);
                return;
        }
        if (list_empty(&req->rq_list)) {
                list_add_tail(&req->rq_list, &xprt->recv);
                req->rq_received = 0;
        }
        spin_unlock_bh(&xprt->sock_lock);

        do_xprt_transmit(task);
}

static void
do_xprt_transmit(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int status, retry = 0;


        /* Continue transmitting the packet/record. We must be careful
         * to cope with writespace callbacks arriving _after_ we have
         * called xprt_sendmsg().
         */
        while (1) {
                req->rq_xtime = jiffies;
                status = xprt_sendmsg(xprt, req);

                if (status < 0)
                        break;

                if (xprt->stream) {
                        req->rq_bytes_sent += status;

                        if (req->rq_bytes_sent >= req->rq_slen)
                                goto out_receive;
                } else {
                        if (status >= req->rq_slen)
                                goto out_receive;
                        status = -EAGAIN;
                        break;
                }

                dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
                                task->tk_pid, req->rq_slen - req->rq_bytes_sent,
                                req->rq_slen);

                status = -EAGAIN;
                if (retry++ > 50)
                        break;
        }

        /* Note: at this point, task->tk_sleeping has not yet been set,
         *       hence there is no danger of the waking up task being put on
         *       schedq, and being picked up by a parallel run of rpciod().
         */
        if (req->rq_received)
                goto out_release;

        task->tk_status = status;

        switch (status) {
        case -EAGAIN:
                if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
                        /* Protect against races with xprt_write_space */
                        spin_lock_bh(&xprt->sock_lock);
                        if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) {
                                task->tk_timeout = req->rq_timeout.to_current;
                                rpc_sleep_on(&xprt->pending, task, NULL, NULL);
                        }
                        spin_unlock_bh(&xprt->sock_lock);
                        return;
                }
                /* Keep holding the socket if it is blocked */
                rpc_delay(task, HZ>>4);
                return;
        case -ECONNREFUSED:
        case -ENOTCONN:
                if (!xprt->stream)
                        return;
        default:
                if (xprt->stream)
                        xprt_disconnect(xprt);
                req->rq_bytes_sent = 0;
        }
 out_release:
        xprt_release_write(xprt, task);
        return;
 out_receive:
        dprintk("RPC: %4d xmit complete\n", task->tk_pid);
        /* Set the task's receive timeout value */
        if (!xprt->nocong) {
                task->tk_timeout = rpc_calc_rto(&clnt->cl_rtt,
                                rpcproc_timer(clnt, task->tk_msg.rpc_proc));
                req->rq_ntimeo = 0;
                if (task->tk_timeout > req->rq_timeout.to_maxval)
                        task->tk_timeout = req->rq_timeout.to_maxval;
        } else
                task->tk_timeout = req->rq_timeout.to_current;
        spin_lock_bh(&xprt->sock_lock);
        if (!req->rq_received)
                rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
        __xprt_release_write(xprt, task);
        spin_unlock_bh(&xprt->sock_lock);
}

/*
 * Reserve an RPC call slot.
 */
int
xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        /* We already have an initialized request. */
        if (task->tk_rqstp)
                return 0;

        spin_lock(&xprt->xprt_lock);
        xprt_reserve_status(task);
        if (task->tk_rqstp) {
                task->tk_timeout = 0;
        } else if (!task->tk_timeout) {
                task->tk_status = -ENOBUFS;
        } else {
                dprintk("RPC:      xprt_reserve waiting on backlog\n");
                task->tk_status = -EAGAIN;
                rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
        }
        spin_unlock(&xprt->xprt_lock);
        dprintk("RPC: %4d xprt_reserve returns %d\n",
                                task->tk_pid, task->tk_status);
        return task->tk_status;
}

/*
 * Reservation callback
 */
static void
xprt_reserve_status(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req;

        if (xprt->shutdown) {
                task->tk_status = -EIO;
        } else if (task->tk_status < 0) {
                /* NOP */
        } else if (task->tk_rqstp) {
                /* We've already been given a request slot: NOP */
        } else {
                if (!(req = xprt->free))
                        goto out_nofree;
                /* OK: There's room for us. Grab a free slot */
                xprt->free     = req->rq_next;
                req->rq_next   = NULL;
                task->tk_rqstp = req;
                xprt_request_init(task, xprt);
        }

        return;

out_nofree:
        task->tk_status = -EAGAIN;
}

/*
 * Initialize RPC request
 */
static void
xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
        struct rpc_rqst *req = task->tk_rqstp;
        static u32      xid = 0;

        if (!xid)
                xid = CURRENT_TIME << 12;

        dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, req, xid);
        task->tk_status = 0;
        req->rq_timeout = xprt->timeout;
        req->rq_task    = task;
        req->rq_xprt    = xprt;
        req->rq_xid     = xid++;
        if (!xid)
                xid++;
        INIT_LIST_HEAD(&req->rq_list);
}

/*
 * Release an RPC call slot
 */
void
xprt_release(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req;

        if (!(req = task->tk_rqstp))
                return;
        spin_lock_bh(&xprt->sock_lock);
        __xprt_release_write(xprt, task);
        __xprt_put_cong(xprt, req);
        if (!list_empty(&req->rq_list))
                list_del(&req->rq_list);
        spin_unlock_bh(&xprt->sock_lock);
        task->tk_rqstp = NULL;
        memset(req, 0, sizeof(*req));   /* mark unused */

        dprintk("RPC: %4d release request %p\n", task->tk_pid, req);

        spin_lock(&xprt->xprt_lock);
        req->rq_next = xprt->free;
        xprt->free   = req;

        xprt_clear_backlog(xprt);
        spin_unlock(&xprt->xprt_lock);
}

/*
 * Set default timeout parameters
 */
void
xprt_default_timeout(struct rpc_timeout *to, int proto)
{
        if (proto == IPPROTO_UDP)
                xprt_set_timeout(to, 5,  5 * HZ);
        else
                xprt_set_timeout(to, 5, 60 * HZ);
}

/*
 * Set constant timeout
 */
void
xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
{
        to->to_current   =
        to->to_initval   =
        to->to_increment = incr;
        to->to_maxval    = incr * retr;
        to->to_resrvval  = incr * retr;
        to->to_retries   = retr;
        to->to_exponential = 0;
}
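
/*
 * Example: xprt_set_timeout(to, 5, 5 * HZ) yields
 *
 *      to_current = to_initval = to_increment = 5 * HZ
 *      to_maxval  = to_resrvval = 25 * HZ
 *      to_retries = 5, linear backoff (to_exponential = 0)
 *
 * which are exactly the UDP defaults chosen by xprt_default_timeout().
 */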

/*
 * Initialize an RPC client
 */
static struct rpc_xprt *
xprt_setup(struct socket *sock, int proto,
                        struct sockaddr_in *ap, struct rpc_timeout *to)
{
        struct rpc_xprt *xprt;
        struct rpc_rqst *req;
        int             i;

        dprintk("RPC:      setting up %s transport...\n",
                                proto == IPPROTO_UDP? "UDP" : "TCP");

        if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
                return NULL;
        memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */

        xprt->addr = *ap;
        xprt->prot = proto;
        xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
        if (xprt->stream) {
                xprt->cwnd = RPC_MAXCWND;
                xprt->nocong = 1;
        } else
                xprt->cwnd = RPC_INITCWND;
        spin_lock_init(&xprt->sock_lock);
        spin_lock_init(&xprt->xprt_lock);
        init_waitqueue_head(&xprt->cong_wait);

        INIT_LIST_HEAD(&xprt->recv);

        /* Set timeout parameters */
        if (to) {
                xprt->timeout = *to;
                xprt->timeout.to_current = to->to_initval;
                xprt->timeout.to_resrvval = to->to_maxval << 1;
        } else
                xprt_default_timeout(&xprt->timeout, xprt->prot);

        INIT_RPC_WAITQ(&xprt->pending, "xprt_pending");
        INIT_RPC_WAITQ(&xprt->sending, "xprt_sending");
        INIT_RPC_WAITQ(&xprt->resend, "xprt_resend");
        INIT_RPC_WAITQ(&xprt->backlog, "xprt_backlog");

        /* initialize free list */
        for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
                req->rq_next = req + 1;
        req->rq_next = NULL;
        xprt->free = xprt->slot;

        dprintk("RPC:      created transport %p\n", xprt);

        xprt_bind_socket(xprt, sock);
        return xprt;
}

/*
 * Bind to a reserved port
 */
static inline int
xprt_bindresvport(struct socket *sock)
{
        struct sockaddr_in myaddr;
        int             err, port;

        memset(&myaddr, 0, sizeof(myaddr));
        myaddr.sin_family = AF_INET;
        port = 800;
        do {
                myaddr.sin_port = htons(port);
                err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
                                                sizeof(myaddr));
        } while (err == -EADDRINUSE && --port > 0);

        if (err < 0)
                printk("RPC: Can't bind to reserved port (%d).\n", -err);

        return err;
}
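
/*
 * Note: ports below 1024 are reserved to privileged processes, which
 * is why xprt_create_socket() below only attempts this bind when the
 * caller has CAP_NET_BIND_SERVICE; the loop simply walks down from
 * port 800 until bind() stops failing with -EADDRINUSE.
 */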

static int
xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
{
        struct sock     *sk = sock->sk;

        if (xprt->inet)
                return -EBUSY;

        sk->user_data = xprt;
        xprt->old_data_ready = sk->data_ready;
        xprt->old_state_change = sk->state_change;
        xprt->old_write_space = sk->write_space;
        if (xprt->prot == IPPROTO_UDP) {
                sk->data_ready = udp_data_ready;
                sk->no_check = UDP_CSUM_NORCV;
                xprt_set_connected(xprt);
        } else {
                struct tcp_opt *tp = &(sk->tp_pinfo.af_tcp);
                tp->nonagle = 1;        /* disable Nagle's algorithm */
                sk->data_ready = tcp_data_ready;
                sk->state_change = tcp_state_change;
                xprt_clear_connected(xprt);
        }
        sk->write_space = xprt_write_space;

        /* Reset to new socket */
        xprt->sock = sock;
        xprt->inet = sk;
        /*
         *      TCP requires the rpc I/O daemon is present
         */
        if (xprt->stream)
                rpciod_up();

        return 0;
}

/*
 * Set socket buffer length
 */
void
xprt_sock_setbufsize(struct rpc_xprt *xprt)
{
        struct sock *sk = xprt->inet;

        if (xprt->stream)
                return;
        if (xprt->rcvsize) {
                sk->userlocks |= SOCK_RCVBUF_LOCK;
                sk->rcvbuf = xprt->rcvsize * RPC_MAXCONG * 2;
        }
        if (xprt->sndsize) {
                sk->userlocks |= SOCK_SNDBUF_LOCK;
                sk->sndbuf = xprt->sndsize * RPC_MAXCONG * 2;
                sk->write_space(sk);
        }
}

/*
 * Create a client socket given the protocol and peer address.
 */
static struct socket *
xprt_create_socket(int proto, struct rpc_timeout *to)
{
        struct socket   *sock;
        int             type, err;

        dprintk("RPC:      xprt_create_socket(%s %d)\n",
                           (proto == IPPROTO_UDP)? "udp" : "tcp", proto);

        type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;

        if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {
                printk("RPC: can't create socket (%d).\n", -err);
                goto failed;
        }

        /* If the caller has the capability, bind to a reserved port */
        if (capable(CAP_NET_BIND_SERVICE) && xprt_bindresvport(sock) < 0)
                goto failed;

        return sock;

failed:
        sock_release(sock);
        return NULL;
}

/*
 * Create an RPC client transport given the protocol and peer address.
 */
struct rpc_xprt *
xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
        struct socket   *sock;
        struct rpc_xprt *xprt;

        dprintk("RPC:      xprt_create_proto called\n");

        if (!(sock = xprt_create_socket(proto, to)))
                return NULL;

        if (!(xprt = xprt_setup(sock, proto, sap, to)))
                sock_release(sock);

        return xprt;
}

/*
 * Prepare for transport shutdown.
 */
void
xprt_shutdown(struct rpc_xprt *xprt)
{
        xprt->shutdown = 1;
        rpc_wake_up(&xprt->sending);
        rpc_wake_up(&xprt->resend);
        rpc_wake_up(&xprt->pending);
        rpc_wake_up(&xprt->backlog);
        if (waitqueue_active(&xprt->cong_wait))
                wake_up(&xprt->cong_wait);
}

/*
 * Clear the xprt backlog queue
 */
int
xprt_clear_backlog(struct rpc_xprt *xprt)
{
        rpc_wake_up_next(&xprt->backlog);
        if (waitqueue_active(&xprt->cong_wait))
                wake_up(&xprt->cong_wait);
        return 1;
}

/*
 * Destroy an RPC transport, killing off all requests.
 */
int
xprt_destroy(struct rpc_xprt *xprt)
{
        dprintk("RPC:      destroying transport %p\n", xprt);
        xprt_shutdown(xprt);
        xprt_close(xprt);
        kfree(xprt);

        return 0;
}