return app_worker_connect_session (client_wrk, &a->sep_ext, &a->sh);
}
+session_error_t
+vnet_connect_stream (vnet_connect_args_t *a)
+{
+ app_worker_t *client_wrk;
+ application_t *client;
+
+ /* stream must be opened on same thread as parent connection */
+ ASSERT (a->sep_ext.parent_handle != SESSION_INVALID_HANDLE);
+ ASSERT (vlib_get_thread_index () ==
+ session_thread_from_handle (a->sep_ext.parent_handle));
+
+ a->sep_ext.opaque = a->api_context;
+
+ client = application_get (a->app_index);
+ client_wrk = application_get_worker (client, a->wrk_map_index);
+
+ return app_worker_connect_stream (client_wrk, &a->sep_ext, &a->sh);
+}
+
session_error_t
vnet_unlisten (vnet_unlisten_args_t *a)
{
void app_worker_free (app_worker_t * app_wrk);
int app_worker_connect_session (app_worker_t *app, session_endpoint_cfg_t *sep,
session_handle_t *rsh);
+int app_worker_connect_stream (app_worker_t *app, session_endpoint_cfg_t *sep,
+ session_handle_t *rsh);
session_error_t app_worker_start_listen (app_worker_t *app_wrk,
app_listener_t *lstnr);
int app_worker_stop_listen (app_worker_t * app_wrk, app_listener_t * al);
session_error_t vnet_application_detach (vnet_app_detach_args_t *a);
session_error_t vnet_listen (vnet_listen_args_t *a);
session_error_t vnet_connect (vnet_connect_args_t *a);
+session_error_t vnet_connect_stream (vnet_connect_args_t *a);
session_error_t vnet_unlisten (vnet_unlisten_args_t *a);
session_error_t vnet_shutdown_session (vnet_shutdown_args_t *a);
session_error_t vnet_disconnect_session (vnet_disconnect_args_t *a);
return session_open (sep, rsh);
}
+int
+app_worker_connect_stream (app_worker_t *app_wrk, session_endpoint_cfg_t *sep,
+ session_handle_t *rsh)
+{
+ if (PREDICT_FALSE (app_worker_mq_is_congested (app_wrk)))
+ return SESSION_E_REFUSED;
+
+ sep->app_wrk_index = app_wrk->wrk_index;
+
+ return session_open_stream (sep, rsh);
+}
+
int
app_worker_session_fifo_tuning (app_worker_t * app_wrk, session_t * s,
svm_fifo_t * f,
return s;
}
+static session_t *
+session_alloc_for_stream (session_handle_t parent_handle)
+{
+ session_t *s, *ps;
+ clib_thread_index_t thread_index =
+ session_thread_from_handle (parent_handle);
+
+ ASSERT (thread_index == vlib_get_thread_index ());
+
+ ps = session_get_from_handle (parent_handle);
+ s = session_alloc (thread_index);
+ s->listener_handle = SESSION_INVALID_HANDLE;
+ s->session_type = ps->session_type;
+ session_set_state (s, SESSION_STATE_CLOSED);
+
+ return s;
+}
+
session_t *
session_alloc_for_half_open (transport_connection_t *tc)
{
return session_open_srv_fns[tst](rmt, rsh);
}
+/**
+ * Ask transport to open stream on existing connection.
+ */
+int
+session_open_stream (session_endpoint_cfg_t *sep, session_handle_t *rsh)
+{
+ transport_connection_t *tc;
+ transport_endpoint_cfg_t *tep;
+ app_worker_t *app_wrk;
+ session_t *s;
+ u32 conn_index;
+ int rv;
+
+ app_wrk = app_worker_get (sep->app_wrk_index);
+ tep = session_endpoint_to_transport_cfg (sep);
+
+ /* allocate session and fifos now */
+ s = session_alloc_for_stream (sep->parent_handle);
+ s->app_wrk_index = app_wrk->wrk_index;
+ s->opaque = sep->opaque;
+ s->flags |= SESSION_F_STREAM;
+ if ((rv = app_worker_init_connected (app_wrk, s)))
+ {
+ session_free (s);
+ if (app_worker_application_is_builtin (app_wrk))
+ return rv;
+ return app_worker_connect_notify (app_wrk, 0, rv, sep->opaque);
+ }
+
+ rv = transport_connect_stream (sep->transport_proto, tep, s, &conn_index);
+ if (rv < 0)
+ {
+ SESSION_DBG ("Transport failed to open stream.");
+ segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
+ session_free (s);
+ if (app_worker_application_is_builtin (app_wrk))
+ return rv;
+ return app_worker_connect_notify (app_wrk, 0, rv, sep->opaque);
+ }
+
+ session_set_state (s, SESSION_STATE_READY);
+
+ tc =
+ transport_get_connection (sep->transport_proto, conn_index,
+ session_thread_from_handle (sep->parent_handle));
+
+ /* Attach transport to session and vice versa */
+ s->connection_index = tc->c_index;
+ tc->s_index = s->session_index;
+ *rsh = session_handle (s);
+
+ /* builtin apps are synchronous */
+ if (app_worker_application_is_builtin (app_wrk))
+ {
+ s->flags |= SESSION_F_RX_READY;
+ return SESSION_E_NONE;
+ }
+
+ return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, sep->opaque);
+}
+
/**
* Ask transport to listen on session endpoint.
*
}
int session_open (session_endpoint_cfg_t *sep, session_handle_t *rsh);
+int session_open_stream (session_endpoint_cfg_t *sep, session_handle_t *rsh);
int session_listen (session_t * s, session_endpoint_cfg_t * sep);
int session_stop_listen (session_t * s);
void session_half_close (session_t *s);
_ (APP_CLOSED, "app-closed") \
_ (IS_CLESS, "connectionless") \
_ (RX_READY, "rx-ready") \
- _ (TPT_INIT_CLOSE, "transport-init-close")
+ _ (TPT_INIT_CLOSE, "transport-init-close") \
+ _ (STREAM, "stream")
typedef enum session_flags_bits_
{
return tp_vfts[tp].connect (tep);
}
+int
+transport_connect_stream (transport_proto_t tp, transport_endpoint_cfg_t *tep,
+ session_t *stream_session, u32 *conn_index)
+{
+ if (PREDICT_FALSE (!tp_vfts[tp].connect_stream))
+ return SESSION_E_TRANSPORT_NO_REG;
+ return tp_vfts[tp].connect_stream (tep, stream_session, conn_index);
+}
+
void
transport_half_close (transport_proto_t tp, u32 conn_index, u8 thread_index)
{
#define SRC_VNET_SESSION_TRANSPORT_H_
#include <vnet/vnet.h>
+#include <vnet/session/session_types.h>
#include <vnet/session/transport_types.h>
#define TRANSPORT_PACER_MIN_MSS 1460
u32 (*start_listen) (u32 session_index, transport_endpoint_cfg_t *lcl);
u32 (*stop_listen) (u32 conn_index);
int (*connect) (transport_endpoint_cfg_t * rmt);
+ int (*connect_stream) (transport_endpoint_cfg_t *rmt,
+ session_t *session_index, u32 *conn_index);
void (*half_close) (u32 conn_index, clib_thread_index_t thread_index);
void (*close) (u32 conn_index, clib_thread_index_t thread_index);
void (*reset) (u32 conn_index, clib_thread_index_t thread_index);
if (VAR_ALLOW_BM & (1 << VAR))
int transport_connect (transport_proto_t tp, transport_endpoint_cfg_t * tep);
+int transport_connect_stream (transport_proto_t tp,
+ transport_endpoint_cfg_t *tep,
+ session_t *stream_session, u32 *conn_index);
void transport_half_close (transport_proto_t tp, u32 conn_index,
u8 thread_index);
void transport_close (transport_proto_t tp, u32 conn_index, u8 thread_index);