udp: refactor udp code
authorFlorin Coras <fcoras@cisco.com>
Mon, 2 Oct 2017 07:18:51 +0000 (00:18 -0700)
committerDave Barach <openvpp@barachs.net>
Mon, 16 Oct 2017 21:41:11 +0000 (21:41 +0000)
Change-Id: I44d5c9df7c49b8d4d5677c6d319033b2da3e6b80
Signed-off-by: Florin Coras <fcoras@cisco.com>
40 files changed:
src/scripts/vnet/uri/dummy_app.py [changed mode: 0644->0755]
src/scripts/vnet/uri/udp
src/uri/uri_tcp_test.c
src/uri/uri_udp_test.c
src/uri/vppcom.c
src/vnet.am
src/vnet/ip/ip.c
src/vnet/ip/ip.h
src/vnet/session/application.c
src/vnet/session/application_interface.c
src/vnet/session/application_interface.h
src/vnet/session/segment_manager.c
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_api.c
src/vnet/session/session_cli.c
src/vnet/session/session_lookup.c
src/vnet/session/session_lookup.h
src/vnet/session/session_node.c
src/vnet/session/session_table.h
src/vnet/session/session_test.c
src/vnet/session/stream_session.h
src/vnet/session/transport.c [new file with mode: 0644]
src/vnet/session/transport.h
src/vnet/session/transport_interface.c [deleted file]
src/vnet/session/transport_interface.h
src/vnet/tcp/builtin_client.c
src/vnet/tcp/builtin_http_server.c
src/vnet/tcp/builtin_proxy.c
src/vnet/tcp/builtin_server.c
src/vnet/tcp/tcp.c
src/vnet/tcp/tcp.h
src/vnet/tcp/tcp_input.c
src/vnet/tcp/tcp_test.c
src/vnet/udp/builtin_server.c
src/vnet/udp/udp.c
src/vnet/udp/udp.h
src/vnet/udp/udp_error.def
src/vnet/udp/udp_input.c
src/vnet/udp/udp_local.c

old mode 100644 (file)
new mode 100755 (executable)
index ff00f2f..85ea4dc
@@ -3,6 +3,7 @@
 import socket
 import sys
 import time
+import argparse
 
 # action can be reflect or drop 
 action = "drop"
@@ -32,37 +33,52 @@ def handle_connection (connection, client_address):
                 connection.sendall(data)
     finally:
         connection.close()
-        
-def run_server(ip, port):
-    print("Starting server {}:{}".format(repr(ip), repr(port)))
+def run_tcp_server(ip, port):
+    print("Starting TCP server {}:{}".format(repr(ip), repr(port)))
     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
     server_address = (ip, int(port))
     sock.bind(server_address)
     sock.listen(1)
-    
     while True:
         connection, client_address = sock.accept()
         handle_connection (connection, client_address)
+def run_udp_server(ip, port):
+    print("Starting UDP server {}:{}".format(repr(ip), repr(port)))
+    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+    server_address = (ip, int(port))
+    sock.bind(server_address)
+    while True:
+        data, addr = sock.recvfrom(4096)
+        if (action != "drop"):
+            #snd_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+            sock.sendto (data, addr)
 
-def prepare_data():
+def run_server(ip, port, proto):
+    if (proto == "tcp"):
+        run_tcp_server(ip, port)
+    elif (proto == "udp"):
+        run_udp_server(ip, port)
+
+def prepare_data(power):
     buf = []
-    for i in range (0, pow(2, 16)):
+    for i in range (0, pow(2, power)):
         buf.append(i & 0xff)
     return bytearray(buf)
 
-def run_client(ip, port):
-    print("Starting client {}:{}".format(repr(ip), repr(port)))
+def run_tcp_client(ip, port):
+    print("Starting TCP client {}:{}".format(repr(ip), repr(port)))
     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    server_address = (ip, port)
+    server_address = (ip, int(port))
     sock.connect(server_address)
-    
-    data = prepare_data()
+
+    data = prepare_data(16)
     n_rcvd = 0
     n_sent = len (data)
     try:
         sock.sendall(data)
-        
+
         timeout = time.time() + 2
         while n_rcvd < n_sent and time.time() < timeout:
             tmp = sock.recv(1500)
@@ -73,28 +89,53 @@ def run_client(ip, port):
                     print("Difference at byte {}. Sent {} got {}"
                           .format(n_rcvd + i, data[n_rcvd + i], tmp[i]))
             n_rcvd += n_read
-            
+
         if (n_rcvd < n_sent or n_rcvd > n_sent):
             print("Sent {} and got back {}".format(n_sent, n_rcvd))
         else:
             print("Got back what we've sent!!");
-            
+
     finally:
         sock.close()
-    
-def run(mode, ip, port):
+def run_udp_client(ip, port):
+    print("Starting UDP client {}:{}".format(repr(ip), repr(port)))
+    n_packets = 100
+    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    server_address = (ip, int(port))
+    data = prepare_data(10)
+    try:
+        for i in range (0, n_packets):
+            sock.sendto(data, server_address)
+    finally:
+        sock.close()
+def run_client(ip, port, proto):
+    if (proto == "tcp"):
+        run_tcp_client(ip, port)
+    elif (proto == "udp"):
+        run_udp_client(ip, port)
+def run(mode, ip, port, proto):
     if (mode == "server"):
-        run_server (ip, port)
+        run_server (ip, port, proto)
     elif (mode == "client"):
-        run_client (ip, port)
+        run_client (ip, port, proto)
     else:
         raise Exception("Unknown mode. Only client and server supported")
 
 if __name__ == "__main__":
-    if (len(sys.argv)) < 4:
-        raise Exception("Usage: ./dummy_app <mode> <ip> <port> [<action> <test>]")
-    if (len(sys.argv) == 6):
-        action = sys.argv[4]
-        test = int(sys.argv[5])
-
-    run (sys.argv[1], sys.argv[2], int(sys.argv[3]))
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-m', action='store', dest='mode')
+    parser.add_argument('-i', action='store', dest='ip')
+    parser.add_argument('-p', action='store', dest='port')
+    parser.add_argument('-proto', action='store', dest='proto')
+    parser.add_argument('-a', action='store', dest='action')
+    parser.add_argument('-t', action='store', dest='test')
+    results = parser.parse_args()
+    action = results.action
+    test = results.test
+    run(results.mode, results.ip, results.port, results.proto)
+    #if (len(sys.argv)) < 4:
+    #    raise Exception("Usage: ./dummy_app <mode> <ip> <port> [<action> <test>]")
+    #if (len(sys.argv) == 6):
+    #    action = sys.argv[4]
+    #    test = int(sys.argv[5])
+    #run (sys.argv[1], sys.argv[2], int(sys.argv[3]))
index c7628f4..3ab4292 100644 (file)
@@ -1,6 +1,12 @@
 loop create
 set int ip address loop0 6.0.0.1/32
 set int state loop0 up
+set int state GigabitEthernet1b/0/0 up
+set int ip address GigabitEthernet1b/0/0 192.168.1.1/24
+
+create host-interface name vpp1
+set int state host-vpp1 up
+set int ip address host-vpp1 6.0.1.1/24
 
 packet-generator new {
   name udp
index 41d3d4c..89f070f 100755 (executable)
@@ -327,6 +327,7 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
   svm_fifo_segment_create_args_t _a, *a = &_a;
   int rv;
 
+  memset (a, 0, sizeof (*a));
   a->segment_name = (char *) mp->segment_name;
   a->segment_size = mp->segment_size;
   /* Attach to the segment vpp created */
@@ -590,7 +591,6 @@ send_test_chunk (uri_tcp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid,
   u32 bytes_to_snd;
   u32 queue_max_chunk = 128 << 10, actual_write;
   session_fifo_event_t evt;
-  static int serial_number = 0;
   int rv;
 
   bytes_to_snd = (bytes == 0) ? vec_len (test_data) : bytes;
@@ -615,7 +615,6 @@ send_test_chunk (uri_tcp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid,
              /* Fabricate TX event, send to vpp */
              evt.fifo = tx_fifo;
              evt.event_type = FIFO_EVENT_APP_TX;
-             evt.event_id = serial_number++;
 
              unix_shared_memory_queue_add (utm->vpp_event_queue,
                                            (u8 *) & evt,
@@ -918,6 +917,7 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
   memset (rmp, 0, sizeof (*rmp));
   rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY);
   rmp->handle = mp->handle;
+  rmp->context = mp->context;
   vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & rmp);
 
   session->bytes_received = 0;
@@ -983,7 +983,6 @@ server_handle_fifo_event_rx (uri_tcp_test_main_t * utm,
              /* Fabricate TX event, send to vpp */
              evt.fifo = tx_fifo;
              evt.event_type = FIFO_EVENT_APP_TX;
-             evt.event_id = e->event_id;
 
              q = utm->vpp_event_queue;
              unix_shared_memory_queue_add (q, (u8 *) & evt,
@@ -997,7 +996,7 @@ server_handle_fifo_event_rx (uri_tcp_test_main_t * utm,
 void
 server_handle_event_queue (uri_tcp_test_main_t * utm)
 {
-  session_fifo_event_t _e, *e = &_e;;
+  session_fifo_event_t _e, *e = &_e;
 
   while (1)
     {
index d559d57..27e70cf 100644 (file)
@@ -50,6 +50,7 @@
 typedef enum
 {
   STATE_START,
+  STATE_BOUND,
   STATE_READY,
   STATE_FAILED,
   STATE_DISCONNECTING,
@@ -97,6 +98,7 @@ typedef struct
 
   /* $$$$ hack: cut-through session index */
   volatile u32 cut_through_session_index;
+  volatile u32 connected_session;
 
   /* unique segment name counter */
   u32 unique_segment_index;
@@ -123,6 +125,7 @@ typedef struct
   /* convenience */
   svm_fifo_segment_main_t *segment_main;
 
+  u8 *connect_test_data;
 } uri_udp_test_main_t;
 
 #if CLIB_DEBUG > 0
@@ -163,7 +166,7 @@ void
 application_send_attach (uri_udp_test_main_t * utm)
 {
   vl_api_application_attach_t *bmp;
-  u32 fifo_size = 3 << 20;
+  u32 fifo_size = 1 << 20;
   bmp = vl_msg_api_alloc (sizeof (*bmp));
   memset (bmp, 0, sizeof (*bmp));
 
@@ -172,11 +175,12 @@ application_send_attach (uri_udp_test_main_t * utm)
   bmp->context = ntohl (0xfeedface);
   bmp->options[APP_OPTIONS_FLAGS] =
     APP_OPTIONS_FLAGS_ACCEPT_REDIRECT | APP_OPTIONS_FLAGS_ADD_SEGMENT;
-  bmp->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = 16;
+  bmp->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = 2;
   bmp->options[SESSION_OPTIONS_RX_FIFO_SIZE] = fifo_size;
   bmp->options[SESSION_OPTIONS_TX_FIFO_SIZE] = fifo_size;
   bmp->options[SESSION_OPTIONS_ADD_SEGMENT_SIZE] = 128 << 20;
   bmp->options[SESSION_OPTIONS_SEGMENT_SIZE] = 256 << 20;
+  bmp->options[APP_EVT_QUEUE_SIZE] = 16768;
   vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & bmp);
 }
 
@@ -348,7 +352,7 @@ udp_client_connect (uri_udp_test_main_t * utm)
 }
 
 static void
-client_send (uri_udp_test_main_t * utm, session_t * session)
+client_send_cut_through (uri_udp_test_main_t * utm, session_t * session)
 {
   int i;
   u8 *test_data = 0;
@@ -391,7 +395,6 @@ client_send (uri_udp_test_main_t * utm, session_t * session)
        }
 
       bytes_to_read = svm_fifo_max_dequeue (rx_fifo);
-
       bytes_to_read = vec_len (utm->rx_buf) > bytes_to_read ?
        bytes_to_read : vec_len (utm->rx_buf);
 
@@ -451,7 +454,114 @@ client_send (uri_udp_test_main_t * utm, session_t * session)
 }
 
 static void
-uri_udp_client_test (uri_udp_test_main_t * utm)
+send_test_chunk (uri_udp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid,
+                u32 bytes)
+{
+  u8 *test_data = utm->connect_test_data;
+  u64 bytes_sent = 0;
+  int test_buf_offset = 0;
+  u32 bytes_to_snd;
+  u32 queue_max_chunk = 128 << 10, actual_write;
+  session_fifo_event_t evt;
+  int rv;
+
+  bytes_to_snd = (bytes == 0) ? vec_len (test_data) : bytes;
+  if (bytes_to_snd > vec_len (test_data))
+    bytes_to_snd = vec_len (test_data);
+
+  while (bytes_to_snd > 0 && !utm->time_to_stop)
+    {
+      actual_write = (bytes_to_snd > queue_max_chunk) ?
+       queue_max_chunk : bytes_to_snd;
+      rv = svm_fifo_enqueue_nowait (tx_fifo, actual_write,
+                                   test_data + test_buf_offset);
+
+      if (rv > 0)
+       {
+         bytes_to_snd -= rv;
+         test_buf_offset += rv;
+         bytes_sent += rv;
+
+         if (svm_fifo_set_event (tx_fifo))
+           {
+             /* Fabricate TX event, send to vpp */
+             evt.fifo = tx_fifo;
+             evt.event_type = FIFO_EVENT_APP_TX;
+
+             unix_shared_memory_queue_add (utm->vpp_event_queue,
+                                           (u8 *) & evt,
+                                           0 /* do wait for mutex */ );
+           }
+       }
+    }
+}
+
+static void
+recv_test_chunk (uri_udp_test_main_t * utm, session_t * session)
+{
+  svm_fifo_t *rx_fifo;
+  int buffer_offset, bytes_to_read = 0, rv;
+
+  rx_fifo = session->server_rx_fifo;
+  bytes_to_read = svm_fifo_max_dequeue (rx_fifo);
+  bytes_to_read =
+    vec_len (utm->rx_buf) > bytes_to_read ?
+    bytes_to_read : vec_len (utm->rx_buf);
+
+  buffer_offset = 0;
+  while (bytes_to_read > 0)
+    {
+      rv = svm_fifo_dequeue_nowait (rx_fifo, bytes_to_read,
+                                   utm->rx_buf + buffer_offset);
+      if (rv > 0)
+       {
+         bytes_to_read -= rv;
+         buffer_offset += rv;
+       }
+    }
+}
+
+void
+client_send_data (uri_udp_test_main_t * utm)
+{
+  u8 *test_data;
+  int mypid = getpid ();
+  session_t *session;
+  svm_fifo_t *tx_fifo;
+  u32 n_iterations;
+  int i;
+
+  vec_validate (utm->connect_test_data, 64 * 1024 - 1);
+  for (i = 0; i < vec_len (utm->connect_test_data); i++)
+    utm->connect_test_data[i] = i & 0xff;
+
+  test_data = utm->connect_test_data;
+  session = pool_elt_at_index (utm->sessions, utm->connected_session);
+  tx_fifo = session->server_tx_fifo;
+
+  ASSERT (vec_len (test_data) > 0);
+
+  vec_validate (utm->rx_buf, vec_len (test_data) - 1);
+  n_iterations = NITER;
+
+  for (i = 0; i < n_iterations; i++)
+    {
+      send_test_chunk (utm, tx_fifo, mypid, 0);
+      recv_test_chunk (utm, session);
+      if (utm->time_to_stop)
+       break;
+    }
+
+  f64 timeout = clib_time_now (&utm->clib_time) + 5;
+  while (clib_time_now (&utm->clib_time) < timeout)
+    {
+      recv_test_chunk (utm, session);
+    }
+
+}
+
+static void
+client_test (uri_udp_test_main_t * utm)
 {
   session_t *session;
 
@@ -464,10 +574,18 @@ uri_udp_client_test (uri_udp_test_main_t * utm)
       return;
     }
 
-  /* Only works with cut through sessions */
-  session = pool_elt_at_index (utm->sessions, utm->cut_through_session_index);
+  if (utm->cut_through_session_index != ~0)
+    {
+      session = pool_elt_at_index (utm->sessions,
+                                  utm->cut_through_session_index);
+      client_send_cut_through (utm, session);
+    }
+  else
+    {
+      session = pool_elt_at_index (utm->sessions, utm->connected_session);
+      client_send_data (utm);
+    }
 
-  client_send (utm, session);
   application_detach (utm);
 }
 
@@ -483,7 +601,7 @@ vl_api_bind_uri_reply_t_handler (vl_api_bind_uri_reply_t * mp)
       return;
     }
 
-  utm->state = STATE_READY;
+  utm->state = STATE_BOUND;
 }
 
 static void
@@ -492,6 +610,7 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
   svm_fifo_segment_create_args_t _a, *a = &_a;
   int rv;
 
+  memset (a, 0, sizeof (*a));
   a->segment_name = (char *) mp->segment_name;
   a->segment_size = mp->segment_size;
   /* Attach to the segment vpp created */
@@ -625,8 +744,6 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
   hash_set (utm->session_index_by_vpp_handles, mp->handle,
            session - utm->sessions);
 
-  utm->state = STATE_READY;
-
   if (pool_elts (utm->sessions) && (pool_elts (utm->sessions) % 20000) == 0)
     {
       f64 now = clib_time_now (&utm->clib_time);
@@ -639,7 +756,11 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
   memset (rmp, 0, sizeof (*rmp));
   rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY);
   rmp->handle = mp->handle;
+  rmp->context = mp->context;
   vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & rmp);
+
+  CLIB_MEMORY_BARRIER ();
+  utm->state = STATE_READY;
 }
 
 static void
@@ -677,16 +798,22 @@ static void
 vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp)
 {
   uri_udp_test_main_t *utm = &uri_udp_test_main;
+  session_t *session;
 
   ASSERT (utm->i_am_master == 0);
 
+  if (mp->retval)
+    {
+      clib_warning ("failed connect");
+      return;
+    }
+
   /* We've been redirected */
   if (mp->segment_name_length > 0)
     {
       svm_fifo_segment_main_t *sm = &svm_fifo_segment_main;
       svm_fifo_segment_create_args_t _a, *a = &_a;
       u32 segment_index;
-      session_t *session;
       svm_fifo_segment_private_t *seg;
       int rv;
 
@@ -707,20 +834,24 @@ vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp)
       vec_add2 (utm->seg, seg, 1);
       memcpy (seg, sm->segments + segment_index, sizeof (*seg));
       sleep (1);
-
-      pool_get (utm->sessions, session);
-      utm->cut_through_session_index = session - utm->sessions;
-
-      session->server_rx_fifo = uword_to_pointer (mp->server_rx_fifo,
-                                                 svm_fifo_t *);
-      ASSERT (session->server_rx_fifo);
-      session->server_tx_fifo = uword_to_pointer (mp->server_tx_fifo,
-                                                 svm_fifo_t *);
-      ASSERT (session->server_tx_fifo);
     }
 
-  /* security: could unlink /dev/shm/<mp->segment_name> here, maybe */
+  pool_get (utm->sessions, session);
+  session->server_rx_fifo = uword_to_pointer (mp->server_rx_fifo,
+                                             svm_fifo_t *);
+  ASSERT (session->server_rx_fifo);
+  session->server_tx_fifo = uword_to_pointer (mp->server_tx_fifo,
+                                             svm_fifo_t *);
+  ASSERT (session->server_tx_fifo);
 
+  if (mp->segment_name_length > 0)
+    utm->cut_through_session_index = session - utm->sessions;
+  else
+    {
+      utm->connected_session = session - utm->sessions;
+      utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address,
+                                              unix_shared_memory_queue_t *);
+    }
   utm->state = STATE_READY;
 }
 
