f64 timeout;
if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
- return 1;
+ return 0;
timeout = clib_time_now (&wrk->clib_time) + wait_for_seconds;
while (clib_time_now (&wrk->clib_time) < timeout)
st = "STATE_FAILED";
break;
+ case STATE_UPDATED:
+ st = "STATE_UPDATED";
+ break;
+
+ case STATE_LISTEN_NO_MQ:
+ st = "STATE_LISTEN_NO_MQ";
+ break;
+
default:
st = "UNKNOWN_STATE";
break;
vcl_session_table_add_listener (wrk, mp->handle, sid);
session->session_state = STATE_LISTEN;
+ session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
+ vec_validate (wrk->vpp_event_queues, 0);
+ wrk->vpp_event_queues[0] = session->vpp_evt_q;
+
if (session->is_dgram)
{
svm_fifo_t *rx_fifo, *tx_fifo;
s->session_index);
return;
}
- s->rx_fifo = uword_to_pointer (msg->rx_fifo, svm_fifo_t *);
- s->tx_fifo = uword_to_pointer (msg->tx_fifo, svm_fifo_t *);
- s->rx_fifo->client_session_index = s->session_index;
- s->tx_fifo->client_session_index = s->session_index;
- s->rx_fifo->client_thread_index = wrk->wrk_index;
- s->tx_fifo->client_thread_index = wrk->wrk_index;
- s->session_state = STATE_UPDATED;
-
- if (s->shared_index != VCL_INVALID_SESSION_INDEX)
+ if (s->rx_fifo)
{
- vcl_shared_session_t *ss;
- ss = vcl_shared_session_get (s->shared_index);
- if (vec_len (ss->workers) > 1)
- VDBG (0, "workers need to be updated");
+ s->rx_fifo = uword_to_pointer (msg->rx_fifo, svm_fifo_t *);
+ s->tx_fifo = uword_to_pointer (msg->tx_fifo, svm_fifo_t *);
+ s->rx_fifo->client_session_index = s->session_index;
+ s->tx_fifo->client_session_index = s->session_index;
+ s->rx_fifo->client_thread_index = wrk->wrk_index;
+ s->tx_fifo->client_thread_index = wrk->wrk_index;
}
+ s->session_state = STATE_UPDATED;
+
VDBG (0, "session %u[0x%llx] moved to worker %u", s->session_index,
s->vpp_handle, wrk->wrk_index);
}
session = vcl_session_disconnected_handler (wrk, disconnected_msg);
if (!session)
break;
- session->session_state = STATE_DISCONNECT;
VDBG (0, "disconnected session %u [0x%llx]", session->session_index,
session->vpp_handle);
break;
vec_reset_length (wrk->pending_session_wrk_updates);
}
-static void
+void
vcl_flush_mq_events (void)
{
vcl_worker_t *wrk = vcl_worker_get_current ();
session->vpp_handle = ~0;
session->session_state = STATE_DISCONNECT;
- VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending unbind msg! new state"
- " 0x%x (%s)", getpid (), vpp_handle, session_handle, STATE_DISCONNECT,
+ VDBG (1, "vpp handle 0x%llx, sid %u: sending unbind msg! new state"
+ " 0x%x (%s)", vpp_handle, session_handle, STATE_DISCONNECT,
vppcom_session_state_str (STATE_DISCONNECT));
vcl_evt (VCL_EVT_UNBIND, session);
- vppcom_send_unbind_sock (vpp_handle);
+ vppcom_send_unbind_sock (wrk, vpp_handle);
return VPPCOM_OK;
}
return VPPCOM_OK;
}
-static void
-vcl_cleanup_bapi (void)
-{
- socket_client_main_t *scm = &socket_client_main;
- api_main_t *am = &api_main;
-
- am->my_client_index = ~0;
- am->my_registration = 0;
- am->vl_input_queue = 0;
- am->msg_index_by_name_and_crc = 0;
- scm->socket_fd = 0;
-
- vl_client_api_unmap ();
-}
-
-static void
-vcl_cleanup_forked_child (vcl_worker_t * wrk, vcl_worker_t * child_wrk)
-{
- vcl_worker_t *sub_child;
- int tries = 0;
-
- if (child_wrk->forked_child != ~0)
- {
- sub_child = vcl_worker_get_if_valid (child_wrk->forked_child);
- if (sub_child)
- {
- /* Wait a bit, maybe the process is going away */
- while (kill (sub_child->current_pid, 0) >= 0 && tries++ < 50)
- usleep (1e3);
- if (kill (sub_child->current_pid, 0) < 0)
- vcl_cleanup_forked_child (child_wrk, sub_child);
- }
- }
- vcl_worker_cleanup (child_wrk, 1 /* notify vpp */ );
- VDBG (0, "Cleaned up wrk %u", child_wrk->wrk_index);
- wrk->forked_child = ~0;
-}
-
-static struct sigaction old_sa;
-
-static void
-vcl_intercept_sigchld_handler (int signum, siginfo_t * si, void *uc)
-{
- vcl_worker_t *wrk, *child_wrk;
-
- if (vcl_get_worker_index () == ~0)
- return;
-
- if (sigaction (SIGCHLD, &old_sa, 0))
- {
- VERR ("couldn't restore sigchld");
- exit (-1);
- }
-
- wrk = vcl_worker_get_current ();
- if (wrk->forked_child == ~0)
- return;
-
- child_wrk = vcl_worker_get_if_valid (wrk->forked_child);
- if (!child_wrk)
- goto done;
-
- if (si && si->si_pid != child_wrk->current_pid)
- {
- VDBG (0, "unexpected child pid %u", si->si_pid);
- goto done;
- }
- vcl_cleanup_forked_child (wrk, child_wrk);
-
-done:
- if (old_sa.sa_flags & SA_SIGINFO)
- {
- void (*fn) (int, siginfo_t *, void *) = old_sa.sa_sigaction;
- fn (signum, si, uc);
- }
- else
- {
- void (*fn) (int) = old_sa.sa_handler;
- if (fn)
- fn (signum);
- }
-}
-
-static void
-vcl_incercept_sigchld ()
-{
- struct sigaction sa;
- clib_memset (&sa, 0, sizeof (sa));
- sa.sa_sigaction = vcl_intercept_sigchld_handler;
- sa.sa_flags = SA_SIGINFO;
- if (sigaction (SIGCHLD, &sa, &old_sa))
- {
- VERR ("couldn't intercept sigchld");
- exit (-1);
- }
-}
-
-static void
-vcl_app_pre_fork (void)
-{
- vcl_incercept_sigchld ();
- vcl_flush_mq_events ();
-}
-
-static void
-vcl_app_fork_child_handler (void)
-{
- vcl_worker_t *parent_wrk, *wrk;
- int rv, parent_wrk_index;
- u8 *child_name;
-
- parent_wrk_index = vcl_get_worker_index ();
- VDBG (0, "initializing forked child with parent wrk %u", parent_wrk_index);
-
- /*
- * Allocate worker
- */
- vcl_set_worker_index (~0);
- if (!vcl_worker_alloc_and_init ())
- VERR ("couldn't allocate new worker");
-
- /*
- * Attach to binary api
- */
- child_name = format (0, "%v-child-%u%c", vcm->app_name, getpid (), 0);
- vcl_cleanup_bapi ();
- vppcom_api_hookup ();
- vcm->app_state = STATE_APP_START;
- rv = vppcom_connect_to_vpp ((char *) child_name);
- vec_free (child_name);
- if (rv)
- {
- VERR ("couldn't connect to VPP!");
- return;
- }
-
- /*
- * Register worker with vpp and share sessions
- */
- vcl_worker_register_with_vpp ();
- parent_wrk = vcl_worker_get (parent_wrk_index);
- wrk = vcl_worker_get_current ();
- wrk->vpp_event_queues = vec_dup (parent_wrk->vpp_event_queues);
- vcl_worker_share_sessions (parent_wrk);
- parent_wrk->forked_child = vcl_get_worker_index ();
-
- VDBG (0, "forked child main worker initialized");
- vcm->forking = 0;
-}
-
-static void
-vcl_app_fork_parent_handler (void)
-{
- vcm->forking = 1;
- while (vcm->forking)
- ;
-}
-
/**
* Handle app exit
*
pool_alloc (vcm->workers, vcl_cfg->max_workers);
clib_spinlock_init (&vcm->workers_lock);
clib_rwlock_init (&vcm->segment_table_lock);
- pthread_atfork (vcl_app_pre_fork, vcl_app_fork_parent_handler,
- vcl_app_fork_child_handler);
atexit (vppcom_app_exit);
/* Allocate default worker */
}
int
-vppcom_session_close (uint32_t session_handle)
+vcl_session_cleanup (vcl_worker_t * wrk, vcl_session_t * session,
+ vcl_session_handle_t sh, u8 do_disconnect)
{
- vcl_worker_t *wrk = vcl_worker_get_current ();
- u8 is_vep, do_disconnect = 1;
- vcl_session_t *session = 0;
session_state_t state;
u32 next_sh, vep_sh;
int rv = VPPCOM_OK;
u64 vpp_handle;
-
- session = vcl_session_get_w_handle (wrk, session_handle);
- if (!session)
- return VPPCOM_EBADFD;
-
- if (session->shared_index != ~0)
- do_disconnect = vcl_worker_unshare_session (wrk, session);
+ u8 is_vep;
is_vep = session->is_vep;
next_sh = session->vep.next_sh;
state = session->session_state;
vpp_handle = session->vpp_handle;
- VDBG (1, "closing session handle %u vpp handle %u", session_handle,
- vpp_handle);
+ VDBG (1, "session %u [0x%llx] closing", session->session_index, vpp_handle);
if (is_vep)
{
while (next_sh != ~0)
{
- rv = vppcom_epoll_ctl (session_handle, EPOLL_CTL_DEL, next_sh, 0);
+ rv = vppcom_epoll_ctl (sh, EPOLL_CTL_DEL, next_sh, 0);
if (PREDICT_FALSE (rv < 0))
VDBG (0, "vpp handle 0x%llx, sid %u: EPOLL_CTL_DEL vep_idx %u"
" failed! rv %d (%s)", vpp_handle, next_sh, vep_sh, rv,
{
if (session->is_vep_session)
{
- rv = vppcom_epoll_ctl (vep_sh, EPOLL_CTL_DEL, session_handle, 0);
+ rv = vppcom_epoll_ctl (vep_sh, EPOLL_CTL_DEL, sh, 0);
if (rv < 0)
- VDBG (0, "vpp handle 0x%llx, sid %u: EPOLL_CTL_DEL vep_idx %u "
- "failed! rv %d (%s)", vpp_handle, session_handle, vep_sh,
- rv, vppcom_retval_str (rv));
+ VDBG (0, "session %u [0x%llx]: EPOLL_CTL_DEL vep_idx %u "
+ "failed! rv %d (%s)", session->session_index, vpp_handle,
+ vep_sh, rv, vppcom_retval_str (rv));
}
if (!do_disconnect)
{
- VDBG (0, "session handle %u [0x%llx] disconnect skipped",
- session_handle, vpp_handle);
+ VDBG (1, "session %u [0x%llx] disconnect skipped",
+ session->session_index, vpp_handle);
goto cleanup;
}
if (state & STATE_LISTEN)
{
- rv = vppcom_session_unbind (session_handle);
+ rv = vppcom_session_unbind (sh);
if (PREDICT_FALSE (rv < 0))
- VDBG (0, "vpp handle 0x%llx, sid %u: listener unbind failed! "
- "rv %d (%s)", vpp_handle, session_handle, rv,
+ VDBG (0, "session %u [0x%llx]: listener unbind failed! "
+ "rv %d (%s)", session->session_index, vpp_handle, rv,
vppcom_retval_str (rv));
}
else if (state & STATE_OPEN)
{
- rv = vppcom_session_disconnect (session_handle);
+ rv = vppcom_session_disconnect (sh);
if (PREDICT_FALSE (rv < 0))
- clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
- "session disconnect failed! rv %d (%s)",
- getpid (), vpp_handle, session_handle,
- rv, vppcom_retval_str (rv));
+ VDBG (0, "ERROR: session %u [0x%llx]: disconnect failed!"
+ " rv %d (%s)", session->session_index, vpp_handle,
+ rv, vppcom_retval_str (rv));
}
else if (state == STATE_DISCONNECT)
{
}
}
-cleanup:
-
if (vcl_session_is_ct (session))
{
vcl_cut_through_registration_t *ctr;
vcl_ct_registration_unlock (wrk);
}
+ VDBG (0, "session %u [0x%llx] removed", session->session_index, vpp_handle);
+
+cleanup:
vcl_session_table_del_vpp_handle (wrk, vpp_handle);
vcl_session_free (wrk, session);
-
- VDBG (0, "session handle %u [0x%llx] removed", session_handle, vpp_handle);
-
vcl_evt (VCL_EVT_CLOSE, session, rv);
return rv;
}
+int
+vppcom_session_close (uint32_t session_handle)
+{
+ vcl_worker_t *wrk = vcl_worker_get_current ();
+ vcl_session_t *session;
+
+ session = vcl_session_get_w_handle (wrk, session_handle);
+ if (!session)
+ return VPPCOM_EBADFD;
+ return vcl_session_cleanup (wrk, session, session_handle,
+ 1 /* do_disconnect */ );
+}
+
int
vppcom_session_bind (uint32_t session_handle, vppcom_endpt_t * ep)
{
return (e->event_type == SESSION_IO_EVT_CT_TX);
}
-static inline u8
-vcl_session_is_readable (vcl_session_t * s)
-{
- return ((s->session_state & STATE_OPEN)
- || (s->session_state == STATE_LISTEN
- && s->session_type == VPPCOM_PROTO_UDP));
-}
-
static inline int
vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
u8 peek)
{
vcl_worker_t *wrk = vcl_worker_get_current ();
- int n_read = 0, rv, is_nonblocking;
+ int n_read = 0, is_nonblocking;
vcl_session_t *s = 0;
svm_fifo_t *rx_fifo;
svm_msg_q_msg_t msg;
if (PREDICT_FALSE (!s || s->is_vep))
return VPPCOM_EBADFD;
- if (PREDICT_FALSE (!vcl_session_is_readable (s)))
+ if (PREDICT_FALSE (!vcl_session_is_open (s)))
{
- session_state_t state = s->session_state;
- rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
-
- VDBG (0, "session handle %u[0x%llx] is not open! state 0x%x (%s),"
- " returning %d (%s)", session_handle, s->vpp_handle, state,
- vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
- return rv;
+ VDBG (0, "session handle %u[0x%llx] is not open! state 0x%x (%s)",
+ s->session_index, s->vpp_handle, s->session_state,
+ vppcom_session_state_str (s->session_state));
+ return vcl_session_closed_error (s);
}
is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
}
while (svm_fifo_is_empty (rx_fifo))
{
+ if (vcl_session_is_closing (s))
+ return vcl_session_closing_error (s);
+
svm_fifo_unset_event (rx_fifo);
svm_msg_q_lock (mq);
if (svm_msg_q_is_empty (mq))
if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
vcl_handle_mq_event (wrk, e);
svm_msg_q_free_msg (mq, &msg);
-
- if (PREDICT_FALSE (s->session_state == STATE_DISCONNECT))
- return VPPCOM_ECONNRESET;
}
}
if (svm_fifo_is_empty (rx_fifo))
svm_fifo_unset_event (rx_fifo);
- if (is_ct && svm_fifo_want_tx_evt (rx_fifo))
+ if (is_ct && svm_fifo_needs_tx_ntf (rx_fifo, n_read))
{
- svm_fifo_set_want_tx_evt (s->rx_fifo, 0);
+ svm_fifo_clear_tx_ntf (s->rx_fifo);
app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo, SESSION_IO_EVT_CT_RX,
SVM_Q_WAIT);
}
vppcom_data_segments_t ds)
{
vcl_worker_t *wrk = vcl_worker_get_current ();
- int n_read = 0, rv, is_nonblocking;
+ int n_read = 0, is_nonblocking;
vcl_session_t *s = 0;
svm_fifo_t *rx_fifo;
svm_msg_q_msg_t msg;
if (PREDICT_FALSE (!s || s->is_vep))
return VPPCOM_EBADFD;
- if (PREDICT_FALSE (!vcl_session_is_readable (s)))
- {
- session_state_t state = s->session_state;
- rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
- return rv;
- }
+ if (PREDICT_FALSE (!vcl_session_is_open (s)))
+ return vcl_session_closed_error (s);
is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
is_ct = vcl_session_is_ct (s);
}
while (svm_fifo_is_empty (rx_fifo))
{
+ if (vcl_session_is_closing (s))
+ return vcl_session_closing_error (s);
+
svm_fifo_unset_event (rx_fifo);
svm_msg_q_lock (mq);
if (svm_msg_q_is_empty (mq))
if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
vcl_handle_mq_event (wrk, e);
svm_msg_q_free_msg (mq, &msg);
-
- if (PREDICT_FALSE (s->session_state == STATE_DISCONNECT))
- return VPPCOM_ECONNRESET;
}
}
svm_fifo_segments_free (s->rx_fifo, (svm_fifo_segment_t *) ds);
}
-static inline int
-vppcom_session_read_ready (vcl_session_t * session)
-{
- /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
- if (PREDICT_FALSE (session->is_vep))
- {
- clib_warning ("VCL<%d>: ERROR: sid %u: cannot read from an "
- "epoll session!", getpid (), session->session_index);
- return VPPCOM_EBADFD;
- }
-
- if (PREDICT_FALSE (!(session->session_state & (STATE_OPEN | STATE_LISTEN))))
- {
- session_state_t state = session->session_state;
- int rv;
-
- rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
-
- VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open!"
- " state 0x%x (%s), returning %d (%s)", getpid (),
- session->vpp_handle, session->session_index, state,
- vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
- return rv;
- }
-
- if (session->session_state & STATE_LISTEN)
- return clib_fifo_elts (session->accept_evts_fifo);
-
- return svm_fifo_max_dequeue (session->rx_fifo);
-}
-
int
vppcom_data_segment_copy (void *buf, vppcom_data_segments_t ds, u32 max_bytes)
{
u8 is_flush)
{
vcl_worker_t *wrk = vcl_worker_get_current ();
- int rv, n_write, is_nonblocking;
+ int n_write, is_nonblocking;
vcl_session_t *s = 0;
svm_fifo_t *tx_fifo = 0;
session_evt_type_t et;
if (PREDICT_FALSE (s->is_vep))
{
- clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
- "cannot write to an epoll session!",
- getpid (), s->vpp_handle, session_handle);
-
+ VDBG (0, "ERROR: session %u [0x%llx]: cannot write to an epoll"
+ " session!", s->session_index, s->vpp_handle);
return VPPCOM_EBADFD;
}
- if (PREDICT_FALSE (!(s->session_state & STATE_OPEN)))
+ if (PREDICT_FALSE (!vcl_session_is_open (s)))
{
- session_state_t state = s->session_state;
- rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
- VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open! "
- "state 0x%x (%s)", getpid (), s->vpp_handle, session_handle,
- state, vppcom_session_state_str (state));
- return rv;
+ VDBG (1, "session %u [0x%llx]: is not open! state 0x%x (%s)",
+ s->session_index, s->vpp_handle, s->session_state,
+ vppcom_session_state_str (s->session_state));
+ return vcl_session_closed_error (s);;
}
tx_fifo = s->tx_fifo;
}
while (svm_fifo_is_full (tx_fifo))
{
- svm_fifo_set_want_tx_evt (tx_fifo, 1);
+ svm_fifo_add_want_tx_ntf (tx_fifo, SVM_FIFO_WANT_TX_NOTIF);
+ if (vcl_session_is_closing (s))
+ return vcl_session_closing_error (s);
svm_msg_q_lock (mq);
if (svm_msg_q_is_empty (mq))
svm_msg_q_wait (mq);
if (!vcl_is_tx_evt_for_session (e, s->session_index, is_ct))
vcl_handle_mq_event (wrk, e);
svm_msg_q_free_msg (mq, &msg);
-
- if (PREDICT_FALSE (!(s->session_state & STATE_OPEN)))
- return VPPCOM_ECONNRESET;
}
}
ASSERT (n_write > 0);
- VDBG (2, "VCL<%d>: vpp handle 0x%llx, sid %u: wrote %d bytes", getpid (),
- s->vpp_handle, session_handle, n_write);
+ VDBG (2, "session %u [0x%llx]: wrote %d bytes", s->session_index,
+ s->vpp_handle, n_write);
return n_write;
}
return 0;
}
-static inline int
-vppcom_session_write_ready (vcl_session_t * session)
-{
- /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
- if (PREDICT_FALSE (session->is_vep))
- {
- VDBG (0, "session %u [0x%llx]: cannot write to an epoll session!",
- session->session_index, session->vpp_handle);
- return VPPCOM_EBADFD;
- }
-
- if (PREDICT_FALSE (session->session_state & STATE_LISTEN))
- {
- if (session->tx_fifo)
- return svm_fifo_max_enqueue (session->tx_fifo);
- else
- return VPPCOM_EBADFD;
- }
-
- if (PREDICT_FALSE (!(session->session_state & STATE_OPEN)))
- {
- session_state_t state = session->session_state;
- int rv;
-
- rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
- VDBG (0, "session %u [0x%llx]: session is not open! state 0x%x (%s), "
- "returning %d (%s)", session->session_index, session->vpp_handle,
- state, vppcom_session_state_str (state), rv,
- vppcom_retval_str (rv));
- return rv;
- }
-
- return svm_fifo_max_enqueue (session->tx_fifo);
-}
-
#define vcl_fifo_rx_evt_valid_or_break(_fifo) \
if (PREDICT_FALSE (svm_fifo_is_empty (_fifo))) \
{ \
bits_set++;
}
else
- svm_fifo_set_want_tx_evt (session->tx_fifo, 1);
+ svm_fifo_add_want_tx_ntf (session->tx_fifo, SVM_FIFO_WANT_TX_NOTIF);
}));
check_rd:
continue;
}
- rv = vppcom_session_read_ready (session);
+ rv = vcl_session_read_ready (session);
if (rv)
{
clib_bitmap_set_no_check ((uword*)read_map, sid, 1);
session->is_vep_session = 1;
vep_session->vep.next_sh = session_handle;
+ if (session->tx_fifo)
+ svm_fifo_add_want_tx_ntf (session->tx_fifo,
+ SVM_FIFO_WANT_TX_NOTIF_IF_FULL);
+
VDBG (1, "EPOLL_CTL_ADD: vep_sh %u, sh %u, events 0x%x, data 0x%llx!",
vep_handle, session_handle, event->events, event->data.u64);
vcl_evt (VCL_EVT_EPOLL_CTLADD, session, event->events, event->data.u64);
session->vep.prev_sh = ~0;
session->vep.vep_sh = ~0;
session->is_vep_session = 0;
+
+ if (session->tx_fifo)
+ svm_fifo_del_want_tx_ntf (session->tx_fifo, SVM_FIFO_NO_TX_NOTIF);
+
VDBG (1, "EPOLL_CTL_DEL: vep_idx %u, sid %u!", vep_handle,
session_handle);
vcl_evt (VCL_EVT_EPOLL_CTLDEL, session, vep_sh);
add_event = 1;
events[*num_ev].events |= EPOLLOUT;
session_evt_data = session->vep.ev.data.u64;
+ svm_fifo_reset_tx_ntf (session->tx_fifo);
break;
case SESSION_IO_EVT_CT_TX:
vcl_fifo_rx_evt_valid_or_break (e->fifo);
add_event = 1;
events[*num_ev].events |= EPOLLOUT;
session_evt_data = session->vep.ev.data.u64;
+ svm_fifo_reset_tx_ntf (session->tx_fifo);
break;
case SESSION_CTRL_EVT_ACCEPTED:
session = vcl_session_accepted (wrk,
if (!(session = vcl_session_get (wrk, sid)))
break;
session_events = session->vep.ev.events;
- if (EPOLLOUT & session_events)
- {
- add_event = 1;
- events[*num_ev].events |= EPOLLOUT;
- session_evt_data = session->vep.ev.data.u64;
- }
+ if (!(EPOLLOUT & session_events))
+ break;
+ add_event = 1;
+ events[*num_ev].events |= EPOLLOUT;
+ session_evt_data = session->vep.ev.data.u64;
break;
case SESSION_CTRL_EVT_DISCONNECTED:
disconnected_msg = (session_disconnected_msg_t *) e->data;
session = vcl_session_disconnected_handler (wrk, disconnected_msg);
if (!session)
break;
+ session_events = session->vep.ev.events;
+ if (!((EPOLLHUP | EPOLLRDHUP) & session_events))
+ break;
add_event = 1;
events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
session_evt_data = session->vep.ev.data.u64;
- session_events = session->vep.ev.events;
break;
case SESSION_CTRL_EVT_RESET:
sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
if (!(session = vcl_session_get (wrk, sid)))
break;
+ session_events = session->vep.ev.events;
+ if (!((EPOLLHUP | EPOLLRDHUP) & session_events))
+ break;
add_event = 1;
events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
session_evt_data = session->vep.ev.data.u64;
- session_events = session->vep.ev.events;
break;
case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
vcl_session_req_worker_update_handler (wrk, e->data);
double total_wait = 0, wait_slice;
int rv;
- wait_for_time = (wait_for_time == -1) ? (double) 10e9 : wait_for_time;
+ wait_for_time = (wait_for_time == -1) ? (double) 1e6 : wait_for_time;
wait_slice = wrk->cut_through_registrations ? 10e-6 : wait_for_time;
do
switch (op)
{
case VPPCOM_ATTR_GET_NREAD:
- rv = vppcom_session_read_ready (session);
+ rv = vcl_session_read_ready (session);
VDBG (2, "VPPCOM_ATTR_GET_NREAD: sid %u, nread = %d", rv);
break;
case VPPCOM_ATTR_GET_NWRITE:
- rv = vppcom_session_write_ready (session);
+ rv = vcl_session_write_ready (session);
VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_NWRITE: sid %u, nwrite = %d",
getpid (), session_handle, rv);
break;
rv = VPPCOM_EINVAL;
break;
- case VPPCOM_ATTR_GET_REFCNT:
- rv = vcl_session_get_refcnt (session);
- break;
-
case VPPCOM_ATTR_SET_SHUT:
if (*flags == SHUT_RD || *flags == SHUT_RDWR)
VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_SHUT_RD);
if (POLLIN & vp[i].events)
{
- rv = vppcom_session_read_ready (session);
+ rv = vcl_session_read_ready (session);
if (rv > 0)
{
vp[i].revents |= POLLIN;
if (POLLOUT & vp[i].events)
{
- rv = vppcom_session_write_ready (session);
+ rv = vcl_session_write_ready (session);
if (rv > 0)
{
vp[i].revents |= POLLOUT;
return session_handle >> 24;
}
-int
-vppcom_session_handle (uint32_t session_index)
-{
- return (vcl_get_worker_index () << 24) | session_index;
-}
-
int
vppcom_worker_register (void)
{
return vcl_get_worker_index ();
}
+int
+vppcom_worker_mqs_epfd (void)
+{
+ vcl_worker_t *wrk = vcl_worker_get_current ();
+ if (!vcm->cfg.use_mq_eventfd)
+ return -1;
+ return wrk->mqs_epfd;
+}
+
/*
* fd.io coding-style-patch-verification: ON
*