dpdk-cryptodev: improve dequeue behavior, fix cache stats logging 35/39535/8
authorPiotr Bronowski <piotrx.bronowski@intel.com>
Thu, 14 Sep 2023 17:41:13 +0000 (17:41 +0000)
committerFan Zhang <fanzhang.oss@gmail.com>
Thu, 28 Sep 2023 15:07:52 +0000 (15:07 +0000)
This patch provides minor improvements to the logic governing dequeuing
from the ring. Previously whenever a frame was dequeued
we've been trying to dequeue from the ring another one till
inflight == 0. Now threshold is set for 8 frames pending in the cache
to be consumed by the vnet. This threshold has been chosen based on
cache ring stats observation in the system under load.
Some unnecessary logic for setting deq_tail has been removed.
Also logging has been corrected, and cache ring logic simplied.

Type: improvement
Signed-off-by: Piotr Bronowski <piotrx.bronowski@intel.com>
Change-Id: I19f3daf5913006e9cb23e142a163f596e85f5bda

src/plugins/dpdk/cryptodev/cryptodev.c
src/plugins/dpdk/cryptodev/cryptodev.h
src/plugins/dpdk/cryptodev/cryptodev_op_data_path.c
src/plugins/dpdk/cryptodev/cryptodev_raw_data_path.c