@@ -789,13 +920,13 @@ server_handle_fifo_event_rx (uri_udp_test_main_t * utm,
 {
   svm_fifo_t *rx_fifo, *tx_fifo;
   int nbytes;
-
   session_fifo_event_t evt;
   unix_shared_memory_queue_t *q;
   int rv;
 
   rx_fifo = e->fifo;
   tx_fifo = utm->sessions[rx_fifo->client_session_index].server_tx_fifo;
+  svm_fifo_unset_event (rx_fifo);
 
   do
     {
@@ -809,13 +940,11 @@ server_handle_fifo_event_rx (uri_udp_test_main_t * utm,
     }
   while (rv == -2);
 
-  /* Fabricate TX event, send to vpp */
-  evt.fifo = tx_fifo;
-  evt.event_type = FIFO_EVENT_APP_TX;
-  evt.event_id = e->event_id;
-
   if (svm_fifo_set_event (tx_fifo))
     {
+      /* Fabricate TX event, send to vpp */
+      evt.fifo = tx_fifo;
+      evt.event_type = FIFO_EVENT_APP_TX;
       q = utm->vpp_event_queue;
       unix_shared_memory_queue_add (q, (u8 *) & evt,
                                    0 /* do wait for mutex */ );
@@ -827,6 +956,9 @@ server_handle_event_queue (uri_udp_test_main_t * utm)
 {
   session_fifo_event_t _e, *e = &_e;
 
+  while (utm->state != STATE_READY)
+    sleep (5);
+
   while (1)
     {
       unix_shared_memory_queue_sub (utm->our_event_queue, (u8 *) e,
@@ -845,7 +977,7 @@ server_handle_event_queue (uri_udp_test_main_t * utm)
          break;
        }
       if (PREDICT_FALSE (utm->time_to_stop == 1))
-       break;
+       return;
       if (PREDICT_FALSE (utm->time_to_print_stats == 1))
        {
          utm->time_to_print_stats = 0;
@@ -869,7 +1001,7 @@ server_unbind (uri_udp_test_main_t * utm)
 }
 
 static void
-server_listen (uri_udp_test_main_t * utm)
+server_bind (uri_udp_test_main_t * utm)
 {
   vl_api_bind_uri_t *bmp;
 
@@ -890,11 +1022,11 @@ udp_server_test (uri_udp_test_main_t * utm)
   application_send_attach (utm);
 
   /* Bind to uri */
-  server_listen (utm);
+  server_bind (utm);
 
-  if (wait_for_state_change (utm, STATE_READY))
+  if (wait_for_state_change (utm, STATE_BOUND))
     {
-      clib_warning ("timeout waiting for STATE_READY");
+      clib_warning ("timeout waiting for STATE_BOUND");
       return;
     }
 
@@ -976,7 +1108,7 @@ main (int argc, char **argv)
   utm->i_am_master = i_am_master;
   utm->segment_main = &svm_fifo_segment_main;
 
-  utm->connect_uri = format (0, "udp://6.0.0.1/1234%c", 0);
+  utm->connect_uri = format (0, "udp://6.0.1.2/1234%c", 0);
 
   setup_signal_handlers ();
 
@@ -991,7 +1123,7 @@ main (int argc, char **argv)
 
   if (i_am_master == 0)
     {
-      uri_udp_client_test (utm);
+      client_test (utm);
       exit (0);
     }
 
index a8e3a50..f0bd2f8 100644 (file)
@@ -136,7 +136,6 @@ typedef struct vppcom_main_t_
   u8 init;
   u32 *client_session_index_fifo;
   volatile u32 bind_session_index;
-  u32 tx_event_id;
   int main_cpu;
 
   /* vpe input queue */
@@ -2328,7 +2327,6 @@ vppcom_session_write (uint32_t session_index, void *buf, int n)
       /* Fabricate TX event, send to vpp */
       evt.fifo = tx_fifo;
       evt.event_type = FIFO_EVENT_APP_TX;
-      evt.event_id = vcm->tx_event_id++;
 
       rval = vppcom_session_at_index (session_index, &session);
       if (PREDICT_FALSE (rval))
index 520bee4..97964c5 100644 (file)
@@ -877,7 +877,7 @@ libvnet_la_SOURCES +=                               \
   vnet/session/session_table.c                 \
   vnet/session/session_lookup.c                        \
   vnet/session/session_node.c                  \
-  vnet/session/transport_interface.c           \
+  vnet/session/transport.c                     \
   vnet/session/application.c                   \
   vnet/session/session_cli.c                   \
   vnet/session/application_interface.c         \
index caa553d..bd9706b 100644 (file)
@@ -63,6 +63,24 @@ ip_is_local (u32 fib_index, ip46_address_t * ip46_address, u8 is_ip4)
   return (flags & FIB_ENTRY_FLAG_LOCAL);
 }
 
+void
+ip_copy (ip46_address_t * dst, ip46_address_t * src, u8 is_ip4)
+{
+  if (is_ip4)
+    dst->ip4.as_u32 = src->ip4.as_u32;
+  else
+    clib_memcpy (&dst->ip6, &src->ip6, sizeof (ip6_address_t));
+}
+
+void
+ip_set (ip46_address_t * dst, void *src, u8 is_ip4)
+{
+  if (is_ip4)
+    dst->ip4.as_u32 = ((ip4_address_t *) src)->as_u32;
+  else
+    clib_memcpy (&dst->ip6, (ip6_address_t *) src, sizeof (ip6_address_t));
+}
+
 u8
 ip_interface_has_address (u32 sw_if_index, ip46_address_t * ip, u8 is_ip4)
 {
@@ -97,22 +115,37 @@ ip_interface_has_address (u32 sw_if_index, ip46_address_t * ip, u8 is_ip4)
   return 0;
 }
 
-void
-ip_copy (ip46_address_t * dst, ip46_address_t * src, u8 is_ip4)
+void *
+ip_interface_get_first_ip (u32 sw_if_index, u8 is_ip4)
 {
-  if (is_ip4)
-    dst->ip4.as_u32 = src->ip4.as_u32;
-  else
-    clib_memcpy (&dst->ip6, &src->ip6, sizeof (ip6_address_t));
-}
+  ip_lookup_main_t *lm4 = &ip4_main.lookup_main;
+  ip_lookup_main_t *lm6 = &ip6_main.lookup_main;
+  ip_interface_address_t *ia = 0;
 
-void
-ip_set (ip46_address_t * dst, void *src, u8 is_ip4)
-{
   if (is_ip4)
-    dst->ip4.as_u32 = ((ip4_address_t *) src)->as_u32;
+    {
+      /* *INDENT-OFF* */
+      foreach_ip_interface_address (lm4, ia, sw_if_index, 1 /* unnumbered */ ,
+      ({
+        return ip_interface_address_get_address (lm4, ia);
+      }));
+      /* *INDENT-ON* */
+    }
   else
-    clib_memcpy (&dst->ip6, (ip6_address_t *) src, sizeof (ip6_address_t));
+    {
+      /* *INDENT-OFF* */
+      foreach_ip_interface_address (lm6, ia, sw_if_index, 1 /* unnumbered */ ,
+      ({
+        ip6_address_t *rv;
+        rv = ip_interface_address_get_address (lm6, ia);
+        /* Trying to use a link-local ip6 src address is a fool's errand */
+        if (!ip6_address_is_link_local_unicast (rv))
+          return rv;
+      }));
+      /* *INDENT-ON* */
+    }
+
+  return 0;
 }
 
 /*
index 3b3a465..9387ba3 100644 (file)
@@ -198,6 +198,7 @@ u8 ip_is_local (u32 fib_index, ip46_address_t * ip46_address, u8 is_ip4);
 u8 ip_interface_has_address (u32 sw_if_index, ip46_address_t * ip, u8 is_ip4);
 void ip_copy (ip46_address_t * dst, ip46_address_t * src, u8 is_ip4);
 void ip_set (ip46_address_t * dst, void *src, u8 is_ip4);
+void *ip_interface_get_first_ip (u32 sw_if_index, u8 is_ip4);
 
 #endif /* included_ip_main_h */
 
index 75d3cfb..c6fd119 100644 (file)
@@ -415,7 +415,6 @@ application_open_session (application_t * app, session_endpoint_t * sep,
                          u32 api_context)
 {
   segment_manager_t *sm;
-  transport_connection_t *tc = 0;
   int rv;
 
   /* Make sure we have a segment manager for connects */
@@ -427,13 +426,9 @@ application_open_session (application_t * app, session_endpoint_t * sep,
       app->connects_seg_manager = segment_manager_index (sm);
     }
 
-  if ((rv = stream_session_open (app->index, sep, &tc)))
+  if ((rv = session_open (app->index, sep, api_context)))
     return rv;
 
-  /* Store api_context for when the reply comes. Not the nicest thing
-   * but better than allocating a separate half-open pool. */
-  tc->s_index = api_context;
-
   return 0;
 }
 
index a0dff90..8599c74 100644 (file)
@@ -92,7 +92,8 @@ static int
 vnet_bind_i (u32 app_index, session_endpoint_t * sep, u64 * handle)
 {
   application_t *app;
-  u32 table_index, listener_index;
+  u32 table_index;
+  u64 listener;
   int rv, have_local = 0;
 
   app = application_get_if_valid (app_index);
@@ -108,8 +109,8 @@ vnet_bind_i (u32 app_index, session_endpoint_t * sep, u64 * handle)
 
   table_index = application_session_table (app,
                                           session_endpoint_fib_proto (sep));
-  listener_index = session_lookup_session_endpoint (table_index, sep);
-  if (listener_index != SESSION_INVALID_INDEX)
+  listener = session_lookup_session_endpoint (table_index, sep);
+  if (listener != SESSION_INVALID_HANDLE)
     return VNET_API_ERROR_ADDRESS_IN_USE;
 
   /*
@@ -119,8 +120,8 @@ vnet_bind_i (u32 app_index, session_endpoint_t * sep, u64 * handle)
   if (application_has_local_scope (app) && session_endpoint_is_zero (sep))
     {
       table_index = application_local_session_table (app);
-      listener_index = session_lookup_session_endpoint (table_index, sep);
-      if (listener_index != SESSION_INVALID_INDEX)
+      listener = session_lookup_session_endpoint (table_index, sep);
+      if (listener != SESSION_INVALID_HANDLE)
        return VNET_API_ERROR_ADDRESS_IN_USE;
       session_lookup_add_session_endpoint (table_index, sep, app->index);
       *handle = session_lookup_local_listener_make_handle (sep);
@@ -206,6 +207,7 @@ vnet_connect_i (u32 app_index, u32 api_context, session_endpoint_t * sep,
 {
   application_t *server, *app;
   u32 table_index;
+  stream_session_t *listener;
 
   if (session_endpoint_is_zero (sep))
     return VNET_API_ERROR_INVALID_VALUE;
@@ -243,10 +245,13 @@ vnet_connect_i (u32 app_index, u32 api_context, session_endpoint_t * sep,
 
   table_index = application_session_table (app,
                                           session_endpoint_fib_proto (sep));
-  app_index = session_lookup_session_endpoint (table_index, sep);
-  server = application_get (app_index);
-  if (server && (server->flags & APP_OPTIONS_FLAGS_ACCEPT_REDIRECT))
-    return app_connect_redirect (server, mp);
+  listener = session_lookup_listener (table_index, sep);
+  if (listener)
+    {
+      server = application_get (listener->app_index);
+      if (server && (server->flags & APP_OPTIONS_FLAGS_ACCEPT_REDIRECT))
+       return app_connect_redirect (server, mp);
+    }
 
   /*
    * Not connecting to a local server, propagate to transport
@@ -470,14 +475,15 @@ vnet_unbind_uri (vnet_unbind_args_t * a)
 clib_error_t *
 vnet_connect_uri (vnet_connect_args_t * a)
 {
-  session_endpoint_t sep = SESSION_ENDPOINT_NULL;
+  session_endpoint_t sep_null = SESSION_ENDPOINT_NULL;
   int rv;
 
   /* Parse uri */
-  rv = parse_uri (a->uri, &sep);
+  a->sep = sep_null;
+  rv = parse_uri (a->uri, &a->sep);
   if (rv)
     return clib_error_return_code (0, rv, 0, "app init: %d", rv);
-  if ((rv = vnet_connect_i (a->app_index, a->api_context, &sep, a->mp)))
+  if ((rv = vnet_connect_i (a->app_index, a->api_context, &a->sep, a->mp)))
     return clib_error_return_code (0, rv, 0, "connect failed");
   return 0;
 }
@@ -489,7 +495,7 @@ vnet_disconnect_session (vnet_disconnect_args_t * a)
   stream_session_t *s;
 
   session_parse_handle (a->handle, &index, &thread_index);
-  s = stream_session_get_if_valid (index, thread_index);
+  s = session_get_if_valid (index, thread_index);
 
   if (!s || s->app_index != a->app_index)
     return VNET_API_ERROR_INVALID_VALUE;
index 5e1fe8e..0251c3b 100644 (file)
@@ -56,11 +56,7 @@ typedef struct _vnet_bind_args_t
   union
   {
     char *uri;
-    struct
-    {
-      session_endpoint_t sep;
-      transport_proto_t proto;
-    };
+    session_endpoint_t sep;
   };
 
   u32 app_index;
@@ -86,23 +82,14 @@ typedef struct _vnet_unbind_args_t
 
 typedef struct _vnet_connect_args
 {
-  union
-  {
-    char *uri;
-    struct
-    {
-      session_endpoint_t sep;
-      transport_proto_t proto;
-    };
-  };
+  char *uri;
+  session_endpoint_t sep;
   u32 app_index;
   u32 api_context;
 
   /* Used for redirects */
   void *mp;
-
-  /* used for proxy connections */
-  u64 server_handle;
+  u64 session_handle;
 } vnet_connect_args_t;
 
 typedef struct _vnet_disconnect_args_t
index f35dec7..cb83d8e 100644 (file)
@@ -273,7 +273,7 @@ segment_manager_del_sessions (segment_manager_t * sm)
          if (session->session_state != SESSION_STATE_CLOSED)
            {
              session->session_state = SESSION_STATE_CLOSED;
-             session_send_session_evt_to_thread (stream_session_handle
+             session_send_session_evt_to_thread (session_handle
                                                  (session),
                                                  FIFO_EVENT_DISCONNECT,
                                                  thread_index);
index 88b38f1..7f28a39 100644 (file)
 session_manager_main_t session_manager_main;
 extern transport_proto_vft_t *tp_vfts;
 
-int
-stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc,
-                        u8 alloc_fifos, stream_session_t ** ret_s)
+static void
+session_send_evt_to_thread (u64 session_handle, fifo_event_type_t evt_type,
+                           u32 thread_index, void *fp, void *rpc_args)
+{
+  u32 tries = 0;
+  session_fifo_event_t evt = { {0}, };
+  unix_shared_memory_queue_t *q;
+
+  evt.event_type = evt_type;
+  if (evt_type == FIFO_EVENT_RPC)
+    {
+      evt.rpc_args.fp = fp;
+      evt.rpc_args.arg = rpc_args;
+    }
+  else
+    evt.session_handle = session_handle;
+
+  q = session_manager_get_vpp_event_queue (thread_index);
+  while (unix_shared_memory_queue_add (q, (u8 *) & evt, 1))
+    {
+      if (tries++ == 3)
+       {
+         SESSION_DBG ("failed to enqueue evt");
+         break;
+       }
+    }
+}
+
+void
+session_send_session_evt_to_thread (u64 session_handle,
+                                   fifo_event_type_t evt_type,
+                                   u32 thread_index)
+{
+  session_send_evt_to_thread (session_handle, evt_type, thread_index, 0, 0);
+}
+
+void
+session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
+{
+  if (thread_index != vlib_get_thread_index ())
+    session_send_evt_to_thread (0, FIFO_EVENT_RPC, thread_index, fp,
+                               rpc_args);
+  else
+    {
+      void (*fnp) (void *) = fp;
+      fnp (rpc_args);
+    }
+}
+
+stream_session_t *
+session_alloc (u32 thread_index)
 {
   session_manager_main_t *smm = &session_manager_main;
+  stream_session_t *s;
+  u8 will_expand = 0;
+  pool_get_aligned_will_expand (smm->sessions[thread_index], will_expand,
+                               CLIB_CACHE_LINE_BYTES);
+  /* If we have peekers, let them finish */
+  if (PREDICT_FALSE (will_expand))
+    {
+      clib_spinlock_lock_if_init (&smm->peekers_write_locks[thread_index]);
+      pool_get_aligned (session_manager_main.sessions[thread_index], s,
+                       CLIB_CACHE_LINE_BYTES);
+      clib_spinlock_unlock_if_init (&smm->peekers_write_locks[thread_index]);
+    }
+  else
+    {
+      pool_get_aligned (session_manager_main.sessions[thread_index], s,
+                       CLIB_CACHE_LINE_BYTES);
+    }
+  memset (s, 0, sizeof (*s));
+  s->session_index = s - session_manager_main.sessions[thread_index];
+  s->thread_index = thread_index;
+  return s;
+}
+
+static void
+session_free (stream_session_t * s)
+{
+  pool_put (session_manager_main.sessions[s->thread_index], s);
+  if (CLIB_DEBUG)
+    memset (s, 0xFA, sizeof (*s));
+}
+
+static int
+session_alloc_fifos (segment_manager_t * sm, stream_session_t * s)
+{
   svm_fifo_t *server_rx_fifo = 0, *server_tx_fifo = 0;
   u32 fifo_segment_index;
-  u32 pool_index;
-  stream_session_t *s;
-  u64 value;
-  u32 thread_index = tc->thread_index;
   int rv;
 
-  ASSERT (thread_index == vlib_get_thread_index ());
+  if ((rv = segment_manager_alloc_session_fifos (sm, &server_rx_fifo,
+                                                &server_tx_fifo,
+                                                &fifo_segment_index)))
+    return rv;
+  /* Initialize backpointers */
+  server_rx_fifo->master_session_index = s->session_index;
+  server_rx_fifo->master_thread_index = s->thread_index;
 
-  /* Create the session */
-  pool_get_aligned (smm->sessions[thread_index], s, CLIB_CACHE_LINE_BYTES);
-  memset (s, 0, sizeof (*s));
-  pool_index = s - smm->sessions[thread_index];
+  server_tx_fifo->master_session_index = s->session_index;
+  server_tx_fifo->master_thread_index = s->thread_index;
 
-  /* Allocate fifos */
-  if (alloc_fifos)
-    {
-      if ((rv = segment_manager_alloc_session_fifos (sm, &server_rx_fifo,
-                                                    &server_tx_fifo,
-                                                    &fifo_segment_index)))
-       {
-         pool_put (smm->sessions[thread_index], s);
-         return rv;
-       }
-      /* Initialize backpointers */
-      server_rx_fifo->master_session_index = pool_index;
-      server_rx_fifo->master_thread_index = thread_index;
+  s->server_rx_fifo = server_rx_fifo;
+  s->server_tx_fifo = server_tx_fifo;
+  s->svm_segment_index = fifo_segment_index;
+  return 0;
+}
 
-      server_tx_fifo->master_session_index = pool_index;
-      server_tx_fifo->master_thread_index = thread_index;
+static stream_session_t *
+session_alloc_for_connection (transport_connection_t * tc)
+{
+  stream_session_t *s;
+  u32 thread_index = tc->thread_index;
 
-      s->server_rx_fifo = server_rx_fifo;
-      s->server_tx_fifo = server_tx_fifo;
-      s->svm_segment_index = fifo_segment_index;
-    }
+  ASSERT (thread_index == vlib_get_thread_index ());
 
-  /* Initialize state machine, such as it is... */
-  s->session_type = session_type_from_proto_and_ip (tc->transport_proto,
-                                                   tc->is_ip4);
+  s = session_alloc (thread_index);
+  s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
   s->session_state = SESSION_STATE_CONNECTING;
   s->thread_index = thread_index;
-  s->session_index = pool_index;
 
-  /* Attach transport to session */
+  /* Attach transport to session and vice versa */
   s->connection_index = tc->c_index;
-
-  /* Attach session to transport */
   tc->s_index = s->session_index;
+  return s;
+}
+
+static int
+session_alloc_and_init (segment_manager_t * sm, transport_connection_t * tc,
+                       u8 alloc_fifos, stream_session_t ** ret_s)
+{
+  stream_session_t *s;
+  int rv;
+
+  s = session_alloc_for_connection (tc);
+  if (alloc_fifos && (rv = session_alloc_fifos (sm, s)))
+    {
+      session_free (s);
+      return rv;
+    }
 
   /* Add to the main lookup table */
-  value = stream_session_handle (s);
-  session_lookup_add_connection (tc, value);
+  session_lookup_add_connection (tc, session_handle (s));
 
   *ret_s = s;
-
   return 0;
 }
 
@@ -217,8 +301,9 @@ session_enqueue_chain_tail (stream_session_t * s, vlib_buffer_t * b,
  * @return Number of bytes enqueued or a negative value if enqueueing failed.
  */
 int
-stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b,
-                            u32 offset, u8 queue_event, u8 is_in_order)
+session_enqueue_stream_connection (transport_connection_t * tc,
+                                  vlib_buffer_t * b, u32 offset,
+                                  u8 queue_event, u8 is_in_order)
 {
   stream_session_t *s;
   int enqueued = 0, rv, in_order_off;
@@ -257,12 +342,12 @@ stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b,
        * by calling stream_server_flush_enqueue_events () */
       session_manager_main_t *smm = vnet_get_session_manager_main ();
       u32 thread_index = s->thread_index;
-      u32 my_enqueue_epoch = smm->current_enqueue_epoch[thread_index];
+      u32 enqueue_epoch = smm->current_enqueue_epoch[tc->proto][thread_index];
 
-      if (s->enqueue_epoch != my_enqueue_epoch)
+      if (s->enqueue_epoch != enqueue_epoch)
        {
-         s->enqueue_epoch = my_enqueue_epoch;
-         vec_add1 (smm->session_indices_to_enqueue_by_thread[thread_index],
+         s->enqueue_epoch = enqueue_epoch;
+         vec_add1 (smm->session_to_enqueue[tc->proto][thread_index],
                    s - smm->sessions[thread_index]);
        }
     }
@@ -270,6 +355,41 @@ stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b,
   return enqueued;
 }
 
+int
+session_enqueue_dgram_connection (stream_session_t * s, vlib_buffer_t * b,
+                                 u8 proto, u8 queue_event)
+{
+  int enqueued = 0, rv, in_order_off;
+
+  if (svm_fifo_max_enqueue (s->server_rx_fifo) < b->current_length)
+    return -1;
+  enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, b->current_length,
+                                     vlib_buffer_get_current (b));
+  if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0))
+    {
+      in_order_off = enqueued > b->current_length ? enqueued : 0;
+      rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
+      if (rv > 0)
+       enqueued += rv;
+    }
+  if (queue_event)
+    {
+      /* Queue RX event on this fifo. Eventually these will need to be flushed
+       * by calling stream_server_flush_enqueue_events () */
+      session_manager_main_t *smm = vnet_get_session_manager_main ();
+      u32 thread_index = s->thread_index;
+      u32 enqueue_epoch = smm->current_enqueue_epoch[proto][thread_index];
+
+      if (s->enqueue_epoch != enqueue_epoch)
+       {
+         s->enqueue_epoch = enqueue_epoch;
+         vec_add1 (smm->session_to_enqueue[proto][thread_index],
+                   s - smm->sessions[thread_index]);
+       }
+    }
+  return enqueued;
+}
+
 /** Check if we have space in rx fifo to push more bytes */
 u8
 stream_session_no_space (transport_connection_t * tc, u32 thread_index,
@@ -319,12 +439,11 @@ stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
  * @return 0 on succes or negative number if failed to send notification.
  */
 static int
-stream_session_enqueue_notify (stream_session_t * s, u8 block)
+session_enqueue_notify (stream_session_t * s, u8 block)
 {
   application_t *app;
   session_fifo_event_t evt;
   unix_shared_memory_queue_t *q;
-  static u32 serial_number;
 
   if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
     {
@@ -354,7 +473,6 @@ stream_session_enqueue_notify (stream_session_t * s, u8 block)
       /* Fabricate event */
       evt.fifo = s->server_rx_fifo;
       evt.event_type = FIFO_EVENT_APP_RX;
-      evt.event_id = serial_number++;
 
       /* Add event to server's event queue */
       q = app->event_queue;
@@ -389,35 +507,25 @@ stream_session_enqueue_notify (stream_session_t * s, u8 block)
  *         failures due to API queue being full.
  */
 int
-session_manager_flush_enqueue_events (u32 thread_index)
+session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index)
 {
   session_manager_main_t *smm = &session_manager_main;
-  u32 *session_indices_to_enqueue;
+  u32 *indices;
+  stream_session_t *s;
   int i, errors = 0;
 
-  session_indices_to_enqueue =
-    smm->session_indices_to_enqueue_by_thread[thread_index];
+  indices = smm->session_to_enqueue[transport_proto][thread_index];
 
-  for (i = 0; i < vec_len (session_indices_to_enqueue); i++)
+  for (i = 0; i < vec_len (indices); i++)
     {
-      stream_session_t *s0;
-
-      /* Get session */
-      s0 = stream_session_get_if_valid (session_indices_to_enqueue[i],
-                                       thread_index);
-      if (s0 == 0 || stream_session_enqueue_notify (s0, 0 /* don't block */ ))
-       {
-         errors++;
-       }
+      s = session_get_if_valid (indices[i], thread_index);
+      if (s == 0 || session_enqueue_notify (s, 0 /* don't block */ ))
+       errors++;
     }
 
-  vec_reset_length (session_indices_to_enqueue);
-
-  smm->session_indices_to_enqueue_by_thread[thread_index] =
-    session_indices_to_enqueue;
-
-  /* Increment enqueue epoch for next round */
-  smm->current_enqueue_epoch[thread_index]++;
+  vec_reset_length (indices);
+  smm->session_to_enqueue[transport_proto][thread_index] = indices;
+  smm->current_enqueue_epoch[transport_proto][thread_index]++;
 
   return errors;
 }
@@ -438,22 +546,25 @@ stream_session_init_fifos_pointers (transport_connection_t * tc,
 }
 
 int
-stream_session_connect_notify (transport_connection_t * tc, u8 is_fail)
+session_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
 {
   application_t *app;
   stream_session_t *new_s = 0;
   u64 handle;
   u32 opaque = 0;
   int error = 0;
+  segment_manager_t *sm;
+  u8 alloc_fifos;
 
+  /*
+   * Find connection handle and cleanup half-open table
+   */
   handle = session_lookup_half_open_handle (tc);
   if (handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
     {
       SESSION_DBG ("half-open was removed!");
       return -1;
     }
-
-  /* Cleanup half-open table */
   session_lookup_del_half_open (tc);
 
   /* Get the app's index from the handle we stored when opening connection
@@ -462,17 +573,16 @@ stream_session_connect_notify (transport_connection_t * tc, u8 is_fail)
   app = application_get_if_valid (handle >> 32);
   if (!app)
     return -1;
-
   opaque = tc->s_index;
 
+  /*
+   * Allocate new session with fifos (svm segments are allocated if needed)
+   */
   if (!is_fail)
     {
-      segment_manager_t *sm;
-      u8 alloc_fifos;
       sm = application_get_connect_segment_manager (app);
       alloc_fifos = application_is_proxy (app);
-      /* Create new session (svm segments are allocated if needed) */
-      if (stream_session_create_i (sm, tc, alloc_fifos, &new_s))
+      if (session_alloc_and_init (sm, tc, alloc_fifos, &new_s))
        {
          is_fail = 1;
          error = -1;
@@ -481,7 +591,9 @@ stream_session_connect_notify (transport_connection_t * tc, u8 is_fail)
        new_s->app_index = app->index;
     }
 
-  /* Notify client application */
+  /*
+   * Notify client application
+   */
   if (app->cb_fns.session_connected_callback (app->index, opaque, new_s,
                                              is_fail))
     {
@@ -498,6 +610,67 @@ stream_session_connect_notify (transport_connection_t * tc, u8 is_fail)
   return error;
 }
 
+typedef struct _session_switch_pool_args
+{
+  u32 session_index;
+  u32 thread_index;
+  u32 new_thread_index;
+  u32 new_session_index;
+} session_switch_pool_args_t;
+
+static void
+session_switch_pool (void *cb_args)
+{
+  session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
+  stream_session_t *s;
+  ASSERT (args->thread_index == vlib_get_thread_index ());
+  s = session_get (args->session_index, args->thread_index);
+  s->server_tx_fifo->master_session_index = args->new_session_index;
+  s->server_tx_fifo->master_thread_index = args->new_thread_index;
+  tp_vfts[s->session_type].cleanup (s->connection_index, s->thread_index);
+  session_free (s);
+  clib_mem_free (cb_args);
+}
+
+/**
+ * Move dgram session to the right thread
+ */
+int
+session_dgram_connect_notify (transport_connection_t * tc,
+                             u32 old_thread_index,
+                             stream_session_t ** new_session)
+{
+  stream_session_t *new_s;
+  session_switch_pool_args_t *rpc_args;
+
+  /*
+   * Clone half-open session to the right thread.
+   */
+  new_s = session_clone_safe (tc->s_index, old_thread_index);
+  new_s->connection_index = tc->c_index;
+  new_s->server_rx_fifo->master_session_index = new_s->session_index;
+  new_s->server_rx_fifo->master_thread_index = new_s->thread_index;
+  new_s->session_state = SESSION_STATE_READY;
+  session_lookup_add_connection (tc, session_handle (new_s));
+
+  /*
+   * Ask thread owning the old session to clean it up and make us the tx
+   * fifo owner
+   */
+  rpc_args = clib_mem_alloc (sizeof (*rpc_args));
+  rpc_args->new_session_index = new_s->session_index;
+  rpc_args->new_thread_index = new_s->thread_index;
+  rpc_args->session_index = tc->s_index;
+  rpc_args->thread_index = old_thread_index;
+  session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
+                                 rpc_args);
+
+  tc->s_index = new_s->session_index;
+  new_s->connection_index = tc->c_index;
+  *new_session = new_s;
+  return 0;
+}
+
 void
 stream_session_accept_notify (transport_connection_t * tc)
 {
@@ -533,7 +706,6 @@ stream_session_disconnect_notify (transport_connection_t * tc)
 void
 stream_session_delete (stream_session_t * s)
 {
-  session_manager_main_t *smm = vnet_get_session_manager_main ();
   int rv;
 
   /* Delete from the main lookup table. */
@@ -543,10 +715,7 @@ stream_session_delete (stream_session_t * s)
   /* Cleanup fifo segments */
   segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo,
                                 s->server_tx_fifo);
-
-  pool_put (smm->sessions[s->thread_index], s);
-  if (CLIB_DEBUG)
-    memset (s, 0xFA, sizeof (*s));
+  session_free (s);
 }
 
 /**
@@ -563,7 +732,7 @@ stream_session_delete_notify (transport_connection_t * tc)
   stream_session_t *s;
 
   /* App might've been removed already */
-  s = stream_session_get_if_valid (tc->s_index, tc->thread_index);
+  s = session_get_if_valid (tc->s_index, tc->thread_index);
   if (!s)
     return;
   stream_session_delete (s);
@@ -596,14 +765,14 @@ stream_session_accept (transport_connection_t * tc, u32 listener_index,
   session_type_t sst;
   int rv;
 
-  sst = session_type_from_proto_and_ip (tc->transport_proto, tc->is_ip4);
+  sst = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
 
   /* Find the server */
   listener = listen_session_get (sst, listener_index);
   server = application_get (listener->app_index);
 
   sm = application_get_listen_segment_manager (server, listener);
-  if ((rv = stream_session_create_i (sm, tc, 1, &s)))
+  if ((rv = session_alloc_and_init (sm, tc, 1, &s)))
     return rv;
 
   s->app_index = server->index;
@@ -629,14 +798,17 @@ stream_session_accept (transport_connection_t * tc, u32 listener_index,
  * @param app_index Index of the application requesting the connect
  * @param st Session type requested.
  * @param tep Remote transport endpoint
- * @param res Resulting transport connection .
+ * @param opaque Opaque data (typically, api_context) the application expects
+ *              on open completion.
  */
 int
-stream_session_open (u32 app_index, session_endpoint_t * rmt,
-                    transport_connection_t ** res)
+session_open (u32 app_index, session_endpoint_t * rmt, u32 opaque)
 {
   transport_connection_t *tc;
   session_type_t sst;
+  segment_manager_t *sm;
+  stream_session_t *s;
+  application_t *app;
   int rv;
   u64 handle;
 
@@ -644,22 +816,45 @@ stream_session_open (u32 app_index, session_endpoint_t * rmt,
   rv = tp_vfts[sst].open (session_endpoint_to_transport (rmt));
   if (rv < 0)
     {
-      clib_warning ("Transport failed to open connection.");
+      SESSION_DBG ("Transport failed to open connection.");
       return VNET_API_ERROR_SESSION_CONNECT;
     }
 
   tc = tp_vfts[sst].get_half_open ((u32) rv);
 
-  /* Save app and tc index. The latter is needed to help establish the
-   * connection while the former is needed when the connect notify comes
-   * and we have to notify the external app */
-  handle = (((u64) app_index) << 32) | (u64) tc->c_index;
-
-  /* Add to the half-open lookup table */
-  session_lookup_add_half_open (tc, handle);
+  /* If transport offers a stream service, only allocate session once the
+   * connection has been established.
+   */
+  if (transport_is_stream (rmt->transport_proto))
+    {
+      /* Add connection to half-open table and save app and tc index. The
+       * latter is needed to help establish the connection while the former
+       * is needed when the connect notify comes and we have to notify the
+       * external app
+       */
+      handle = (((u64) app_index) << 32) | (u64) tc->c_index;
+      session_lookup_add_half_open (tc, handle);
+
+      /* Store api_context (opaque) for when the reply comes. Not the nicest
+       * thing but better than allocating a separate half-open pool.
+       */
+      tc->s_index = opaque;
+    }
+  /* For dgram type of service, allocate session and fifos now.
+   */
+  else
+    {
+      app = application_get (app_index);
+      sm = application_get_connect_segment_manager (app);
 
-  *res = tc;
+      if (session_alloc_and_init (sm, tc, 1, &s))
+       return -1;
+      s->app_index = app->index;
+      s->session_state = SESSION_STATE_CONNECTING_READY;
 
+      /* Tell the app about the new event fifo for this session */
+      app->cb_fns.session_connected_callback (app->index, opaque, s, 0);
+    }
   return 0;
 }
 
@@ -672,14 +867,14 @@ stream_session_open (u32 app_index, session_endpoint_t * rmt,
  * @param tep Local endpoint to be listened on.
  */
 int
-stream_session_listen (stream_session_t * s, session_endpoint_t * tep)
+stream_session_listen (stream_session_t * s, session_endpoint_t * sep)
 {
   transport_connection_t *tc;
   u32 tci;
 
   /* Transport bind/listen  */
   tci = tp_vfts[s->session_type].bind (s->session_index,
-                                      session_endpoint_to_transport (tep));
+                                      session_endpoint_to_transport (sep));
 
   if (tci == (u32) ~ 0)
     return -1;
@@ -694,7 +889,6 @@ stream_session_listen (stream_session_t * s, session_endpoint_t * tep)
 
   /* Add to the main lookup table */
   session_lookup_add_connection (tc, s->session_index);
-
   return 0;
 }
 
@@ -726,32 +920,6 @@ stream_session_stop_listen (stream_session_t * s)
   return 0;
 }
 
-void
-session_send_session_evt_to_thread (u64 session_handle,
-                                   fifo_event_type_t evt_type,
-                                   u32 thread_index)
-{
-  static u16 serial_number = 0;
-  u32 tries = 0;
-  session_fifo_event_t evt;
-  unix_shared_memory_queue_t *q;
-
-  /* Fabricate event */
-  evt.session_handle = session_handle;
-  evt.event_type = evt_type;
-  evt.event_id = serial_number++;
-
-  q = session_manager_get_vpp_event_queue (thread_index);
-  while (unix_shared_memory_queue_add (q, (u8 *) & evt, 1))
-    {
-      if (tries++ == 3)
-       {
-         TCP_DBG ("failed to enqueue evt");
-         break;
-       }
-    }
-}
-
 /**
  * Disconnect session and propagate to transport. This should eventually
  * result in a delete notification that allows us to cleanup session state.
@@ -837,6 +1005,21 @@ session_type_from_proto_and_ip (transport_proto_t proto, u8 is_ip4)
   return SESSION_N_TYPES;
 }
 
+transport_connection_t *
+session_get_transport (stream_session_t * s)
+{
+  if (s->session_state >= SESSION_STATE_READY)
+    return tp_vfts[s->session_type].get_connection (s->connection_index,
+                                                   s->thread_index);
+  return 0;
+}
+
+transport_connection_t *
+listen_session_get_transport (stream_session_t * s)
+{
+  return tp_vfts[s->session_type].get_listener (s->connection_index);
+}
+
 int
 listen_session_get_local_session_endpoint (stream_session_t * listener,
                                           session_endpoint_t * sep)
@@ -852,7 +1035,7 @@ listen_session_get_local_session_endpoint (stream_session_t * listener,
 
   /* N.B. The ip should not be copied because this is the local endpoint */
   sep->port = tc->lcl_port;
-  sep->transport_proto = tc->transport_proto;
+  sep->transport_proto = tc->proto;
   sep->is_ip4 = tc->is_ip4;
   return 0;
 }
@@ -864,7 +1047,7 @@ session_manager_main_enable (vlib_main_t * vm)
   vlib_thread_main_t *vtm = vlib_get_thread_main ();
   u32 num_threads;
   u32 preallocated_sessions_per_worker;
-  int i;
+  int i, j;
 
   num_threads = 1 /* main thread */  + vtm->n_threads;
 
@@ -877,12 +1060,21 @@ session_manager_main_enable (vlib_main_t * vm)
 
   /* configure per-thread ** vectors */
   vec_validate (smm->sessions, num_threads - 1);
-  vec_validate (smm->session_indices_to_enqueue_by_thread, num_threads - 1);
   vec_validate (smm->tx_buffers, num_threads - 1);
   vec_validate (smm->pending_event_vector, num_threads - 1);
+  vec_validate (smm->pending_disconnects, num_threads - 1);
   vec_validate (smm->free_event_vector, num_threads - 1);
-  vec_validate (smm->current_enqueue_epoch, num_threads - 1);
   vec_validate (smm->vpp_event_queues, num_threads - 1);
+  vec_validate (smm->session_peekers, num_threads - 1);
+  vec_validate (smm->peekers_readers_locks, num_threads - 1);
+  vec_validate (smm->peekers_write_locks, num_threads - 1);
+
+  for (i = 0; i < TRANSPORT_N_PROTO; i++)
+    for (j = 0; j < num_threads; j++)
+      {
+       vec_validate (smm->session_to_enqueue[i], num_threads - 1);
+       vec_validate (smm->current_enqueue_epoch[i], num_threads - 1);
+      }
 
   for (i = 0; i < num_threads; i++)
     {
@@ -890,6 +1082,8 @@ session_manager_main_enable (vlib_main_t * vm)
       _vec_len (smm->free_event_vector[i]) = 0;
       vec_validate (smm->pending_event_vector[i], 0);
       _vec_len (smm->pending_event_vector[i]) = 0;
+      vec_validate (smm->pending_disconnects[i], 0);
+      _vec_len (smm->pending_disconnects[i]) = 0;
     }
 
 #if SESSION_DBG
@@ -924,6 +1118,7 @@ session_manager_main_enable (vlib_main_t * vm)
 
   session_lookup_init ();
   app_namespaces_init ();
+  transport_init ();
 
   smm->is_enabled = 1;
 
index b1a03d2..bd854d4 100644 (file)
@@ -105,7 +105,7 @@ typedef CLIB_PACKED (struct {
       rpc_args_t rpc_args;
     };
   u8 event_type;
-  u16 event_id;
+  u8 postponed;
 }) session_fifo_event_t;
 /* *INDENT-ON* */
 
@@ -128,17 +128,21 @@ struct _session_manager_main
   /** Per worker thread session pools */
   stream_session_t **sessions;
 
+  /** Per worker-thread count of threads peeking into the session pool */
+  u32 *session_peekers;
+
+  /** Per worker-thread rw peekers locks */
+  clib_spinlock_t *peekers_readers_locks;
+  clib_spinlock_t *peekers_write_locks;
+
   /** Pool of listen sessions. Same type as stream sessions to ease lookups */
   stream_session_t *listen_sessions[SESSION_N_TYPES];
 
-  /** Sparse vector to map dst port to stream server  */
-  u16 *stream_server_by_dst_port[SESSION_N_TYPES];
-
-  /** per-worker enqueue epoch counters */
-  u8 *current_enqueue_epoch;
+  /** Per-proto, per-worker enqueue epoch counters */
+  u8 *current_enqueue_epoch[TRANSPORT_N_PROTO];
 
-  /** Per-worker thread vector of sessions to enqueue */
-  u32 **session_indices_to_enqueue_by_thread;
+  /** Per-proto, per-worker thread vector of sessions to enqueue */
+  u32 **session_to_enqueue[TRANSPORT_N_PROTO];
 
   /** per-worker tx buffer free lists */
   u32 **tx_buffers;
@@ -149,6 +153,9 @@ struct _session_manager_main
   /** per-worker active event vectors */
   session_fifo_event_t **pending_event_vector;
 
+  /** per-worker postponed disconnects */
+  session_fifo_event_t **pending_disconnects;
+
   /** vpp fifo event queue */
   unix_shared_memory_queue_t **vpp_event_queues;
 
@@ -213,6 +220,8 @@ stream_session_is_valid (u32 si, u8 thread_index)
   return 1;
 }
 
+stream_session_t *session_alloc (u32 thread_index);
+
 always_inline stream_session_t *
 session_get (u32 si, u32 thread_index)
 {
@@ -221,7 +230,7 @@ session_get (u32 si, u32 thread_index)
 }
 
 always_inline stream_session_t *
-stream_session_get_if_valid (u64 si, u32 thread_index)
+session_get_if_valid (u64 si, u32 thread_index)
 {
   if (thread_index >= vec_len (session_manager_main.sessions))
     return 0;
@@ -234,7 +243,7 @@ stream_session_get_if_valid (u64 si, u32 thread_index)
 }
 
 always_inline u64
-stream_session_handle (stream_session_t * s)
+session_handle (stream_session_t * s)
 {
   return ((u64) s->thread_index << 32) | (u64) s->session_index;
 }
@@ -267,6 +276,66 @@ session_get_from_handle (u64 handle)
                       session_index_from_handle (handle));
 }
 
+/**
+ * Acquires a lock that blocks a session pool from expanding.
+ *
+ * This is typically used for safely peeking into other threads'
+ * pools in order to clone elements. Lock should be dropped as soon
+ * as possible by calling @ref session_pool_remove_peeker.
+ *
+ * NOTE: Avoid using pool_elt_at_index while the lock is held because
+ * it may lead to free elt bitmap expansion/contraction!
+ */
+always_inline void
+session_pool_add_peeker (u32 thread_index)
+{
+  session_manager_main_t *smm = &session_manager_main;
+  if (thread_index == vlib_get_thread_index ())
+    return;
+  clib_spinlock_lock_if_init (&smm->peekers_readers_locks[thread_index]);
+  smm->session_peekers[thread_index] += 1;
+  if (smm->session_peekers[thread_index] == 1)
+    clib_spinlock_lock_if_init (&smm->peekers_write_locks[thread_index]);
+  clib_spinlock_unlock_if_init (&smm->peekers_readers_locks[thread_index]);
+}
+
+always_inline void
+session_pool_remove_peeker (u32 thread_index)
+{
+  session_manager_main_t *smm = &session_manager_main;
+  if (thread_index == vlib_get_thread_index ())
+    return;
+  ASSERT (session_manager_main.session_peekers[thread_index] > 0);
+  clib_spinlock_lock_if_init (&smm->peekers_readers_locks[thread_index]);
+  smm->session_peekers[thread_index] -= 1;
+  if (smm->session_peekers[thread_index] == 0)
+    clib_spinlock_unlock_if_init (&smm->peekers_write_locks[thread_index]);
+  clib_spinlock_unlock_if_init (&smm->peekers_readers_locks[thread_index]);
+}
+
+/**
+ * Get session from handle and 'lock' pool resize if not in same thread
+ *
+ * Caller should drop the peek 'lock' as soon as possible.
+ */
+always_inline stream_session_t *
+session_get_from_handle_safe (u64 handle)
+{
+  session_manager_main_t *smm = &session_manager_main;
+  u32 thread_index = session_thread_from_handle (handle);
+  if (thread_index == vlib_get_thread_index ())
+    {
+      return pool_elt_at_index (smm->sessions[thread_index],
+                               session_index_from_handle (handle));
+    }
+  else
+    {
+      session_pool_add_peeker (thread_index);
+      /* Don't use pool_elt_at_index. See @ref session_pool_add_peeker */
+      return smm->sessions[thread_index] + session_index_from_handle (handle);
+    }
+}
+
 always_inline stream_session_t *
 stream_session_listener_get (u8 sst, u64 si)
 {
@@ -296,17 +365,52 @@ stream_session_rx_fifo_size (transport_connection_t * tc)
   return s->server_rx_fifo->nitems;
 }
 
+always_inline u32
+session_get_index (stream_session_t * s)
+{
+  return (s - session_manager_main.sessions[s->thread_index]);
+}
+
+always_inline stream_session_t *
+session_clone_safe (u32 session_index, u32 thread_index)
+{
+  stream_session_t *old_s, *new_s;
+  u32 current_thread_index = vlib_get_thread_index ();
+
+  /* If during the memcpy pool is reallocated AND the memory allocator
+   * decides to give the old chunk of memory to somebody in a hurry to
+   * scribble something on it, we have a problem. So add this thread as
+   * a session pool peeker.
+   */
+  session_pool_add_peeker (thread_index);
+  new_s = session_alloc (current_thread_index);
+  old_s = session_manager_main.sessions[thread_index] + session_index;
+  clib_memcpy (new_s, old_s, sizeof (*new_s));
+  session_pool_remove_peeker (thread_index);
+  new_s->thread_index = current_thread_index;
+  new_s->session_index = session_get_index (new_s);
+  return new_s;
+}
+
+transport_connection_t *session_get_transport (stream_session_t * s);
+
 u32 stream_session_tx_fifo_max_dequeue (transport_connection_t * tc);
 
+stream_session_t *session_alloc (u32 thread_index);
 int
-stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b,
-                            u32 offset, u8 queue_event, u8 is_in_order);
-int
-stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer,
-                          u32 offset, u32 max_bytes);
+session_enqueue_stream_connection (transport_connection_t * tc,
+                                  vlib_buffer_t * b, u32 offset,
+                                  u8 queue_event, u8 is_in_order);
+int session_enqueue_dgram_connection (stream_session_t * s, vlib_buffer_t * b,
+                                     u8 proto, u8 queue_event);
+int stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer,
+                              u32 offset, u32 max_bytes);
 u32 stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes);
 
-int stream_session_connect_notify (transport_connection_t * tc, u8 is_fail);
+int session_stream_connect_notify (transport_connection_t * tc, u8 is_fail);
+int session_dgram_connect_notify (transport_connection_t * tc,
+                                 u32 old_thread_index,
+                                 stream_session_t ** new_session);
 void stream_session_init_fifos_pointers (transport_connection_t * tc,
                                         u32 rx_pointer, u32 tx_pointer);
 
@@ -314,12 +418,9 @@ void stream_session_accept_notify (transport_connection_t * tc);
 void stream_session_disconnect_notify (transport_connection_t * tc);
 void stream_session_delete_notify (transport_connection_t * tc);
 void stream_session_reset_notify (transport_connection_t * tc);
-int
-stream_session_accept (transport_connection_t * tc, u32 listener_index,
-                      u8 notify);
-int
-stream_session_open (u32 app_index, session_endpoint_t * tep,
-                    transport_connection_t ** tc);
+int stream_session_accept (transport_connection_t * tc, u32 listener_index,
+                          u8 notify);
+int session_open (u32 app_index, session_endpoint_t * tep, u32 opaque);
 int stream_session_listen (stream_session_t * s, session_endpoint_t * tep);
 int stream_session_stop_listen (stream_session_t * s);
 void stream_session_disconnect (stream_session_t * s);
@@ -346,7 +447,7 @@ session_manager_get_vpp_event_queue (u32 thread_index)
   return session_manager_main.vpp_event_queues[thread_index];
 }
 
