dpdk-cryptodev: enq/deq scheme rework
[vpp.git] / src / plugins / dpdk / cryptodev / cryptodev_raw_data_path.c
index 3a2f46e..9f0936a 100644 (file)
@@ -109,6 +109,25 @@ static_always_inline int
 cryptodev_frame_linked_algs_enqueue (vlib_main_t *vm,
                                     vnet_crypto_async_frame_t *frame,
                                     cryptodev_op_type_t op_type)
+{
+  cryptodev_main_t *cmt = &cryptodev_main;
+  cryptodev_engine_thread_t *cet = cmt->per_thread_data + vm->thread_index;
+  cryptodev_async_frame_sw_ring *ring = &cet->frame_ring;
+  cryptodev_async_ring_elt *ring_elt = &ring->frames[ring->head];
+  cet->frames_on_ring++;
+  ring_elt->f = frame;
+  ring_elt->n_elts = frame->n_elts;
+  ring_elt->aad_len = 1;
+  ring_elt->op_type = (u8) op_type;
+  ring->head++;
+  ring->head &= (VNET_CRYPTO_FRAME_POOL_SIZE - 1);
+  return 0;
+}
+
+static_always_inline void
+cryptodev_frame_linked_algs_enqueue_internal (vlib_main_t *vm,
+                                             vnet_crypto_async_frame_t *frame,
+                                             cryptodev_op_type_t op_type)
 {
   cryptodev_main_t *cmt = &cryptodev_main;
   cryptodev_engine_thread_t *cet = cmt->per_thread_data + vm->thread_index;
@@ -116,26 +135,25 @@ cryptodev_frame_linked_algs_enqueue (vlib_main_t *vm,
   vlib_buffer_t **b;
   struct rte_crypto_vec vec[CRYPTODEV_MAX_N_SGL];
   struct rte_crypto_va_iova_ptr iv_vec, digest_vec;
+  cryptodev_async_frame_sw_ring *ring = &cet->frame_ring;
   u32 n_elts;
   u32 last_key_index = ~0;
   i16 min_ofs;
   u32 max_end;
+  u32 max_to_enq = clib_min (CRYPTODE_ENQ_MAX,
+                            frame->n_elts - ring->frames[ring->enq].enqueued);
   u8 is_update = 0;
   int status;
 
-  n_elts = frame->n_elts;
+  if (cet->inflight + max_to_enq > CRYPTODEV_MAX_INFLIGHT)
+    return;
 
-  if (PREDICT_FALSE (CRYPTODEV_MAX_INFLIGHT - cet->inflight < n_elts))
-    {
-      cryptodev_mark_frame_err_status (frame,
-                                      VNET_CRYPTO_OP_STATUS_FAIL_ENGINE_ERR);
-      return -1;
-    }
+  n_elts = max_to_enq;
 
   vlib_get_buffers (vm, frame->buffer_indices, cet->b, frame->n_elts);
 
-  b = cet->b;
-  fe = frame->elts;
+  b = cet->b + ring->frames[ring->enq].enqueued;
+  fe = frame->elts + ring->frames[ring->enq].enqueued;
 
   while (n_elts)
     {
@@ -215,26 +233,34 @@ cryptodev_frame_linked_algs_enqueue (vlib_main_t *vm,
       if (PREDICT_FALSE (status < 0))
        goto error_exit;
 
+      ring->frames[ring->enq].enqueued += 1;
       b++;
       fe++;
       n_elts--;
     }
 
-  status = rte_cryptodev_raw_enqueue_done (cet->ctx, frame->n_elts);
+  status = rte_cryptodev_raw_enqueue_done (cet->ctx, max_to_enq);
   if (PREDICT_FALSE (status < 0))
+    goto error_exit;
+
+  cet->inflight += max_to_enq;
+  ring->frames[ring->enq].frame_inflight += max_to_enq;
+  if (ring->frames[ring->enq].enqueued == frame->n_elts)
     {
-      cryptodev_reset_ctx (cet);
-      return -1;
+      cet->frame_ring.enq += 1;
+      cet->frame_ring.enq &= (VNET_CRYPTO_FRAME_POOL_SIZE - 1);
+      frame->state = VNET_CRYPTO_FRAME_STATE_NOT_PROCESSED;
     }
-
-  cet->inflight += frame->n_elts;
-  return 0;
+  return;
 
 error_exit:
   cryptodev_mark_frame_err_status (frame,
-                                  VNET_CRYPTO_OP_STATUS_FAIL_ENGINE_ERR);
+                                  VNET_CRYPTO_OP_STATUS_FAIL_ENGINE_ERR,
+                                  VNET_CRYPTO_FRAME_STATE_ELT_ERROR);
   cryptodev_reset_ctx (cet);
-  return -1;
+  cet->frame_ring.enq += 1;
+  cet->frame_ring.enq &= (VNET_CRYPTO_FRAME_POOL_SIZE - 1);
+  return;
 }
 
 static_always_inline int
@@ -243,6 +269,26 @@ cryptodev_raw_aead_enqueue (vlib_main_t *vm, vnet_crypto_async_frame_t *frame,
 {
   cryptodev_main_t *cmt = &cryptodev_main;
   cryptodev_engine_thread_t *cet = cmt->per_thread_data + vm->thread_index;
+  cryptodev_async_frame_sw_ring *ring = &cet->frame_ring;
+  cryptodev_async_ring_elt *ring_elt = &ring->frames[ring->head];
+  cet->frames_on_ring++;
+  ring_elt->f = frame;
+  ring_elt->n_elts = frame->n_elts;
+  ring_elt->aad_len = aad_len;
+  ring_elt->op_type = (u8) op_type;
+  ring->head++;
+  ring->head &= (VNET_CRYPTO_FRAME_POOL_SIZE - 1);
+  return 0;
+}
+
+static_always_inline void
+cryptodev_raw_aead_enqueue_internal (vlib_main_t *vm,
+                                    vnet_crypto_async_frame_t *frame,
+                                    cryptodev_op_type_t op_type, u8 aad_len)
+{
+  cryptodev_main_t *cmt = &cryptodev_main;
+  cryptodev_engine_thread_t *cet = cmt->per_thread_data + vm->thread_index;
+  cryptodev_async_frame_sw_ring *ring = &cet->frame_ring;
   vnet_crypto_async_frame_elt_t *fe;
   vlib_buffer_t **b;
   u32 n_elts;
@@ -250,22 +296,22 @@ cryptodev_raw_aead_enqueue (vlib_main_t *vm, vnet_crypto_async_frame_t *frame,
   struct rte_crypto_vec vec[CRYPTODEV_MAX_N_SGL];
   struct rte_crypto_va_iova_ptr iv_vec, digest_vec, aad_vec;
   u32 last_key_index = ~0;
+  u16 left_to_enq = frame->n_elts - ring->frames[ring->enq].enqueued;
+  u16 max_to_enq = clib_min (CRYPTODE_ENQ_MAX, left_to_enq);
   u8 is_update = 0;
   int status;
 
-  n_elts = frame->n_elts;
-
-  if (PREDICT_FALSE (CRYPTODEV_MAX_INFLIGHT - cet->inflight < n_elts))
+  if (cet->inflight + max_to_enq > CRYPTODEV_MAX_INFLIGHT)
     {
-      cryptodev_mark_frame_err_status (frame,
-                                      VNET_CRYPTO_OP_STATUS_FAIL_ENGINE_ERR);
-      return -1;
+      return;
     }
 
+  n_elts = max_to_enq;
+
   vlib_get_buffers (vm, frame->buffer_indices, cet->b, frame->n_elts);
 
-  fe = frame->elts;
-  b = cet->b;
+  fe = frame->elts + ring->frames[ring->enq].enqueued;
+  b = cet->b + ring->frames[ring->enq].enqueued;
   cofs.raw = 0;
 
   while (n_elts)
@@ -378,31 +424,36 @@ cryptodev_raw_aead_enqueue (vlib_main_t *vm, vnet_crypto_async_frame_t *frame,
       if (PREDICT_FALSE (status < 0))
        goto error_exit;
 
+      ring->frames[ring->enq].enqueued += 1;
       fe++;
       b++;
       n_elts--;
     }
 
-  status = rte_cryptodev_raw_enqueue_done (cet->ctx, frame->n_elts);
+  status = rte_cryptodev_raw_enqueue_done (cet->ctx, max_to_enq);
   if (PREDICT_FALSE (status < 0))
     goto error_exit;
 
-  cet->inflight += frame->n_elts;
+  cet->inflight += max_to_enq;
+  ring->frames[ring->enq].frame_inflight += max_to_enq;
+  if (ring->frames[ring->enq].enqueued == frame->n_elts)
+    {
+      ring->enq += 1;
+      ring->enq &= (VNET_CRYPTO_FRAME_POOL_SIZE - 1);
+      frame->state = VNET_CRYPTO_FRAME_STATE_NOT_PROCESSED;
+      cet->enqueued_not_dequeueq++;
+    }
 
-  return 0;
+  return;
 
 error_exit:
   cryptodev_mark_frame_err_status (frame,
-                                  VNET_CRYPTO_OP_STATUS_FAIL_ENGINE_ERR);
+                                  VNET_CRYPTO_OP_STATUS_FAIL_ENGINE_ERR,
+                                  VNET_CRYPTO_FRAME_STATE_ELT_ERROR);
   cryptodev_reset_ctx (cet);
-  return -1;
-}
-
-static_always_inline u32
-cryptodev_get_frame_n_elts (void *frame)
-{
-  vnet_crypto_async_frame_t *f = (vnet_crypto_async_frame_t *) frame;
-  return f->n_elts;
+  ring->enq += 1;
+  ring->enq &= (VNET_CRYPTO_FRAME_POOL_SIZE - 1);
+  return;
 }
 
 static_always_inline void
@@ -414,185 +465,132 @@ cryptodev_post_dequeue (void *frame, u32 index, u8 is_op_success)
                                          VNET_CRYPTO_OP_STATUS_FAIL_BAD_HMAC;
 }
 
-#define GET_RING_OBJ(r, pos, f)                                               \
-  do                                                                          \
-    {                                                                         \
-      vnet_crypto_async_frame_t **ring = (void *) &r[1];                      \
-      f = ring[(r->cons.head + pos) & r->mask];                               \
-    }                                                                         \
-  while (0)
-
-static_always_inline vnet_crypto_async_frame_t *
-cryptodev_raw_dequeue (vlib_main_t *vm, u32 *nb_elts_processed,
-                      u32 *enqueue_thread_idx)
+static_always_inline u8
+cryptodev_raw_dequeue_internal (vlib_main_t *vm, u32 *nb_elts_processed,
+                               u32 *enqueue_thread_idx)
 {
   cryptodev_main_t *cmt = &cryptodev_main;
-  vnet_crypto_main_t *cm = &crypto_main;
   cryptodev_engine_thread_t *cet = cmt->per_thread_data + vm->thread_index;
-  vnet_crypto_async_frame_t *frame, *frame_ret = 0;
-  u32 n_deq, n_success;
-  u32 n_cached_frame = rte_ring_count (cet->cached_frame), n_room_left;
-  u8 no_job_to_deq = 0;
+  vnet_crypto_async_frame_t *frame;
+  cryptodev_async_frame_sw_ring *ring = &cet->frame_ring;
+  u32 n_success;
+  u16 n_deq, indice, i, left_to_deq;
+  u16 max_to_deq = 0;
   u16 inflight = cet->inflight;
+  u8 dequeue_more = 0;
   int dequeue_status;
 
-  n_room_left = CRYPTODEV_DEQ_CACHE_SZ - n_cached_frame - 1;
+  indice = ring->deq;
 
-  if (n_cached_frame)
+  for (i = 0; i < VNET_CRYPTO_FRAME_POOL_SIZE; i++)
     {
-      u32 i;
-      for (i = 0; i < n_cached_frame; i++)
-       {
-         vnet_crypto_async_frame_t *f;
-         void *f_ret;
-         enum rte_crypto_op_status op_status;
-         u8 n_left, err, j;
-
-         GET_RING_OBJ (cet->cached_frame, i, f);
-
-         if (i < n_cached_frame - 2)
-           {
-             vnet_crypto_async_frame_t *f1, *f2;
-             GET_RING_OBJ (cet->cached_frame, i + 1, f1);
-             GET_RING_OBJ (cet->cached_frame, i + 2, f2);
-             clib_prefetch_load (f1);
-             clib_prefetch_load (f2);
-           }
-
-         n_left = f->state & 0x7f;
-         err = f->state & 0x80;
-
-         for (j = f->n_elts - n_left; j < f->n_elts && inflight; j++)
-           {
-             int ret;
-             f_ret = rte_cryptodev_raw_dequeue (cet->ctx, &ret, &op_status);
-
-             if (!f_ret)
-               break;
-
-             switch (op_status)
-               {
-               case RTE_CRYPTO_OP_STATUS_SUCCESS:
-                 f->elts[j].status = VNET_CRYPTO_OP_STATUS_COMPLETED;
-                 break;
-               default:
-                 f->elts[j].status = VNET_CRYPTO_OP_STATUS_FAIL_ENGINE_ERR;
-                 err |= 1 << 7;
-               }
-
-             inflight--;
-           }
-
-         if (j == f->n_elts)
-           {
-             if (i == 0)
-               {
-                 frame_ret = f;
-                 f->state = err ? VNET_CRYPTO_FRAME_STATE_ELT_ERROR :
-                                  VNET_CRYPTO_FRAME_STATE_SUCCESS;
-               }
-             else
-               {
-                 f->state = f->n_elts - j;
-                 f->state |= err;
-               }
-             if (inflight)
-               continue;
-           }
+      if (PREDICT_TRUE (ring->frames[indice].frame_inflight > 0))
+       break;
+      indice += 1;
+      indice &= (VNET_CRYPTO_FRAME_POOL_SIZE - 1);
+    }
 
-         /* to here f is not completed dequeued and no more job can be
-          * dequeued
-          */
-         f->state = f->n_elts - j;
-         f->state |= err;
-         no_job_to_deq = 1;
-         break;
-       }
+  ASSERT (i != VNET_CRYPTO_FRAME_POOL_SIZE);
 
-      if (frame_ret)
-       {
-         rte_ring_sc_dequeue (cet->cached_frame, (void **) &frame_ret);
-         n_room_left++;
-       }
-    }
+  ring->deq = indice;
 
-  if (inflight > 0)
-    vlib_node_set_interrupt_pending (vlib_get_main_by_index (vm->thread_index),
-                                    cm->crypto_node_index);
+  left_to_deq =
+    ring->frames[ring->deq].f->n_elts - ring->frames[ring->deq].dequeued;
+  max_to_deq = clib_min (left_to_deq, CRYPTODE_DEQ_MAX);
 
-  /* no point to dequeue further */
-  if (!inflight || no_job_to_deq || !n_room_left)
-    goto end_deq;
+  /* ring->deq indexes the frame currently being dequeued; the number of
+   * elements to dequeue in this pass is bounded by that frame's remainder.
+   */
 
-#if RTE_VERSION >= RTE_VERSION_NUM(21, 5, 0, 0)
-  n_deq = rte_cryptodev_raw_dequeue_burst (
-    cet->ctx, cryptodev_get_frame_n_elts, 0, cryptodev_post_dequeue,
-    (void **) &frame, 0, &n_success, &dequeue_status);
-#else
   n_deq = rte_cryptodev_raw_dequeue_burst (
-    cet->ctx, cryptodev_get_frame_n_elts, cryptodev_post_dequeue,
-    (void **) &frame, 0, &n_success, &dequeue_status);
-#endif
+    cet->ctx, NULL, max_to_deq, cryptodev_post_dequeue, (void **) &frame, 0,
+    &n_success, &dequeue_status);
 
   if (!n_deq)
