#define HALF_OPEN_LOOKUP_INVALID_VALUE ((u64)~0)
#define INVALID_INDEX ((u32)~0)
#define SESSION_PROXY_LISTENER_INDEX ((u32)~0 - 1)
+#define SESSION_LOCAL_HANDLE_PREFIX 0x7FFFFFFF
/* TODO decide how much since we have pre-data as well */
#define MAX_HDRS_LEN 100 /* Max number of bytes for headers */
_(EVENT_FIFO_FULL, "Events not sent for lack of event fifo space") \
_(API_QUEUE_FULL, "Sessions not created for lack of API queue space") \
_(NEW_SEG_NO_SPACE, "Created segment, couldn't allocate a fifo pair") \
-_(NO_SPACE, "Couldn't allocate a fifo pair")
+_(NO_SPACE, "Couldn't allocate a fifo pair") \
+_(SEG_CREATE, "Couldn't create a new segment")
typedef enum
{
void *arg;
} rpc_args_t;
+typedef u64 session_handle_t;
+
/* *INDENT-OFF* */
typedef CLIB_PACKED (struct {
union
{
svm_fifo_t * fifo;
- u64 session_handle;
+ session_handle_t session_handle;
rpc_args_t rpc_args;
};
u8 event_type;
extern session_fifo_rx_fn session_tx_fifo_peek_and_snd;
extern session_fifo_rx_fn session_tx_fifo_dequeue_and_snd;
+extern session_fifo_rx_fn session_tx_fifo_dequeue_internal;
u8 session_node_lookup_fifo_event (svm_fifo_t * f, session_fifo_event_t * e);
/** Per worker-thread session pool peekers rw locks */
clib_rwlock_t *peekers_rw_locks;
- /** Pool of listen sessions. Same type as stream sessions to ease lookups */
- stream_session_t **listen_sessions;
-
/** Per-proto, per-worker enqueue epoch counters */
u32 *current_enqueue_epoch[TRANSPORT_N_PROTO];
/** Session ssvm segment configs*/
uword session_baseva;
- u32 segment_timeout;
+ uword session_va_space_size;
u32 evt_qs_segment_size;
u8 evt_qs_use_memfd_seg;
};
+typedef struct session_dgram_pre_hdr_
+{
+ u32 data_length;
+ u32 data_offset;
+} session_dgram_pre_hdr_t;
+
+/* *INDENT-OFF* */
+typedef CLIB_PACKED (struct session_dgram_header_
+{
+ u32 data_length;
+ u32 data_offset;
+ ip46_address_t rmt_ip;
+ ip46_address_t lcl_ip;
+ u16 rmt_port;
+ u16 lcl_port;
+ u8 is_ip4;
+}) session_dgram_hdr_t;
+/* *INDENT-ON* */
+
+#define SESSION_CONN_ID_LEN 37
+#define SESSION_CONN_HDR_LEN 45
+
+STATIC_ASSERT (sizeof (session_dgram_hdr_t) == (SESSION_CONN_ID_LEN + 8),
+ "session conn id wrong length");
+
extern session_manager_main_t session_manager_main;
extern vlib_node_registration_t session_queue_node;
}
stream_session_t *session_alloc (u32 thread_index);
+int session_alloc_fifos (segment_manager_t * sm, stream_session_t * s);
+void session_free (stream_session_t * s);
always_inline stream_session_t *
session_get (u32 si, u32 thread_index)
return pool_elt_at_index (session_manager_main.sessions[thread_index], si);
}
-always_inline u64
+always_inline session_handle_t
session_handle (stream_session_t * s)
{
return ((u64) s->thread_index << 32) | (u64) s->session_index;
}
always_inline u32
-session_index_from_handle (u64 handle)
+session_index_from_handle (session_handle_t handle)
{
return handle & 0xFFFFFFFF;
}
always_inline u32
-session_thread_from_handle (u64 handle)
+session_thread_from_handle (session_handle_t handle)
{
return handle >> 32;
}
always_inline void
-session_parse_handle (u64 handle, u32 * index, u32 * thread_index)
+session_parse_handle (session_handle_t handle, u32 * index,
+ u32 * thread_index)
{
*index = session_index_from_handle (handle);
*thread_index = session_thread_from_handle (handle);
}
always_inline stream_session_t *
-session_get_from_handle (u64 handle)
+session_get_from_handle (session_handle_t handle)
{
session_manager_main_t *smm = &session_manager_main;
- return
- pool_elt_at_index (smm->sessions[session_thread_from_handle (handle)],
- session_index_from_handle (handle));
+ u32 session_index, thread_index;
+ session_parse_handle (handle, &session_index, &thread_index);
+ return pool_elt_at_index (smm->sessions[thread_index], session_index);
+}
+
+always_inline stream_session_t *
+session_get_from_handle_if_valid (session_handle_t handle)
+{
+ u32 session_index, thread_index;
+ session_parse_handle (handle, &session_index, &thread_index);
+ return session_get_if_valid (session_index, thread_index);
+}
+
+always_inline u8
+session_handle_is_local (session_handle_t handle)
+{
+ if ((handle >> 32) == SESSION_LOCAL_HANDLE_PREFIX)
+ return 1;
+ return 0;
+}
+
+always_inline transport_proto_t
+session_type_transport_proto (session_type_t st)
+{
+ return (st >> 1);
+}
+
+always_inline u8
+session_type_is_ip4 (session_type_t st)
+{
+ return (st & 1);
}
always_inline transport_proto_t
return (s->session_type >> 1);
}
+always_inline fib_protocol_t
+session_get_fib_proto (stream_session_t * s)
+{
+ u8 is_ip4 = s->session_type & 1;
+ return (is_ip4 ? FIB_PROTOCOL_IP4 : FIB_PROTOCOL_IP6);
+}
+
always_inline session_type_t
session_type_from_proto_and_ip (transport_proto_t proto, u8 is_ip4)
{
return (proto << 1 | is_ip4);
}
+always_inline u8
+session_has_transport (stream_session_t * s)
+{
+ return (session_get_transport_proto (s) != TRANSPORT_PROTO_NONE);
+}
+
+always_inline transport_service_type_t
+session_transport_service_type (stream_session_t * s)
+{
+ transport_proto_t tp;
+ tp = session_get_transport_proto (s);
+ return transport_protocol_service_type (tp);
+}
+
/**
* Acquires a lock that blocks a session pool from expanding.
*
}
}
-always_inline stream_session_t *
-stream_session_listener_get (u8 sst, u64 si)
-{
- return pool_elt_at_index (session_manager_main.listen_sessions[sst], si);
-}
-
-always_inline u32
-stream_session_get_index (stream_session_t * s)
-{
- if (s->session_state == SESSION_STATE_LISTENING)
- return s - session_manager_main.listen_sessions[s->session_type];
-
- return s - session_manager_main.sessions[s->thread_index];
-}
-
always_inline u32
stream_session_max_rx_enqueue (transport_connection_t * tc)
{
u32 stream_session_tx_fifo_max_dequeue (transport_connection_t * tc);
-stream_session_t *session_alloc (u32 thread_index);
int
session_enqueue_stream_connection (transport_connection_t * tc,
vlib_buffer_t * b, u32 offset,
u8 queue_event, u8 is_in_order);
-int session_enqueue_dgram_connection (stream_session_t * s, vlib_buffer_t * b,
- u8 proto, u8 queue_event);
+int session_enqueue_dgram_connection (stream_session_t * s,
+ session_dgram_hdr_t * hdr,
+ vlib_buffer_t * b, u8 proto,
+ u8 queue_event);
int stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer,
u32 offset, u32 max_bytes);
u32 stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes);
}
int session_manager_flush_enqueue_events (u8 proto, u32 thread_index);
+int session_manager_flush_all_enqueue_events (u8 transport_proto);
always_inline u64
listen_session_get_handle (stream_session_t * s)
{
ASSERT (s->session_state == SESSION_STATE_LISTENING);
- return ((u64) s->session_type << 32) | s->session_index;
+ return session_handle (s);
}
always_inline stream_session_t *
-listen_session_get_from_handle (u64 handle)
+listen_session_get_from_handle (session_handle_t handle)
{
- session_manager_main_t *smm = &session_manager_main;
- stream_session_t *s;
- u32 type, index;
- type = handle >> 32;
- index = handle & 0xFFFFFFFF;
-
- if (pool_is_free_index (smm->listen_sessions[type], index))
- return 0;
+ return session_get_from_handle (handle);
+}
- s = pool_elt_at_index (smm->listen_sessions[type], index);
- ASSERT (s->session_state == SESSION_STATE_LISTENING);
- return s;
+always_inline void
+listen_session_parse_handle (session_handle_t handle, u32 * index,
+ u32 * thread_index)
+{
+ session_parse_handle (handle, index, thread_index);
}
always_inline stream_session_t *
-listen_session_new (session_type_t type)
+listen_session_new (u8 thread_index, session_type_t type)
{
stream_session_t *s;
- pool_get_aligned (session_manager_main.listen_sessions[type], s,
- CLIB_CACHE_LINE_BYTES);
- memset (s, 0, sizeof (*s));
-
+ s = session_alloc (thread_index);
s->session_type = type;
s->session_state = SESSION_STATE_LISTENING;
- s->session_index = s - session_manager_main.listen_sessions[type];
-
return s;
}
always_inline stream_session_t *
-listen_session_get (session_type_t type, u32 index)
+listen_session_get (u32 index)
{
- return pool_elt_at_index (session_manager_main.listen_sessions[type],
- index);
+ return session_get (index, 0);
}
always_inline void
listen_session_del (stream_session_t * s)
{
- pool_put (session_manager_main.listen_sessions[s->session_type], s);
+ session_free (s);
}
transport_connection_t *listen_session_get_transport (stream_session_t * s);
listen_session_get_local_session_endpoint (stream_session_t * listener,
session_endpoint_t * sep);
-always_inline stream_session_t *
-session_manager_get_listener (u8 session_type, u32 index)
-{
- return
- pool_elt_at_index (session_manager_main.listen_sessions[session_type],
- index);
-}
-
-/**
- * Set peek or dequeue function for given session type
- *
- * Reliable transport protocols will probably want to use a peek function
- */
-always_inline void
-session_manager_set_transport_rx_fn (session_type_t type, u8 is_peek)
-{
- session_manager_main.session_tx_fns[type] = (is_peek) ?
- session_tx_fifo_peek_and_snd : session_tx_fifo_dequeue_and_snd;
-}
-
always_inline u8
session_manager_is_enabled ()
{