-int session_manager_flush_enqueue_events (u32 thread_index);
+int session_manager_flush_enqueue_events (u8 proto, u32 thread_index);
 
 always_inline u64
 listen_session_get_handle (stream_session_t * s)
@@ -400,6 +501,8 @@ listen_session_del (stream_session_t * s)
   pool_put (session_manager_main.listen_sessions[s->session_type], s);
 }
 
+transport_connection_t *listen_session_get_transport (stream_session_t * s);
+
 int
 listen_session_get_local_session_endpoint (stream_session_t * listener,
                                           session_endpoint_t * sep);
index 5bfca7b..432c7ba 100755 (executable)
@@ -99,10 +99,10 @@ send_session_accept_callback (stream_session_t * s)
   mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_ACCEPT_SESSION);
   mp->context = server->index;
   listener = listen_session_get (s->session_type, s->listener_index);
-  tp_vft = session_get_transport_vft (s->session_type);
+  tp_vft = transport_protocol_get_vft (s->session_type);
   tc = tp_vft->get_connection (s->connection_index, s->thread_index);
   mp->listener_handle = listen_session_get_handle (listener);
-  mp->handle = stream_session_handle (s);
+  mp->handle = session_handle (s);
   mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo);
   mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo);
   mp->vpp_event_queue_address = pointer_to_uword (vpp_queue);
@@ -129,7 +129,7 @@ send_session_disconnect_callback (stream_session_t * s)
   mp = vl_msg_api_alloc (sizeof (*mp));
   memset (mp, 0, sizeof (*mp));
   mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_DISCONNECT_SESSION);
-  mp->handle = stream_session_handle (s);
+  mp->handle = session_handle (s);
   vl_msg_api_send_shmem (q, (u8 *) & mp);
 }
 
@@ -148,7 +148,7 @@ send_session_reset_callback (stream_session_t * s)
   mp = vl_msg_api_alloc (sizeof (*mp));
   memset (mp, 0, sizeof (*mp));
   mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_RESET_SESSION);
-  mp->handle = stream_session_handle (s);
+  mp->handle = session_handle (s);
   vl_msg_api_send_shmem (q, (u8 *) & mp);
 }
 
@@ -175,7 +175,7 @@ send_session_connected_callback (u32 app_index, u32 api_context,
       vpp_queue = session_manager_get_vpp_event_queue (s->thread_index);
       mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo);
       mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo);
-      mp->handle = stream_session_handle (s);
+      mp->handle = session_handle (s);
       mp->vpp_event_queue_address = pointer_to_uword (vpp_queue);
       mp->retval = 0;
     }
@@ -463,11 +463,14 @@ vl_api_connect_uri_t_handler (vl_api_connect_uri_t * mp)
       rv = VNET_API_ERROR_APPLICATION_NOT_ATTACHED;
     }
 
+  /*
+   * Don't reply to stream (tcp) connects. The reply will come once
+   * the connection is established. In the case of redirects, the reply
+   * will come from the server app.
+   */
   if (rv == 0 || rv == VNET_API_ERROR_SESSION_REDIRECT)
     return;
 
-  /* Got some error, relay it */
-
 done:
   /* *INDENT-OFF* */
   REPLY_MACRO (VL_API_CONNECT_SESSION_REPLY);
@@ -540,7 +543,7 @@ vl_api_reset_session_reply_t_handler (vl_api_reset_session_reply_t * mp)
     return;
 
   session_parse_handle (mp->handle, &index, &thread_index);
