session: per app wrk client ct segment handle
[vpp.git] / src / vnet / session / application_local.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/session/application_local.h>
17 #include <vnet/session/session.h>
18
19 typedef enum ct_segment_flags_
20 {
21   CT_SEGMENT_F_CLIENT_DETACHED = 1 << 0,
22   CT_SEGMENT_F_SERVER_DETACHED = 1 << 1,
23 } ct_segment_flags_t;
24
25 typedef struct ct_segment_
26 {
27   u32 client_n_sessions;
28   u32 server_n_sessions;
29   u32 seg_ctx_index;
30   u32 ct_seg_index;
31   u32 segment_index;
32   ct_segment_flags_t flags;
33 } ct_segment_t;
34
35 typedef struct ct_segments_
36 {
37   u32 sm_index;
38   u32 server_wrk;
39   u32 client_wrk;
40   u32 fifo_pair_bytes;
41   ct_segment_t *segments;
42 } ct_segments_ctx_t;
43
44 typedef struct ct_cleanup_req_
45 {
46   u32 ct_index;
47 } ct_cleanup_req_t;
48
49 typedef struct ct_worker_
50 {
51   ct_connection_t *connections;       /**< Per-worker connection pools */
52   u32 *pending_connects;              /**< Fifo of pending ho indices */
53   ct_cleanup_req_t *pending_cleanups; /**< Fifo of pending indices */
54   u8 have_connects;                   /**< Set if connect rpc pending */
55   u8 have_cleanups;                   /**< Set if cleanup rpc pending */
56   clib_spinlock_t pending_connects_lock; /**< Lock for pending connects */
57   u32 *new_connects;                     /**< Burst of connects to be done */
58 } ct_worker_t;
59
60 typedef struct ct_main_
61 {
62   ct_worker_t *wrk;                     /**< Per-worker state */
63   u32 n_workers;                        /**< Number of vpp workers */
64   u32 n_sessions;                       /**< Cumulative sessions counter */
65   u32 *ho_reusable;                     /**< Vector of reusable ho indices */
66   clib_spinlock_t ho_reuseable_lock;    /**< Lock for reusable ho indices */
67   clib_rwlock_t app_segs_lock;          /**< RW lock for seg contexts */
68   uword *app_segs_ctxs_table;           /**< App handle to segment pool map */
69   ct_segments_ctx_t *app_seg_ctxs;      /**< Pool of ct segment contexts */
70   u32 **fwrk_pending_connects;          /**< First wrk pending half-opens */
71   u32 fwrk_thread;                      /**< First worker thread */
72   u8 fwrk_have_flush;                   /**< Flag for connect flush rpc */
73 } ct_main_t;
74
75 static ct_main_t ct_main;
76
77 static inline ct_worker_t *
78 ct_worker_get (u32 thread_index)
79 {
80   return &ct_main.wrk[thread_index];
81 }
82
83 static ct_connection_t *
84 ct_connection_alloc (u32 thread_index)
85 {
86   ct_worker_t *wrk = ct_worker_get (thread_index);
87   ct_connection_t *ct;
88
89   pool_get_aligned_safe (wrk->connections, ct, CLIB_CACHE_LINE_BYTES);
90   clib_memset (ct, 0, sizeof (*ct));
91   ct->c_c_index = ct - wrk->connections;
92   ct->c_thread_index = thread_index;
93   ct->client_wrk = ~0;
94   ct->server_wrk = ~0;
95   ct->seg_ctx_index = ~0;
96   ct->ct_seg_index = ~0;
97   return ct;
98 }
99
100 static ct_connection_t *
101 ct_connection_get (u32 ct_index, u32 thread_index)
102 {
103   ct_worker_t *wrk = ct_worker_get (thread_index);
104
105   if (pool_is_free_index (wrk->connections, ct_index))
106     return 0;
107   return pool_elt_at_index (wrk->connections, ct_index);
108 }
109
110 static void
111 ct_connection_free (ct_connection_t * ct)
112 {
113   ct_worker_t *wrk = ct_worker_get (ct->c_thread_index);
114
115   if (CLIB_DEBUG)
116     {
117       clib_memset (ct, 0xfc, sizeof (*ct));
118       pool_put (wrk->connections, ct);
119       return;
120     }
121   pool_put (wrk->connections, ct);
122 }
123
124 static ct_connection_t *
125 ct_half_open_alloc (void)
126 {
127   ct_main_t *cm = &ct_main;
128   u32 *hip;
129
130   clib_spinlock_lock (&cm->ho_reuseable_lock);
131   vec_foreach (hip, cm->ho_reusable)
132     pool_put_index (cm->wrk[cm->fwrk_thread].connections, *hip);
133   vec_reset_length (cm->ho_reusable);
134   clib_spinlock_unlock (&cm->ho_reuseable_lock);
135
136   return ct_connection_alloc (cm->fwrk_thread);
137 }
138
139 static ct_connection_t *
140 ct_half_open_get (u32 ho_index)
141 {
142   ct_main_t *cm = &ct_main;
143   return ct_connection_get (ho_index, cm->fwrk_thread);
144 }
145
146 void
147 ct_half_open_add_reusable (u32 ho_index)
148 {
149   ct_main_t *cm = &ct_main;
150
151   clib_spinlock_lock (&cm->ho_reuseable_lock);
152   vec_add1 (cm->ho_reusable, ho_index);
153   clib_spinlock_unlock (&cm->ho_reuseable_lock);
154 }
155
156 session_t *
157 ct_session_get_peer (session_t * s)
158 {
159   ct_connection_t *ct, *peer_ct;
160   ct = ct_connection_get (s->connection_index, s->thread_index);
161   peer_ct = ct_connection_get (ct->peer_index, s->thread_index);
162   return session_get (peer_ct->c_s_index, s->thread_index);
163 }
164
165 void
166 ct_session_endpoint (session_t * ll, session_endpoint_t * sep)
167 {
168   ct_connection_t *ct;
169   ct = (ct_connection_t *) session_get_transport (ll);
170   sep->transport_proto = ct->actual_tp;
171   sep->port = ct->c_lcl_port;
172   sep->is_ip4 = ct->c_is_ip4;
173   ip_copy (&sep->ip, &ct->c_lcl_ip, ct->c_is_ip4);
174 }
175
176 static void
177 ct_set_invalid_app_wrk (ct_connection_t *ct, u8 is_client)
178 {
179   ct_connection_t *peer_ct;
180
181   peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
182
183   if (is_client)
184     {
185       ct->client_wrk = APP_INVALID_INDEX;
186       if (peer_ct)
187         ct->client_wrk = APP_INVALID_INDEX;
188     }
189   else
190     {
191       ct->server_wrk = APP_INVALID_INDEX;
192       if (peer_ct)
193         ct->server_wrk = APP_INVALID_INDEX;
194     }
195 }
196
197 static inline u64
198 ct_client_seg_handle (u64 server_sh, u32 client_wrk_index)
199 {
200   return (((u64) client_wrk_index << 56) | server_sh);
201 }
202
203 static void
204 ct_session_dealloc_fifos (ct_connection_t *ct, svm_fifo_t *rx_fifo,
205                           svm_fifo_t *tx_fifo)
206 {
207   ct_segments_ctx_t *seg_ctx;
208   ct_main_t *cm = &ct_main;
209   segment_manager_t *sm;
210   app_worker_t *app_wrk;
211   ct_segment_t *ct_seg;
212   fifo_segment_t *fs;
213   u32 seg_index;
214   session_t *s;
215   int cnt;
216
217   /*
218    * Cleanup fifos
219    */
220
221   sm = segment_manager_get (rx_fifo->segment_manager);
222   seg_index = rx_fifo->segment_index;
223
224   fs = segment_manager_get_segment_w_lock (sm, seg_index);
225   fifo_segment_free_fifo (fs, rx_fifo);
226   fifo_segment_free_fifo (fs, tx_fifo);
227   segment_manager_segment_reader_unlock (sm);
228
229   /*
230    * Atomically update segment context with readers lock
231    */
232
233   clib_rwlock_reader_lock (&cm->app_segs_lock);
234
235   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
236   ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
237
238   if (ct->flags & CT_CONN_F_CLIENT)
239     {
240       cnt =
241         __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
242     }
243   else
244     {
245       cnt =
246         __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
247     }
248
249   clib_rwlock_reader_unlock (&cm->app_segs_lock);
250
251   /*
252    * No need to do any app updates, return
253    */
254   ASSERT (cnt >= 0);
255   if (cnt)
256     return;
257
258   /*
259    * Grab exclusive lock and update flags unless some other thread
260    * added more sessions
261    */
262   clib_rwlock_writer_lock (&cm->app_segs_lock);
263
264   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
265   ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
266   if (ct->flags & CT_CONN_F_CLIENT)
267     {
268       cnt = ct_seg->client_n_sessions;
269       if (cnt)
270         goto done;
271       ct_seg->flags |= CT_SEGMENT_F_CLIENT_DETACHED;
272       s = session_get (ct->c_s_index, ct->c_thread_index);
273       if (s->app_wrk_index == APP_INVALID_INDEX)
274         ct_set_invalid_app_wrk (ct, 1 /* is_client */);
275     }
276   else
277     {
278       cnt = ct_seg->server_n_sessions;
279       if (cnt)
280         goto done;
281       ct_seg->flags |= CT_SEGMENT_F_SERVER_DETACHED;
282       s = session_get (ct->c_s_index, ct->c_thread_index);
283       if (s->app_wrk_index == APP_INVALID_INDEX)
284         ct_set_invalid_app_wrk (ct, 0 /* is_client */);
285     }
286
287   if (!(ct_seg->flags & CT_SEGMENT_F_CLIENT_DETACHED) ||
288       !(ct_seg->flags & CT_SEGMENT_F_SERVER_DETACHED))
289     goto done;
290
291   /*
292    * Remove segment context because both client and server detached
293    */
294
295   pool_put_index (seg_ctx->segments, ct->ct_seg_index);
296
297   /*
298    * No more segment indices left, remove the segments context
299    */
300   if (!pool_elts (seg_ctx->segments))
301     {
302       u64 table_handle = seg_ctx->client_wrk << 16 | seg_ctx->server_wrk;
303       table_handle = (u64) seg_ctx->sm_index << 32 | table_handle;
304       hash_unset (cm->app_segs_ctxs_table, table_handle);
305       pool_free (seg_ctx->segments);
306       pool_put_index (cm->app_seg_ctxs, ct->seg_ctx_index);
307     }
308
309   /*
310    * Segment to be removed so notify both apps
311    */
312
313   app_wrk = app_worker_get_if_valid (ct->client_wrk);
314   /* Determine if client app still needs notification, i.e., if it is
315    * still attached. If client detached and this is the last ct session
316    * on this segment, then its connects segment manager should also be
317    * detached, so do not send notification */
318   if (app_wrk)
319     {
320       segment_manager_t *csm;
321       csm = app_worker_get_connect_segment_manager (app_wrk);
322       if (!segment_manager_app_detached (csm))
323         app_worker_del_segment_notify (
324           app_wrk, ct_client_seg_handle (ct->segment_handle, ct->client_wrk));
325     }
326
327   /* Notify server app and free segment */
328   segment_manager_lock_and_del_segment (sm, seg_index);
329
330   /* Cleanup segment manager if needed. If server detaches there's a chance
331    * the client's sessions will hold up segment removal */
332   if (segment_manager_app_detached (sm) && !segment_manager_has_fifos (sm))
333     segment_manager_free_safe (sm);
334
335 done:
336
337   clib_rwlock_writer_unlock (&cm->app_segs_lock);
338 }
339
340 static void
341 ct_session_force_disconnect_server (ct_connection_t *sct)
342 {
343   sct->peer_index = ~0;
344   session_transport_closing_notify (&sct->connection);
345 }
346
347 int
348 ct_session_connect_notify (session_t *ss, session_error_t err)
349 {
350   u32 ss_index, opaque, thread_index;
351   ct_connection_t *sct, *cct;
352   app_worker_t *client_wrk;
353   session_t *cs;
354
355   ss_index = ss->session_index;
356   thread_index = ss->thread_index;
357   sct = (ct_connection_t *) session_get_transport (ss);
358   client_wrk = app_worker_get (sct->client_wrk);
359   opaque = sct->client_opaque;
360
361   cct = ct_connection_get (sct->peer_index, thread_index);
362
363   /* Client closed while waiting for reply from server */
364   if (PREDICT_FALSE (!cct))
365     {
366       ct_session_force_disconnect_server (sct);
367       return 0;
368     }
369
370   session_half_open_delete_notify (&cct->connection);
371   cct->flags &= ~CT_CONN_F_HALF_OPEN;
372
373   if (PREDICT_FALSE (err))
374     goto connect_error;
375
376   /*
377    * Alloc client session, server session assumed to be established
378    */
379
380   ASSERT (ss->session_state >= SESSION_STATE_READY);
381
382   cs = session_alloc (thread_index);
383   ss = session_get (ss_index, thread_index);
384   cs->session_type = ss->session_type;
385   cs->listener_handle = SESSION_INVALID_HANDLE;
386   session_set_state (cs, SESSION_STATE_CONNECTING);
387   cs->app_wrk_index = client_wrk->wrk_index;
388   cs->connection_index = cct->c_c_index;
389   cs->opaque = opaque;
390   cct->c_s_index = cs->session_index;
391
392   /* This will allocate fifos for the session. They won't be used for
393    * exchanging data but they will be used to close the connection if
394    * the segment manager/worker is freed */
395   if ((err = app_worker_init_connected (client_wrk, cs)))
396     {
397       session_free (cs);
398       ct_session_force_disconnect_server (sct);
399       err = SESSION_E_ALLOC;
400       goto connect_error;
401     }
402
403   session_set_state (cs, SESSION_STATE_CONNECTING);
404
405   if (app_worker_connect_notify (client_wrk, cs, 0, opaque))
406     {
407       segment_manager_dealloc_fifos (cs->rx_fifo, cs->tx_fifo);
408       session_free (cs);
409       ct_session_force_disconnect_server (sct);
410       goto cleanup_client;
411     }
412
413   cs = session_get (cct->c_s_index, cct->c_thread_index);
414   session_set_state (cs, SESSION_STATE_READY);
415
416   return 0;
417
418 connect_error:
419
420   app_worker_connect_notify (client_wrk, 0, err, cct->client_opaque);
421
422 cleanup_client:
423
424   if (cct->client_rx_fifo)
425     ct_session_dealloc_fifos (cct, cct->client_rx_fifo, cct->client_tx_fifo);
426   ct_connection_free (cct);
427   return -1;
428 }
429
430 static inline ct_segment_t *
431 ct_lookup_free_segment (ct_main_t *cm, segment_manager_t *sm,
432                         u32 seg_ctx_index)
433 {
434   uword free_bytes, max_free_bytes;
435   ct_segment_t *ct_seg, *res = 0;
436   ct_segments_ctx_t *seg_ctx;
437   fifo_segment_t *fs;
438   u32 max_fifos;
439
440   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
441   max_free_bytes = seg_ctx->fifo_pair_bytes;
442
443   pool_foreach (ct_seg, seg_ctx->segments)
444     {
445       /* Client or server has detached so segment cannot be used */
446       fs = segment_manager_get_segment (sm, ct_seg->segment_index);
447       free_bytes = fifo_segment_available_bytes (fs);
448       max_fifos = fifo_segment_size (fs) / seg_ctx->fifo_pair_bytes;
449       if (free_bytes > max_free_bytes &&
450           fifo_segment_num_fifos (fs) / 2 < max_fifos)
451         {
452           max_free_bytes = free_bytes;
453           res = ct_seg;
454         }
455     }
456
457   return res;
458 }
459
460 static ct_segment_t *
461 ct_alloc_segment (ct_main_t *cm, app_worker_t *server_wrk, u64 table_handle,
462                   segment_manager_t *sm, u32 client_wrk_index)
463 {
464   u32 seg_ctx_index = ~0, sm_index, pair_bytes;
465   u64 seg_size, seg_handle, client_seg_handle;
466   segment_manager_props_t *props;
467   const u32 margin = 16 << 10;
468   ct_segments_ctx_t *seg_ctx;
469   app_worker_t *client_wrk;
470   application_t *server;
471   ct_segment_t *ct_seg;
472   uword *spp;
473   int fs_index;
474
475   server = application_get (server_wrk->app_index);
476   props = application_segment_manager_properties (server);
477   sm_index = segment_manager_index (sm);
478   pair_bytes = props->rx_fifo_size + props->tx_fifo_size + margin;
479
480   /*
481    * Make sure another thread did not alloc a segment while acquiring the lock
482    */
483
484   spp = hash_get (cm->app_segs_ctxs_table, table_handle);
485   if (spp)
486     {
487       seg_ctx_index = *spp;
488       ct_seg = ct_lookup_free_segment (cm, sm, seg_ctx_index);
489       if (ct_seg)
490         return ct_seg;
491     }
492
493   /*
494    * No segment, try to alloc one and notify the server and the client.
495    * Make sure the segment is not used for other fifos
496    */
497   seg_size = clib_max (props->segment_size, 128 << 20);
498   fs_index =
499     segment_manager_add_segment2 (sm, seg_size, FIFO_SEGMENT_F_CUSTOM_USE);
500   if (fs_index < 0)
501     return 0;
502
503   if (seg_ctx_index == ~0)
504     {
505       pool_get_zero (cm->app_seg_ctxs, seg_ctx);
506       seg_ctx_index = seg_ctx - cm->app_seg_ctxs;
507       hash_set (cm->app_segs_ctxs_table, table_handle, seg_ctx_index);
508       seg_ctx->server_wrk = server_wrk->wrk_index;
509       seg_ctx->client_wrk = client_wrk_index;
510       seg_ctx->sm_index = sm_index;
511       seg_ctx->fifo_pair_bytes = pair_bytes;
512     }
513   else
514     {
515       seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
516     }
517
518   pool_get_zero (seg_ctx->segments, ct_seg);
519   ct_seg->segment_index = fs_index;
520   ct_seg->server_n_sessions = 0;
521   ct_seg->client_n_sessions = 0;
522   ct_seg->ct_seg_index = ct_seg - seg_ctx->segments;
523   ct_seg->seg_ctx_index = seg_ctx_index;
524
525   /* New segment, notify the server and client */
526   seg_handle = segment_manager_make_segment_handle (sm_index, fs_index);
527   if (app_worker_add_segment_notify (server_wrk, seg_handle))
528     goto error;
529
530   client_wrk = app_worker_get (client_wrk_index);
531   /* Make sure client workers do not have overlapping segment handles.
532    * Ideally, we should attach fs to client worker segment manager and
533    * create a new handle but that's not currently possible. */
534   client_seg_handle = ct_client_seg_handle (seg_handle, client_wrk_index);
535   if (app_worker_add_segment_notify (client_wrk, client_seg_handle))
536     {
537       app_worker_del_segment_notify (server_wrk, seg_handle);
538       goto error;
539     }
540
541   return ct_seg;
542
543 error:
544
545   segment_manager_lock_and_del_segment (sm, fs_index);
546   pool_put_index (seg_ctx->segments, ct_seg->seg_ctx_index);
547   return 0;
548 }
549
550 static int
551 ct_init_accepted_session (app_worker_t *server_wrk, ct_connection_t *ct,
552                           session_t *ls, session_t *ll)
553 {
554   segment_manager_props_t *props;
555   u64 seg_handle, table_handle;
556   u32 sm_index, fs_index = ~0;
557   ct_segments_ctx_t *seg_ctx;
558   ct_main_t *cm = &ct_main;
559   application_t *server;
560   segment_manager_t *sm;
561   ct_segment_t *ct_seg;
562   fifo_segment_t *fs;
563   uword *spp;
564   int rv;
565
566   sm = app_worker_get_listen_segment_manager (server_wrk, ll);
567   sm_index = segment_manager_index (sm);
568   server = application_get (server_wrk->app_index);
569   props = application_segment_manager_properties (server);
570
571   table_handle = ct->client_wrk << 16 | server_wrk->wrk_index;
572   table_handle = (u64) sm_index << 32 | table_handle;
573
574   /*
575    * Check if we already have a segment that can hold the fifos
576    */
577
578   clib_rwlock_reader_lock (&cm->app_segs_lock);
579
580   spp = hash_get (cm->app_segs_ctxs_table, table_handle);
581   if (spp)
582     {
583       ct_seg = ct_lookup_free_segment (cm, sm, *spp);
584       if (ct_seg)
585         {
586           ct->seg_ctx_index = ct_seg->seg_ctx_index;
587           ct->ct_seg_index = ct_seg->ct_seg_index;
588           fs_index = ct_seg->segment_index;
589           ct_seg->flags &=
590             ~(CT_SEGMENT_F_SERVER_DETACHED | CT_SEGMENT_F_CLIENT_DETACHED);
591           __atomic_add_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
592           __atomic_add_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
593         }
594     }
595
596   clib_rwlock_reader_unlock (&cm->app_segs_lock);
597
598   /*
599    * If not, grab exclusive lock and allocate segment
600    */
601   if (fs_index == ~0)
602     {
603       clib_rwlock_writer_lock (&cm->app_segs_lock);
604
605       ct_seg =
606         ct_alloc_segment (cm, server_wrk, table_handle, sm, ct->client_wrk);
607       if (!ct_seg)
608         {
609           clib_rwlock_writer_unlock (&cm->app_segs_lock);
610           return -1;
611         }
612
613       ct->seg_ctx_index = ct_seg->seg_ctx_index;
614       ct->ct_seg_index = ct_seg->ct_seg_index;
615       ct_seg->server_n_sessions += 1;
616       ct_seg->client_n_sessions += 1;
617       fs_index = ct_seg->segment_index;
618
619       clib_rwlock_writer_unlock (&cm->app_segs_lock);
620     }
621
622   /*
623    * Allocate and initialize the fifos
624    */
625   fs = segment_manager_get_segment_w_lock (sm, fs_index);
626   rv = segment_manager_try_alloc_fifos (
627     fs, ls->thread_index, props->rx_fifo_size, props->tx_fifo_size,
628     &ls->rx_fifo, &ls->tx_fifo);
629   if (rv)
630     {
631       segment_manager_segment_reader_unlock (sm);
632
633       clib_rwlock_reader_lock (&cm->app_segs_lock);
634
635       seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
636       ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
637       __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
638       __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
639
640       clib_rwlock_reader_unlock (&cm->app_segs_lock);
641
642       return rv;
643     }
644
645   ls->rx_fifo->shr->master_session_index = ls->session_index;
646   ls->tx_fifo->shr->master_session_index = ls->session_index;
647   ls->rx_fifo->master_thread_index = ls->thread_index;
648   ls->tx_fifo->master_thread_index = ls->thread_index;
649
650   seg_handle = segment_manager_segment_handle (sm, fs);
651   segment_manager_segment_reader_unlock (sm);
652
653   ct->segment_handle = seg_handle;
654
655   return 0;
656 }
657
658 static void
659 ct_accept_one (u32 thread_index, u32 ho_index)
660 {
661   ct_connection_t *sct, *cct, *ho;
662   transport_connection_t *ll_ct;
663   app_worker_t *server_wrk;
664   u32 cct_index, ll_index;
665   session_t *ss, *ll;
666
667   /*
668    * Alloc client ct and initialize from ho
669    */
670   cct = ct_connection_alloc (thread_index);
671   cct_index = cct->c_c_index;
672
673   ho = ct_half_open_get (ho_index);
674
675   /* Unlikely but half-open session and transport could have been freed */
676   if (PREDICT_FALSE (!ho))
677     {
678       ct_connection_free (cct);
679       return;
680     }
681
682   clib_memcpy (cct, ho, sizeof (*ho));
683   cct->c_c_index = cct_index;
684   cct->c_thread_index = thread_index;
685   cct->flags |= CT_CONN_F_HALF_OPEN;
686
687   /* Notify session layer that half-open is on a different thread
688    * and mark ho connection index reusable. Avoids another rpc
689    */
690   session_half_open_migrate_notify (&cct->connection);
691   session_half_open_migrated_notify (&cct->connection);
692   ct_half_open_add_reusable (ho_index);
693
694   /*
695    * Alloc and init server transport
696    */
697
698   ll_index = cct->peer_index;
699   ll = listen_session_get (ll_index);
700   sct = ct_connection_alloc (thread_index);
701   /* Transport not necessarily ct but it might, so grab after sct alloc */
702   ll_ct = listen_session_get_transport (ll);
703
704   /* Make sure cct is valid after sct alloc */
705   cct = ct_connection_get (cct_index, thread_index);
706
707   sct->c_rmt_port = 0;
708   sct->c_lcl_port = ll_ct->lcl_port;
709   sct->c_is_ip4 = cct->c_is_ip4;
710   clib_memcpy (&sct->c_lcl_ip, &cct->c_rmt_ip, sizeof (cct->c_rmt_ip));
711   sct->client_wrk = cct->client_wrk;
712   sct->c_proto = TRANSPORT_PROTO_NONE;
713   sct->client_opaque = cct->client_opaque;
714   sct->actual_tp = cct->actual_tp;
715
716   sct->peer_index = cct->c_c_index;
717   cct->peer_index = sct->c_c_index;
718
719   /*
720    * Accept server session. Client session is created only after
721    * server confirms accept.
722    */
723   ss = session_alloc (thread_index);
724   ll = listen_session_get (ll_index);
725   ss->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE,
726                                                      sct->c_is_ip4);
727   ss->connection_index = sct->c_c_index;
728   ss->listener_handle = listen_session_get_handle (ll);
729   session_set_state (ss, SESSION_STATE_CREATED);
730
731   server_wrk = application_listener_select_worker (ll);
732   ss->app_wrk_index = server_wrk->wrk_index;
733
734   sct->c_s_index = ss->session_index;
735   sct->server_wrk = ss->app_wrk_index;
736
737   if (ct_init_accepted_session (server_wrk, sct, ss, ll))
738     {
739       ct_session_connect_notify (ss, SESSION_E_ALLOC);
740       ct_connection_free (sct);
741       session_free (ss);
742       return;
743     }
744
745   cct->server_wrk = sct->server_wrk;
746   cct->seg_ctx_index = sct->seg_ctx_index;
747   cct->ct_seg_index = sct->ct_seg_index;
748   cct->client_rx_fifo = ss->tx_fifo;
749   cct->client_tx_fifo = ss->rx_fifo;
750   cct->client_rx_fifo->refcnt++;
751   cct->client_tx_fifo->refcnt++;
752   cct->segment_handle =
753     ct_client_seg_handle (sct->segment_handle, cct->client_wrk);
754
755   session_set_state (ss, SESSION_STATE_ACCEPTING);
756   if (app_worker_accept_notify (server_wrk, ss))
757     {
758       ct_session_connect_notify (ss, SESSION_E_REFUSED);
759       ct_session_dealloc_fifos (sct, ss->rx_fifo, ss->tx_fifo);
760       ct_connection_free (sct);
761       session_free (ss);
762     }
763 }
764
765 static void
766 ct_accept_rpc_wrk_handler (void *rpc_args)
767 {
768   u32 thread_index, n_connects, i, n_pending;
769   const u32 max_connects = 32;
770   ct_worker_t *wrk;
771   u8 need_rpc = 0;
772
773   thread_index = pointer_to_uword (rpc_args);
774   wrk = ct_worker_get (thread_index);
775
776   /* Connects could be handled without worker barrier so grab lock */
777   clib_spinlock_lock (&wrk->pending_connects_lock);
778
779   n_pending = clib_fifo_elts (wrk->pending_connects);
780   n_connects = clib_min (n_pending, max_connects);
781   vec_validate (wrk->new_connects, n_connects);
782
783   for (i = 0; i < n_connects; i++)
784     clib_fifo_sub1 (wrk->pending_connects, wrk->new_connects[i]);
785
786   if (n_pending == n_connects)
787     wrk->have_connects = 0;
788   else
789     need_rpc = 1;
790
791   clib_spinlock_unlock (&wrk->pending_connects_lock);
792
793   for (i = 0; i < n_connects; i++)
794     ct_accept_one (thread_index, wrk->new_connects[i]);
795
796   if (need_rpc)
797     session_send_rpc_evt_to_thread_force (
798       thread_index, ct_accept_rpc_wrk_handler,
799       uword_to_pointer (thread_index, void *));
800 }
801
802 static void
803 ct_fwrk_flush_connects (void *rpc_args)
804 {
805   u32 thread_index, fwrk_index, n_workers;
806   ct_main_t *cm = &ct_main;
807   ct_worker_t *wrk;
808   u8 need_rpc;
809
810   fwrk_index = cm->fwrk_thread;
811   n_workers = vec_len (cm->fwrk_pending_connects);
812
813   for (thread_index = fwrk_index; thread_index < n_workers; thread_index++)
814     {
815       if (!vec_len (cm->fwrk_pending_connects[thread_index]))
816         continue;
817
818       wrk = ct_worker_get (thread_index);
819
820       /* Connects can be done without worker barrier, grab dst worker lock */
821       if (thread_index != fwrk_index)
822         clib_spinlock_lock (&wrk->pending_connects_lock);
823
824       clib_fifo_add (wrk->pending_connects,
825                      cm->fwrk_pending_connects[thread_index],
826                      vec_len (cm->fwrk_pending_connects[thread_index]));
827       if (!wrk->have_connects)
828         {
829           wrk->have_connects = 1;
830           need_rpc = 1;
831         }
832
833       if (thread_index != fwrk_index)
834         clib_spinlock_unlock (&wrk->pending_connects_lock);
835
836       vec_reset_length (cm->fwrk_pending_connects[thread_index]);
837
838       if (need_rpc)
839         session_send_rpc_evt_to_thread_force (
840           thread_index, ct_accept_rpc_wrk_handler,
841           uword_to_pointer (thread_index, void *));
842     }
843
844   cm->fwrk_have_flush = 0;
845 }
846
847 static void
848 ct_program_connect_to_wrk (u32 ho_index)
849 {
850   ct_main_t *cm = &ct_main;
851   u32 thread_index;
852
853   /* Simple round-robin policy for spreading sessions over workers. We skip
854    * thread index 0, i.e., offset the index by 1, when we have workers as it
855    * is the one dedicated to main thread. Note that n_workers does not include
856    * main thread */
857   cm->n_sessions += 1;
858   thread_index = cm->n_workers ? (cm->n_sessions % cm->n_workers) + 1 : 0;
859
860   /* Pospone flushing of connect request to dst worker until after session
861    * layer fully initializes the half-open session. */
862   vec_add1 (cm->fwrk_pending_connects[thread_index], ho_index);
863   if (!cm->fwrk_have_flush)
864     {
865       session_send_rpc_evt_to_thread_force (
866         cm->fwrk_thread, ct_fwrk_flush_connects,
867         uword_to_pointer (thread_index, void *));
868       cm->fwrk_have_flush = 1;
869     }
870 }
871
872 static int
873 ct_connect (app_worker_t *client_wrk, session_t *ll,
874             session_endpoint_cfg_t *sep)
875 {
876   ct_connection_t *ho;
877   u32 ho_index;
878
879   /*
880    * Alloc and init client half-open transport
881    */
882
883   ho = ct_half_open_alloc ();
884   ho_index = ho->c_c_index;
885   ho->c_rmt_port = sep->port;
886   ho->c_lcl_port = 0;
887   ho->c_is_ip4 = sep->is_ip4;
888   ho->client_opaque = sep->opaque;
889   ho->client_wrk = client_wrk->wrk_index;
890   ho->peer_index = ll->session_index;
891   ho->c_proto = TRANSPORT_PROTO_NONE;
892   ho->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP;
893   clib_memcpy (&ho->c_rmt_ip, &sep->ip, sizeof (sep->ip));
894   ho->flags |= CT_CONN_F_CLIENT;
895   ho->c_s_index = ~0;
896   ho->actual_tp = sep->original_tp;
897
898   /*
899    * Program connect on a worker, connected reply comes
900    * after server accepts the connection.
901    */
902   ct_program_connect_to_wrk (ho_index);
903
904   return ho_index;
905 }
906
907 static u32
908 ct_start_listen (u32 app_listener_index, transport_endpoint_cfg_t *tep)
909 {
910   session_endpoint_cfg_t *sep;
911   ct_connection_t *ct;
912
913   sep = (session_endpoint_cfg_t *) tep;
914   ct = ct_connection_alloc (0);
915   ct->server_wrk = sep->app_wrk_index;
916   ct->c_is_ip4 = sep->is_ip4;
917   clib_memcpy (&ct->c_lcl_ip, &sep->ip, sizeof (sep->ip));
918   ct->c_lcl_port = sep->port;
919   ct->c_s_index = app_listener_index;
920   ct->actual_tp = sep->transport_proto;
921   return ct->c_c_index;
922 }
923
924 static u32
925 ct_stop_listen (u32 ct_index)
926 {
927   ct_connection_t *ct;
928   ct = ct_connection_get (ct_index, 0);
929   ct_connection_free (ct);
930   return 0;
931 }
932
933 static transport_connection_t *
934 ct_listener_get (u32 ct_index)
935 {
936   return (transport_connection_t *) ct_connection_get (ct_index, 0);
937 }
938
939 static transport_connection_t *
940 ct_session_half_open_get (u32 ct_index)
941 {
942   return (transport_connection_t *) ct_half_open_get (ct_index);
943 }
944
945 static void
946 ct_session_cleanup (u32 conn_index, u32 thread_index)
947 {
948   ct_connection_t *ct, *peer_ct;
949
950   ct = ct_connection_get (conn_index, thread_index);
951   if (!ct)
952     return;
953
954   peer_ct = ct_connection_get (ct->peer_index, thread_index);
955   if (peer_ct)
956     peer_ct->peer_index = ~0;
957
958   ct_connection_free (ct);
959 }
960
961 static void
962 ct_cleanup_ho (u32 ho_index)
963 {
964   ct_connection_t *ho;
965
966   ho = ct_half_open_get (ho_index);
967   ct_connection_free (ho);
968 }
969
970 static int
971 ct_session_connect (transport_endpoint_cfg_t * tep)
972 {
973   session_endpoint_cfg_t *sep_ext;
974   session_endpoint_t _sep, *sep = &_sep;
975   app_worker_t *app_wrk;
976   session_handle_t lh;
977   application_t *app;
978   app_listener_t *al;
979   u32 table_index;
980   session_t *ll;
981   u8 fib_proto;
982
983   sep_ext = (session_endpoint_cfg_t *) tep;
984   _sep = *(session_endpoint_t *) tep;
985   app_wrk = app_worker_get (sep_ext->app_wrk_index);
986   app = application_get (app_wrk->app_index);
987
988   sep->transport_proto = sep_ext->original_tp;
989   table_index = application_local_session_table (app);
990   lh = session_lookup_local_endpoint (table_index, sep);
991   if (lh == SESSION_DROP_HANDLE)
992     return SESSION_E_FILTERED;
993
994   if (lh == SESSION_INVALID_HANDLE)
995     goto global_scope;
996
997   ll = listen_session_get_from_handle (lh);
998   al = app_listener_get_w_session (ll);
999
1000   /*
1001    * Break loop if rule in local table points to connecting app. This
1002    * can happen if client is a generic proxy. Route connect through
1003    * global table instead.
1004    */
1005   if (al->app_index == app->app_index)
1006     goto global_scope;
1007
1008   return ct_connect (app_wrk, ll, sep_ext);
1009
1010   /*
1011    * If nothing found, check the global scope for locally attached
1012    * destinations. Make sure first that we're allowed to.
1013    */
1014
1015 global_scope:
1016   if (session_endpoint_is_local (sep))
1017     return SESSION_E_NOROUTE;
1018
1019   if (!application_has_global_scope (app))
1020     return SESSION_E_SCOPE;
1021
1022   fib_proto = session_endpoint_fib_proto (sep);
1023   table_index = session_lookup_get_index_for_fib (fib_proto, sep->fib_index);
1024   ll = session_lookup_listener_wildcard (table_index, sep);
1025
1026   /* Avoid connecting app to own listener */
1027   if (ll && ll->app_index != app->app_index)
1028     return ct_connect (app_wrk, ll, sep_ext);
1029
1030   /* Failed to connect but no error */
1031   return SESSION_E_LOCAL_CONNECT;
1032 }
1033
1034 static inline int
1035 ct_close_is_reset (ct_connection_t *ct, session_t *s)
1036 {
1037   if (ct->flags & CT_CONN_F_CLIENT)
1038     return (svm_fifo_max_dequeue (ct->client_rx_fifo) > 0);
1039   else
1040     return (svm_fifo_max_dequeue (s->rx_fifo) > 0);
1041 }
1042
1043 static void
1044 ct_session_cleanup_server_session (session_t *s)
1045 {
1046   ct_connection_t *ct;
1047
1048   ct = (ct_connection_t *) session_get_transport (s);
1049   ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo);
1050   session_free (s);
1051   ct_connection_free (ct);
1052 }
1053
1054 static void
1055 ct_session_postponed_cleanup (ct_connection_t *ct)
1056 {
1057   ct_connection_t *peer_ct;
1058   app_worker_t *app_wrk;
1059   session_t *s;
1060
1061   s = session_get (ct->c_s_index, ct->c_thread_index);
1062   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1063
1064   peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
1065   if (peer_ct)
1066     {
1067       if (ct_close_is_reset (ct, s))
1068         session_transport_reset_notify (&peer_ct->connection);
1069       else
1070         session_transport_closing_notify (&peer_ct->connection);
1071     }
1072   session_transport_closed_notify (&ct->connection);
1073
1074   /* It would be cleaner to call session_transport_delete_notify
1075    * but then we can't control session cleanup lower */
1076   session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
1077   if (app_wrk)
1078     app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
1079
1080   if (ct->flags & CT_CONN_F_CLIENT)
1081     {
1082       /* Normal free for client session as the fifos are allocated through
1083        * the connects segment manager in a segment that's not shared with
1084        * the server */
1085       ct_session_dealloc_fifos (ct, ct->client_rx_fifo, ct->client_tx_fifo);
1086       session_program_cleanup (s);
1087       ct_connection_free (ct);
1088     }
1089   else
1090     {
1091       /* Manual session and fifo segment cleanup to avoid implicit
1092        * segment manager cleanups and notifications */
1093       if (app_wrk)
1094         {
1095           /* Remove custom cleanup notify infra when/if switching to normal
1096            * session cleanup. Note that ct is freed in the cb function */
1097           app_worker_cleanup_notify_custom (app_wrk, s,
1098                                             SESSION_CLEANUP_SESSION,
1099                                             ct_session_cleanup_server_session);
1100         }
1101       else
1102         {
1103           ct_connection_free (ct);
1104         }
1105     }
1106 }
1107
1108 static void
1109 ct_handle_cleanups (void *args)
1110 {
1111   uword thread_index = pointer_to_uword (args);
1112   const u32 max_cleanups = 100;
1113   ct_cleanup_req_t *req;
1114   ct_connection_t *ct;
1115   u32 n_to_handle = 0;
1116   ct_worker_t *wrk;
1117   session_t *s;
1118
1119   wrk = ct_worker_get (thread_index);
1120   wrk->have_cleanups = 0;
1121   n_to_handle = clib_fifo_elts (wrk->pending_cleanups);
1122   n_to_handle = clib_min (n_to_handle, max_cleanups);
1123
1124   while (n_to_handle)
1125     {
1126       clib_fifo_sub2 (wrk->pending_cleanups, req);
1127       ct = ct_connection_get (req->ct_index, thread_index);
1128       s = session_get (ct->c_s_index, ct->c_thread_index);
1129       if (!svm_fifo_has_event (s->tx_fifo))
1130         ct_session_postponed_cleanup (ct);
1131       else
1132         clib_fifo_add1 (wrk->pending_cleanups, *req);
1133       n_to_handle -= 1;
1134     }
1135
1136   if (clib_fifo_elts (wrk->pending_cleanups))
1137     {
1138       wrk->have_cleanups = 1;
1139       session_send_rpc_evt_to_thread_force (
1140         thread_index, ct_handle_cleanups,
1141         uword_to_pointer (thread_index, void *));
1142     }
1143 }
1144
1145 static void
1146 ct_program_cleanup (ct_connection_t *ct)
1147 {
1148   ct_cleanup_req_t *req;
1149   uword thread_index;
1150   ct_worker_t *wrk;
1151
1152   thread_index = ct->c_thread_index;
1153   wrk = ct_worker_get (ct->c_thread_index);
1154
1155   clib_fifo_add2 (wrk->pending_cleanups, req);
1156   req->ct_index = ct->c_c_index;
1157
1158   if (wrk->have_cleanups)
1159     return;
1160
1161   wrk->have_cleanups = 1;
1162   session_send_rpc_evt_to_thread_force (
1163     thread_index, ct_handle_cleanups, uword_to_pointer (thread_index, void *));
1164 }
1165
1166 static void
1167 ct_session_close (u32 ct_index, u32 thread_index)
1168 {
1169   ct_connection_t *ct, *peer_ct;
1170   session_t *s;
1171
1172   ct = ct_connection_get (ct_index, thread_index);
1173   s = session_get (ct->c_s_index, ct->c_thread_index);
1174   peer_ct = ct_connection_get (ct->peer_index, thread_index);
1175   if (peer_ct)
1176     {
1177       peer_ct->peer_index = ~0;
1178       /* Make sure session was allocated */
1179       if (peer_ct->flags & CT_CONN_F_HALF_OPEN)
1180         {
1181           ct_session_connect_notify (s, SESSION_E_REFUSED);
1182           ct->peer_index = ~0;
1183         }
1184       else if (peer_ct->c_s_index == ~0)
1185         {
1186           /* should not happen */
1187           clib_warning ("ct peer without session");
1188           ct_connection_free (peer_ct);
1189         }
1190     }
1191
1192   /* Do not send closed notify to make sure pending tx events are
1193    * still delivered and program cleanup */
1194   ct_program_cleanup (ct);
1195 }
1196
1197 static transport_connection_t *
1198 ct_session_get (u32 ct_index, u32 thread_index)
1199 {
1200   return (transport_connection_t *) ct_connection_get (ct_index,
1201                                                        thread_index);
1202 }
1203
1204 static u8 *
1205 format_ct_connection_id (u8 * s, va_list * args)
1206 {
1207   ct_connection_t *ct = va_arg (*args, ct_connection_t *);
1208   if (!ct)
1209     return s;
1210   if (ct->c_is_ip4)
1211     {
1212       s = format (s, "[%d:%d][CT:%U] %U:%d->%U:%d", ct->c_thread_index,
1213                   ct->c_s_index, format_transport_proto_short, ct->actual_tp,
1214                   format_ip4_address, &ct->c_lcl_ip4,
1215                   clib_net_to_host_u16 (ct->c_lcl_port), format_ip4_address,
1216                   &ct->c_rmt_ip4, clib_net_to_host_u16 (ct->c_rmt_port));
1217     }
1218   else
1219     {
1220       s = format (s, "[%d:%d][CT:%U] %U:%d->%U:%d", ct->c_thread_index,
1221                   ct->c_s_index, format_transport_proto_short, ct->actual_tp,
1222                   format_ip6_address, &ct->c_lcl_ip6,
1223                   clib_net_to_host_u16 (ct->c_lcl_port), format_ip6_address,
1224                   &ct->c_rmt_ip6, clib_net_to_host_u16 (ct->c_rmt_port));
1225     }
1226
1227   return s;
1228 }
1229
1230 static int
1231 ct_custom_tx (void *session, transport_send_params_t * sp)
1232 {
1233   session_t *s = (session_t *) session;
1234   if (session_has_transport (s))
1235     return 0;
1236   /* If event enqueued towards peer, remove from scheduler and remove
1237    * session tx flag, i.e., accept new tx events. Unset fifo flag now to
1238    * avoid missing events if peer did not clear fifo flag yet, which is
1239    * interpreted as successful notification and session is descheduled. */
1240   svm_fifo_unset_event (s->tx_fifo);
1241   if (!ct_session_tx (s))
1242     sp->flags = TRANSPORT_SND_F_DESCHED;
1243
1244   /* The scheduler uses packet count as a means of upper bounding the amount
1245    * of work done per dispatch. So make it look like we have sent something */
1246   return 1;
1247 }
1248
1249 static int
1250 ct_app_rx_evt (transport_connection_t * tc)
1251 {
1252   ct_connection_t *ct = (ct_connection_t *) tc, *peer_ct;
1253   session_t *ps, *s;
1254
1255   s = session_get (ct->c_s_index, ct->c_thread_index);
1256   if (session_has_transport (s) || s->session_state < SESSION_STATE_READY)
1257     return -1;
1258   peer_ct = ct_connection_get (ct->peer_index, tc->thread_index);
1259   if (!peer_ct || (peer_ct->flags & CT_CONN_F_HALF_OPEN))
1260     return -1;
1261   ps = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
1262   if (ps->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1263     return -1;
1264   return session_dequeue_notify (ps);
1265 }
1266
1267 static u8 *
1268 format_ct_listener (u8 * s, va_list * args)
1269 {
1270   u32 tc_index = va_arg (*args, u32);
1271   u32 __clib_unused thread_index = va_arg (*args, u32);
1272   u32 __clib_unused verbose = va_arg (*args, u32);
1273   ct_connection_t *ct = ct_connection_get (tc_index, 0);
1274   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
1275   if (verbose)
1276     s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "LISTEN");
1277   return s;
1278 }
1279
1280 static u8 *
1281 format_ct_half_open (u8 *s, va_list *args)
1282 {
1283   u32 ho_index = va_arg (*args, u32);
1284   u32 verbose = va_arg (*args, u32);
1285   ct_connection_t *ct = ct_half_open_get (ho_index);
1286   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
1287   if (verbose)
1288     s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "HALF-OPEN");
1289   return s;
1290 }
1291
1292 static u8 *
1293 format_ct_connection (u8 * s, va_list * args)
1294 {
1295   ct_connection_t *ct = va_arg (*args, ct_connection_t *);
1296   u32 verbose = va_arg (*args, u32);
1297
1298   if (!ct)
1299     return s;
1300   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
1301   if (verbose)
1302     {
1303       s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "ESTABLISHED");
1304       if (verbose > 1)
1305         {
1306           s = format (s, "\n");
1307         }
1308     }
1309   return s;
1310 }
1311
1312 static u8 *
1313 format_ct_session (u8 * s, va_list * args)
1314 {
1315   u32 ct_index = va_arg (*args, u32);
1316   u32 thread_index = va_arg (*args, u32);
1317   u32 verbose = va_arg (*args, u32);
1318   ct_connection_t *ct;
1319
1320   ct = ct_connection_get (ct_index, thread_index);
1321   if (!ct)
1322     {
1323       s = format (s, "empty\n");
1324       return s;
1325     }
1326
1327   s = format (s, "%U", format_ct_connection, ct, verbose);
1328   return s;
1329 }
1330
1331 clib_error_t *
1332 ct_enable_disable (vlib_main_t * vm, u8 is_en)
1333 {
1334   vlib_thread_main_t *vtm = &vlib_thread_main;
1335   ct_main_t *cm = &ct_main;
1336   ct_worker_t *wrk;
1337
1338   cm->n_workers = vlib_num_workers ();
1339   cm->fwrk_thread = transport_cl_thread ();
1340   vec_validate (cm->wrk, vtm->n_vlib_mains);
1341   vec_foreach (wrk, cm->wrk)
1342     clib_spinlock_init (&wrk->pending_connects_lock);
1343   clib_spinlock_init (&cm->ho_reuseable_lock);
1344   clib_rwlock_init (&cm->app_segs_lock);
1345   vec_validate (cm->fwrk_pending_connects, cm->n_workers);
1346   return 0;
1347 }
1348
1349 /* *INDENT-OFF* */
1350 static const transport_proto_vft_t cut_thru_proto = {
1351   .enable = ct_enable_disable,
1352   .start_listen = ct_start_listen,
1353   .stop_listen = ct_stop_listen,
1354   .get_connection = ct_session_get,
1355   .get_listener = ct_listener_get,
1356   .get_half_open = ct_session_half_open_get,
1357   .cleanup = ct_session_cleanup,
1358   .cleanup_ho = ct_cleanup_ho,
1359   .connect = ct_session_connect,
1360   .close = ct_session_close,
1361   .custom_tx = ct_custom_tx,
1362   .app_rx_evt = ct_app_rx_evt,
1363   .format_listener = format_ct_listener,
1364   .format_half_open = format_ct_half_open,
1365   .format_connection = format_ct_session,
1366   .transport_options = {
1367     .name = "ct",
1368     .short_name = "C",
1369     .tx_type = TRANSPORT_TX_INTERNAL,
1370     .service_type = TRANSPORT_SERVICE_VC,
1371   },
1372 };
1373 /* *INDENT-ON* */
1374
1375 static inline int
1376 ct_session_can_tx (session_t *s)
1377 {
1378   return (s->session_state == SESSION_STATE_READY ||
1379           s->session_state == SESSION_STATE_CLOSING ||
1380           s->session_state == SESSION_STATE_APP_CLOSED);
1381 }
1382
1383 int
1384 ct_session_tx (session_t * s)
1385 {
1386   ct_connection_t *ct, *peer_ct;
1387   session_t *peer_s;
1388
1389   if (!ct_session_can_tx (s))
1390     return 0;
1391   ct = (ct_connection_t *) session_get_transport (s);
1392   peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
1393   if (!peer_ct)
1394     return 0;
1395   peer_s = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
1396   if (peer_s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1397     return 0;
1398   return session_enqueue_notify (peer_s);
1399 }
1400
1401 static clib_error_t *
1402 ct_transport_init (vlib_main_t * vm)
1403 {
1404   transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto,
1405                                FIB_PROTOCOL_IP4, ~0);
1406   transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto,
1407                                FIB_PROTOCOL_IP6, ~0);
1408   return 0;
1409 }
1410
1411 VLIB_INIT_FUNCTION (ct_transport_init);
1412
1413 /*
1414  * fd.io coding-style-patch-verification: ON
1415  *
1416  * Local Variables:
1417  * eval: (c-set-style "gnu")
1418  * End:
1419  */