+ msg = (session_cleanup_msg_t *) data;
+ session = vcl_session_get_w_vpp_handle (wrk, msg->handle);
+ if (!session)
+ {
+ VDBG (0, "disconnect confirmed for unknown handle 0x%llx", msg->handle);
+ return;
+ }
+
+ vcl_session_table_del_vpp_handle (wrk, msg->handle);
+ /* Should not happen. App did not close the connection so don't free it. */
+ if (session->session_state != STATE_CLOSED)
+ {
+ VDBG (0, "app did not close session %d", session->session_index);
+ session->session_state = STATE_DETACHED;
+ session->vpp_handle = VCL_INVALID_SESSION_HANDLE;
+ return;
+ }
+ vcl_session_free (wrk, session);
+}
+
+static void
+vcl_session_req_worker_update_handler (vcl_worker_t * wrk, void *data)
+{
+ session_req_worker_update_msg_t *msg;
+ vcl_session_t *s;
+
+ msg = (session_req_worker_update_msg_t *) data;
+ s = vcl_session_get_w_vpp_handle (wrk, msg->session_handle);
+ if (!s)
+ return;
+
+ vec_add1 (wrk->pending_session_wrk_updates, s->session_index);
+}
+
+static void
+vcl_session_worker_update_reply_handler (vcl_worker_t * wrk, void *data)
+{
+ session_worker_update_reply_msg_t *msg;
+ vcl_session_t *s;
+
+ msg = (session_worker_update_reply_msg_t *) data;
+ s = vcl_session_get_w_vpp_handle (wrk, msg->handle);
+ if (!s)
+ {
+ VDBG (0, "unknown handle 0x%llx", msg->handle);
+ return;
+ }
+ if (vcl_segment_is_not_mounted (wrk, msg->segment_handle))
+ {
+ clib_warning ("segment for session %u is not mounted!",
+ s->session_index);
+ return;
+ }
+
+ if (s->rx_fifo)
+ {
+ s->rx_fifo = uword_to_pointer (msg->rx_fifo, svm_fifo_t *);
+ s->tx_fifo = uword_to_pointer (msg->tx_fifo, svm_fifo_t *);
+ s->rx_fifo->client_session_index = s->session_index;
+ s->tx_fifo->client_session_index = s->session_index;
+ s->rx_fifo->client_thread_index = wrk->wrk_index;
+ s->tx_fifo->client_thread_index = wrk->wrk_index;
+ }
+ s->session_state = STATE_UPDATED;
+
+ VDBG (0, "session %u[0x%llx] moved to worker %u", s->session_index,
+ s->vpp_handle, wrk->wrk_index);
+}
+
+static void
+vcl_session_app_add_segment_handler (vcl_worker_t * wrk, void *data)
+{
+ ssvm_segment_type_t seg_type = SSVM_SEGMENT_SHM;
+ session_app_add_segment_msg_t *msg;
+ u64 segment_handle;
+ int fd = -1;
+
+ msg = (session_app_add_segment_msg_t *) data;
+
+ if (msg->fd_flags)
+ {
+ vl_socket_client_recv_fd_msg2 (&wrk->bapi_sock_ctx, &fd, 1, 5);
+ seg_type = SSVM_SEGMENT_MEMFD;
+ }
+
+ segment_handle = msg->segment_handle;
+ if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
+ {
+ clib_warning ("invalid segment handle");
+ return;
+ }
+
+ if (vcl_segment_attach (segment_handle, (char *) msg->segment_name,
+ seg_type, fd))
+ {
+ VDBG (0, "vcl_segment_attach ('%s') failed", msg->segment_name);
+ return;
+ }
+
+ VDBG (1, "mapped new segment '%s' size %d", msg->segment_name,
+ msg->segment_size);
+}
+
+static void
+vcl_session_app_del_segment_handler (vcl_worker_t * wrk, void *data)
+{
+ session_app_del_segment_msg_t *msg = (session_app_del_segment_msg_t *) data;
+ vcl_segment_detach (msg->segment_handle);
+ VDBG (1, "Unmapped segment: %d", msg->segment_handle);
+}
+
+static int
+vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
+{
+ session_disconnected_msg_t *disconnected_msg;
+ vcl_session_t *session;
+
+ switch (e->event_type)
+ {
+ case SESSION_IO_EVT_RX:
+ case SESSION_IO_EVT_TX:
+ session = vcl_session_get (wrk, e->session_index);
+ if (!session || !(session->session_state & STATE_OPEN))
+ break;
+ vec_add1 (wrk->unhandled_evts_vector, *e);
+ break;
+ case SESSION_CTRL_EVT_ACCEPTED:
+ vcl_session_accepted (wrk, (session_accepted_msg_t *) e->data);
+ break;
+ case SESSION_CTRL_EVT_CONNECTED:
+ vcl_session_connected_handler (wrk,
+ (session_connected_msg_t *) e->data);
+ break;
+ case SESSION_CTRL_EVT_DISCONNECTED:
+ disconnected_msg = (session_disconnected_msg_t *) e->data;
+ session = vcl_session_disconnected_handler (wrk, disconnected_msg);
+ if (!session)
+ break;
+ VDBG (0, "disconnected session %u [0x%llx]", session->session_index,
+ session->vpp_handle);
+ break;
+ case SESSION_CTRL_EVT_RESET:
+ vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
+ break;
+ case SESSION_CTRL_EVT_BOUND:
+ vcl_session_bound_handler (wrk, (session_bound_msg_t *) e->data);
+ break;
+ case SESSION_CTRL_EVT_UNLISTEN_REPLY:
+ vcl_session_unlisten_reply_handler (wrk, e->data);
+ break;
+ case SESSION_CTRL_EVT_MIGRATED:
+ vcl_session_migrated_handler (wrk, e->data);
+ break;
+ case SESSION_CTRL_EVT_CLEANUP:
+ vcl_session_cleanup_handler (wrk, e->data);
+ break;
+ case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
+ vcl_session_req_worker_update_handler (wrk, e->data);
+ break;
+ case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
+ vcl_session_worker_update_reply_handler (wrk, e->data);
+ break;
+ case SESSION_CTRL_EVT_APP_ADD_SEGMENT:
+ vcl_session_app_add_segment_handler (wrk, e->data);
+ break;
+ case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
+ vcl_session_app_del_segment_handler (wrk, e->data);
+ break;
+ default:
+ clib_warning ("unhandled %u", e->event_type);
+ }
+ return VPPCOM_OK;
+}
+
+static int
+vppcom_wait_for_session_state_change (u32 session_index,
+ vcl_session_state_t state,
+ f64 wait_for_time)
+{
+ vcl_worker_t *wrk = vcl_worker_get_current ();
+ f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
+ vcl_session_t *volatile session;
+ svm_msg_q_msg_t msg;
+ session_event_t *e;
+
+ do
+ {
+ session = vcl_session_get (wrk, session_index);
+ if (PREDICT_FALSE (!session))
+ {
+ return VPPCOM_EBADFD;
+ }
+ if (session->session_state & state)
+ {
+ return VPPCOM_OK;
+ }
+ if (session->session_state & STATE_DETACHED)
+ {
+ return VPPCOM_ECONNREFUSED;
+ }
+
+ if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0))
+ {
+ usleep (100);
+ continue;
+ }
+ e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
+ vcl_handle_mq_event (wrk, e);
+ svm_msg_q_free_msg (wrk->app_event_queue, &msg);
+ }
+ while (clib_time_now (&wrk->clib_time) < timeout);
+
+ VDBG (0, "timeout waiting for state 0x%x (%s)", state,
+ vppcom_session_state_str (state));
+ vcl_evt (VCL_EVT_SESSION_TIMEOUT, session, session_state);
+
+ return VPPCOM_ETIMEDOUT;
+}
+
+static void
+vcl_handle_pending_wrk_updates (vcl_worker_t * wrk)
+{
+ vcl_session_state_t state;
+ vcl_session_t *s;
+ u32 *sip;
+
+ if (PREDICT_TRUE (vec_len (wrk->pending_session_wrk_updates) == 0))
+ return;
+
+ vec_foreach (sip, wrk->pending_session_wrk_updates)
+ {
+ s = vcl_session_get (wrk, *sip);
+ vcl_send_session_worker_update (wrk, s, wrk->wrk_index);
+ state = s->session_state;
+ vppcom_wait_for_session_state_change (s->session_index, STATE_UPDATED, 5);
+ s->session_state = state;
+ }
+ vec_reset_length (wrk->pending_session_wrk_updates);
+}
+
+void
+vcl_flush_mq_events (void)
+{
+ vcl_worker_t *wrk = vcl_worker_get_current ();
+ svm_msg_q_msg_t *msg;
+ session_event_t *e;
+ svm_msg_q_t *mq;
+ int i;
+
+ mq = wrk->app_event_queue;
+ svm_msg_q_lock (mq);
+ vcl_mq_dequeue_batch (wrk, mq, ~0);
+ svm_msg_q_unlock (mq);
+
+ for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
+ {
+ msg = vec_elt_at_index (wrk->mq_msg_vector, i);
+ e = svm_msg_q_msg_data (mq, msg);
+ vcl_handle_mq_event (wrk, e);
+ svm_msg_q_free_msg (mq, msg);
+ }
+ vec_reset_length (wrk->mq_msg_vector);
+ vcl_handle_pending_wrk_updates (wrk);
+}
+
+static int
+vppcom_app_session_enable (void)
+{
+ int rv;
+
+ if (vcm->app_state != STATE_APP_ENABLED)
+ {
+ vppcom_send_session_enable_disable (1 /* is_enabled == TRUE */ );
+ rv = vcl_wait_for_app_state_change (STATE_APP_ENABLED);
+ if (PREDICT_FALSE (rv))
+ {
+ VDBG (0, "application session enable timed out! returning %d (%s)",
+ rv, vppcom_retval_str (rv));
+ return rv;
+ }
+ }
+ return VPPCOM_OK;
+}
+
+static int
+vppcom_app_attach (void)
+{
+ int rv;
+
+ vppcom_app_send_attach ();
+ rv = vcl_wait_for_app_state_change (STATE_APP_ATTACHED);
+ if (PREDICT_FALSE (rv))
+ {
+ VDBG (0, "application attach timed out! returning %d (%s)", rv,
+ vppcom_retval_str (rv));
+ return rv;
+ }
+
+ return VPPCOM_OK;
+}
+
+static int
+vppcom_session_unbind (u32 session_handle)
+{
+ vcl_worker_t *wrk = vcl_worker_get_current ();
+ session_accepted_msg_t *accepted_msg;
+ vcl_session_t *session = 0;
+ vcl_session_msg_t *evt;
+
+ session = vcl_session_get_w_handle (wrk, session_handle);
+ if (!session)
+ return VPPCOM_EBADFD;
+
+ /* Flush pending accept events, if any */
+ while (clib_fifo_elts (session->accept_evts_fifo))
+ {
+ clib_fifo_sub2 (session->accept_evts_fifo, evt);
+ accepted_msg = &evt->accepted_msg;
+ vcl_session_table_del_vpp_handle (wrk, accepted_msg->handle);
+ vcl_send_session_accepted_reply (session->vpp_evt_q,
+ accepted_msg->context,
+ session->vpp_handle, -1);
+ }
+ clib_fifo_free (session->accept_evts_fifo);
+
+ vcl_send_session_unlisten (wrk, session);
+
+ VDBG (1, "session %u [0x%llx]: sending unbind!", session->session_index,
+ session->vpp_handle);
+ vcl_evt (VCL_EVT_UNBIND, session);
+
+ session->vpp_handle = ~0;
+ session->session_state = STATE_DISCONNECT;
+
+ return VPPCOM_OK;
+}
+
+static int
+vppcom_session_disconnect (u32 session_handle)
+{
+ vcl_worker_t *wrk = vcl_worker_get_current ();
+ svm_msg_q_t *vpp_evt_q;
+ vcl_session_t *session, *listen_session;
+ vcl_session_state_t state;
+ u64 vpp_handle;
+
+ session = vcl_session_get_w_handle (wrk, session_handle);
+ if (!session)
+ return VPPCOM_EBADFD;
+
+ vpp_handle = session->vpp_handle;