-  s = stream_session_get_if_valid (index, thread_index);
+  s = session_get_if_valid (index, thread_index);
   if (s == 0 || app->index != s->app_index)
     {
       clib_warning ("Invalid session!");
@@ -576,7 +579,7 @@ vl_api_accept_session_reply_t_handler (vl_api_accept_session_reply_t * mp)
   else
     {
       session_parse_handle (mp->handle, &session_index, &thread_index);
-      s = stream_session_get_if_valid (session_index, thread_index);
+      s = session_get_if_valid (session_index, thread_index);
       if (!s)
        {
          clib_warning ("session doesn't exist");
@@ -623,8 +626,8 @@ vl_api_bind_sock_t_handler (vl_api_bind_sock_t * mp)
       a->sep.port = mp->port;
       a->sep.fib_index = mp->vrf;
       a->sep.sw_if_index = ENDPOINT_INVALID_INDEX;
+      a->sep.transport_proto = mp->proto;
       a->app_index = app->index;
-      a->proto = mp->proto;
 
       if ((error = vnet_bind (a)))
        {
index 588cb60..f0f490d 100755 (executable)
@@ -55,7 +55,7 @@ format_stream_session (u8 * s, va_list * args)
   int verbose = va_arg (*args, int);
   transport_proto_vft_t *tp_vft;
   u8 *str = 0;
-  tp_vft = session_get_transport_vft (ss->session_type);
+  tp_vft = transport_protocol_get_vft (ss->session_type);
 
   if (verbose == 1 && ss->session_state >= SESSION_STATE_ACCEPTING)
     str = format (0, "%-10u%-10u%-10lld",
@@ -63,9 +63,7 @@ format_stream_session (u8 * s, va_list * args)
                  svm_fifo_max_enqueue (ss->server_tx_fifo),
                  stream_session_get_index (ss));
 
-  if (ss->session_state == SESSION_STATE_READY
-      || ss->session_state == SESSION_STATE_ACCEPTING
-      || ss->session_state == SESSION_STATE_CLOSED)
+  if (ss->session_state >= SESSION_STATE_ACCEPTING)
     {
       s = format (s, "%U", tp_vft->format_connection, ss->connection_index,
                  ss->thread_index, verbose);
@@ -146,16 +144,17 @@ unformat_stream_session (unformat_input_t * input, va_list * args)
     return 0;
 
   if (is_ip4)
-    s = session_lookup4 (fib_index, &lcl.ip4, &rmt.ip4,
-                        clib_host_to_net_u16 (lcl_port),
-                        clib_host_to_net_u16 (rmt_port), proto);
+    s = session_lookup_safe4 (fib_index, &lcl.ip4, &rmt.ip4,
+                             clib_host_to_net_u16 (lcl_port),
+                             clib_host_to_net_u16 (rmt_port), proto);
   else
-    s = session_lookup6 (fib_index, &lcl.ip6, &rmt.ip6,
-                        clib_host_to_net_u16 (lcl_port),
-                        clib_host_to_net_u16 (rmt_port), proto);
+    s = session_lookup_safe6 (fib_index, &lcl.ip6, &rmt.ip6,
+                             clib_host_to_net_u16 (lcl_port),
+                             clib_host_to_net_u16 (rmt_port), proto);
   if (s)
     {
       *result = s;
+      session_pool_remove_peeker (s->thread_index);
       return 1;
     }
   return 0;
@@ -324,7 +323,7 @@ clear_session_command_fn (vlib_main_t * vm, unformat_input_t * input,
 
   if (session_index != ~0)
     {
-      session = stream_session_get_if_valid (session_index, thread_index);
+      session = session_get_if_valid (session_index, thread_index);
       if (!session)
        return clib_error_return (0, "no session %d on thread %d",
                                  session_index, thread_index);
index 796d93e..740c5a6 100644 (file)
@@ -116,7 +116,7 @@ always_inline void
 make_v4_ss_kv_from_tc (session_kv4_t * kv, transport_connection_t * t)
 {
   make_v4_ss_kv (kv, &t->lcl_ip.ip4, &t->rmt_ip.ip4, t->lcl_port, t->rmt_port,
-                session_type_from_proto_and_ip (t->transport_proto, 1));
+                session_type_from_proto_and_ip (t->proto, 1));
 }
 
 always_inline void
@@ -159,7 +159,7 @@ always_inline void
 make_v6_ss_kv_from_tc (session_kv6_t * kv, transport_connection_t * t)
 {
   make_v6_ss_kv (kv, &t->lcl_ip.ip6, &t->rmt_ip.ip6, t->lcl_port, t->rmt_port,
-                session_type_from_proto_and_ip (t->transport_proto, 0));
+                session_type_from_proto_and_ip (t->proto, 0));
 }
 
 
@@ -339,7 +339,7 @@ session_lookup_del_session (stream_session_t * s)
   return session_lookup_del_connection (ts);
 }
 
-u32
+u64
 session_lookup_session_endpoint (u32 table_index, session_endpoint_t * sep)
 {
   session_table_t *st;
@@ -349,14 +349,14 @@ session_lookup_session_endpoint (u32 table_index, session_endpoint_t * sep)
 
   st = session_table_get (table_index);
   if (!st)
-    return SESSION_INVALID_INDEX;
+    return SESSION_INVALID_HANDLE;
   if (sep->is_ip4)
     {
       make_v4_listener_kv (&kv4, &sep->ip.ip4, sep->port,
                           sep->transport_proto);
       rv = clib_bihash_search_inline_16_8 (&st->v4_session_hash, &kv4);
       if (rv == 0)
-       return (u32) kv4.value;
+       return kv4.value;
     }
   else
     {
@@ -364,9 +364,43 @@ session_lookup_session_endpoint (u32 table_index, session_endpoint_t * sep)
                           sep->transport_proto);
       rv = clib_bihash_search_inline_48_8 (&st->v6_session_hash, &kv6);
       if (rv == 0)
-       return (u32) kv6.value;
+       return kv6.value;
     }
-  return SESSION_INVALID_INDEX;
+  return SESSION_INVALID_HANDLE;
+}
+
+stream_session_t *
+session_lookup_global_session_endpoint (session_endpoint_t * sep)
+{
+  session_table_t *st;
+  session_kv4_t kv4;
+  session_kv6_t kv6;
+  u8 fib_proto;
+  u32 table_index;
+  int rv;
+
+  fib_proto = session_endpoint_fib_proto (sep);
+  table_index = session_lookup_get_index_for_fib (fib_proto, sep->fib_index);
+  st = session_table_get (table_index);
+  if (!st)
+    return 0;
+  if (sep->is_ip4)
+    {
+      make_v4_listener_kv (&kv4, &sep->ip.ip4, sep->port,
+                          sep->transport_proto);
+      rv = clib_bihash_search_inline_16_8 (&st->v4_session_hash, &kv4);
+      if (rv == 0)
+       return session_get_from_handle (kv4.value);
+    }
+  else
+    {
+      make_v6_listener_kv (&kv6, &sep->ip.ip6, sep->port,
+                          sep->transport_proto);
+      rv = clib_bihash_search_inline_48_8 (&st->v6_session_hash, &kv6);
+      if (rv == 0)
+       return session_get_from_handle (kv6.value);
+    }
+  return 0;
 }
 
 u32
@@ -562,7 +596,7 @@ session_lookup_half_open_handle (transport_connection_t * tc)
   if (tc->is_ip4)
     {
       make_v4_ss_kv (&kv4, &tc->lcl_ip.ip4, &tc->rmt_ip.ip4, tc->lcl_port,
-                    tc->rmt_port, tc->transport_proto);
+                    tc->rmt_port, tc->proto);
       rv = clib_bihash_search_inline_16_8 (&st->v4_half_open_hash, &kv4);
       if (rv == 0)
        return kv4.value;
@@ -570,7 +604,7 @@ session_lookup_half_open_handle (transport_connection_t * tc)
   else
     {
       make_v6_ss_kv (&kv6, &tc->lcl_ip.ip6, &tc->rmt_ip.ip6, tc->lcl_port,
-                    tc->rmt_port, tc->transport_proto);
+                    tc->rmt_port, tc->proto);
       rv = clib_bihash_search_inline_48_8 (&st->v6_half_open_hash, &kv6);
       if (rv == 0)
        return kv6.value;
@@ -713,12 +747,19 @@ session_lookup_connection4 (u32 fib_index, ip4_address_t * lcl,
 /**
  * Lookup session with ip4 and transport layer information
  *
- * Lookup logic is identical to that of @ref session_lookup_connection_wt4 but
- * this returns a session as opposed to a transport connection;
+ * Important note: this may look into another thread's pool table and
+ * register as 'peeker'. Caller should call @ref session_pool_remove_peeker
+ * as soon as possible, if needed.
+ *
+ * Lookup logic is similar to that of @ref session_lookup_connection_wt4 but
+ * this returns a session as opposed to a transport connection and it does not
+ * try to lookup half-open sessions.
+ *
+ * Typically used by dgram connections
  */
 stream_session_t *
-session_lookup4 (u32 fib_index, ip4_address_t * lcl, ip4_address_t * rmt,
-                u16 lcl_port, u16 rmt_port, u8 proto)
+session_lookup_safe4 (u32 fib_index, ip4_address_t * lcl, ip4_address_t * rmt,
+                     u16 lcl_port, u16 rmt_port, u8 proto)
 {
   session_table_t *st;
   session_kv4_t kv4;
@@ -733,16 +774,11 @@ session_lookup4 (u32 fib_index, ip4_address_t * lcl, ip4_address_t * rmt,
   make_v4_ss_kv (&kv4, lcl, rmt, lcl_port, rmt_port, proto);
   rv = clib_bihash_search_inline_16_8 (&st->v4_session_hash, &kv4);
   if (rv == 0)
-    return session_get_from_handle (kv4.value);
+    return session_get_from_handle_safe (kv4.value);
 
   /* If nothing is found, check if any listener is available */
   if ((s = session_lookup_listener4_i (st, lcl, lcl_port, proto)))
     return s;
-
-  /* Finally, try half-open connections */
-  rv = clib_bihash_search_inline_16_8 (&st->v4_half_open_hash, &kv4);
-  if (rv == 0)
-    return session_get_from_handle (kv4.value);
   return 0;
 }
 
@@ -868,12 +904,19 @@ session_lookup_connection6 (u32 fib_index, ip6_address_t * lcl,
 /**
  * Lookup session with ip6 and transport layer information
  *
- * Lookup logic is identical to that of @ref session_lookup_connection_wt6 but
- * this returns a session as opposed to a transport connection;
+ * Important note: this may look into another thread's pool table and
+ * register as 'peeker'. Caller should call @ref session_pool_remove_peeker
+ * as soon as possible, if needed.
+ *
+ * Lookup logic is similar to that of @ref session_lookup_connection_wt6 but
+ * this returns a session as opposed to a transport connection and it does not
+ * try to lookup half-open sessions.
+ *
+ * Typically used by dgram connections
  */
 stream_session_t *
-session_lookup6 (u32 fib_index, ip6_address_t * lcl, ip6_address_t * rmt,
-                u16 lcl_port, u16 rmt_port, u8 proto)
+session_lookup_safe6 (u32 fib_index, ip6_address_t * lcl, ip6_address_t * rmt,
+                     u16 lcl_port, u16 rmt_port, u8 proto)
 {
   session_table_t *st;
   session_kv6_t kv6;
@@ -887,16 +930,11 @@ session_lookup6 (u32 fib_index, ip6_address_t * lcl, ip6_address_t * rmt,
   make_v6_ss_kv (&kv6, lcl, rmt, lcl_port, rmt_port, proto);
   rv = clib_bihash_search_inline_48_8 (&st->v6_session_hash, &kv6);
   if (rv == 0)
-    return session_get_from_handle (kv6.value);
+    return session_get_from_handle_safe (kv6.value);
 
   /* If nothing is found, check if any listener is available */
   if ((s = session_lookup_listener6_i (st, lcl, lcl_port, proto)))
     return s;
-
-  /* Finally, try half-open connections */
-  rv = clib_bihash_search_inline_48_8 (&st->v6_half_open_hash, &kv6);
-  if (rv == 0)
-    return session_get_from_handle (kv6.value);
   return 0;
 }
 
index 20cbaf2..449f8f4 100644 (file)
 #include <vnet/session/stream_session.h>
 #include <vnet/session/transport.h>
 
-stream_session_t *session_lookup4 (u32 fib_index, ip4_address_t * lcl,
-                                  ip4_address_t * rmt, u16 lcl_port,
-                                  u16 rmt_port, u8 proto);
-stream_session_t *session_lookup6 (u32 fib_index, ip6_address_t * lcl,
-                                  ip6_address_t * rmt, u16 lcl_port,
-                                  u16 rmt_port, u8 proto);
+stream_session_t *session_lookup_safe4 (u32 fib_index, ip4_address_t * lcl,
+                                       ip4_address_t * rmt, u16 lcl_port,
+                                       u16 rmt_port, u8 proto);
+stream_session_t *session_lookup_safe6 (u32 fib_index, ip6_address_t * lcl,
+                                       ip6_address_t * rmt, u16 lcl_port,
+                                       u16 rmt_port, u8 proto);
 transport_connection_t *session_lookup_connection_wt4 (u32 fib_index,
                                                       ip4_address_t * lcl,
                                                       ip4_address_t * rmt,
@@ -58,10 +58,12 @@ stream_session_t *session_lookup_listener (u32 table_index,
                                           session_endpoint_t * sep);
 int session_lookup_add_connection (transport_connection_t * tc, u64 value);
 int session_lookup_del_connection (transport_connection_t * tc);
-u32 session_lookup_session_endpoint (u32 table_index,
+u64 session_lookup_session_endpoint (u32 table_index,
                                     session_endpoint_t * sep);
 u32 session_lookup_local_session_endpoint (u32 table_index,
                                           session_endpoint_t * sep);
+stream_session_t *session_lookup_global_session_endpoint (session_endpoint_t
+                                                         *);
 int session_lookup_add_session_endpoint (u32 table_index,
                                         session_endpoint_t * sep, u64 value);
 int session_lookup_del_session_endpoint (u32 table_index,
index d2291fa..cbe936c 100644 (file)
@@ -154,7 +154,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
 
   next_index = next0 = session_type_to_next[s0->session_type];
 
-  transport_vft = session_get_transport_vft (s0->session_type);
+  transport_vft = transport_protocol_get_vft (s0->session_type);
   tc0 = transport_vft->get_connection (s0->connection_index, thread_index);
 
   /* Make sure we have space to send and there's something to dequeue */
@@ -401,8 +401,7 @@ session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
 always_inline stream_session_t *
 session_event_get_session (session_fifo_event_t * e, u8 thread_index)
 {
-  return stream_session_get_if_valid (e->fifo->master_session_index,
-                                     thread_index);
+  return session_get_if_valid (e->fifo->master_session_index, thread_index);
 }
 
 void
@@ -540,7 +539,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
                       vlib_frame_t * frame)
 {
   session_manager_main_t *smm = vnet_get_session_manager_main ();
-  session_fifo_event_t *my_pending_event_vector, *e;
+  session_fifo_event_t *my_pending_event_vector, *pending_disconnects, *e;
   session_fifo_event_t *my_fifo_events;
   u32 n_to_dequeue, n_events;
   unix_shared_memory_queue_t *q;
@@ -570,8 +569,10 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   /* min number of events we can dequeue without blocking */
   n_to_dequeue = q->cursize;
   my_pending_event_vector = smm->pending_event_vector[my_thread_index];
+  pending_disconnects = smm->pending_disconnects[my_thread_index];
 
-  if (n_to_dequeue == 0 && vec_len (my_pending_event_vector) == 0)
+  if (!n_to_dequeue && !vec_len (my_pending_event_vector)
+      && !vec_len (pending_disconnects))
     return 0;
 
   SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0);
@@ -603,9 +604,11 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   pthread_mutex_unlock (&q->mutex);
 
   vec_append (my_fifo_events, my_pending_event_vector);
+  vec_append (my_fifo_events, smm->pending_disconnects[my_thread_index]);
 
   _vec_len (my_pending_event_vector) = 0;
   smm->pending_event_vector[my_thread_index] = my_pending_event_vector;
+  _vec_len (smm->pending_disconnects[my_thread_index]) = 0;
 
 skip_dequeue:
   n_events = vec_len (my_fifo_events);
@@ -644,6 +647,13 @@ skip_dequeue:
            }
          break;
        case FIFO_EVENT_DISCONNECT:
+         /* Make sure disconnects run after the pending list is drained */
+         if (!e0->postponed)
+           {
+             e0->postponed = 1;
+             vec_add1 (smm->pending_disconnects[my_thread_index], *e0);
+             continue;
+           }
          s0 = session_get_from_handle (e0->session_handle);
          stream_session_disconnect (s0);
          break;
index ce0b4a2..5e05640 100644 (file)
@@ -37,6 +37,7 @@ typedef struct _session_lookup_table
 #define SESSION_TABLE_INVALID_INDEX ((u32)~0)
 #define SESSION_LOCAL_TABLE_PREFIX ((u32)~0)
 #define SESSION_INVALID_INDEX ((u32)~0)
+#define SESSION_INVALID_HANDLE ((u64)~0)
 
 typedef int (*ip4_session_table_walk_fn_t) (clib_bihash_kv_16_8_t * kvp,
                                            void *ctx);
index b46b33d..433c20e 100644 (file)
@@ -260,8 +260,9 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input)
   SESSION_TEST ((s->app_index == server_index), "app_index should be that of "
                "the server");
   server_local_st_index = application_local_session_table (server);
-  local_listener = session_lookup_session_endpoint (server_local_st_index,
-                                                   &server_sep);
+  local_listener =
+    session_lookup_local_session_endpoint (server_local_st_index,
+                                          &server_sep);
   SESSION_TEST ((local_listener != SESSION_INVALID_INDEX),
                "listener should exist in local table");
 
@@ -312,8 +313,9 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input)
 
   s = session_lookup_listener (server_st_index, &server_sep);
   SESSION_TEST ((s == 0), "listener should not exist in global table");
-  local_listener = session_lookup_session_endpoint (server_local_st_index,
-                                                   &server_sep);
+  local_listener =
+    session_lookup_local_session_endpoint (server_local_st_index,
+                                          &server_sep);
   SESSION_TEST ((s == 0), "listener should not exist in local table");
 
   detach_args.app_index = server_index;
@@ -337,8 +339,9 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input)
   s = session_lookup_listener (server_st_index, &server_sep);
   SESSION_TEST ((s == 0), "listener should not exist in global table");
   server_local_st_index = application_local_session_table (server);
-  local_listener = session_lookup_session_endpoint (server_local_st_index,
-                                                   &server_sep);
+  local_listener =
+    session_lookup_local_session_endpoint (server_local_st_index,
+                                          &server_sep);
   SESSION_TEST ((local_listener != SESSION_INVALID_INDEX),
                "listener should exist in local table");
 
@@ -346,8 +349,9 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input)
   error = vnet_unbind (&unbind_args);
   SESSION_TEST ((error == 0), "unbind should work");
 
-  local_listener = session_lookup_session_endpoint (server_local_st_index,
-                                                   &server_sep);
+  local_listener =
+    session_lookup_local_session_endpoint (server_local_st_index,
+                                          &server_sep);
   SESSION_TEST ((local_listener == SESSION_INVALID_INDEX),
                "listener should not exist in local table");
 
@@ -417,8 +421,9 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input)
   SESSION_TEST ((s->app_index == server_index), "app_index should be that of "
                "the server");
   server_local_st_index = application_local_session_table (server);
-  local_listener = session_lookup_session_endpoint (server_local_st_index,
-                                                   &server_sep);
+  local_listener =
+    session_lookup_local_session_endpoint (server_local_st_index,
+                                          &server_sep);
   SESSION_TEST ((local_listener != SESSION_INVALID_INDEX),
                "zero listener should exist in local table");
   detach_args.app_index = server_index;
index 1ed6e0b..51d5065 100644 (file)
@@ -43,6 +43,7 @@ typedef enum
   SESSION_STATE_CONNECTING,
   SESSION_STATE_ACCEPTING,
   SESSION_STATE_READY,
+  SESSION_STATE_CONNECTING_READY,
   SESSION_STATE_CLOSED,
   SESSION_STATE_N_STATES,
 } stream_session_state_t;
diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c
new file mode 100644 (file)
index 0000000..fc722e4
--- /dev/null
@@ -0,0 +1,306 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vnet/session/transport_interface.h>
+#include <vnet/session/session.h>
+#include <vnet/fib/fib.h>
+
+/**
+ * Per-type vector of transport protocol virtual function tables
+ */
+transport_proto_vft_t *tp_vfts;
+
+/*
+ * Port allocator seed
+ */
+static u32 port_allocator_seed;
+
+/*
+ * Local endpoints table
+ */
+static transport_endpoint_table_t local_endpoints_table;
+
+/*
+ * Pool of local endpoints
+ */
+static transport_endpoint_t *local_endpoints;
+
+/*
+ * Local endpoints pool lock
+ */
+static clib_spinlock_t local_endpoints_lock;
+
+
+u32
+transport_endpoint_lookup (transport_endpoint_table_t * ht, u8 proto,
+                          ip46_address_t * ip, u16 port)
+{
+  clib_bihash_kv_24_8_t kv;
+  int rv;
+
+  kv.key[0] = ip->as_u64[0];
+  kv.key[1] = ip->as_u64[1];
+  kv.key[2] = (u64) port << 8 | (u64) proto;
+
+  rv = clib_bihash_search_inline_24_8 (ht, &kv);
+  if (rv == 0)
+    return kv.value;
+
+  return ENDPOINT_INVALID_INDEX;
+}
+
+void
+transport_endpoint_table_add (transport_endpoint_table_t * ht, u8 proto,
+                             transport_endpoint_t * te, u32 value)
+{
+  clib_bihash_kv_24_8_t kv;
+
+  kv.key[0] = te->ip.as_u64[0];
+  kv.key[1] = te->ip.as_u64[1];
+  kv.key[2] = (u64) te->port << 8 | (u64) proto;
+  kv.value = value;
+
+  clib_bihash_add_del_24_8 (ht, &kv, 1);
+}
+
+void
+transport_endpoint_table_del (transport_endpoint_table_t * ht, u8 proto,
+                             transport_endpoint_t * te)
+{
+  clib_bihash_kv_24_8_t kv;
+
+  kv.key[0] = te->ip.as_u64[0];
+  kv.key[1] = te->ip.as_u64[1];
+  kv.key[2] = (u64) te->port << 8 | (u64) proto;
+
+  clib_bihash_add_del_24_8 (ht, &kv, 0);
+}
+
+/**
+ * Register transport virtual function table.
+ *
+ * @param transport_proto - transport protocol type (e.g., tcp, udp)
+ * @param vft - virtual function table to register for the protocol
+ */
+void
+transport_register_protocol (transport_proto_t transport_proto, u8 is_ip4,
+                            const transport_proto_vft_t * vft)
+{
+  u8 session_type;
+  session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
+
+  vec_validate (tp_vfts, session_type);
+  tp_vfts[session_type] = *vft;
+
+  /* If an offset function is provided, then peek instead of dequeue */
+  session_manager_set_transport_rx_fn (session_type,
+                                      vft->tx_fifo_offset != 0);
+}
+
+/**
+ * Get transport virtual function table
+ *
+ * @param type - session type (not protocol type)
+ */
+transport_proto_vft_t *
+transport_protocol_get_vft (u8 session_type)
+{
+  if (session_type >= vec_len (tp_vfts))
+    return 0;
+  return &tp_vfts[session_type];
+}
+
+#define PORT_MASK ((1 << 16)- 1)
+
+void
+transport_endpoint_del (u32 tepi)
+{
+  clib_spinlock_lock_if_init (&local_endpoints_lock);
+  pool_put_index (local_endpoints, tepi);
+  clib_spinlock_unlock_if_init (&local_endpoints_lock);
+}
+
+always_inline transport_endpoint_t *
+transport_endpoint_new (void)
+{
+  transport_endpoint_t *tep;
+  pool_get (local_endpoints, tep);
+  return tep;
+}
+
+void
+transport_endpoint_cleanup (u8 proto, ip46_address_t * lcl_ip, u16 port)
+{
+  u32 tepi;
+  transport_endpoint_t *tep;
+
+  /* Cleanup local endpoint if this was an active connect */
+  tepi = transport_endpoint_lookup (&local_endpoints_table, proto, lcl_ip,
+                                   clib_net_to_host_u16 (port));
+  if (tepi != ENDPOINT_INVALID_INDEX)
+    {
+      tep = pool_elt_at_index (local_endpoints, tepi);
+      transport_endpoint_table_del (&local_endpoints_table, proto, tep);
+      transport_endpoint_del (tepi);
+    }
+}
+
+/**
+ * Allocate local port and add if successful add entry to local endpoint
+ * table to mark the pair as used.
+ */
+int
+transport_alloc_local_port (u8 proto, ip46_address_t * ip)
+{
+  transport_endpoint_t *tep;
+  u32 tei;
+  u16 min = 1024, max = 65535; /* XXX configurable ? */
+  int tries, limit;
+
+  limit = max - min;
+
+  /* Only support active opens from thread 0 */
+  ASSERT (vlib_get_thread_index () == 0);
+
+  /* Search for first free slot */
+  for (tries = 0; tries < limit; tries++)
+    {
+      u16 port = 0;
+
+      /* Find a port in the specified range */
+      while (1)
+       {
+         port = random_u32 (&port_allocator_seed) & PORT_MASK;
+         if (PREDICT_TRUE (port >= min && port < max))
+           break;
+       }
+
+      /* Look it up. If not found, we're done */
+      tei = transport_endpoint_lookup (&local_endpoints_table, proto, ip,
+                                      port);
+      if (tei == ENDPOINT_INVALID_INDEX)
+       {
+         clib_spinlock_lock_if_init (&local_endpoints_lock);
+         tep = transport_endpoint_new ();
+         clib_memcpy (&tep->ip, ip, sizeof (*ip));
+         tep->port = port;
+         transport_endpoint_table_add (&local_endpoints_table, proto, tep,
+                                       tep - local_endpoints);
+         clib_spinlock_unlock_if_init (&local_endpoints_lock);
+
+         return tep->port;
+       }
+    }
+  return -1;
+}
+
+int
+transport_alloc_local_endpoint (u8 proto, transport_endpoint_t * rmt,
+                               ip46_address_t * lcl_addr, u16 * lcl_port)
+{
+  fib_prefix_t prefix;
+  fib_node_index_t fei;
+  u32 sw_if_index;
+  int port;
+
+  /*
+   * Find the local address and allocate port
+   */
+
+  /* Find a FIB path to the destination */
+  clib_memcpy (&prefix.fp_addr, &rmt->ip, sizeof (rmt->ip));
+  prefix.fp_proto = rmt->is_ip4 ? FIB_PROTOCOL_IP4 : FIB_PROTOCOL_IP6;
+  prefix.fp_len = rmt->is_ip4 ? 32 : 128;
+
+  ASSERT (rmt->fib_index != ENDPOINT_INVALID_INDEX);
+  fei = fib_table_lookup (rmt->fib_index, &prefix);
+
+  /* Couldn't find route to destination. Bail out. */
+  if (fei == FIB_NODE_INDEX_INVALID)
+    {
+      clib_warning ("no route to destination");
+      return -1;
+    }
+
+  sw_if_index = rmt->sw_if_index;
+  if (sw_if_index == ENDPOINT_INVALID_INDEX)
+    sw_if_index = fib_entry_get_resolving_interface (fei);
+
+  if (sw_if_index == ENDPOINT_INVALID_INDEX)
+    {
+      clib_warning ("no resolving interface for %U", format_ip46_address,
+                   &rmt->ip, (rmt->is_ip4 == 0) + 1);
+      return -1;
+    }
+
+  memset (lcl_addr, 0, sizeof (*lcl_addr));
+
+  if (rmt->is_ip4)
+    {
+      ip4_address_t *ip4;
+      ip4 = ip_interface_get_first_ip (sw_if_index, 1);
+      lcl_addr->ip4.as_u32 = ip4->as_u32;
+    }
+  else
+    {
+      ip6_address_t *ip6;
+      ip6 = ip_interface_get_first_ip (sw_if_index, 0);
+      if (ip6 == 0)
+       {
+         clib_warning ("no routable ip6 addresses on %U",
+                       format_vnet_sw_if_index_name, vnet_get_main (),
+                       sw_if_index);
+         return -1;
+       }
+      clib_memcpy (&lcl_addr->ip6, ip6, sizeof (*ip6));
+    }
+
+  /* Allocate source port */
+  port = transport_alloc_local_port (proto, lcl_addr);
+  if (port < 1)
+    {
+      clib_warning ("Failed to allocate src port");
+      return -1;
+    }
+  *lcl_port = port;
+  return 0;
+}
+
+void
+transport_init (void)
+{
+  vlib_thread_main_t *vtm = vlib_get_thread_main ();
+  u32 local_endpoints_table_buckets = 250000;
+  u32 local_endpoints_table_memory = 512 << 20;
+  u32 num_threads;
+
+  /* Initialize [port-allocator] random number seed */
+  port_allocator_seed = (u32) clib_cpu_time_now ();
+
+  clib_bihash_init_24_8 (&local_endpoints_table, "local endpoints table",
+                        local_endpoints_table_buckets,
+                        local_endpoints_table_memory);
+  num_threads = 1 /* main thread */  + vtm->n_threads;
+  if (num_threads > 1)
+    clib_spinlock_init (&local_endpoints_lock);
+}
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
index 8c299c4..f2cc80b 100644 (file)
@@ -29,7 +29,7 @@ typedef struct _transport_connection
   ip46_address_t lcl_ip;       /**< Local IP */
   u16 lcl_port;                        /**< Local port */
   u16 rmt_port;                        /**< Remote port */
-  u8 transport_proto;          /**< Protocol id */
+  u8 proto;                    /**< Protocol id */
   u8 is_ip4;                   /**< Flag if IP4 connection */
   u32 fib_index;                       /**< Network namespace */
 
@@ -54,7 +54,7 @@ typedef struct _transport_connection
 #define c_rmt_ip6 connection.rmt_ip.ip6
 #define c_lcl_port connection.lcl_port
 #define c_rmt_port connection.rmt_port
-#define c_transport_proto connection.transport_proto
+#define c_proto connection.proto
 #define c_fib_index connection.fib_index
 #define c_s_index connection.s_index
 #define c_c_index connection.c_index
@@ -69,7 +69,8 @@ typedef struct _transport_connection
 typedef enum _transport_proto
 {
   TRANSPORT_PROTO_TCP,
-  TRANSPORT_PROTO_UDP
+  TRANSPORT_PROTO_UDP,
+  TRANSPORT_N_PROTO
 } transport_proto_t;
 
 #define foreach_transport_connection_fields                            \
@@ -86,6 +87,8 @@ typedef struct _transport_endpoint
 #undef _
 } transport_endpoint_t;
 
+typedef clib_bihash_24_8_t transport_endpoint_table_t;
+
 #define ENDPOINT_INVALID_INDEX ((u32)~0)
 
 always_inline u8
@@ -94,6 +97,31 @@ transport_connection_fib_proto (transport_connection_t * tc)
   return tc->is_ip4 ? FIB_PROTOCOL_IP4 : FIB_PROTOCOL_IP6;
 }
 
