RDMA/srpt: Add support for immediate data
authorBart Van Assche <bvanassche@acm.org>
Mon, 17 Dec 2018 21:20:46 +0000 (13:20 -0800)
committerDoug Ledford <dledford@redhat.com>
Wed, 19 Dec 2018 20:07:26 +0000 (15:07 -0500)
Modify allocation of the non-SRQ receive queues such that immediate
data is aligned on a 512 byte boundary. That alignment is necessary
to pass the immediate data without copying to the block layer. When
receiving an SRP_CMD with immediate data, postpone the ib_post_recv()
call until target_execute_cmd() has finished. See also
srpt_release_cmd().

Cc: Sergey Gorenko <sergeygo@mellanox.com>
Cc: Max Gurtovoy <maxg@mellanox.com>
Cc: Laurence Oberman <loberman@redhat.com>
Signed-off-by: Bart Van Assche <bvanassche@acm.org>
Signed-off-by: Doug Ledford <dledford@redhat.com>
drivers/infiniband/ulp/srpt/ib_srpt.c
drivers/infiniband/ulp/srpt/ib_srpt.h

index 772d756..bddd0ad 100644 (file)
@@ -648,24 +648,26 @@ static void srpt_unregister_mad_agent(struct srpt_device *sdev)
  * srpt_alloc_ioctx - allocate a SRPT I/O context structure
  * @sdev: SRPT HCA pointer.
  * @ioctx_size: I/O context size.
- * @dma_size: Size of I/O context DMA buffer.
+ * @buf_cache: I/O buffer cache.
  * @dir: DMA data direction.
  */
 static struct srpt_ioctx *srpt_alloc_ioctx(struct srpt_device *sdev,
-                                          int ioctx_size, int dma_size,
+                                          int ioctx_size,
+                                          struct kmem_cache *buf_cache,
                                           enum dma_data_direction dir)
 {
        struct srpt_ioctx *ioctx;
 
-       ioctx = kmalloc(ioctx_size, GFP_KERNEL);
+       ioctx = kzalloc(ioctx_size, GFP_KERNEL);
        if (!ioctx)
                goto err;
 
-       ioctx->buf = kmalloc(dma_size, GFP_KERNEL);
+       ioctx->buf = kmem_cache_alloc(buf_cache, GFP_KERNEL);
        if (!ioctx->buf)
                goto err_free_ioctx;
 
-       ioctx->dma = ib_dma_map_single(sdev->device, ioctx->buf, dma_size, dir);
+       ioctx->dma = ib_dma_map_single(sdev->device, ioctx->buf,
+                                      kmem_cache_size(buf_cache), dir);
        if (ib_dma_mapping_error(sdev->device, ioctx->dma))
                goto err_free_buf;
 
@@ -683,17 +685,19 @@ err:
  * srpt_free_ioctx - free a SRPT I/O context structure
  * @sdev: SRPT HCA pointer.
  * @ioctx: I/O context pointer.
- * @dma_size: Size of I/O context DMA buffer.
+ * @buf_cache: I/O buffer cache.
  * @dir: DMA data direction.
  */
 static void srpt_free_ioctx(struct srpt_device *sdev, struct srpt_ioctx *ioctx,
-                           int dma_size, enum dma_data_direction dir)
+                           struct kmem_cache *buf_cache,
+                           enum dma_data_direction dir)
 {
        if (!ioctx)
                return;
 
-       ib_dma_unmap_single(sdev->device, ioctx->dma, dma_size, dir);
-       kfree(ioctx->buf);
+       ib_dma_unmap_single(sdev->device, ioctx->dma,
+                           kmem_cache_size(buf_cache), dir);
+       kmem_cache_free(buf_cache, ioctx->buf);
        kfree(ioctx);
 }
 