index 4e8bc02..43c2c87 100644 (file)
@@ -672,50 +672,73 @@ cryptodev_show_cache_rings_fn (vlib_main_t *vm, unformat_input_t *input,
 {
   cryptodev_main_t *cmt = &cryptodev_main;
   u32 thread_index = 0;
+  u16 i;
   vec_foreach_index (thread_index, cmt->per_thread_data)
     {
       cryptodev_engine_thread_t *cet = cmt->per_thread_data + thread_index;
       cryptodev_cache_ring_t *ring = &cet->cache_ring;
       u16 head = ring->head;
       u16 tail = ring->tail;
-      u16 n_cached = ((head == tail) && (ring->frames[head].f == 0)) ?
-                            0 :
-                    ((head == tail) && (ring->frames[head].f != 0)) ?
-                            (CRYPTODEV_CACHE_QUEUE_MASK + 1) :
-                    (head > tail) ?
-                            (head - tail) :
-                            (CRYPTODEV_CACHE_QUEUE_MASK - tail + head);
+      u16 n_cached = (CRYPTODEV_CACHE_QUEUE_SIZE - tail + head) &
+                    CRYPTODEV_CACHE_QUEUE_MASK;
 
       u16 enq_head = ring->enq_head;
       u16 deq_tail = ring->deq_tail;
       u16 n_frames_inflight =
-       ((enq_head == deq_tail) && (ring->frames[enq_head].f == 0)) ?
+       (enq_head == deq_tail) ?
                0 :
-       ((enq_head == deq_tail) && (ring->frames[enq_head].f != 0)) ?
-               CRYPTODEV_CACHE_QUEUE_MASK + 1 :
-       (enq_head > deq_tail) ?
-               (enq_head - deq_tail) :
-               (CRYPTODEV_CACHE_QUEUE_MASK - deq_tail + enq_head);
-
+               ((CRYPTODEV_CACHE_QUEUE_SIZE + enq_head - deq_tail) &
+          CRYPTODEV_CACHE_QUEUE_MASK);
+      /* even if some elements of dequeued frame are still pending for deq
+       * we consider the frame as processed */
       u16 n_frames_processed =
-       ((tail == deq_tail) && (ring->frames[deq_tail].f == 0)) ? 0 :
-       ((tail == deq_tail) && (ring->frames[deq_tail].f != 0)) ? 1 :
-       (deq_tail > tail) ? (deq_tail - tail + 1) :
-                                 (CRYPTODEV_CACHE_QUEUE_MASK - tail + deq_tail - 1);
+       ((tail == deq_tail) && (ring->frames[deq_tail].f == 0)) ?
+               0 :
+               ((CRYPTODEV_CACHE_QUEUE_SIZE - tail + deq_tail) &
+          CRYPTODEV_CACHE_QUEUE_MASK) +
+           1;
+      /* even if some elements of enqueued frame are still pending for enq
+       * we consider the frame as enqueued */
+      u16 n_frames_pending =
+       (head == enq_head) ? 0 :
+                                  ((CRYPTODEV_CACHE_QUEUE_SIZE - enq_head + head) &
+                             CRYPTODEV_CACHE_QUEUE_MASK) -
+                              1;
+
+      u16 elts_to_enq =
+       (ring->frames[enq_head].n_elts - ring->frames[enq_head].enq_elts_head);
+      u16 elts_to_deq =
+       (ring->frames[deq_tail].n_elts - ring->frames[deq_tail].deq_elts_tail);
+
+      u32 elts_total = 0;
+
+      for (i = 0; i < CRYPTODEV_CACHE_QUEUE_SIZE; i++)
+       elts_total += ring->frames[i].n_elts;
 
       if (vlib_num_workers () > 0 && thread_index == 0)
        continue;
 
       vlib_cli_output (vm, "\n\n");
-      vlib_cli_output (vm, "Frames total: %u", n_cached);
-      vlib_cli_output (vm, "Frames pending in the ring: %u",
-                      n_cached - n_frames_inflight - n_frames_processed);
+      vlib_cli_output (vm, "Frames cached in the ring: %u", n_cached);
+      vlib_cli_output (vm, "Frames cached but not processed: %u",
+                      n_frames_pending);
       vlib_cli_output (vm, "Frames inflight: %u", n_frames_inflight);
-      vlib_cli_output (vm, "Frames dequed but not returned: %u",
-                      n_frames_processed);
+      vlib_cli_output (vm, "Frames processed: %u", n_frames_processed);
+      vlib_cli_output (vm, "Elements total: %u", elts_total);
       vlib_cli_output (vm, "Elements inflight: %u", cet->inflight);
-      vlib_cli_output (vm, "Head: %u", head);
-      vlib_cli_output (vm, "Tail: %u", tail);
+      vlib_cli_output (vm, "Head index: %u", head);
+      vlib_cli_output (vm, "Tail index: %u", tail);
+      vlib_cli_output (vm, "Current frame index beeing enqueued: %u",
+                      enq_head);
+      vlib_cli_output (vm, "Current frame index being dequeued: %u", deq_tail);
+      vlib_cli_output (vm,
+                      "Elements in current frame to be enqueued: %u, waiting "
+                      "to be enqueued: %u",
+                      ring->frames[enq_head].n_elts, elts_to_enq);
+      vlib_cli_output (vm,
+                      "Elements in current frame to be dequeued: %u, waiting "
+                      "to be dequeued: %u",
+                      ring->frames[deq_tail].n_elts, elts_to_deq);
       vlib_cli_output (vm, "\n\n");
     }
   return 0;
index 63eb46e..7cd525d 100644 (file)
@@ -32,6 +32,7 @@
 #define CRYPTODEV_MAX_IV_SIZE     16
 #define CRYPTODEV_MAX_AAD_SIZE    16
 #define CRYPTODEV_MAX_N_SGL       8 /**< maximum number of segments */
+#define CRYPTODEV_MAX_PROCESED_IN_CACHE_QUEUE 8
 
 #define CRYPTODEV_IV_OFFSET  (offsetof (cryptodev_op_t, iv))
 #define CRYPTODEV_AAD_OFFSET (offsetof (cryptodev_op_t, aad))
@@ -303,19 +304,24 @@ cryptodev_cache_ring_push (cryptodev_cache_ring_t *r,
                           vnet_crypto_async_frame_t *f)
 {
   u16 head = r->head;
+  u16 tail = r->tail;
+
   cryptodev_cache_ring_elt_t *ring_elt = &r->frames[head];
   /**
    * in debug mode we do the ring sanity test when a frame is enqueued to
    * the ring.
    **/
 #if CLIB_DEBUG > 0
-  u16 tail = r->tail;
   u16 n_cached = (head >= tail) ? (head - tail) :
                                        (CRYPTODEV_CACHE_QUEUE_MASK - tail + head);
-  ERROR_ASSERT (n_cached < VNET_CRYPTO_FRAME_POOL_SIZE);
+  ERROR_ASSERT (n_cached < CRYPTODEV_CACHE_QUEUE_SIZE);
   ERROR_ASSERT (r->raw == 0 && r->frames[head].raw == 0 &&
                r->frames[head].f == 0);
 #endif
+  /*the ring capacity is CRYPTODEV_CACHE_QUEUE_SIZE - 1*/
+  if (PREDICT_FALSE (head + 1) == tail)
+    return 0;
+
   ring_elt->f = f;
   ring_elt->n_elts = f->n_elts;
   /* update head */
index 29af2fa..d0f093e 100644 (file)
@@ -148,6 +148,9 @@ cryptodev_frame_linked_algs_enqueue (vlib_main_t *vm,
   cryptodev_cache_ring_elt_t *ring_elt =
     cryptodev_cache_ring_push (ring, frame);
 
+  if (PREDICT_FALSE (ring_elt == NULL))
+    return -1;
+
   ring_elt->aad_len = 1;
   ring_elt->op_type = (u8) op_type;
   return 0;
@@ -295,6 +298,10 @@ cryptodev_frame_aead_enqueue (vlib_main_t *vm,
   ERROR_ASSERT (frame->n_elts > 0);
   cryptodev_cache_ring_elt_t *ring_elt =
     cryptodev_cache_ring_push (ring, frame);
+
+  if (PREDICT_FALSE (ring_elt == NULL))
+    return -1;
+
   ring_elt->aad_len = aad_len;
   ring_elt->op_type = (u8) op_type;
   return 0;
@@ -462,7 +469,7 @@ cryptodev_frame_dequeue_internal (vlib_main_t *vm, u32 *nb_elts_processed,
   vnet_crypto_async_frame_t *frame = NULL;
   cryptodev_cache_ring_t *ring = &cet->cache_ring;
   u16 *const deq = &ring->deq_tail;
-  u16 n_deq, idx, left_to_deq, i;
+  u16 n_deq, left_to_deq;
   u16 max_to_deq = 0;
   u16 inflight = cet->inflight;
   u8 dequeue_more = 0;
@@ -472,29 +479,12 @@ cryptodev_frame_dequeue_internal (vlib_main_t *vm, u32 *nb_elts_processed,
   u32 n_elts, n;
   u64 err0 = 0, err1 = 0, err2 = 0, err3 = 0; /* partial errors mask */
 
-  idx = ring->deq_tail;
-
-  for (i = 0; i < VNET_CRYPTO_FRAME_POOL_SIZE; i++)
-    {
-      u32 frame_inflight =
-       CRYPTODEV_CACHE_RING_GET_FRAME_ELTS_INFLIGHT (ring, idx);
-
-      if (PREDICT_TRUE (frame_inflight > 0))
-       break;
-      idx++;
-      idx &= (VNET_CRYPTO_FRAME_POOL_SIZE - 1);
-    }
-
-  ERROR_ASSERT (i != VNET_CRYPTO_FRAME_POOL_SIZE);
-  ring->deq_tail = idx;
-
   left_to_deq =
     ring->frames[*deq].f->n_elts - ring->frames[*deq].deq_elts_tail;
   max_to_deq = clib_min (left_to_deq, CRYPTODE_DEQ_MAX);
 
   /* deq field can be used to track frame that is currently dequeued
    based on that you can specify the amount of elements to deq for the frame */
-
   n_deq =
     rte_cryptodev_dequeue_burst (cet->cryptodev_id, cet->cryptodev_q,
                                 (struct rte_crypto_op **) cops, max_to_deq);
@@ -547,9 +537,13 @@ cryptodev_frame_dequeue_internal (vlib_main_t *vm, u32 *nb_elts_processed,
   ring->frames[*deq].deq_elts_tail += n_deq;
   if (cryptodev_cache_ring_update_deq_tail (ring, deq))
     {
+      u32 fr_processed =
+       (CRYPTODEV_CACHE_QUEUE_SIZE - ring->tail + ring->deq_tail) &
+       CRYPTODEV_CACHE_QUEUE_MASK;
+
       *nb_elts_processed = frame->n_elts;
       *enqueue_thread_idx = frame->enqueue_thread_index;
-      dequeue_more = (max_to_deq < CRYPTODE_DEQ_MAX);
+      dequeue_more = (fr_processed < CRYPTODEV_MAX_PROCESED_IN_CACHE_QUEUE);
     }
 
   cet->inflight = inflight;
index 19291eb..9ac0f94 100644 (file)
@@ -118,6 +118,9 @@ cryptodev_frame_linked_algs_enqueue (vlib_main_t *vm,
   cryptodev_cache_ring_elt_t *ring_elt =
     cryptodev_cache_ring_push (ring, frame);
 
+  if (PREDICT_FALSE (ring_elt == NULL))
+    return -1;
+
   ring_elt->aad_len = 1;
   ring_elt->op_type = (u8) op_type;
   return 0;
@@ -272,6 +275,9 @@ cryptodev_raw_aead_enqueue (vlib_main_t *vm, vnet_crypto_async_frame_t *frame,
   cryptodev_cache_ring_elt_t *ring_elt =
     cryptodev_cache_ring_push (ring, frame);
 
+  if (PREDICT_FALSE (ring_elt == NULL))
+    return -1;
+
   ring_elt->aad_len = aad_len;
   ring_elt->op_type = (u8) op_type;
   return 0;
@@ -466,32 +472,17 @@ cryptodev_raw_dequeue_internal (vlib_main_t *vm, u32 *nb_elts_processed,
   cryptodev_cache_ring_t *ring = &cet->cache_ring;
   u16 *const deq = &ring->deq_tail;
   u32 n_success;
-  u16 n_deq, indice, i, left_to_deq;
+  u16 n_deq, i, left_to_deq;
   u16 max_to_deq = 0;
   u16 inflight = cet->inflight;
   u8 dequeue_more = 0;
   int dequeue_status;
 
-  indice = *deq;
-
-  for (i = 0; i < VNET_CRYPTO_FRAME_POOL_SIZE; i++)
-    {
-      if (PREDICT_TRUE (
-           CRYPTODEV_CACHE_RING_GET_FRAME_ELTS_INFLIGHT (ring, indice) > 0))
-       break;
-      indice += 1;
-      indice &= CRYPTODEV_CACHE_QUEUE_MASK;
-    }
-
-  ERROR_ASSERT (i != VNET_CRYPTO_FRAME_POOL_SIZE);
-
-  *deq = indice;
-
   left_to_deq = ring->frames[*deq].n_elts - ring->frames[*deq].deq_elts_tail;
   max_to_deq = clib_min (left_to_deq, CRYPTODE_DEQ_MAX);
 
-  /* you can use deq field to track frame that is currently dequeued */
-  /* based on that you can specify the amount of elements to deq for the frame
+  /* deq field can be used to track frame that is currently dequeued */
+  /* based on thatthe amount of elements to deq for the frame can be specified
    */
 
   n_deq = rte_cryptodev_raw_dequeue_burst (
@@ -516,9 +507,13 @@ cryptodev_raw_dequeue_internal (vlib_main_t *vm, u32 *nb_elts_processed,
 
   if (cryptodev_cache_ring_update_deq_tail (ring, deq))
     {
+      u32 fr_processed =
+       (CRYPTODEV_CACHE_QUEUE_SIZE - ring->tail + ring->deq_tail) &
+       CRYPTODEV_CACHE_QUEUE_MASK;
+
       *nb_elts_processed = frame->n_elts;
       *enqueue_thread_idx = frame->enqueue_thread_index;
-      dequeue_more = max_to_deq < CRYPTODE_DEQ_MAX;
+      dequeue_more = (fr_processed < CRYPTODEV_MAX_PROCESED_IN_CACHE_QUEUE);
     }
 
   int res =
@@ -555,24 +550,18 @@ cryptodev_raw_dequeue (vlib_main_t *vm, u32 *nb_elts_processed,
   u8 dequeue_more = 1;
 
   while (cet->inflight > 0 && dequeue_more)
-    {
       dequeue_more = cryptodev_raw_dequeue_internal (vm, nb_elts_processed,
                                                     enqueue_thread_idx);
-    }
 
   if (PREDICT_TRUE (ring->frames[ring->enq_head].f != 0))
     cryptodev_enqueue_frame_to_qat (vm, &ring->frames[ring->enq_head]);
 
-  if (PREDICT_TRUE (ring_elt->f != 0))
+  if (PREDICT_TRUE (ring_elt->f != 0) &&
+      (ring_elt->n_elts == ring_elt->deq_elts_tail))
     {
-      if (ring_elt->enq_elts_head == ring_elt->deq_elts_tail)
-       {
-         vlib_node_set_interrupt_pending (
-           vlib_get_main_by_index (vm->thread_index), cm->crypto_node_index);
-         ret_frame = cryptodev_cache_ring_pop (ring);
-
-         return ret_frame;
-       }
+      vlib_node_set_interrupt_pending (
+       vlib_get_main_by_index (vm->thread_index), cm->crypto_node_index);
+      ret_frame = cryptodev_cache_ring_pop (ring);
     }
 
   return ret_frame;