-    goto end_deq;
+    return dequeue_more;
 
   inflight -= n_deq;
-  no_job_to_deq = n_deq < frame->n_elts;
-  /* we have to cache the frame */
-  if (frame_ret || n_cached_frame || no_job_to_deq)
+  ring->frames[ring->deq].dequeued += n_deq;
+  ring->frames[ring->deq].deq_state += n_success;
+  ring->frames[ring->deq].frame_inflight -= n_deq;
+
+  if (ring->frames[ring->deq].dequeued == ring->frames[ring->deq].n_elts)
     {
-      frame->state = frame->n_elts - n_deq;
-      frame->state |= ((n_success < n_deq) << 7);
-      rte_ring_sp_enqueue (cet->cached_frame, (void *) frame);
-      n_room_left--;
+      frame->state = ring->frames[ring->deq].deq_state == frame->n_elts ?
+                            VNET_CRYPTO_FRAME_STATE_SUCCESS :
+                            VNET_CRYPTO_FRAME_STATE_ELT_ERROR;
+      *nb_elts_processed = frame->n_elts;
+      *enqueue_thread_idx = frame->enqueue_thread_index;
+      cet->deqeued_not_returned++;
+      cet->enqueued_not_dequeueq--;
+      ring->deq += 1;
+      ring->deq &= (VNET_CRYPTO_FRAME_POOL_SIZE - 1);
+      dequeue_more = max_to_deq < CRYPTODE_DEQ_MAX;
     }
