udp/session: refactor to support dgram mode
[vpp.git] / src / vnet / session / session.c
index d258b82..dfc967b 100644 (file)
@@ -355,14 +355,19 @@ session_enqueue_stream_connection (transport_connection_t * tc,
   return enqueued;
 }
 
+
 int
-session_enqueue_dgram_connection (stream_session_t * s, vlib_buffer_t * b,
-                                 u8 proto, u8 queue_event)
+session_enqueue_dgram_connection (stream_session_t * s,
+                                 session_dgram_hdr_t * hdr,
+                                 vlib_buffer_t * b, u8 proto, u8 queue_event)
 {
   int enqueued = 0, rv, in_order_off;
 
-  if (svm_fifo_max_enqueue (s->server_rx_fifo) < b->current_length)
-    return -1;
+  ASSERT (svm_fifo_max_enqueue (s->server_rx_fifo)
+         >= b->current_length + sizeof (*hdr));
+
+  svm_fifo_enqueue_nowait (s->server_rx_fifo, sizeof (session_dgram_hdr_t),
+                          (u8 *) hdr);
   enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, b->current_length,
                                      vlib_buffer_get_current (b));
   if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0))
@@ -530,6 +535,16 @@ session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index)
   return errors;
 }
 
+int
+session_manager_flush_all_enqueue_events (u8 transport_proto)
+{
+  vlib_thread_main_t *vtm = vlib_get_thread_main ();
+  int i, errors = 0;
+  for (i = 0; i < 1 + vtm->n_threads; i++)
+    errors += session_manager_flush_enqueue_events (transport_proto, i);
+  return errors;
+}
+
 /**
  * Init fifo tail and head pointers
  *
@@ -825,7 +840,7 @@ session_open_cl (u32 app_index, session_endpoint_t * rmt, u32 opaque)
   if (session_alloc_and_init (sm, tc, 1, &s))
     return -1;
   s->app_index = app->index;
-  s->session_state = SESSION_STATE_CONNECTING_READY;
+  s->session_state = SESSION_STATE_OPENED;
 
   /* Tell the app about the new event fifo for this session */
   app->cb_fns.session_connected_callback (app->index, opaque, s, 0);
@@ -841,10 +856,6 @@ session_open_vc (u32 app_index, session_endpoint_t * rmt, u32 opaque)
   u64 handle;
   int rv;
 
-  /* TODO until udp is fixed */
-  if (rmt->transport_proto == TRANSPORT_PROTO_UDP)
-    return session_open_cl (app_index, rmt, opaque);
-
   tep = session_endpoint_to_transport (rmt);
   rv = tp_vfts[rmt->transport_proto].open (tep);
   if (rv < 0)
@@ -912,14 +923,6 @@ session_open (u32 app_index, session_endpoint_t * rmt, u32 opaque)
   return session_open_srv_fns[tst] (app_index, rmt, opaque);
 }
 
-/**
- * Ask transport to listen on local transport endpoint.
- *
- * @param s Session for which listen will be called. Note that unlike
- *         established sessions, listen sessions are not associated to a
- *         thread.
- * @param tep Local endpoint to be listened on.
- */
 int
 session_listen_vc (stream_session_t * s, session_endpoint_t * sep)
 {
@@ -947,6 +950,40 @@ session_listen_vc (stream_session_t * s, session_endpoint_t * sep)
   return 0;
 }
 
+int
+session_listen_cl (stream_session_t * s, session_endpoint_t * sep)
+{
+  transport_connection_t *tc;
+  application_t *server;
+  segment_manager_t *sm;
+  u32 tci;
+
+  /* Transport bind/listen  */
+  tci = tp_vfts[sep->transport_proto].bind (s->session_index,
+                                           session_endpoint_to_transport
+                                           (sep));
+
+  if (tci == (u32) ~ 0)
+    return -1;
+
+  /* Attach transport to session */
+  s->connection_index = tci;
+  tc = tp_vfts[sep->transport_proto].get_listener (tci);
+
+  /* Weird but handle it ... */
+  if (tc == 0)
+    return -1;
+
+  server = application_get (s->app_index);
+  sm = application_get_listen_segment_manager (server, s);
+  if (session_alloc_fifos (sm, s))
+    return -1;
+
+  /* Add to the main lookup table */
+  session_lookup_add_connection (tc, s->session_index);
+  return 0;
+}
+
 int
 session_listen_app (stream_session_t * s, session_endpoint_t * sep)
 {
@@ -965,11 +1002,19 @@ typedef int (*session_listen_service_fn) (stream_session_t *,
 static session_listen_service_fn
 session_listen_srv_fns[TRANSPORT_N_SERVICES] = {
   session_listen_vc,
-  session_listen_vc,
+  session_listen_cl,
   session_listen_app,
 };
 /* *INDENT-ON* */
 
+/**
+ * Ask transport to listen on local transport endpoint.
+ *
+ * @param s Session for which listen will be called. Note that unlike
+ *         established sessions, listen sessions are not associated to a
+ *         thread.
+ * @param tep Local endpoint to be listened on.
+ */
 int
 stream_session_listen (stream_session_t * s, session_endpoint_t * sep)
 {
@@ -1125,7 +1170,8 @@ session_manager_get_evt_q_segment (void)
 static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
     session_tx_fifo_peek_and_snd,
     session_tx_fifo_dequeue_and_snd,
-    session_tx_fifo_dequeue_internal
+    session_tx_fifo_dequeue_internal,
+    session_tx_fifo_dequeue_and_snd
 };
 /* *INDENT-ON* */
 
@@ -1228,11 +1274,12 @@ session_manager_main_enable (vlib_main_t * vm)
   vec_validate (smm->peekers_rw_locks, num_threads - 1);
 
   for (i = 0; i < TRANSPORT_N_PROTO; i++)
-    for (j = 0; j < num_threads; j++)
-      {
-       vec_validate (smm->session_to_enqueue[i], num_threads - 1);
-       vec_validate (smm->current_enqueue_epoch[i], num_threads - 1);
-      }
+    {
+      vec_validate (smm->current_enqueue_epoch[i], num_threads - 1);
+      vec_validate (smm->session_to_enqueue[i], num_threads - 1);
+      for (j = 0; j < num_threads; j++)
+       smm->current_enqueue_epoch[i][j] = 1;
+    }
 
   for (i = 0; i < num_threads; i++)
     {