X-Git-Url: http://git.rot13.org/?a=blobdiff_plain;f=net%2Fsunrpc%2Fsvcsock.c;h=cba85d195222e9603be9c29c68628b113fd5582a;hb=b4a9071af62f95dc6d22040a0b37ac7225ce4d54;hp=a27905a0ad277a4322d291e333c479794e3383de;hpb=9d8f057acbd38d8177cf2ffd5e151d52c2477372;p=powerpc.git diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c index a27905a0ad..cba85d1952 100644 --- a/net/sunrpc/svcsock.c +++ b/net/sunrpc/svcsock.c @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -45,13 +46,16 @@ /* SMP locking strategy: * - * svc_serv->sv_lock protects most stuff for that service. + * svc_pool->sp_lock protects most of the fields of that pool. + * svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt. + * when both need to be taken (rare), svc_serv->sv_lock is first. + * BKL protects svc_serv->sv_nrthread. + * svc_sock->sk_defer_lock protects the svc_sock->sk_deferred list + * svc_sock->sk_flags.SK_BUSY prevents a svc_sock being enqueued multiply. * * Some flags can be set to certain values at any time * providing that certain rules are followed: * - * SK_BUSY can be set to 0 at any time. - * svc_sock_enqueue must be called afterwards * SK_CONN, SK_DATA, can be set or cleared at any time. * after a set, svc_sock_enqueue must be called. * after a clear, the socket must be read/accepted @@ -73,23 +77,30 @@ static struct svc_deferred_req *svc_deferred_dequeue(struct svc_sock *svsk); static int svc_deferred_recv(struct svc_rqst *rqstp); static struct cache_deferred_req *svc_defer(struct cache_req *req); +/* apparently the "standard" is that clients close + * idle connections after 5 minutes, servers after + * 6 minutes + * http://www.connectathon.org/talks96/nfstcp.pdf + */ +static int svc_conn_age_period = 6*60; + /* - * Queue up an idle server thread. Must have serv->sv_lock held. + * Queue up an idle server thread. Must have pool->sp_lock held. * Note: this is really a stack rather than a queue, so that we only - * use as many different threads as we need, and the rest don't polute + * use as many different threads as we need, and the rest don't pollute * the cache. */ static inline void -svc_serv_enqueue(struct svc_serv *serv, struct svc_rqst *rqstp) +svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp) { - list_add(&rqstp->rq_list, &serv->sv_threads); + list_add(&rqstp->rq_list, &pool->sp_threads); } /* - * Dequeue an nfsd thread. Must have serv->sv_lock held. + * Dequeue an nfsd thread. Must have pool->sp_lock held. */ static inline void -svc_serv_dequeue(struct svc_serv *serv, struct svc_rqst *rqstp) +svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp) { list_del(&rqstp->rq_list); } @@ -140,7 +151,9 @@ static void svc_sock_enqueue(struct svc_sock *svsk) { struct svc_serv *serv = svsk->sk_server; + struct svc_pool *pool; struct svc_rqst *rqstp; + int cpu; if (!(svsk->sk_flags & ( (1<sk_flags)) return; - spin_lock_bh(&serv->sv_lock); + cpu = get_cpu(); + pool = svc_pool_for_cpu(svsk->sk_server, cpu); + put_cpu(); + + spin_lock_bh(&pool->sp_lock); - if (!list_empty(&serv->sv_threads) && - !list_empty(&serv->sv_sockets)) + if (!list_empty(&pool->sp_threads) && + !list_empty(&pool->sp_sockets)) printk(KERN_ERR "svc_sock_enqueue: threads and sockets both waiting??\n"); @@ -161,73 +178,79 @@ svc_sock_enqueue(struct svc_sock *svsk) goto out_unlock; } - if (test_bit(SK_BUSY, &svsk->sk_flags)) { - /* Don't enqueue socket while daemon is receiving */ + /* Mark socket as busy. 
It will remain in this state until the + * server has processed all pending data and put the socket back + * on the idle list. We update SK_BUSY atomically because + * it also guards against trying to enqueue the svc_sock twice. + */ + if (test_and_set_bit(SK_BUSY, &svsk->sk_flags)) { + /* Don't enqueue socket while already enqueued */ dprintk("svc: socket %p busy, not enqueued\n", svsk->sk_sk); goto out_unlock; } + BUG_ON(svsk->sk_pool != NULL); + svsk->sk_pool = pool; set_bit(SOCK_NOSPACE, &svsk->sk_sock->flags); - if (((svsk->sk_reserved + serv->sv_bufsz)*2 + if (((atomic_read(&svsk->sk_reserved) + serv->sv_bufsz)*2 > svc_sock_wspace(svsk)) && !test_bit(SK_CLOSE, &svsk->sk_flags) && !test_bit(SK_CONN, &svsk->sk_flags)) { /* Don't enqueue while not enough space for reply */ dprintk("svc: socket %p no space, %d*2 > %ld, not enqueued\n", - svsk->sk_sk, svsk->sk_reserved+serv->sv_bufsz, + svsk->sk_sk, atomic_read(&svsk->sk_reserved)+serv->sv_bufsz, svc_sock_wspace(svsk)); + svsk->sk_pool = NULL; + clear_bit(SK_BUSY, &svsk->sk_flags); goto out_unlock; } clear_bit(SOCK_NOSPACE, &svsk->sk_sock->flags); - /* Mark socket as busy. It will remain in this state until the - * server has processed all pending data and put the socket back - * on the idle list. - */ - set_bit(SK_BUSY, &svsk->sk_flags); - if (!list_empty(&serv->sv_threads)) { - rqstp = list_entry(serv->sv_threads.next, + if (!list_empty(&pool->sp_threads)) { + rqstp = list_entry(pool->sp_threads.next, struct svc_rqst, rq_list); dprintk("svc: socket %p served by daemon %p\n", svsk->sk_sk, rqstp); - svc_serv_dequeue(serv, rqstp); + svc_thread_dequeue(pool, rqstp); if (rqstp->rq_sock) printk(KERN_ERR "svc_sock_enqueue: server %p, rq_sock=%p!\n", rqstp, rqstp->rq_sock); rqstp->rq_sock = svsk; - svsk->sk_inuse++; + atomic_inc(&svsk->sk_inuse); rqstp->rq_reserved = serv->sv_bufsz; - svsk->sk_reserved += rqstp->rq_reserved; + atomic_add(rqstp->rq_reserved, &svsk->sk_reserved); + BUG_ON(svsk->sk_pool != pool); wake_up(&rqstp->rq_wait); } else { dprintk("svc: socket %p put into queue\n", svsk->sk_sk); - list_add_tail(&svsk->sk_ready, &serv->sv_sockets); + list_add_tail(&svsk->sk_ready, &pool->sp_sockets); + BUG_ON(svsk->sk_pool != pool); } out_unlock: - spin_unlock_bh(&serv->sv_lock); + spin_unlock_bh(&pool->sp_lock); } /* - * Dequeue the first socket. Must be called with the serv->sv_lock held. + * Dequeue the first socket. Must be called with the pool->sp_lock held. 
*/ static inline struct svc_sock * -svc_sock_dequeue(struct svc_serv *serv) +svc_sock_dequeue(struct svc_pool *pool) { struct svc_sock *svsk; - if (list_empty(&serv->sv_sockets)) + if (list_empty(&pool->sp_sockets)) return NULL; - svsk = list_entry(serv->sv_sockets.next, + svsk = list_entry(pool->sp_sockets.next, struct svc_sock, sk_ready); list_del_init(&svsk->sk_ready); dprintk("svc: socket %p dequeued, inuse=%d\n", - svsk->sk_sk, svsk->sk_inuse); + svsk->sk_sk, atomic_read(&svsk->sk_inuse)); return svsk; } @@ -241,6 +264,7 @@ svc_sock_dequeue(struct svc_serv *serv) static inline void svc_sock_received(struct svc_sock *svsk) { + svsk->sk_pool = NULL; clear_bit(SK_BUSY, &svsk->sk_flags); svc_sock_enqueue(svsk); } @@ -262,10 +286,8 @@ void svc_reserve(struct svc_rqst *rqstp, int space) if (space < rqstp->rq_reserved) { struct svc_sock *svsk = rqstp->rq_sock; - spin_lock_bh(&svsk->sk_server->sv_lock); - svsk->sk_reserved -= (rqstp->rq_reserved - space); + atomic_sub((rqstp->rq_reserved - space), &svsk->sk_reserved); rqstp->rq_reserved = space; - spin_unlock_bh(&svsk->sk_server->sv_lock); svc_sock_enqueue(svsk); } @@ -277,17 +299,11 @@ void svc_reserve(struct svc_rqst *rqstp, int space) static inline void svc_sock_put(struct svc_sock *svsk) { - struct svc_serv *serv = svsk->sk_server; - - spin_lock_bh(&serv->sv_lock); - if (!--(svsk->sk_inuse) && test_bit(SK_DEAD, &svsk->sk_flags)) { - spin_unlock_bh(&serv->sv_lock); + if (atomic_dec_and_test(&svsk->sk_inuse) && test_bit(SK_DEAD, &svsk->sk_flags)) { dprintk("svc: releasing dead socket\n"); sock_release(svsk->sk_sock); kfree(svsk); } - else - spin_unlock_bh(&serv->sv_lock); } static void @@ -321,25 +337,33 @@ svc_sock_release(struct svc_rqst *rqstp) /* * External function to wake up a server waiting for data + * This really only makes sense for services like lockd + * which have exactly one thread anyway. 
*/ void svc_wake_up(struct svc_serv *serv) { struct svc_rqst *rqstp; - - spin_lock_bh(&serv->sv_lock); - if (!list_empty(&serv->sv_threads)) { - rqstp = list_entry(serv->sv_threads.next, - struct svc_rqst, - rq_list); - dprintk("svc: daemon %p woken up.\n", rqstp); - /* - svc_serv_dequeue(serv, rqstp); - rqstp->rq_sock = NULL; - */ - wake_up(&rqstp->rq_wait); + unsigned int i; + struct svc_pool *pool; + + for (i = 0; i < serv->sv_nrpools; i++) { + pool = &serv->sv_pools[i]; + + spin_lock_bh(&pool->sp_lock); + if (!list_empty(&pool->sp_threads)) { + rqstp = list_entry(pool->sp_threads.next, + struct svc_rqst, + rq_list); + dprintk("svc: daemon %p woken up.\n", rqstp); + /* + svc_thread_dequeue(pool, rqstp); + rqstp->rq_sock = NULL; + */ + wake_up(&rqstp->rq_wait); + } + spin_unlock_bh(&pool->sp_lock); } - spin_unlock_bh(&serv->sv_lock); } /* @@ -388,7 +412,7 @@ svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr) /* send head */ if (slen == xdr->head[0].iov_len) flags = 0; - len = sock->ops->sendpage(sock, rqstp->rq_respages[0], 0, xdr->head[0].iov_len, flags); + len = kernel_sendpage(sock, rqstp->rq_respages[0], 0, xdr->head[0].iov_len, flags); if (len != xdr->head[0].iov_len) goto out; slen -= xdr->head[0].iov_len; @@ -400,7 +424,7 @@ svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr) while (pglen > 0) { if (slen == size) flags = 0; - result = sock->ops->sendpage(sock, *ppage, base, size, flags); + result = kernel_sendpage(sock, *ppage, base, size, flags); if (result > 0) len += result; if (result != size) @@ -413,7 +437,7 @@ svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr) } /* send tail */ if (xdr->tail[0].iov_len) { - result = sock->ops->sendpage(sock, rqstp->rq_respages[rqstp->rq_restailpage], + result = kernel_sendpage(sock, rqstp->rq_respages[rqstp->rq_restailpage], ((unsigned long)xdr->tail[0].iov_base)& (PAGE_SIZE-1), xdr->tail[0].iov_len, 0); @@ -428,19 +452,61 @@ out: return len; } +/* + * Report socket names for nfsdfs + */ +static int one_sock_name(char *buf, struct svc_sock *svsk) +{ + int len; + + switch(svsk->sk_sk->sk_family) { + case AF_INET: + len = sprintf(buf, "ipv4 %s %u.%u.%u.%u %d\n", + svsk->sk_sk->sk_protocol==IPPROTO_UDP? + "udp" : "tcp", + NIPQUAD(inet_sk(svsk->sk_sk)->rcv_saddr), + inet_sk(svsk->sk_sk)->num); + break; + default: + len = sprintf(buf, "*unknown-%d*\n", + svsk->sk_sk->sk_family); + } + return len; +} + +int +svc_sock_names(char *buf, struct svc_serv *serv, char *toclose) +{ + struct svc_sock *svsk, *closesk = NULL; + int len = 0; + + if (!serv) + return 0; + spin_lock(&serv->sv_lock); + list_for_each_entry(svsk, &serv->sv_permsocks, sk_list) { + int onelen = one_sock_name(buf+len, svsk); + if (toclose && strcmp(toclose, buf+len) == 0) + closesk = svsk; + else + len += onelen; + } + spin_unlock(&serv->sv_lock); + if (closesk) + svc_delete_socket(closesk); + return len; +} +EXPORT_SYMBOL(svc_sock_names); + /* * Check input queue length */ static int svc_recv_available(struct svc_sock *svsk) { - mm_segment_t oldfs; struct socket *sock = svsk->sk_sock; int avail, err; - oldfs = get_fs(); set_fs(KERNEL_DS); - err = sock->ops->ioctl(sock, TIOCINQ, (unsigned long) &avail); - set_fs(oldfs); + err = kernel_sock_ioctl(sock, TIOCINQ, (unsigned long) &avail); return (err >= 0)? avail : err; } @@ -472,7 +538,7 @@ svc_recvfrom(struct svc_rqst *rqstp, struct kvec *iov, int nr, int buflen) * at accept time. 
FIXME */ alen = sizeof(rqstp->rq_addr); - sock->ops->getname(sock, (struct sockaddr *)&rqstp->rq_addr, &alen, 1); + kernel_getpeername(sock, (struct sockaddr *)&rqstp->rq_addr, &alen); dprintk("svc: socket %p recvfrom(%p, %Zu) = %d\n", rqstp->rq_sock, iov[0].iov_base, iov[0].iov_len, len); @@ -560,7 +626,10 @@ svc_udp_recvfrom(struct svc_rqst *rqstp) /* udp sockets need large rcvbuf as all pending * requests are still in that buffer. sndbuf must * also be large enough that there is enough space - * for one reply per thread. + * for one reply per thread. We count all threads + * rather than threads in a particular pool, which + * provides an upper bound on the number of threads + * which will access the socket. */ svc_sock_setbufsize(svsk->sk_sock, (serv->sv_nrthreads+3) * serv->sv_bufsz, @@ -758,7 +827,6 @@ svc_tcp_accept(struct svc_sock *svsk) struct svc_serv *serv = svsk->sk_server; struct socket *sock = svsk->sk_sock; struct socket *newsock; - const struct proto_ops *ops; struct svc_sock *newsvsk; int err, slen; @@ -766,29 +834,23 @@ svc_tcp_accept(struct svc_sock *svsk) if (!sock) return; - err = sock_create_lite(PF_INET, SOCK_STREAM, IPPROTO_TCP, &newsock); - if (err) { + clear_bit(SK_CONN, &svsk->sk_flags); + err = kernel_accept(sock, &newsock, O_NONBLOCK); + if (err < 0) { if (err == -ENOMEM) printk(KERN_WARNING "%s: no more sockets!\n", serv->sv_name); - return; - } - - dprintk("svc: tcp_accept %p allocated\n", newsock); - newsock->ops = ops = sock->ops; - - clear_bit(SK_CONN, &svsk->sk_flags); - if ((err = ops->accept(sock, newsock, O_NONBLOCK)) < 0) { - if (err != -EAGAIN && net_ratelimit()) + else if (err != -EAGAIN && net_ratelimit()) printk(KERN_WARNING "%s: accept failed (err %d)!\n", serv->sv_name, -err); - goto failed; /* aborted connection or whatever */ + return; } + set_bit(SK_CONN, &svsk->sk_flags); svc_sock_enqueue(svsk); slen = sizeof(sin); - err = ops->getname(newsock, (struct sockaddr *) &sin, &slen, 1); + err = kernel_getpeername(newsock, (struct sockaddr *) &sin, &slen); if (err < 0) { if (net_ratelimit()) printk(KERN_WARNING "%s: peername failed (err %d)!\n", @@ -854,7 +916,7 @@ svc_tcp_accept(struct svc_sock *svsk) struct svc_sock, sk_list); set_bit(SK_CLOSE, &svsk->sk_flags); - svsk->sk_inuse ++; + atomic_inc(&svsk->sk_inuse); } spin_unlock_bh(&serv->sv_lock); @@ -912,6 +974,11 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp) /* sndbuf needs to have room for one request * per thread, otherwise we can stall even when the * network isn't a bottleneck. + * + * We count all threads rather than threads in a + * particular pool, which provides an upper bound + * on the number of threads which will access the socket. + * * rcvbuf just needs to be able to hold a few requests. * Normally they will be removed from the queue * as soon a a complete request arrives. @@ -1040,7 +1107,7 @@ svc_tcp_sendto(struct svc_rqst *rqstp) { struct xdr_buf *xbufp = &rqstp->rq_res; int sent; - u32 reclen; + __be32 reclen; /* Set up the first element of the reply kvec. * Any other kvecs that may be in use have been taken @@ -1127,12 +1194,16 @@ svc_sock_update_bufs(struct svc_serv *serv) } /* - * Receive the next request on any socket. + * Receive the next request on any socket. This code is carefully + * organised not to touch any cachelines in the shared svc_serv + * structure, only cachelines in the local svc_pool. 
*/ int -svc_recv(struct svc_serv *serv, struct svc_rqst *rqstp, long timeout) +svc_recv(struct svc_rqst *rqstp, long timeout) { struct svc_sock *svsk =NULL; + struct svc_serv *serv = rqstp->rq_server; + struct svc_pool *pool = rqstp->rq_pool; int len; int pages; struct xdr_buf *arg; @@ -1182,32 +1253,15 @@ svc_recv(struct svc_serv *serv, struct svc_rqst *rqstp, long timeout) if (signalled()) return -EINTR; - spin_lock_bh(&serv->sv_lock); - if (!list_empty(&serv->sv_tempsocks)) { - svsk = list_entry(serv->sv_tempsocks.next, - struct svc_sock, sk_list); - /* apparently the "standard" is that clients close - * idle connections after 5 minutes, servers after - * 6 minutes - * http://www.connectathon.org/talks96/nfstcp.pdf - */ - if (get_seconds() - svsk->sk_lastrecv < 6*60 - || test_bit(SK_BUSY, &svsk->sk_flags)) - svsk = NULL; - } - if (svsk) { - set_bit(SK_BUSY, &svsk->sk_flags); - set_bit(SK_CLOSE, &svsk->sk_flags); - rqstp->rq_sock = svsk; - svsk->sk_inuse++; - } else if ((svsk = svc_sock_dequeue(serv)) != NULL) { + spin_lock_bh(&pool->sp_lock); + if ((svsk = svc_sock_dequeue(pool)) != NULL) { rqstp->rq_sock = svsk; - svsk->sk_inuse++; + atomic_inc(&svsk->sk_inuse); rqstp->rq_reserved = serv->sv_bufsz; - svsk->sk_reserved += rqstp->rq_reserved; + atomic_add(rqstp->rq_reserved, &svsk->sk_reserved); } else { /* No data pending. Go to sleep */ - svc_serv_enqueue(serv, rqstp); + svc_thread_enqueue(pool, rqstp); /* * We have to be able to interrupt this wait @@ -1215,26 +1269,26 @@ svc_recv(struct svc_serv *serv, struct svc_rqst *rqstp, long timeout) */ set_current_state(TASK_INTERRUPTIBLE); add_wait_queue(&rqstp->rq_wait, &wait); - spin_unlock_bh(&serv->sv_lock); + spin_unlock_bh(&pool->sp_lock); schedule_timeout(timeout); try_to_freeze(); - spin_lock_bh(&serv->sv_lock); + spin_lock_bh(&pool->sp_lock); remove_wait_queue(&rqstp->rq_wait, &wait); if (!(svsk = rqstp->rq_sock)) { - svc_serv_dequeue(serv, rqstp); - spin_unlock_bh(&serv->sv_lock); + svc_thread_dequeue(pool, rqstp); + spin_unlock_bh(&pool->sp_lock); dprintk("svc: server %p, no data yet\n", rqstp); return signalled()? -EINTR : -EAGAIN; } } - spin_unlock_bh(&serv->sv_lock); + spin_unlock_bh(&pool->sp_lock); - dprintk("svc: server %p, socket %p, inuse=%d\n", - rqstp, svsk, svsk->sk_inuse); + dprintk("svc: server %p, pool %u, socket %p, inuse=%d\n", + rqstp, pool->sp_id, svsk, atomic_read(&svsk->sk_inuse)); len = svsk->sk_recvfrom(rqstp); dprintk("svc: got len=%d\n", len); @@ -1245,13 +1299,7 @@ svc_recv(struct svc_serv *serv, struct svc_rqst *rqstp, long timeout) return -EAGAIN; } svsk->sk_lastrecv = get_seconds(); - if (test_bit(SK_TEMP, &svsk->sk_flags)) { - /* push active sockets to end of list */ - spin_lock_bh(&serv->sv_lock); - if (!list_empty(&svsk->sk_list)) - list_move_tail(&svsk->sk_list, &serv->sv_tempsocks); - spin_unlock_bh(&serv->sv_lock); - } + clear_bit(SK_OLD, &svsk->sk_flags); rqstp->rq_secure = ntohs(rqstp->rq_addr.sin_port) < 1024; rqstp->rq_chandle.defer = svc_defer; @@ -1310,6 +1358,58 @@ svc_send(struct svc_rqst *rqstp) return len; } +/* + * Timer function to close old temporary sockets, using + * a mark-and-sweep algorithm. 
+ */ +static void +svc_age_temp_sockets(unsigned long closure) +{ + struct svc_serv *serv = (struct svc_serv *)closure; + struct svc_sock *svsk; + struct list_head *le, *next; + LIST_HEAD(to_be_aged); + + dprintk("svc_age_temp_sockets\n"); + + if (!spin_trylock_bh(&serv->sv_lock)) { + /* busy, try again 1 sec later */ + dprintk("svc_age_temp_sockets: busy\n"); + mod_timer(&serv->sv_temptimer, jiffies + HZ); + return; + } + + list_for_each_safe(le, next, &serv->sv_tempsocks) { + svsk = list_entry(le, struct svc_sock, sk_list); + + if (!test_and_set_bit(SK_OLD, &svsk->sk_flags)) + continue; + if (atomic_read(&svsk->sk_inuse) || test_bit(SK_BUSY, &svsk->sk_flags)) + continue; + atomic_inc(&svsk->sk_inuse); + list_move(le, &to_be_aged); + set_bit(SK_CLOSE, &svsk->sk_flags); + set_bit(SK_DETACHED, &svsk->sk_flags); + } + spin_unlock_bh(&serv->sv_lock); + + while (!list_empty(&to_be_aged)) { + le = to_be_aged.next; + /* fiddling the sk_list node is safe 'cos we're SK_DETACHED */ + list_del_init(le); + svsk = list_entry(le, struct svc_sock, sk_list); + + dprintk("queuing svsk %p for closing, %lu seconds old\n", + svsk, get_seconds() - svsk->sk_lastrecv); + + /* a thread will dequeue and close it soon */ + svc_sock_enqueue(svsk); + svc_sock_put(svsk); + } + + mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ); +} + /* * Initialize socket for RPC use and create svc_sock struct * XXX: May want to setsockopt SO_SNDBUF and SO_RCVBUF. @@ -1322,11 +1422,10 @@ svc_setup_socket(struct svc_serv *serv, struct socket *sock, struct sock *inet; dprintk("svc: svc_setup_socket %p\n", sock); - if (!(svsk = kmalloc(sizeof(*svsk), GFP_KERNEL))) { + if (!(svsk = kzalloc(sizeof(*svsk), GFP_KERNEL))) { *errp = -ENOMEM; return NULL; } - memset(svsk, 0, sizeof(*svsk)); inet = sock->sk; @@ -1348,7 +1447,9 @@ svc_setup_socket(struct svc_serv *serv, struct socket *sock, svsk->sk_odata = inet->sk_data_ready; svsk->sk_owspace = inet->sk_write_space; svsk->sk_server = serv; + atomic_set(&svsk->sk_inuse, 0); svsk->sk_lastrecv = get_seconds(); + spin_lock_init(&svsk->sk_defer_lock); INIT_LIST_HEAD(&svsk->sk_deferred); INIT_LIST_HEAD(&svsk->sk_ready); mutex_init(&svsk->sk_mutex); @@ -1364,6 +1465,13 @@ svc_setup_socket(struct svc_serv *serv, struct socket *sock, set_bit(SK_TEMP, &svsk->sk_flags); list_add(&svsk->sk_list, &serv->sv_tempsocks); serv->sv_tmpcnt++; + if (serv->sv_temptimer.function == NULL) { + /* setup timer to age temp sockets */ + setup_timer(&serv->sv_temptimer, svc_age_temp_sockets, + (unsigned long)serv); + mod_timer(&serv->sv_temptimer, + jiffies + svc_conn_age_period * HZ); + } } else { clear_bit(SK_TEMP, &svsk->sk_flags); list_add(&svsk->sk_list, &serv->sv_permsocks); @@ -1378,6 +1486,38 @@ svc_setup_socket(struct svc_serv *serv, struct socket *sock, return svsk; } +int svc_addsock(struct svc_serv *serv, + int fd, + char *name_return, + int *proto) +{ + int err = 0; + struct socket *so = sockfd_lookup(fd, &err); + struct svc_sock *svsk = NULL; + + if (!so) + return err; + if (so->sk->sk_family != AF_INET) + err = -EAFNOSUPPORT; + else if (so->sk->sk_protocol != IPPROTO_TCP && + so->sk->sk_protocol != IPPROTO_UDP) + err = -EPROTONOSUPPORT; + else if (so->state > SS_UNCONNECTED) + err = -EISCONN; + else { + svsk = svc_setup_socket(serv, so, &err, 1); + if (svsk) + err = 0; + } + if (err) { + sockfd_put(so); + return err; + } + if (proto) *proto = so->sk->sk_protocol; + return one_sock_name(name_return, svsk); +} +EXPORT_SYMBOL_GPL(svc_addsock); + /* * Create socket for RPC service. 
*/ @@ -1404,17 +1544,15 @@ svc_create_socket(struct svc_serv *serv, int protocol, struct sockaddr_in *sin) if ((error = sock_create_kern(PF_INET, type, protocol, &sock)) < 0) return error; - if (sin != NULL) { - if (type == SOCK_STREAM) - sock->sk->sk_reuse = 1; /* allow address reuse */ - error = sock->ops->bind(sock, (struct sockaddr *) sin, - sizeof(*sin)); - if (error < 0) - goto bummer; - } + if (type == SOCK_STREAM) + sock->sk->sk_reuse = 1; /* allow address reuse */ + error = kernel_bind(sock, (struct sockaddr *) sin, + sizeof(*sin)); + if (error < 0) + goto bummer; if (protocol == IPPROTO_TCP) { - if ((error = sock->ops->listen(sock, 64)) < 0) + if ((error = kernel_listen(sock, 64)) < 0) goto bummer; } @@ -1447,15 +1585,25 @@ svc_delete_socket(struct svc_sock *svsk) spin_lock_bh(&serv->sv_lock); - list_del_init(&svsk->sk_list); - list_del_init(&svsk->sk_ready); + if (!test_and_set_bit(SK_DETACHED, &svsk->sk_flags)) + list_del_init(&svsk->sk_list); + /* + * We used to delete the svc_sock from whichever list + * it's sk_ready node was on, but we don't actually + * need to. This is because the only time we're called + * while still attached to a queue, the queue itself + * is about to be destroyed (in svc_destroy). + */ if (!test_and_set_bit(SK_DEAD, &svsk->sk_flags)) if (test_bit(SK_TEMP, &svsk->sk_flags)) serv->sv_tmpcnt--; - if (!svsk->sk_inuse) { + if (!atomic_read(&svsk->sk_inuse)) { spin_unlock_bh(&serv->sv_lock); - sock_release(svsk->sk_sock); + if (svsk->sk_sock->file) + sockfd_put(svsk->sk_sock); + else + sock_release(svsk->sk_sock); kfree(svsk); } else { spin_unlock_bh(&serv->sv_lock); @@ -1486,7 +1634,6 @@ svc_makesock(struct svc_serv *serv, int protocol, unsigned short port) static void svc_revisit(struct cache_deferred_req *dreq, int too_many) { struct svc_deferred_req *dr = container_of(dreq, struct svc_deferred_req, handle); - struct svc_serv *serv = dreq->owner; struct svc_sock *svsk; if (too_many) { @@ -1497,9 +1644,9 @@ static void svc_revisit(struct cache_deferred_req *dreq, int too_many) dprintk("revisit queued\n"); svsk = dr->svsk; dr->svsk = NULL; - spin_lock_bh(&serv->sv_lock); + spin_lock_bh(&svsk->sk_defer_lock); list_add(&dr->handle.recent, &svsk->sk_deferred); - spin_unlock_bh(&serv->sv_lock); + spin_unlock_bh(&svsk->sk_defer_lock); set_bit(SK_DEFERRED, &svsk->sk_flags); svc_sock_enqueue(svsk); svc_sock_put(svsk); @@ -1531,10 +1678,8 @@ svc_defer(struct cache_req *req) dr->argslen = rqstp->rq_arg.len >> 2; memcpy(dr->args, rqstp->rq_arg.head[0].iov_base-skip, dr->argslen<<2); } - spin_lock_bh(&rqstp->rq_server->sv_lock); - rqstp->rq_sock->sk_inuse++; + atomic_inc(&rqstp->rq_sock->sk_inuse); dr->svsk = rqstp->rq_sock; - spin_unlock_bh(&rqstp->rq_server->sv_lock); dr->handle.revisit = svc_revisit; return &dr->handle; @@ -1561,11 +1706,10 @@ static int svc_deferred_recv(struct svc_rqst *rqstp) static struct svc_deferred_req *svc_deferred_dequeue(struct svc_sock *svsk) { struct svc_deferred_req *dr = NULL; - struct svc_serv *serv = svsk->sk_server; if (!test_bit(SK_DEFERRED, &svsk->sk_flags)) return NULL; - spin_lock_bh(&serv->sv_lock); + spin_lock_bh(&svsk->sk_defer_lock); clear_bit(SK_DEFERRED, &svsk->sk_flags); if (!list_empty(&svsk->sk_deferred)) { dr = list_entry(svsk->sk_deferred.next, @@ -1574,6 +1718,6 @@ static struct svc_deferred_req *svc_deferred_dequeue(struct svc_sock *svsk) list_del_init(&dr->handle.recent); set_bit(SK_DEFERRED, &svsk->sk_flags); } - spin_unlock_bh(&serv->sv_lock); + spin_unlock_bh(&svsk->sk_defer_lock); return dr; }
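
A note on the aging change above: svc_age_temp_sockets() replaces the old "push active sockets to the end of sv_tempsocks" heuristic with a mark-and-sweep scheme. Each sweep sets SK_OLD on every temporary socket, a received request clears it again (see svc_recv), and a socket that is still marked and unreferenced on the next sweep has been idle for a full svc_conn_age_period (6 minutes, per the convention cited in the patch) and is queued for closing. The sketch below is a minimal, single-threaded userspace illustration of that scheme only; the names (struct conn, conn_mark_active, sweep_idle, refcount) are hypothetical stand-ins for the kernel structures, there is no locking or atomic flag handling, and it is not part of the patch.

/* Illustrative sketch only, not kernel code. */
#include <stdbool.h>
#include <stddef.h>

struct conn {
	struct conn *next;
	bool old;	/* set by the sweep, cleared on activity (cf. SK_OLD)  */
	bool busy;	/* currently being serviced        (cf. SK_BUSY)       */
	int  refcount;	/* outstanding references          (cf. sk_inuse)      */
};

/* Called whenever a request arrives on the connection. */
static void conn_mark_active(struct conn *c)
{
	c->old = false;
}

/*
 * Called once per aging period.  Detaches connections that have been idle
 * for a whole period (and are not referenced) and returns them as a list
 * for the caller to close.
 */
static struct conn *sweep_idle(struct conn **head)
{
	struct conn *to_close = NULL;
	struct conn **pp = head;

	while (*pp != NULL) {
		struct conn *c = *pp;

		if (!c->old) {
			/* First sweep that finds it idle: just mark it. */
			c->old = true;
			pp = &c->next;
		} else if (c->busy || c->refcount > 0) {
			/* Still in use: leave it for a later sweep. */
			pp = &c->next;
		} else {
			/* Idle for a full period: detach and queue for close. */
			*pp = c->next;
			c->next = to_close;
			to_close = c;
		}
	}
	return to_close;
}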