+always_inline u8
+transport_endpoint_fib_proto (transport_endpoint_t * tep)
+{
+  return tep->is_ip4 ? FIB_PROTOCOL_IP4 : FIB_PROTOCOL_IP6;
+}
+
+always_inline u8
+transport_is_stream (u8 proto)
+{
+  return (proto == TRANSPORT_PROTO_TCP);
+}
+
+always_inline u8
+transport_is_dgram (u8 proto)
+{
+  return (proto == TRANSPORT_PROTO_UDP);
+}
+
+int transport_alloc_local_port (u8 proto, ip46_address_t * ip);
+int transport_alloc_local_endpoint (u8 proto, transport_endpoint_t * rmt,
+                                   ip46_address_t * lcl_addr,
+                                   u16 * lcl_port);
+void transport_endpoint_cleanup (u8 proto, ip46_address_t * lcl_ip, u16 port);
+void transport_init (void);
+
 #endif /* VNET_VNET_URI_TRANSPORT_H_ */
 
 /*
diff --git a/src/vnet/session/transport_interface.c b/src/vnet/session/transport_interface.c
deleted file mode 100644 (file)
index ef8d1e4..0000000
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Copyright (c) 2017 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <vnet/session/transport_interface.h>
-#include <vnet/session/session.h>
-
-/**
- * Per-type vector of transport protocol virtual function tables
- */
-transport_proto_vft_t *tp_vfts;
-
-u32
-transport_endpoint_lookup (transport_endpoint_table_t * ht,
-                          ip46_address_t * ip, u16 port)
-{
-  clib_bihash_kv_24_8_t kv;
-  int rv;
-
-  kv.key[0] = ip->as_u64[0];
-  kv.key[1] = ip->as_u64[1];
-  kv.key[2] = port;
-
-  rv = clib_bihash_search_inline_24_8 (ht, &kv);
-  if (rv == 0)
-    return kv.value;
-
-  return TRANSPORT_ENDPOINT_INVALID_INDEX;
-}
-
-void
-transport_endpoint_table_add (transport_endpoint_table_t * ht,
-                             transport_endpoint_t * te, u32 value)
-{
-  clib_bihash_kv_24_8_t kv;
-
-  kv.key[0] = te->ip.as_u64[0];
-  kv.key[1] = te->ip.as_u64[1];
-  kv.key[2] = te->port;
-  kv.value = value;
-
-  clib_bihash_add_del_24_8 (ht, &kv, 1);
-}
-
-void
-transport_endpoint_table_del (transport_endpoint_table_t * ht,
-                             transport_endpoint_t * te)
-{
-  clib_bihash_kv_24_8_t kv;
-
-  kv.key[0] = te->ip.as_u64[0];
-  kv.key[1] = te->ip.as_u64[1];
-  kv.key[2] = te->port;
-
-  clib_bihash_add_del_24_8 (ht, &kv, 0);
-}
-
-/**
- * Register transport virtual function table.
- *
- * @param type - session type (not protocol type)
- * @param vft - virtual function table
- */
-void
-session_register_transport (transport_proto_t transport_proto, u8 is_ip4,
-                           const transport_proto_vft_t * vft)
-{
-  u8 session_type;
-  session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
-
-  vec_validate (tp_vfts, session_type);
-  tp_vfts[session_type] = *vft;
-
-  /* If an offset function is provided, then peek instead of dequeue */
-  session_manager_set_transport_rx_fn (session_type,
-                                      vft->tx_fifo_offset != 0);
-}
-
-/**
- * Get transport virtual function table
- *
- * @param type - session type (not protocol type)
- */
-transport_proto_vft_t *
-session_get_transport_vft (u8 session_type)
-{
-  if (session_type >= vec_len (tp_vfts))
-    return 0;
-  return &tp_vfts[session_type];
-}
-
-/*
- * fd.io coding-style-patch-verification: ON
- *
- * Local Variables:
- * eval: (c-set-style "gnu")
- * End:
- */
index 661221c..079e646 100644 (file)
@@ -56,20 +56,10 @@ typedef struct _transport_proto_vft
   u8 *(*format_half_open) (u8 * s, va_list * args);
 } transport_proto_vft_t;
 
-typedef clib_bihash_24_8_t transport_endpoint_table_t;
-
-#define TRANSPORT_ENDPOINT_INVALID_INDEX ((u32)~0)
-
-u32 transport_endpoint_lookup (transport_endpoint_table_t * ht,
-                              ip46_address_t * ip, u16 port);
-void transport_endpoint_table_add (transport_endpoint_table_t * ht,
-                                  transport_endpoint_t * te, u32 value);
-void transport_endpoint_table_del (transport_endpoint_table_t * ht,
-                                  transport_endpoint_t * te);
-
-void session_register_transport (transport_proto_t transport_proto, u8 is_ip4,
-                                const transport_proto_vft_t * vft);
-transport_proto_vft_t *session_get_transport_vft (u8 session_type);
+void transport_register_protocol (transport_proto_t transport_proto,
+                                 u8 is_ip4,
+                                 const transport_proto_vft_t * vft);
+transport_proto_vft_t *transport_protocol_get_vft (u8 session_type);
 
 #endif /* SRC_VNET_SESSION_TRANSPORT_INTERFACE_H_ */
 
index 7a0d2ea..4258fc4 100644 (file)
@@ -50,7 +50,6 @@ send_test_chunk (tclient_main_t * tm, session_t * s)
   int test_buf_offset;
   u32 bytes_this_chunk;
   session_fifo_event_t evt;
-  static int serial_number = 0;
   svm_fifo_t *txf;
   int rv;
 
@@ -98,7 +97,6 @@ send_test_chunk (tclient_main_t * tm, session_t * s)
          /* Fabricate TX event, send to vpp */
          evt.fifo = txf;
          evt.event_type = FIFO_EVENT_APP_TX;
