[PATCH] RPC: Reduce stack utilization in xs_sendpages
/*
 * linux/net/sunrpc/xprtsock.c
 *
 * Client-side transport implementation for sockets.
 *
 * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 * TCP NFS related read + write fixes
 *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 * Rewrite of large parts of the code in order to stabilize TCP stuff.
 * Fix behaviour when socket buffer is full.
 *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/capability.h>
#include <linux/sched.h>
#include <linux/pagemap.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/sunrpc/clnt.h>
#include <linux/file.h>

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>

/*
 * Maximum port number to use when requesting a reserved port.
 */
#define XS_MAX_RESVPORT         (800U)

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY        RPCDBG_TRANS
#endif

#ifdef RPC_DEBUG_DATA
static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
        u8 *buf = (u8 *) packet;
        int j;

        dprintk("RPC:      %s\n", msg);
        for (j = 0; j < count && j < 128; j += 4) {
                if (!(j & 31)) {
                        if (j)
                                dprintk("\n");
                        dprintk("0x%04x ", j);
                }
                dprintk("%02x%02x%02x%02x ",
                        buf[j], buf[j+1], buf[j+2], buf[j+3]);
        }
        dprintk("\n");
}
#else
static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
        /* NOP */
}
#endif

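/*
 * All socket sends here are non-blocking (MSG_DONTWAIT), so the RPC
 * state machine, not the socket layer, decides when to wait for
 * buffer space; MSG_NOSIGNAL suppresses SIGPIPE on a dead stream
 * connection, since errors are reported via return values instead.
 */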
#define XS_SENDMSG_FLAGS        (MSG_DONTWAIT | MSG_NOSIGNAL)

static inline int xs_send_head(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, unsigned int len)
{
        struct kvec iov = {
                .iov_base       = xdr->head[0].iov_base + base,
                .iov_len        = len - base,
        };
        struct msghdr msg = {
                .msg_name       = addr,
                .msg_namelen    = addrlen,
                .msg_flags      = XS_SENDMSG_FLAGS,
        };

        if (xdr->len > len)
                msg.msg_flags |= MSG_MORE;

        if (likely(iov.iov_len))
                return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
        return kernel_sendmsg(sock, &msg, NULL, 0, 0);
}

static int xs_send_tail(struct socket *sock, struct xdr_buf *xdr, unsigned int base, unsigned int len)
{
        struct kvec iov = {
                .iov_base       = xdr->tail[0].iov_base + base,
                .iov_len        = len - base,
        };
        struct msghdr msg = {
                .msg_flags      = XS_SENDMSG_FLAGS,
        };

        return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
}

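/*
 * To keep stack utilization low, xs_sendpages avoids assembling one
 * large iovec covering the whole request.  Instead it sends the
 * xdr_buf in three pieces -- a kvec for the head, ->sendpage() calls
 * (or sock_no_sendpage for highmem pages) for the page list, and a
 * kvec for the tail -- using `base' to track how much of the request
 * has already gone out on a previous partial send.
 */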
/**
 * xs_sendpages - write pages directly to a socket
 * @sock: socket to send on
 * @addr: UDP only -- address of destination
 * @addrlen: UDP only -- length of destination address
 * @xdr: buffer containing this request
 * @base: starting position in the buffer
 *
 */
static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
{
        struct page **ppage = xdr->pages;
        unsigned int len, pglen = xdr->page_len;
        int err, ret = 0;
        ssize_t (*sendpage)(struct socket *, struct page *, int, size_t, int);

        len = xdr->head[0].iov_len;
        if (base < len || (addr != NULL && base == 0)) {
                err = xs_send_head(sock, addr, addrlen, xdr, base, len);
                if (ret == 0)
                        ret = err;
                else if (err > 0)
                        ret += err;
                if (err != (len - base))
                        goto out;
                base = 0;
        } else
                base -= len;

        if (unlikely(pglen == 0))
                goto copy_tail;
        if (unlikely(base >= pglen)) {
                base -= pglen;
                goto copy_tail;
        }
        if (base || xdr->page_base) {
                pglen -= base;
                base += xdr->page_base;
                ppage += base >> PAGE_CACHE_SHIFT;
                base &= ~PAGE_CACHE_MASK;
        }

        sendpage = sock->ops->sendpage ? : sock_no_sendpage;
        do {
                int flags = XS_SENDMSG_FLAGS;

                len = PAGE_CACHE_SIZE;
                if (base)
                        len -= base;
                if (pglen < len)
                        len = pglen;

                if (pglen != len || xdr->tail[0].iov_len != 0)
                        flags |= MSG_MORE;

                /* Hmm... We might be dealing with highmem pages */
                if (PageHighMem(*ppage))
                        sendpage = sock_no_sendpage;
                err = sendpage(sock, *ppage, base, len, flags);
                if (ret == 0)
                        ret = err;
                else if (err > 0)
                        ret += err;
                if (err != len)
                        goto out;
                base = 0;
                ppage++;
        } while ((pglen -= len) != 0);
copy_tail:
        len = xdr->tail[0].iov_len;
        if (base < len) {
                err = xs_send_tail(sock, xdr, base, len);
                if (ret == 0)
                        ret = err;
                else if (err > 0)
                        ret += err;
        }
out:
        return ret;
}

/**
 * xs_sendmsg - write an RPC request to a socket
 * @xprt: generic transport
 * @req: the RPC request to write
 *
 */
static int xs_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        struct socket *sock = xprt->sock;
        struct xdr_buf *xdr = &req->rq_snd_buf;
        struct sockaddr *addr = NULL;
        int addrlen = 0;
        unsigned int skip;
        int result;

        if (!sock)
                return -ENOTCONN;

        xs_pktdump("packet data:",
                                req->rq_svec->iov_base,
                                req->rq_svec->iov_len);

        /* For UDP, we need to provide an address */
        if (!xprt->stream) {
                addr = (struct sockaddr *) &xprt->addr;
                addrlen = sizeof(xprt->addr);
        }
        /* Don't repeat bytes */
        skip = req->rq_bytes_sent;

        clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
        result = xs_sendpages(sock, addr, addrlen, xdr, skip);

        dprintk("RPC:      xs_sendmsg(%d) = %d\n", xdr->len - skip, result);

        if (result >= 0)
                return result;

        switch (result) {
        case -ECONNREFUSED:
                /* When the server has died, an ICMP port unreachable message
                 * prompts ECONNREFUSED. */
        case -EAGAIN:
                break;
        case -ECONNRESET:
        case -ENOTCONN:
        case -EPIPE:
                /* connection broken */
                if (xprt->stream)
                        result = -ENOTCONN;
                break;
        default:
                break;
        }
        return result;
}

/**
 * xs_send_request - write an RPC request to a socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *      0:  The request has been sent
 * EAGAIN:  The socket was blocked, please call again later to
 *          complete the request
 *  other:  Some other error occurred, the request was not sent
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *      if the socket is not able to make progress?
 */
static int xs_send_request(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int status, retry = 0;

        /* set up everything as needed. */
        /* Write the record marker */
        if (xprt->stream) {
                u32 *marker = req->rq_svec[0].iov_base;

                *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
        }
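        /*
         * RPC over TCP uses record marking (RFC 1831): the top bit of
         * the 4-byte marker flags the last fragment, and the low 31
         * bits give the fragment length.  rq_slen includes the marker
         * itself, so e.g. a 104-byte send buffer yields a marker of
         * htonl(0x80000000 | 100), i.e. "last fragment, 100 bytes of
         * RPC data follow".
         */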

        /* Continue transmitting the packet/record. We must be careful
         * to cope with writespace callbacks arriving _after_ we have
         * called sendmsg().
         */
        while (1) {
                req->rq_xtime = jiffies;
                status = xs_sendmsg(xprt, req);

                if (status < 0)
                        break;

                if (xprt->stream) {
                        req->rq_bytes_sent += status;

                        /* If we've sent the entire packet, immediately
                         * reset the count of bytes sent. */
                        if (req->rq_bytes_sent >= req->rq_slen) {
                                req->rq_bytes_sent = 0;
                                return 0;
                        }
                } else {
                        if (status >= req->rq_slen)
                                return 0;
                        status = -EAGAIN;
                        break;
                }

                dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
                                task->tk_pid, req->rq_slen - req->rq_bytes_sent,
                                req->rq_slen);

                status = -EAGAIN;
                if (retry++ > 50)
                        break;
        }

        if (status == -EAGAIN) {
                if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
                        /* Protect against races with xs_write_space */
                        spin_lock_bh(&xprt->sock_lock);
                        /* Don't race with disconnect */
                        if (!xprt_connected(xprt))
                                task->tk_status = -ENOTCONN;
                        else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) {
                                task->tk_timeout = req->rq_timeout;
                                rpc_sleep_on(&xprt->pending, task, NULL, NULL);
                        }
                        spin_unlock_bh(&xprt->sock_lock);
                        return status;
                }
                /* Keep holding the socket if it is blocked */
                rpc_delay(task, HZ>>4);
        }
        return status;
}

/**
 * xs_close - close a socket
 * @xprt: transport
 *
 */
static void xs_close(struct rpc_xprt *xprt)
{
        struct socket *sock = xprt->sock;
        struct sock *sk = xprt->inet;

        if (!sk)
                return;

        dprintk("RPC:      xs_close xprt %p\n", xprt);

        write_lock_bh(&sk->sk_callback_lock);
        xprt->inet = NULL;
        xprt->sock = NULL;

        sk->sk_user_data = NULL;
        sk->sk_data_ready = xprt->old_data_ready;
        sk->sk_state_change = xprt->old_state_change;
        sk->sk_write_space = xprt->old_write_space;
        write_unlock_bh(&sk->sk_callback_lock);

        sk->sk_no_check = 0;

        sock_release(sock);
}

/**
 * xs_destroy - prepare to shutdown a transport
 * @xprt: doomed transport
 *
 */
static void xs_destroy(struct rpc_xprt *xprt)
{
        dprintk("RPC:      xs_destroy xprt %p\n", xprt);

        cancel_delayed_work(&xprt->sock_connect);
        flush_scheduled_work();

        xprt_disconnect(xprt);
        xs_close(xprt);
        kfree(xprt->slot);
}

static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
{
        return (struct rpc_xprt *) sk->sk_user_data;
}

/**
 * xs_udp_data_ready - "data ready" callback for UDP sockets
 * @sk: socket with data to read
 * @len: how much data to read
 *
 */
static void xs_udp_data_ready(struct sock *sk, int len)
{
        struct rpc_task *task;
        struct rpc_xprt *xprt;
        struct rpc_rqst *rovr;
        struct sk_buff *skb;
        int err, repsize, copied;
        u32 _xid, *xp;

        read_lock(&sk->sk_callback_lock);
        dprintk("RPC:      xs_udp_data_ready...\n");
        if (!(xprt = xprt_from_sock(sk)))
                goto out;

        if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
                goto out;

        if (xprt->shutdown)
                goto dropit;

        repsize = skb->len - sizeof(struct udphdr);
        if (repsize < 4) {
                dprintk("RPC:      impossible RPC reply size %d!\n", repsize);
                goto dropit;
        }

        /* Copy the XID from the skb... */
        xp = skb_header_pointer(skb, sizeof(struct udphdr),
                                sizeof(_xid), &_xid);
        if (xp == NULL)
                goto dropit;

        /* Look up and lock the request corresponding to the given XID */
        spin_lock(&xprt->sock_lock);
        rovr = xprt_lookup_rqst(xprt, *xp);
        if (!rovr)
                goto out_unlock;
        task = rovr->rq_task;

        dprintk("RPC: %4d received reply\n", task->tk_pid);

        if ((copied = rovr->rq_private_buf.buflen) > repsize)
                copied = repsize;

        /* Suck it into the iovec, verify checksum if not done by hw. */
        if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb))
                goto out_unlock;

        /* Something worked... */
        dst_confirm(skb->dst);

        xprt_complete_rqst(xprt, rovr, copied);

 out_unlock:
        spin_unlock(&xprt->sock_lock);
 dropit:
        skb_free_datagram(sk, skb);
 out:
        read_unlock(&sk->sk_callback_lock);
}

static inline size_t xs_tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
{
        if (len > desc->count)
                len = desc->count;
        if (skb_copy_bits(desc->skb, desc->offset, p, len)) {
                dprintk("RPC:      failed to copy %zu bytes from skb. %zu bytes remain\n",
                                len, desc->count);
                return 0;
        }
        desc->offset += len;
        desc->count -= len;
        dprintk("RPC:      copied %zu bytes from skb. %zu bytes remain\n",
                        len, desc->count);
        return len;
}

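/*
 * The 4-byte record marker may itself arrive split across TCP
 * segments, so it is accumulated piecemeal into xprt->tcp_recm, with
 * tcp_offset tracking how much of it has been received so far.
 */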
static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
{
        size_t len, used;
        char *p;

        p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
        len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
        used = xs_tcp_copy_data(desc, p, len);
        xprt->tcp_offset += used;
        if (used != len)
                return;
        xprt->tcp_reclen = ntohl(xprt->tcp_recm);
        if (xprt->tcp_reclen & 0x80000000)
                xprt->tcp_flags |= XPRT_LAST_FRAG;
        else
                xprt->tcp_flags &= ~XPRT_LAST_FRAG;
        xprt->tcp_reclen &= 0x7fffffff;
        xprt->tcp_flags &= ~XPRT_COPY_RECM;
        xprt->tcp_offset = 0;
        /* Sanity check of the record length */
        if (xprt->tcp_reclen < 4) {
                dprintk("RPC:      invalid TCP record fragment length\n");
                xprt_disconnect(xprt);
                return;
        }
        dprintk("RPC:      reading TCP record fragment of length %d\n",
                        xprt->tcp_reclen);
}

static void xs_tcp_check_recm(struct rpc_xprt *xprt)
{
        dprintk("RPC:      xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n",
                        xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_flags);
        if (xprt->tcp_offset == xprt->tcp_reclen) {
                xprt->tcp_flags |= XPRT_COPY_RECM;
                xprt->tcp_offset = 0;
                if (xprt->tcp_flags & XPRT_LAST_FRAG) {
                        xprt->tcp_flags &= ~XPRT_COPY_DATA;
                        xprt->tcp_flags |= XPRT_COPY_XID;
                        xprt->tcp_copied = 0;
                }
        }
}

static inline void xs_tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
{
        size_t len, used;
        char *p;

        len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
        dprintk("RPC:      reading XID (%Zu bytes)\n", len);
        p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
        used = xs_tcp_copy_data(desc, p, len);
        xprt->tcp_offset += used;
        if (used != len)
                return;
        xprt->tcp_flags &= ~XPRT_COPY_XID;
        xprt->tcp_flags |= XPRT_COPY_DATA;
        xprt->tcp_copied = 4;
        dprintk("RPC:      reading reply for XID %08x\n",
                                                ntohl(xprt->tcp_xid));
        xs_tcp_check_recm(xprt);
}

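/*
 * Reply data is copied straight into the waiting request's
 * rq_private_buf while xprt->sock_lock is held.  If the copy falls
 * short (for example, no receive pages could be allocated),
 * XPRT_COPY_DATA is cleared so the rest of the record is silently
 * discarded and the request is left to time out.
 */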
static inline void xs_tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
{
        struct rpc_rqst *req;
        struct xdr_buf *rcvbuf;
        size_t len;
        ssize_t r;

        /* Find and lock the request corresponding to this xid */
        spin_lock(&xprt->sock_lock);
        req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
        if (!req) {
                xprt->tcp_flags &= ~XPRT_COPY_DATA;
                dprintk("RPC:      XID %08x request not found!\n",
                                ntohl(xprt->tcp_xid));
                spin_unlock(&xprt->sock_lock);
                return;
        }

        rcvbuf = &req->rq_private_buf;
        len = desc->count;
        if (len > xprt->tcp_reclen - xprt->tcp_offset) {
                skb_reader_t my_desc;

                len = xprt->tcp_reclen - xprt->tcp_offset;
                memcpy(&my_desc, desc, sizeof(my_desc));
                my_desc.count = len;
                r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
                                          &my_desc, xs_tcp_copy_data);
                desc->count -= r;
                desc->offset += r;
        } else
                r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
                                          desc, xs_tcp_copy_data);

        if (r > 0) {
                xprt->tcp_copied += r;
                xprt->tcp_offset += r;
        }
        if (r != len) {
                /* Error when copying to the receive buffer,
                 * usually because we weren't able to allocate
                 * additional buffer pages. All we can do now
                 * is turn off XPRT_COPY_DATA, so the request
                 * will not receive any additional updates,
                 * and time out.
                 * Any remaining data from this record will
                 * be discarded.
                 */
                xprt->tcp_flags &= ~XPRT_COPY_DATA;
                dprintk("RPC:      XID %08x truncated request\n",
                                ntohl(xprt->tcp_xid));
                dprintk("RPC:      xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
                                xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen);
                goto out;
        }

        dprintk("RPC:      XID %08x read %Zd bytes\n",
                        ntohl(xprt->tcp_xid), r);
        dprintk("RPC:      xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
                        xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen);

        if (xprt->tcp_copied == req->rq_private_buf.buflen)
                xprt->tcp_flags &= ~XPRT_COPY_DATA;
        else if (xprt->tcp_offset == xprt->tcp_reclen) {
                if (xprt->tcp_flags & XPRT_LAST_FRAG)
                        xprt->tcp_flags &= ~XPRT_COPY_DATA;
        }

out:
        if (!(xprt->tcp_flags & XPRT_COPY_DATA)) {
                dprintk("RPC: %4d received reply complete\n",
                                req->rq_task->tk_pid);
                xprt_complete_rqst(xprt, req, xprt->tcp_copied);
        }
        spin_unlock(&xprt->sock_lock);
        xs_tcp_check_recm(xprt);
}

static inline void xs_tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
{
        size_t len;

        len = xprt->tcp_reclen - xprt->tcp_offset;
        if (len > desc->count)
                len = desc->count;
        desc->count -= len;
        desc->offset += len;
        xprt->tcp_offset += len;
        dprintk("RPC:      discarded %Zu bytes\n", len);
        xs_tcp_check_recm(xprt);
}

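/*
 * The TCP receive path is a small state machine driven by the
 * XPRT_COPY_* flags: read the record marker (XPRT_COPY_RECM), then
 * the XID (XPRT_COPY_XID), then the reply body (XPRT_COPY_DATA), and
 * finally discard anything left over in the current record.
 * xs_tcp_data_recv loops through these states until the skb data
 * handed in by tcp_read_sock is exhausted.
 */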
static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
{
        struct rpc_xprt *xprt = rd_desc->arg.data;
        skb_reader_t desc = {
                .skb    = skb,
                .offset = offset,
                .count  = len,
                .csum   = 0
        };

        dprintk("RPC:      xs_tcp_data_recv started\n");
        do {
                /* Read in a new fragment marker if necessary */
                /* Can we ever really expect to get completely empty fragments? */
                if (xprt->tcp_flags & XPRT_COPY_RECM) {
                        xs_tcp_read_fraghdr(xprt, &desc);
                        continue;
                }
                /* Read in the xid if necessary */
                if (xprt->tcp_flags & XPRT_COPY_XID) {
                        xs_tcp_read_xid(xprt, &desc);
                        continue;
                }
                /* Read in the request data */
                if (xprt->tcp_flags & XPRT_COPY_DATA) {
                        xs_tcp_read_request(xprt, &desc);
                        continue;
                }
                /* Skip over any trailing bytes on short reads */
                xs_tcp_read_discard(xprt, &desc);
        } while (desc.count);
        dprintk("RPC:      xs_tcp_data_recv done\n");
        return len - desc.count;
}

/**
 * xs_tcp_data_ready - "data ready" callback for TCP sockets
 * @sk: socket with data to read
 * @bytes: how much data to read
 *
 */
static void xs_tcp_data_ready(struct sock *sk, int bytes)
{
        struct rpc_xprt *xprt;
        read_descriptor_t rd_desc;

        read_lock(&sk->sk_callback_lock);
        dprintk("RPC:      xs_tcp_data_ready...\n");
        if (!(xprt = xprt_from_sock(sk)))
                goto out;
        if (xprt->shutdown)
                goto out;

        /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
        rd_desc.arg.data = xprt;
        rd_desc.count = 65536;
        tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
out:
        read_unlock(&sk->sk_callback_lock);
}

/**
 * xs_tcp_state_change - callback to handle TCP socket state changes
 * @sk: socket whose state has changed
 *
 */
static void xs_tcp_state_change(struct sock *sk)
{
        struct rpc_xprt *xprt;

        read_lock(&sk->sk_callback_lock);
        if (!(xprt = xprt_from_sock(sk)))
                goto out;
        dprintk("RPC:      xs_tcp_state_change client %p...\n", xprt);
        dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
                                sk->sk_state, xprt_connected(xprt),
                                sock_flag(sk, SOCK_DEAD),
                                sock_flag(sk, SOCK_ZAPPED));

        switch (sk->sk_state) {
        case TCP_ESTABLISHED:
                spin_lock_bh(&xprt->sock_lock);
                if (!xprt_test_and_set_connected(xprt)) {
                        /* Reset TCP record info */
                        xprt->tcp_offset = 0;
                        xprt->tcp_reclen = 0;
                        xprt->tcp_copied = 0;
                        xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;
                        rpc_wake_up(&xprt->pending);
                }
                spin_unlock_bh(&xprt->sock_lock);
                break;
        case TCP_SYN_SENT:
        case TCP_SYN_RECV:
                break;
        default:
                xprt_disconnect(xprt);
                break;
        }
 out:
        read_unlock(&sk->sk_callback_lock);
}

/**
 * xs_write_space - callback invoked when socket buffer space becomes
 *                         available
 * @sk: socket whose state has changed
 *
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing sock_sendmsg
 * with a bunch of small requests.
 */
static void xs_write_space(struct sock *sk)
{
        struct rpc_xprt *xprt;
        struct socket *sock;

        read_lock(&sk->sk_callback_lock);
        if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->sk_socket))
                goto out;
        if (xprt->shutdown)
                goto out;

        /* Wait until we have enough socket memory */
        if (xprt->stream) {
                /* from net/core/stream.c:sk_stream_write_space */
                if (sk_stream_wspace(sk) < sk_stream_min_wspace(sk))
                        goto out;
        } else {
                /* from net/core/sock.c:sock_def_write_space */
                if (!sock_writeable(sk))
                        goto out;
        }

        if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags))
                goto out;

        spin_lock_bh(&xprt->sock_lock);
        if (xprt->snd_task)
                rpc_wake_up_task(xprt->snd_task);
        spin_unlock_bh(&xprt->sock_lock);
out:
        read_unlock(&sk->sk_callback_lock);
}

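/*
 * A note on the sizing arithmetic in xs_set_buffer_size below: both
 * limits scale with the slot table, allowing roughly two buffers in
 * flight per request slot.  As an illustration (the numbers here are
 * only an example), with a 4096-byte rcvsize and 16 request slots,
 * sk_rcvbuf becomes 4096 * 16 * 2 = 128 KB.
 */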
/**
 * xs_set_buffer_size - set send and receive limits
 * @xprt: generic transport
 *
 * Set socket send and receive limits based on the
 * sndsize and rcvsize fields in the generic transport
 * structure. This applies only to UDP sockets.
 */
static void xs_set_buffer_size(struct rpc_xprt *xprt)
{
        struct sock *sk = xprt->inet;

        if (xprt->stream)
                return;
        if (xprt->rcvsize) {
                sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
                sk->sk_rcvbuf = xprt->rcvsize * xprt->max_reqs * 2;
        }
        if (xprt->sndsize) {
                sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
                sk->sk_sndbuf = xprt->sndsize * xprt->max_reqs * 2;
                sk->sk_write_space(sk);
        }
}

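/*
 * Search downward from the transport's current port for a free
 * privileged port, wrapping around to XS_MAX_RESVPORT when the count
 * reaches zero; give up once the search arrives back at the port it
 * started from.
 */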
static int xs_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
{
        struct sockaddr_in myaddr = {
                .sin_family = AF_INET,
        };
        int err, port;

        /* Were we already bound to a given port? Try to reuse it */
        port = xprt->port;
        do {
                myaddr.sin_port = htons(port);
                err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
                                                sizeof(myaddr));
                if (err == 0) {
                        xprt->port = port;
                        dprintk("RPC:      xs_bindresvport bound to port %u\n",
                                        port);
                        return 0;
                }
                if (--port == 0)
                        port = XS_MAX_RESVPORT;
        } while (err == -EADDRINUSE && port != xprt->port);

        dprintk("RPC:      can't bind to reserved port (%d).\n", -err);
        return err;
}

static struct socket *xs_create(struct rpc_xprt *xprt, int proto, int resvport)
{
        struct socket *sock;
        int type, err;

        dprintk("RPC:      xs_create(%s %d)\n",
                           (proto == IPPROTO_UDP)? "udp" : "tcp", proto);

        type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;

        if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) {
                dprintk("RPC:      can't create socket (%d).\n", -err);
                return NULL;
        }

        /* If the caller has the capability, bind to a reserved port */
        if (resvport && xs_bindresvport(xprt, sock) < 0)
                goto failed;

        return sock;

failed:
        sock_release(sock);
        return NULL;
}

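/*
 * xs_bind wires a freshly created socket to the transport: the
 * socket's original sk_* callbacks are saved (xs_close restores them
 * on teardown) and replaced with the RPC data_ready, state_change and
 * write_space handlers, all under sk_callback_lock.
 */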
static void xs_bind(struct rpc_xprt *xprt, struct socket *sock)
{
        struct sock *sk = sock->sk;

        if (xprt->inet)
                return;

        write_lock_bh(&sk->sk_callback_lock);
        sk->sk_user_data = xprt;
        xprt->old_data_ready = sk->sk_data_ready;
        xprt->old_state_change = sk->sk_state_change;
        xprt->old_write_space = sk->sk_write_space;
        if (xprt->prot == IPPROTO_UDP) {
                sk->sk_data_ready = xs_udp_data_ready;
                sk->sk_no_check = UDP_CSUM_NORCV;
                xprt_set_connected(xprt);
        } else {
                tcp_sk(sk)->nonagle = 1;        /* disable Nagle's algorithm */
                sk->sk_data_ready = xs_tcp_data_ready;
                sk->sk_state_change = xs_tcp_state_change;
                xprt_clear_connected(xprt);
        }
        sk->sk_write_space = xs_write_space;

        /* Reset to new socket */
        xprt->sock = sock;
        xprt->inet = sk;
        write_unlock_bh(&sk->sk_callback_lock);

        return;
}

/**
 * xs_connect_worker - try to connect a socket to a remote endpoint
 * @args: RPC transport to connect
 *
 * Invoked by a work queue tasklet.
 */
static void xs_connect_worker(void *args)
{
        struct rpc_xprt *xprt = (struct rpc_xprt *)args;
        struct socket *sock = xprt->sock;
        int status = -EIO;

        if (xprt->shutdown || xprt->addr.sin_port == 0)
                goto out;

        dprintk("RPC:      xs_connect_worker xprt %p\n", xprt);

        /*
         * Start by resetting any existing state
         */
        xs_close(xprt);
        sock = xs_create(xprt, xprt->prot, xprt->resvport);
        if (sock == NULL) {
                /* couldn't create socket or bind to reserved port;
                 * this is likely a permanent error, so cause an abort */
                goto out;
        }
        xs_bind(xprt, sock);
        xs_set_buffer_size(xprt);

        status = 0;
        if (!xprt->stream)
                goto out;

        /*
         * Tell the socket layer to start connecting...
         */
        status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
                        sizeof(xprt->addr), O_NONBLOCK);
        dprintk("RPC: %p  connect status %d connected %d sock state %d\n",
                        xprt, -status, xprt_connected(xprt), sock->sk->sk_state);
        if (status < 0) {
                switch (status) {
                        case -EINPROGRESS:
                        case -EALREADY:
                                goto out_clear;
                }
        }
out:
        if (status < 0)
                rpc_wake_up_status(&xprt->pending, status);
        else
                rpc_wake_up(&xprt->pending);
out_clear:
        smp_mb__before_clear_bit();
        clear_bit(XPRT_CONNECTING, &xprt->sockstate);
        smp_mb__after_clear_bit();
}

/**
 * xs_connect - connect a socket to a remote endpoint
 * @task: address of RPC task that manages state of connect request
 *
 * TCP: If the remote end dropped the connection, delay reconnecting.
 */
static void xs_connect(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate)) {
                if (xprt->sock != NULL) {
                        dprintk("RPC:      xs_connect delayed xprt %p\n", xprt);
                        schedule_delayed_work(&xprt->sock_connect,
                                        RPC_REESTABLISH_TIMEOUT);
                } else {
                        dprintk("RPC:      xs_connect scheduled xprt %p\n", xprt);
                        schedule_work(&xprt->sock_connect);
                        /* flush_scheduled_work can sleep... */
                        if (!RPC_IS_ASYNC(task))
                                flush_scheduled_work();
                }
        }
}

static struct rpc_xprt_ops xs_ops = {
        .set_buffer_size        = xs_set_buffer_size,
        .connect                = xs_connect,
        .send_request           = xs_send_request,
        .close                  = xs_close,
        .destroy                = xs_destroy,
};

extern unsigned int xprt_udp_slot_table_entries;
extern unsigned int xprt_tcp_slot_table_entries;

/**
 * xs_setup_udp - Set up transport to use a UDP socket
 * @xprt: transport to set up
 * @to:   timeout parameters
 *
 */
int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
{
        size_t slot_table_size;

        dprintk("RPC:      setting up udp-ipv4 transport...\n");

        xprt->max_reqs = xprt_udp_slot_table_entries;
        slot_table_size = xprt->max_reqs * sizeof(xprt->slot[0]);
        xprt->slot = kmalloc(slot_table_size, GFP_KERNEL);
        if (xprt->slot == NULL)
                return -ENOMEM;
        memset(xprt->slot, 0, slot_table_size);

        xprt->prot = IPPROTO_UDP;
        xprt->port = XS_MAX_RESVPORT;
        xprt->stream = 0;
        xprt->nocong = 0;
        xprt->cwnd = RPC_INITCWND;
        xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
        /* XXX: header size can vary due to auth type, IPv6, etc. */
        xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);

        INIT_WORK(&xprt->sock_connect, xs_connect_worker, xprt);

        xprt->ops = &xs_ops;

        if (to)
                xprt->timeout = *to;
        else
                xprt_set_timeout(&xprt->timeout, 5, 5 * HZ);

        return 0;
}

/**
 * xs_setup_tcp - Set up transport to use a TCP socket
 * @xprt: transport to set up
 * @to: timeout parameters
 *
 */
int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to)
{
        size_t slot_table_size;

        dprintk("RPC:      setting up tcp-ipv4 transport...\n");

        xprt->max_reqs = xprt_tcp_slot_table_entries;
        slot_table_size = xprt->max_reqs * sizeof(xprt->slot[0]);
        xprt->slot = kmalloc(slot_table_size, GFP_KERNEL);
        if (xprt->slot == NULL)
                return -ENOMEM;
        memset(xprt->slot, 0, slot_table_size);

        xprt->prot = IPPROTO_TCP;
        xprt->port = XS_MAX_RESVPORT;
        xprt->stream = 1;
        xprt->nocong = 1;
        xprt->cwnd = RPC_MAXCWND(xprt);
        xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
        xprt->max_payload = (1U << 31) - 1;

        INIT_WORK(&xprt->sock_connect, xs_connect_worker, xprt);

        xprt->ops = &xs_ops;

        if (to)
                xprt->timeout = *to;
        else
                xprt_set_timeout(&xprt->timeout, 2, 60 * HZ);

        return 0;
}