+
+  int res =
+    rte_cryptodev_raw_dequeue_done (cet->ctx, cet->inflight - inflight);
+  ASSERT (res == 0);
+  cet->inflight = inflight;
+  return dequeue_more;
+}
+
+static_always_inline void
+cryptodev_enqueue_frame (vlib_main_t *vm, cryptodev_async_ring_elt *ring_elt)
+{
+  cryptodev_op_type_t op_type = (cryptodev_op_type_t) ring_elt->op_type;
+  u8 linked_or_aad_len = ring_elt->aad_len;
+
+  if (linked_or_aad_len == 1)
+    cryptodev_frame_linked_algs_enqueue_internal (vm, ring_elt->f, op_type);
   else
-    {
-      frame->state = n_success == frame->n_elts ?
-                      VNET_CRYPTO_FRAME_STATE_SUCCESS :
-                      VNET_CRYPTO_FRAME_STATE_ELT_ERROR;
-      frame_ret = frame;
-    }
+    cryptodev_raw_aead_enqueue_internal (vm, ring_elt->f, op_type,
+                                        linked_or_aad_len);
+}
 
-  /* see if we can dequeue more */
-  while (inflight && n_room_left && !no_job_to_deq)
-    {
-#if RTE_VERSION >= RTE_VERSION_NUM(21, 5, 0, 0)
-      n_deq = rte_cryptodev_raw_dequeue_burst (
-       cet->ctx, cryptodev_get_frame_n_elts, 0, cryptodev_post_dequeue,
-       (void **) &frame, 0, &n_success, &dequeue_status);
-#else
-      n_deq = rte_cryptodev_raw_dequeue_burst (
-       cet->ctx, cryptodev_get_frame_n_elts, cryptodev_post_dequeue,
-       (void **) &frame, 0, &n_success, &dequeue_status);
-#endif
-      if (!n_deq)
-       break;
-      inflight -= n_deq;
-      no_job_to_deq = n_deq < frame->n_elts;
-      frame->state = frame->n_elts - n_deq;
-      frame->state |= ((n_success < n_deq) << 7);
-      rte_ring_sp_enqueue (cet->cached_frame, (void *) frame);
-      n_room_left--;
-    }
+static_always_inline vnet_crypto_async_frame_t *
+cryptodev_raw_dequeue (vlib_main_t *vm, u32 *nb_elts_processed,
+                      u32 *enqueue_thread_idx)
+{
+  cryptodev_main_t *cmt = &cryptodev_main;
+  vnet_crypto_main_t *cm = &crypto_main;
+  cryptodev_engine_thread_t *cet = cmt->per_thread_data + vm->thread_index;
+  cryptodev_async_frame_sw_ring *ring = &cet->frame_ring;
+  cryptodev_async_ring_elt *ring_elt = &ring->frames[ring->tail];
+  vnet_crypto_async_frame_t *ret_frame = 0;
+  u8 dequeue_more = 1;
 
-end_deq:
-  if (inflight < cet->inflight)
+  while (cet->inflight > 0 && dequeue_more)
     {
-      int res =
-       rte_cryptodev_raw_dequeue_done (cet->ctx, cet->inflight - inflight);
-      ASSERT (res == 0);
-      cet->inflight = inflight;
+      dequeue_more = cryptodev_raw_dequeue_internal (vm, nb_elts_processed,
+                                                    enqueue_thread_idx);
     }
 
-  if (frame_ret)
+  if (PREDICT_TRUE (ring->frames[ring->enq].f != 0))
+    cryptodev_enqueue_frame (vm, &ring->frames[ring->enq]);
+
+  if (PREDICT_TRUE (ring_elt->f != 0))
     {
-      *nb_elts_processed = frame_ret->n_elts;
-      *enqueue_thread_idx = frame_ret->enqueue_thread_index;
+      if ((ring_elt->f->state == VNET_CRYPTO_FRAME_STATE_SUCCESS ||
+          ring_elt->f->state == VNET_CRYPTO_FRAME_STATE_ELT_ERROR) &&
+         ring_elt->enqueued == ring_elt->dequeued)
+       {
+         vlib_node_set_interrupt_pending (
+           vlib_get_main_by_index (vm->thread_index), cm->crypto_node_index);
+         ret_frame = ring_elt->f;
+         ring_elt->f = 0;
+         ring_elt->dequeued = 0;
+         ring_elt->enqueued = 0;
+         ring_elt->deq_state = 0;
+         ring->tail += 1;
+         ring->tail &= (VNET_CRYPTO_FRAME_POOL_SIZE - 1);
+         cet->frames_on_ring--;
+         cet->deqeued_not_returned--;
+         return ret_frame;
+       }
     }
 
-  return frame_ret;
+  return ret_frame;
 }
 
 static_always_inline int
