api: multiple connections per process
[vpp.git] / src / vlibmemory / vlib_api.c
index 35a8686..297ac37 100644 (file)
@@ -68,36 +68,13 @@ vl_api_trace_plugin_msg_ids_t_print (vl_api_trace_plugin_msg_ids_t * a,
 #include <vlibmemory/vl_memory_api_h.h>
 #undef vl_endianfun
 
-u8 *
-vl_api_serialize_message_table (api_main_t * am, u8 * vector)
-{
-  serialize_main_t _sm, *sm = &_sm;
-  hash_pair_t *hp;
-  u32 nmsg = hash_elts (am->msg_index_by_name_and_crc);
-
-  serialize_open_vector (sm, vector);
-
-  /* serialize the count */
-  serialize_integer (sm, nmsg, sizeof (u32));
-
-  /* *INDENT-OFF* */
-  hash_foreach_pair (hp, am->msg_index_by_name_and_crc,
-  ({
-    serialize_likely_small_unsigned_integer (sm, hp->value[0]);
-    serialize_cstring (sm, (char *) hp->key);
-  }));
-  /* *INDENT-ON* */
-
-  return serialize_close_vector (sm);
-}
-
 static void
 vl_api_get_first_msg_id_t_handler (vl_api_get_first_msg_id_t * mp)
 {
   vl_api_get_first_msg_id_reply_t *rmp;
   vl_api_registration_t *regp;
   uword *p;
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
   vl_api_msg_range_t *rp;
   u8 name[64];
   u16 first_msg_id = ~0;
@@ -131,7 +108,7 @@ out:
 void
 vl_api_api_versions_t_handler (vl_api_api_versions_t * mp)
 {
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
   vl_api_api_versions_reply_t *rmp;
   vl_api_registration_t *reg;
   u32 nmsg = vec_len (am->api_version_list);
@@ -143,7 +120,7 @@ vl_api_api_versions_t_handler (vl_api_api_versions_t * mp)
     return;
 
   rmp = vl_msg_api_alloc (msg_size);
-  memset (rmp, 0, msg_size);
+  clib_memset (rmp, 0, msg_size);
   rmp->_vl_msg_id = ntohs (VL_API_API_VERSIONS_REPLY);
 
   /* fill in the message */
@@ -165,8 +142,8 @@ vl_api_api_versions_t_handler (vl_api_api_versions_t * mp)
   vl_api_send_msg (reg, (u8 *) rmp);
 }
 
-#define foreach_vlib_api_msg                            \
-_(GET_FIRST_MSG_ID, get_first_msg_id)                   \
+#define foreach_vlib_api_msg                           \
+_(GET_FIRST_MSG_ID, get_first_msg_id)                  \
 _(API_VERSIONS, api_versions)
 
 /*
@@ -178,7 +155,7 @@ vlib_api_init (void)
   vl_msg_api_msg_config_t cfg;
   vl_msg_api_msg_config_t *c = &cfg;
 
-  memset (c, 0, sizeof (*c));
+  clib_memset (c, 0, sizeof (*c));
 
 #define _(N,n) do {                                             \
     c->id = VL_API_##N;                                         \
@@ -208,12 +185,12 @@ static void
 send_one_plugin_msg_ids_msg (u8 * name, u16 first_msg_id, u16 last_msg_id)
 {
   vl_api_trace_plugin_msg_ids_t *mp;
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
   vl_shmem_hdr_t *shmem_hdr = am->shmem_hdr;
   svm_queue_t *q;
 
   mp = vl_msg_api_alloc_as_if_client (sizeof (*mp));
-  memset (mp, 0, sizeof (*mp));
+  clib_memset (mp, 0, sizeof (*mp));
 
   mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_TRACE_PLUGIN_MSG_IDS);
   strncpy ((char *) mp->plugin_name, (char *) name,
@@ -230,7 +207,7 @@ void
 vl_api_save_msg_table (void)
 {
   u8 *serialized_message_table;
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
   u8 *chroot_file;
   int fd, rv;
 
@@ -280,7 +257,7 @@ vl_api_clnt_process (vlib_main_t * vm, vlib_node_runtime_t * node,
   vl_shmem_hdr_t *shm;
   svm_queue_t *q;
   clib_error_t *e;
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
   f64 dead_client_scan_time;
   f64 sleep_time, start_time;
   f64 vector_rate;
@@ -306,7 +283,7 @@ vl_api_clnt_process (vlib_main_t * vm, vlib_node_runtime_t * node,
   q = shm->vl_input_queue;
 
   e = vlib_call_init_exit_functions
-    (vm, vm->api_init_function_registrations, 1 /* call_once */ );
+    (vm, &vm->api_init_function_registrations, 1 /* call_once */ );
   if (e)
     clib_error_report (e);
 
@@ -342,11 +319,12 @@ vl_api_clnt_process (vlib_main_t * vm, vlib_node_runtime_t * node,
        * of the application to process the request, the client will
        * sit and wait for Godot...
        */
-      vector_rate = vlib_last_vector_length_per_node (vm);
+      vector_rate = (f64) vlib_last_vectors_per_main_loop (vm);
       start_time = vlib_time_now (vm);
       while (1)
        {
-         if (vl_mem_api_handle_msg_main (vm, node))
+         if (vl_mem_api_handle_rpc (vm, node)
+             || vl_mem_api_handle_msg_main (vm, node))
            {
              vm->api_queue_nonempty = 0;
              VL_MEM_API_LOG_Q_LEN ("q-underflow: len %d", 0);
@@ -387,9 +365,9 @@ vl_api_clnt_process (vlib_main_t * vm, vlib_node_runtime_t * node,
        */
       if (PREDICT_FALSE (vec_len (am->vlib_private_rps)))
        {
-         vl_mem_api_handle_msg_private (vm, node, private_segment_rotor++);
          if (private_segment_rotor >= vec_len (am->vlib_private_rps))
            private_segment_rotor = 0;
+         vl_mem_api_handle_msg_private (vm, node, private_segment_rotor++);
        }
 
       vlib_process_wait_for_event_or_clock (vm, sleep_time);
@@ -407,9 +385,16 @@ vl_api_clnt_process (vlib_main_t * vm, vlib_node_runtime_t * node,
        case SOCKET_READ_EVENT:
          for (i = 0; i < vec_len (event_data); i++)
            {
+             vl_api_registration_t *regp;
+
              a = pool_elt_at_index (socket_main.process_args, event_data[i]);
-             vl_socket_process_api_msg (a->clib_file, a->regp,
-                                        (i8 *) a->data);
+             regp = vl_socket_get_registration (a->reg_index);
+             if (regp)
+               {
+                 vl_socket_process_api_msg (regp, (i8 *) a->data);
+                 a = pool_elt_at_index (socket_main.process_args,
+                                        event_data[i]);
+               }
              vec_free (a->data);
              pool_put (socket_main.process_args, a);
            }
@@ -463,7 +448,7 @@ api_rx_from_node (vlib_main_t * vm,
 
   vec_validate (long_msg, 4095);
   n_left_from = frame->n_vectors;
-  from = vlib_frame_args (frame);
+  from = vlib_frame_vector_args (frame);
 
   while (n_left_from > 0)
     {
@@ -497,7 +482,7 @@ api_rx_from_node (vlib_main_t * vm,
     }
 
   /* Free what we've been given. */
-  vlib_buffer_free (vm, vlib_frame_args (frame), n_packets);
+  vlib_buffer_free (vm, vlib_frame_vector_args (frame), n_packets);
 
   return n_packets;
 }
@@ -564,36 +549,14 @@ vl_api_rpc_call_reply_t_handler (vl_api_rpc_call_reply_t * mp)
 void
 vl_api_send_pending_rpc_requests (vlib_main_t * vm)
 {
-  api_main_t *am = &api_main;
-  vl_shmem_hdr_t *shmem_hdr = am->shmem_hdr;
-  svm_queue_t *q;
-  int i;
-
-  /*
-   * Use the "normal" control-plane mechanism for the main thread.
-   * Well, almost. if the main input queue is full, we cannot
-   * block. Otherwise, we can expect a barrier sync timeout.
-   */
-  q = shmem_hdr->vl_input_queue;
+  vlib_main_t *vm_global = &vlib_global_main;
 
-  for (i = 0; i < vec_len (vm->pending_rpc_requests); i++)
-    {
-      while (pthread_mutex_trylock (&q->mutex))
-       vlib_worker_thread_barrier_check ();
-
-      while (PREDICT_FALSE (svm_queue_is_full (q)))
-       {
-         pthread_mutex_unlock (&q->mutex);
-         vlib_worker_thread_barrier_check ();
-         while (pthread_mutex_trylock (&q->mutex))
-           vlib_worker_thread_barrier_check ();
-       }
-
-      vl_msg_api_send_shmem_nolock (q, (u8 *) (vm->pending_rpc_requests + i));
+  ASSERT (vm != vm_global);
 
-      pthread_mutex_unlock (&q->mutex);
-    }
-  _vec_len (vm->pending_rpc_requests) = 0;
+  clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock);
+  vec_append (vm_global->pending_rpc_requests, vm->pending_rpc_requests);
+  vec_reset_length (vm->pending_rpc_requests);
+  clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);
 }
 
 always_inline void
@@ -601,6 +564,7 @@ vl_api_rpc_call_main_thread_inline (void *fp, u8 * data, u32 data_length,
                                    u8 force_rpc)
 {
   vl_api_rpc_call_t *mp;
+  vlib_main_t *vm_global = &vlib_global_main;
   vlib_main_t *vm = vlib_get_main ();
 
   /* Main thread and not a forced RPC: call the function directly */
@@ -620,13 +584,18 @@ vl_api_rpc_call_main_thread_inline (void *fp, u8 * data, u32 data_length,
   /* Otherwise, actually do an RPC */
   mp = vl_msg_api_alloc_as_if_client (sizeof (*mp) + data_length);
 
-  memset (mp, 0, sizeof (*mp));
-  clib_memcpy (mp->data, data, data_length);
+  clib_memset (mp, 0, sizeof (*mp));
+  clib_memcpy_fast (mp->data, data, data_length);
   mp->_vl_msg_id = ntohs (VL_API_RPC_CALL);
   mp->function = pointer_to_uword (fp);
   mp->need_barrier_sync = 1;
 
+  /* Add to the pending vector. Thread 0 requires locking. */
+  if (vm == vm_global)
+    clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock);
   vec_add1 (vm->pending_rpc_requests, (uword) mp);
+  if (vm == vm_global)
+    clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);
 }
 
 /*
@@ -655,7 +624,7 @@ vl_api_force_rpc_call_main_thread (void *fp, u8 * data, u32 data_length)
 static void
 vl_api_trace_plugin_msg_ids_t_handler (vl_api_trace_plugin_msg_ids_t * mp)
 {
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
   vl_api_msg_range_t *rp;
   uword *p;
 
@@ -708,7 +677,7 @@ extern void *rpc_call_main_thread_cb_fn;
 static clib_error_t *
 rpc_api_hookup (vlib_main_t * vm)
 {
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
 #define _(N,n)                                                  \
     vl_msg_api_set_handlers(VL_API_##N, #n,                     \
                            vl_api_##n##_t_handler,              \