dpdk-cryptodev: introduce sw_ring to the crypto op data path 49/38749/8
author    Piotr Bronowski <piotrx.bronowski@intel.com>
          Fri, 9 Jun 2023 15:43:05 +0000
committer Fan Zhang <fanzhang.oss@gmail.com>
          Thu, 6 Jul 2023 07:55:04 +0000
This patch introduces a software ring (sw_ring) to the crypto op data path
implementation, so that the raw data path and the crypto op data path use the
same mechanism for processing async frames. The crypto op rte_ring has been
removed from the implementation.
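
For orientation, here is a minimal sketch of the per-thread software ring as
it can be inferred from the diff below. Only the field names appear in the
patch; the field widths and ordering are assumptions:

    /* Sketch only -- reconstructed from names used in this patch; the real
     * definitions live in src/plugins/dpdk/cryptodev/cryptodev.h. */
    typedef struct
    {
      vnet_crypto_async_frame_t *f; /* staged frame */
      u8 op_type;                   /* encrypt or decrypt */
      u8 aad_len;                   /* 1 doubles as the linked-algs marker */
      u8 deq_state;                 /* OR-ed element statuses seen so far */
      u16 n_elts;                   /* total elements in the frame */
      u16 enqueued;                 /* elements already sent to the device */
      u16 dequeued;                 /* elements already received back */
      u16 frame_inflight;           /* enqueued but not yet dequeued */
    } cryptodev_async_ring_elt;

    typedef struct
    {
      cryptodev_async_ring_elt frames[VNET_CRYPTO_FRAME_POOL_SIZE];
      u16 head; /* producer: where new frames are staged */
      u16 enq;  /* device-enqueue cursor */
      u16 deq;  /* device-dequeue cursor */
      u16 tail; /* consumer: next frame handed back to vnet crypto */
    } cryptodev_async_frame_sw_ring;

All four indices are masked with (VNET_CRYPTO_FRAME_POOL_SIZE - 1), so the
ring size must be a power of two.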

Type: improvement
Signed-off-by: Piotr Bronowski <piotrx.bronowski@intel.com>
Change-Id: Id823f80a88cfa0ff40252616a36de8bb044c7f45

src/plugins/dpdk/cryptodev/cryptodev.h
src/plugins/dpdk/cryptodev/cryptodev_op_data_path.c

diff --git a/src/plugins/dpdk/cryptodev/cryptodev.h b/src/plugins/dpdk/cryptodev/cryptodev.h
index f860467..1dc5e03 100644
--- a/src/plugins/dpdk/cryptodev/cryptodev.h
+++ b/src/plugins/dpdk/cryptodev/cryptodev.h
@@ -183,12 +183,7 @@ typedef struct
   vlib_buffer_t *b[VNET_CRYPTO_FRAME_SIZE];
   union
   {
-    struct
-    {
-      cryptodev_op_t **cops;
-      struct rte_mempool *cop_pool;
-      struct rte_ring *ring;
-    };
+    struct rte_mempool *cop_pool;
     struct
     {
       struct rte_crypto_raw_dp_ctx *ctx;
diff --git a/src/plugins/dpdk/cryptodev/cryptodev_op_data_path.c b/src/plugins/dpdk/cryptodev/cryptodev_op_data_path.c
index 4d3e00a..56b9105 100644
--- a/src/plugins/dpdk/cryptodev/cryptodev_op_data_path.c
+++ b/src/plugins/dpdk/cryptodev/cryptodev_op_data_path.c
@@ -139,43 +139,62 @@ static_always_inline int
 cryptodev_frame_linked_algs_enqueue (vlib_main_t *vm,
                                     vnet_crypto_async_frame_t *frame,
                                     cryptodev_op_type_t op_type)
+{
+  cryptodev_main_t *cmt = &cryptodev_main;
+  cryptodev_engine_thread_t *cet = cmt->per_thread_data + vm->thread_index;
+  cryptodev_async_frame_sw_ring *ring = &cet->frame_ring;
+  cryptodev_async_ring_elt *ring_elt = &ring->frames[ring->head];
+
+  cet->frames_on_ring++;
+  ring_elt->f = frame;
+  ring_elt->n_elts = frame->n_elts;
+  ring_elt->aad_len = 1;
+  ring_elt->op_type = (u8) op_type;
+  ring->head++;
+  ring->head &= (VNET_CRYPTO_FRAME_POOL_SIZE - 1);
+  return 0;
+}
+
+static_always_inline void
+cryptodev_frame_linked_algs_enqueue_internal (vlib_main_t *vm,
+                                             vnet_crypto_async_frame_t *frame,
+                                             cryptodev_op_type_t op_type)
 {
   cryptodev_main_t *cmt = &cryptodev_main;
   clib_pmalloc_main_t *pm = vm->physmem_main.pmalloc_main;
   cryptodev_engine_thread_t *cet = cmt->per_thread_data + vm->thread_index;
+  cryptodev_async_frame_sw_ring *ring = &cet->frame_ring;
   vnet_crypto_async_frame_elt_t *fe;
   cryptodev_session_t *sess = 0;
-  cryptodev_op_t **cop;
-  u32 *bi;
+  cryptodev_op_t *cops[CRYPTODE_ENQ_MAX] = {};
+  cryptodev_op_t **cop = cops;
+  u32 *bi = 0;
   u32 n_enqueue, n_elts;
   u32 last_key_index = ~0;
+  u32 max_to_enq;
 
   if (PREDICT_FALSE (frame == 0 || frame->n_elts == 0))
-    return -1;
-  n_elts = frame->n_elts;
+    return;
 
-  if (PREDICT_FALSE (CRYPTODEV_NB_CRYPTO_OPS - cet->inflight < n_elts))
-    {
-      cryptodev_mark_frame_err_status (frame,
-                                      VNET_CRYPTO_OP_STATUS_FAIL_ENGINE_ERR,
-                                      VNET_CRYPTO_FRAME_STATE_NOT_PROCESSED);
-      return -1;
-    }
+  max_to_enq = clib_min (CRYPTODE_ENQ_MAX,
+                        frame->n_elts - ring->frames[ring->enq].enqueued);
+
+  if (cet->inflight + max_to_enq > CRYPTODEV_MAX_INFLIGHT)
+    return;
+
+  n_elts = max_to_enq;
 
   if (PREDICT_FALSE (
-       rte_mempool_get_bulk (cet->cop_pool, (void **) cet->cops, n_elts) < 0))
+       rte_mempool_get_bulk (cet->cop_pool, (void **) cops, n_elts) < 0))
     {
       cryptodev_mark_frame_err_status (frame,
                                       VNET_CRYPTO_OP_STATUS_FAIL_ENGINE_ERR,
                                       VNET_CRYPTO_FRAME_STATE_NOT_PROCESSED);
-      return -1;
+      return;
     }
 
-  cop = cet->cops;
-  fe = frame->elts;
-  bi = frame->buffer_indices;
-  cop[0]->frame = frame;
-  cop[0]->n_elts = n_elts;
+  fe = frame->elts + ring->frames[ring->enq].enqueued;
+  bi = frame->buffer_indices + ring->frames[ring->enq].enqueued;
 
   while (n_elts)
     {
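
The hunk above splits the old enqueue into a cheap staging step and an
internal worker that enqueues at most CRYPTODE_ENQ_MAX ops per call and may
be invoked repeatedly for one frame. A sketch of the resumption arithmetic,
using the names from the patch (illustration only, not part of the diff):

    /* The per-frame 'enqueued' counter doubles as the offset into the
     * frame's element and buffer-index arrays, so each call resumes
     * exactly where the previous one stopped. */
    u16 done  = ring->frames[ring->enq].enqueued;
    u16 burst = clib_min (CRYPTODE_ENQ_MAX, frame->n_elts - done);

    if (cet->inflight + burst > CRYPTODEV_MAX_INFLIGHT)
      return;                          /* retry on a later node poll */

    fe = frame->elts + done;           /* resume element cursor */
    bi = frame->buffer_indices + done; /* resume buffer-index cursor */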
@@ -205,7 +224,7 @@ cryptodev_frame_linked_algs_enqueue (vlib_main_t *vm,
                  cryptodev_mark_frame_err_status (
                    frame, VNET_CRYPTO_OP_STATUS_FAIL_ENGINE_ERR,
                    VNET_CRYPTO_FRAME_STATE_NOT_PROCESSED);
-                 return -1;
+                 goto error_exit;
                }
            }
          sess = key->keys[vm->numa_node][op_type];
@@ -238,19 +257,32 @@ cryptodev_frame_linked_algs_enqueue (vlib_main_t *vm,
        cryptodev_validate_mbuf (sop->m_src, b);
 
       clib_memcpy_fast (cop[0]->iv, fe->iv, 16);
+      ring->frames[ring->enq].enqueued++;
       cop++;
       bi++;
       fe++;
       n_elts--;
     }
 
-  n_enqueue = rte_cryptodev_enqueue_burst (cet->cryptodev_id, cet->cryptodev_q,
-                                          (struct rte_crypto_op **) cet->cops,
-                                          frame->n_elts);
-  ASSERT (n_enqueue == frame->n_elts);
-  cet->inflight += n_enqueue;
+  n_enqueue =
+    rte_cryptodev_enqueue_burst (cet->cryptodev_id, cet->cryptodev_q,
+                                (struct rte_crypto_op **) cops, max_to_enq);
+  ASSERT (n_enqueue == max_to_enq);
+  cet->inflight += max_to_enq;
+  ring->frames[ring->enq].frame_inflight += max_to_enq;
+  if (ring->frames[ring->enq].enqueued == frame->n_elts)
+    {
+      cet->frame_ring.enq++;
+      cet->frame_ring.enq &= (VNET_CRYPTO_FRAME_POOL_SIZE - 1);
+      frame->state = VNET_CRYPTO_FRAME_STATE_NOT_PROCESSED;
+    }
 
-  return 0;
+  return;
+
+error_exit:
+  ring->enq++;
+  ring->enq &= (VNET_CRYPTO_FRAME_POOL_SIZE - 1);
+  rte_mempool_put_bulk (cet->cop_pool, (void **) cops, max_to_enq);
 }
 
 static_always_inline int
@@ -259,29 +291,49 @@ cryptodev_frame_aead_enqueue (vlib_main_t *vm,
                              cryptodev_op_type_t op_type, u8 aad_len)
 {
   cryptodev_main_t *cmt = &cryptodev_main;
-  clib_pmalloc_main_t *pm = vm->physmem_main.pmalloc_main;
   cryptodev_engine_thread_t *cet = cmt->per_thread_data + vm->thread_index;
+  cryptodev_async_frame_sw_ring *ring = &cet->frame_ring;
+  cryptodev_async_ring_elt *ring_elt = &ring->frames[ring->head];
+  cet->frames_on_ring++;
+  ring_elt->f = frame;
+  ring_elt->n_elts = frame->n_elts;
+  ring_elt->aad_len = aad_len;
+  ring_elt->op_type = (u8) op_type;
+  ring->head++;
+  ring->head &= (VNET_CRYPTO_FRAME_POOL_SIZE - 1);
+
+  return 0;
+}
+
+static_always_inline int
+cryptodev_aead_enqueue_internal (vlib_main_t *vm,
+                                vnet_crypto_async_frame_t *frame,
+                                cryptodev_op_type_t op_type, u8 aad_len)
+{
+  cryptodev_main_t *cmt = &cryptodev_main;
+  cryptodev_engine_thread_t *cet = cmt->per_thread_data + vm->thread_index;
+  cryptodev_async_frame_sw_ring *ring = &cet->frame_ring;
+  clib_pmalloc_main_t *pm = vm->physmem_main.pmalloc_main;
   vnet_crypto_async_frame_elt_t *fe;
   cryptodev_session_t *sess = 0;
-  cryptodev_op_t **cop;
-  u32 *bi;
+  cryptodev_op_t *cops[CRYPTODE_ENQ_MAX] = {};
+  cryptodev_op_t **cop = cops;
+  u32 *bi = 0;
   u32 n_enqueue = 0, n_elts;
   u32 last_key_index = ~0;
+  u16 left_to_enq = frame->n_elts - ring->frames[ring->enq].enqueued;
+  const u16 max_to_enq = clib_min (CRYPTODE_ENQ_MAX, left_to_enq);
 
   if (PREDICT_FALSE (frame == 0 || frame->n_elts == 0))
     return -1;
-  n_elts = frame->n_elts;
 
-  if (PREDICT_FALSE (CRYPTODEV_MAX_INFLIGHT - cet->inflight < n_elts))
-    {
-      cryptodev_mark_frame_err_status (frame,
-                                      VNET_CRYPTO_OP_STATUS_FAIL_ENGINE_ERR,
-                                      VNET_CRYPTO_FRAME_STATE_NOT_PROCESSED);
-      return -1;
-    }
+  if (cet->inflight + max_to_enq > CRYPTODEV_MAX_INFLIGHT)
+    return -1;
+
+  n_elts = max_to_enq;
 
   if (PREDICT_FALSE (
-       rte_mempool_get_bulk (cet->cop_pool, (void **) cet->cops, n_elts) < 0))
+       rte_mempool_get_bulk (cet->cop_pool, (void **) cops, n_elts) < 0))
     {
       cryptodev_mark_frame_err_status (frame,
                                       VNET_CRYPTO_OP_STATUS_FAIL_ENGINE_ERR,
@@ -289,11 +341,8 @@ cryptodev_frame_aead_enqueue (vlib_main_t *vm,
       return -1;
     }
 
-  cop = cet->cops;
-  fe = frame->elts;
-  bi = frame->buffer_indices;
-  cop[0]->frame = frame;
-  cop[0]->n_elts = n_elts;
+  fe = frame->elts + ring->frames[ring->enq].enqueued;
+  bi = frame->buffer_indices + ring->frames[ring->enq].enqueued;
 
   while (n_elts)
     {
@@ -321,7 +370,7 @@ cryptodev_frame_aead_enqueue (vlib_main_t *vm,
                  cryptodev_mark_frame_err_status (
                    frame, VNET_CRYPTO_OP_STATUS_FAIL_ENGINE_ERR,
                    VNET_CRYPTO_FRAME_STATE_NOT_PROCESSED);
-                 return -1;
+                 goto error_exit;
                }
            }
          else if (PREDICT_FALSE (
@@ -341,7 +390,7 @@ cryptodev_frame_aead_enqueue (vlib_main_t *vm,
                  cryptodev_mark_frame_err_status (
                    frame, VNET_CRYPTO_OP_STATUS_FAIL_ENGINE_ERR,
                    VNET_CRYPTO_FRAME_STATE_NOT_PROCESSED);
-                 return -1;
+                 goto error_exit;
                }
            }
 
@@ -376,79 +425,92 @@ cryptodev_frame_aead_enqueue (vlib_main_t *vm,
 
       clib_memcpy_fast (cop[0]->iv, fe->iv, 12);
       clib_memcpy_fast (cop[0]->aad, fe->aad, aad_len);
+
+      ring->frames[ring->enq].enqueued++;
       cop++;
       bi++;
       fe++;
       n_elts--;
     }
 
-  n_enqueue = rte_cryptodev_enqueue_burst (cet->cryptodev_id, cet->cryptodev_q,
-                                          (struct rte_crypto_op **) cet->cops,
-                                          frame->n_elts);
-  ASSERT (n_enqueue == frame->n_elts);
-  cet->inflight += n_enqueue;
+  n_enqueue =
+    rte_cryptodev_enqueue_burst (cet->cryptodev_id, cet->cryptodev_q,
+                                (struct rte_crypto_op **) cops, max_to_enq);
+  ASSERT (n_enqueue == max_to_enq);
+  cet->inflight += max_to_enq;
+  ring->frames[ring->enq].frame_inflight += max_to_enq;
+  if (ring->frames[ring->enq].enqueued == frame->n_elts)
+    {
+      ring->enq++;
+      ring->enq &= (VNET_CRYPTO_FRAME_POOL_SIZE - 1);
+      frame->state = VNET_CRYPTO_FRAME_STATE_NOT_PROCESSED;
+      cet->enqueued_not_dequeueq++;
+    }
 
   return 0;
-}
-
-static_always_inline u16
-cryptodev_ring_deq (struct rte_ring *r, cryptodev_op_t **cops)
-{
-  u16 n, n_elts = 0;
-
-  n = rte_ring_dequeue_bulk_start (r, (void **) cops, 1, 0);
-  rte_ring_dequeue_finish (r, 0);
-  if (!n)
-    return 0;
 
-  n = cops[0]->n_elts;
-  if (rte_ring_count (r) < n)
-    return 0;
-
-  n_elts = rte_ring_sc_dequeue_bulk (r, (void **) cops, n, 0);
-  ASSERT (n_elts == n);
+error_exit:
+  ring->enq++;
+  ring->enq &= (VNET_CRYPTO_FRAME_POOL_SIZE - 1);
+  rte_mempool_put_bulk (cet->cop_pool, (void **) cops, max_to_enq);
 
-  return n_elts;
+  return -1;
 }
 
-static_always_inline vnet_crypto_async_frame_t *
-cryptodev_frame_dequeue (vlib_main_t *vm, u32 *nb_elts_processed,
-                        u32 *enqueue_thread_idx)
+static_always_inline u8
+cryptodev_frame_dequeue_internal (vlib_main_t *vm, u32 *nb_elts_processed,
+                                 u32 *enqueue_thread_idx)
 {
   cryptodev_main_t *cmt = &cryptodev_main;
   cryptodev_engine_thread_t *cet = cmt->per_thread_data + vm->thread_index;
-  cryptodev_op_t **cop = cet->cops;
+  vnet_crypto_async_frame_t *frame = NULL;
+  cryptodev_async_frame_sw_ring *ring = &cet->frame_ring;
+  u16 n_deq, idx, left_to_deq, i;
+  u16 max_to_deq = 0;
+  u16 inflight = cet->inflight;
+  u8 dequeue_more = 0;
+  cryptodev_op_t *cops[CRYPTODE_DEQ_MAX] = {};
+  cryptodev_op_t **cop = cops;
   vnet_crypto_async_frame_elt_t *fe;
-  vnet_crypto_async_frame_t *frame;
-  u32 n_elts, n_completed_ops = rte_ring_count (cet->ring);
+  u32 n_elts;
   u32 ss0 = 0, ss1 = 0, ss2 = 0, ss3 = 0; /* sum of status */
 
-  if (cet->inflight)
+  idx = ring->deq;
+
+  for (i = 0; i < VNET_CRYPTO_FRAME_POOL_SIZE; i++)
     {
-      n_elts = rte_cryptodev_dequeue_burst (
-       cet->cryptodev_id, cet->cryptodev_q,
-       (struct rte_crypto_op **) cet->cops, VNET_CRYPTO_FRAME_SIZE);
+      if (PREDICT_TRUE (ring->frames[idx].frame_inflight > 0))
+       break;
+      idx++;
+      idx &= (VNET_CRYPTO_FRAME_POOL_SIZE - 1);
+    }
 
-      if (n_elts)
-       {
-         cet->inflight -= n_elts;
-         n_completed_ops += n_elts;
+  ASSERT (i != VNET_CRYPTO_FRAME_POOL_SIZE);
+  ring->deq = idx;
 
-         rte_ring_sp_enqueue_burst (cet->ring, (void **) cet->cops, n_elts,
-                                    NULL);
-       }
-    }
+  left_to_deq =
+    ring->frames[ring->deq].f->n_elts - ring->frames[ring->deq].dequeued;
+  max_to_deq = clib_min (left_to_deq, CRYPTODE_DEQ_MAX);
 
-  if (PREDICT_FALSE (n_completed_ops == 0))
-    return 0;
+  /* The deq field tracks the frame currently being dequeued; based on it,
+     the number of elements to dequeue for that frame is determined. */
 
-  n_elts = cryptodev_ring_deq (cet->ring, cop);
-  if (!n_elts)
-    return 0;
+  n_deq =
+    rte_cryptodev_dequeue_burst (cet->cryptodev_id, cet->cryptodev_q,
+                                (struct rte_crypto_op **) cops, max_to_deq);
 
-  frame = cop[0]->frame;
-  fe = frame->elts;
+  if (n_deq == 0)
+    return dequeue_more;
 
+  ss0 = ring->frames[ring->deq].deq_state;
+  ss1 = ring->frames[ring->deq].deq_state;
+  ss2 = ring->frames[ring->deq].deq_state;
+  ss3 = ring->frames[ring->deq].deq_state;
+
+  frame = ring->frames[ring->deq].f;
+  fe = frame->elts + ring->frames[ring->deq].dequeued;
+
+  n_elts = n_deq;
   while (n_elts > 4)
     {
       ss0 |= fe[0].status = cryptodev_status_conversion[cop[0]->op.status];
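
Note how the four partial status sums are seeded from deq_state above: a
frame may now be drained over several bursts, so the OR-ed element statuses
have to survive between calls. Together with the next hunk, the accumulator's
lifecycle looks like this (sketch only; elt is shorthand for
&ring->frames[ring->deq]):

    ss0 = ss1 = ss2 = ss3 = elt->deq_state;  /* resume earlier bursts */
    /* ... unrolled loop ORs each element's converted status in ... */
    elt->deq_state |= (u8) (ss0 | ss1 | ss2 | ss3); /* persist for next burst */
    if (elt->dequeued == elt->n_elts)        /* frame fully drained */
      frame->state =
        (ss0 | ss1 | ss2 | ss3) == VNET_CRYPTO_OP_STATUS_COMPLETED ?
          VNET_CRYPTO_FRAME_STATE_SUCCESS :
          VNET_CRYPTO_FRAME_STATE_ELT_ERROR;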
@@ -469,16 +531,88 @@ cryptodev_frame_dequeue (vlib_main_t *vm, u32 *nb_elts_processed,
       n_elts--;
     }
 
-  frame->state = (ss0 | ss1 | ss2 | ss3) == VNET_CRYPTO_OP_STATUS_COMPLETED ?
-                  VNET_CRYPTO_FRAME_STATE_SUCCESS :
-                  VNET_CRYPTO_FRAME_STATE_ELT_ERROR;
+  ring->frames[ring->deq].deq_state |= (u8) (ss0 | ss1 | ss2 | ss3);
+
+  rte_mempool_put_bulk (cet->cop_pool, (void **) cops, n_deq);
 
-  rte_mempool_put_bulk (cet->cop_pool, (void **) cet->cops, frame->n_elts);
-  *nb_elts_processed = frame->n_elts;
-  *enqueue_thread_idx = frame->enqueue_thread_index;
-  return frame;
+  inflight -= n_deq;
+  ring->frames[ring->deq].dequeued += n_deq;
+  ring->frames[ring->deq].frame_inflight -= n_deq;
+  if (ring->frames[ring->deq].dequeued == ring->frames[ring->deq].n_elts)
+    {
+      frame->state =
+       (ss0 | ss1 | ss2 | ss3) == VNET_CRYPTO_OP_STATUS_COMPLETED ?
+               VNET_CRYPTO_FRAME_STATE_SUCCESS :
+               VNET_CRYPTO_FRAME_STATE_ELT_ERROR;
+
+      *nb_elts_processed = frame->n_elts;
+      *enqueue_thread_idx = frame->enqueue_thread_index;
+      cet->deqeued_not_returned++;
+      cet->enqueued_not_dequeueq--;
+      ring->deq++;
+      ring->deq &= (VNET_CRYPTO_FRAME_POOL_SIZE - 1);
+      dequeue_more = (max_to_deq < CRYPTODE_DEQ_MAX);
+    }
+
+  cet->inflight = inflight;
+  return dequeue_more;
+}
+
+static_always_inline void
+cryptodev_enqueue_frame (vlib_main_t *vm, cryptodev_async_ring_elt *ring_elt)
+{
+  cryptodev_op_type_t op_type = (cryptodev_op_type_t) ring_elt->op_type;
+  u8 linked_or_aad_len = ring_elt->aad_len;
+
+  if (linked_or_aad_len == 1)
+    cryptodev_frame_linked_algs_enqueue_internal (vm, ring_elt->f, op_type);
+  else
+    cryptodev_aead_enqueue_internal (vm, ring_elt->f, op_type,
+                                    linked_or_aad_len);
 }
 
+static_always_inline vnet_crypto_async_frame_t *
+cryptodev_frame_dequeue (vlib_main_t *vm, u32 *nb_elts_processed,
+                        u32 *enqueue_thread_idx)
+{
+  cryptodev_main_t *cmt = &cryptodev_main;
+  vnet_crypto_main_t *cm = &crypto_main;
+  cryptodev_engine_thread_t *cet = cmt->per_thread_data + vm->thread_index;
+
+  cryptodev_async_frame_sw_ring *ring = &cet->frame_ring;
+  cryptodev_async_ring_elt *ring_elt = &ring->frames[ring->tail];
+  vnet_crypto_async_frame_t *ret_frame = 0;
+  u8 dequeue_more = 1;
+
+  while (cet->inflight > 0 && dequeue_more)
+    {
+      dequeue_more = cryptodev_frame_dequeue_internal (vm, nb_elts_processed,
+                                                      enqueue_thread_idx);
+    }
+
+  if (PREDICT_TRUE (ring->frames[ring->enq].f != 0))
+    cryptodev_enqueue_frame (vm, &ring->frames[ring->enq]);
+
+  if (PREDICT_TRUE (ring_elt->f != 0))
+    {
+      if ((ring_elt->f->state == VNET_CRYPTO_FRAME_STATE_SUCCESS ||
+          ring_elt->f->state == VNET_CRYPTO_FRAME_STATE_ELT_ERROR) &&
+         ring_elt->enqueued == ring_elt->dequeued)
+       {
+         vlib_node_set_interrupt_pending (
+           vlib_get_main_by_index (vm->thread_index), cm->crypto_node_index);
+         ret_frame = ring_elt->f;
+         memset (ring_elt, 0, sizeof (*ring_elt));
+         ring->tail += 1;
+         ring->tail &= (VNET_CRYPTO_FRAME_POOL_SIZE - 1);
+         cet->frames_on_ring--;
+         cet->deqeued_not_returned--;
+         return ret_frame;
+       }
+    }
+
+  return ret_frame;
+}
 static_always_inline int
 cryptodev_enqueue_aead_aad_0_enc (vlib_main_t *vm,
                                  vnet_crypto_async_frame_t *frame)
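
With this hunk the dequeue handler becomes the engine that drives the whole
pipeline. A condensed sketch of one poll, in the order the patch performs the
steps (frame_complete and completed_frame are shorthand for the state and
enqueued == dequeued test on the tail element and the frame it returns):

    /* 1. drain: pull finished ops until the device has nothing ready */
    while (cet->inflight > 0 && dequeue_more)
      dequeue_more = cryptodev_frame_dequeue_internal (vm, nb_elts_processed,
                                                       enqueue_thread_idx);
    /* 2. feed: push the next burst of the frame at the enq cursor */
    if (ring->frames[ring->enq].f != 0)
      cryptodev_enqueue_frame (vm, &ring->frames[ring->enq]);
    /* 3. return: hand back at most one fully completed frame per poll */
    if (frame_complete (&ring->frames[ring->tail]))
      return completed_frame;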
@@ -560,29 +694,14 @@ cryptodev_register_cop_hdl (vlib_main_t *vm, u32 eidx)
        (char *) name, CRYPTODEV_NB_CRYPTO_OPS, sizeof (cryptodev_op_t), 0,
        sizeof (struct rte_crypto_op_pool_private), NULL, NULL, crypto_op_init,
        NULL, vm->numa_node, 0);
-      if (!cet->cop_pool)
-       {
-         error = clib_error_return (
-           0, "Failed to create cryptodev op pool %s", name);
-
-         goto error_exit;
-       }
       vec_free (name);
-
-      name = format (0, "frames_ring_%u_%u", numa, thread_index);
-      cet->ring =
-       rte_ring_create ((char *) name, CRYPTODEV_NB_CRYPTO_OPS, vm->numa_node,
-                        RING_F_SP_ENQ | RING_F_SC_DEQ);
-      if (!cet->ring)
+      if (!cet->cop_pool)
        {
          error = clib_error_return (
            0, "Failed to create cryptodev op pool %s", name);
 
          goto error_exit;
        }
-      vec_free (name);
-
-      vec_validate (cet->cops, VNET_CRYPTO_FRAME_SIZE - 1);
     }
 
 #define _(a, b, c, d, e, f, g)                                                \
@@ -628,9 +747,6 @@ cryptodev_register_cop_hdl (vlib_main_t *vm, u32 eidx)
 error_exit:
   vec_foreach (cet, cmt->per_thread_data)
     {
-      if (cet->ring)
-       rte_ring_free (cet->ring);
-
       if (cet->cop_pool)
        rte_mempool_free (cet->cop_pool);
     }