xprtrdma: Wait on empty sendctx queue
author: Chuck Lever <chuck.lever@oracle.com>
Fri, 4 May 2018 19:35:57 +0000 (15:35 -0400)
committer: Anna Schumaker <Anna.Schumaker@Netapp.com>
Fri, 1 Jun 2018 17:56:30 +0000 (13:56 -0400)
Currently, when the sendctx queue is exhausted during marshaling, the
RPC/RDMA transport places the RPC task on the delayq, which forces a
wait for HZ >> 2 before the marshal and send is retried.

With this change, the transport now places such an RPC task on the
pending queue, and wakes it just as soon as more sendctxs become
available. This typically takes less than a millisecond, and the
write_space waking mechanism is less deadlock-prone.

Moreover, the waiting RPC task is holding the transport's write
lock, which blocks the transport from sending RPCs. Therefore faster
recovery from sendctx queue exhaustion is desirable.

Cf. commit 5804891455d5 ("xprtrdma: ->send_request returns -EAGAIN
when there are no free MRs").

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
net/sunrpc/xprtrdma/rpc_rdma.c
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtrdma/xprt_rdma.h

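diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index b12b044..a373d03 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c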
@@ -695,7 +695,7 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
 {
        req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
        if (!req->rl_sendctx)
-               return -ENOBUFS;
+               return -EAGAIN;
        req->rl_sendctx->sc_wr.num_sge = 0;
        req->rl_sendctx->sc_unmap_count = 0;
        req->rl_sendctx->sc_req = req;
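diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 0e0b7d5..7276e82 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c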
@@ -878,6 +878,7 @@ static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
                sc->sc_xprt = r_xprt;
                buf->rb_sc_ctxs[i] = sc;
        }
+       buf->rb_flags = 0;
 
        return 0;
 
@@ -935,7 +936,7 @@ out_emptyq:
         * completions recently. This is a sign the Send Queue is
         * backing up. Cause the caller to pause and try again.
         */
-       dprintk("RPC:       %s: empty sendctx queue\n", __func__);
+       set_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags);
        r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf);
        r_xprt->rx_stats.empty_sendctx_q++;
        return NULL;
@@ -970,6 +971,11 @@ rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
 
        /* Paired with READ_ONCE */
        smp_store_release(&buf->rb_sc_tail, next_tail);
+
+       if (test_and_clear_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags)) {
+               smp_mb__after_atomic();
+               xprt_write_space(&sc->sc_xprt->rx_xprt);
+       }
 }
 
 static void
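diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index c606879..e4a408d 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h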
@@ -400,6 +400,7 @@ struct rpcrdma_buffer {
        spinlock_t              rb_lock;        /* protect buf lists */
        struct list_head        rb_send_bufs;
        struct list_head        rb_recv_bufs;
+       unsigned long           rb_flags;
        u32                     rb_max_requests;
        u32                     rb_credits;     /* most recent credit grant */
        int                     rb_posted_receives;
@@ -417,6 +418,11 @@ struct rpcrdma_buffer {
 };
 #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
 
+/* rb_flags */
+enum {
+       RPCRDMA_BUF_F_EMPTY_SCQ = 0,
+};
+
 /*
  * Internal structure for transport instance creation. This
  * exists primarily for modularity.