@@ -702,12 +706,16 @@ static void srpt_free_ioctx(struct srpt_device *sdev, struct srpt_ioctx *ioctx,
  * @sdev:       Device to allocate the I/O context ring for.
  * @ring_size:  Number of elements in the I/O context ring.
  * @ioctx_size: I/O context size.
- * @dma_size:   DMA buffer size.
+ * @buf_cache:  I/O buffer cache.
+ * @alignment_offset: Offset in each ring buffer at which the SRP information
+ *             unit starts.
  * @dir:        DMA data direction.
  */
 static struct srpt_ioctx **srpt_alloc_ioctx_ring(struct srpt_device *sdev,
                                int ring_size, int ioctx_size,
-                               int dma_size, enum dma_data_direction dir)
+                               struct kmem_cache *buf_cache,
+                               int alignment_offset,
+                               enum dma_data_direction dir)
 {
        struct srpt_ioctx **ring;
        int i;
@@ -719,16 +727,17 @@ static struct srpt_ioctx **srpt_alloc_ioctx_ring(struct srpt_device *sdev,
        if (!ring)
                goto out;
        for (i = 0; i < ring_size; ++i) {
-               ring[i] = srpt_alloc_ioctx(sdev, ioctx_size, dma_size, dir);
+               ring[i] = srpt_alloc_ioctx(sdev, ioctx_size, buf_cache, dir);
                if (!ring[i])
                        goto err;
                ring[i]->index = i;
+               ring[i]->offset = alignment_offset;
        }
        goto out;
 
 err:
        while (--i >= 0)
-               srpt_free_ioctx(sdev, ring[i], dma_size, dir);
+               srpt_free_ioctx(sdev, ring[i], buf_cache, dir);
        kvfree(ring);
        ring = NULL;
 out:
@@ -740,12 +749,13 @@ out:
  * @ioctx_ring: I/O context ring to be freed.
  * @sdev: SRPT HCA pointer.
  * @ring_size: Number of ring elements.
- * @dma_size: Size of I/O context DMA buffer.
+ * @buf_cache: I/O buffer cache.
  * @dir: DMA data direction.
  */
 static void srpt_free_ioctx_ring(struct srpt_ioctx **ioctx_ring,
                                 struct srpt_device *sdev, int ring_size,
-                                int dma_size, enum dma_data_direction dir)
+                                struct kmem_cache *buf_cache,
+                                enum dma_data_direction dir)
 {
        int i;
 
@@ -753,7 +763,7 @@ static void srpt_free_ioctx_ring(struct srpt_ioctx **ioctx_ring,
                return;
 
        for (i = 0; i < ring_size; ++i)
-               srpt_free_ioctx(sdev, ioctx_ring[i], dma_size, dir);
+               srpt_free_ioctx(sdev, ioctx_ring[i], buf_cache, dir);
        kvfree(ioctx_ring);
 }
 
@@ -815,7 +825,7 @@ static int srpt_post_recv(struct srpt_device *sdev, struct srpt_rdma_ch *ch,
        struct ib_recv_wr wr;
 
        BUG_ON(!sdev);
-       list.addr = ioctx->ioctx.dma;
+       list.addr = ioctx->ioctx.dma + ioctx->ioctx.offset;
        list.length = srp_max_req_size;
        list.lkey = sdev->lkey;
 
@@ -981,23 +991,28 @@ static inline void *srpt_get_desc_buf(struct srp_cmd *srp_cmd)
 
 /**
  * srpt_get_desc_tbl - parse the data descriptors of a SRP_CMD request
- * @ioctx: Pointer to the I/O context associated with the request.
+ * @recv_ioctx: I/O context associated with the received command @srp_cmd.
+ * @ioctx: I/O context that will be used for responding to the initiator.
  * @srp_cmd: Pointer to the SRP_CMD request data.
  * @dir: Pointer to the variable to which the transfer direction will be
  *   written.
- * @sg: [out] scatterlist allocated for the parsed SRP_CMD.
+ * @sg: [out] scatterlist for the parsed SRP_CMD.
  * @sg_cnt: [out] length of @sg.
  * @data_len: Pointer to the variable to which the total data length of all
  *   descriptors in the SRP_CMD request will be written.
+ * @imm_data_offset: [in] Offset in SRP_CMD requests at which immediate data
+ *   starts.
  *
  * This function initializes ioctx->nrbuf and ioctx->r_bufs.
  *
  * Returns -EINVAL when the SRP_CMD request contains inconsistent descriptors;
  * -ENOMEM when memory allocation fails and zero upon success.
  */
-static int srpt_get_desc_tbl(struct srpt_send_ioctx *ioctx,
+static int srpt_get_desc_tbl(struct srpt_recv_ioctx *recv_ioctx,
+               struct srpt_send_ioctx *ioctx,
                struct srp_cmd *srp_cmd, enum dma_data_direction *dir,
-               struct scatterlist **sg, unsigned *sg_cnt, u64 *data_len)
+               struct scatterlist **sg, unsigned int *sg_cnt, u64 *data_len,
+               u16 imm_data_offset)
 {
        BUG_ON(!dir);
        BUG_ON(!data_len);
@@ -1044,6 +1059,40 @@ static int srpt_get_desc_tbl(struct srpt_send_ioctx *ioctx,
                *data_len = be32_to_cpu(idb->len);
                return srpt_alloc_rw_ctxs(ioctx, idb->desc_list, nbufs,
                                sg, sg_cnt);
+       } else if ((srp_cmd->buf_fmt >> 4) == SRP_DATA_DESC_IMM) {
+               struct srp_imm_buf *imm_buf = srpt_get_desc_buf(srp_cmd);
+               void *data = (void *)srp_cmd + imm_data_offset;
+               uint32_t len = be32_to_cpu(imm_buf->len);
+               uint32_t req_size = imm_data_offset + len;
+
+               if (req_size > srp_max_req_size) {
+                       pr_err("Immediate data (length %d + %d) exceeds request size %d\n",
+                              imm_data_offset, len, srp_max_req_size);
+                       return -EINVAL;
+               }
+               if (recv_ioctx->byte_len < req_size) {
+                       pr_err("Received too few data - %d < %d\n",
+                              recv_ioctx->byte_len, req_size);
+                       return -EIO;
+               }
+               /*
+                * The immediate data buffer descriptor must occur before the
+                * immediate data itself.
+                */
+               if ((void *)(imm_buf + 1) > (void *)data) {
+                       pr_err("Received invalid write request\n");
+                       return -EINVAL;
+               }
+               *data_len = len;
+               ioctx->recv_ioctx = recv_ioctx;
+               if ((uintptr_t)data & 511) {
+                       pr_warn_once("Internal error - the receive buffers are not aligned properly.\n");
+                       return -EINVAL;
+               }
+               sg_init_one(&ioctx->imm_sg, data, len);
+               *sg = &ioctx->imm_sg;
+               *sg_cnt = 1;
+               return 0;
        } else {
                *data_len = 0;
                return 0;
@@ -1186,6 +1235,7 @@ static struct srpt_send_ioctx *srpt_get_send_ioctx(struct srpt_rdma_ch *ch)
 
        BUG_ON(ioctx->ch != ch);
        ioctx->state = SRPT_STATE_NEW;
+       WARN_ON_ONCE(ioctx->recv_ioctx);
        ioctx->n_rdma = 0;
        ioctx->n_rw_ctx = 0;
        ioctx->queue_status_only = false;
@@ -1428,7 +1478,7 @@ static void srpt_handle_cmd(struct srpt_rdma_ch *ch,
 
        BUG_ON(!send_ioctx);
 
-       srp_cmd = recv_ioctx->ioctx.buf;
+       srp_cmd = recv_ioctx->ioctx.buf + recv_ioctx->ioctx.offset;
        cmd = &send_ioctx->cmd;
        cmd->tag = srp_cmd->tag;
 
@@ -1448,8 +1498,8 @@ static void srpt_handle_cmd(struct srpt_rdma_ch *ch,
                break;
        }
 
-       rc = srpt_get_desc_tbl(send_ioctx, srp_cmd, &dir, &sg, &sg_cnt,
-                       &data_len);
+       rc = srpt_get_desc_tbl(recv_ioctx, send_ioctx, srp_cmd, &dir,
+                              &sg, &sg_cnt, &data_len, ch->imm_data_offset);
        if (rc) {
                if (rc != -EAGAIN) {
                        pr_err("0x%llx: parsing SRP descriptor table failed.\n",
@@ -1516,7 +1566,7 @@ static void srpt_handle_tsk_mgmt(struct srpt_rdma_ch *ch,
 
        BUG_ON(!send_ioctx);
 
-       srp_tsk = recv_ioctx->ioctx.buf;
+       srp_tsk = recv_ioctx->ioctx.buf + recv_ioctx->ioctx.offset;
        cmd = &send_ioctx->cmd;
 
        pr_debug("recv tsk_mgmt fn %d for task_tag %lld and cmd tag %lld ch %p sess %p\n",
@@ -1559,10 +1609,11 @@ srpt_handle_new_iu(struct srpt_rdma_ch *ch, struct srpt_recv_ioctx *recv_ioctx)
                goto push;
 
        ib_dma_sync_single_for_cpu(ch->sport->sdev->device,
-                                  recv_ioctx->ioctx.dma, srp_max_req_size,
+                                  recv_ioctx->ioctx.dma,
+                                  recv_ioctx->ioctx.offset + srp_max_req_size,
                                   DMA_FROM_DEVICE);
 
-       srp_cmd = recv_ioctx->ioctx.buf;
+       srp_cmd = recv_ioctx->ioctx.buf + recv_ioctx->ioctx.offset;
        opcode = srp_cmd->opcode;
        if (opcode == SRP_CMD || opcode == SRP_TSK_MGMT) {
                send_ioctx = srpt_get_send_ioctx(ch);
@@ -1599,7 +1650,8 @@ srpt_handle_new_iu(struct srpt_rdma_ch *ch, struct srpt_recv_ioctx *recv_ioctx)
                break;
        }
 
-       srpt_post_recv(ch->sport->sdev, ch, recv_ioctx);
+       if (!send_ioctx || !send_ioctx->recv_ioctx)
+               srpt_post_recv(ch->sport->sdev, ch, recv_ioctx);
        res = true;
 
 out:
@@ -1625,6 +1677,7 @@ static void srpt_recv_done(struct ib_cq *cq, struct ib_wc *wc)
                req_lim = atomic_dec_return(&ch->req_lim);
                if (unlikely(req_lim < 0))
                        pr_err("req_lim = %d < 0\n", req_lim);
+               ioctx->byte_len = wc->byte_len;
                srpt_handle_new_iu(ch, ioctx);
        } else {
                pr_info_ratelimited("receiving failed for ioctx %p with status %d\n",
@@ -1749,6 +1802,8 @@ retry:
        qp_init->cap.max_rdma_ctxs = sq_size / 2;
        qp_init->cap.max_send_sge = min(attrs->max_send_sge,
                                        SRPT_MAX_SG_PER_WQE);
+       qp_init->cap.max_recv_sge = min(attrs->max_recv_sge,
+                                       SRPT_MAX_SG_PER_WQE);
        qp_init->port_num = ch->sport->port;
        if (sdev->use_srq) {
                qp_init->srq = sdev->srq;
@@ -2049,11 +2104,15 @@ static void srpt_release_channel_work(struct work_struct *w)
 
        srpt_free_ioctx_ring((struct srpt_ioctx **)ch->ioctx_ring,
                             ch->sport->sdev, ch->rq_size,
-                            ch->max_rsp_size, DMA_TO_DEVICE);
+                            ch->rsp_buf_cache, DMA_TO_DEVICE);
+
+       kmem_cache_destroy(ch->rsp_buf_cache);
 
        srpt_free_ioctx_ring((struct srpt_ioctx **)ch->ioctx_recv_ring,
                             sdev, ch->rq_size,
-                            srp_max_req_size, DMA_FROM_DEVICE);
+                            ch->req_buf_cache, DMA_FROM_DEVICE);
+
+       kmem_cache_destroy(ch->req_buf_cache);
 
        wake_up(&sport->ch_releaseQ);
 
@@ -2177,14 +2236,19 @@ static int srpt_cm_req_recv(struct srpt_device *const sdev,
        INIT_LIST_HEAD(&ch->cmd_wait_list);
        ch->max_rsp_size = ch->sport->port_attrib.srp_max_rsp_size;
 
+       ch->rsp_buf_cache = kmem_cache_create("srpt-rsp-buf", ch->max_rsp_size,
+                                             512, 0, NULL);
+       if (!ch->rsp_buf_cache)
+               goto free_ch;
+
        ch->ioctx_ring = (struct srpt_send_ioctx **)
                srpt_alloc_ioctx_ring(ch->sport->sdev, ch->rq_size,
                                      sizeof(*ch->ioctx_ring[0]),
-                                     ch->max_rsp_size, DMA_TO_DEVICE);
+                                     ch->rsp_buf_cache, 0, DMA_TO_DEVICE);
        if (!ch->ioctx_ring) {
                pr_err("rejected SRP_LOGIN_REQ because creating a new QP SQ ring failed.\n");
                rej->reason = cpu_to_be32(SRP_LOGIN_REJ_INSUFFICIENT_RESOURCES);
-               goto free_ch;
+               goto free_rsp_cache;
        }
 
        INIT_LIST_HEAD(&ch->free_list);
@@ -2193,16 +2257,39 @@ static int srpt_cm_req_recv(struct srpt_device *const sdev,
                list_add_tail(&ch->ioctx_ring[i]->free_list, &ch->free_list);
        }
        if (!sdev->use_srq) {
+               u16 imm_data_offset = req->req_flags & SRP_IMMED_REQUESTED ?
+                       be16_to_cpu(req->imm_data_offset) : 0;
+               u16 alignment_offset;
+               u32 req_sz;
+
+               if (req->req_flags & SRP_IMMED_REQUESTED)
+                       pr_debug("imm_data_offset = %d\n",
+                                be16_to_cpu(req->imm_data_offset));
+               if (imm_data_offset >= sizeof(struct srp_cmd)) {
+                       ch->imm_data_offset = imm_data_offset;
+                       rsp->rsp_flags |= SRP_LOGIN_RSP_IMMED_SUPP;
+               } else {
+                       ch->imm_data_offset = 0;
+               }
+               alignment_offset = round_up(imm_data_offset, 512) -
+                       imm_data_offset;
+               req_sz = alignment_offset + imm_data_offset + srp_max_req_size;
+               ch->req_buf_cache = kmem_cache_create("srpt-req-buf", req_sz,
+                                                     512, 0, NULL);
+               if (!ch->req_buf_cache)
+                       goto free_rsp_ring;
+
                ch->ioctx_recv_ring = (struct srpt_recv_ioctx **)
                        srpt_alloc_ioctx_ring(ch->sport->sdev, ch->rq_size,
                                              sizeof(*ch->ioctx_recv_ring[0]),
-                                             srp_max_req_size,
+                                             ch->req_buf_cache,
+                                             alignment_offset,
                                              DMA_FROM_DEVICE);
                if (!ch->ioctx_recv_ring) {
                        pr_err("rejected SRP_LOGIN_REQ because creating a new QP RQ ring failed.\n");
                        rej->reason =
                            cpu_to_be32(SRP_LOGIN_REJ_INSUFFICIENT_RESOURCES);
-                       goto free_ring;
+                       goto free_recv_cache;
                }
                for (i = 0; i < ch->rq_size; i++)
                        INIT_LIST_HEAD(&ch->ioctx_recv_ring[i]->wait_list);
@@ -2252,17 +2339,15 @@ static int srpt_cm_req_recv(struct srpt_device *const sdev,
        if ((req->req_flags & SRP_MTCH_ACTION) == SRP_MULTICHAN_SINGLE) {
                struct srpt_rdma_ch *ch2;
 
-               rsp->rsp_flags = SRP_LOGIN_RSP_MULTICHAN_NO_CHAN;
-
                list_for_each_entry(ch2, &nexus->ch_list, list) {
                        if (srpt_disconnect_ch(ch2) < 0)
                                continue;
                        pr_info("Relogin - closed existing channel %s\n",
                                ch2->sess_name);
-                       rsp->rsp_flags = SRP_LOGIN_RSP_MULTICHAN_TERMINATED;
+                       rsp->rsp_flags |= SRP_LOGIN_RSP_MULTICHAN_TERMINATED;
                }
        } else {
-               rsp->rsp_flags = SRP_LOGIN_RSP_MULTICHAN_MAINTAINED;
+               rsp->rsp_flags |= SRP_LOGIN_RSP_MULTICHAN_MAINTAINED;
        }
 
        list_add_tail_rcu(&ch->list, &nexus->ch_list);
@@ -2292,7 +2377,7 @@ static int srpt_cm_req_recv(struct srpt_device *const sdev,
        /* create srp_login_response */
        rsp->opcode = SRP_LOGIN_RSP;
        rsp->tag = req->tag;
-       rsp->max_it_iu_len = req->req_it_iu_len;
+       rsp->max_it_iu_len = cpu_to_be32(srp_max_req_size);
        rsp->max_ti_iu_len = req->req_it_iu_len;
        ch->max_ti_iu_len = it_iu_len;
        rsp->buf_fmt = cpu_to_be16(SRP_BUF_FORMAT_DIRECT |
@@ -2356,12 +2441,18 @@ destroy_ib:
 free_recv_ring:
        srpt_free_ioctx_ring((struct srpt_ioctx **)ch->ioctx_recv_ring,
                             ch->sport->sdev, ch->rq_size,
-                            srp_max_req_size, DMA_FROM_DEVICE);
+                            ch->req_buf_cache, DMA_FROM_DEVICE);
 
-free_ring:
+free_recv_cache:
+       kmem_cache_destroy(ch->req_buf_cache);
+
+free_rsp_ring:
        srpt_free_ioctx_ring((struct srpt_ioctx **)ch->ioctx_ring,
                             ch->sport->sdev, ch->rq_size,
-                            ch->max_rsp_size, DMA_TO_DEVICE);
+                            ch->rsp_buf_cache, DMA_TO_DEVICE);
+
+free_rsp_cache:
+       kmem_cache_destroy(ch->rsp_buf_cache);
 
 free_ch:
        if (rdma_cm_id)
@@ -2442,6 +2533,7 @@ static int srpt_rdma_cm_req_recv(struct rdma_cm_id *cm_id,
        req.req_flags           = req_rdma->req_flags;
        memcpy(req.initiator_port_id, req_rdma->initiator_port_id, 16);
        memcpy(req.target_port_id, req_rdma->target_port_id, 16);
+       req.imm_data_offset     = req_rdma->imm_data_offset;
 
        snprintf(src_addr, sizeof(src_addr), "%pIS",
                 &cm_id->route.addr.src_addr);
@@ -2632,6 +2724,12 @@ static int srpt_write_pending(struct se_cmd *se_cmd)
        enum srpt_command_state new_state;
        int ret, i;
 
+       if (ioctx->recv_ioctx) {
+               srpt_set_cmd_state(ioctx, SRPT_STATE_DATA_IN);
+               target_execute_cmd(&ioctx->cmd);
+               return 0;
+       }
+
        new_state = srpt_set_cmd_state(ioctx, SRPT_STATE_NEED_DATA);
        WARN_ON(new_state == SRPT_STATE_DONE);
 
@@ -2911,7 +3009,9 @@ static void srpt_free_srq(struct srpt_device *sdev)
 
        ib_destroy_srq(sdev->srq);
        srpt_free_ioctx_ring((struct srpt_ioctx **)sdev->ioctx_ring, sdev,
-                            sdev->srq_size, srp_max_req_size, DMA_FROM_DEVICE);
+                            sdev->srq_size, sdev->req_buf_cache,
+                            DMA_FROM_DEVICE);
+       kmem_cache_destroy(sdev->req_buf_cache);
        sdev->srq = NULL;
 }
 
@@ -2938,12 +3038,17 @@ static int srpt_alloc_srq(struct srpt_device *sdev)
        pr_debug("create SRQ #wr= %d max_allow=%d dev= %s\n", sdev->srq_size,
                 sdev->device->attrs.max_srq_wr, dev_name(&device->dev));
 
+       sdev->req_buf_cache = kmem_cache_create("srpt-srq-req-buf",
+                                               srp_max_req_size, 0, 0, NULL);
+       if (!sdev->req_buf_cache)
+               goto free_srq;
+
        sdev->ioctx_ring = (struct srpt_recv_ioctx **)
                srpt_alloc_ioctx_ring(sdev, sdev->srq_size,
                                      sizeof(*sdev->ioctx_ring[0]),
-                                     srp_max_req_size, DMA_FROM_DEVICE);
+                                     sdev->req_buf_cache, 0, DMA_FROM_DEVICE);
        if (!sdev->ioctx_ring)
-               goto free_srq;
+               goto free_cache;
 
        sdev->use_srq = true;
        sdev->srq = srq;
@@ -2955,6 +3060,9 @@ static int srpt_alloc_srq(struct srpt_device *sdev)
 
        return 0;
 
+free_cache:
+       kmem_cache_destroy(sdev->req_buf_cache);
+
 free_srq:
        ib_destroy_srq(srq);
        return -ENOMEM;
@@ -3186,11 +3294,18 @@ static void srpt_release_cmd(struct se_cmd *se_cmd)
        struct srpt_send_ioctx *ioctx = container_of(se_cmd,
                                struct srpt_send_ioctx, cmd);
        struct srpt_rdma_ch *ch = ioctx->ch;
+       struct srpt_recv_ioctx *recv_ioctx = ioctx->recv_ioctx;
        unsigned long flags;
 
        WARN_ON_ONCE(ioctx->state != SRPT_STATE_DONE &&
                     !(ioctx->cmd.transport_state & CMD_T_ABORTED));
 
+       if (recv_ioctx) {
+               WARN_ON_ONCE(!list_empty(&recv_ioctx->wait_list));
+               ioctx->recv_ioctx = NULL;
+               srpt_post_recv(ch->sport->sdev, ch, recv_ioctx);
+       }
+
        if (ioctx->n_rw_ctx) {
                srpt_free_rw_ctxs(ch, ioctx);
                ioctx->n_rw_ctx = 0;
index 8bca7a3..39b3e50 100644 (file)
@@ -120,11 +120,18 @@ enum {
        MAX_SRPT_RDMA_SIZE = 1U << 24,
        MAX_SRPT_RSP_SIZE = 1024,
 
+       SRP_MAX_ADD_CDB_LEN = 16,
+       SRP_MAX_IMM_DATA_OFFSET = 80,
+       SRP_MAX_IMM_DATA = 8 * 1024,
        MIN_MAX_REQ_SIZE = 996,
-       DEFAULT_MAX_REQ_SIZE
-               = sizeof(struct srp_cmd)/*48*/
-               + sizeof(struct srp_indirect_buf)/*20*/
-               + 128 * sizeof(struct srp_direct_buf)/*16*/,
+       DEFAULT_MAX_REQ_SIZE_1 = sizeof(struct srp_cmd)/*48*/ +
+                                SRP_MAX_ADD_CDB_LEN +
+                                sizeof(struct srp_indirect_buf)/*20*/ +
+                                128 * sizeof(struct srp_direct_buf)/*16*/,
+       DEFAULT_MAX_REQ_SIZE_2 = SRP_MAX_IMM_DATA_OFFSET +
+                                sizeof(struct srp_imm_buf) + SRP_MAX_IMM_DATA,
+       DEFAULT_MAX_REQ_SIZE = DEFAULT_MAX_REQ_SIZE_1 > DEFAULT_MAX_REQ_SIZE_2 ?
+                              DEFAULT_MAX_REQ_SIZE_1 : DEFAULT_MAX_REQ_SIZE_2,
 
        MIN_MAX_RSP_SIZE = sizeof(struct srp_rsp)/*36*/ + 4,
        DEFAULT_MAX_RSP_SIZE = 256, /* leaves 220 bytes for sense data */
@@ -161,12 +168,14 @@ enum srpt_command_state {
  * @cqe:   Completion queue element.
  * @buf:   Pointer to the buffer.
  * @dma:   DMA address of the buffer.
+ * @offset: Offset of the first byte in @buf and @dma that is actually used.
  * @index: Index of the I/O context in its ioctx_ring array.
  */
 struct srpt_ioctx {
        struct ib_cqe           cqe;
        void                    *buf;
        dma_addr_t              dma;
+       uint32_t                offset;
        uint32_t                index;
 };
 
@@ -174,10 +183,12 @@ struct srpt_ioctx {
  * struct srpt_recv_ioctx - SRPT receive I/O context
  * @ioctx:     See above.
  * @wait_list: Node for insertion in srpt_rdma_ch.cmd_wait_list.
+ * @byte_len:  Number of bytes in @ioctx.buf.
  */
 struct srpt_recv_ioctx {
        struct srpt_ioctx       ioctx;
        struct list_head        wait_list;
+       int                     byte_len;
 };
 
 struct srpt_rw_ctx {
@@ -190,8 +201,11 @@ struct srpt_rw_ctx {
  * struct srpt_send_ioctx - SRPT send I/O context
  * @ioctx:       See above.
  * @ch:          Channel pointer.
+ * @recv_ioctx:  Receive I/O context associated with this send I/O context.
+ *              Only used for processing immediate data.
  * @s_rw_ctx:    @rw_ctxs points here if only a single rw_ctx is needed.
  * @rw_ctxs:     RDMA read/write contexts.
+ * @imm_sg:      Scatterlist for immediate data.
  * @rdma_cqe:    RDMA completion queue element.
  * @free_list:   Node in srpt_rdma_ch.free_list.
  * @state:       I/O context state.
@@ -205,10 +219,13 @@ struct srpt_rw_ctx {
 struct srpt_send_ioctx {
        struct srpt_ioctx       ioctx;
        struct srpt_rdma_ch     *ch;
+       struct srpt_recv_ioctx  *recv_ioctx;
 
        struct srpt_rw_ctx      s_rw_ctx;
        struct srpt_rw_ctx      *rw_ctxs;
 
+       struct scatterlist      imm_sg;
+
        struct ib_cqe           rdma_cqe;
        struct list_head        free_list;
        enum srpt_command_state state;
@@ -258,12 +275,15 @@ enum rdma_ch_state {
  * @req_lim:       request limit: maximum number of requests that may be sent
  *                 by the initiator without having received a response.
  * @req_lim_delta: Number of credits not yet sent back to the initiator.
+ * @imm_data_offset: Offset from start of SRP_CMD for immediate data.
  * @spinlock:      Protects free_list and state.
  * @free_list:     Head of list with free send I/O contexts.
  * @state:         channel state. See also enum rdma_ch_state.
  * @using_rdma_cm: Whether the RDMA/CM or IB/CM is used for this channel.
  * @processing_wait_list: Whether or not cmd_wait_list is being processed.
+ * @rsp_buf_cache: kmem_cache for @ioctx_ring.
  * @ioctx_ring:    Send ring.
+ * @req_buf_cache: kmem_cache for @ioctx_recv_ring.
  * @ioctx_recv_ring: Receive I/O context ring.
  * @list:          Node in srpt_nexus.ch_list.
  * @cmd_wait_list: List of SCSI commands that arrived before the RTU event. This
@@ -296,10 +316,13 @@ struct srpt_rdma_ch {
        int                     max_ti_iu_len;
        atomic_t                req_lim;
        atomic_t                req_lim_delta;
+       u16                     imm_data_offset;
        spinlock_t              spinlock;
        struct list_head        free_list;
        enum rdma_ch_state      state;
+       struct kmem_cache       *rsp_buf_cache;
        struct srpt_send_ioctx  **ioctx_ring;
+       struct kmem_cache       *req_buf_cache;
        struct srpt_recv_ioctx  **ioctx_recv_ring;
        struct list_head        list;
        struct list_head        cmd_wait_list;
@@ -394,6 +417,7 @@ struct srpt_port {
  * @srq_size:      SRQ size.
  * @sdev_mutex:           Serializes use_srq changes.
  * @use_srq:       Whether or not to use SRQ.
+ * @req_buf_cache: kmem_cache for @ioctx_ring buffers.
  * @ioctx_ring:    Per-HCA SRQ.
  * @event_handler: Per-HCA asynchronous IB event handler.
  * @list:          Node in srpt_dev_list.
@@ -408,6 +432,7 @@ struct srpt_device {
        int                     srq_size;
        struct mutex            sdev_mutex;
        bool                    use_srq;
+       struct kmem_cache       *req_buf_cache;
        struct srpt_recv_ioctx  **ioctx_ring;
        struct ib_event_handler event_handler;
        struct list_head        list;