__thread uword __vcl_worker_index = ~0;
-
static int
vcl_wait_for_segment (u64 segment_handle)
{
f64 timeout;
if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
- return 1;
+ return 0;
timeout = clib_time_now (&wrk->clib_time) + wait_for_seconds;
while (clib_time_now (&wrk->clib_time) < timeout)
}
const char *
-vppcom_session_state_str (session_state_t state)
+vppcom_session_state_str (vcl_session_state_t state)
{
char *st;
st = "STATE_FAILED";
break;
+ case STATE_UPDATED:
+ st = "STATE_UPDATED";
+ break;
+
+ case STATE_LISTEN_NO_MQ:
+ st = "STATE_LISTEN_NO_MQ";
+ break;
+
default:
st = "UNKNOWN_STATE";
break;
vcl_session_table_add_listener (wrk, mp->handle, sid);
session->session_state = STATE_LISTEN;
+ session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
+ vec_validate (wrk->vpp_event_queues, 0);
+ wrk->vpp_event_queues[0] = session->vpp_evt_q;
+
if (session->is_dgram)
{
svm_fifo_t *rx_fifo, *tx_fifo;
return sid;
}
+static void
+vcl_session_unlisten_reply_handler (vcl_worker_t * wrk, void *data)
+{
+ session_unlisten_reply_msg_t *mp = (session_unlisten_reply_msg_t *) data;
+ vcl_session_t *s;
+
+ s = vcl_session_get_w_vpp_handle (wrk, mp->handle);
+ if (!s || s->session_state != STATE_DISCONNECT)
+ {
+ VDBG (0, "Unlisten reply with wrong handle %llx", mp->handle);
+ return;
+ }
+
+ if (mp->retval)
+ VDBG (0, "ERROR: session %u [0xllx]: unlisten failed: %U",
+ s->session_index, mp->handle, format_api_error, ntohl (mp->retval));
+
+ if (mp->context != wrk->wrk_index)
+ VDBG (0, "wrong context");
+
+ vcl_session_table_del_vpp_handle (wrk, mp->handle);
+ vcl_session_free (wrk, s);
+}
+
static vcl_session_t *
vcl_session_accepted (vcl_worker_t * wrk, session_accepted_msg_t * msg)
{
s->session_index);
return;
}
- s->rx_fifo = uword_to_pointer (msg->rx_fifo, svm_fifo_t *);
- s->tx_fifo = uword_to_pointer (msg->tx_fifo, svm_fifo_t *);
-
- s->rx_fifo->client_session_index = s->session_index;
- s->tx_fifo->client_session_index = s->session_index;
- s->rx_fifo->client_thread_index = wrk->wrk_index;
- s->tx_fifo->client_thread_index = wrk->wrk_index;
- s->session_state = STATE_UPDATED;
- if (s->shared_index != VCL_INVALID_SESSION_INDEX)
+ if (s->rx_fifo)
{
- vcl_shared_session_t *ss;
- ss = vcl_shared_session_get (s->shared_index);
- if (vec_len (ss->workers) > 1)
- VDBG (0, "workers need to be updated");
+ s->rx_fifo = uword_to_pointer (msg->rx_fifo, svm_fifo_t *);
+ s->tx_fifo = uword_to_pointer (msg->tx_fifo, svm_fifo_t *);
+ s->rx_fifo->client_session_index = s->session_index;
+ s->tx_fifo->client_session_index = s->session_index;
+ s->rx_fifo->client_thread_index = wrk->wrk_index;
+ s->tx_fifo->client_thread_index = wrk->wrk_index;
}
+ s->session_state = STATE_UPDATED;
+
VDBG (0, "session %u[0x%llx] moved to worker %u", s->session_index,
s->vpp_handle, wrk->wrk_index);
}
session = vcl_session_disconnected_handler (wrk, disconnected_msg);
if (!session)
break;
- session->session_state = STATE_DISCONNECT;
VDBG (0, "disconnected session %u [0x%llx]", session->session_index,
session->vpp_handle);
break;
case SESSION_CTRL_EVT_BOUND:
vcl_session_bound_handler (wrk, (session_bound_msg_t *) e->data);
break;
+ case SESSION_CTRL_EVT_UNLISTEN_REPLY:
+ vcl_session_unlisten_reply_handler (wrk, e->data);
+ break;
case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
vcl_session_req_worker_update_handler (wrk, e->data);
break;
static int
vppcom_wait_for_session_state_change (u32 session_index,
- session_state_t state,
+ vcl_session_state_t state,
f64 wait_for_time)
{
vcl_worker_t *wrk = vcl_worker_get_current ();
static void
vcl_handle_pending_wrk_updates (vcl_worker_t * wrk)
{
- session_state_t state;
+ vcl_session_state_t state;
vcl_session_t *s;
u32 *sip;
vec_reset_length (wrk->pending_session_wrk_updates);
}
-static void
+void
vcl_flush_mq_events (void)
{
vcl_worker_t *wrk = vcl_worker_get_current ();
return VPPCOM_EBADFD;
vpp_handle = session->vpp_handle;
- vcl_session_table_del_listener (wrk, vpp_handle);
session->vpp_handle = ~0;
session->session_state = STATE_DISCONNECT;
- VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending unbind msg! new state"
- " 0x%x (%s)", getpid (), vpp_handle, session_handle, STATE_DISCONNECT,
+ VDBG (1, "vpp handle 0x%llx, sid %u: sending unbind msg! new state"
+ " 0x%x (%s)", vpp_handle, session_handle, STATE_DISCONNECT,
vppcom_session_state_str (STATE_DISCONNECT));
vcl_evt (VCL_EVT_UNBIND, session);
- vppcom_send_unbind_sock (vpp_handle);
+ vppcom_send_unbind_sock (wrk, vpp_handle);
return VPPCOM_OK;
}
vcl_worker_t *wrk = vcl_worker_get_current ();
svm_msg_q_t *vpp_evt_q;
vcl_session_t *session;
- session_state_t state;
+ vcl_session_state_t state;
u64 vpp_handle;
session = vcl_session_get_w_handle (wrk, session_handle);
return VPPCOM_OK;
}
-static void
-vcl_cleanup_bapi (void)
-{
- socket_client_main_t *scm = &socket_client_main;
- api_main_t *am = &api_main;
-
- am->my_client_index = ~0;
- am->my_registration = 0;
- am->vl_input_queue = 0;
- am->msg_index_by_name_and_crc = 0;
- scm->socket_fd = 0;
-
- vl_client_api_unmap ();
-}
-
-static void
-vcl_cleanup_forked_child (vcl_worker_t * wrk, vcl_worker_t * child_wrk)
-{
- vcl_worker_t *sub_child;
- int tries = 0;
-
- if (child_wrk->forked_child != ~0)
- {
- sub_child = vcl_worker_get_if_valid (child_wrk->forked_child);
- if (sub_child)
- {
- /* Wait a bit, maybe the process is going away */
- while (kill (sub_child->current_pid, 0) >= 0 && tries++ < 50)
- usleep (1e3);
- if (kill (sub_child->current_pid, 0) < 0)
- vcl_cleanup_forked_child (child_wrk, sub_child);
- }
- }
- vcl_worker_cleanup (child_wrk, 1 /* notify vpp */ );
- VDBG (0, "Cleaned up wrk %u", child_wrk->wrk_index);
- wrk->forked_child = ~0;
-}
-
-static struct sigaction old_sa;
-
-static void
-vcl_intercept_sigchld_handler (int signum, siginfo_t * si, void *uc)
-{
- vcl_worker_t *wrk, *child_wrk;
-
- if (vcl_get_worker_index () == ~0)
- return;
-
- if (sigaction (SIGCHLD, &old_sa, 0))
- {
- VERR ("couldn't restore sigchld");
- exit (-1);
- }
-
- wrk = vcl_worker_get_current ();
- if (wrk->forked_child == ~0)
- return;
-
- child_wrk = vcl_worker_get_if_valid (wrk->forked_child);
- if (!child_wrk)
- goto done;
-
- if (si && si->si_pid != child_wrk->current_pid)
- {
- VDBG (0, "unexpected child pid %u", si->si_pid);
- goto done;
- }
- vcl_cleanup_forked_child (wrk, child_wrk);
-
-done:
- if (old_sa.sa_flags & SA_SIGINFO)
- {
- void (*fn) (int, siginfo_t *, void *) = old_sa.sa_sigaction;
- fn (signum, si, uc);
- }
- else
- {
- void (*fn) (int) = old_sa.sa_handler;
- if (fn)
- fn (signum);
- }
-}
-
-static void
-vcl_incercept_sigchld ()
-{
- struct sigaction sa;
- clib_memset (&sa, 0, sizeof (sa));
- sa.sa_sigaction = vcl_intercept_sigchld_handler;
- sa.sa_flags = SA_SIGINFO;
- if (sigaction (SIGCHLD, &sa, &old_sa))
- {
- VERR ("couldn't intercept sigchld");
- exit (-1);
- }
-}
-
-static void
-vcl_app_pre_fork (void)
-{
- vcl_incercept_sigchld ();
- vcl_flush_mq_events ();
-}
-
-static void
-vcl_app_fork_child_handler (void)
-{
- vcl_worker_t *parent_wrk, *wrk;
- int rv, parent_wrk_index;
- u8 *child_name;
-
- parent_wrk_index = vcl_get_worker_index ();
- VDBG (0, "initializing forked child with parent wrk %u", parent_wrk_index);
-
- /*
- * Allocate worker
- */
- vcl_set_worker_index (~0);
- if (!vcl_worker_alloc_and_init ())
- VERR ("couldn't allocate new worker");
-
- /*
- * Attach to binary api
- */
- child_name = format (0, "%v-child-%u%c", vcm->app_name, getpid (), 0);
- vcl_cleanup_bapi ();
- vppcom_api_hookup ();
- vcm->app_state = STATE_APP_START;
- rv = vppcom_connect_to_vpp ((char *) child_name);
- vec_free (child_name);
- if (rv)
- {
- VERR ("couldn't connect to VPP!");
- return;
- }
-
- /*
- * Register worker with vpp and share sessions
- */
- vcl_worker_register_with_vpp ();
- parent_wrk = vcl_worker_get (parent_wrk_index);
- wrk = vcl_worker_get_current ();
- wrk->vpp_event_queues = vec_dup (parent_wrk->vpp_event_queues);
- vcl_worker_share_sessions (parent_wrk);
- parent_wrk->forked_child = vcl_get_worker_index ();
-
- VDBG (0, "forked child main worker initialized");
- vcm->forking = 0;
-}
-
-static void
-vcl_app_fork_parent_handler (void)
-{
- vcm->forking = 1;
- while (vcm->forking)
- ;
-}
-
/**
* Handle app exit
*
pool_alloc (vcm->workers, vcl_cfg->max_workers);
clib_spinlock_init (&vcm->workers_lock);
clib_rwlock_init (&vcm->segment_table_lock);
- pthread_atfork (vcl_app_pre_fork, vcl_app_fork_parent_handler,
- vcl_app_fork_child_handler);
atexit (vppcom_app_exit);
/* Allocate default worker */
}
int
-vppcom_session_close (uint32_t session_handle)
+vcl_session_cleanup (vcl_worker_t * wrk, vcl_session_t * session,
+ vcl_session_handle_t sh, u8 do_disconnect)
{
- vcl_worker_t *wrk = vcl_worker_get_current ();
- u8 is_vep, do_disconnect = 1;
- vcl_session_t *session = 0;
- session_state_t state;
+ vcl_session_state_t state;
u32 next_sh, vep_sh;
int rv = VPPCOM_OK;
u64 vpp_handle;
-
- session = vcl_session_get_w_handle (wrk, session_handle);
- if (!session)
- return VPPCOM_EBADFD;
-
- if (session->shared_index != ~0)
- do_disconnect = vcl_worker_unshare_session (wrk, session);
+ u8 is_vep;
is_vep = session->is_vep;
next_sh = session->vep.next_sh;
state = session->session_state;
vpp_handle = session->vpp_handle;
- VDBG (1, "closing session handle %u vpp handle %u", session_handle,
- vpp_handle);
+ VDBG (1, "session %u [0x%llx] closing", session->session_index, vpp_handle);
if (is_vep)
{
while (next_sh != ~0)
{
- rv = vppcom_epoll_ctl (session_handle, EPOLL_CTL_DEL, next_sh, 0);
+ rv = vppcom_epoll_ctl (sh, EPOLL_CTL_DEL, next_sh, 0);
if (PREDICT_FALSE (rv < 0))
VDBG (0, "vpp handle 0x%llx, sid %u: EPOLL_CTL_DEL vep_idx %u"
" failed! rv %d (%s)", vpp_handle, next_sh, vep_sh, rv,
{
if (session->is_vep_session)
{
- rv = vppcom_epoll_ctl (vep_sh, EPOLL_CTL_DEL, session_handle, 0);
+ rv = vppcom_epoll_ctl (vep_sh, EPOLL_CTL_DEL, sh, 0);
if (rv < 0)
- VDBG (0, "vpp handle 0x%llx, sid %u: EPOLL_CTL_DEL vep_idx %u "
- "failed! rv %d (%s)", vpp_handle, session_handle, vep_sh,
- rv, vppcom_retval_str (rv));
+ VDBG (0, "session %u [0x%llx]: EPOLL_CTL_DEL vep_idx %u "
+ "failed! rv %d (%s)", session->session_index, vpp_handle,
+ vep_sh, rv, vppcom_retval_str (rv));
}
if (!do_disconnect)
{
- VDBG (0, "session handle %u [0x%llx] disconnect skipped",
- session_handle, vpp_handle);
+ VDBG (1, "session %u [0x%llx] disconnect skipped",
+ session->session_index, vpp_handle);
goto cleanup;
}
if (state & STATE_LISTEN)
{
- rv = vppcom_session_unbind (session_handle);
+ rv = vppcom_session_unbind (sh);
if (PREDICT_FALSE (rv < 0))
- VDBG (0, "vpp handle 0x%llx, sid %u: listener unbind failed! "
- "rv %d (%s)", vpp_handle, session_handle, rv,
+ VDBG (0, "session %u [0x%llx]: listener unbind failed! "
+ "rv %d (%s)", session->session_index, vpp_handle, rv,
vppcom_retval_str (rv));
+ return rv;
}
else if (state & STATE_OPEN)
{
- rv = vppcom_session_disconnect (session_handle);
+ rv = vppcom_session_disconnect (sh);
if (PREDICT_FALSE (rv < 0))
- clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
- "session disconnect failed! rv %d (%s)",
- getpid (), vpp_handle, session_handle,
- rv, vppcom_retval_str (rv));
+ VDBG (0, "ERROR: session %u [0x%llx]: disconnect failed!"
+ " rv %d (%s)", session->session_index, vpp_handle,
+ rv, vppcom_retval_str (rv));
}
else if (state == STATE_DISCONNECT)
{
}
}
-cleanup:
-
if (vcl_session_is_ct (session))
{
vcl_cut_through_registration_t *ctr;
vcl_ct_registration_unlock (wrk);
}
+ VDBG (0, "session %u [0x%llx] removed", session->session_index, vpp_handle);
+
+cleanup:
vcl_session_table_del_vpp_handle (wrk, vpp_handle);
vcl_session_free (wrk, session);
-
- VDBG (0, "session handle %u [0x%llx] removed", session_handle, vpp_handle);
-
vcl_evt (VCL_EVT_CLOSE, session, rv);
return rv;
}
+int
+vppcom_session_close (uint32_t session_handle)
+{
+ vcl_worker_t *wrk = vcl_worker_get_current ();
+ vcl_session_t *session;
+
+ session = vcl_session_get_w_handle (wrk, session_handle);
+ if (!session)
+ return VPPCOM_EBADFD;
+ return vcl_session_cleanup (wrk, session, session_handle,
+ 1 /* do_disconnect */ );
+}
+
int
vppcom_session_bind (uint32_t session_handle, vppcom_endpt_t * ep)
{
if (PREDICT_FALSE (session->session_state & CLIENT_STATE_OPEN))
{
- VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: session already "
+ VDBG (0, "session handle %u [0x%llx]: session already "
"connected to %s %U port %d proto %s, state 0x%x (%s)",
- getpid (), session->vpp_handle, session_handle,
+ session_handle, session->vpp_handle,
session->transport.is_ip4 ? "IPv4" : "IPv6",
format_ip46_address,
&session->transport.rmt_ip, session->transport.is_ip4 ?
sizeof (ip6_address_t));
session->transport.rmt_port = server_ep->port;
- VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: connecting to server %s %U "
- "port %d proto %s",
- getpid (), session->vpp_handle, session_handle,
+ VDBG (0, "session handle %u [0x%llx]: connecting to server %s %U "
+ "port %d proto %s", session_handle, session->vpp_handle,
session->transport.is_ip4 ? "IPv4" : "IPv6",
format_ip46_address,
&session->transport.rmt_ip, session->transport.is_ip4 ?
return (e->event_type == SESSION_IO_EVT_CT_TX);
}
-static inline u8
-vcl_session_is_readable (vcl_session_t * s)
-{
- return ((s->session_state & STATE_OPEN)
- || (s->session_state == STATE_LISTEN
- && s->session_type == VPPCOM_PROTO_UDP));
-}
-
static inline int
vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
u8 peek)
{
vcl_worker_t *wrk = vcl_worker_get_current ();
- int n_read = 0, rv, is_nonblocking;
+ int n_read = 0, is_nonblocking;
vcl_session_t *s = 0;
svm_fifo_t *rx_fifo;
svm_msg_q_msg_t msg;
if (PREDICT_FALSE (!s || s->is_vep))
return VPPCOM_EBADFD;
- if (PREDICT_FALSE (!vcl_session_is_readable (s)))
+ if (PREDICT_FALSE (!vcl_session_is_open (s)))
{
- session_state_t state = s->session_state;
- rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
-
- VDBG (0, "session handle %u[0x%llx] is not open! state 0x%x (%s),"
- " returning %d (%s)", session_handle, s->vpp_handle, state,
- vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
- return rv;
+ VDBG (0, "session handle %u[0x%llx] is not open! state 0x%x (%s)",
+ s->session_index, s->vpp_handle, s->session_state,
+ vppcom_session_state_str (s->session_state));
+ return vcl_session_closed_error (s);
}
is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
}
while (svm_fifo_is_empty (rx_fifo))
{
+ if (vcl_session_is_closing (s))
+ return vcl_session_closing_error (s);
+
svm_fifo_unset_event (rx_fifo);
svm_msg_q_lock (mq);
if (svm_msg_q_is_empty (mq))
if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
vcl_handle_mq_event (wrk, e);
svm_msg_q_free_msg (mq, &msg);
-
- if (PREDICT_FALSE (s->session_state == STATE_DISCONNECT))
- return VPPCOM_ECONNRESET;
}
}
if (svm_fifo_is_empty (rx_fifo))
svm_fifo_unset_event (rx_fifo);
- if (is_ct && svm_fifo_want_tx_evt (rx_fifo))
+ if (is_ct && svm_fifo_needs_tx_ntf (rx_fifo, n_read))
{
- svm_fifo_set_want_tx_evt (s->rx_fifo, 0);
+ svm_fifo_clear_tx_ntf (s->rx_fifo);
app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo, SESSION_IO_EVT_CT_RX,
SVM_Q_WAIT);
}
vppcom_data_segments_t ds)
{
vcl_worker_t *wrk = vcl_worker_get_current ();
- int n_read = 0, rv, is_nonblocking;
+ int n_read = 0, is_nonblocking;
vcl_session_t *s = 0;
svm_fifo_t *rx_fifo;
svm_msg_q_msg_t msg;
if (PREDICT_FALSE (!s || s->is_vep))
return VPPCOM_EBADFD;
- if (PREDICT_FALSE (!vcl_session_is_readable (s)))
- {
- session_state_t state = s->session_state;
- rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
- return rv;
- }
+ if (PREDICT_FALSE (!vcl_session_is_open (s)))
+ return vcl_session_closed_error (s);
is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
is_ct = vcl_session_is_ct (s);
}
while (svm_fifo_is_empty (rx_fifo))
{
+ if (vcl_session_is_closing (s))
+ return vcl_session_closing_error (s);
+
svm_fifo_unset_event (rx_fifo);
svm_msg_q_lock (mq);
if (svm_msg_q_is_empty (mq))
if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
vcl_handle_mq_event (wrk, e);
svm_msg_q_free_msg (mq, &msg);
-
- if (PREDICT_FALSE (s->session_state == STATE_DISCONNECT))
- return VPPCOM_ECONNRESET;
}
}
svm_fifo_segments_free (s->rx_fifo, (svm_fifo_segment_t *) ds);
}
-static inline int
-vppcom_session_read_ready (vcl_session_t * session)
-{
- /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
- if (PREDICT_FALSE (session->is_vep))
- {
- clib_warning ("VCL<%d>: ERROR: sid %u: cannot read from an "
- "epoll session!", getpid (), session->session_index);
- return VPPCOM_EBADFD;
- }
-
- if (PREDICT_FALSE (!(session->session_state & (STATE_OPEN | STATE_LISTEN))))
- {
- session_state_t state = session->session_state;
- int rv;
-
- rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
-
- VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open!"
- " state 0x%x (%s), returning %d (%s)", getpid (),
- session->vpp_handle, session->session_index, state,
- vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
- return rv;
- }
-
- if (session->session_state & STATE_LISTEN)
- return clib_fifo_elts (session->accept_evts_fifo);
-
- return svm_fifo_max_dequeue (session->rx_fifo);
-}
-
int
vppcom_data_segment_copy (void *buf, vppcom_data_segments_t ds, u32 max_bytes)
{
u8 is_flush)
{
vcl_worker_t *wrk = vcl_worker_get_current ();
- int rv, n_write, is_nonblocking;
+ int n_write, is_nonblocking;
vcl_session_t *s = 0;
svm_fifo_t *tx_fifo = 0;
session_evt_type_t et;
if (PREDICT_FALSE (s->is_vep))
{
- clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
- "cannot write to an epoll session!",
- getpid (), s->vpp_handle, session_handle);
-
+ VDBG (0, "ERROR: session %u [0x%llx]: cannot write to an epoll"
+ " session!", s->session_index, s->vpp_handle);
return VPPCOM_EBADFD;
}
- if (PREDICT_FALSE (!(s->session_state & STATE_OPEN)))
+ if (PREDICT_FALSE (!vcl_session_is_open (s)))
{
- session_state_t state = s->session_state;
- rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
- VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open! "
- "state 0x%x (%s)", getpid (), s->vpp_handle, session_handle,
- state, vppcom_session_state_str (state));
- return rv;
+ VDBG (1, "session %u [0x%llx]: is not open! state 0x%x (%s)",
+ s->session_index, s->vpp_handle, s->session_state,
+ vppcom_session_state_str (s->session_state));
+ return vcl_session_closed_error (s);;
}
tx_fifo = s->tx_fifo;
}
while (svm_fifo_is_full (tx_fifo))
{
- svm_fifo_set_want_tx_evt (tx_fifo, 1);
+ svm_fifo_add_want_tx_ntf (tx_fifo, SVM_FIFO_WANT_TX_NOTIF);
+ if (vcl_session_is_closing (s))
+ return vcl_session_closing_error (s);
svm_msg_q_lock (mq);
if (svm_msg_q_is_empty (mq))
svm_msg_q_wait (mq);
if (!vcl_is_tx_evt_for_session (e, s->session_index, is_ct))
vcl_handle_mq_event (wrk, e);
svm_msg_q_free_msg (mq, &msg);
-
- if (PREDICT_FALSE (!(s->session_state & STATE_OPEN)))
- return VPPCOM_ECONNRESET;
}
}
ASSERT (n_write > 0);
- VDBG (2, "VCL<%d>: vpp handle 0x%llx, sid %u: wrote %d bytes", getpid (),
- s->vpp_handle, session_handle, n_write);
+ VDBG (2, "session %u [0x%llx]: wrote %d bytes", s->session_index,
+ s->vpp_handle, n_write);
return n_write;
}
return 0;
}
-static inline int
-vppcom_session_write_ready (vcl_session_t * session)
-{
- /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
- if (PREDICT_FALSE (session->is_vep))
- {
- clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
- "cannot write to an epoll session!",
- getpid (), session->vpp_handle, session->session_index);
- return VPPCOM_EBADFD;
- }
-
- if (PREDICT_FALSE (session->session_state & STATE_LISTEN))
- {
- clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
- "cannot write to a listen session!",
- getpid (), session->vpp_handle, session->session_index);
- return VPPCOM_EBADFD;
- }
-
- if (PREDICT_FALSE (!(session->session_state & STATE_OPEN)))
- {
- session_state_t state = session->session_state;
- int rv;
-
- rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
- clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
- "session is not open! state 0x%x (%s), "
- "returning %d (%s)", getpid (), session->vpp_handle,
- session->session_index,
- state, vppcom_session_state_str (state),
- rv, vppcom_retval_str (rv));
- return rv;
- }
-
- VDBG (3, "VCL<%d>: vpp handle 0x%llx, sid %u: peek %s (%p), ready = %d",
- getpid (), session->vpp_handle, session->session_index,
- session->tx_fifo, svm_fifo_max_enqueue (session->tx_fifo));
-
- return svm_fifo_max_enqueue (session->tx_fifo);
-}
-
#define vcl_fifo_rx_evt_valid_or_break(_fifo) \
if (PREDICT_FALSE (svm_fifo_is_empty (_fifo))) \
{ \
*bits_set += 1;
}
break;
+ case SESSION_CTRL_EVT_UNLISTEN_REPLY:
+ vcl_session_unlisten_reply_handler (wrk, e->data);
+ break;
case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
vcl_session_worker_update_reply_handler (wrk, e->data);
break;
}
static int
-vppcom_select_condvar (vcl_worker_t * wrk, unsigned long n_bits,
- unsigned long *read_map, unsigned long *write_map,
- unsigned long *except_map, double time_to_wait,
+vppcom_select_condvar (vcl_worker_t * wrk, int n_bits,
+ vcl_si_set * read_map, vcl_si_set * write_map,
+ vcl_si_set * except_map, double time_to_wait,
u32 * bits_set)
{
double total_wait = 0, wait_slice;
vcl_cut_through_registration_t *cr;
- time_to_wait = (time_to_wait == -1) ? 10e9 : time_to_wait;
+ time_to_wait = (time_to_wait == -1) ? 1e6 : time_to_wait;
wait_slice = wrk->cut_through_registrations ? 10e-6 : time_to_wait;
do
{
vcl_ct_registration_unlock (wrk);
vcl_select_handle_mq (wrk, wrk->app_event_queue, n_bits, read_map,
- write_map, except_map, time_to_wait, bits_set);
+ write_map, except_map, wait_slice, bits_set);
total_wait += wait_slice;
if (*bits_set)
return *bits_set;
}
static int
-vppcom_select_eventfd (vcl_worker_t * wrk, unsigned long n_bits,
- unsigned long *read_map, unsigned long *write_map,
- unsigned long *except_map, double time_to_wait,
+vppcom_select_eventfd (vcl_worker_t * wrk, int n_bits,
+ vcl_si_set * read_map, vcl_si_set * write_map,
+ vcl_si_set * except_map, double time_to_wait,
u32 * bits_set)
{
vcl_mq_evt_conn_t *mqc;
}
int
-vppcom_select (unsigned long n_bits, unsigned long *read_map,
- unsigned long *write_map, unsigned long *except_map,
- double time_to_wait)
+vppcom_select (int n_bits, vcl_si_set * read_map, vcl_si_set * write_map,
+ vcl_si_set * except_map, double time_to_wait)
{
u32 sid, minbits = clib_max (n_bits, BITS (uword)), bits_set = 0;
vcl_worker_t *wrk = vcl_worker_get_current ();
vcl_session_t *session = 0;
int rv, i;
- STATIC_ASSERT (sizeof (clib_bitmap_t) == sizeof (unsigned long),
- "vppcom bitmap size mismatch");
- STATIC_ASSERT (sizeof (clib_bitmap_t) == sizeof (fd_mask),
- "vppcom bitmap size mismatch");
- STATIC_ASSERT (sizeof (clib_bitmap_t) == sizeof (uword),
- "vppcom bitmap size mismatch");
-
if (n_bits && read_map)
{
clib_bitmap_validate (wrk->rd_bitmap, minbits);
clib_memcpy_fast (wrk->rd_bitmap, read_map,
- vec_len (wrk->rd_bitmap) * sizeof (unsigned long));
- memset (read_map, 0, vec_len (wrk->rd_bitmap) * sizeof (unsigned long));
+ vec_len (wrk->rd_bitmap) * sizeof (vcl_si_set));
+ memset (read_map, 0, vec_len (wrk->rd_bitmap) * sizeof (vcl_si_set));
}
if (n_bits && write_map)
{
clib_bitmap_validate (wrk->wr_bitmap, minbits);
clib_memcpy_fast (wrk->wr_bitmap, write_map,
- vec_len (wrk->wr_bitmap) * sizeof (unsigned long));
- memset (write_map, 0,
- vec_len (wrk->wr_bitmap) * sizeof (unsigned long));
+ vec_len (wrk->wr_bitmap) * sizeof (vcl_si_set));
+ memset (write_map, 0, vec_len (wrk->wr_bitmap) * sizeof (vcl_si_set));
}
if (n_bits && except_map)
{
clib_bitmap_validate (wrk->ex_bitmap, minbits);
clib_memcpy_fast (wrk->ex_bitmap, except_map,
- vec_len (wrk->ex_bitmap) * sizeof (unsigned long));
- memset (except_map, 0,
- vec_len (wrk->ex_bitmap) * sizeof (unsigned long));
+ vec_len (wrk->ex_bitmap) * sizeof (vcl_si_set));
+ memset (except_map, 0, vec_len (wrk->ex_bitmap) * sizeof (vcl_si_set));
}
if (!n_bits)
clib_bitmap_set_no_check ((uword*)write_map, sid, 1);
bits_set++;
}
+ else
+ svm_fifo_add_want_tx_ntf (session->tx_fifo, SVM_FIFO_WANT_TX_NOTIF);
}));
check_rd:
continue;
}
- rv = vppcom_session_read_ready (session);
+ rv = vcl_session_read_ready (session);
if (rv)
{
clib_bitmap_set_no_check ((uword*)read_map, sid, 1);
session->is_vep_session = 1;
vep_session->vep.next_sh = session_handle;
+ if (session->tx_fifo)
+ svm_fifo_add_want_tx_ntf (session->tx_fifo,
+ SVM_FIFO_WANT_TX_NOTIF_IF_FULL);
+
VDBG (1, "EPOLL_CTL_ADD: vep_sh %u, sh %u, events 0x%x, data 0x%llx!",
vep_handle, session_handle, event->events, event->data.u64);
vcl_evt (VCL_EVT_EPOLL_CTLADD, session, event->events, event->data.u64);
session->vep.prev_sh = ~0;
session->vep.vep_sh = ~0;
session->is_vep_session = 0;
+
+ if (session->tx_fifo)
+ svm_fifo_del_want_tx_ntf (session->tx_fifo, SVM_FIFO_NO_TX_NOTIF);
+
VDBG (1, "EPOLL_CTL_DEL: vep_idx %u, sid %u!", vep_handle,
session_handle);
vcl_evt (VCL_EVT_EPOLL_CTLDEL, session, vep_sh);
add_event = 1;
events[*num_ev].events |= EPOLLOUT;
session_evt_data = session->vep.ev.data.u64;
+ svm_fifo_reset_tx_ntf (session->tx_fifo);
break;
case SESSION_IO_EVT_CT_TX:
vcl_fifo_rx_evt_valid_or_break (e->fifo);
add_event = 1;
events[*num_ev].events |= EPOLLOUT;
session_evt_data = session->vep.ev.data.u64;
+ svm_fifo_reset_tx_ntf (session->tx_fifo);
break;
case SESSION_CTRL_EVT_ACCEPTED:
session = vcl_session_accepted (wrk,
if (!(session = vcl_session_get (wrk, sid)))
break;
session_events = session->vep.ev.events;
- if (EPOLLOUT & session_events)
- {
- add_event = 1;
- events[*num_ev].events |= EPOLLOUT;
- session_evt_data = session->vep.ev.data.u64;
- }
+ if (!(EPOLLOUT & session_events))
+ break;
+ add_event = 1;
+ events[*num_ev].events |= EPOLLOUT;
+ session_evt_data = session->vep.ev.data.u64;
break;
case SESSION_CTRL_EVT_DISCONNECTED:
disconnected_msg = (session_disconnected_msg_t *) e->data;
session = vcl_session_disconnected_handler (wrk, disconnected_msg);
if (!session)
break;
+ session_events = session->vep.ev.events;
+ if (!((EPOLLHUP | EPOLLRDHUP) & session_events))
+ break;
add_event = 1;
events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
session_evt_data = session->vep.ev.data.u64;
- session_events = session->vep.ev.events;
break;
case SESSION_CTRL_EVT_RESET:
sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
if (!(session = vcl_session_get (wrk, sid)))
break;
+ session_events = session->vep.ev.events;
+ if (!((EPOLLHUP | EPOLLRDHUP) & session_events))
+ break;
add_event = 1;
events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
session_evt_data = session->vep.ev.data.u64;
- session_events = session->vep.ev.events;
+ break;
+ case SESSION_CTRL_EVT_UNLISTEN_REPLY:
+ vcl_session_unlisten_reply_handler (wrk, e->data);
break;
case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
vcl_session_req_worker_update_handler (wrk, e->data);
double total_wait = 0, wait_slice;
int rv;
- wait_for_time = (wait_for_time == -1) ? (double) 10e9 : wait_for_time;
+ wait_for_time = (wait_for_time == -1) ? (double) 1e6 : wait_for_time;
wait_slice = wrk->cut_through_registrations ? 10e-6 : wait_for_time;
do
vcl_worker_t *wrk = vcl_worker_get_current ();
vcl_session_t *session;
int rv = VPPCOM_OK;
- u32 *flags = buffer;
+ u32 *flags = buffer, tmp_flags = 0;
vppcom_endpt_t *ep = buffer;
session = vcl_session_get_w_handle (wrk, session_handle);
switch (op)
{
case VPPCOM_ATTR_GET_NREAD:
- rv = vppcom_session_read_ready (session);
- VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_NREAD: sid %u, nread = %d",
- getpid (), rv);
+ rv = vcl_session_read_ready (session);
+ VDBG (2, "VPPCOM_ATTR_GET_NREAD: sid %u, nread = %d", rv);
break;
case VPPCOM_ATTR_GET_NWRITE:
- rv = vppcom_session_write_ready (session);
+ rv = vcl_session_write_ready (session);
VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_NWRITE: sid %u, nwrite = %d",
getpid (), session_handle, rv);
break;
rv = VPPCOM_EINVAL;
break;
- case VPPCOM_ATTR_GET_REFCNT:
- rv = vcl_session_get_refcnt (session);
+ case VPPCOM_ATTR_SET_SHUT:
+ if (*flags == SHUT_RD || *flags == SHUT_RDWR)
+ VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_SHUT_RD);
+ if (*flags == SHUT_WR || *flags == SHUT_RDWR)
+ VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_SHUT_WR);
break;
+ case VPPCOM_ATTR_GET_SHUT:
+ if (VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_SHUT_RD))
+ tmp_flags = 1;
+ if (VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_SHUT_WR))
+ tmp_flags |= 2;
+ if (tmp_flags == 1)
+ *(int *) buffer = SHUT_RD;
+ else if (tmp_flags == 2)
+ *(int *) buffer = SHUT_WR;
+ else if (tmp_flags == 3)
+ *(int *) buffer = SHUT_RDWR;
+ *buflen = sizeof (int);
+ break;
default:
rv = VPPCOM_EINVAL;
break;
for (i = 0; i < n_sids; i++)
{
- session = vcl_session_get (wrk, vp[i].sid);
+ session = vcl_session_get (wrk, vp[i].sh);
if (!session)
{
vp[i].revents = POLLHUP;
if (POLLIN & vp[i].events)
{
- rv = vppcom_session_read_ready (session);
+ rv = vcl_session_read_ready (session);
if (rv > 0)
{
vp[i].revents |= POLLIN;
if (POLLOUT & vp[i].events)
{
- rv = vppcom_session_write_ready (session);
+ rv = vcl_session_write_ready (session);
if (rv > 0)
{
vp[i].revents |= POLLOUT;
for (i = 0; i < n_sids; i++)
{
clib_warning ("VCL<%d>: vp[%d].sid %d (0x%x), .events 0x%x, "
- ".revents 0x%x", getpid (), i, vp[i].sid, vp[i].sid,
+ ".revents 0x%x", getpid (), i, vp[i].sh, vp[i].sh,
vp[i].events, vp[i].revents);
}
}
return session_handle >> 24;
}
-int
-vppcom_session_handle (uint32_t session_index)
-{
- return (vcl_get_worker_index () << 24) | session_index;
-}
-
int
vppcom_worker_register (void)
{
return vcl_get_worker_index ();
}
+int
+vppcom_worker_mqs_epfd (void)
+{
+ vcl_worker_t *wrk = vcl_worker_get_current ();
+ if (!vcm->cfg.use_mq_eventfd)
+ return -1;
+ return wrk->mqs_epfd;
+}
+
/*
* fd.io coding-style-patch-verification: ON
*