udp/session: refactor to support dgram mode
[vpp.git] / src / vnet / session / session_node.c
index b8f429e..1471696 100644 (file)
@@ -70,7 +70,7 @@ session_tx_fifo_chain_tail (session_manager_main_t * smm, vlib_main_t * vm,
                            vlib_buffer_t * b0, u32 bi0, u8 n_bufs_per_seg,
                            u32 left_from_seg, u32 * left_to_snd0,
                            u16 * n_bufs, u32 * tx_offset, u16 deq_per_buf,
-                           u8 peek_data)
+                           u8 peek_data, transport_tx_fn_type_t tx_type)
 {
   vlib_buffer_t *chain_b0, *prev_b0;
   u32 chain_bi0, to_deq;
@@ -102,7 +102,23 @@ session_tx_fifo_chain_tail (session_manager_main_t * smm, vlib_main_t * vm,
        }
       else
        {
-         n_bytes_read = svm_fifo_dequeue_nowait (fifo, len_to_deq0, data0);
+         if (tx_type == TRANSPORT_TX_DGRAM)
+           {
+             session_dgram_hdr_t *hdr;
+             u16 deq_now;
+             hdr = (session_dgram_hdr_t *) svm_fifo_head (fifo);
+             deq_now = clib_min (hdr->data_length - hdr->data_offset,
+                                 len_to_deq0);
+             n_bytes_read = svm_fifo_peek (fifo, hdr->data_offset, deq_now,
+                                           data0);
+             ASSERT (n_bytes_read > 0);
+
+             hdr->data_offset += n_bytes_read;
+             if (hdr->data_offset == hdr->data_length)
+               svm_fifo_dequeue_drop (fifo, hdr->data_length);
+           }
+         else
+           n_bytes_read = svm_fifo_dequeue_nowait (fifo, len_to_deq0, data0);
        }
       ASSERT (n_bytes_read == len_to_deq0);
       chain_b0->current_length = n_bytes_read;
@@ -145,12 +161,35 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
   int i, n_bytes_read;
   u32 n_bytes_per_buf, deq_per_buf, deq_per_first_buf;
   u32 bufs_alloc, bufs_now;
+  session_dgram_hdr_t hdr;
 
   next_index = next0 = smm->session_type_to_next[s0->session_type];
-
   tp = session_get_transport_proto (s0);
   transport_vft = transport_protocol_get_vft (tp);
-  tc0 = transport_vft->get_connection (s0->connection_index, thread_index);
+  if (peek_data)
+    {
+      if (PREDICT_FALSE (s0->session_state < SESSION_STATE_READY))
+       {
+         /* Can retransmit for closed sessions but can't send new data if
+          * session is not ready or closed */
+         vec_add1 (smm->pending_event_vector[thread_index], *e0);
+         return 0;
+       }
+      tc0 =
+       transport_vft->get_connection (s0->connection_index, thread_index);
+    }
+  else
+    {
+      if (s0->session_state == SESSION_STATE_LISTENING)
+       tc0 = transport_vft->get_listener (s0->connection_index);
+      else
+       {
+         if (PREDICT_FALSE (s0->session_state == SESSION_STATE_CLOSED))
+           return 0;
+         tc0 = transport_vft->get_connection (s0->connection_index,
+                                              thread_index);
+       }
+    }
 
   /* Make sure we have space to send and there's something to dequeue */
   snd_mss0 = transport_vft->send_mss (tc0);
@@ -168,20 +207,26 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
 
   /* Check how much we can pull. */
   max_dequeue0 = svm_fifo_max_dequeue (s0->server_tx_fifo);
-
   if (peek_data)
     {
       /* Offset in tx fifo from where to peek data */
       tx_offset = transport_vft->tx_fifo_offset (tc0);
       if (PREDICT_FALSE (tx_offset >= max_dequeue0))
-       max_dequeue0 = 0;
-      else
-       max_dequeue0 -= tx_offset;
+       return 0;
+      max_dequeue0 -= tx_offset;
     }
-
-  /* Nothing to read return */
-  if (max_dequeue0 == 0)
-    return 0;
+  else
+    {
+      if (transport_vft->tx_type == TRANSPORT_TX_DGRAM)
+       {
+         if (max_dequeue0 < sizeof (hdr))
+           return 0;
+         svm_fifo_peek (s0->server_tx_fifo, 0, sizeof (hdr), (u8 *) & hdr);
+         ASSERT (hdr.data_length > hdr.data_offset);
+         max_dequeue0 = hdr.data_length - hdr.data_offset;
+       }
+    }
+  ASSERT (max_dequeue0 > 0);
 
   /* Ensure we're not writing more than transport window allows */
   if (max_dequeue0 < snd_space0)
@@ -286,14 +331,42 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
            }
          else
            {
-             n_bytes_read = svm_fifo_dequeue_nowait (s0->server_tx_fifo,
-                                                     len_to_deq0, data0);
-             if (n_bytes_read <= 0)
-               goto dequeue_fail;
+             if (transport_vft->tx_type == TRANSPORT_TX_DGRAM)
+               {
+                 svm_fifo_t *f = s0->server_tx_fifo;
+                 u16 deq_now;
+                 u32 offset;
+
+                 ASSERT (hdr.data_length > hdr.data_offset);
+                 deq_now = clib_min (hdr.data_length - hdr.data_offset,
+                                     len_to_deq0);
+                 offset = hdr.data_offset + SESSION_CONN_HDR_LEN;
+                 n_bytes_read = svm_fifo_peek (f, offset, deq_now, data0);
+                 if (PREDICT_FALSE (n_bytes_read <= 0))
+                   goto dequeue_fail;
+
+                 if (s0->session_state == SESSION_STATE_LISTENING)
+                   {
+                     ip_copy (&tc0->rmt_ip, &hdr.rmt_ip, tc0->is_ip4);
+                     tc0->rmt_port = hdr.rmt_port;
+                   }
+                 hdr.data_offset += n_bytes_read;
+                 if (hdr.data_offset == hdr.data_length)
+                   {
+                     offset = hdr.data_length + SESSION_CONN_HDR_LEN;
+                     svm_fifo_dequeue_drop (f, offset);
+                   }
+               }
+             else
+               {
+                 n_bytes_read = svm_fifo_dequeue_nowait (s0->server_tx_fifo,
+                                                         len_to_deq0, data0);
+                 if (n_bytes_read <= 0)
+                   goto dequeue_fail;
+               }
            }
 
          b0->current_length = n_bytes_read;
