X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvlibmemory%2Fmemory_client.c;h=c7ae13963c9e9a4a066e798f27c196cb9e4caa50;hb=7a2e1bd6c7137552399d7e0cfb414e0d59b3f98f;hp=234a0a5a330d6319eb1b34b63268dd0eb112a22b;hpb=7cd468a3d7dee7d6c92f69a0bb7061ae208ec727;p=vpp.git diff --git a/src/vlibmemory/memory_client.c b/src/vlibmemory/memory_client.c index 234a0a5a330..c7ae13963c9 100644 --- a/src/vlibmemory/memory_client.c +++ b/src/vlibmemory/memory_client.c @@ -80,6 +80,7 @@ rx_thread_fn (void *arg) unix_shared_memory_queue_t *q; memory_client_main_t *mm = &memory_client_main; api_main_t *am = &api_main; + int i; q = am->vl_input_queue; @@ -87,10 +88,27 @@ rx_thread_fn (void *arg) if (setjmp (mm->rx_thread_jmpbuf) == 0) { mm->rx_thread_jmpbuf_valid = 1; - while (1) + /* + * Find an unused slot in the per-cpu-mheaps array, + * and grab it for this thread. We need to be able to + * push/pop the thread heap without affecting other thread(s). + */ + if (__os_thread_index == 0) { - vl_msg_api_queue_handler (q); + for (i = 0; i < ARRAY_LEN (clib_per_cpu_mheaps); i++) + { + if (clib_per_cpu_mheaps[i] == 0) + { + /* Copy the main thread mheap pointer */ + clib_per_cpu_mheaps[i] = clib_per_cpu_mheaps[0]; + __os_thread_index = i; + break; + } + } + ASSERT (__os_thread_index > 0); } + while (1) + vl_msg_api_queue_handler (q); } pthread_exit (0); } @@ -104,25 +122,253 @@ vl_api_rx_thread_exit_t_handler (vl_api_rx_thread_exit_t * mp) } static void -noop_handler (void *notused) +vl_api_memclnt_create_reply_t_handler (vl_api_memclnt_create_reply_t * mp) { + serialize_main_t _sm, *sm = &_sm; + api_main_t *am = &api_main; + u8 *tblv; + u32 nmsgs; + int i; + u8 *name_and_crc; + u32 msg_index; + + am->my_client_index = mp->index; + am->my_registration = (vl_api_registration_t *) (uword) mp->handle; + + /* Clean out any previous hash table (unlikely) */ + if (am->msg_index_by_name_and_crc) + { + int i; + u8 **keys = 0; + hash_pair_t *hp; + /* *INDENT-OFF* */ + hash_foreach_pair (hp, am->msg_index_by_name_and_crc, + ({ + vec_add1 (keys, (u8 *) hp->key); + })); + /* *INDENT-ON* */ + for (i = 0; i < vec_len (keys); i++) + vec_free (keys[i]); + vec_free (keys); + } + + am->msg_index_by_name_and_crc = hash_create_string (0, sizeof (uword)); + + /* Recreate the vnet-side API message handler table */ + tblv = uword_to_pointer (mp->message_table, u8 *); + unserialize_open_data (sm, tblv, vec_len (tblv)); + unserialize_integer (sm, &nmsgs, sizeof (u32)); + + for (i = 0; i < nmsgs; i++) + { + msg_index = unserialize_likely_small_unsigned_integer (sm); + unserialize_cstring (sm, (char **) &name_and_crc); + hash_set_mem (am->msg_index_by_name_and_crc, name_and_crc, msg_index); + } } -#define foreach_api_msg \ -_(RX_THREAD_EXIT, rx_thread_exit) +static void +noop_handler (void *notused) +{ +} -static int -connect_to_vlib_internal (char *svm_name, char *client_name, - int rx_queue_size, int want_pthread) +int +vl_client_connect (const char *name, int ctx_quota, int input_queue_size) { + svm_region_t *svm; + vl_api_memclnt_create_t *mp; + vl_api_memclnt_create_reply_t *rp; + unix_shared_memory_queue_t *vl_input_queue; + vl_shmem_hdr_t *shmem_hdr; int rv = 0; - memory_client_main_t *mm = &memory_client_main; + void *oldheap; + api_main_t *am = &api_main; - if ((rv = vl_client_api_map (svm_name))) + if (am->my_registration) { - clib_warning ("vl_client_api map rv %d", rv); - return rv; + clib_warning ("client %s already connected...", name); + return -1; + } + + if (am->vlib_rp == 0) + { + clib_warning ("am->vlib_rp NULL"); + return -1; + } + + svm = am->vlib_rp; + shmem_hdr = am->shmem_hdr; + + if (shmem_hdr == 0 || shmem_hdr->vl_input_queue == 0) + { + clib_warning ("shmem_hdr / input queue NULL"); + return -1; + } + + pthread_mutex_lock (&svm->mutex); + oldheap = svm_push_data_heap (svm); + vl_input_queue = + unix_shared_memory_queue_init (input_queue_size, sizeof (uword), + getpid (), 0); + pthread_mutex_unlock (&svm->mutex); + svm_pop_heap (oldheap); + + am->my_client_index = ~0; + am->my_registration = 0; + am->vl_input_queue = vl_input_queue; + + mp = vl_msg_api_alloc (sizeof (vl_api_memclnt_create_t)); + memset (mp, 0, sizeof (*mp)); + mp->_vl_msg_id = ntohs (VL_API_MEMCLNT_CREATE); + mp->ctx_quota = ctx_quota; + mp->input_queue = (uword) vl_input_queue; + strncpy ((char *) mp->name, name, sizeof (mp->name) - 1); + + vl_msg_api_send_shmem (shmem_hdr->vl_input_queue, (u8 *) & mp); + + while (1) + { + int qstatus; + struct timespec ts, tsrem; + int i; + + /* Wait up to 10 seconds */ + for (i = 0; i < 1000; i++) + { + qstatus = unix_shared_memory_queue_sub (vl_input_queue, (u8 *) & rp, + 1 /* nowait */ ); + if (qstatus == 0) + goto read_one_msg; + ts.tv_sec = 0; + ts.tv_nsec = 10000 * 1000; /* 10 ms */ + while (nanosleep (&ts, &tsrem) < 0) + ts = tsrem; + } + /* Timeout... */ + clib_warning ("memclnt_create_reply timeout"); + return -1; + + read_one_msg: + if (ntohs (rp->_vl_msg_id) != VL_API_MEMCLNT_CREATE_REPLY) + { + clib_warning ("unexpected reply: id %d", ntohs (rp->_vl_msg_id)); + continue; + } + rv = clib_net_to_host_u32 (rp->response); + + vl_msg_api_handler ((void *) rp); + break; + } + return (rv); +} + +static void +vl_api_memclnt_delete_reply_t_handler (vl_api_memclnt_delete_reply_t * mp) +{ + void *oldheap; + api_main_t *am = &api_main; + + pthread_mutex_lock (&am->vlib_rp->mutex); + oldheap = svm_push_data_heap (am->vlib_rp); + unix_shared_memory_queue_free (am->vl_input_queue); + pthread_mutex_unlock (&am->vlib_rp->mutex); + svm_pop_heap (oldheap); + + am->my_client_index = ~0; + am->my_registration = 0; + am->vl_input_queue = 0; +} + +void +vl_client_disconnect (void) +{ + vl_api_memclnt_delete_t *mp; + vl_api_memclnt_delete_reply_t *rp; + unix_shared_memory_queue_t *vl_input_queue; + vl_shmem_hdr_t *shmem_hdr; + time_t begin; + api_main_t *am = &api_main; + + ASSERT (am->vlib_rp); + shmem_hdr = am->shmem_hdr; + ASSERT (shmem_hdr && shmem_hdr->vl_input_queue); + + vl_input_queue = am->vl_input_queue; + + mp = vl_msg_api_alloc (sizeof (vl_api_memclnt_delete_t)); + memset (mp, 0, sizeof (*mp)); + mp->_vl_msg_id = ntohs (VL_API_MEMCLNT_DELETE); + mp->index = am->my_client_index; + mp->handle = (uword) am->my_registration; + + vl_msg_api_send_shmem (shmem_hdr->vl_input_queue, (u8 *) & mp); + + /* + * Have to be careful here, in case the client is disconnecting + * because e.g. the vlib process died, or is unresponsive. + */ + + begin = time (0); + while (1) + { + time_t now; + + now = time (0); + + if (now >= (begin + 2)) + { + clib_warning ("peer unresponsive, give up"); + am->my_client_index = ~0; + am->my_registration = 0; + am->shmem_hdr = 0; + break; + } + if (unix_shared_memory_queue_sub (vl_input_queue, (u8 *) & rp, 1) < 0) + continue; + + /* drain the queue */ + if (ntohs (rp->_vl_msg_id) != VL_API_MEMCLNT_DELETE_REPLY) + { + clib_warning ("queue drain: %d", ntohs (rp->_vl_msg_id)); + vl_msg_api_handler ((void *) rp); + continue; + } + vl_msg_api_handler ((void *) rp); + break; } +} + +/** + * Stave off the binary API dead client reaper + * Only sent to inactive clients + */ +static void +vl_api_memclnt_keepalive_t_handler (vl_api_memclnt_keepalive_t * mp) +{ + vl_api_memclnt_keepalive_reply_t *rmp; + api_main_t *am; + vl_shmem_hdr_t *shmem_hdr; + + am = &api_main; + shmem_hdr = am->shmem_hdr; + + rmp = vl_msg_api_alloc_as_if_client (sizeof (*rmp)); + memset (rmp, 0, sizeof (*rmp)); + rmp->_vl_msg_id = ntohs (VL_API_MEMCLNT_KEEPALIVE_REPLY); + rmp->context = mp->context; + vl_msg_api_send_shmem (shmem_hdr->vl_input_queue, (u8 *) & rmp); +} + +#define foreach_api_msg \ +_(RX_THREAD_EXIT, rx_thread_exit) \ +_(MEMCLNT_CREATE_REPLY, memclnt_create_reply) \ +_(MEMCLNT_DELETE_REPLY, memclnt_delete_reply) \ +_(MEMCLNT_KEEPALIVE, memclnt_keepalive) + + +void +vl_client_install_client_message_handlers (void) +{ #define _(N,n) \ vl_msg_api_set_handlers(VL_API_##N, #n, \ @@ -133,6 +379,40 @@ connect_to_vlib_internal (char *svm_name, char *client_name, sizeof(vl_api_##n##_t), 1); foreach_api_msg; #undef _ +} + + +int +vl_client_api_map (const char *region_name) +{ + int rv; + + if ((rv = vl_map_shmem (region_name, 0 /* is_vlib */ )) < 0) + return rv; + + vl_client_install_client_message_handlers (); + return 0; +} + +void +vl_client_api_unmap (void) +{ + vl_unmap_shmem (); +} + +static int +connect_to_vlib_internal (const char *svm_name, + const char *client_name, + int rx_queue_size, int want_pthread, int do_map) +{ + int rv = 0; + memory_client_main_t *mm = &memory_client_main; + + if (do_map && (rv = vl_client_api_map (svm_name))) + { + clib_warning ("vl_client_api map rv %d", rv); + return rv; + } if (vl_client_connect (client_name, 0 /* punt quota */ , rx_queue_size /* input queue */ ) < 0) @@ -156,19 +436,31 @@ connect_to_vlib_internal (char *svm_name, char *client_name, } int -vl_client_connect_to_vlib (char *svm_name, char *client_name, - int rx_queue_size) +vl_client_connect_to_vlib (const char *svm_name, + const char *client_name, int rx_queue_size) { return connect_to_vlib_internal (svm_name, client_name, rx_queue_size, - 1 /* want pthread */ ); + 1 /* want pthread */ , + 1 /* do map */ ); } int -vl_client_connect_to_vlib_no_rx_pthread (char *svm_name, char *client_name, +vl_client_connect_to_vlib_no_rx_pthread (const char *svm_name, + const char *client_name, int rx_queue_size) { return connect_to_vlib_internal (svm_name, client_name, rx_queue_size, - 0 /* want pthread */ ); + 0 /* want pthread */ , + 1 /* do map */ ); +} + +int +vl_client_connect_to_vlib_no_map (const char *svm_name, + const char *client_name, int rx_queue_size) +{ + return connect_to_vlib_internal (svm_name, client_name, rx_queue_size, + 1 /* want pthread */ , + 0 /* dont map */ ); } void @@ -205,7 +497,7 @@ static void vl_api_get_first_msg_id_reply_t_handler } u16 -vl_client_get_first_plugin_msg_id (char *plugin_name) +vl_client_get_first_plugin_msg_id (const char *plugin_name) { vl_api_get_first_msg_id_t *mp; api_main_t *am = &api_main; @@ -268,12 +560,6 @@ result: return rv; } -void -vlib_node_sync_stats (vlib_main_t * vm, vlib_node_t * n) -{ - clib_warning ("STUB called..."); -} - /* * fd.io coding-style-patch-verification: ON *