svm: support for multi-segment enqueues 12/30112/7
authorFlorin Coras <fcoras@cisco.com>
Tue, 24 Nov 2020 16:41:17 +0000 (08:41 -0800)
committerFlorin Coras <florin.coras@gmail.com>
Tue, 24 Nov 2020 21:44:14 +0000 (21:44 +0000)
Type: feature

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: I06c7022a6afbb146b23cbd3a430497ec9e8be73d

src/svm/svm_fifo.c
src/svm/svm_fifo.h
src/vnet/session/application_interface.h
src/vnet/session/session.c

index d357eb6..f1ac8d4 100644 (file)
@@ -955,6 +955,87 @@ svm_fifo_enqueue_nocopy (svm_fifo_t * f, u32 len)
   clib_atomic_store_rel_n (&f->tail, tail);
 }
 
+int
+svm_fifo_enqueue_segments (svm_fifo_t * f, const svm_fifo_seg_t segs[],
+                          u32 n_segs, u8 allow_partial)
+{
+  u32 tail, head, free_count, len = 0, i;
+  svm_fifo_chunk_t *old_tail_c;
+
+  f->ooos_newest = OOO_SEGMENT_INVALID_INDEX;
+
+  f_load_head_tail_prod (f, &head, &tail);
+
+  /* free space in fifo can only increase during enqueue: SPSC */
+  free_count = f_free_count (f, head, tail);
+
+  if (PREDICT_FALSE (free_count == 0))
+    return SVM_FIFO_EFULL;
+
+  for (i = 0; i < n_segs; i++)
+    len += segs[i].len;
+
+  old_tail_c = f->tail_chunk;
+
+  if (!allow_partial)
+    {
+      if (PREDICT_FALSE (free_count < len))
+       return SVM_FIFO_EFULL;
+
+      if (f_pos_gt (tail + len, f_chunk_end (f->end_chunk)))
+       {
+         if (PREDICT_FALSE (f_try_chunk_alloc (f, head, tail, len)))
+           return SVM_FIFO_EGROW;
+       }
+
+      for (i = 0; i < n_segs; i++)
+       {
+         svm_fifo_copy_to_chunk (f, f->tail_chunk, tail, segs[i].data,
+                                 segs[i].len, &f->tail_chunk);
+         tail += segs[i].len;
+       }
+    }
+  else
+    {
+      len = clib_min (free_count, len);
+
+      if (f_pos_gt (tail + len, f_chunk_end (f->end_chunk)))
+       {
+         if (PREDICT_FALSE (f_try_chunk_alloc (f, head, tail, len)))
+           {
+             len = f_chunk_end (f->end_chunk) - tail;
+             if (!len)
+               return SVM_FIFO_EGROW;
+           }
+       }
+
+      i = 0;
+      while (len)
+       {
+         u32 to_copy = clib_min (segs[i].len, len);
+         svm_fifo_copy_to_chunk (f, f->tail_chunk, tail, segs[i].data,
+                                 to_copy, &f->tail_chunk);
+         len -= to_copy;
+         tail += to_copy;
+         i++;
+       }
+    }
+
+  /* collect out-of-order segments */
+  if (PREDICT_FALSE (f->ooos_list_head != OOO_SEGMENT_INVALID_INDEX))
+    {
+      len += ooo_segment_try_collect (f, len, &tail);
+      /* Tail chunk might've changed even if nothing was collected */
+      f->tail_chunk = f_lookup_clear_enq_chunks (f, old_tail_c, tail);
+      f->ooo_enq = 0;
+    }
+
+  /* store-rel: producer owned index (paired with load-acq in consumer) */
+  clib_atomic_store_rel_n (&f->tail, tail);
+
+  return len;
+}
+
 always_inline svm_fifo_chunk_t *
 f_unlink_chunks (svm_fifo_t * f, u32 end_pos, u8 maybe_ooo)
 {
index 408d99a..4239e9d 100644 (file)
@@ -291,6 +291,17 @@ int svm_fifo_enqueue_with_offset (svm_fifo_t * f, u32 offset, u32 len,
  * @param len          number of bytes to add to tail
  */
 void svm_fifo_enqueue_nocopy (svm_fifo_t * f, u32 len);
+/**
+ * Enqueue array of @ref svm_fifo_seg_t in order
+ *
+ * @param f            fifo
+ * @param segs         array of segments to enqueue
+ * @param n_segs       number of segments
+ * @param allow_partial        if set partial enqueues are allowed
+ * @return             len if enqueue was successful, error otherwise
+ */
+int svm_fifo_enqueue_segments (svm_fifo_t * f, const svm_fifo_seg_t segs[],
+                              u32 n_segs, u8 allow_partial);
 /**
  * Overwrite fifo head with new data
  *
index 1f3e181..79c6f28 100644 (file)
@@ -615,35 +615,35 @@ app_send_dgram_raw (svm_fifo_t * f, app_session_transport_t * at,
                    svm_msg_q_t * vpp_evt_q, u8 * data, u32 len, u8 evt_type,
                    u8 do_evt, u8 noblock)
 {
-  u32 max_enqueue, actual_write;
   session_dgram_hdr_t hdr;
   int rv;
 
-  max_enqueue = svm_fifo_max_enqueue_prod (f);
-  if (max_enqueue < (sizeof (session_dgram_hdr_t) + len))
+  if (svm_fifo_max_enqueue_prod (f) < (sizeof (session_dgram_hdr_t) + len))
     return 0;
 
-  max_enqueue -= sizeof (session_dgram_hdr_t);
-  actual_write = clib_min (len, max_enqueue);
-  hdr.data_length = actual_write;
+  hdr.data_length = len;
   hdr.data_offset = 0;
   clib_memcpy_fast (&hdr.rmt_ip, &at->rmt_ip, sizeof (ip46_address_t));
   hdr.is_ip4 = at->is_ip4;
   hdr.rmt_port = at->rmt_port;
   clib_memcpy_fast (&hdr.lcl_ip, &at->lcl_ip, sizeof (ip46_address_t));
   hdr.lcl_port = at->lcl_port;
-  rv = svm_fifo_enqueue (f, sizeof (hdr), (u8 *) & hdr);
-  ASSERT (rv == sizeof (hdr));
 
-  rv = svm_fifo_enqueue (f, actual_write, data);
+  /* *INDENT-OFF* */
+  svm_fifo_seg_t segs[2] = {{ (u8 *) &hdr, sizeof (hdr) }, { data, len }};
+  /* *INDENT-ON* */
+
+  rv = svm_fifo_enqueue_segments (f, segs, 2, 0 /* allow partial */ );
+  if (PREDICT_FALSE (rv < 0))
+    return 0;
+
   if (do_evt)
     {
-      if (rv > 0 && svm_fifo_set_event (f))
+      if (svm_fifo_set_event (f))
        app_send_io_evt_to_vpp (vpp_evt_q, f->master_session_index, evt_type,
                                noblock);
     }
-  ASSERT (rv);
-  return rv;
+  return len;
 }
 
 always_inline int
index 3a468b4..1f24805 100644 (file)
@@ -516,22 +516,49 @@ session_enqueue_dgram_connection (session_t * s,
                                  session_dgram_hdr_t * hdr,
                                  vlib_buffer_t * b, u8 proto, u8 queue_event)
 {
-  int enqueued = 0, rv, in_order_off;
+  int rv;
 
   ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
          >= b->current_length + sizeof (*hdr));
 
-  svm_fifo_enqueue (s->rx_fifo, sizeof (session_dgram_hdr_t), (u8 *) hdr);
-  enqueued = svm_fifo_enqueue (s->rx_fifo, b->current_length,
-                              vlib_buffer_get_current (b));
-  if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0))
+  if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_NEXT_PRESENT)))
     {
-      in_order_off = enqueued > b->current_length ? enqueued : 0;
-      rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
-      if (rv > 0)
-       enqueued += rv;
+      /* *INDENT-OFF* */
+      svm_fifo_seg_t segs[2] = {
+         { (u8 *) hdr, sizeof (*hdr) },
+         { vlib_buffer_get_current (b), b->current_length }
+      };
+      /* *INDENT-ON* */
+
+      rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, 2,
+                                     0 /* allow_partial */ );
     }
-  if (queue_event)
+  else
+    {
+      vlib_main_t *vm = vlib_get_main ();
+      svm_fifo_seg_t *segs = 0, *seg;
+      vlib_buffer_t *it = b;
+      u32 n_segs = 1;
+
+      vec_add2 (segs, seg, 1);
+      seg->data = (u8 *) hdr;
+      seg->len = sizeof (*hdr);
+      while (it)
+       {
+         vec_add2 (segs, seg, 1);
+         seg->data = vlib_buffer_get_current (it);
+         seg->len = it->current_length;
+         n_segs++;
+         if (!(it->flags & VLIB_BUFFER_NEXT_PRESENT))
+           break;
+         it = vlib_get_buffer (vm, it->next_buffer);
+       }
+      rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, n_segs,
+                                     0 /* allow partial */ );
+      vec_free (segs);
+    }
+
+  if (queue_event && rv > 0)
     {
       /* Queue RX event on this fifo. Eventually these will need to be flushed
        * by calling stream_server_flush_enqueue_events () */
@@ -546,7 +573,7 @@ session_enqueue_dgram_connection (session_t * s,
 
       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
     }
-  return enqueued;
+  return rv > 0 ? rv : 0;
 }
 
 int