-         evt.event_id = serial_number++;
 
          if (unix_shared_memory_queue_add
              (tm->vpp_event_queue[txf->master_thread_index], (u8 *) & evt,
@@ -248,12 +246,12 @@ builtin_client_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
 
          session_parse_handle (sp->vpp_session_handle,
                                &index, &thread_index);
-         s = stream_session_get_if_valid (index, thread_index);
+         s = session_get_if_valid (index, thread_index);
 
          if (s)
            {
              vnet_disconnect_args_t _a, *a = &_a;
-             a->handle = stream_session_handle (s);
+             a->handle = session_handle (s);
              a->app_index = tm->app_index;
              vnet_disconnect_session (a);
 
@@ -369,7 +367,7 @@ builtin_session_connected_callback (u32 app_index, u32 api_context,
   session->server_rx_fifo->client_session_index = session_index;
   session->server_tx_fifo = s->server_tx_fifo;
   session->server_tx_fifo->client_session_index = session_index;
-  session->vpp_session_handle = stream_session_handle (s);
+  session->vpp_session_handle = session_handle (s);
 
   vec_add1 (tm->connection_index_by_thread[thread_index], session_index);
   __sync_fetch_and_add (&tm->ready_connections, 1);
@@ -403,7 +401,7 @@ builtin_session_disconnect_callback (stream_session_t * s)
 {
   tclient_main_t *tm = &tclient_main;
   vnet_disconnect_args_t _a, *a = &_a;
-  a->handle = stream_session_handle (s);
+  a->handle = session_handle (s);
   a->app_index = tm->app_index;
   vnet_disconnect_session (a);
   return;
index f808400..f307880 100644 (file)
@@ -166,7 +166,6 @@ send_data (builtin_http_server_args * args, u8 * data)
              /* Fabricate TX event, send to vpp */
              evt.fifo = s->server_tx_fifo;
              evt.event_type = FIFO_EVENT_APP_TX;
-             evt.event_id = 0;
 
              unix_shared_memory_queue_add (hsm->vpp_queue[s->thread_index],
                                            (u8 *) & evt,
@@ -346,7 +345,7 @@ http_server_rx_callback (stream_session_t * s)
   /* send the command to a new/recycled vlib process */
   args = clib_mem_alloc (sizeof (*args));
   args->data = vec_dup (hsm->rx_buf);
-  args->session_handle = stream_session_handle (s);
+  args->session_handle = session_handle (s);
 
   /* Send an RPC request via the thread-0 input node */
   if (vlib_get_thread_index () != 0)
@@ -382,7 +381,7 @@ builtin_session_disconnect_callback (stream_session_t * s)
   http_server_main_t *bsm = &http_server_main;
   vnet_disconnect_args_t _a, *a = &_a;
 
-  a->handle = stream_session_handle (s);
+  a->handle = session_handle (s);
   a->app_index = bsm->app_index;
   vnet_disconnect_session (a);
 }
index a51a812..e1e0198 100644 (file)
@@ -32,7 +32,7 @@ delete_proxy_session (stream_session_t * s, int is_active_open)
   uword *p;
   u64 handle;
 
-  handle = stream_session_handle (s);
+  handle = session_handle (s);
 
   clib_spinlock_lock_if_init (&bpm->sessions_lock);
   if (is_active_open)
@@ -88,19 +88,19 @@ delete_proxy_session (stream_session_t * s, int is_active_open)
 
   if (active_open_session)
     {
-      a->handle = stream_session_handle (active_open_session);
+      a->handle = session_handle (active_open_session);
       a->app_index = bpm->active_open_app_index;
       hash_unset (bpm->proxy_session_by_active_open_handle,
-                 stream_session_handle (active_open_session));
+                 session_handle (active_open_session));
       vnet_disconnect_session (a);
     }
 
   if (server_session)
     {
-      a->handle = stream_session_handle (server_session);
+      a->handle = session_handle (server_session);
       a->app_index = bpm->server_app_index;
       hash_unset (bpm->proxy_session_by_server_handle,
-                 stream_session_handle (server_session));
+                 session_handle (server_session));
       vnet_disconnect_session (a);
     }
 }
@@ -171,8 +171,7 @@ server_rx_callback (stream_session_t * s)
   ASSERT (s->thread_index == thread_index);
 
   clib_spinlock_lock_if_init (&bpm->sessions_lock);
-  p =
-    hash_get (bpm->proxy_session_by_server_handle, stream_session_handle (s));
+  p = hash_get (bpm->proxy_session_by_server_handle, session_handle (s));
 
   if (PREDICT_TRUE (p != 0))
     {
@@ -218,7 +217,7 @@ server_rx_callback (stream_session_t * s)
       memset (ps, 0, sizeof (*ps));
       ps->server_rx_fifo = rx_fifo;
       ps->server_tx_fifo = tx_fifo;
-      ps->vpp_server_handle = stream_session_handle (s);
+      ps->vpp_server_handle = session_handle (s);
 
       proxy_index = ps - bpm->sessions;
 
@@ -268,7 +267,7 @@ active_open_connected_callback (u32 app_index, u32 opaque,
   clib_spinlock_lock_if_init (&bpm->sessions_lock);
 
   ps = pool_elt_at_index (bpm->sessions, opaque);
-  ps->vpp_active_open_handle = stream_session_handle (s);
+  ps->vpp_active_open_handle = session_handle (s);
 
   s->server_tx_fifo = ps->server_rx_fifo;
   s->server_rx_fifo = ps->server_tx_fifo;
index b4a52c6..27c4370 100644 (file)
@@ -73,7 +73,7 @@ builtin_session_disconnect_callback (stream_session_t * s)
   builtin_server_main_t *bsm = &builtin_server_main;
   vnet_disconnect_args_t _a, *a = &_a;
 
-  a->handle = stream_session_handle (s);
+  a->handle = session_handle (s);
   a->app_index = bsm->app_index;
   vnet_disconnect_session (a);
 }
@@ -158,7 +158,6 @@ builtin_server_rx_callback (stream_session_t * s)
   svm_fifo_t *tx_fifo, *rx_fifo;
   builtin_server_main_t *bsm = &builtin_server_main;
   session_fifo_event_t evt;
-  static int serial_number = 0;
   u32 thread_index = vlib_get_thread_index ();
 
   ASSERT (s->thread_index == thread_index);
@@ -190,7 +189,6 @@ builtin_server_rx_callback (stream_session_t * s)
          unix_shared_memory_queue_t *q;
          evt.fifo = rx_fifo;
          evt.event_type = FIFO_EVENT_BUILTIN_RX;
-         evt.event_id = 0;
 
          q = bsm->vpp_queue[thread_index];
          if (PREDICT_FALSE (q->cursize == q->maxsize))
@@ -232,7 +230,6 @@ builtin_server_rx_callback (stream_session_t * s)
       /* Fabricate TX event, send to vpp */
       evt.fifo = tx_fifo;
       evt.event_type = FIFO_EVENT_APP_TX;
-      evt.event_id = serial_number++;
 
       if (unix_shared_memory_queue_add (bsm->vpp_queue[s->thread_index],
                                        (u8 *) & evt,
index 34c901e..1c44ef0 100644 (file)
 
 tcp_main_t tcp_main;
 
-void *
-ip_interface_get_first_ip (u32 sw_if_index, u8 is_ip4)
-{
-  ip_lookup_main_t *lm4 = &ip4_main.lookup_main;
-  ip_lookup_main_t *lm6 = &ip6_main.lookup_main;
-  ip_interface_address_t *ia = 0;
-
-  if (is_ip4)
-    {
-      /* *INDENT-OFF* */
-      foreach_ip_interface_address (lm4, ia, sw_if_index, 1 /* unnumbered */ ,
-      ({
-        return ip_interface_address_get_address (lm4, ia);
-      }));
-      /* *INDENT-ON* */
-    }
-  else
-    {
-      /* *INDENT-OFF* */
-      foreach_ip_interface_address (lm6, ia, sw_if_index, 1 /* unnumbered */ ,
-      ({
-        ip6_address_t *rv;
-        rv = ip_interface_address_get_address (lm6, ia);
-        /* Trying to use a link-local ip6 src address is a fool's errand */
-        if (!ip6_address_is_link_local_unicast (rv))
-          return rv;
-      }));
-      /* *INDENT-ON* */
-    }
-
-  return 0;
-}
-
 static u32
 tcp_connection_bind (u32 session_index, transport_endpoint_t * lcl)
 {
@@ -83,7 +50,7 @@ tcp_connection_bind (u32 session_index, transport_endpoint_t * lcl)
     }
   ip_copy (&listener->c_lcl_ip, &lcl->ip, lcl->is_ip4);
   listener->c_is_ip4 = lcl->is_ip4;
-  listener->c_transport_proto = TRANSPORT_PROTO_TCP;
+  listener->c_proto = TRANSPORT_PROTO_TCP;
   listener->c_s_index = session_index;
   listener->c_fib_index = lcl->fib_index;
   listener->state = TCP_STATE_LISTEN;
@@ -134,24 +101,6 @@ tcp_session_get_listener (u32 listener_index)
   return &tc->connection;
 }
 
-always_inline void
-transport_endpoint_del (u32 tepi)
-{
-  tcp_main_t *tm = vnet_get_tcp_main ();
-  clib_spinlock_lock_if_init (&tm->local_endpoints_lock);
-  pool_put_index (tm->local_endpoints, tepi);
-  clib_spinlock_unlock_if_init (&tm->local_endpoints_lock);
-}
-
-always_inline transport_endpoint_t *
-transport_endpoint_new (void)
-{
-  tcp_main_t *tm = vnet_get_tcp_main ();
-  transport_endpoint_t *tep;
-  pool_get (tm->local_endpoints, tep);
-  return tep;
-}
-
 /**
  * Cleanup half-open connection
  *
@@ -209,18 +158,10 @@ void
 tcp_connection_cleanup (tcp_connection_t * tc)
 {
   tcp_main_t *tm = &tcp_main;
-  u32 tepi;
-  transport_endpoint_t *tep;
 
   /* Cleanup local endpoint if this was an active connect */
-  tepi = transport_endpoint_lookup (&tm->local_endpoints_table, &tc->c_lcl_ip,
-                                   clib_net_to_host_u16 (tc->c_lcl_port));
-  if (tepi != TRANSPORT_ENDPOINT_INVALID_INDEX)
-    {
-      tep = pool_elt_at_index (tm->local_endpoints, tepi);
-      transport_endpoint_table_del (&tm->local_endpoints_table, tep);
-      transport_endpoint_del (tepi);
-    }
+  transport_endpoint_cleanup (TRANSPORT_PROTO_TCP, &tc->c_lcl_ip,
+                             tc->c_lcl_port);
 
   /* Check if connection is not yet fully established */
   if (tc->state == TCP_STATE_SYN_SENT)
@@ -288,7 +229,7 @@ tcp_connection_reset (tcp_connection_t * tc)
       tcp_connection_cleanup (tc);
       break;
     case TCP_STATE_SYN_SENT:
-      stream_session_connect_notify (&tc->connection, 1 /* fail */ );
+      session_stream_connect_notify (&tc->connection, 1 /* fail */ );
       tcp_connection_cleanup (tc);
       break;
     case TCP_STATE_ESTABLISHED:
@@ -388,57 +329,6 @@ tcp_session_cleanup (u32 conn_index, u32 thread_index)
   tcp_timer_update (tc, TCP_TIMER_WAITCLOSE, TCP_CLEANUP_TIME);
 }
 
-#define PORT_MASK ((1 << 16)- 1)
-/**
- * Allocate local port and add if successful add entry to local endpoint
- * table to mark the pair as used.
- */
-int
-tcp_allocate_local_port (ip46_address_t * ip)
-{
-  tcp_main_t *tm = vnet_get_tcp_main ();
-  transport_endpoint_t *tep;
-  u32 tei;
-  u16 min = 1024, max = 65535; /* XXX configurable ? */
-  int tries, limit;
-
-  limit = max - min;
-
-  /* Only support active opens from thread 0 */
-  ASSERT (vlib_get_thread_index () == 0);
-
-  /* Search for first free slot */
-  for (tries = 0; tries < limit; tries++)
-    {
-      u16 port = 0;
-
-      /* Find a port in the specified range */
-      while (1)
-       {
-         port = random_u32 (&tm->port_allocator_seed) & PORT_MASK;
-         if (PREDICT_TRUE (port >= min && port < max))
-           break;
-       }
-
-      /* Look it up */
-      tei = transport_endpoint_lookup (&tm->local_endpoints_table, ip, port);
-      /* If not found, we're done */
-      if (tei == TRANSPORT_ENDPOINT_INVALID_INDEX)
-       {
-         clib_spinlock_lock_if_init (&tm->local_endpoints_lock);
-         tep = transport_endpoint_new ();
-         clib_memcpy (&tep->ip, ip, sizeof (*ip));
-         tep->port = port;
-         transport_endpoint_table_add (&tm->local_endpoints_table, tep,
-                                       tep - tm->local_endpoints);
-         clib_spinlock_unlock_if_init (&tm->local_endpoints_lock);
-
-         return tep->port;
-       }
-    }
-  return -1;
-}
-
 /**
  * Initialize all connection timers as invalid
  */
@@ -574,9 +464,15 @@ tcp_init_snd_vars (tcp_connection_t * tc)
 {
   u32 time_now;
 
-  /* Set random initial sequence */
-  tcp_set_time_now (0);
+  /*
+   * We use the time to randomize iss and for setting up the initial
+   * timestamp. Make sure it's updated otherwise syn and ack in the
+   * handshake may make it look as if time has flown in the opposite
+   * direction for us.
+   */
+  tcp_set_time_now (vlib_get_thread_index ());
   time_now = tcp_time_now ();
+
   tc->iss = random_u32 (&time_now);
   tc->snd_una = tc->iss;
   tc->snd_nxt = tc->iss + 1;
@@ -600,112 +496,70 @@ tcp_connection_init_vars (tcp_connection_t * tc)
   //  tcp_connection_fib_attach (tc);
 }
 
+static int
+tcp_alloc_custom_local_endpoint (tcp_main_t * tm, ip46_address_t * lcl_addr,
+                                u16 * lcl_port, u8 is_ip4)
+{
+  int index, port;
+  if (is_ip4)
+    {
+      index = tm->last_v4_address_rotor++;
+      if (tm->last_v4_address_rotor >= vec_len (tm->ip4_src_addresses))
+       tm->last_v4_address_rotor = 0;
+      lcl_addr->ip4.as_u32 = tm->ip4_src_addresses[index].as_u32;
+    }
+  else
+    {
+      index = tm->last_v6_address_rotor++;
+      if (tm->last_v6_address_rotor >= vec_len (tm->ip6_src_addresses))
+       tm->last_v6_address_rotor = 0;
+      clib_memcpy (&lcl_addr->ip6, &tm->ip6_src_addresses[index],
+                  sizeof (ip6_address_t));
+    }
+  port = transport_alloc_local_port (TRANSPORT_PROTO_TCP, lcl_addr);
+  if (port < 1)
+    {
+      clib_warning ("Failed to allocate src port");
+      return -1;
+    }
+  *lcl_port = port;
+  return 0;
+}
+
 int
 tcp_connection_open (transport_endpoint_t * rmt)
 {
   tcp_main_t *tm = vnet_get_tcp_main ();
   tcp_connection_t *tc;
-  fib_prefix_t prefix;
-  fib_node_index_t fei;
-  u32 sw_if_index;
   ip46_address_t lcl_addr;
-  int lcl_port;
+  u16 lcl_port;
+  int rv;
 
   /*
-   * Find the local address and allocate port
+   * Allocate local endpoint
    */
-  memset (&lcl_addr, 0, sizeof (lcl_addr));
-
-  /* Find a FIB path to the destination */
-  clib_memcpy (&prefix.fp_addr, &rmt->ip, sizeof (rmt->ip));
-  prefix.fp_proto = rmt->is_ip4 ? FIB_PROTOCOL_IP4 : FIB_PROTOCOL_IP6;
-  prefix.fp_len = rmt->is_ip4 ? 32 : 128;
-
-  ASSERT (rmt->fib_index != ENDPOINT_INVALID_INDEX);
-  fei = fib_table_lookup (rmt->fib_index, &prefix);
-
-  /* Couldn't find route to destination. Bail out. */
-  if (fei == FIB_NODE_INDEX_INVALID)
-    {
-      clib_warning ("no route to destination");
-      return -1;
-    }
-
-  sw_if_index = rmt->sw_if_index;
-  if (sw_if_index == ENDPOINT_INVALID_INDEX)
-    sw_if_index = fib_entry_get_resolving_interface (fei);
-
-  if (sw_if_index == ENDPOINT_INVALID_INDEX)
-    {
-      clib_warning ("no resolving interface for %U", format_ip46_address,
-                   &rmt->ip, (rmt->is_ip4 == 0) + 1);
-      return -1;
-    }
-
-  if (rmt->is_ip4)
-    {
-      ip4_address_t *ip4;
-      int index;
-      if (vec_len (tm->ip4_src_addresses))
-       {
-         index = tm->last_v4_address_rotor++;
-         if (tm->last_v4_address_rotor >= vec_len (tm->ip4_src_addresses))
-           tm->last_v4_address_rotor = 0;
-         lcl_addr.ip4.as_u32 = tm->ip4_src_addresses[index].as_u32;
-       }
-      else
-       {
-         ip4 = ip_interface_get_first_ip (sw_if_index, 1);
-         lcl_addr.ip4.as_u32 = ip4->as_u32;
-       }
-    }
+  if ((rmt->is_ip4 && vec_len (tm->ip4_src_addresses))
+      || (!rmt->is_ip4 && vec_len (tm->ip6_src_addresses)))
+    rv = tcp_alloc_custom_local_endpoint (tm, &lcl_addr, &lcl_port,
+                                         rmt->is_ip4);
   else
-    {
-      ip6_address_t *ip6;
-      int index;
-
-      if (vec_len (tm->ip6_src_addresses))
-       {
-         index = tm->last_v6_address_rotor++;
-         if (tm->last_v6_address_rotor >= vec_len (tm->ip6_src_addresses))
-           tm->last_v6_address_rotor = 0;
-         clib_memcpy (&lcl_addr.ip6, &tm->ip6_src_addresses[index],
-                      sizeof (*ip6));
-       }
-      else
-       {
-         ip6 = ip_interface_get_first_ip (sw_if_index, 0);
-         if (ip6 == 0)
-           {
-             clib_warning ("no routable ip6 addresses on %U",
-                           format_vnet_sw_if_index_name, vnet_get_main (),
-                           sw_if_index);
-             return -1;
-           }
-
-         clib_memcpy (&lcl_addr.ip6, ip6, sizeof (*ip6));
-       }
-    }
+    rv = transport_alloc_local_endpoint (TRANSPORT_PROTO_TCP,
+                                        rmt, &lcl_addr, &lcl_port);
 
-  /* Allocate source port */
-  lcl_port = tcp_allocate_local_port (&lcl_addr);
-  if (lcl_port < 1)
-    {
-      clib_warning ("Failed to allocate src port");
-      return -1;
-    }
+  if (rv)
+    return -1;
 
   /*
    * Create connection and send SYN
    */
   clib_spinlock_lock_if_init (&tm->half_open_lock);
   tc = tcp_half_open_connection_new ();
-  clib_memcpy (&tc->c_rmt_ip, &rmt->ip, sizeof (ip46_address_t));
-  clib_memcpy (&tc->c_lcl_ip, &lcl_addr, sizeof (ip46_address_t));
+  ip_copy (&tc->c_rmt_ip, &rmt->ip, rmt->is_ip4);
+  ip_copy (&tc->c_lcl_ip, &lcl_addr, rmt->is_ip4);
   tc->c_rmt_port = rmt->port;
   tc->c_lcl_port = clib_host_to_net_u16 (lcl_port);
   tc->c_is_ip4 = rmt->is_ip4;
-  tc->c_transport_proto = TRANSPORT_PROTO_TCP;
+  tc->c_proto = TRANSPORT_PROTO_TCP;
   tc->c_fib_index = rmt->fib_index;
   /* The other connection vars will be initialized after SYN ACK */
   tcp_connection_timers_init (tc);
@@ -1195,7 +1049,7 @@ tcp_timer_establish_handler (u32 conn_index)
   if (tc)
     {
       ASSERT (tc->state == TCP_STATE_SYN_SENT);
-      stream_session_connect_notify (&tc->connection, 1 /* fail */ );
+      session_stream_connect_notify (&tc->connection, 1 /* fail */ );
       TCP_DBG ("establish pop: %U", format_tcp_connection, tc, 2);
     }
   else
@@ -1328,8 +1182,8 @@ tcp_main_enable (vlib_main_t * vm)
   ip6_register_protocol (IP_PROTOCOL_TCP, tcp6_input_node.index);
 
   /* Register as transport with session layer */
-  session_register_transport (TRANSPORT_PROTO_TCP, 1, &tcp_proto);
-  session_register_transport (TRANSPORT_PROTO_TCP, 0, &tcp_proto);
+  transport_register_protocol (TRANSPORT_PROTO_TCP, 1, &tcp_proto);
+  transport_register_protocol (TRANSPORT_PROTO_TCP, 0, &tcp_proto);
 
   /*
    * Initialize data structures
@@ -1379,22 +1233,9 @@ tcp_main_enable (vlib_main_t * vm)
   tm->tstamp_ticks_per_clock = vm->clib_time.seconds_per_clock
     / TCP_TSTAMP_RESOLUTION;
 
-  if (tm->local_endpoints_table_buckets == 0)
-    tm->local_endpoints_table_buckets = 250000;
-  if (tm->local_endpoints_table_memory == 0)
-    tm->local_endpoints_table_memory = 512 << 20;
-
-  clib_bihash_init_24_8 (&tm->local_endpoints_table, "local endpoint table",
-                        tm->local_endpoints_table_buckets,
-                        tm->local_endpoints_table_memory);
-
-  /* Initialize [port-allocator] random number seed */
-  tm->port_allocator_seed = (u32) clib_cpu_time_now ();
-
   if (num_threads > 1)
     {
       clib_spinlock_init (&tm->half_open_lock);
-      clib_spinlock_init (&tm->local_endpoints_lock);
     }
 
   vec_validate (tm->tx_frames[0], num_threads - 1);
index 2a65dfa..b057b88 100644 (file)
@@ -385,13 +385,6 @@ typedef struct _tcp_main
   tcp_connection_t *half_open_connections;
   clib_spinlock_t half_open_lock;
 
-  /* Pool of local TCP endpoints */
-  transport_endpoint_t *local_endpoints;
-
-  /* Local endpoints lookup table */
-  transport_endpoint_table_t local_endpoints_table;
-  clib_spinlock_t local_endpoints_lock;
-
   /* Congestion control algorithms registered */
   tcp_cc_algorithm_t *cc_algos;
 
@@ -412,9 +405,6 @@ typedef struct _tcp_main
   u32 last_v6_address_rotor;
   ip6_address_t *ip6_src_addresses;
 
-  /** Port allocator random number generator seed */
-  u32 port_allocator_seed;
-
   /** vlib buffer size */
   u32 bytes_per_buffer;
 
index 3a32e62..73642df 100644 (file)
@@ -1388,8 +1388,8 @@ tcp_session_enqueue_data (tcp_connection_t * tc, vlib_buffer_t * b,
       return TCP_ERROR_PURE_ACK;
     }
 
-  written = stream_session_enqueue_data (&tc->connection, b, 0,
-                                        1 /* queue event */ , 1);
+  written = session_enqueue_stream_connection (&tc->connection, b, 0,
+                                              1 /* queue event */ , 1);
 
   TCP_EVT_DBG (TCP_EVT_INPUT, tc, 0, data_len, written);
 
@@ -1450,9 +1450,10 @@ tcp_session_enqueue_ooo (tcp_connection_t * tc, vlib_buffer_t * b,
     }
 
   /* Enqueue out-of-order data with relative offset */
-  rv = stream_session_enqueue_data (&tc->connection, b,
-                                   vnet_buffer (b)->tcp.seq_number -
-                                   tc->rcv_nxt, 0 /* queue event */ , 0);
+  rv = session_enqueue_stream_connection (&tc->connection, b,
+                                         vnet_buffer (b)->tcp.seq_number -
+                                         tc->rcv_nxt, 0 /* queue event */ ,
+                                         0);
 
   /* Nothing written */
   if (rv)
@@ -1669,15 +1670,16 @@ tcp_set_rx_trace_data (tcp_rx_trace_t * t0, tcp_connection_t * tc0,
 }
 
 always_inline void
-tcp_established_inc_counter (vlib_main_t * vm, u8 is_ip4, u8 evt, u8 val)
+tcp_node_inc_counter (vlib_main_t * vm, u32 tcp4_node, u32 tcp6_node,
+                     u8 is_ip4, u8 evt, u8 val)
 {
   if (PREDICT_TRUE (!val))
     return;
 
   if (is_ip4)
-    vlib_node_increment_counter (vm, tcp4_established_node.index, evt, val);
+    vlib_node_increment_counter (vm, tcp4_node, evt, val);
   else
-    vlib_node_increment_counter (vm, tcp6_established_node.index, evt, val);
+    vlib_node_increment_counter (vm, tcp6_node, evt, val);
 }
 
 always_inline uword
@@ -1787,8 +1789,11 @@ tcp46_established_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
       vlib_put_next_frame (vm, node, next_index, n_left_to_next);
     }
 
-  errors = session_manager_flush_enqueue_events (my_thread_index);
-  tcp_established_inc_counter (vm, is_ip4, TCP_ERROR_EVENT_FIFO_FULL, errors);
+  errors = session_manager_flush_enqueue_events (TRANSPORT_PROTO_TCP,
+                                                my_thread_index);
+  tcp_node_inc_counter (vm, tcp4_established_node.index,
+                       tcp6_established_node.index, is_ip4,
+                       TCP_ERROR_EVENT_FIFO_FULL, errors);
   tcp_flush_frame_to_output (vm, my_thread_index, is_ip4);
 
   return from_frame->n_vectors;
@@ -1873,8 +1878,7 @@ tcp_lookup_is_valid (tcp_connection_t * tc, tcp_header_t * hdr)
     {
       handle = session_lookup_half_open_handle (&tc->connection);
       tmp = session_lookup_half_open_connection (handle & 0xFFFFFFFF,
-                                                tc->c_transport_proto,
-                                                tc->c_is_ip4);
+                                                tc->c_proto, tc->c_is_ip4);
 
       if (tmp)
        {
@@ -2117,7 +2121,7 @@ tcp46_syn_sent_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
 
              /* Notify app that we have connection. If session layer can't
               * allocate session send reset */
-             if (stream_session_connect_notify (&new_tc0->connection, 0))
+             if (session_stream_connect_notify (&new_tc0->connection, 0))
                {
                  clib_warning ("connect notify fail");
                  tcp_send_reset_w_pkt (new_tc0, b0, is_ip4);
@@ -2138,7 +2142,7 @@ tcp46_syn_sent_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
              new_tc0->state = TCP_STATE_SYN_RCVD;
 
              /* Notify app that we have connection */
-             if (stream_session_connect_notify (&new_tc0->connection, 0))
+             if (session_stream_connect_notify (&new_tc0->connection, 0))
                {
                  tcp_connection_cleanup (new_tc0);
                  tcp_send_reset_w_pkt (tc0, b0, is_ip4);
@@ -2187,17 +2191,11 @@ tcp46_syn_sent_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
       vlib_put_next_frame (vm, node, next_index, n_left_to_next);
     }
 
-  errors = session_manager_flush_enqueue_events (my_thread_index);
-  if (errors)
-    {
-      if (is_ip4)
-       vlib_node_increment_counter (vm, tcp4_established_node.index,
-                                    TCP_ERROR_EVENT_FIFO_FULL, errors);
-      else
-       vlib_node_increment_counter (vm, tcp6_established_node.index,
-                                    TCP_ERROR_EVENT_FIFO_FULL, errors);
-    }
-
+  errors = session_manager_flush_enqueue_events (TRANSPORT_PROTO_TCP,
+                                                my_thread_index);
+  tcp_node_inc_counter (vm, tcp4_syn_sent_node.index,
+                       tcp6_syn_sent_node.index, is_ip4,
+                       TCP_ERROR_EVENT_FIFO_FULL, errors);
   return from_frame->n_vectors;
 }
 
@@ -2259,6 +2257,9 @@ VLIB_REGISTER_NODE (tcp6_syn_sent_node) =
 
 VLIB_NODE_FUNCTION_MULTIARCH (tcp6_syn_sent_node, tcp6_syn_sent_rcv);
 
+vlib_node_registration_t tcp4_rcv_process_node;
+vlib_node_registration_t tcp6_rcv_process_node;
+
 /**
  * Handles reception for all states except LISTEN, SYN-SENT and ESTABLISHED
  * as per RFC793 p. 64
@@ -2583,16 +2584,11 @@ tcp46_rcv_process_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
       vlib_put_next_frame (vm, node, next_index, n_left_to_next);
     }
 
-  errors = session_manager_flush_enqueue_events (my_thread_index);
-  if (errors)
-    {
-      if (is_ip4)
-       vlib_node_increment_counter (vm, tcp4_established_node.index,
-                                    TCP_ERROR_EVENT_FIFO_FULL, errors);
-      else
-       vlib_node_increment_counter (vm, tcp6_established_node.index,
-                                    TCP_ERROR_EVENT_FIFO_FULL, errors);
-    }
+  errors = session_manager_flush_enqueue_events (TRANSPORT_PROTO_TCP,
+                                                my_thread_index);
+  tcp_node_inc_counter (vm, is_ip4, tcp4_rcv_process_node.index,
+                       tcp6_rcv_process_node.index,
+                       TCP_ERROR_EVENT_FIFO_FULL, errors);
 
   return from_frame->n_vectors;
 }
@@ -2966,12 +2962,13 @@ tcp46_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
                                  + tcp_header_bytes (tcp0));
              n_data_bytes0 = clib_net_to_host_u16 (ip40->length)
                - n_advance_bytes0;
-             tconn =
-               session_lookup_connection_wt4 (fib_index0, &ip40->dst_address,
-                                              &ip40->src_address,
-                                              tcp0->dst_port, tcp0->src_port,
-                                              TRANSPORT_PROTO_TCP,
-                                              my_thread_index);
+             tconn = session_lookup_connection_wt4 (fib_index0,
+                                                    &ip40->dst_address,
+                                                    &ip40->src_address,
+                                                    tcp0->dst_port,
+                                                    tcp0->src_port,
+                                                    TRANSPORT_PROTO_TCP,
+                                                    my_thread_index);
              tc0 = tcp_get_connection_from_transport (tconn);
              ASSERT (tcp_lookup_is_valid (tc0, tcp0));
            }
@@ -2983,12 +2980,13 @@ tcp46_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
              n_data_bytes0 = clib_net_to_host_u16 (ip60->payload_length)
                - n_advance_bytes0;
              n_advance_bytes0 += sizeof (ip60[0]);
-             tconn =
-               session_lookup_connection_wt6 (fib_index0, &ip60->dst_address,
-                                              &ip60->src_address,
-                                              tcp0->dst_port, tcp0->src_port,
-                                              TRANSPORT_PROTO_TCP,
-                                              my_thread_index);
+             tconn = session_lookup_connection_wt6 (fib_index0,
+                                                    &ip60->dst_address,
+                                                    &ip60->src_address,
+                                                    tcp0->dst_port,
+                                                    tcp0->src_port,
+                                                    TRANSPORT_PROTO_TCP,
+                                                    my_thread_index);
              tc0 = tcp_get_connection_from_transport (tconn);
              ASSERT (tcp_lookup_is_valid (tc0, tcp0));
            }
index 2018855..021f416 100644 (file)
@@ -1574,7 +1574,7 @@ tcp_test_lookup (vlib_main_t * vm, unformat_input_t * input)
   tc->connection.rmt_ip.ip4.as_u32 = clib_host_to_net_u32 (0x06000103);
   tc->connection.lcl_port = 35051;
   tc->connection.rmt_port = 53764;
-  tc->connection.transport_proto = 0;
+  tc->connection.proto = 0;
   clib_memcpy (tc1, &tc->connection, sizeof (*tc1));
 
   pool_get (session_manager_main.sessions[0], s);
@@ -1590,7 +1590,7 @@ tcp_test_lookup (vlib_main_t * vm, unformat_input_t * input)
   tc->connection.rmt_ip.ip4.as_u32 = clib_host_to_net_u32 (0x06000102);
   tc->connection.lcl_port = 38225;
   tc->connection.rmt_port = 53764;
-  tc->connection.transport_proto = 0;
+  tc->connection.proto = 0;
   clib_memcpy (tc2, &tc->connection, sizeof (*tc2));
 
   /*
@@ -1601,7 +1601,7 @@ tcp_test_lookup (vlib_main_t * vm, unformat_input_t * input)
   tconn = session_lookup_connection_wt4 (0, &tc1->lcl_ip.ip4,
                                         &tc1->rmt_ip.ip4,
                                         tc1->lcl_port, tc1->rmt_port,
-                                        tc1->transport_proto, 0);
+                                        tc1->proto, 0);
   cmp = (memcmp (&tconn->rmt_ip, &tc1->rmt_ip, sizeof (tc1->rmt_ip)) == 0);
   TCP_TEST ((cmp), "rmt ip is identical %d", cmp);
   TCP_TEST ((tconn->lcl_port == tc1->lcl_port),
@@ -1614,7 +1614,7 @@ tcp_test_lookup (vlib_main_t * vm, unformat_input_t * input)
   tconn = session_lookup_connection_wt4 (0, &tc2->lcl_ip.ip4,
                                         &tc2->rmt_ip.ip4,
                                         tc2->lcl_port, tc2->rmt_port,
-                                        tc2->transport_proto, 0);
+                                        tc2->proto, 0);
   TCP_TEST ((tconn == 0), "lookup result should be null");
 
   /*
@@ -1624,12 +1624,12 @@ tcp_test_lookup (vlib_main_t * vm, unformat_input_t * input)
   tconn = session_lookup_connection_wt4 (0, &tc1->lcl_ip.ip4,
                                         &tc1->rmt_ip.ip4,
                                         tc1->lcl_port, tc1->rmt_port,
-                                        tc1->transport_proto, 0);
+                                        tc1->proto, 0);
   TCP_TEST ((tconn == 0), "lookup result should be null");
   tconn = session_lookup_connection_wt4 (0, &tc2->lcl_ip.ip4,
                                         &tc2->rmt_ip.ip4,
                                         tc2->lcl_port, tc2->rmt_port,
-                                        tc2->transport_proto, 0);
+                                        tc2->proto, 0);
   TCP_TEST ((tconn == 0), "lookup result should be null");
 
   /*
@@ -1639,7 +1639,7 @@ tcp_test_lookup (vlib_main_t * vm, unformat_input_t * input)
   tconn = session_lookup_connection_wt4 (0, &tc2->lcl_ip.ip4,
                                         &tc2->rmt_ip.ip4,
                                         tc2->lcl_port, tc2->rmt_port,
-                                        tc2->transport_proto, 0);
+                                        tc2->proto, 0);
   TCP_TEST ((tconn == 0), "lookup result should be null");
 
   return 0;
index 23f0854..593642c 100644 (file)
@@ -73,7 +73,6 @@ builtin_server_rx_callback (stream_session_t * s)
       /* Fabricate TX event, send to ourselves */
       evt.fifo = tx_fifo;
       evt.event_type = FIFO_EVENT_APP_TX;
-      evt.event_id = 0;
       q = session_manager_get_vpp_event_queue (s->thread_index);
       unix_shared_memory_queue_add (q, (u8 *) & evt,
                                    0 /* do wait for mutex */ );
index 0e0336b..c12e837 100644 (file)
 #include <vnet/dpo/load_balance.h>
 #include <vnet/fib/ip4_fib.h>
 
-udp_uri_main_t udp_uri_main;
-
-u32
-udp_session_bind_ip4 (u32 session_index, transport_endpoint_t * lcl)
+udp_connection_t *
+udp_connection_alloc (u32 thread_index)
 {
-  udp_uri_main_t *um = vnet_get_udp_main ();
-  udp_connection_t *listener;
+  udp_main_t *um = &udp_main;
+  udp_connection_t *uc;
+  u32 will_expand = 0;
+  pool_get_aligned_will_expand (um->connections[thread_index], will_expand,
+                               CLIB_CACHE_LINE_BYTES);
 
-  pool_get (um->udp_listeners, listener);
-  memset (listener, 0, sizeof (udp_connection_t));
-  listener->c_lcl_port = lcl->port;
-  listener->c_lcl_ip4.as_u32 = lcl->ip.ip4.as_u32;
-  listener->c_transport_proto = TRANSPORT_PROTO_UDP;
-  udp_register_dst_port (um->vlib_main, clib_net_to_host_u16 (lcl->port),
-                        udp4_uri_input_node.index, 1 /* is_ipv4 */ );
-  return 0;
+  if (PREDICT_FALSE (will_expand))
+    {
+      clib_spinlock_lock_if_init (&udp_main.peekers_write_locks
+                                 [thread_index]);
+      pool_get_aligned (udp_main.connections[thread_index], uc,
+                       CLIB_CACHE_LINE_BYTES);
+      clib_spinlock_unlock_if_init (&udp_main.peekers_write_locks
+                                   [thread_index]);
+    }
+  else
+    {
+      pool_get_aligned (um->connections[thread_index], uc,
+                       CLIB_CACHE_LINE_BYTES);
+    }
+  memset (uc, 0, sizeof (*uc));
+  uc->c_c_index = uc - um->connections[thread_index];
+  uc->c_thread_index = thread_index;
+  uc->c_proto = TRANSPORT_PROTO_UDP;
+  return uc;
 }
 
-u32
-udp_session_bind_ip6 (u32 session_index, transport_endpoint_t * lcl)
+void
+udp_connection_free (udp_connection_t * uc)
 {
-  udp_uri_main_t *um = vnet_get_udp_main ();
-  udp_connection_t *listener;
-
-  pool_get (um->udp_listeners, listener);
-  listener->c_lcl_port = lcl->port;
-  clib_memcpy (&listener->c_lcl_ip6, &lcl->ip.ip6, sizeof (ip6_address_t));
-  listener->c_transport_proto = TRANSPORT_PROTO_UDP;
-  udp_register_dst_port (um->vlib_main, clib_net_to_host_u16 (lcl->port),
-                        udp4_uri_input_node.index, 0 /* is_ipv4 */ );
-  return 0;
+  pool_put (udp_main.connections[uc->c_thread_index], uc);
+  if (CLIB_DEBUG)
+    memset (uc, 0xFA, sizeof (*uc));
 }
 
 u32
-udp_session_unbind_ip4 (u32 listener_index)
+udp_session_bind (u32 session_index, transport_endpoint_t * lcl)
 {
+  udp_main_t *um = vnet_get_udp_main ();
   vlib_main_t *vm = vlib_get_main ();
   udp_connection_t *listener;
-  listener = udp_listener_get (listener_index);
+  u32 node_index;
+  void *iface_ip;
+  udp_dst_port_info_t *pi;
 
-  /* deregister the udp_local mapping */
-  udp_unregister_dst_port (vm, listener->c_lcl_port, 1 /* is_ipv4 */ );
-  return 0;
+  pi = udp_get_dst_port_info (um, lcl->port, lcl->is_ip4);
+  if (pi)
+    return -1;
+
+  pool_get (um->listener_pool, listener);
+  memset (listener, 0, sizeof (udp_connection_t));
+
+  listener->c_lcl_port = lcl->port;
+  listener->c_c_index = listener - um->listener_pool;
+
+  /* If we are provided a sw_if_index, bind using one of its ips */
+  if (ip_is_zero (&lcl->ip, 1) && lcl->sw_if_index != ENDPOINT_INVALID_INDEX)
+    {
+      if ((iface_ip = ip_interface_get_first_ip (lcl->sw_if_index,
+                                                lcl->is_ip4)))
+       ip_set (&lcl->ip, iface_ip, lcl->is_ip4);
+    }
+  ip_copy (&listener->c_lcl_ip, &lcl->ip, lcl->is_ip4);
+  listener->c_is_ip4 = lcl->is_ip4;
+  listener->c_proto = TRANSPORT_PROTO_UDP;
+  listener->c_s_index = session_index;
+  listener->c_fib_index = lcl->fib_index;
+
+  node_index = lcl->is_ip4 ? udp4_input_node.index : udp6_input_node.index;
+  udp_register_dst_port (vm, clib_net_to_host_u16 (lcl->port), node_index,
+                        lcl->is_ip4 /* is_ipv4 */ );
+  return listener->c_c_index;
 }
 
 u32
-udp_session_unbind_ip6 (u32 listener_index)
+udp_session_unbind (u32 listener_index)
 {
   vlib_main_t *vm = vlib_get_main ();
-  udp_connection_t *listener;
 
+  udp_connection_t *listener;
   listener = udp_listener_get (listener_index);
-
-  /* deregister the udp_local mapping */
-  udp_unregister_dst_port (vm, listener->c_lcl_port, 0 /* is_ipv4 */ );
+  udp_unregister_dst_port (vm, clib_net_to_host_u16 (listener->c_lcl_port),
+                          listener->c_is_ip4);
   return 0;
 }
 
@@ -90,214 +121,220 @@ udp_session_get_listener (u32 listener_index)
 }
 
 u32
-udp_push_header (transport_connection_t * tconn, vlib_buffer_t * b)
+udp_push_header (transport_connection_t * tc, vlib_buffer_t * b)
 {
-  udp_connection_t *us;
-  u8 *data;
-  udp_header_t *udp;
+  udp_connection_t *uc;
+  vlib_main_t *vm = vlib_get_main ();
 
-  us = (udp_connection_t *) tconn;
+  uc = udp_get_connection_from_transport (tc);
 
-  if (tconn->is_ip4)
-    {
-      ip4_header_t *ip;
-
-      data = vlib_buffer_get_current (b);
-      udp = (udp_header_t *) (data - sizeof (udp_header_t));
-      ip = (ip4_header_t *) ((u8 *) udp - sizeof (ip4_header_t));
-
-      /* Build packet header, swap rx key src + dst fields */
-      ip->src_address.as_u32 = us->c_lcl_ip4.as_u32;
-      ip->dst_address.as_u32 = us->c_rmt_ip4.as_u32;
-      ip->ip_version_and_header_length = 0x45;
-      ip->ttl = 254;
-      ip->protocol = IP_PROTOCOL_UDP;
-      ip->length = clib_host_to_net_u16 (b->current_length + sizeof (*udp));
-      ip->checksum = ip4_header_checksum (ip);
-
-      udp->src_port = us->c_lcl_port;
-      udp->dst_port = us->c_rmt_port;
-      udp->length = clib_host_to_net_u16 (b->current_length);
-      udp->checksum = 0;
-
-      b->current_length = sizeof (*ip) + sizeof (*udp);
-      return SESSION_QUEUE_NEXT_IP4_LOOKUP;
-    }
+  vlib_buffer_push_udp (b, uc->c_lcl_port, uc->c_rmt_port, 1);
+  if (tc->is_ip4)
+    vlib_buffer_push_ip4 (vm, b, &uc->c_lcl_ip4, &uc->c_rmt_ip4,
+                         IP_PROTOCOL_UDP, 1);
   else
     {
-      vlib_main_t *vm = vlib_get_main ();
-      ip6_header_t *ip;
-      u16 payload_length;
-      int bogus = ~0;
-
-      data = vlib_buffer_get_current (b);
-      udp = (udp_header_t *) (data - sizeof (udp_header_t));
-      ip = (ip6_header_t *) ((u8 *) udp - sizeof (ip6_header_t));
-
-      /* Build packet header, swap rx key src + dst fields */
-      clib_memcpy (&ip->src_address, &us->c_lcl_ip6, sizeof (ip6_address_t));
-      clib_memcpy (&ip->dst_address, &us->c_rmt_ip6, sizeof (ip6_address_t));
-
-      ip->ip_version_traffic_class_and_flow_label =
-       clib_host_to_net_u32 (0x6 << 28);
-
-      ip->hop_limit = 0xff;
-      ip->protocol = IP_PROTOCOL_UDP;
-
-      payload_length = vlib_buffer_length_in_chain (vm, b);
-      payload_length -= sizeof (*ip);
-
-      ip->payload_length = clib_host_to_net_u16 (payload_length);
-
-      udp->checksum = ip6_tcp_udp_icmp_compute_checksum (vm, b, ip, &bogus);
-      ASSERT (!bogus);
-
-      udp->src_port = us->c_lcl_port;
-      udp->dst_port = us->c_rmt_port;
-      udp->length = clib_host_to_net_u16 (b->current_length);
-      udp->checksum = 0;
-
-      b->current_length = sizeof (*ip) + sizeof (*udp);
-
-      return SESSION_QUEUE_NEXT_IP6_LOOKUP;
+      ip6_header_t *ih;
+      ih = vlib_buffer_push_ip6 (vm, b, &uc->c_lcl_ip6, &uc->c_rmt_ip6,
+                                IP_PROTOCOL_UDP);
+      vnet_buffer (b)->l3_hdr_offset = (u8 *) ih - b->data;
     }
+  vnet_buffer (b)->sw_if_index[VLIB_RX] = 0;
+  vnet_buffer (b)->sw_if_index[VLIB_TX] = ~0;
+  b->flags |= VNET_BUFFER_F_LOCALLY_ORIGINATED;
+
+  return 0;
 }
 
 transport_connection_t *
-udp_session_get (u32 connection_index, u32 my_thread_index)
+udp_session_get (u32 connection_index, u32 thread_index)
 {
-  udp_uri_main_t *um = vnet_get_udp_main ();
+  udp_connection_t *uc;
+  uc = udp_connection_get (connection_index, thread_index);
+  if (uc)
+    return &uc->connection;
+  return 0;
+}
 
-  udp_connection_t *us;
-  us =
-    pool_elt_at_index (um->udp_sessions[my_thread_index], connection_index);
-  return &us->connection;
+void
+udp_session_close (u32 connection_index, u32 thread_index)
+{
+  vlib_main_t *vm = vlib_get_main ();
+  udp_connection_t *uc;
+  uc = udp_connection_get (connection_index, thread_index);
+  if (uc)
+    {
+      udp_unregister_dst_port (vm, clib_net_to_host_u16 (uc->c_lcl_port),
+                              uc->c_is_ip4);
+      stream_session_delete_notify (&uc->connection);
+      udp_connection_free (uc);
+    }
 }
 
 void
-udp_session_close (u32 connection_index, u32 my_thread_index)
+udp_session_cleanup (u32 connection_index, u32 thread_index)
 {
-  udp_uri_main_t *um = vnet_get_udp_main ();
-  pool_put_index (um->udp_sessions[my_thread_index], connection_index);
+  udp_connection_t *uc;
+  uc = udp_connection_get (connection_index, thread_index);
+  if (uc)
+    udp_connection_free (uc);
 }
 
 u8 *
-format_udp_session_ip4 (u8 * s, va_list * args)
+format_udp_connection_id (u8 * s, va_list * args)
 {
-  u32 uci = va_arg (*args, u32);
-  u32 thread_index = va_arg (*args, u32);
-  udp_connection_t *u4;
-
-  u4 = udp_connection_get (uci, thread_index);
+  udp_connection_t *uc = va_arg (*args, udp_connection_t *);
+  if (!uc)
+    return s;
+  if (uc->c_is_ip4)
+    s = format (s, "[#%d][%s] %U:%d->%U:%d", uc->c_thread_index, "U",
+               format_ip4_address, &uc->c_lcl_ip4,
+               clib_net_to_host_u16 (uc->c_lcl_port), format_ip4_address,
+               &uc->c_rmt_ip4, clib_net_to_host_u16 (uc->c_rmt_port));
+  else
+    s = format (s, "[#%d][%s] %U:%d->%U:%d", uc->c_thread_index, "U",
+               format_ip6_address, &uc->c_lcl_ip6,
+               clib_net_to_host_u16 (uc->c_lcl_port), format_ip6_address,
+               &uc->c_rmt_ip6, clib_net_to_host_u16 (uc->c_rmt_port));
+  return s;
+}
 
-  s = format (s, "[%s] %U:%d->%U:%d", "udp", format_ip4_address,
-             &u4->c_lcl_ip4, clib_net_to_host_u16 (u4->c_lcl_port),
-             format_ip4_address, &u4->c_rmt_ip4,
-             clib_net_to_host_u16 (u4->c_rmt_port));
+u8 *
+format_udp_connection (u8 * s, va_list * args)
+{
+  udp_connection_t *uc = va_arg (*args, udp_connection_t *);
+  u32 verbose = va_arg (*args, u32);
+  if (!uc)
+    return s;
+  s = format (s, "%-50U", format_udp_connection_id, uc);
+  if (verbose)
+    {
+      if (verbose == 1)
+       s = format (s, "%-15s", "-");
+      else
+       s = format (s, "\n");
+    }
   return s;
 }
 
 u8 *
-format_udp_session_ip6 (u8 * s, va_list * args)
+format_udp_session (u8 * s, va_list * args)
 {
   u32 uci = va_arg (*args, u32);
   u32 thread_index = va_arg (*args, u32);
-  udp_connection_t *tc = udp_connection_get (uci, thread_index);
-  s = format (s, "[%s] %U:%d->%U:%d", "udp", format_ip6_address,
-             &tc->c_lcl_ip6, clib_net_to_host_u16 (tc->c_lcl_port),
-             format_ip6_address, &tc->c_rmt_ip6,
-             clib_net_to_host_u16 (tc->c_rmt_port));
-  return s;
+  u32 verbose = va_arg (*args, u32);
+  udp_connection_t *uc;
+
+  uc = udp_connection_get (uci, thread_index);
+  return format (s, "%U", format_udp_connection, uc, verbose);
 }
 
 u8 *
-format_udp_listener_session_ip4 (u8 * s, va_list * args)
+format_udp_half_open_session (u8 * s, va_list * args)
 {
-  u32 tci = va_arg (*args, u32);
-  udp_connection_t *tc = udp_listener_get (tci);
-  s = format (s, "[%s] %U:%d->%U:%d", "udp", format_ip4_address,
-             &tc->c_lcl_ip4, clib_net_to_host_u16 (tc->c_lcl_port),
-             format_ip4_address, &tc->c_rmt_ip4,
-             clib_net_to_host_u16 (tc->c_rmt_port));
-  return s;
+  clib_warning ("BUG");
+  return 0;
 }
 
 u8 *
-format_udp_listener_session_ip6 (u8 * s, va_list * args)
+format_udp_listener_session (u8 * s, va_list * args)
 {
   u32 tci = va_arg (*args, u32);
-  udp_connection_t *tc = udp_listener_get (tci);
-  s = format (s, "[%s] %U:%d->%U:%d", "udp", format_ip6_address,
-             &tc->c_lcl_ip6, clib_net_to_host_u16 (tc->c_lcl_port),
-             format_ip6_address, &tc->c_rmt_ip6,
-             clib_net_to_host_u16 (tc->c_rmt_port));
-  return s;
+  udp_connection_t *uc = udp_listener_get (tci);
+  return format (s, "%U", format_udp_connection, uc, 0 /* verbose */ );
 }
 
 u16
-udp_send_mss_uri (transport_connection_t * t)
+udp_send_mss (transport_connection_t * t)
 {
   /* TODO figure out MTU of output interface */
-  return 400;
+  return 1460;
 }
 
 u32
-udp_send_space_uri (transport_connection_t * t)
+udp_send_space (transport_connection_t * t)
 {
   /* No constraint on TX window */
   return ~0;
 }
 
 int
-udp_open_connection (transport_endpoint_t * tep)
+udp_open_connection (transport_endpoint_t * rmt)
 {
-  clib_warning ("Not implemented");
-  return 0;
+  udp_main_t *um = vnet_get_udp_main ();
+  vlib_main_t *vm = vlib_get_main ();
+  u32 thread_index = vlib_get_thread_index ();
+  udp_connection_t *uc;
+  ip46_address_t lcl_addr;
+  u32 node_index;
+  u16 lcl_port;
+
+  if (transport_alloc_local_endpoint (TRANSPORT_PROTO_UDP, rmt, &lcl_addr,
+                                     &lcl_port))
+    return -1;
+
+  while (udp_get_dst_port_info (um, lcl_port, rmt->is_ip4))
+    {
+      lcl_port = transport_alloc_local_port (TRANSPORT_PROTO_UDP, &lcl_addr);
+      if (lcl_port < 1)
+       {
+         clib_warning ("Failed to allocate src port");
+         return -1;
+       }
+    }
+
+  node_index = rmt->is_ip4 ? udp4_input_node.index : udp6_input_node.index;
+  udp_register_dst_port (vm, lcl_port, node_index, rmt->is_ip4 /* is_ipv4 */ );
+
+  uc = udp_connection_alloc (thread_index);
+  ip_copy (&uc->c_rmt_ip, &rmt->ip, rmt->is_ip4);
+  ip_copy (&uc->c_lcl_ip, &lcl_addr, rmt->is_ip4);
+  uc->c_rmt_port = rmt->port;
+  uc->c_lcl_port = clib_host_to_net_u16 (lcl_port);
+  uc->c_is_ip4 = rmt->is_ip4;
+  uc->c_proto = TRANSPORT_PROTO_UDP;
+  uc->c_fib_index = rmt->fib_index;
+
+  return uc->c_c_index;
 }
 
-/* *INDENT-OFF* */
-const static transport_proto_vft_t udp4_proto = {
-  .bind = udp_session_bind_ip4,
-  .open = udp_open_connection,
-  .unbind = udp_session_unbind_ip4,
-  .push_header = udp_push_header,
-  .get_connection = udp_session_get,
-  .get_listener = udp_session_get_listener,
-  .close = udp_session_close,
-  .send_mss = udp_send_mss_uri,
-  .send_space = udp_send_space_uri,
-  .format_connection = format_udp_session_ip4,
-  .format_listener = format_udp_listener_session_ip4
-};
+transport_connection_t *
+udp_half_open_session_get_transport (u32 conn_index)
+{
+  udp_connection_t *uc;
+  uc = udp_connection_get (conn_index, vlib_get_thread_index ());
+  return &uc->connection;
+}
 
-const static transport_proto_vft_t udp6_proto = {
-  .bind = udp_session_bind_ip6,
+/* *INDENT-OFF* */
+const static transport_proto_vft_t udp_proto = {
+  .bind = udp_session_bind,
   .open = udp_open_connection,
-  .unbind = udp_session_unbind_ip6,
+  .unbind = udp_session_unbind,
   .push_header = udp_push_header,
   .get_connection = udp_session_get,
   .get_listener = udp_session_get_listener,
+  .get_half_open = udp_half_open_session_get_transport,
   .close = udp_session_close,
-  .send_mss = udp_send_mss_uri,
-  .send_space = udp_send_space_uri,
-  .format_connection = format_udp_session_ip6,
-  .format_listener = format_udp_listener_session_ip6
+  .cleanup = udp_session_cleanup,
+  .send_mss = udp_send_mss,
+  .send_space = udp_send_space,
+  .format_connection = format_udp_session,
+  .format_half_open = format_udp_half_open_session,
+  .format_listener = format_udp_listener_session
 };
 /* *INDENT-ON* */
 
 static clib_error_t *
 udp_init (vlib_main_t * vm)
 {
-  udp_uri_main_t *um = vnet_get_udp_main ();
+  udp_main_t *um = vnet_get_udp_main ();
   ip_main_t *im = &ip_main;
   vlib_thread_main_t *tm = vlib_get_thread_main ();
   u32 num_threads;
   clib_error_t *error = 0;
   ip_protocol_info_t *pi;
 
-  um->vlib_main = vm;
-  um->vnet_main = vnet_get_main ();
-
   if ((error = vlib_call_init_function (vm, ip_main_init)))
     return error;
   if ((error = vlib_call_init_function (vm, ip4_lookup_init)))
@@ -318,16 +355,18 @@ udp_init (vlib_main_t * vm)
 
 
   /* Register as transport with URI */
-  session_register_transport (TRANSPORT_PROTO_UDP, 1, &udp4_proto);
-  session_register_transport (TRANSPORT_PROTO_UDP, 0, &udp6_proto);
+  transport_register_protocol (TRANSPORT_PROTO_UDP, 1, &udp_proto);
+  transport_register_protocol (TRANSPORT_PROTO_UDP, 0, &udp_proto);
 
   /*
    * Initialize data structures
    */
 
   num_threads = 1 /* main thread */  + tm->n_threads;
-  vec_validate (um->udp_sessions, num_threads - 1);
-
+  vec_validate (um->connections, num_threads - 1);
+  vec_validate (um->connection_peekers, num_threads - 1);
+  vec_validate (um->peekers_readers_locks, num_threads - 1);
+  vec_validate (um->peekers_write_locks, num_threads - 1);
   return error;
 }
 
index aa37701..920ef96 100644 (file)
 #include <vnet/ip/ip.h>
 #include <vnet/session/transport.h>
 
-typedef struct
-{
-  transport_connection_t connection;         /** must be first */
-
-  /** ersatz MTU to limit fifo pushes to test data size */
-  u32 mtu;
-} udp_connection_t;
-
-typedef struct _udp_uri_main
-{
-  /* Per-worker thread udp connection pools */
-  udp_connection_t **udp_sessions;
-  udp_connection_t *udp_listeners;
-
-  /* convenience */
-  vlib_main_t *vlib_main;
-  vnet_main_t *vnet_main;
-  ip4_main_t *ip4_main;
-  ip6_main_t *ip6_main;
-} udp_uri_main_t;
-
-extern udp_uri_main_t udp_uri_main;
-extern vlib_node_registration_t udp4_uri_input_node;
-
-always_inline udp_uri_main_t *
-vnet_get_udp_main ()
-{
-  return &udp_uri_main;
-}
-
-always_inline udp_connection_t *
-udp_connection_get (u32 conn_index, u32 thread_index)
-{
-  return pool_elt_at_index (udp_uri_main.udp_sessions[thread_index],
-                           conn_index);
-}
-
-always_inline udp_connection_t *
-udp_listener_get (u32 conn_index)
-{
-  return pool_elt_at_index (udp_uri_main.udp_listeners, conn_index);
-}
-
 typedef enum
 {
 #define udp_error(n,s) UDP_ERROR_##n,
@@ -77,6 +34,13 @@ typedef enum
   UDP_N_ERROR,
 } udp_error_t;
 
+typedef struct
+{
+  transport_connection_t connection;         /** must be first */
+  /** ersatz MTU to limit fifo pushes to test data size */
+  u32 mtu;
+} udp_connection_t;
+
 #define foreach_udp4_dst_port                  \
 _ (53, dns)                                    \
 _ (67, dhcp_to_server)                          \
@@ -161,10 +125,108 @@ typedef struct
   u8 punt_unknown4;
   u8 punt_unknown6;
 
-  /* convenience */
-  vlib_main_t *vlib_main;
+  /*
+   * Per-worker thread udp connection pools used with session layer
+   */
+  udp_connection_t **connections;
+  u32 *connection_peekers;
+  clib_spinlock_t *peekers_readers_locks;
+  clib_spinlock_t *peekers_write_locks;
+  udp_connection_t *listener_pool;
+
 } udp_main_t;
 
+extern udp_main_t udp_main;
+extern vlib_node_registration_t udp4_input_node;
+extern vlib_node_registration_t udp6_input_node;
+
+always_inline udp_connection_t *
+udp_connection_get (u32 conn_index, u32 thread_index)
+{
+  if (pool_is_free_index (udp_main.connections[thread_index], conn_index))
+    return 0;
+  return pool_elt_at_index (udp_main.connections[thread_index], conn_index);
+}
+
+always_inline udp_connection_t *
+udp_listener_get (u32 conn_index)
+{
+  return pool_elt_at_index (udp_main.listener_pool, conn_index);
+}
+
+always_inline udp_main_t *
+vnet_get_udp_main ()
+{
+  return &udp_main;
+}
+
+always_inline udp_connection_t *
+udp_get_connection_from_transport (transport_connection_t * tc)
+{
+  return ((udp_connection_t *) tc);
+}
+
+always_inline u32
+udp_connection_index (udp_connection_t * uc)
+{
+  return (uc - udp_main.connections[uc->c_thread_index]);
+}
+
+udp_connection_t *udp_connection_alloc (u32 thread_index);
+
+/**
+ * Acquires a lock that blocks a connection pool from expanding.
+ */
+always_inline void
+udp_pool_add_peeker (u32 thread_index)
+{
+  if (thread_index != vlib_get_thread_index ())
+    return;
+  clib_spinlock_lock_if_init (&udp_main.peekers_readers_locks[thread_index]);
+  udp_main.connection_peekers[thread_index] += 1;
+  if (udp_main.connection_peekers[thread_index] == 1)
+    clib_spinlock_lock_if_init (&udp_main.peekers_write_locks[thread_index]);
+  clib_spinlock_unlock_if_init (&udp_main.peekers_readers_locks
+                               [thread_index]);
+}
+
+always_inline void
+udp_pool_remove_peeker (u32 thread_index)
+{
+  if (thread_index != vlib_get_thread_index ())
+    return;
+  ASSERT (udp_main.connection_peekers[thread_index] > 0);
+  clib_spinlock_lock_if_init (&udp_main.peekers_readers_locks[thread_index]);
+  udp_main.connection_peekers[thread_index] -= 1;
+  if (udp_main.connection_peekers[thread_index] == 0)
+    clib_spinlock_unlock_if_init (&udp_main.peekers_write_locks
+                                 [thread_index]);
+  clib_spinlock_unlock_if_init (&udp_main.peekers_readers_locks
+                               [thread_index]);
+}
+
+always_inline udp_connection_t *
+udp_conenction_clone_safe (u32 connection_index, u32 thread_index)
+{
+  udp_connection_t *old_c, *new_c;
+  u32 current_thread_index = vlib_get_thread_index ();
+  new_c = udp_connection_alloc (current_thread_index);
+
+  /* If during the memcpy pool is reallocated AND the memory allocator
+   * decides to give the old chunk of memory to somebody in a hurry to
+   * scribble something on it, we have a problem. So add this thread as
+   * a session pool peeker.
+   */
+  udp_pool_add_peeker (thread_index);
+  old_c = udp_main.connections[thread_index] + connection_index;
+  clib_memcpy (new_c, old_c, sizeof (*new_c));
+  udp_pool_remove_peeker (thread_index);
+  new_c->c_thread_index = current_thread_index;
+  new_c->c_c_index = udp_connection_index (new_c);
+  return new_c;
+}
+
+
 always_inline udp_dst_port_info_t *
 udp_get_dst_port_info (udp_main_t * um, udp_dst_port_t dst_port, u8 is_ip4)
 {
@@ -174,19 +236,34 @@ udp_get_dst_port_info (udp_main_t * um, udp_dst_port_t dst_port, u8 is_ip4)
 
 format_function_t format_udp_header;
 format_function_t format_udp_rx_trace;
-
 unformat_function_t unformat_udp_header;
 
 void udp_register_dst_port (vlib_main_t * vm,
                            udp_dst_port_t dst_port,
                            u32 node_index, u8 is_ip4);
-
-void
-udp_unregister_dst_port (vlib_main_t * vm,
-                        udp_dst_port_t dst_port, u8 is_ip4);
+void udp_unregister_dst_port (vlib_main_t * vm,
+                             udp_dst_port_t dst_port, u8 is_ip4);
 
 void udp_punt_unknown (vlib_main_t * vm, u8 is_ip4, u8 is_add);
 
+always_inline void *
+vlib_buffer_push_udp (vlib_buffer_t * b, u16 sp, u16 dp, u8 offload_csum)
+{
+  udp_header_t *uh;
+
+  uh = vlib_buffer_push_uninit (b, sizeof (udp_header_t));
+  uh->src_port = sp;
+  uh->dst_port = dp;
+  uh->checksum = 0;
+  uh->length = clib_host_to_net_u16 (b->current_length);
+  if (offload_csum)
+    {
+      b->flags |= VNET_BUFFER_F_OFFLOAD_UDP_CKSUM;
+      vnet_buffer (b)->l4_hdr_offset = (u8 *) uh - b->data;
+    }
+  return uh;
+}
+
 always_inline void
 ip_udp_fixup_one (vlib_main_t * vm, vlib_buffer_t * b0, u8 is_ip4)
 {
index bfdae0a..488e7be 100644 (file)
@@ -19,3 +19,9 @@ udp_error (NONE, "no error")
 udp_error (NO_LISTENER, "no listener for dst port")
 udp_error (LENGTH_ERROR, "UDP packets with length errors")
 udp_error (PUNT, "no listener punt")
+udp_error (ENQUEUED, "UDP packets enqueued")
+udp_error (FIFO_FULL, "UDP fifo full")
+udp_error (NOT_READY, "UDP connection not ready")
+udp_error (LISTENER, "UDP connected session")
+udp_error (CREATE_SESSION, "Failed to create UDP session")
+udp_error (EVENT_FIFO_FULL, "UDP session event fifo full")
index 5d3a185..8170cfb 100644 (file)
 #include <vlibmemory/api.h>
 #include "../session/application_interface.h"
 
-vlib_node_registration_t udp4_uri_input_node;
+static char *udp_error_strings[] = {
+#define udp_error(n,s) s,
+#include "udp_error.def"
+#undef udp_error
+};
 
 typedef struct
 {
-  u32 session;
+  u32 connection;
   u32 disposition;
   u32 thread_index;
-} udp4_uri_input_trace_t;
+} udp_input_trace_t;
 
 /* packet trace format function */
 static u8 *
-format_udp4_uri_input_trace (u8 * s, va_list * args)
+format_udp_input_trace (u8 * s, va_list * args)
 {
   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
-  udp4_uri_input_trace_t *t = va_arg (*args, udp4_uri_input_trace_t *);
+  udp_input_trace_t *t = va_arg (*args, udp_input_trace_t *);
 
-  s = format (s, "UDP4_URI_INPUT: session %d, disposition %d, thread %d",
-             t->session, t->disposition, t->thread_index);
+  s = format (s, "UDP_INPUT: connection %d, disposition %d, thread %d",
+             t->connection, t->disposition, t->thread_index);
   return s;
 }
 
+#define foreach_udp_input_next                 \
+  _ (DROP, "error-drop")
+
 typedef enum
 {
-  UDP4_URI_INPUT_NEXT_DROP,
-  UDP4_URI_INPUT_N_NEXT,
-} udp4_uri_input_next_t;
-
-static char *udp4_uri_input_error_strings[] = {
-#define _(sym,string) string,
-  foreach_session_input_error
+#define _(s, n) UDP_INPUT_NEXT_##s,
+  foreach_udp_input_next
 #undef _
-};
+    UDP_INPUT_N_NEXT,
+} udp_input_next_t;
 
-static uword
-udp4_uri_input_node_fn (vlib_main_t * vm,
-                       vlib_node_runtime_t * node, vlib_frame_t * frame)
+always_inline void
+udp_input_inc_counter (vlib_main_t * vm, u8 is_ip4, u8 evt, u8 val)
+{
+  if (PREDICT_TRUE (!val))
+    return;
+
+  if (is_ip4)
+    vlib_node_increment_counter (vm, udp4_input_node.index, evt, val);
+  else
+    vlib_node_increment_counter (vm, udp6_input_node.index, evt, val);
+}
+
+always_inline uword
+udp46_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
+                   vlib_frame_t * frame, u8 is_ip4)
 {
   u32 n_left_from, *from, *to_next;
-  udp4_uri_input_next_t next_index;
-  udp_uri_main_t *um = vnet_get_udp_main ();
-  session_manager_main_t *smm = vnet_get_session_manager_main ();
+  u32 next_index, errors;
   u32 my_thread_index = vm->thread_index;
-  u8 my_enqueue_epoch;
-  u32 *session_indices_to_enqueue;
-  static u32 serial_number;
-  int i;
-
-  my_enqueue_epoch = ++smm->current_enqueue_epoch[my_thread_index];
 
   from = vlib_frame_vector_args (frame);
   n_left_from = frame->n_vectors;
@@ -90,16 +97,18 @@ udp4_uri_input_node_fn (vlib_main_t * vm,
 
       while (n_left_from > 0 && n_left_to_next > 0)
        {
-         u32 bi0;
+         u32 bi0, fib_index0;
          vlib_buffer_t *b0;
-         u32 next0 = UDP4_URI_INPUT_NEXT_DROP;
-         u32 error0 = SESSION_ERROR_ENQUEUED;
+         u32 next0 = UDP_INPUT_NEXT_DROP;
+         u32 error0 = UDP_ERROR_ENQUEUED;
          udp_header_t *udp0;
-         ip4_header_t *ip0;
-         stream_session_t *s0;
-         svm_fifo_t *f0;
-         u16 udp_len0;
+         ip4_header_t *ip40;
+         ip6_header_t *ip60;
          u8 *data0;
+         stream_session_t *s0;
+         transport_connection_t *tc0 = 0;
+         udp_connection_t *child0, *new_uc0;
+         int written0;
 
          /* speculatively enqueue b0 to the current next frame */
          bi0 = from[0];
@@ -112,89 +121,97 @@ udp4_uri_input_node_fn (vlib_main_t * vm,
          b0 = vlib_get_buffer (vm, bi0);
 
          /* udp_local hands us a pointer to the udp data */
-
          data0 = vlib_buffer_get_current (b0);
          udp0 = (udp_header_t *) (data0 - sizeof (*udp0));
+         fib_index0 = vnet_buffer (b0)->ip.fib_index;
 
-         /* $$$$ fixme: udp_local doesn't do ip options correctly anyhow */
-         ip0 = (ip4_header_t *) (((u8 *) udp0) - sizeof (*ip0));
-         s0 = 0;
-
-         /* lookup session */
-         s0 = session_lookup4 (0, &ip0->dst_address, &ip0->src_address,
-                               udp0->dst_port, udp0->src_port,
-                               TRANSPORT_PROTO_UDP);
+         if (is_ip4)
+           {
+             /* $$$$ fixme: udp_local doesn't do ip options correctly anyhow */
+             ip40 = (ip4_header_t *) (((u8 *) udp0) - sizeof (*ip40));
+             s0 = session_lookup_safe4 (fib_index0, &ip40->dst_address,
+                                        &ip40->src_address, udp0->dst_port,
+                                        udp0->src_port, TRANSPORT_PROTO_UDP);
+           }
+         else
+           {
+             ip60 = (ip6_header_t *) (((u8 *) udp0) - sizeof (*ip60));
+             s0 = session_lookup_safe6 (fib_index0, &ip60->dst_address,
+                                        &ip60->src_address, udp0->dst_port,
+                                        udp0->src_port, TRANSPORT_PROTO_UDP);
+           }
 
-         /* no listener */
          if (PREDICT_FALSE (s0 == 0))
            {
-             error0 = SESSION_ERROR_NO_LISTENER;
+             error0 = UDP_ERROR_NO_LISTENER;
              goto trace0;
            }
 
-         f0 = s0->server_rx_fifo;
-
-         /* established hit */
          if (PREDICT_TRUE (s0->session_state == SESSION_STATE_READY))
            {
-             udp_len0 = clib_net_to_host_u16 (udp0->length);
-
-             if (PREDICT_FALSE (udp_len0 > svm_fifo_max_enqueue (f0)))
-               {
-                 error0 = SESSION_ERROR_FIFO_FULL;
-                 goto trace0;
-               }
-
-             svm_fifo_enqueue_nowait (f0, udp_len0 - sizeof (*udp0),
-                                      (u8 *) (udp0 + 1));
-
-             b0->error = node->errors[SESSION_ERROR_ENQUEUED];
-
-             /* We need to send an RX event on this fifo */
-             if (s0->enqueue_epoch != my_enqueue_epoch)
-               {
-                 s0->enqueue_epoch = my_enqueue_epoch;
-
-                 vec_add1 (smm->session_indices_to_enqueue_by_thread
-                           [my_thread_index],
-                           s0 - smm->sessions[my_thread_index]);
-               }
+             tc0 = session_get_transport (s0);
            }
-         /* listener hit */
-         else if (s0->session_state == SESSION_STATE_LISTENING)
+         else if (s0->session_state == SESSION_STATE_CONNECTING_READY)
            {
-             udp_connection_t *us;
-             int rv;
-
-             error0 = SESSION_ERROR_NOT_READY;
-
              /*
-              * create udp transport session
+              * Clone the transport; the clone is cleaned up with the
+              * session once we notify the session layer.
               */
-             pool_get (um->udp_sessions[my_thread_index], us);
-
-             us->mtu = 1024;   /* $$$$ policy */
-
-             us->c_lcl_ip4.as_u32 = ip0->dst_address.as_u32;
-             us->c_rmt_ip4.as_u32 = ip0->src_address.as_u32;
-             us->c_lcl_port = udp0->dst_port;
-             us->c_rmt_port = udp0->src_port;
-             us->c_transport_proto = TRANSPORT_PROTO_UDP;
-             us->c_c_index = us - um->udp_sessions[my_thread_index];
+             new_uc0 = udp_conenction_clone_safe (s0->connection_index,
+                                                  s0->thread_index);
+             ASSERT (s0->session_index == new_uc0->c_s_index);
 
              /*
-              * create stream session and attach the udp session to it
+              * Drop the 'peeker' reference that guarded s0 against pool resize
               */
-             rv = stream_session_accept (&us->connection, s0->session_index,
-                                         1 /*notify */ );
-             if (rv)
-               error0 = rv;
+             session_pool_remove_peeker (s0->thread_index);
+             session_dgram_connect_notify (&new_uc0->connection,
+                                           s0->thread_index, &s0);
+             tc0 = &new_uc0->connection;
+           }
+         else if (s0->session_state == SESSION_STATE_LISTENING)
+           {
+             tc0 = listen_session_get_transport (s0);
+
+             child0 = udp_connection_alloc (my_thread_index);
+             if (is_ip4)
+               {
+                 ip_set (&child0->c_lcl_ip, &ip40->dst_address, 1);
+                 ip_set (&child0->c_rmt_ip, &ip40->src_address, 1);
+               }
+             else
+               {
+                 ip_set (&child0->c_lcl_ip, &ip60->dst_address, 0);
+                 ip_set (&child0->c_rmt_ip, &ip60->src_address, 0);
+               }
+             child0->c_lcl_port = udp0->dst_port;
+             child0->c_rmt_port = udp0->src_port;
+             child0->c_is_ip4 = is_ip4;
+             child0->mtu = 1460;       /* $$$$ policy */
+
+             if (stream_session_accept
+                 (&child0->connection, tc0->s_index, 1))
+               {
+                 error0 = UDP_ERROR_CREATE_SESSION;
+                 goto trace0;
+               }
+             s0 = session_get (child0->c_s_index, child0->c_thread_index);
+             s0->session_state = SESSION_STATE_READY;
+             tc0 = &child0->connection;
 
+             error0 = UDP_ERROR_LISTENER;
            }
          else
            {
+             error0 = UDP_ERROR_NOT_READY;
+             goto trace0;
+           }
 
-             error0 = SESSION_ERROR_NOT_READY;
+         written0 = session_enqueue_dgram_connection (s0, b0, tc0->proto,
+                                                      1 /* queue evt */ );
+         if (PREDICT_FALSE (written0 < 0))
+           {
+             error0 = UDP_ERROR_FIFO_FULL;
              goto trace0;
            }
 
@@ -204,17 +221,14 @@ udp4_uri_input_node_fn (vlib_main_t * vm,
          if (PREDICT_FALSE ((node->flags & VLIB_NODE_FLAG_TRACE)
                             && (b0->flags & VLIB_BUFFER_IS_TRACED)))
            {
-             udp4_uri_input_trace_t *t =
-               vlib_add_trace (vm, node, b0, sizeof (*t));
+             udp_input_trace_t *t = vlib_add_trace (vm, node, b0,
+                                                    sizeof (*t));
 
-             t->session = ~0;
-             if (s0)
-               t->session = s0 - smm->sessions[my_thread_index];
+             t->connection = tc0 ? tc0->c_index : ~0;
              t->disposition = error0;
              t->thread_index = my_thread_index;
            }
 
-         /* verify speculative enqueue, maybe switch current next frame */
          vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
                                           to_next, n_left_to_next,
                                           bi0, next0);
@@ -223,94 +237,66 @@ udp4_uri_input_node_fn (vlib_main_t * vm,
       vlib_put_next_frame (vm, node, next_index, n_left_to_next);
     }
 
-  /* Send enqueue events */
-
-  session_indices_to_enqueue =
-    smm->session_indices_to_enqueue_by_thread[my_thread_index];
-
-  for (i = 0; i < vec_len (session_indices_to_enqueue); i++)
-    {
-      session_fifo_event_t evt;
-      unix_shared_memory_queue_t *q;
-      stream_session_t *s0;
-      application_t *server0;
-
-      /* Get session */
-      s0 = pool_elt_at_index (smm->sessions[my_thread_index],
-                             session_indices_to_enqueue[i]);
-
-      /* Get session's server */
-      server0 = application_get (s0->app_index);
-
-      /* Built-in server? Deliver the goods... */
-      if (server0->cb_fns.builtin_server_rx_callback)
-       {
-         server0->cb_fns.builtin_server_rx_callback (s0);
-         continue;
-       }
-
-      if (svm_fifo_set_event (s0->server_rx_fifo))
-       {
-         /* Fabricate event */
-         evt.fifo = s0->server_rx_fifo;
-         evt.event_type = FIFO_EVENT_APP_RX;
-         evt.event_id = serial_number++;
-
-         /* Add event to server's event queue */
-         q = server0->event_queue;
-
-         /* Don't block for lack of space */
-         if (PREDICT_TRUE (q->cursize < q->maxsize))
-           {
-             unix_shared_memory_queue_add (server0->event_queue,
-                                           (u8 *) & evt,
-                                           0 /* do wait for mutex */ );
-           }
-         else
-           {
-             vlib_node_increment_counter (vm, udp4_uri_input_node.index,
-                                          SESSION_ERROR_FIFO_FULL, 1);
-           }
-       }
-      /* *INDENT-OFF* */
-      if (1)
-       {
-         ELOG_TYPE_DECLARE (e) =
-         {
-             .format = "evt-enqueue: id %d length %d",
-             .format_args = "i4i4",};
-         struct
-         {
-           u32 data[2];
-         } *ed;
-         ed = ELOG_DATA (&vlib_global_main.elog_main, e);
-         ed->data[0] = evt.event_id;
-         ed->data[1] = svm_fifo_max_dequeue (s0->server_rx_fifo);
-       }
-      /* *INDENT-ON* */
+  errors = session_manager_flush_enqueue_events (TRANSPORT_PROTO_UDP,
+                                                my_thread_index);
+  udp_input_inc_counter (vm, is_ip4, UDP_ERROR_EVENT_FIFO_FULL, errors);
+  return frame->n_vectors;
+}
 
-    }
+vlib_node_registration_t udp4_input_node;
+vlib_node_registration_t udp6_input_node;
 
-  vec_reset_length (session_indices_to_enqueue);
+static uword
+udp4_input (vlib_main_t * vm, vlib_node_runtime_t * node,
+           vlib_frame_t * frame)
+{
+  return udp46_input_inline (vm, node, frame, 1);
+}
 
-  smm->session_indices_to_enqueue_by_thread[my_thread_index] =
-    session_indices_to_enqueue;
+/* *INDENT-OFF* */
+VLIB_REGISTER_NODE (udp4_input_node) =
+{
+  .function = udp4_input,
+  .name = "udp4-input",
+  .vector_size = sizeof (u32),
+  .format_trace = format_udp_input_trace,
+  .type = VLIB_NODE_TYPE_INTERNAL,
+  .n_errors = ARRAY_LEN (udp_error_strings),
+  .error_strings = udp_error_strings,
+  .n_next_nodes = UDP_INPUT_N_NEXT,
+  .next_nodes = {
+#define _(s, n) [UDP_INPUT_NEXT_##s] = n,
+      foreach_udp_input_next
+#undef _
+  },
+};
+/* *INDENT-ON* */
 
-  return frame->n_vectors;
+static uword
+udp6_input (vlib_main_t * vm, vlib_node_runtime_t * node,
+           vlib_frame_t * frame)
+{
+  return udp46_input_inline (vm, node, frame, 0);
 }
 
-VLIB_REGISTER_NODE (udp4_uri_input_node) =
+/* *INDENT-OFF* */
+VLIB_REGISTER_NODE (udp6_input_node) =
 {
-  .function = udp4_uri_input_node_fn,.name = "udp4-uri-input",.vector_size =
-    sizeof (u32),.format_trace = format_udp4_uri_input_trace,.type =
-    VLIB_NODE_TYPE_INTERNAL,.n_errors =
-    ARRAY_LEN (udp4_uri_input_error_strings),.error_strings =
-    udp4_uri_input_error_strings,.n_next_nodes = UDP4_URI_INPUT_N_NEXT,
-    /* edit / add dispositions here */
-    .next_nodes =
-  {
-  [UDP4_URI_INPUT_NEXT_DROP] = "error-drop",}
-,};
+  .function = udp6_input,
+  .name = "udp6-input",
+  .vector_size = sizeof (u32),
+  .format_trace = format_udp_input_trace,
+  .type = VLIB_NODE_TYPE_INTERNAL,
+  .n_errors = ARRAY_LEN (udp_error_strings),
+  .error_strings = udp_error_strings,
+  .n_next_nodes = UDP_INPUT_N_NEXT,
+  .next_nodes = {
+#define _(s, n) [UDP_INPUT_NEXT_##s] = n,
+      foreach_udp_input_next
+#undef _
+  },
+};
+/* *INDENT-ON* */
 
 /*
  * fd.io coding-style-patch-verification: ON
index 8c0ac46..ce9bb02 100644 (file)
@@ -23,7 +23,7 @@
 
 udp_main_t udp_main;
 
-#define foreach_udp_input_next                  \
+#define foreach_udp_local_next                  \
   _ (PUNT, "error-punt")                        \
   _ (DROP, "error-drop")                        \
   _ (ICMP4_ERROR, "ip4-icmp-error")             \
@@ -31,25 +31,25 @@ udp_main_t udp_main;
 
 typedef enum
 {
-#define _(s,n) UDP_INPUT_NEXT_##s,
-  foreach_udp_input_next
+#define _(s,n) UDP_LOCAL_NEXT_##s,
+  foreach_udp_local_next
 #undef _
-    UDP_INPUT_N_NEXT,
-} udp_input_next_t;
+    UDP_LOCAL_N_NEXT,
+} udp_local_next_t;
 
 typedef struct
 {
   u16 src_port;
   u16 dst_port;
   u8 bound;
-} udp_rx_trace_t;
+} udp_local_rx_trace_t;
 
 u8 *
 format_udp_rx_trace (u8 * s, va_list * args)
 {
   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
-  udp_rx_trace_t *t = va_arg (*args, udp_rx_trace_t *);
+  udp_local_rx_trace_t *t = va_arg (*args, udp_local_rx_trace_t *);
 
   s = format (s, "UDP: src-port %d dst-port %d%s",
              clib_net_to_host_u16 (t->src_port),
@@ -58,11 +58,11 @@ format_udp_rx_trace (u8 * s, va_list * args)
   return s;
 }
 
-vlib_node_registration_t udp4_input_node;
-vlib_node_registration_t udp6_input_node;
+vlib_node_registration_t udp4_local_node;
+vlib_node_registration_t udp6_local_node;
 
 always_inline uword
-udp46_input_inline (vlib_main_t * vm,
+udp46_local_inline (vlib_main_t * vm,
                    vlib_node_runtime_t * node,
                    vlib_frame_t * from_frame, int is_ip4)
 {
@@ -132,7 +132,7 @@ udp46_input_inline (vlib_main_t * vm,
          if (PREDICT_FALSE (b0->current_length < advance0 + sizeof (*h0)))
            {
              error0 = UDP_ERROR_LENGTH_ERROR;
-             next0 = UDP_INPUT_NEXT_DROP;
+             next0 = UDP_LOCAL_NEXT_DROP;
            }
          else
            {
@@ -143,14 +143,14 @@ udp46_input_inline (vlib_main_t * vm,
                                 vlib_buffer_length_in_chain (vm, b0)))
                {
                  error0 = UDP_ERROR_LENGTH_ERROR;
-                 next0 = UDP_INPUT_NEXT_DROP;
+                 next0 = UDP_LOCAL_NEXT_DROP;
                }
            }
 
          if (PREDICT_FALSE (b1->current_length < advance1 + sizeof (*h1)))
            {
              error1 = UDP_ERROR_LENGTH_ERROR;
-             next1 = UDP_INPUT_NEXT_DROP;
+             next1 = UDP_LOCAL_NEXT_DROP;
            }
          else
            {
@@ -161,7 +161,7 @@ udp46_input_inline (vlib_main_t * vm,
                                 vlib_buffer_length_in_chain (vm, b1)))
                {
                  error1 = UDP_ERROR_LENGTH_ERROR;
-                 next1 = UDP_INPUT_NEXT_DROP;
+                 next1 = UDP_LOCAL_NEXT_DROP;
                }
            }
 
@@ -187,7 +187,7 @@ udp46_input_inline (vlib_main_t * vm,
              if (PREDICT_FALSE (punt_unknown))
                {
                  b0->error = node->errors[UDP_ERROR_PUNT];
-                 next0 = UDP_INPUT_NEXT_PUNT;
+                 next0 = UDP_LOCAL_NEXT_PUNT;
                }
              else if (is_ip4)
                {
@@ -195,7 +195,7 @@ udp46_input_inline (vlib_main_t * vm,
                                               ICMP4_destination_unreachable,
                                               ICMP4_destination_unreachable_port_unreachable,
                                               0);
-                 next0 = UDP_INPUT_NEXT_ICMP4_ERROR;
+                 next0 = UDP_LOCAL_NEXT_ICMP4_ERROR;
                  n_no_listener++;
                }
              else
@@ -204,7 +204,7 @@ udp46_input_inline (vlib_main_t * vm,
                                               ICMP6_destination_unreachable,
                                               ICMP6_destination_unreachable_port_unreachable,
                                               0);
-                 next0 = UDP_INPUT_NEXT_ICMP6_ERROR;
+                 next0 = UDP_LOCAL_NEXT_ICMP6_ERROR;
                  n_no_listener++;
                }
            }
@@ -224,7 +224,7 @@ udp46_input_inline (vlib_main_t * vm,
              if (PREDICT_FALSE (punt_unknown))
                {
                  b1->error = node->errors[UDP_ERROR_PUNT];
-                 next1 = UDP_INPUT_NEXT_PUNT;
+                 next1 = UDP_LOCAL_NEXT_PUNT;
                }
              else if (is_ip4)
                {
@@ -232,7 +232,7 @@ udp46_input_inline (vlib_main_t * vm,
                                               ICMP4_destination_unreachable,
                                               ICMP4_destination_unreachable_port_unreachable,
                                               0);
-                 next1 = UDP_INPUT_NEXT_ICMP4_ERROR;
+                 next1 = UDP_LOCAL_NEXT_ICMP4_ERROR;
                  n_no_listener++;
                }
              else
@@ -241,7 +241,7 @@ udp46_input_inline (vlib_main_t * vm,
                                               ICMP6_destination_unreachable,
                                               ICMP6_destination_unreachable_port_unreachable,
                                               0);
-                 next1 = UDP_INPUT_NEXT_ICMP6_ERROR;
+                 next1 = UDP_LOCAL_NEXT_ICMP6_ERROR;
                  n_no_listener++;
                }
            }
@@ -254,26 +254,26 @@ udp46_input_inline (vlib_main_t * vm,
 
          if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED))
            {
-             udp_rx_trace_t *tr = vlib_add_trace (vm, node,
-                                                  b0, sizeof (*tr));
+             udp_local_rx_trace_t *tr = vlib_add_trace (vm, node,
+                                                        b0, sizeof (*tr));
              if (b0->error != node->errors[UDP_ERROR_LENGTH_ERROR])
                {
                  tr->src_port = h0 ? h0->src_port : 0;
                  tr->dst_port = h0 ? h0->dst_port : 0;
-                 tr->bound = (next0 != UDP_INPUT_NEXT_ICMP4_ERROR &&
-                              next0 != UDP_INPUT_NEXT_ICMP6_ERROR);
+                 tr->bound = (next0 != UDP_LOCAL_NEXT_ICMP4_ERROR &&
+                              next0 != UDP_LOCAL_NEXT_ICMP6_ERROR);
                }
            }
          if (PREDICT_FALSE (b1->flags & VLIB_BUFFER_IS_TRACED))
            {
-             udp_rx_trace_t *tr = vlib_add_trace (vm, node,
-                                                  b1, sizeof (*tr));
+             udp_local_rx_trace_t *tr = vlib_add_trace (vm, node,
+                                                        b1, sizeof (*tr));
              if (b1->error != node->errors[UDP_ERROR_LENGTH_ERROR])
                {
                  tr->src_port = h1 ? h1->src_port : 0;
                  tr->dst_port = h1 ? h1->dst_port : 0;
-                 tr->bound = (next1 != UDP_INPUT_NEXT_ICMP4_ERROR &&
-                              next1 != UDP_INPUT_NEXT_ICMP6_ERROR);
+                 tr->bound = (next1 != UDP_LOCAL_NEXT_ICMP4_ERROR &&
+                              next1 != UDP_LOCAL_NEXT_ICMP6_ERROR);
                }
            }
 
@@ -308,7 +308,7 @@ udp46_input_inline (vlib_main_t * vm,
          if (PREDICT_FALSE (b0->current_length < advance0 + sizeof (*h0)))
            {
              b0->error = node->errors[UDP_ERROR_LENGTH_ERROR];
-             next0 = UDP_INPUT_NEXT_DROP;
+             next0 = UDP_LOCAL_NEXT_DROP;
              goto trace_x1;
            }
 
@@ -333,7 +333,7 @@ udp46_input_inline (vlib_main_t * vm,
                  if (PREDICT_FALSE (punt_unknown))
                    {
                      b0->error = node->errors[UDP_ERROR_PUNT];
-                     next0 = UDP_INPUT_NEXT_PUNT;
+                     next0 = UDP_LOCAL_NEXT_PUNT;
                    }
                  else if (is_ip4)
                    {
@@ -341,7 +341,7 @@ udp46_input_inline (vlib_main_t * vm,
                                                   ICMP4_destination_unreachable,
                                                   ICMP4_destination_unreachable_port_unreachable,
                                                   0);
-                     next0 = UDP_INPUT_NEXT_ICMP4_ERROR;
+                     next0 = UDP_LOCAL_NEXT_ICMP4_ERROR;
                      n_no_listener++;
                    }
                  else
@@ -350,7 +350,7 @@ udp46_input_inline (vlib_main_t * vm,
                                                   ICMP6_destination_unreachable,
                                                   ICMP6_destination_unreachable_port_unreachable,
                                                   0);
-                     next0 = UDP_INPUT_NEXT_ICMP6_ERROR;
+                     next0 = UDP_LOCAL_NEXT_ICMP6_ERROR;
                      n_no_listener++;
                    }
                }
@@ -364,20 +364,20 @@ udp46_input_inline (vlib_main_t * vm,
          else
            {
              b0->error = node->errors[UDP_ERROR_LENGTH_ERROR];
-             next0 = UDP_INPUT_NEXT_DROP;
+             next0 = UDP_LOCAL_NEXT_DROP;
            }
 
        trace_x1:
          if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED))
            {
-             udp_rx_trace_t *tr = vlib_add_trace (vm, node,
-                                                  b0, sizeof (*tr));
+             udp_local_rx_trace_t *tr = vlib_add_trace (vm, node,
+                                                        b0, sizeof (*tr));
              if (b0->error != node->errors[UDP_ERROR_LENGTH_ERROR])
                {
                  tr->src_port = h0->src_port;
                  tr->dst_port = h0->dst_port;
-                 tr->bound = (next0 != UDP_INPUT_NEXT_ICMP4_ERROR &&
-                              next0 != UDP_INPUT_NEXT_ICMP6_ERROR);
+                 tr->bound = (next0 != UDP_LOCAL_NEXT_ICMP4_ERROR &&
+                              next0 != UDP_LOCAL_NEXT_ICMP6_ERROR);
                }
            }
 
@@ -400,23 +400,22 @@ static char *udp_error_strings[] = {
 };
 
 static uword
-udp4_input (vlib_main_t * vm,
+udp4_local (vlib_main_t * vm,
            vlib_node_runtime_t * node, vlib_frame_t * from_frame)
 {
-  return udp46_input_inline (vm, node, from_frame, 1 /* is_ip4 */ );
+  return udp46_local_inline (vm, node, from_frame, 1 /* is_ip4 */ );
 }
 
 static uword
-udp6_input (vlib_main_t * vm,
+udp6_local (vlib_main_t * vm,
            vlib_node_runtime_t * node, vlib_frame_t * from_frame)
 {
-  return udp46_input_inline (vm, node, from_frame, 0 /* is_ip4 */ );
+  return udp46_local_inline (vm, node, from_frame, 0 /* is_ip4 */ );
 }
 
-
 /* *INDENT-OFF* */
-VLIB_REGISTER_NODE (udp4_input_node) = {
-  .function = udp4_input,
+VLIB_REGISTER_NODE (udp4_local_node) = {
+  .function = udp4_local,
   .name = "ip4-udp-lookup",
   /* Takes a vector of packets. */
   .vector_size = sizeof (u32),
@@ -424,10 +423,10 @@ VLIB_REGISTER_NODE (udp4_input_node) = {
   .n_errors = UDP_N_ERROR,
   .error_strings = udp_error_strings,
 
-  .n_next_nodes = UDP_INPUT_N_NEXT,
+  .n_next_nodes = UDP_LOCAL_N_NEXT,
   .next_nodes = {
-#define _(s,n) [UDP_INPUT_NEXT_##s] = n,
-    foreach_udp_input_next
+#define _(s,n) [UDP_LOCAL_NEXT_##s] = n,
+    foreach_udp_local_next
 #undef _
   },
 
@@ -437,11 +436,11 @@ VLIB_REGISTER_NODE (udp4_input_node) = {
 };
 /* *INDENT-ON* */
 
-VLIB_NODE_FUNCTION_MULTIARCH (udp4_input_node, udp4_input);
+VLIB_NODE_FUNCTION_MULTIARCH (udp4_local_node, udp4_local);
 
 /* *INDENT-OFF* */
-VLIB_REGISTER_NODE (udp6_input_node) = {
-  .function = udp6_input,
+VLIB_REGISTER_NODE (udp6_local_node) = {
+  .function = udp6_local,
   .name = "ip6-udp-lookup",
   /* Takes a vector of packets. */
   .vector_size = sizeof (u32),
@@ -449,10 +448,10 @@ VLIB_REGISTER_NODE (udp6_input_node) = {
   .n_errors = UDP_N_ERROR,
   .error_strings = udp_error_strings,
 
-  .n_next_nodes = UDP_INPUT_N_NEXT,
+  .n_next_nodes = UDP_LOCAL_N_NEXT,
   .next_nodes = {
-#define _(s,n) [UDP_INPUT_NEXT_##s] = n,
-    foreach_udp_input_next
+#define _(s,n) [UDP_LOCAL_NEXT_##s] = n,
+    foreach_udp_local_next
 #undef _
   },
 
@@ -462,7 +461,7 @@ VLIB_REGISTER_NODE (udp6_input_node) = {
 };
 /* *INDENT-ON* */
 
-VLIB_NODE_FUNCTION_MULTIARCH (udp6_input_node, udp6_input);
+VLIB_NODE_FUNCTION_MULTIARCH (udp6_local_node, udp6_local);
 
 static void
 add_dst_port (udp_main_t * um,
@@ -508,8 +507,8 @@ udp_register_dst_port (vlib_main_t * vm,
 
   pi->node_index = node_index;
   pi->next_index = vlib_node_add_next (vm,
-                                      is_ip4 ? udp4_input_node.index
-                                      : udp6_input_node.index, node_index);
+                                      is_ip4 ? udp4_local_node.index
+                                      : udp6_local_node.index, node_index);
 
   /* Setup udp protocol -> next index sparse vector mapping. */
   if (is_ip4)
@@ -620,8 +619,8 @@ udp_local_init (vlib_main_t * vm)
       um->dst_port_info_by_dst_port[i] = hash_create (0, sizeof (uword));
     }
 
-  udp_setup_node (vm, udp4_input_node.index);
-  udp_setup_node (vm, udp6_input_node.index);
+  udp_setup_node (vm, udp4_local_node.index);
+  udp_setup_node (vm, udp6_local_node.index);
 
   um->punt_unknown4 = 0;
   um->punt_unknown6 = 0;
@@ -640,7 +639,7 @@ udp_local_init (vlib_main_t * vm)
 #define _(n,s) add_dst_port (um, UDP_DST_PORT_##s, #s, 0 /* is_ip4 */);
     foreach_udp6_dst_port
 #undef _
-    ip4_register_protocol (IP_PROTOCOL_UDP, udp4_input_node.index);
+    ip4_register_protocol (IP_PROTOCOL_UDP, udp4_local_node.index);
   /* Note: ip6 differs from ip4, UDP is hotwired to ip6-udp-lookup */
   return 0;
 }