-
          left_to_snd0 -= n_bytes_read;
          *n_tx_packets = *n_tx_packets + 1;
 
@@ -307,7 +380,8 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
                                          s0->server_tx_fifo, b0, bi0,
                                          n_bufs_per_seg, left_for_seg,
                                          &left_to_snd0, &n_bufs, &tx_offset,
-                                         deq_per_buf, peek_data);
+                                         deq_per_buf, peek_data,
+                                         transport_vft->tx_type);
            }
 
          /* Ask transport to push header after current_length and
@@ -345,12 +419,18 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
 
   /* If we couldn't dequeue all bytes mark as partially read */
   if (max_len_to_snd0 < max_dequeue0)
+    if (svm_fifo_set_event (s0->server_tx_fifo))
+      vec_add1 (smm->pending_event_vector[thread_index], *e0);
+
+  if (!peek_data && transport_vft->tx_type == TRANSPORT_TX_DGRAM)
     {
-      /* If we don't already have new event */
-      if (svm_fifo_set_event (s0->server_tx_fifo))
-       {
-         vec_add1 (smm->pending_event_vector[thread_index], *e0);
-       }
+      /* Partial send: write updated dgram pre header back to fifo head */
+      if (max_len_to_snd0 < max_dequeue0)
+       svm_fifo_overwrite_head (s0->server_tx_fifo, (u8 *) & hdr,
+                                sizeof (session_dgram_pre_hdr_t));
+      /* More data needs to be read */
+      else if (svm_fifo_max_dequeue (s0->server_tx_fifo) > 0)
+       vec_add1 (smm->pending_event_vector[thread_index], *e0);
     }
   return 0;
 
@@ -360,7 +440,6 @@ dequeue_fail:
    * read, return buff to free list and return
    */
   clib_warning ("dequeue fail");
-
   if (svm_fifo_set_event (s0->server_tx_fifo))
     {
       vec_add1 (smm->pending_event_vector[thread_index], *e0);
@@ -638,13 +717,6 @@ skip_dequeue:
              clib_warning ("It's dead, Jim!");
              continue;
            }
-         /* Can retransmit for closed sessions but can't do anything if
-          * session is not ready or closed */
-         if (PREDICT_FALSE (s0->session_state < SESSION_STATE_READY))
-           {
-             vec_add1 (smm->pending_event_vector[my_thread_index], *e0);
-             continue;
-           }
          /* Spray packets in per session type frames, since they go to
           * different nodes */
          rv = (smm->session_tx_fns[s0->session_type]) (vm, node, smm, e0, s0,