8fd3d44efe07afaa597d9e525bd3ad094729a5ce
[vpp.git] / src / vnet / session / application_local.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/session/application_local.h>
17 #include <vnet/session/session.h>
18
19 typedef enum ct_segment_flags_
20 {
21   CT_SEGMENT_F_CLIENT_DETACHED = 1 << 0,
22   CT_SEGMENT_F_SERVER_DETACHED = 1 << 1,
23 } ct_segment_flags_t;
24
25 typedef struct ct_segment_
26 {
27   u32 client_n_sessions;
28   u32 server_n_sessions;
29   u32 seg_ctx_index;
30   u32 ct_seg_index;
31   u32 segment_index;
32   ct_segment_flags_t flags;
33 } ct_segment_t;
34
35 typedef struct ct_segments_
36 {
37   u32 sm_index;
38   u32 server_wrk;
39   u32 client_wrk;
40   u32 fifo_pair_bytes;
41   ct_segment_t *segments;
42 } ct_segments_ctx_t;
43
44 typedef struct ct_cleanup_req_
45 {
46   u32 ct_index;
47 } ct_cleanup_req_t;
48
49 typedef struct ct_worker_
50 {
51   ct_connection_t *connections;       /**< Per-worker connection pools */
52   u32 *pending_connects;              /**< Fifo of pending ho indices */
53   ct_cleanup_req_t *pending_cleanups; /**< Fifo of pending indices */
54   u8 have_connects;                   /**< Set if connect rpc pending */
55   u8 have_cleanups;                   /**< Set if cleanup rpc pending */
56   clib_spinlock_t pending_connects_lock; /**< Lock for pending connects */
57   u32 *new_connects;                     /**< Burst of connects to be done */
58 } ct_worker_t;
59
60 typedef struct ct_main_
61 {
62   ct_worker_t *wrk;                     /**< Per-worker state */
63   u32 n_workers;                        /**< Number of vpp workers */
64   u32 n_sessions;                       /**< Cumulative sessions counter */
65   u32 *ho_reusable;                     /**< Vector of reusable ho indices */
66   clib_spinlock_t ho_reuseable_lock;    /**< Lock for reusable ho indices */
67   clib_rwlock_t app_segs_lock;          /**< RW lock for seg contexts */
68   uword *app_segs_ctxs_table;           /**< App handle to segment pool map */
69   ct_segments_ctx_t *app_seg_ctxs;      /**< Pool of ct segment contexts */
70   u32 **fwrk_pending_connects;          /**< First wrk pending half-opens */
71   u32 fwrk_thread;                      /**< First worker thread */
72   u8 fwrk_have_flush;                   /**< Flag for connect flush rpc */
73 } ct_main_t;
74
75 static ct_main_t ct_main;
76
77 static inline ct_worker_t *
78 ct_worker_get (u32 thread_index)
79 {
80   return &ct_main.wrk[thread_index];
81 }
82
83 static ct_connection_t *
84 ct_connection_alloc (u32 thread_index)
85 {
86   ct_worker_t *wrk = ct_worker_get (thread_index);
87   ct_connection_t *ct;
88
89   pool_get_aligned_safe (wrk->connections, ct, CLIB_CACHE_LINE_BYTES);
90   clib_memset (ct, 0, sizeof (*ct));
91   ct->c_c_index = ct - wrk->connections;
92   ct->c_thread_index = thread_index;
93   ct->client_wrk = ~0;
94   ct->server_wrk = ~0;
95   ct->seg_ctx_index = ~0;
96   ct->ct_seg_index = ~0;
97   return ct;
98 }
99
100 static ct_connection_t *
101 ct_connection_get (u32 ct_index, u32 thread_index)
102 {
103   ct_worker_t *wrk = ct_worker_get (thread_index);
104
105   if (pool_is_free_index (wrk->connections, ct_index))
106     return 0;
107   return pool_elt_at_index (wrk->connections, ct_index);
108 }
109
110 static void
111 ct_connection_free (ct_connection_t * ct)
112 {
113   ct_worker_t *wrk = ct_worker_get (ct->c_thread_index);
114
115   if (CLIB_DEBUG)
116     {
117       clib_memset (ct, 0xfc, sizeof (*ct));
118       pool_put (wrk->connections, ct);
119       return;
120     }
121   pool_put (wrk->connections, ct);
122 }
123
124 static ct_connection_t *
125 ct_half_open_alloc (void)
126 {
127   ct_main_t *cm = &ct_main;
128   u32 *hip;
129
130   clib_spinlock_lock (&cm->ho_reuseable_lock);
131   vec_foreach (hip, cm->ho_reusable)
132     pool_put_index (cm->wrk[cm->fwrk_thread].connections, *hip);
133   vec_reset_length (cm->ho_reusable);
134   clib_spinlock_unlock (&cm->ho_reuseable_lock);
135
136   return ct_connection_alloc (cm->fwrk_thread);
137 }
138
139 static ct_connection_t *
140 ct_half_open_get (u32 ho_index)
141 {
142   ct_main_t *cm = &ct_main;
143   return ct_connection_get (ho_index, cm->fwrk_thread);
144 }
145
146 void
147 ct_half_open_add_reusable (u32 ho_index)
148 {
149   ct_main_t *cm = &ct_main;
150
151   clib_spinlock_lock (&cm->ho_reuseable_lock);
152   vec_add1 (cm->ho_reusable, ho_index);
153   clib_spinlock_unlock (&cm->ho_reuseable_lock);
154 }
155
156 session_t *
157 ct_session_get_peer (session_t * s)
158 {
159   ct_connection_t *ct, *peer_ct;
160   ct = ct_connection_get (s->connection_index, s->thread_index);
161   peer_ct = ct_connection_get (ct->peer_index, s->thread_index);
162   return session_get (peer_ct->c_s_index, s->thread_index);
163 }
164
165 void
166 ct_session_endpoint (session_t * ll, session_endpoint_t * sep)
167 {
168   ct_connection_t *ct;
169   ct = (ct_connection_t *) session_get_transport (ll);
170   sep->transport_proto = ct->actual_tp;
171   sep->port = ct->c_lcl_port;
172   sep->is_ip4 = ct->c_is_ip4;
173   ip_copy (&sep->ip, &ct->c_lcl_ip, ct->c_is_ip4);
174 }
175
176 static void
177 ct_set_invalid_app_wrk (ct_connection_t *ct, u8 is_client)
178 {
179   ct_connection_t *peer_ct;
180
181   peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
182
183   if (is_client)
184     {
185       ct->client_wrk = APP_INVALID_INDEX;
186       if (peer_ct)
187         ct->client_wrk = APP_INVALID_INDEX;
188     }
189   else
190     {
191       ct->server_wrk = APP_INVALID_INDEX;
192       if (peer_ct)
193         ct->server_wrk = APP_INVALID_INDEX;
194     }
195 }
196
197 static inline u64
198 ct_client_seg_handle (u64 server_sh, u32 client_wrk_index)
199 {
200   return (((u64) client_wrk_index << 56) | server_sh);
201 }
202
203 static void
204 ct_session_dealloc_fifos (ct_connection_t *ct, svm_fifo_t *rx_fifo,
205                           svm_fifo_t *tx_fifo)
206 {
207   ct_segments_ctx_t *seg_ctx;
208   ct_main_t *cm = &ct_main;
209   segment_manager_t *sm;
210   app_worker_t *app_wrk;
211   ct_segment_t *ct_seg;
212   fifo_segment_t *fs;
213   u32 seg_index;
214   session_t *s;
215   int cnt;
216
217   /*
218    * Cleanup fifos
219    */
220
221   sm = segment_manager_get (rx_fifo->segment_manager);
222   seg_index = rx_fifo->segment_index;
223
224   fs = segment_manager_get_segment_w_lock (sm, seg_index);
225   fifo_segment_free_fifo (fs, rx_fifo);
226   fifo_segment_free_fifo (fs, tx_fifo);
227   segment_manager_segment_reader_unlock (sm);
228
229   /*
230    * Atomically update segment context with readers lock
231    */
232
233   clib_rwlock_reader_lock (&cm->app_segs_lock);
234
235   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
236   ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
237
238   if (ct->flags & CT_CONN_F_CLIENT)
239     {
240       cnt =
241         __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
242     }
243   else
244     {
245       cnt =
246         __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
247     }
248
249   clib_rwlock_reader_unlock (&cm->app_segs_lock);
250
251   /*
252    * No need to do any app updates, return
253    */
254   ASSERT (cnt >= 0);
255   if (cnt)
256     return;
257
258   /*
259    * Grab exclusive lock and update flags unless some other thread
260    * added more sessions
261    */
262   clib_rwlock_writer_lock (&cm->app_segs_lock);
263
264   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
265   ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
266   if (ct->flags & CT_CONN_F_CLIENT)
267     {
268       cnt = ct_seg->client_n_sessions;
269       if (cnt)
270         goto done;
271       ct_seg->flags |= CT_SEGMENT_F_CLIENT_DETACHED;
272       s = session_get (ct->c_s_index, ct->c_thread_index);
273       if (s->app_wrk_index == APP_INVALID_INDEX)
274         ct_set_invalid_app_wrk (ct, 1 /* is_client */);
275     }
276   else
277     {
278       cnt = ct_seg->server_n_sessions;
279       if (cnt)
280         goto done;
281       ct_seg->flags |= CT_SEGMENT_F_SERVER_DETACHED;
282       s = session_get (ct->c_s_index, ct->c_thread_index);
283       if (s->app_wrk_index == APP_INVALID_INDEX)
284         ct_set_invalid_app_wrk (ct, 0 /* is_client */);
285     }
286
287   if (!(ct_seg->flags & CT_SEGMENT_F_CLIENT_DETACHED) ||
288       !(ct_seg->flags & CT_SEGMENT_F_SERVER_DETACHED))
289     goto done;
290
291   /*
292    * Remove segment context because both client and server detached
293    */
294
295   pool_put_index (seg_ctx->segments, ct->ct_seg_index);
296
297   /*
298    * No more segment indices left, remove the segments context
299    */
300   if (!pool_elts (seg_ctx->segments))
301     {
302       u64 table_handle = seg_ctx->client_wrk << 16 | seg_ctx->server_wrk;
303       table_handle = (u64) seg_ctx->sm_index << 32 | table_handle;
304       hash_unset (cm->app_segs_ctxs_table, table_handle);
305       pool_free (seg_ctx->segments);
306       pool_put_index (cm->app_seg_ctxs, ct->seg_ctx_index);
307     }
308
309   /*
310    * Segment to be removed so notify both apps
311    */
312
313   app_wrk = app_worker_get_if_valid (ct->client_wrk);
314   /* Determine if client app still needs notification, i.e., if it is
315    * still attached. If client detached and this is the last ct session
316    * on this segment, then its connects segment manager should also be
317    * detached, so do not send notification */
318   if (app_wrk)
319     {
320       segment_manager_t *csm;
321       csm = app_worker_get_connect_segment_manager (app_wrk);
322       if (!segment_manager_app_detached (csm))
323         app_worker_del_segment_notify (
324           app_wrk, ct_client_seg_handle (ct->segment_handle, ct->client_wrk));
325     }
326
327   /* Notify server app and free segment */
328   segment_manager_lock_and_del_segment (sm, seg_index);
329
330   /* Cleanup segment manager if needed. If server detaches there's a chance
331    * the client's sessions will hold up segment removal */
332   if (segment_manager_app_detached (sm) && !segment_manager_has_fifos (sm))
333     segment_manager_free_safe (sm);
334
335 done:
336
337   clib_rwlock_writer_unlock (&cm->app_segs_lock);
338 }
339
340 static void
341 ct_session_force_disconnect_server (ct_connection_t *sct)
342 {
343   sct->peer_index = ~0;
344   session_transport_closing_notify (&sct->connection);
345 }
346
347 int
348 ct_session_connect_notify (session_t *ss, session_error_t err)
349 {
350   u32 ss_index, opaque, thread_index;
351   ct_connection_t *sct, *cct;
352   app_worker_t *client_wrk;
353   session_t *cs;
354
355   ss_index = ss->session_index;
356   thread_index = ss->thread_index;
357   sct = (ct_connection_t *) session_get_transport (ss);
358   client_wrk = app_worker_get (sct->client_wrk);
359   opaque = sct->client_opaque;
360
361   cct = ct_connection_get (sct->peer_index, thread_index);
362
363   /* Client closed while waiting for reply from server */
364   if (PREDICT_FALSE (!cct))
365     {
366       ct_session_force_disconnect_server (sct);
367       return 0;
368     }
369
370   session_half_open_delete_notify (&cct->connection);
371   cct->flags &= ~CT_CONN_F_HALF_OPEN;
372
373   if (PREDICT_FALSE (err))
374     goto connect_error;
375
376   /*
377    * Alloc client session, server session assumed to be established
378    */
379
380   ASSERT (ss->session_state >= SESSION_STATE_READY);
381
382   cs = session_alloc (thread_index);
383   ss = session_get (ss_index, thread_index);
384   cs->session_type = ss->session_type;
385   cs->listener_handle = SESSION_INVALID_HANDLE;
386   session_set_state (cs, SESSION_STATE_CONNECTING);
387   cs->app_wrk_index = client_wrk->wrk_index;
388   cs->connection_index = cct->c_c_index;
389   cs->opaque = opaque;
390   cct->c_s_index = cs->session_index;
391
392   /* This will allocate fifos for the session. They won't be used for
393    * exchanging data but they will be used to close the connection if
394    * the segment manager/worker is freed */
395   if ((err = app_worker_init_connected (client_wrk, cs)))
396     {
397       session_free (cs);
398       ct_session_force_disconnect_server (sct);
399       err = SESSION_E_ALLOC;
400       goto connect_error;
401     }
402
403   session_set_state (cs, SESSION_STATE_CONNECTING);
404
405   if (app_worker_connect_notify (client_wrk, cs, 0, opaque))
406     {
407       segment_manager_dealloc_fifos (cs->rx_fifo, cs->tx_fifo);
408       session_free (cs);
409       ct_session_force_disconnect_server (sct);
410       goto cleanup_client;
411     }
412
413   cs = session_get (cct->c_s_index, cct->c_thread_index);
414   session_set_state (cs, SESSION_STATE_READY);
415
416   return 0;
417
418 connect_error:
419
420   app_worker_connect_notify (client_wrk, 0, err, cct->client_opaque);
421
422 cleanup_client:
423
424   if (cct->client_rx_fifo)
425     ct_session_dealloc_fifos (cct, cct->client_rx_fifo, cct->client_tx_fifo);
426   ct_connection_free (cct);
427   return -1;
428 }
429
430 static inline ct_segment_t *
431 ct_lookup_free_segment (ct_main_t *cm, segment_manager_t *sm,
432                         u32 seg_ctx_index)
433 {
434   uword free_bytes, max_free_bytes;
435   ct_segment_t *ct_seg, *res = 0;
436   ct_segments_ctx_t *seg_ctx;
437   fifo_segment_t *fs;
438   u32 max_fifos;
439
440   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
441   max_free_bytes = seg_ctx->fifo_pair_bytes;
442
443   pool_foreach (ct_seg, seg_ctx->segments)
444     {
445       /* Client or server has detached so segment cannot be used */
446       fs = segment_manager_get_segment (sm, ct_seg->segment_index);
447       free_bytes = fifo_segment_available_bytes (fs);
448       max_fifos = fifo_segment_size (fs) / seg_ctx->fifo_pair_bytes;
449       if (free_bytes > max_free_bytes &&
450           fifo_segment_num_fifos (fs) / 2 < max_fifos)
451         {
452           max_free_bytes = free_bytes;
453           res = ct_seg;
454         }
455     }
456
457   return res;
458 }
459
460 static ct_segment_t *
461 ct_alloc_segment (ct_main_t *cm, app_worker_t *server_wrk, u64 table_handle,
462                   segment_manager_t *sm, u32 client_wrk_index)
463 {
464   u32 seg_ctx_index = ~0, sm_index, pair_bytes;
465   u64 seg_size, seg_handle, client_seg_handle;
466   segment_manager_props_t *props;
467   const u32 margin = 16 << 10;
468   ct_segments_ctx_t *seg_ctx;
469   app_worker_t *client_wrk;
470   application_t *server;
471   ct_segment_t *ct_seg;
472   uword *spp;
473   int fs_index;
474
475   server = application_get (server_wrk->app_index);
476   props = application_segment_manager_properties (server);
477   sm_index = segment_manager_index (sm);
478   pair_bytes = props->rx_fifo_size + props->tx_fifo_size + margin;
479
480   /*
481    * Make sure another thread did not alloc a segment while acquiring the lock
482    */
483
484   spp = hash_get (cm->app_segs_ctxs_table, table_handle);
485   if (spp)
486     {
487       seg_ctx_index = *spp;
488       ct_seg = ct_lookup_free_segment (cm, sm, seg_ctx_index);
489       if (ct_seg)
490         return ct_seg;
491     }
492
493   /*
494    * No segment, try to alloc one and notify the server and the client.
495    * Make sure the segment is not used for other fifos
496    */
497   seg_size = clib_max (props->segment_size, 128 << 20);
498   fs_index =
499     segment_manager_add_segment2 (sm, seg_size, FIFO_SEGMENT_F_CUSTOM_USE);
500   if (fs_index < 0)
501     return 0;
502
503   if (seg_ctx_index == ~0)
504     {
505       pool_get_zero (cm->app_seg_ctxs, seg_ctx);
506       seg_ctx_index = seg_ctx - cm->app_seg_ctxs;
507       hash_set (cm->app_segs_ctxs_table, table_handle, seg_ctx_index);
508       seg_ctx->server_wrk = server_wrk->wrk_index;
509       seg_ctx->client_wrk = client_wrk_index;
510       seg_ctx->sm_index = sm_index;
511       seg_ctx->fifo_pair_bytes = pair_bytes;
512     }
513   else
514     {
515       seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
516     }
517
518   pool_get_zero (seg_ctx->segments, ct_seg);
519   ct_seg->segment_index = fs_index;
520   ct_seg->server_n_sessions = 0;
521   ct_seg->client_n_sessions = 0;
522   ct_seg->ct_seg_index = ct_seg - seg_ctx->segments;
523   ct_seg->seg_ctx_index = seg_ctx_index;
524
525   /* New segment, notify the server and client */
526   seg_handle = segment_manager_make_segment_handle (sm_index, fs_index);
527   if (app_worker_add_segment_notify (server_wrk, seg_handle))
528     goto error;
529
530   client_wrk = app_worker_get (client_wrk_index);
531   /* Make sure client workers do not have overlapping segment handles.
532    * Ideally, we should attach fs to client worker segment manager and
533    * create a new handle but that's not currently possible. */
534   client_seg_handle = ct_client_seg_handle (seg_handle, client_wrk_index);
535   if (app_worker_add_segment_notify (client_wrk, client_seg_handle))
536     {
537       app_worker_del_segment_notify (server_wrk, seg_handle);
538       goto error;
539     }
540
541   return ct_seg;
542
543 error:
544
545   segment_manager_lock_and_del_segment (sm, fs_index);
546   pool_put_index (seg_ctx->segments, ct_seg->seg_ctx_index);
547   return 0;
548 }
549
550 static int
551 ct_init_accepted_session (app_worker_t *server_wrk, ct_connection_t *ct,
552                           session_t *ls, session_t *ll)
553 {
554   segment_manager_props_t *props;
555   u64 seg_handle, table_handle;
556   u32 sm_index, fs_index = ~0;
557   ct_segments_ctx_t *seg_ctx;
558   ct_main_t *cm = &ct_main;
559   application_t *server;
560   segment_manager_t *sm;
561   ct_segment_t *ct_seg;
562   fifo_segment_t *fs;
563   uword *spp;
564   int rv;
565
566   sm = app_worker_get_listen_segment_manager (server_wrk, ll);
567   sm_index = segment_manager_index (sm);
568   server = application_get (server_wrk->app_index);
569   props = application_segment_manager_properties (server);
570
571   table_handle = ct->client_wrk << 16 | server_wrk->wrk_index;
572   table_handle = (u64) sm_index << 32 | table_handle;
573
574   /*
575    * Check if we already have a segment that can hold the fifos
576    */
577
578   clib_rwlock_reader_lock (&cm->app_segs_lock);
579
580   spp = hash_get (cm->app_segs_ctxs_table, table_handle);
581   if (spp)
582     {
583       ct_seg = ct_lookup_free_segment (cm, sm, *spp);
584       if (ct_seg)
585         {
586           ct->seg_ctx_index = ct_seg->seg_ctx_index;
587           ct->ct_seg_index = ct_seg->ct_seg_index;
588           fs_index = ct_seg->segment_index;
589           ct_seg->flags &=
590             ~(CT_SEGMENT_F_SERVER_DETACHED | CT_SEGMENT_F_CLIENT_DETACHED);
591           __atomic_add_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
592           __atomic_add_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
593         }
594     }
595
596   clib_rwlock_reader_unlock (&cm->app_segs_lock);
597
598   /*
599    * If not, grab exclusive lock and allocate segment
600    */
601   if (fs_index == ~0)
602     {
603       clib_rwlock_writer_lock (&cm->app_segs_lock);
604
605       ct_seg =
606         ct_alloc_segment (cm, server_wrk, table_handle, sm, ct->client_wrk);
607       if (!ct_seg)
608         {
609           clib_rwlock_writer_unlock (&cm->app_segs_lock);
610           return -1;
611         }
612
613       ct->seg_ctx_index = ct_seg->seg_ctx_index;
614       ct->ct_seg_index = ct_seg->ct_seg_index;
615       ct_seg->server_n_sessions += 1;
616       ct_seg->client_n_sessions += 1;
617       fs_index = ct_seg->segment_index;
618
619       clib_rwlock_writer_unlock (&cm->app_segs_lock);
620     }
621
622   /*
623    * Allocate and initialize the fifos
624    */
625   fs = segment_manager_get_segment_w_lock (sm, fs_index);
626   rv = segment_manager_try_alloc_fifos (
627     fs, ls->thread_index, props->rx_fifo_size, props->tx_fifo_size,
628     &ls->rx_fifo, &ls->tx_fifo);
629   if (rv)
630     {
631       segment_manager_segment_reader_unlock (sm);
632
633       clib_rwlock_reader_lock (&cm->app_segs_lock);
634
635       seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
636       ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
637       __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
638       __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
639
640       clib_rwlock_reader_unlock (&cm->app_segs_lock);
641
642       return rv;
643     }
644
645   ls->rx_fifo->shr->master_session_index = ls->session_index;
646   ls->tx_fifo->shr->master_session_index = ls->session_index;
647   ls->rx_fifo->master_thread_index = ls->thread_index;
648   ls->tx_fifo->master_thread_index = ls->thread_index;
649
650   seg_handle = segment_manager_segment_handle (sm, fs);
651   segment_manager_segment_reader_unlock (sm);
652
653   ct->segment_handle = seg_handle;
654
655   return 0;
656 }
657
658 static void
659 ct_accept_one (u32 thread_index, u32 ho_index)
660 {
661   ct_connection_t *sct, *cct, *ho;
662   transport_connection_t *ll_ct;
663   app_worker_t *server_wrk;
664   u32 cct_index, ll_index;
665   session_t *ss, *ll;
666
667   /*
668    * Alloc client ct and initialize from ho
669    */
670   cct = ct_connection_alloc (thread_index);
671   cct_index = cct->c_c_index;
672
673   ho = ct_half_open_get (ho_index);
674
675   /* Unlikely but half-open session and transport could have been freed */
676   if (PREDICT_FALSE (!ho))
677     {
678       ct_connection_free (cct);
679       return;
680     }
681
682   clib_memcpy (cct, ho, sizeof (*ho));
683   cct->c_c_index = cct_index;
684   cct->c_thread_index = thread_index;
685   cct->flags |= CT_CONN_F_HALF_OPEN;
686
687   /* Notify session layer that half-open is on a different thread
688    * and mark ho connection index reusable. Avoids another rpc
689    */
690   session_half_open_migrate_notify (&cct->connection);
691   session_half_open_migrated_notify (&cct->connection);
692   ct_half_open_add_reusable (ho_index);
693
694   /*
695    * Alloc and init server transport
696    */
697
698   ll_index = cct->peer_index;
699   ll = listen_session_get (ll_index);
700   sct = ct_connection_alloc (thread_index);
701   /* Transport not necessarily ct but it might, so grab after sct alloc */
702   ll_ct = listen_session_get_transport (ll);
703
704   /* Make sure cct is valid after sct alloc */
705   cct = ct_connection_get (cct_index, thread_index);
706
707   sct->c_rmt_port = 0;
708   sct->c_lcl_port = ll_ct->lcl_port;
709   sct->c_is_ip4 = cct->c_is_ip4;
710   clib_memcpy (&sct->c_lcl_ip, &cct->c_rmt_ip, sizeof (cct->c_rmt_ip));
711   sct->client_wrk = cct->client_wrk;
712   sct->c_proto = TRANSPORT_PROTO_NONE;
713   sct->client_opaque = cct->client_opaque;
714   sct->actual_tp = cct->actual_tp;
715
716   sct->peer_index = cct->c_c_index;
717   cct->peer_index = sct->c_c_index;
718
719   /*
720    * Accept server session. Client session is created only after
721    * server confirms accept.
722    */
723   ss = session_alloc (thread_index);
724   ll = listen_session_get (ll_index);
725   ss->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE,
726                                                      sct->c_is_ip4);
727   ss->connection_index = sct->c_c_index;
728   ss->listener_handle = listen_session_get_handle (ll);
729   session_set_state (ss, SESSION_STATE_CREATED);
730
731   server_wrk = application_listener_select_worker (ll);
732   ss->app_wrk_index = server_wrk->wrk_index;
733
734   sct->c_s_index = ss->session_index;
735   sct->server_wrk = ss->app_wrk_index;
736
737   if (ct_init_accepted_session (server_wrk, sct, ss, ll))
738     {
739       ct_session_connect_notify (ss, SESSION_E_ALLOC);
740       ct_connection_free (sct);
741       session_free (ss);
742       return;
743     }
744
745   cct->server_wrk = sct->server_wrk;
746   cct->seg_ctx_index = sct->seg_ctx_index;
747   cct->ct_seg_index = sct->ct_seg_index;
748   cct->client_rx_fifo = ss->tx_fifo;
749   cct->client_tx_fifo = ss->rx_fifo;
750   cct->client_rx_fifo->refcnt++;
751   cct->client_tx_fifo->refcnt++;
752   cct->segment_handle =
753     ct_client_seg_handle (sct->segment_handle, cct->client_wrk);
754
755   session_set_state (ss, SESSION_STATE_ACCEPTING);
756   if (app_worker_accept_notify (server_wrk, ss))
757     {
758       ct_session_connect_notify (ss, SESSION_E_REFUSED);
759       ct_session_dealloc_fifos (sct, ss->rx_fifo, ss->tx_fifo);
760       ct_connection_free (sct);
761       session_free (ss);
762     }
763 }
764
765 static void
766 ct_accept_rpc_wrk_handler (void *rpc_args)
767 {
768   u32 thread_index, n_connects, i, n_pending;
769   const u32 max_connects = 32;
770   ct_worker_t *wrk;
771   u8 need_rpc = 0;
772
773   thread_index = pointer_to_uword (rpc_args);
774   wrk = ct_worker_get (thread_index);
775
776   /* Connects could be handled without worker barrier so grab lock */
777   clib_spinlock_lock (&wrk->pending_connects_lock);
778
779   n_pending = clib_fifo_elts (wrk->pending_connects);
780   n_connects = clib_min (n_pending, max_connects);
781   vec_validate (wrk->new_connects, n_connects);
782
783   for (i = 0; i < n_connects; i++)
784     clib_fifo_sub1 (wrk->pending_connects, wrk->new_connects[i]);
785
786   if (n_pending == n_connects)
787     wrk->have_connects = 0;
788   else
789     need_rpc = 1;
790
791   clib_spinlock_unlock (&wrk->pending_connects_lock);
792
793   for (i = 0; i < n_connects; i++)
794     ct_accept_one (thread_index, wrk->new_connects[i]);
795
796   if (need_rpc)
797     session_send_rpc_evt_to_thread_force (
798       thread_index, ct_accept_rpc_wrk_handler,
799       uword_to_pointer (thread_index, void *));
800 }
801
802 static void
803 ct_fwrk_flush_connects (void *rpc_args)
804 {
805   u32 thread_index, fwrk_index, n_workers;
806   ct_main_t *cm = &ct_main;
807   ct_worker_t *wrk;
808   u8 need_rpc;
809
810   fwrk_index = cm->fwrk_thread;
811   n_workers = vec_len (cm->fwrk_pending_connects);
812
813   for (thread_index = fwrk_index; thread_index < n_workers; thread_index++)
814     {
815       if (!vec_len (cm->fwrk_pending_connects[thread_index]))
816         continue;
817
818       wrk = ct_worker_get (thread_index);
819
820       /* Connects can be done without worker barrier, grab dst worker lock */
821       if (thread_index != fwrk_index)
822         clib_spinlock_lock (&wrk->pending_connects_lock);
823
824       clib_fifo_add (wrk->pending_connects,
825                      cm->fwrk_pending_connects[thread_index],
826                      vec_len (cm->fwrk_pending_connects[thread_index]));
827       if (!wrk->have_connects)
828         {
829           wrk->have_connects = 1;
830           need_rpc = 1;
831         }
832
833       if (thread_index != fwrk_index)
834         clib_spinlock_unlock (&wrk->pending_connects_lock);
835
836       vec_reset_length (cm->fwrk_pending_connects[thread_index]);
837
838       if (need_rpc)
839         session_send_rpc_evt_to_thread_force (
840           thread_index, ct_accept_rpc_wrk_handler,
841           uword_to_pointer (thread_index, void *));
842     }
843
844   cm->fwrk_have_flush = 0;
845 }
846
847 static void
848 ct_program_connect_to_wrk (u32 ho_index)
849 {
850   ct_main_t *cm = &ct_main;
851   u32 thread_index;
852
853   /* Simple round-robin policy for spreading sessions over workers. We skip
854    * thread index 0, i.e., offset the index by 1, when we have workers as it
855    * is the one dedicated to main thread. Note that n_workers does not include
856    * main thread */
857   cm->n_sessions += 1;
858   thread_index = cm->n_workers ? (cm->n_sessions % cm->n_workers) + 1 : 0;
859
860   /* Pospone flushing of connect request to dst worker until after session
861    * layer fully initializes the half-open session. */
862   vec_add1 (cm->fwrk_pending_connects[thread_index], ho_index);
863   if (!cm->fwrk_have_flush)
864     {
865       session_send_rpc_evt_to_thread_force (
866         cm->fwrk_thread, ct_fwrk_flush_connects,
867         uword_to_pointer (thread_index, void *));
868       cm->fwrk_have_flush = 1;
869     }
870 }
871
872 static int
873 ct_connect (app_worker_t *client_wrk, session_t *ll,
874             session_endpoint_cfg_t *sep)
875 {
876   ct_connection_t *ho;
877   u32 ho_index;
878
879   /*
880    * Alloc and init client half-open transport
881    */
882
883   ho = ct_half_open_alloc ();
884   ho_index = ho->c_c_index;
885   ho->c_rmt_port = sep->port;
886   ho->c_lcl_port = 0;
887   ho->c_is_ip4 = sep->is_ip4;
888   ho->client_opaque = sep->opaque;
889   ho->client_wrk = client_wrk->wrk_index;
890   ho->peer_index = ll->session_index;
891   ho->c_proto = TRANSPORT_PROTO_NONE;
892   ho->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP;
893   clib_memcpy (&ho->c_rmt_ip, &sep->ip, sizeof (sep->ip));
894   ho->flags |= CT_CONN_F_CLIENT;
895   ho->c_s_index = ~0;
896   ho->actual_tp = sep->original_tp;
897
898   /*
899    * Program connect on a worker, connected reply comes
900    * after server accepts the connection.
901    */
902   ct_program_connect_to_wrk (ho_index);
903
904   return ho_index;
905 }
906
907 static u32
908 ct_start_listen (u32 app_listener_index, transport_endpoint_cfg_t *tep)
909 {
910   session_endpoint_cfg_t *sep;
911   ct_connection_t *ct;
912
913   sep = (session_endpoint_cfg_t *) tep;
914   ct = ct_connection_alloc (0);
915   ct->server_wrk = sep->app_wrk_index;
916   ct->c_is_ip4 = sep->is_ip4;
917   clib_memcpy (&ct->c_lcl_ip, &sep->ip, sizeof (sep->ip));
918   ct->c_lcl_port = sep->port;
919   ct->c_s_index = app_listener_index;
920   ct->actual_tp = sep->transport_proto;
921   return ct->c_c_index;
922 }
923
924 static u32
925 ct_stop_listen (u32 ct_index)
926 {
927   ct_connection_t *ct;
928   ct = ct_connection_get (ct_index, 0);
929   ct_connection_free (ct);
930   return 0;
931 }
932
933 static transport_connection_t *
934 ct_listener_get (u32 ct_index)
935 {
936   return (transport_connection_t *) ct_connection_get (ct_index, 0);
937 }
938
939 static transport_connection_t *
940 ct_session_half_open_get (u32 ct_index)
941 {
942   return (transport_connection_t *) ct_half_open_get (ct_index);
943 }
944
945 static void
946 ct_session_cleanup (u32 conn_index, u32 thread_index)
947 {
948   ct_connection_t *ct, *peer_ct;
949
950   ct = ct_connection_get (conn_index, thread_index);
951   if (!ct)
952     return;
953
954   peer_ct = ct_connection_get (ct->peer_index, thread_index);
955   if (peer_ct)
956     peer_ct->peer_index = ~0;
957
958   ct_connection_free (ct);
959 }
960
961 static void
962 ct_cleanup_ho (u32 ho_index)
963 {
964   ct_connection_t *ho;
965
966   ho = ct_half_open_get (ho_index);
967   ct_connection_free (ho);
968 }
969
970 static int
971 ct_session_connect (transport_endpoint_cfg_t * tep)
972 {
973   session_endpoint_cfg_t *sep_ext;
974   session_endpoint_t _sep, *sep = &_sep;
975   app_worker_t *app_wrk;
976   session_handle_t lh;
977   application_t *app;
978   app_listener_t *al;
979   u32 table_index;
980   session_t *ll;
981   u8 fib_proto;
982
983   sep_ext = (session_endpoint_cfg_t *) tep;
984   _sep = *(session_endpoint_t *) tep;
985   app_wrk = app_worker_get (sep_ext->app_wrk_index);
986   app = application_get (app_wrk->app_index);
987
988   sep->transport_proto = sep_ext->original_tp;
989   table_index = application_local_session_table (app);
990   lh = session_lookup_local_endpoint (table_index, sep);
991   if (lh == SESSION_DROP_HANDLE)
992     return SESSION_E_FILTERED;
993
994   if (lh == SESSION_INVALID_HANDLE)
995     goto global_scope;
996
997   ll = listen_session_get_from_handle (lh);
998   al = app_listener_get_w_session (ll);
999
1000   /*
1001    * Break loop if rule in local table points to connecting app. This
1002    * can happen if client is a generic proxy. Route connect through
1003    * global table instead.
1004    */
1005   if (al->app_index == app->app_index)
1006     goto global_scope;
1007
1008   return ct_connect (app_wrk, ll, sep_ext);
1009
1010   /*
1011    * If nothing found, check the global scope for locally attached
1012    * destinations. Make sure first that we're allowed to.
1013    */
1014
1015 global_scope:
1016   if (session_endpoint_is_local (sep))
1017     return SESSION_E_NOROUTE;
1018
1019   if (!application_has_global_scope (app))
1020     return SESSION_E_SCOPE;
1021
1022   fib_proto = session_endpoint_fib_proto (sep);
1023   table_index = session_lookup_get_index_for_fib (fib_proto, sep->fib_index);
1024   ll = session_lookup_listener_wildcard (table_index, sep);
1025
1026   /* Avoid connecting app to own listener */
1027   if (ll && ll->app_index != app->app_index)
1028     return ct_connect (app_wrk, ll, sep_ext);
1029
1030   /* Failed to connect but no error */
1031   return SESSION_E_LOCAL_CONNECT;
1032 }
1033
1034 static inline int
1035 ct_close_is_reset (ct_connection_t *ct, session_t *s)
1036 {
1037   if (ct->flags & CT_CONN_F_RESET)
1038     return 1;
1039   if (ct->flags & CT_CONN_F_CLIENT)
1040     return (svm_fifo_max_dequeue (ct->client_rx_fifo) > 0);
1041   else
1042     return (svm_fifo_max_dequeue (s->rx_fifo) > 0);
1043 }
1044
1045 static void
1046 ct_session_cleanup_server_session (session_t *s)
1047 {
1048   ct_connection_t *ct;
1049
1050   ct = (ct_connection_t *) session_get_transport (s);
1051   ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo);
1052   session_free (s);
1053   ct_connection_free (ct);
1054 }
1055
1056 static void
1057 ct_session_postponed_cleanup (ct_connection_t *ct)
1058 {
1059   ct_connection_t *peer_ct;
1060   app_worker_t *app_wrk;
1061   session_t *s;
1062
1063   s = session_get (ct->c_s_index, ct->c_thread_index);
1064   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1065
1066   peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
1067   if (peer_ct)
1068     {
1069       if (ct_close_is_reset (ct, s))
1070         session_transport_reset_notify (&peer_ct->connection);
1071       else
1072         session_transport_closing_notify (&peer_ct->connection);
1073     }
1074   session_transport_closed_notify (&ct->connection);
1075
1076   /* It would be cleaner to call session_transport_delete_notify
1077    * but then we can't control session cleanup lower */
1078   session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
1079   if (app_wrk)
1080     app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
1081
1082   if (ct->flags & CT_CONN_F_CLIENT)
1083     {
1084       /* Normal free for client session as the fifos are allocated through
1085        * the connects segment manager in a segment that's not shared with
1086        * the server */
1087       ct_session_dealloc_fifos (ct, ct->client_rx_fifo, ct->client_tx_fifo);
1088       session_program_cleanup (s);
1089       ct_connection_free (ct);
1090     }
1091   else
1092     {
1093       /* Manual session and fifo segment cleanup to avoid implicit
1094        * segment manager cleanups and notifications */
1095       if (app_wrk)
1096         {
1097           /* Remove custom cleanup notify infra when/if switching to normal
1098            * session cleanup. Note that ct is freed in the cb function */
1099           app_worker_cleanup_notify_custom (app_wrk, s,
1100                                             SESSION_CLEANUP_SESSION,
1101                                             ct_session_cleanup_server_session);
1102         }
1103       else
1104         {
1105           ct_connection_free (ct);
1106         }
1107     }
1108 }
1109
1110 static void
1111 ct_handle_cleanups (void *args)
1112 {
1113   uword thread_index = pointer_to_uword (args);
1114   const u32 max_cleanups = 100;
1115   ct_cleanup_req_t *req;
1116   ct_connection_t *ct;
1117   u32 n_to_handle = 0;
1118   ct_worker_t *wrk;
1119   session_t *s;
1120
1121   wrk = ct_worker_get (thread_index);
1122   wrk->have_cleanups = 0;
1123   n_to_handle = clib_fifo_elts (wrk->pending_cleanups);
1124   n_to_handle = clib_min (n_to_handle, max_cleanups);
1125
1126   while (n_to_handle)
1127     {
1128       clib_fifo_sub2 (wrk->pending_cleanups, req);
1129       ct = ct_connection_get (req->ct_index, thread_index);
1130       s = session_get (ct->c_s_index, ct->c_thread_index);
1131       if (!svm_fifo_has_event (s->tx_fifo))
1132         ct_session_postponed_cleanup (ct);
1133       else
1134         clib_fifo_add1 (wrk->pending_cleanups, *req);
1135       n_to_handle -= 1;
1136     }
1137
1138   if (clib_fifo_elts (wrk->pending_cleanups))
1139     {
1140       wrk->have_cleanups = 1;
1141       session_send_rpc_evt_to_thread_force (
1142         thread_index, ct_handle_cleanups,
1143         uword_to_pointer (thread_index, void *));
1144     }
1145 }
1146
1147 static void
1148 ct_program_cleanup (ct_connection_t *ct)
1149 {
1150   ct_cleanup_req_t *req;
1151   uword thread_index;
1152   ct_worker_t *wrk;
1153
1154   thread_index = ct->c_thread_index;
1155   wrk = ct_worker_get (ct->c_thread_index);
1156
1157   clib_fifo_add2 (wrk->pending_cleanups, req);
1158   req->ct_index = ct->c_c_index;
1159
1160   if (wrk->have_cleanups)
1161     return;
1162
1163   wrk->have_cleanups = 1;
1164   session_send_rpc_evt_to_thread_force (
1165     thread_index, ct_handle_cleanups, uword_to_pointer (thread_index, void *));
1166 }
1167
1168 static void
1169 ct_session_close (u32 ct_index, u32 thread_index)
1170 {
1171   ct_connection_t *ct, *peer_ct;
1172   session_t *s;
1173
1174   ct = ct_connection_get (ct_index, thread_index);
1175   s = session_get (ct->c_s_index, ct->c_thread_index);
1176   peer_ct = ct_connection_get (ct->peer_index, thread_index);
1177   if (peer_ct)
1178     {
1179       peer_ct->peer_index = ~0;
1180       /* Make sure session was allocated */
1181       if (peer_ct->flags & CT_CONN_F_HALF_OPEN)
1182         {
1183           ct_session_connect_notify (s, SESSION_E_REFUSED);
1184           ct->peer_index = ~0;
1185         }
1186       else if (peer_ct->c_s_index == ~0)
1187         {
1188           /* should not happen */
1189           clib_warning ("ct peer without session");
1190           ct_connection_free (peer_ct);
1191         }
1192     }
1193
1194   /* Do not send closed notify to make sure pending tx events are
1195    * still delivered and program cleanup */
1196   ct_program_cleanup (ct);
1197 }
1198
1199 static void
1200 ct_session_reset (u32 ct_index, u32 thread_index)
1201 {
1202   ct_connection_t *ct;
1203   ct = ct_connection_get (ct_index, thread_index);
1204   ct->flags |= CT_CONN_F_RESET;
1205   ct_session_close (ct_index, thread_index);
1206 }
1207
1208 static transport_connection_t *
1209 ct_session_get (u32 ct_index, u32 thread_index)
1210 {
1211   return (transport_connection_t *) ct_connection_get (ct_index,
1212                                                        thread_index);
1213 }
1214
1215 static u8 *
1216 format_ct_connection_id (u8 * s, va_list * args)
1217 {
1218   ct_connection_t *ct = va_arg (*args, ct_connection_t *);
1219   if (!ct)
1220     return s;
1221   if (ct->c_is_ip4)
1222     {
1223       s = format (s, "[%d:%d][CT:%U] %U:%d->%U:%d", ct->c_thread_index,
1224                   ct->c_s_index, format_transport_proto_short, ct->actual_tp,
1225                   format_ip4_address, &ct->c_lcl_ip4,
1226                   clib_net_to_host_u16 (ct->c_lcl_port), format_ip4_address,
1227                   &ct->c_rmt_ip4, clib_net_to_host_u16 (ct->c_rmt_port));
1228     }
1229   else
1230     {
1231       s = format (s, "[%d:%d][CT:%U] %U:%d->%U:%d", ct->c_thread_index,
1232                   ct->c_s_index, format_transport_proto_short, ct->actual_tp,
1233                   format_ip6_address, &ct->c_lcl_ip6,
1234                   clib_net_to_host_u16 (ct->c_lcl_port), format_ip6_address,
1235                   &ct->c_rmt_ip6, clib_net_to_host_u16 (ct->c_rmt_port));
1236     }
1237
1238   return s;
1239 }
1240
1241 static int
1242 ct_custom_tx (void *session, transport_send_params_t * sp)
1243 {
1244   session_t *s = (session_t *) session;
1245   if (session_has_transport (s))
1246     return 0;
1247   /* If event enqueued towards peer, remove from scheduler and remove
1248    * session tx flag, i.e., accept new tx events. Unset fifo flag now to
1249    * avoid missing events if peer did not clear fifo flag yet, which is
1250    * interpreted as successful notification and session is descheduled. */
1251   svm_fifo_unset_event (s->tx_fifo);
1252   if (!ct_session_tx (s))
1253     sp->flags = TRANSPORT_SND_F_DESCHED;
1254
1255   /* The scheduler uses packet count as a means of upper bounding the amount
1256    * of work done per dispatch. So make it look like we have sent something */
1257   return 1;
1258 }
1259
1260 static int
1261 ct_app_rx_evt (transport_connection_t * tc)
1262 {
1263   ct_connection_t *ct = (ct_connection_t *) tc, *peer_ct;
1264   session_t *ps, *s;
1265
1266   s = session_get (ct->c_s_index, ct->c_thread_index);
1267   if (session_has_transport (s) || s->session_state < SESSION_STATE_READY)
1268     return -1;
1269   peer_ct = ct_connection_get (ct->peer_index, tc->thread_index);
1270   if (!peer_ct || (peer_ct->flags & CT_CONN_F_HALF_OPEN))
1271     return -1;
1272   ps = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
1273   if (ps->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1274     return -1;
1275   return session_dequeue_notify (ps);
1276 }
1277
1278 static u8 *
1279 format_ct_listener (u8 * s, va_list * args)
1280 {
1281   u32 tc_index = va_arg (*args, u32);
1282   u32 __clib_unused thread_index = va_arg (*args, u32);
1283   u32 __clib_unused verbose = va_arg (*args, u32);
1284   ct_connection_t *ct = ct_connection_get (tc_index, 0);
1285   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
1286   if (verbose)
1287     s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "LISTEN");
1288   return s;
1289 }
1290
1291 static u8 *
1292 format_ct_half_open (u8 *s, va_list *args)
1293 {
1294   u32 ho_index = va_arg (*args, u32);
1295   u32 verbose = va_arg (*args, u32);
1296   ct_connection_t *ct = ct_half_open_get (ho_index);
1297   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
1298   if (verbose)
1299     s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "HALF-OPEN");
1300   return s;
1301 }
1302
1303 static u8 *
1304 format_ct_connection (u8 * s, va_list * args)
1305 {
1306   ct_connection_t *ct = va_arg (*args, ct_connection_t *);
1307   u32 verbose = va_arg (*args, u32);
1308
1309   if (!ct)
1310     return s;
1311   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
1312   if (verbose)
1313     {
1314       s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "ESTABLISHED");
1315       if (verbose > 1)
1316         {
1317           s = format (s, "\n");
1318         }
1319     }
1320   return s;
1321 }
1322
1323 static u8 *
1324 format_ct_session (u8 * s, va_list * args)
1325 {
1326   u32 ct_index = va_arg (*args, u32);
1327   u32 thread_index = va_arg (*args, u32);
1328   u32 verbose = va_arg (*args, u32);
1329   ct_connection_t *ct;
1330
1331   ct = ct_connection_get (ct_index, thread_index);
1332   if (!ct)
1333     {
1334       s = format (s, "empty\n");
1335       return s;
1336     }
1337
1338   s = format (s, "%U", format_ct_connection, ct, verbose);
1339   return s;
1340 }
1341
1342 clib_error_t *
1343 ct_enable_disable (vlib_main_t * vm, u8 is_en)
1344 {
1345   vlib_thread_main_t *vtm = &vlib_thread_main;
1346   ct_main_t *cm = &ct_main;
1347   ct_worker_t *wrk;
1348
1349   cm->n_workers = vlib_num_workers ();
1350   cm->fwrk_thread = transport_cl_thread ();
1351   vec_validate (cm->wrk, vtm->n_vlib_mains);
1352   vec_foreach (wrk, cm->wrk)
1353     clib_spinlock_init (&wrk->pending_connects_lock);
1354   clib_spinlock_init (&cm->ho_reuseable_lock);
1355   clib_rwlock_init (&cm->app_segs_lock);
1356   vec_validate (cm->fwrk_pending_connects, cm->n_workers);
1357   return 0;
1358 }
1359
1360 /* *INDENT-OFF* */
1361 static const transport_proto_vft_t cut_thru_proto = {
1362   .enable = ct_enable_disable,
1363   .start_listen = ct_start_listen,
1364   .stop_listen = ct_stop_listen,
1365   .get_connection = ct_session_get,
1366   .get_listener = ct_listener_get,
1367   .get_half_open = ct_session_half_open_get,
1368   .cleanup = ct_session_cleanup,
1369   .cleanup_ho = ct_cleanup_ho,
1370   .connect = ct_session_connect,
1371   .close = ct_session_close,
1372   .reset = ct_session_reset,
1373   .custom_tx = ct_custom_tx,
1374   .app_rx_evt = ct_app_rx_evt,
1375   .format_listener = format_ct_listener,
1376   .format_half_open = format_ct_half_open,
1377   .format_connection = format_ct_session,
1378   .transport_options = {
1379     .name = "ct",
1380     .short_name = "C",
1381     .tx_type = TRANSPORT_TX_INTERNAL,
1382     .service_type = TRANSPORT_SERVICE_VC,
1383   },
1384 };
1385 /* *INDENT-ON* */
1386
1387 static inline int
1388 ct_session_can_tx (session_t *s)
1389 {
1390   return (s->session_state == SESSION_STATE_READY ||
1391           s->session_state == SESSION_STATE_CLOSING ||
1392           s->session_state == SESSION_STATE_APP_CLOSED);
1393 }
1394
1395 int
1396 ct_session_tx (session_t * s)
1397 {
1398   ct_connection_t *ct, *peer_ct;
1399   session_t *peer_s;
1400
1401   if (!ct_session_can_tx (s))
1402     return 0;
1403   ct = (ct_connection_t *) session_get_transport (s);
1404   peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
1405   if (!peer_ct)
1406     return 0;
1407   peer_s = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
1408   if (peer_s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1409     return 0;
1410   return session_enqueue_notify (peer_s);
1411 }
1412
1413 static clib_error_t *
1414 ct_transport_init (vlib_main_t * vm)
1415 {
1416   transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto,
1417                                FIB_PROTOCOL_IP4, ~0);
1418   transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto,
1419                                FIB_PROTOCOL_IP6, ~0);
1420   return 0;
1421 }
1422
1423 VLIB_INIT_FUNCTION (ct_transport_init);
1424
1425 /*
1426  * fd.io coding-style-patch-verification: ON
1427  *
1428  * Local Variables:
1429  * eval: (c-set-style "gnu")
1430  * End:
1431  */