vcl: allow more rx events on peek
[vpp.git] / src / vnet / session / application_local.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/session/application_local.h>
17 #include <vnet/session/session.h>
18
19 typedef enum ct_segment_flags_
20 {
21   CT_SEGMENT_F_CLIENT_DETACHED = 1 << 0,
22   CT_SEGMENT_F_SERVER_DETACHED = 1 << 1,
23 } ct_segment_flags_t;
24
25 typedef struct ct_segment_
26 {
27   u32 client_n_sessions;
28   u32 server_n_sessions;
29   u32 seg_ctx_index;
30   u32 ct_seg_index;
31   u32 segment_index;
32   ct_segment_flags_t flags;
33 } ct_segment_t;
34
35 typedef struct ct_segments_
36 {
37   u32 sm_index;
38   u32 server_wrk;
39   u32 client_wrk;
40   u32 fifo_pair_bytes;
41   ct_segment_t *segments;
42 } ct_segments_ctx_t;
43
44 typedef struct ct_cleanup_req_
45 {
46   u32 ct_index;
47 } ct_cleanup_req_t;
48
49 typedef struct ct_worker_
50 {
51   ct_connection_t *connections;       /**< Per-worker connection pools */
52   u32 *pending_connects;              /**< Fifo of pending ho indices */
53   ct_cleanup_req_t *pending_cleanups; /**< Fifo of pending indices */
54   u8 have_connects;                   /**< Set if connect rpc pending */
55   u8 have_cleanups;                   /**< Set if cleanup rpc pending */
56   clib_spinlock_t pending_connects_lock; /**< Lock for pending connects */
57   u32 *new_connects;                     /**< Burst of connects to be done */
58 } ct_worker_t;
59
60 typedef struct ct_main_
61 {
62   ct_worker_t *wrk;                     /**< Per-worker state */
63   u32 n_workers;                        /**< Number of vpp workers */
64   u32 n_sessions;                       /**< Cumulative sessions counter */
65   u32 *ho_reusable;                     /**< Vector of reusable ho indices */
66   clib_spinlock_t ho_reuseable_lock;    /**< Lock for reusable ho indices */
67   clib_rwlock_t app_segs_lock;          /**< RW lock for seg contexts */
68   uword *app_segs_ctxs_table;           /**< App handle to segment pool map */
69   ct_segments_ctx_t *app_seg_ctxs;      /**< Pool of ct segment contexts */
70   u32 **fwrk_pending_connects;          /**< First wrk pending half-opens */
71   u32 fwrk_thread;                      /**< First worker thread */
72   u8 fwrk_have_flush;                   /**< Flag for connect flush rpc */
73 } ct_main_t;
74
75 static ct_main_t ct_main;
76
77 static inline ct_worker_t *
78 ct_worker_get (u32 thread_index)
79 {
80   return &ct_main.wrk[thread_index];
81 }
82
83 static ct_connection_t *
84 ct_connection_alloc (u32 thread_index)
85 {
86   ct_worker_t *wrk = ct_worker_get (thread_index);
87   ct_connection_t *ct;
88
89   pool_get_aligned_safe (wrk->connections, ct, CLIB_CACHE_LINE_BYTES);
90   clib_memset (ct, 0, sizeof (*ct));
91   ct->c_c_index = ct - wrk->connections;
92   ct->c_thread_index = thread_index;
93   ct->client_wrk = ~0;
94   ct->server_wrk = ~0;
95   ct->seg_ctx_index = ~0;
96   ct->ct_seg_index = ~0;
97   return ct;
98 }
99
100 static ct_connection_t *
101 ct_connection_get (u32 ct_index, u32 thread_index)
102 {
103   ct_worker_t *wrk = ct_worker_get (thread_index);
104
105   if (pool_is_free_index (wrk->connections, ct_index))
106     return 0;
107   return pool_elt_at_index (wrk->connections, ct_index);
108 }
109
110 static void
111 ct_connection_free (ct_connection_t * ct)
112 {
113   ct_worker_t *wrk = ct_worker_get (ct->c_thread_index);
114
115   if (CLIB_DEBUG)
116     {
117       clib_memset (ct, 0xfc, sizeof (*ct));
118       pool_put (wrk->connections, ct);
119       return;
120     }
121   pool_put (wrk->connections, ct);
122 }
123
124 static ct_connection_t *
125 ct_half_open_alloc (void)
126 {
127   ct_main_t *cm = &ct_main;
128   u32 *hip;
129
130   clib_spinlock_lock (&cm->ho_reuseable_lock);
131   vec_foreach (hip, cm->ho_reusable)
132     pool_put_index (cm->wrk[cm->fwrk_thread].connections, *hip);
133   vec_reset_length (cm->ho_reusable);
134   clib_spinlock_unlock (&cm->ho_reuseable_lock);
135
136   return ct_connection_alloc (cm->fwrk_thread);
137 }
138
139 static ct_connection_t *
140 ct_half_open_get (u32 ho_index)
141 {
142   ct_main_t *cm = &ct_main;
143   return ct_connection_get (ho_index, cm->fwrk_thread);
144 }
145
146 void
147 ct_half_open_add_reusable (u32 ho_index)
148 {
149   ct_main_t *cm = &ct_main;
150
151   clib_spinlock_lock (&cm->ho_reuseable_lock);
152   vec_add1 (cm->ho_reusable, ho_index);
153   clib_spinlock_unlock (&cm->ho_reuseable_lock);
154 }
155
156 session_t *
157 ct_session_get_peer (session_t * s)
158 {
159   ct_connection_t *ct, *peer_ct;
160   ct = ct_connection_get (s->connection_index, s->thread_index);
161   peer_ct = ct_connection_get (ct->peer_index, s->thread_index);
162   return session_get (peer_ct->c_s_index, s->thread_index);
163 }
164
165 void
166 ct_session_endpoint (session_t * ll, session_endpoint_t * sep)
167 {
168   ct_connection_t *ct;
169   ct = (ct_connection_t *) session_get_transport (ll);
170   sep->transport_proto = ct->actual_tp;
171   sep->port = ct->c_lcl_port;
172   sep->is_ip4 = ct->c_is_ip4;
173   ip_copy (&sep->ip, &ct->c_lcl_ip, ct->c_is_ip4);
174 }
175
176 static void
177 ct_set_invalid_app_wrk (ct_connection_t *ct, u8 is_client)
178 {
179   ct_connection_t *peer_ct;
180
181   peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
182
183   if (is_client)
184     {
185       ct->client_wrk = APP_INVALID_INDEX;
186       if (peer_ct)
187         ct->client_wrk = APP_INVALID_INDEX;
188     }
189   else
190     {
191       ct->server_wrk = APP_INVALID_INDEX;
192       if (peer_ct)
193         ct->server_wrk = APP_INVALID_INDEX;
194     }
195 }
196
197 static inline u64
198 ct_client_seg_handle (u64 server_sh, u32 client_wrk_index)
199 {
200   return (((u64) client_wrk_index << 56) | server_sh);
201 }
202
203 static void
204 ct_session_dealloc_fifos (ct_connection_t *ct, svm_fifo_t *rx_fifo,
205                           svm_fifo_t *tx_fifo)
206 {
207   ct_segments_ctx_t *seg_ctx;
208   ct_main_t *cm = &ct_main;
209   segment_manager_t *sm;
210   app_worker_t *app_wrk;
211   ct_segment_t *ct_seg;
212   fifo_segment_t *fs;
213   u32 seg_index;
214   session_t *s;
215   int cnt;
216
217   /*
218    * Cleanup fifos
219    */
220
221   sm = segment_manager_get (rx_fifo->segment_manager);
222   seg_index = rx_fifo->segment_index;
223
224   fs = segment_manager_get_segment_w_lock (sm, seg_index);
225   fifo_segment_free_fifo (fs, rx_fifo);
226   fifo_segment_free_fifo (fs, tx_fifo);
227   segment_manager_segment_reader_unlock (sm);
228
229   /*
230    * Atomically update segment context with readers lock
231    */
232
233   clib_rwlock_reader_lock (&cm->app_segs_lock);
234
235   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
236   ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
237
238   if (ct->flags & CT_CONN_F_CLIENT)
239     {
240       cnt =
241         __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
242     }
243   else
244     {
245       cnt =
246         __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
247     }
248
249   clib_rwlock_reader_unlock (&cm->app_segs_lock);
250
251   /*
252    * No need to do any app updates, return
253    */
254   ASSERT (cnt >= 0);
255   if (cnt)
256     return;
257
258   /*
259    * Grab exclusive lock and update flags unless some other thread
260    * added more sessions
261    */
262   clib_rwlock_writer_lock (&cm->app_segs_lock);
263
264   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
265   ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
266   if (ct->flags & CT_CONN_F_CLIENT)
267     {
268       cnt = ct_seg->client_n_sessions;
269       if (cnt)
270         goto done;
271       ct_seg->flags |= CT_SEGMENT_F_CLIENT_DETACHED;
272       s = session_get (ct->c_s_index, ct->c_thread_index);
273       if (s->app_wrk_index == APP_INVALID_INDEX)
274         ct_set_invalid_app_wrk (ct, 1 /* is_client */);
275     }
276   else
277     {
278       cnt = ct_seg->server_n_sessions;
279       if (cnt)
280         goto done;
281       ct_seg->flags |= CT_SEGMENT_F_SERVER_DETACHED;
282       s = session_get (ct->c_s_index, ct->c_thread_index);
283       if (s->app_wrk_index == APP_INVALID_INDEX)
284         ct_set_invalid_app_wrk (ct, 0 /* is_client */);
285     }
286
287   if (!(ct_seg->flags & CT_SEGMENT_F_CLIENT_DETACHED) ||
288       !(ct_seg->flags & CT_SEGMENT_F_SERVER_DETACHED))
289     goto done;
290
291   /*
292    * Remove segment context because both client and server detached
293    */
294
295   pool_put_index (seg_ctx->segments, ct->ct_seg_index);
296
297   /*
298    * No more segment indices left, remove the segments context
299    */
300   if (!pool_elts (seg_ctx->segments))
301     {
302       u64 table_handle = seg_ctx->client_wrk << 16 | seg_ctx->server_wrk;
303       table_handle = (u64) seg_ctx->sm_index << 32 | table_handle;
304       hash_unset (cm->app_segs_ctxs_table, table_handle);
305       pool_free (seg_ctx->segments);
306       pool_put_index (cm->app_seg_ctxs, ct->seg_ctx_index);
307     }
308
309   /*
310    * Segment to be removed so notify both apps
311    */
312
313   app_wrk = app_worker_get_if_valid (ct->client_wrk);
314   /* Determine if client app still needs notification, i.e., if it is
315    * still attached. If client detached and this is the last ct session
316    * on this segment, then its connects segment manager should also be
317    * detached, so do not send notification */
318   if (app_wrk)
319     {
320       segment_manager_t *csm;
321       csm = app_worker_get_connect_segment_manager (app_wrk);
322       if (!segment_manager_app_detached (csm))
323         app_worker_del_segment_notify (
324           app_wrk, ct_client_seg_handle (ct->segment_handle, ct->client_wrk));
325     }
326
327   /* Notify server app and free segment */
328   segment_manager_lock_and_del_segment (sm, seg_index);
329
330   /* Cleanup segment manager if needed. If server detaches there's a chance
331    * the client's sessions will hold up segment removal */
332   if (segment_manager_app_detached (sm) && !segment_manager_has_fifos (sm))
333     segment_manager_free_safe (sm);
334
335 done:
336
337   clib_rwlock_writer_unlock (&cm->app_segs_lock);
338 }
339
340 static void
341 ct_session_force_disconnect_server (ct_connection_t *sct)
342 {
343   sct->peer_index = ~0;
344   session_transport_closing_notify (&sct->connection);
345 }
346
347 int
348 ct_session_connect_notify (session_t *ss, session_error_t err)
349 {
350   u32 ss_index, opaque, thread_index;
351   ct_connection_t *sct, *cct;
352   app_worker_t *client_wrk;
353   session_t *cs;
354
355   ss_index = ss->session_index;
356   thread_index = ss->thread_index;
357   sct = (ct_connection_t *) session_get_transport (ss);
358   client_wrk = app_worker_get (sct->client_wrk);
359   opaque = sct->client_opaque;
360
361   cct = ct_connection_get (sct->peer_index, thread_index);
362
363   /* Client closed while waiting for reply from server */
364   if (PREDICT_FALSE (!cct))
365     {
366       ct_session_force_disconnect_server (sct);
367       return 0;
368     }
369
370   session_half_open_delete_notify (&cct->connection);
371   cct->flags &= ~CT_CONN_F_HALF_OPEN;
372
373   if (PREDICT_FALSE (err))
374     goto connect_error;
375
376   /*
377    * Alloc client session, server session assumed to be established
378    */
379
380   ASSERT (ss->session_state >= SESSION_STATE_READY);
381
382   cs = session_alloc (thread_index);
383   ss = session_get (ss_index, thread_index);
384   cs->session_type = ss->session_type;
385   cs->listener_handle = SESSION_INVALID_HANDLE;
386   session_set_state (cs, SESSION_STATE_CONNECTING);
387   cs->app_wrk_index = client_wrk->wrk_index;
388   cs->connection_index = cct->c_c_index;
389   cs->opaque = opaque;
390   cct->c_s_index = cs->session_index;
391
392   /* This will allocate fifos for the session. They won't be used for
393    * exchanging data but they will be used to close the connection if
394    * the segment manager/worker is freed */
395   if ((err = app_worker_init_connected (client_wrk, cs)))
396     {
397       session_free (cs);
398       ct_session_force_disconnect_server (sct);
399       err = SESSION_E_ALLOC;
400       goto connect_error;
401     }
402
403   session_set_state (cs, SESSION_STATE_CONNECTING);
404
405   if (app_worker_connect_notify (client_wrk, cs, 0, opaque))
406     {
407       segment_manager_dealloc_fifos (cs->rx_fifo, cs->tx_fifo);
408       session_free (cs);
409       ct_session_force_disconnect_server (sct);
410       goto cleanup_client;
411     }
412
413   cs = session_get (cct->c_s_index, cct->c_thread_index);
414   session_set_state (cs, SESSION_STATE_READY);
415
416   return 0;
417
418 connect_error:
419
420   app_worker_connect_notify (client_wrk, 0, err, cct->client_opaque);
421
422 cleanup_client:
423
424   if (cct->client_rx_fifo)
425     ct_session_dealloc_fifos (cct, cct->client_rx_fifo, cct->client_tx_fifo);
426   ct_connection_free (cct);
427   return -1;
428 }
429
430 static inline ct_segment_t *
431 ct_lookup_free_segment (ct_main_t *cm, segment_manager_t *sm,
432                         u32 seg_ctx_index)
433 {
434   uword free_bytes, max_free_bytes;
435   ct_segment_t *ct_seg, *res = 0;
436   ct_segments_ctx_t *seg_ctx;
437   fifo_segment_t *fs;
438   u32 max_fifos;
439
440   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
441   max_free_bytes = seg_ctx->fifo_pair_bytes;
442
443   pool_foreach (ct_seg, seg_ctx->segments)
444     {
445       /* Client or server has detached so segment cannot be used */
446       fs = segment_manager_get_segment (sm, ct_seg->segment_index);
447       free_bytes = fifo_segment_available_bytes (fs);
448       max_fifos = fifo_segment_size (fs) / seg_ctx->fifo_pair_bytes;
449       if (free_bytes > max_free_bytes &&
450           fifo_segment_num_fifos (fs) / 2 < max_fifos)
451         {
452           max_free_bytes = free_bytes;
453           res = ct_seg;
454         }
455     }
456
457   return res;
458 }
459
460 static ct_segment_t *
461 ct_alloc_segment (ct_main_t *cm, app_worker_t *server_wrk, u64 table_handle,
462                   segment_manager_t *sm, u32 client_wrk_index)
463 {
464   u32 seg_ctx_index = ~0, sm_index, pair_bytes;
465   u64 seg_size, seg_handle, client_seg_handle;
466   segment_manager_props_t *props;
467   const u32 margin = 16 << 10;
468   ct_segments_ctx_t *seg_ctx;
469   app_worker_t *client_wrk;
470   application_t *server;
471   ct_segment_t *ct_seg;
472   uword *spp;
473   int fs_index;
474
475   server = application_get (server_wrk->app_index);
476   props = application_segment_manager_properties (server);
477   sm_index = segment_manager_index (sm);
478   pair_bytes = props->rx_fifo_size + props->tx_fifo_size + margin;
479
480   /*
481    * Make sure another thread did not alloc a segment while acquiring the lock
482    */
483
484   spp = hash_get (cm->app_segs_ctxs_table, table_handle);
485   if (spp)
486     {
487       seg_ctx_index = *spp;
488       ct_seg = ct_lookup_free_segment (cm, sm, seg_ctx_index);
489       if (ct_seg)
490         return ct_seg;
491     }
492
493   /*
494    * No segment, try to alloc one and notify the server and the client.
495    * Make sure the segment is not used for other fifos
496    */
497   seg_size = clib_max (props->segment_size, 128 << 20);
498   fs_index =
499     segment_manager_add_segment2 (sm, seg_size, FIFO_SEGMENT_F_CUSTOM_USE);
500   if (fs_index < 0)
501     return 0;
502
503   if (seg_ctx_index == ~0)
504     {
505       pool_get_zero (cm->app_seg_ctxs, seg_ctx);
506       seg_ctx_index = seg_ctx - cm->app_seg_ctxs;
507       hash_set (cm->app_segs_ctxs_table, table_handle, seg_ctx_index);
508       seg_ctx->server_wrk = server_wrk->wrk_index;
509       seg_ctx->client_wrk = client_wrk_index;
510       seg_ctx->sm_index = sm_index;
511       seg_ctx->fifo_pair_bytes = pair_bytes;
512     }
513   else
514     {
515       seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
516     }
517
518   pool_get_zero (seg_ctx->segments, ct_seg);
519   ct_seg->segment_index = fs_index;
520   ct_seg->server_n_sessions = 0;
521   ct_seg->client_n_sessions = 0;
522   ct_seg->ct_seg_index = ct_seg - seg_ctx->segments;
523   ct_seg->seg_ctx_index = seg_ctx_index;
524
525   /* New segment, notify the server and client */
526   seg_handle = segment_manager_make_segment_handle (sm_index, fs_index);
527   if (app_worker_add_segment_notify (server_wrk, seg_handle))
528     goto error;
529
530   client_wrk = app_worker_get (client_wrk_index);
531   /* Make sure client workers do not have overlapping segment handles.
532    * Ideally, we should attach fs to client worker segment manager and
533    * create a new handle but that's not currently possible. */
534   client_seg_handle = ct_client_seg_handle (seg_handle, client_wrk_index);
535   if (app_worker_add_segment_notify (client_wrk, client_seg_handle))
536     {
537       app_worker_del_segment_notify (server_wrk, seg_handle);
538       goto error;
539     }
540
541   return ct_seg;
542
543 error:
544
545   segment_manager_lock_and_del_segment (sm, fs_index);
546   pool_put_index (seg_ctx->segments, ct_seg->seg_ctx_index);
547   return 0;
548 }
549
550 static int
551 ct_init_accepted_session (app_worker_t *server_wrk, ct_connection_t *ct,
552                           session_t *ls, session_t *ll)
553 {
554   segment_manager_props_t *props;
555   u64 seg_handle, table_handle;
556   u32 sm_index, fs_index = ~0;
557   ct_segments_ctx_t *seg_ctx;
558   ct_main_t *cm = &ct_main;
559   application_t *server;
560   segment_manager_t *sm;
561   ct_segment_t *ct_seg;
562   fifo_segment_t *fs;
563   uword *spp;
564   int rv;
565
566   sm = app_worker_get_listen_segment_manager (server_wrk, ll);
567   sm_index = segment_manager_index (sm);
568   server = application_get (server_wrk->app_index);
569   props = application_segment_manager_properties (server);
570
571   table_handle = ct->client_wrk << 16 | server_wrk->wrk_index;
572   table_handle = (u64) sm_index << 32 | table_handle;
573
574   /*
575    * Check if we already have a segment that can hold the fifos
576    */
577
578   clib_rwlock_reader_lock (&cm->app_segs_lock);
579
580   spp = hash_get (cm->app_segs_ctxs_table, table_handle);
581   if (spp)
582     {
583       ct_seg = ct_lookup_free_segment (cm, sm, *spp);
584       if (ct_seg)
585         {
586           ct->seg_ctx_index = ct_seg->seg_ctx_index;
587           ct->ct_seg_index = ct_seg->ct_seg_index;
588           fs_index = ct_seg->segment_index;
589           ct_seg->flags &=
590             ~(CT_SEGMENT_F_SERVER_DETACHED | CT_SEGMENT_F_CLIENT_DETACHED);
591           __atomic_add_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
592           __atomic_add_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
593         }
594     }
595
596   clib_rwlock_reader_unlock (&cm->app_segs_lock);
597
598   /*
599    * If not, grab exclusive lock and allocate segment
600    */
601   if (fs_index == ~0)
602     {
603       clib_rwlock_writer_lock (&cm->app_segs_lock);
604
605       ct_seg =
606         ct_alloc_segment (cm, server_wrk, table_handle, sm, ct->client_wrk);
607       if (!ct_seg)
608         {
609           clib_rwlock_writer_unlock (&cm->app_segs_lock);
610           return -1;
611         }
612
613       ct->seg_ctx_index = ct_seg->seg_ctx_index;
614       ct->ct_seg_index = ct_seg->ct_seg_index;
615       ct_seg->server_n_sessions += 1;
616       ct_seg->client_n_sessions += 1;
617       fs_index = ct_seg->segment_index;
618
619       clib_rwlock_writer_unlock (&cm->app_segs_lock);
620     }
621
622   /*
623    * Allocate and initialize the fifos
624    */
625   fs = segment_manager_get_segment_w_lock (sm, fs_index);
626   rv = segment_manager_try_alloc_fifos (
627     fs, ls->thread_index, props->rx_fifo_size, props->tx_fifo_size,
628     &ls->rx_fifo, &ls->tx_fifo);
629   if (rv)
630     {
631       segment_manager_segment_reader_unlock (sm);
632
633       clib_rwlock_reader_lock (&cm->app_segs_lock);
634
635       seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
636       ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
637       __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
638       __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
639
640       clib_rwlock_reader_unlock (&cm->app_segs_lock);
641
642       return rv;
643     }
644
645   ls->rx_fifo->shr->master_session_index = ls->session_index;
646   ls->tx_fifo->shr->master_session_index = ls->session_index;
647   ls->rx_fifo->master_thread_index = ls->thread_index;
648   ls->tx_fifo->master_thread_index = ls->thread_index;
649
650   seg_handle = segment_manager_segment_handle (sm, fs);
651   segment_manager_segment_reader_unlock (sm);
652
653   ct->segment_handle = seg_handle;
654
655   return 0;
656 }
657
658 static void
659 ct_accept_one (u32 thread_index, u32 ho_index)
660 {
661   ct_connection_t *sct, *cct, *ho;
662   transport_connection_t *ll_ct;
663   app_worker_t *server_wrk;
664   u32 cct_index, ll_index;
665   session_t *ss, *ll;
666
667   /*
668    * Alloc client ct and initialize from ho
669    */
670   cct = ct_connection_alloc (thread_index);
671   cct_index = cct->c_c_index;
672
673   ho = ct_half_open_get (ho_index);
674
675   /* Unlikely but half-open session and transport could have been freed */
676   if (PREDICT_FALSE (!ho))
677     {
678       ct_connection_free (cct);
679       return;
680     }
681
682   clib_memcpy (cct, ho, sizeof (*ho));
683   cct->c_c_index = cct_index;
684   cct->c_thread_index = thread_index;
685   cct->flags |= CT_CONN_F_HALF_OPEN;
686
687   /* Notify session layer that half-open is on a different thread
688    * and mark ho connection index reusable. Avoids another rpc
689    */
690   session_half_open_migrate_notify (&cct->connection);
691   session_half_open_migrated_notify (&cct->connection);
692   ct_half_open_add_reusable (ho_index);
693
694   /*
695    * Alloc and init server transport
696    */
697
698   ll_index = cct->peer_index;
699   ll = listen_session_get (ll_index);
700   sct = ct_connection_alloc (thread_index);
701   /* Transport not necessarily ct but it might, so grab after sct alloc */
702   ll_ct = listen_session_get_transport (ll);
703
704   /* Make sure cct is valid after sct alloc */
705   cct = ct_connection_get (cct_index, thread_index);
706
707   sct->c_rmt_port = 0;
708   sct->c_lcl_port = ll_ct->lcl_port;
709   sct->c_is_ip4 = cct->c_is_ip4;
710   clib_memcpy (&sct->c_lcl_ip, &cct->c_rmt_ip, sizeof (cct->c_rmt_ip));
711   sct->client_wrk = cct->client_wrk;
712   sct->c_proto = TRANSPORT_PROTO_NONE;
713   sct->client_opaque = cct->client_opaque;
714   sct->actual_tp = cct->actual_tp;
715
716   sct->peer_index = cct->c_c_index;
717   cct->peer_index = sct->c_c_index;
718
719   /*
720    * Accept server session. Client session is created only after
721    * server confirms accept.
722    */
723   ss = session_alloc (thread_index);
724   ll = listen_session_get (ll_index);
725   ss->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE,
726                                                      sct->c_is_ip4);
727   ss->connection_index = sct->c_c_index;
728   ss->listener_handle = listen_session_get_handle (ll);
729   session_set_state (ss, SESSION_STATE_CREATED);
730
731   server_wrk = application_listener_select_worker (ll);
732   ss->app_wrk_index = server_wrk->wrk_index;
733
734   sct->c_s_index = ss->session_index;
735   sct->server_wrk = ss->app_wrk_index;
736
737   if (ct_init_accepted_session (server_wrk, sct, ss, ll))
738     {
739       ct_session_connect_notify (ss, SESSION_E_ALLOC);
740       ct_connection_free (sct);
741       session_free (ss);
742       return;
743     }
744
745   cct->server_wrk = sct->server_wrk;
746   cct->seg_ctx_index = sct->seg_ctx_index;
747   cct->ct_seg_index = sct->ct_seg_index;
748   cct->client_rx_fifo = ss->tx_fifo;
749   cct->client_tx_fifo = ss->rx_fifo;
750   cct->client_rx_fifo->refcnt++;
751   cct->client_tx_fifo->refcnt++;
752   cct->segment_handle =
753     ct_client_seg_handle (sct->segment_handle, cct->client_wrk);
754
755   session_set_state (ss, SESSION_STATE_ACCEPTING);
756   if (app_worker_accept_notify (server_wrk, ss))
757     {
758       ct_session_connect_notify (ss, SESSION_E_REFUSED);
759       ct_session_dealloc_fifos (sct, ss->rx_fifo, ss->tx_fifo);
760       ct_connection_free (sct);
761       session_free (ss);
762     }
763 }
764
765 static void
766 ct_accept_rpc_wrk_handler (void *rpc_args)
767 {
768   u32 thread_index, n_connects, i, n_pending;
769   const u32 max_connects = 32;
770   ct_worker_t *wrk;
771   u8 need_rpc = 0;
772
773   thread_index = pointer_to_uword (rpc_args);
774   wrk = ct_worker_get (thread_index);
775
776   /* Connects could be handled without worker barrier so grab lock */
777   clib_spinlock_lock (&wrk->pending_connects_lock);
778
779   n_pending = clib_fifo_elts (wrk->pending_connects);
780   n_connects = clib_min (n_pending, max_connects);
781   vec_validate (wrk->new_connects, n_connects);
782
783   for (i = 0; i < n_connects; i++)
784     clib_fifo_sub1 (wrk->pending_connects, wrk->new_connects[i]);
785
786   if (n_pending == n_connects)
787     wrk->have_connects = 0;
788   else
789     need_rpc = 1;
790
791   clib_spinlock_unlock (&wrk->pending_connects_lock);
792
793   for (i = 0; i < n_connects; i++)
794     ct_accept_one (thread_index, wrk->new_connects[i]);
795
796   if (need_rpc)
797     session_send_rpc_evt_to_thread_force (
798       thread_index, ct_accept_rpc_wrk_handler,
799       uword_to_pointer (thread_index, void *));
800 }
801
802 static void
803 ct_fwrk_flush_connects (void *rpc_args)
804 {
805   u32 thread_index, fwrk_index, n_workers;
806   ct_main_t *cm = &ct_main;
807   ct_worker_t *wrk;
808   u8 need_rpc;
809
810   fwrk_index = cm->fwrk_thread;
811   n_workers = vec_len (cm->fwrk_pending_connects);
812
813   for (thread_index = fwrk_index; thread_index < n_workers; thread_index++)
814     {
815       if (!vec_len (cm->fwrk_pending_connects[thread_index]))
816         continue;
817
818       wrk = ct_worker_get (thread_index);
819
820       /* Connects can be done without worker barrier, grab dst worker lock */
821       if (thread_index != fwrk_index)
822         clib_spinlock_lock (&wrk->pending_connects_lock);
823
824       clib_fifo_add (wrk->pending_connects,
825                      cm->fwrk_pending_connects[thread_index],
826                      vec_len (cm->fwrk_pending_connects[thread_index]));
827       if (!wrk->have_connects)
828         {
829           wrk->have_connects = 1;
830           need_rpc = 1;
831         }
832
833       if (thread_index != fwrk_index)
834         clib_spinlock_unlock (&wrk->pending_connects_lock);
835
836       vec_reset_length (cm->fwrk_pending_connects[thread_index]);
837
838       if (need_rpc)
839         session_send_rpc_evt_to_thread_force (
840           thread_index, ct_accept_rpc_wrk_handler,
841           uword_to_pointer (thread_index, void *));
842     }
843
844   cm->fwrk_have_flush = 0;
845 }
846
847 static void
848 ct_program_connect_to_wrk (u32 ho_index)
849 {
850   ct_main_t *cm = &ct_main;
851   u32 thread_index;
852
853   /* Simple round-robin policy for spreading sessions over workers. We skip
854    * thread index 0, i.e., offset the index by 1, when we have workers as it
855    * is the one dedicated to main thread. Note that n_workers does not include
856    * main thread */
857   cm->n_sessions += 1;
858   thread_index = cm->n_workers ? (cm->n_sessions % cm->n_workers) + 1 : 0;
859
860   /* Pospone flushing of connect request to dst worker until after session
861    * layer fully initializes the half-open session. */
862   vec_add1 (cm->fwrk_pending_connects[thread_index], ho_index);
863   if (!cm->fwrk_have_flush)
864     {
865       session_send_rpc_evt_to_thread_force (
866         cm->fwrk_thread, ct_fwrk_flush_connects,
867         uword_to_pointer (thread_index, void *));
868       cm->fwrk_have_flush = 1;
869     }
870 }
871
872 static int
873 ct_connect (app_worker_t *client_wrk, session_t *ll,
874             session_endpoint_cfg_t *sep)
875 {
876   ct_connection_t *ho;
877   u32 ho_index;
878
879   /*
880    * Alloc and init client half-open transport
881    */
882
883   ho = ct_half_open_alloc ();
884   ho_index = ho->c_c_index;
885   ho->c_rmt_port = sep->port;
886   ho->c_lcl_port = 0;
887   ho->c_is_ip4 = sep->is_ip4;
888   ho->client_opaque = sep->opaque;
889   ho->client_wrk = client_wrk->wrk_index;
890   ho->peer_index = ll->session_index;
891   ho->c_proto = TRANSPORT_PROTO_NONE;
892   ho->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP;
893   clib_memcpy (&ho->c_rmt_ip, &sep->ip, sizeof (sep->ip));
894   ho->flags |= CT_CONN_F_CLIENT;
895   ho->c_s_index = ~0;
896   ho->actual_tp = sep->original_tp;
897
898   /*
899    * Program connect on a worker, connected reply comes
900    * after server accepts the connection.
901    */
902   ct_program_connect_to_wrk (ho_index);
903
904   return ho_index;
905 }
906
907 static u32
908 ct_start_listen (u32 app_listener_index, transport_endpoint_cfg_t *tep)
909 {
910   session_endpoint_cfg_t *sep;
911   ct_connection_t *ct;
912
913   sep = (session_endpoint_cfg_t *) tep;
914   ct = ct_connection_alloc (0);
915   ct->server_wrk = sep->app_wrk_index;
916   ct->c_is_ip4 = sep->is_ip4;
917   clib_memcpy (&ct->c_lcl_ip, &sep->ip, sizeof (sep->ip));
918   ct->c_lcl_port = sep->port;
919   ct->c_s_index = app_listener_index;
920   ct->actual_tp = sep->transport_proto;
921   return ct->c_c_index;
922 }
923
924 static u32
925 ct_stop_listen (u32 ct_index)
926 {
927   ct_connection_t *ct;
928   ct = ct_connection_get (ct_index, 0);
929   ct_connection_free (ct);
930   return 0;
931 }
932
933 static transport_connection_t *
934 ct_listener_get (u32 ct_index)
935 {
936   return (transport_connection_t *) ct_connection_get (ct_index, 0);
937 }
938
939 static transport_connection_t *
940 ct_session_half_open_get (u32 ct_index)
941 {
942   return (transport_connection_t *) ct_half_open_get (ct_index);
943 }
944
945 static void
946 ct_session_cleanup (u32 conn_index, u32 thread_index)
947 {
948   ct_connection_t *ct, *peer_ct;
949
950   ct = ct_connection_get (conn_index, thread_index);
951   if (!ct)
952     return;
953
954   peer_ct = ct_connection_get (ct->peer_index, thread_index);
955   if (peer_ct)
956     peer_ct->peer_index = ~0;
957
958   ct_connection_free (ct);
959 }
960
961 static void
962 ct_cleanup_ho (u32 ho_index)
963 {
964   ct_connection_t *ho;
965
966   ho = ct_half_open_get (ho_index);
967   ct_connection_free (ho);
968 }
969
970 static int
971 ct_session_connect (transport_endpoint_cfg_t * tep)
972 {
973   session_endpoint_cfg_t *sep_ext;
974   session_endpoint_t _sep, *sep = &_sep;
975   app_worker_t *app_wrk;
976   session_handle_t lh;
977   application_t *app;
978   app_listener_t *al;
979   u32 table_index;
980   session_t *ll;
981   u8 fib_proto;
982
983   sep_ext = (session_endpoint_cfg_t *) tep;
984   _sep = *(session_endpoint_t *) tep;
985   app_wrk = app_worker_get (sep_ext->app_wrk_index);
986   app = application_get (app_wrk->app_index);
987
988   sep->transport_proto = sep_ext->original_tp;
989   table_index = application_local_session_table (app);
990   lh = session_lookup_local_endpoint (table_index, sep);
991   if (lh == SESSION_DROP_HANDLE)
992     return SESSION_E_FILTERED;
993
994   if (lh == SESSION_INVALID_HANDLE)
995     goto global_scope;
996
997   ll = listen_session_get_from_handle (lh);
998   al = app_listener_get (ll->al_index);
999
1000   /*
1001    * Break loop if rule in local table points to connecting app. This
1002    * can happen if client is a generic proxy. Route connect through
1003    * global table instead.
1004    */
1005   if (al->app_index == app->app_index)
1006     goto global_scope;
1007
1008   return ct_connect (app_wrk, ll, sep_ext);
1009
1010   /*
1011    * If nothing found, check the global scope for locally attached
1012    * destinations. Make sure first that we're allowed to.
1013    */
1014
1015 global_scope:
1016   if (session_endpoint_is_local (sep))
1017     return SESSION_E_NOROUTE;
1018
1019   if (!application_has_global_scope (app))
1020     return SESSION_E_SCOPE;
1021
1022   fib_proto = session_endpoint_fib_proto (sep);
1023   table_index = session_lookup_get_index_for_fib (fib_proto, sep->fib_index);
1024   ll = session_lookup_listener_wildcard (table_index, sep);
1025
1026   /* Avoid connecting app to own listener */
1027   if (ll)
1028     {
1029       al = app_listener_get (ll->al_index);
1030       if (al->app_index != app->app_index)
1031         return ct_connect (app_wrk, ll, sep_ext);
1032     }
1033
1034   /* Failed to connect but no error */
1035   return SESSION_E_LOCAL_CONNECT;
1036 }
1037
1038 static inline int
1039 ct_close_is_reset (ct_connection_t *ct, session_t *s)
1040 {
1041   if (ct->flags & CT_CONN_F_RESET)
1042     return 1;
1043   if (ct->flags & CT_CONN_F_CLIENT)
1044     return (svm_fifo_max_dequeue (ct->client_rx_fifo) > 0);
1045   else
1046     return (svm_fifo_max_dequeue (s->rx_fifo) > 0);
1047 }
1048
1049 static void
1050 ct_session_cleanup_server_session (session_t *s)
1051 {
1052   ct_connection_t *ct;
1053
1054   ct = (ct_connection_t *) session_get_transport (s);
1055   ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo);
1056   session_free (s);
1057   ct_connection_free (ct);
1058 }
1059
1060 static void
1061 ct_session_postponed_cleanup (ct_connection_t *ct)
1062 {
1063   ct_connection_t *peer_ct;
1064   app_worker_t *app_wrk;
1065   session_t *s;
1066
1067   s = session_get (ct->c_s_index, ct->c_thread_index);
1068   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1069
1070   peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
1071   if (peer_ct)
1072     {
1073       if (ct_close_is_reset (ct, s))
1074         session_transport_reset_notify (&peer_ct->connection);
1075       else
1076         session_transport_closing_notify (&peer_ct->connection);
1077     }
1078   session_transport_closed_notify (&ct->connection);
1079
1080   /* It would be cleaner to call session_transport_delete_notify
1081    * but then we can't control session cleanup lower */
1082   session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
1083   if (app_wrk)
1084     app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
1085
1086   if (ct->flags & CT_CONN_F_CLIENT)
1087     {
1088       /* Normal free for client session as the fifos are allocated through
1089        * the connects segment manager in a segment that's not shared with
1090        * the server */
1091       ct_session_dealloc_fifos (ct, ct->client_rx_fifo, ct->client_tx_fifo);
1092       session_program_cleanup (s);
1093       ct_connection_free (ct);
1094     }
1095   else
1096     {
1097       /* Manual session and fifo segment cleanup to avoid implicit
1098        * segment manager cleanups and notifications */
1099       if (app_wrk)
1100         {
1101           /* Remove custom cleanup notify infra when/if switching to normal
1102            * session cleanup. Note that ct is freed in the cb function */
1103           app_worker_cleanup_notify_custom (app_wrk, s,
1104                                             SESSION_CLEANUP_SESSION,
1105                                             ct_session_cleanup_server_session);
1106         }
1107       else
1108         {
1109           ct_connection_free (ct);
1110         }
1111     }
1112 }
1113
1114 static void
1115 ct_handle_cleanups (void *args)
1116 {
1117   uword thread_index = pointer_to_uword (args);
1118   const u32 max_cleanups = 100;
1119   ct_cleanup_req_t *req;
1120   ct_connection_t *ct;
1121   u32 n_to_handle = 0;
1122   ct_worker_t *wrk;
1123   session_t *s;
1124
1125   wrk = ct_worker_get (thread_index);
1126   wrk->have_cleanups = 0;
1127   n_to_handle = clib_fifo_elts (wrk->pending_cleanups);
1128   n_to_handle = clib_min (n_to_handle, max_cleanups);
1129
1130   while (n_to_handle)
1131     {
1132       clib_fifo_sub2 (wrk->pending_cleanups, req);
1133       ct = ct_connection_get (req->ct_index, thread_index);
1134       s = session_get (ct->c_s_index, ct->c_thread_index);
1135       if (svm_fifo_has_event (s->tx_fifo) || (s->flags & SESSION_F_RX_EVT))
1136         clib_fifo_add1 (wrk->pending_cleanups, *req);
1137       else
1138         ct_session_postponed_cleanup (ct);
1139       n_to_handle -= 1;
1140     }
1141
1142   if (clib_fifo_elts (wrk->pending_cleanups))
1143     {
1144       wrk->have_cleanups = 1;
1145       session_send_rpc_evt_to_thread_force (
1146         thread_index, ct_handle_cleanups,
1147         uword_to_pointer (thread_index, void *));
1148     }
1149 }
1150
1151 static void
1152 ct_program_cleanup (ct_connection_t *ct)
1153 {
1154   ct_cleanup_req_t *req;
1155   uword thread_index;
1156   ct_worker_t *wrk;
1157
1158   thread_index = ct->c_thread_index;
1159   wrk = ct_worker_get (ct->c_thread_index);
1160
1161   clib_fifo_add2 (wrk->pending_cleanups, req);
1162   req->ct_index = ct->c_c_index;
1163
1164   if (wrk->have_cleanups)
1165     return;
1166
1167   wrk->have_cleanups = 1;
1168   session_send_rpc_evt_to_thread_force (
1169     thread_index, ct_handle_cleanups, uword_to_pointer (thread_index, void *));
1170 }
1171
1172 static void
1173 ct_session_close (u32 ct_index, u32 thread_index)
1174 {
1175   ct_connection_t *ct, *peer_ct;
1176   session_t *s;
1177
1178   ct = ct_connection_get (ct_index, thread_index);
1179   s = session_get (ct->c_s_index, ct->c_thread_index);
1180   peer_ct = ct_connection_get (ct->peer_index, thread_index);
1181   if (peer_ct)
1182     {
1183       peer_ct->peer_index = ~0;
1184       /* Make sure session was allocated */
1185       if (peer_ct->flags & CT_CONN_F_HALF_OPEN)
1186         {
1187           ct_session_connect_notify (s, SESSION_E_REFUSED);
1188           ct->peer_index = ~0;
1189         }
1190       else if (peer_ct->c_s_index == ~0)
1191         {
1192           /* should not happen */
1193           clib_warning ("ct peer without session");
1194           ct_connection_free (peer_ct);
1195         }
1196     }
1197
1198   /* Do not send closed notify to make sure pending tx events are
1199    * still delivered and program cleanup */
1200   ct_program_cleanup (ct);
1201 }
1202
1203 static void
1204 ct_session_reset (u32 ct_index, u32 thread_index)
1205 {
1206   ct_connection_t *ct;
1207   ct = ct_connection_get (ct_index, thread_index);
1208   ct->flags |= CT_CONN_F_RESET;
1209   ct_session_close (ct_index, thread_index);
1210 }
1211
1212 static transport_connection_t *
1213 ct_session_get (u32 ct_index, u32 thread_index)
1214 {
1215   return (transport_connection_t *) ct_connection_get (ct_index,
1216                                                        thread_index);
1217 }
1218
1219 static u8 *
1220 format_ct_connection_id (u8 * s, va_list * args)
1221 {
1222   ct_connection_t *ct = va_arg (*args, ct_connection_t *);
1223   if (!ct)
1224     return s;
1225   if (ct->c_is_ip4)
1226     {
1227       s = format (s, "[%d:%d][CT:%U] %U:%d->%U:%d", ct->c_thread_index,
1228                   ct->c_s_index, format_transport_proto_short, ct->actual_tp,
1229                   format_ip4_address, &ct->c_lcl_ip4,
1230                   clib_net_to_host_u16 (ct->c_lcl_port), format_ip4_address,
1231                   &ct->c_rmt_ip4, clib_net_to_host_u16 (ct->c_rmt_port));
1232     }
1233   else
1234     {
1235       s = format (s, "[%d:%d][CT:%U] %U:%d->%U:%d", ct->c_thread_index,
1236                   ct->c_s_index, format_transport_proto_short, ct->actual_tp,
1237                   format_ip6_address, &ct->c_lcl_ip6,
1238                   clib_net_to_host_u16 (ct->c_lcl_port), format_ip6_address,
1239                   &ct->c_rmt_ip6, clib_net_to_host_u16 (ct->c_rmt_port));
1240     }
1241
1242   return s;
1243 }
1244
1245 static int
1246 ct_custom_tx (void *session, transport_send_params_t * sp)
1247 {
1248   session_t *s = (session_t *) session;
1249   if (session_has_transport (s))
1250     return 0;
1251   /* If event enqueued towards peer, remove from scheduler and remove
1252    * session tx flag, i.e., accept new tx events. Unset fifo flag now to
1253    * avoid missing events if peer did not clear fifo flag yet, which is
1254    * interpreted as successful notification and session is descheduled. */
1255   svm_fifo_unset_event (s->tx_fifo);
1256   if (!ct_session_tx (s))
1257     sp->flags = TRANSPORT_SND_F_DESCHED;
1258
1259   /* The scheduler uses packet count as a means of upper bounding the amount
1260    * of work done per dispatch. So make it look like we have sent something */
1261   return 1;
1262 }
1263
1264 static int
1265 ct_app_rx_evt (transport_connection_t * tc)
1266 {
1267   ct_connection_t *ct = (ct_connection_t *) tc, *peer_ct;
1268   session_t *ps, *s;
1269
1270   s = session_get (ct->c_s_index, ct->c_thread_index);
1271   if (session_has_transport (s) || s->session_state < SESSION_STATE_READY)
1272     return -1;
1273   peer_ct = ct_connection_get (ct->peer_index, tc->thread_index);
1274   if (!peer_ct || (peer_ct->flags & CT_CONN_F_HALF_OPEN))
1275     return -1;
1276   ps = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
1277   if (ps->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1278     return -1;
1279   return session_dequeue_notify (ps);
1280 }
1281
1282 static u8 *
1283 format_ct_listener (u8 * s, va_list * args)
1284 {
1285   u32 tc_index = va_arg (*args, u32);
1286   u32 __clib_unused thread_index = va_arg (*args, u32);
1287   u32 __clib_unused verbose = va_arg (*args, u32);
1288   ct_connection_t *ct = ct_connection_get (tc_index, 0);
1289   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
1290   if (verbose)
1291     s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "LISTEN");
1292   return s;
1293 }
1294
1295 static u8 *
1296 format_ct_half_open (u8 *s, va_list *args)
1297 {
1298   u32 ho_index = va_arg (*args, u32);
1299   u32 verbose = va_arg (*args, u32);
1300   ct_connection_t *ct = ct_half_open_get (ho_index);
1301   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
1302   if (verbose)
1303     s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "HALF-OPEN");
1304   return s;
1305 }
1306
1307 static u8 *
1308 format_ct_connection (u8 * s, va_list * args)
1309 {
1310   ct_connection_t *ct = va_arg (*args, ct_connection_t *);
1311   u32 verbose = va_arg (*args, u32);
1312
1313   if (!ct)
1314     return s;
1315   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
1316   if (verbose)
1317     {
1318       s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "ESTABLISHED");
1319       if (verbose > 1)
1320         {
1321           s = format (s, "\n");
1322         }
1323     }
1324   return s;
1325 }
1326
1327 static u8 *
1328 format_ct_session (u8 * s, va_list * args)
1329 {
1330   u32 ct_index = va_arg (*args, u32);
1331   u32 thread_index = va_arg (*args, u32);
1332   u32 verbose = va_arg (*args, u32);
1333   ct_connection_t *ct;
1334
1335   ct = ct_connection_get (ct_index, thread_index);
1336   if (!ct)
1337     {
1338       s = format (s, "empty\n");
1339       return s;
1340     }
1341
1342   s = format (s, "%U", format_ct_connection, ct, verbose);
1343   return s;
1344 }
1345
1346 clib_error_t *
1347 ct_enable_disable (vlib_main_t * vm, u8 is_en)
1348 {
1349   vlib_thread_main_t *vtm = &vlib_thread_main;
1350   ct_main_t *cm = &ct_main;
1351   ct_worker_t *wrk;
1352
1353   cm->n_workers = vlib_num_workers ();
1354   cm->fwrk_thread = transport_cl_thread ();
1355   vec_validate (cm->wrk, vtm->n_vlib_mains);
1356   vec_foreach (wrk, cm->wrk)
1357     clib_spinlock_init (&wrk->pending_connects_lock);
1358   clib_spinlock_init (&cm->ho_reuseable_lock);
1359   clib_rwlock_init (&cm->app_segs_lock);
1360   vec_validate (cm->fwrk_pending_connects, cm->n_workers);
1361   return 0;
1362 }
1363
1364 static const transport_proto_vft_t cut_thru_proto = {
1365   .enable = ct_enable_disable,
1366   .start_listen = ct_start_listen,
1367   .stop_listen = ct_stop_listen,
1368   .get_connection = ct_session_get,
1369   .get_listener = ct_listener_get,
1370   .get_half_open = ct_session_half_open_get,
1371   .cleanup = ct_session_cleanup,
1372   .cleanup_ho = ct_cleanup_ho,
1373   .connect = ct_session_connect,
1374   .close = ct_session_close,
1375   .reset = ct_session_reset,
1376   .custom_tx = ct_custom_tx,
1377   .app_rx_evt = ct_app_rx_evt,
1378   .format_listener = format_ct_listener,
1379   .format_half_open = format_ct_half_open,
1380   .format_connection = format_ct_session,
1381   .transport_options = {
1382     .name = "ct",
1383     .short_name = "C",
1384     .tx_type = TRANSPORT_TX_INTERNAL,
1385     .service_type = TRANSPORT_SERVICE_VC,
1386   },
1387 };
1388
1389 static inline int
1390 ct_session_can_tx (session_t *s)
1391 {
1392   return (s->session_state == SESSION_STATE_READY ||
1393           s->session_state == SESSION_STATE_CLOSING ||
1394           s->session_state == SESSION_STATE_APP_CLOSED);
1395 }
1396
1397 int
1398 ct_session_tx (session_t * s)
1399 {
1400   ct_connection_t *ct, *peer_ct;
1401   session_t *peer_s;
1402
1403   if (!ct_session_can_tx (s))
1404     return 0;
1405   ct = (ct_connection_t *) session_get_transport (s);
1406   peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
1407   if (!peer_ct)
1408     return 0;
1409   peer_s = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
1410   if (peer_s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1411     return 0;
1412   peer_s->flags |= SESSION_F_RX_EVT;
1413   return session_enqueue_notify (peer_s);
1414 }
1415
1416 static clib_error_t *
1417 ct_transport_init (vlib_main_t * vm)
1418 {
1419   transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto,
1420                                FIB_PROTOCOL_IP4, ~0);
1421   transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto,
1422                                FIB_PROTOCOL_IP6, ~0);
1423   return 0;
1424 }
1425
1426 VLIB_INIT_FUNCTION (ct_transport_init);
1427
1428 /*
1429  * fd.io coding-style-patch-verification: ON
1430  *
1431  * Local Variables:
1432  * eval: (c-set-style "gnu")
1433  * End:
1434  */