@@ -688,10 +686,6 @@ cryptodev_register_raw_hdl (vlib_main_t *vm, u32 eidx)
       u32 numa = vlib_get_main_by_index (thread_id)->numa_node;
       u8 *name = format (0, "cache_frame_ring_%u_%u", numa, thread_id);
 
-      cet->cached_frame =
-       rte_ring_create ((char *) name, CRYPTODEV_DEQ_CACHE_SZ, numa,
-                        RING_F_SC_DEQ | RING_F_SP_ENQ);
-
       cet->aad_buf = rte_zmalloc_socket (
        0, CRYPTODEV_NB_CRYPTO_OPS * CRYPTODEV_MAX_AAD_SIZE,
        CLIB_CACHE_LINE_BYTES, numa);
@@ -709,13 +703,6 @@ cryptodev_register_raw_hdl (vlib_main_t *vm, u32 eidx)
          error = clib_error_return (0, "Failed to alloc raw dp ctx");
          goto err_handling;
        }
-
-      if (cet->cached_frame == 0)
-       {
-         error = clib_error_return (0, "Failed to alloc frame ring %s", name);
-         goto err_handling;
-       }
-
       vec_free (name);
     }
 
@@ -762,11 +749,5 @@ cryptodev_register_raw_hdl (vlib_main_t *vm, u32 eidx)
   return 0;
 
 err_handling:
-  vec_foreach (cet, cmt->per_thread_data)
-    {
-      if (cet->cached_frame)
-       rte_ring_free (cet->cached_frame);
-    }
-
   return error;
 }