session: fix ct tracking of actual transport proto
[vpp.git] / src / vnet / session / application_local.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/session/application_local.h>
17 #include <vnet/session/session.h>
18
19 typedef enum ct_segment_flags_
20 {
21   CT_SEGMENT_F_CLIENT_DETACHED = 1 << 0,
22   CT_SEGMENT_F_SERVER_DETACHED = 1 << 1,
23 } ct_segment_flags_t;
24
25 typedef struct ct_segment_
26 {
27   u32 client_n_sessions;
28   u32 server_n_sessions;
29   u32 seg_ctx_index;
30   u32 ct_seg_index;
31   u32 segment_index;
32   ct_segment_flags_t flags;
33 } ct_segment_t;
34
35 typedef struct ct_segments_
36 {
37   u32 sm_index;
38   u32 server_wrk;
39   u32 client_wrk;
40   u32 fifo_pair_bytes;
41   ct_segment_t *segments;
42 } ct_segments_ctx_t;
43
44 typedef struct ct_cleanup_req_
45 {
46   u32 ct_index;
47 } ct_cleanup_req_t;
48
49 typedef struct ct_main_
50 {
51   ct_connection_t **connections;        /**< Per-worker connection pools */
52   ct_cleanup_req_t **pending_cleanups;
53   u8 *heave_cleanups;
54   u32 n_workers;                        /**< Number of vpp workers */
55   u32 n_sessions;                       /**< Cumulative sessions counter */
56   u32 *ho_reusable;                     /**< Vector of reusable ho indices */
57   clib_spinlock_t ho_reuseable_lock;    /**< Lock for reusable ho indices */
58   clib_rwlock_t app_segs_lock;          /**< RW lock for seg contexts */
59   uword *app_segs_ctxs_table;           /**< App handle to segment pool map */
60   ct_segments_ctx_t *app_seg_ctxs;      /**< Pool of ct segment contexts */
61 } ct_main_t;
62
63 static ct_main_t ct_main;
64
65 static ct_connection_t *
66 ct_connection_alloc (u32 thread_index)
67 {
68   ct_connection_t *ct;
69
70   pool_get_zero (ct_main.connections[thread_index], ct);
71   ct->c_c_index = ct - ct_main.connections[thread_index];
72   ct->c_thread_index = thread_index;
73   ct->client_wrk = ~0;
74   ct->server_wrk = ~0;
75   ct->seg_ctx_index = ~0;
76   ct->ct_seg_index = ~0;
77   return ct;
78 }
79
80 static ct_connection_t *
81 ct_connection_get (u32 ct_index, u32 thread_index)
82 {
83   if (pool_is_free_index (ct_main.connections[thread_index], ct_index))
84     return 0;
85   return pool_elt_at_index (ct_main.connections[thread_index], ct_index);
86 }
87
88 static void
89 ct_connection_free (ct_connection_t * ct)
90 {
91   if (CLIB_DEBUG)
92     {
93       u32 thread_index = ct->c_thread_index;
94       memset (ct, 0xfc, sizeof (*ct));
95       pool_put (ct_main.connections[thread_index], ct);
96       return;
97     }
98   pool_put (ct_main.connections[ct->c_thread_index], ct);
99 }
100
101 static ct_connection_t *
102 ct_half_open_alloc (void)
103 {
104   ct_main_t *cm = &ct_main;
105   u32 *hip;
106
107   clib_spinlock_lock (&cm->ho_reuseable_lock);
108   vec_foreach (hip, cm->ho_reusable)
109     pool_put_index (cm->connections[0], *hip);
110   vec_reset_length (cm->ho_reusable);
111   clib_spinlock_unlock (&cm->ho_reuseable_lock);
112
113   return ct_connection_alloc (0);
114 }
115
116 void
117 ct_half_open_add_reusable (u32 ho_index)
118 {
119   ct_main_t *cm = &ct_main;
120
121   clib_spinlock_lock (&cm->ho_reuseable_lock);
122   vec_add1 (cm->ho_reusable, ho_index);
123   clib_spinlock_unlock (&cm->ho_reuseable_lock);
124 }
125
126 session_t *
127 ct_session_get_peer (session_t * s)
128 {
129   ct_connection_t *ct, *peer_ct;
130   ct = ct_connection_get (s->connection_index, s->thread_index);
131   peer_ct = ct_connection_get (ct->peer_index, s->thread_index);
132   return session_get (peer_ct->c_s_index, s->thread_index);
133 }
134
135 void
136 ct_session_endpoint (session_t * ll, session_endpoint_t * sep)
137 {
138   ct_connection_t *ct;
139   ct = (ct_connection_t *) session_get_transport (ll);
140   sep->transport_proto = ct->actual_tp;
141   sep->port = ct->c_lcl_port;
142   sep->is_ip4 = ct->c_is_ip4;
143   ip_copy (&sep->ip, &ct->c_lcl_ip, ct->c_is_ip4);
144 }
145
146 static void
147 ct_session_dealloc_fifos (ct_connection_t *ct, svm_fifo_t *rx_fifo,
148                           svm_fifo_t *tx_fifo)
149 {
150   ct_segments_ctx_t *seg_ctx;
151   ct_main_t *cm = &ct_main;
152   segment_manager_t *sm;
153   app_worker_t *app_wrk;
154   ct_segment_t *ct_seg;
155   fifo_segment_t *fs;
156   u8 del_segment = 0;
157   u32 seg_index;
158   int cnt;
159
160   /*
161    * Cleanup fifos
162    */
163
164   sm = segment_manager_get (rx_fifo->segment_manager);
165   seg_index = rx_fifo->segment_index;
166
167   fs = segment_manager_get_segment_w_lock (sm, seg_index);
168   fifo_segment_free_fifo (fs, rx_fifo);
169   fifo_segment_free_fifo (fs, tx_fifo);
170   segment_manager_segment_reader_unlock (sm);
171
172   /*
173    * Atomically update segment context with readers lock
174    */
175
176   clib_rwlock_reader_lock (&cm->app_segs_lock);
177
178   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
179   ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
180
181   if (ct->flags & CT_CONN_F_CLIENT)
182     {
183       cnt =
184         __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
185     }
186   else
187     {
188       cnt =
189         __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
190     }
191
192   clib_rwlock_reader_unlock (&cm->app_segs_lock);
193
194   /*
195    * No need to do any app updates, return
196    */
197   ASSERT (cnt >= 0);
198   if (cnt)
199     return;
200
201   /*
202    * Grab exclusive lock and update flags unless some other thread
203    * added more sessions
204    */
205   clib_rwlock_writer_lock (&cm->app_segs_lock);
206
207   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
208   ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
209   if (ct->flags & CT_CONN_F_CLIENT)
210     {
211       cnt = ct_seg->client_n_sessions;
212       if (!cnt)
213         ct_seg->flags |= CT_SEGMENT_F_CLIENT_DETACHED;
214     }
215   else
216     {
217       cnt = ct_seg->server_n_sessions;
218       if (!cnt)
219         ct_seg->flags |= CT_SEGMENT_F_SERVER_DETACHED;
220     }
221
222   /*
223    * Remove segment context because both client and server detached
224    */
225
226   if (!cnt && (ct_seg->flags & CT_SEGMENT_F_CLIENT_DETACHED) &&
227       (ct_seg->flags & CT_SEGMENT_F_SERVER_DETACHED))
228     {
229       pool_put_index (seg_ctx->segments, ct->ct_seg_index);
230
231       /*
232        * No more segment indices left, remove the segments context
233        */
234       if (!pool_elts (seg_ctx->segments))
235         {
236           u64 table_handle = seg_ctx->client_wrk << 16 | seg_ctx->server_wrk;
237           table_handle = (u64) seg_ctx->sm_index << 32 | table_handle;
238           hash_unset (cm->app_segs_ctxs_table, table_handle);
239           pool_free (seg_ctx->segments);
240           pool_put_index (cm->app_seg_ctxs, ct->seg_ctx_index);
241         }
242       del_segment = 1;
243     }
244
245   clib_rwlock_writer_unlock (&cm->app_segs_lock);
246
247   /*
248    * Session counter went to zero, notify the app that detached
249    */
250   if (cnt)
251     return;
252
253   if (ct->flags & CT_CONN_F_CLIENT)
254     {
255       app_wrk = app_worker_get_if_valid (ct->client_wrk);
256       /* Determine if client app still needs notification, i.e., if it is
257        * still attached. If client detached and this is the last ct session
258        * on this segment, then its connects segment manager should also be
259        * detached, so do not send notification */
260       if (app_wrk)
261         {
262           segment_manager_t *csm;
263           csm = app_worker_get_connect_segment_manager (app_wrk);
264           if (!segment_manager_app_detached (csm))
265             app_worker_del_segment_notify (app_wrk, ct->segment_handle);
266         }
267     }
268   else if (!segment_manager_app_detached (sm))
269     {
270       app_wrk = app_worker_get (ct->server_wrk);
271       app_worker_del_segment_notify (app_wrk, ct->segment_handle);
272     }
273
274   if (!del_segment)
275     return;
276
277   segment_manager_lock_and_del_segment (sm, seg_index);
278
279   /* Cleanup segment manager if needed. If server detaches there's a chance
280    * the client's sessions will hold up segment removal */
281   if (segment_manager_app_detached (sm) && !segment_manager_has_fifos (sm))
282     segment_manager_free_safe (sm);
283 }
284
285 int
286 ct_session_connect_notify (session_t *ss, session_error_t err)
287 {
288   u32 ss_index, opaque, thread_index;
289   ct_connection_t *sct, *cct;
290   app_worker_t *client_wrk;
291   session_t *cs;
292
293   ss_index = ss->session_index;
294   thread_index = ss->thread_index;
295   sct = (ct_connection_t *) session_get_transport (ss);
296   client_wrk = app_worker_get (sct->client_wrk);
297   opaque = sct->client_opaque;
298
299   cct = ct_connection_get (sct->peer_index, thread_index);
300
301   /* Client closed while waiting for reply from server */
302   if (PREDICT_FALSE (!cct))
303     {
304       session_transport_closing_notify (&sct->connection);
305       session_transport_delete_notify (&sct->connection);
306       ct_connection_free (sct);
307       return 0;
308     }
309
310   session_half_open_delete_notify (&cct->connection);
311   cct->flags &= ~CT_CONN_F_HALF_OPEN;
312
313   if (PREDICT_FALSE (err))
314     goto connect_error;
315
316   /*
317    * Alloc client session
318    */
319
320   cs = session_alloc (thread_index);
321   ss = session_get (ss_index, thread_index);
322   cs->session_type = ss->session_type;
323   cs->listener_handle = SESSION_INVALID_HANDLE;
324   cs->session_state = SESSION_STATE_CONNECTING;
325   cs->app_wrk_index = client_wrk->wrk_index;
326   cs->connection_index = cct->c_c_index;
327   cct->c_s_index = cs->session_index;
328
329   /* This will allocate fifos for the session. They won't be used for
330    * exchanging data but they will be used to close the connection if
331    * the segment manager/worker is freed */
332   if ((err = app_worker_init_connected (client_wrk, cs)))
333     {
334       session_free (cs);
335       session_close (ss);
336       err = SESSION_E_ALLOC;
337       goto connect_error;
338     }
339
340   cs->session_state = SESSION_STATE_CONNECTING;
341
342   if (app_worker_connect_notify (client_wrk, cs, 0, opaque))
343     {
344       segment_manager_dealloc_fifos (cs->rx_fifo, cs->tx_fifo);
345       session_free (cs);
346       session_close (ss);
347       goto cleanup_client;
348     }
349
350   cs = session_get (cct->c_s_index, cct->c_thread_index);
351   cs->session_state = SESSION_STATE_READY;
352
353   return 0;
354
355 connect_error:
356
357   app_worker_connect_notify (client_wrk, 0, err, cct->client_opaque);
358
359 cleanup_client:
360
361   if (cct->client_rx_fifo)
362     ct_session_dealloc_fifos (cct, cct->client_rx_fifo, cct->client_tx_fifo);
363   ct_connection_free (cct);
364   return -1;
365 }
366
367 static inline ct_segment_t *
368 ct_lookup_free_segment (ct_main_t *cm, segment_manager_t *sm,
369                         u32 seg_ctx_index)
370 {
371   uword free_bytes, max_free_bytes;
372   ct_segment_t *ct_seg, *res = 0;
373   ct_segments_ctx_t *seg_ctx;
374   fifo_segment_t *fs;
375   u32 max_fifos;
376
377   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
378   max_free_bytes = seg_ctx->fifo_pair_bytes;
379
380   pool_foreach (ct_seg, seg_ctx->segments)
381     {
382       /* Client or server has detached so segment cannot be used */
383       if ((ct_seg->flags & CT_SEGMENT_F_SERVER_DETACHED) ||
384           (ct_seg->flags & CT_SEGMENT_F_CLIENT_DETACHED))
385         continue;
386       fs = segment_manager_get_segment (sm, ct_seg->segment_index);
387       free_bytes = fifo_segment_available_bytes (fs);
388       max_fifos = fifo_segment_size (fs) / seg_ctx->fifo_pair_bytes;
389       if (free_bytes > max_free_bytes &&
390           fifo_segment_num_fifos (fs) / 2 < max_fifos)
391         {
392           max_free_bytes = free_bytes;
393           res = ct_seg;
394         }
395     }
396
397   return res;
398 }
399
400 static ct_segment_t *
401 ct_alloc_segment (ct_main_t *cm, app_worker_t *server_wrk, u64 table_handle,
402                   segment_manager_t *sm, u32 client_wrk_index)
403 {
404   u32 seg_ctx_index = ~0, sm_index, pair_bytes;
405   segment_manager_props_t *props;
406   const u32 margin = 16 << 10;
407   ct_segments_ctx_t *seg_ctx;
408   app_worker_t *client_wrk;
409   u64 seg_size, seg_handle;
410   application_t *server;
411   ct_segment_t *ct_seg;
412   uword *spp;
413   int fs_index;
414
415   server = application_get (server_wrk->app_index);
416   props = application_segment_manager_properties (server);
417   sm_index = segment_manager_index (sm);
418   pair_bytes = props->rx_fifo_size + props->tx_fifo_size + margin;
419
420   /*
421    * Make sure another thread did not alloc a segment while acquiring the lock
422    */
423
424   spp = hash_get (cm->app_segs_ctxs_table, table_handle);
425   if (spp)
426     {
427       seg_ctx_index = *spp;
428       ct_seg = ct_lookup_free_segment (cm, sm, seg_ctx_index);
429       if (ct_seg)
430         return ct_seg;
431     }
432
433   /*
434    * No segment, try to alloc one and notify the server and the client.
435    * Make sure the segment is not used for other fifos
436    */
437   seg_size = clib_max (props->segment_size, 128 << 20);
438   fs_index =
439     segment_manager_add_segment2 (sm, seg_size, FIFO_SEGMENT_F_CUSTOM_USE);
440   if (fs_index < 0)
441     return 0;
442
443   if (seg_ctx_index == ~0)
444     {
445       pool_get_zero (cm->app_seg_ctxs, seg_ctx);
446       seg_ctx_index = seg_ctx - cm->app_seg_ctxs;
447       hash_set (cm->app_segs_ctxs_table, table_handle, seg_ctx_index);
448       seg_ctx->server_wrk = server_wrk->wrk_index;
449       seg_ctx->client_wrk = client_wrk_index;
450       seg_ctx->sm_index = sm_index;
451       seg_ctx->fifo_pair_bytes = pair_bytes;
452     }
453   else
454     {
455       seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
456     }
457
458   pool_get_zero (seg_ctx->segments, ct_seg);
459   ct_seg->segment_index = fs_index;
460   ct_seg->server_n_sessions = 0;
461   ct_seg->client_n_sessions = 0;
462   ct_seg->ct_seg_index = ct_seg - seg_ctx->segments;
463   ct_seg->seg_ctx_index = seg_ctx_index;
464
465   /* New segment, notify the server and client */
466   seg_handle = segment_manager_make_segment_handle (sm_index, fs_index);
467   if (app_worker_add_segment_notify (server_wrk, seg_handle))
468     goto error;
469
470   client_wrk = app_worker_get (client_wrk_index);
471   if (app_worker_add_segment_notify (client_wrk, seg_handle))
472     {
473       app_worker_del_segment_notify (server_wrk, seg_handle);
474       goto error;
475     }
476
477   return ct_seg;
478
479 error:
480
481   segment_manager_lock_and_del_segment (sm, fs_index);
482   pool_put_index (seg_ctx->segments, ct_seg->seg_ctx_index);
483   return 0;
484 }
485
486 static int
487 ct_init_accepted_session (app_worker_t *server_wrk, ct_connection_t *ct,
488                           session_t *ls, session_t *ll)
489 {
490   segment_manager_props_t *props;
491   u64 seg_handle, table_handle;
492   u32 sm_index, fs_index = ~0;
493   ct_segments_ctx_t *seg_ctx;
494   ct_main_t *cm = &ct_main;
495   application_t *server;
496   segment_manager_t *sm;
497   ct_segment_t *ct_seg;
498   fifo_segment_t *fs;
499   uword *spp;
500   int rv;
501
502   sm = app_worker_get_listen_segment_manager (server_wrk, ll);
503   sm_index = segment_manager_index (sm);
504   server = application_get (server_wrk->app_index);
505   props = application_segment_manager_properties (server);
506
507   table_handle = ct->client_wrk << 16 | server_wrk->wrk_index;
508   table_handle = (u64) sm_index << 32 | table_handle;
509
510   /*
511    * Check if we already have a segment that can hold the fifos
512    */
513
514   clib_rwlock_reader_lock (&cm->app_segs_lock);
515
516   spp = hash_get (cm->app_segs_ctxs_table, table_handle);
517   if (spp)
518     {
519       ct_seg = ct_lookup_free_segment (cm, sm, *spp);
520       if (ct_seg)
521         {
522           ct->seg_ctx_index = ct_seg->seg_ctx_index;
523           ct->ct_seg_index = ct_seg->ct_seg_index;
524           fs_index = ct_seg->segment_index;
525           __atomic_add_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
526           __atomic_add_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
527         }
528     }
529
530   clib_rwlock_reader_unlock (&cm->app_segs_lock);
531
532   /*
533    * If not, grab exclusive lock and allocate segment
534    */
535   if (fs_index == ~0)
536     {
537       clib_rwlock_writer_lock (&cm->app_segs_lock);
538
539       ct_seg =
540         ct_alloc_segment (cm, server_wrk, table_handle, sm, ct->client_wrk);
541       if (!ct_seg)
542         {
543           clib_rwlock_writer_unlock (&cm->app_segs_lock);
544           return -1;
545         }
546
547       ct->seg_ctx_index = ct_seg->seg_ctx_index;
548       ct->ct_seg_index = ct_seg->ct_seg_index;
549       ct_seg->server_n_sessions += 1;
550       ct_seg->client_n_sessions += 1;
551       fs_index = ct_seg->segment_index;
552
553       clib_rwlock_writer_unlock (&cm->app_segs_lock);
554     }
555
556   /*
557    * Allocate and initialize the fifos
558    */
559   fs = segment_manager_get_segment_w_lock (sm, fs_index);
560   rv = segment_manager_try_alloc_fifos (
561     fs, ls->thread_index, props->rx_fifo_size, props->tx_fifo_size,
562     &ls->rx_fifo, &ls->tx_fifo);
563   if (rv)
564     {
565       segment_manager_segment_reader_unlock (sm);
566
567       clib_rwlock_reader_lock (&cm->app_segs_lock);
568
569       seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
570       ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
571       __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
572       __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
573
574       clib_rwlock_reader_unlock (&cm->app_segs_lock);
575
576       return rv;
577     }
578
579   ls->rx_fifo->shr->master_session_index = ls->session_index;
580   ls->tx_fifo->shr->master_session_index = ls->session_index;
581   ls->rx_fifo->master_thread_index = ls->thread_index;
582   ls->tx_fifo->master_thread_index = ls->thread_index;
583   ls->rx_fifo->segment_manager = sm_index;
584   ls->tx_fifo->segment_manager = sm_index;
585   ls->rx_fifo->segment_index = fs_index;
586   ls->tx_fifo->segment_index = fs_index;
587
588   seg_handle = segment_manager_segment_handle (sm, fs);
589   segment_manager_segment_reader_unlock (sm);
590
591   ct->segment_handle = seg_handle;
592
593   return 0;
594 }
595
596 static void
597 ct_accept_rpc_wrk_handler (void *accept_args)
598 {
599   u32 cct_index, ho_index, thread_index, ll_index;
600   ct_connection_t *sct, *cct, *ho;
601   transport_connection_t *ll_ct;
602   app_worker_t *server_wrk;
603   session_t *ss, *ll;
604
605   /*
606    * Alloc client ct and initialize from ho
607    */
608   thread_index = vlib_get_thread_index ();
609   cct = ct_connection_alloc (thread_index);
610   cct_index = cct->c_c_index;
611
612   ho_index = pointer_to_uword (accept_args);
613   ho = ct_connection_get (ho_index, 0);
614
615   /* Unlikely but half-open session and transport could have been freed */
616   if (PREDICT_FALSE (!ho))
617     {
618       ct_connection_free (cct);
619       return;
620     }
621
622   clib_memcpy (cct, ho, sizeof (*ho));
623   cct->c_c_index = cct_index;
624   cct->c_thread_index = thread_index;
625   cct->flags |= CT_CONN_F_HALF_OPEN;
626
627   /* Notify session layer that half-open is on a different thread
628    * and mark ho connection index reusable. Avoids another rpc
629    */
630   session_half_open_migrate_notify (&cct->connection);
631   session_half_open_migrated_notify (&cct->connection);
632   ct_half_open_add_reusable (ho_index);
633
634   /*
635    * Alloc and init server transport
636    */
637
638   ll_index = cct->peer_index;
639   ll = listen_session_get (ll_index);
640   sct = ct_connection_alloc (thread_index);
641   /* Transport not necessarily ct but it might, so grab after sct alloc */
642   ll_ct = listen_session_get_transport (ll);
643
644   /* Make sure cct is valid after sct alloc */
645   cct = ct_connection_get (cct_index, thread_index);
646
647   sct->c_rmt_port = 0;
648   sct->c_lcl_port = ll_ct->lcl_port;
649   sct->c_is_ip4 = cct->c_is_ip4;
650   clib_memcpy (&sct->c_lcl_ip, &cct->c_rmt_ip, sizeof (cct->c_rmt_ip));
651   sct->client_wrk = cct->client_wrk;
652   sct->c_proto = TRANSPORT_PROTO_NONE;
653   sct->client_opaque = cct->client_opaque;
654   sct->actual_tp = cct->actual_tp;
655
656   sct->peer_index = cct->c_c_index;
657   cct->peer_index = sct->c_c_index;
658
659   /*
660    * Accept server session. Client session is created only after
661    * server confirms accept.
662    */
663   ss = session_alloc (thread_index);
664   ll = listen_session_get (ll_index);
665   ss->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE,
666                                                      sct->c_is_ip4);
667   ss->connection_index = sct->c_c_index;
668   ss->listener_handle = listen_session_get_handle (ll);
669   ss->session_state = SESSION_STATE_CREATED;
670
671   server_wrk = application_listener_select_worker (ll);
672   ss->app_wrk_index = server_wrk->wrk_index;
673
674   sct->c_s_index = ss->session_index;
675   sct->server_wrk = ss->app_wrk_index;
676
677   if (ct_init_accepted_session (server_wrk, sct, ss, ll))
678     {
679       ct_session_connect_notify (ss, SESSION_E_ALLOC);
680       ct_connection_free (sct);
681       session_free (ss);
682       return;
683     }
684
685   cct->seg_ctx_index = sct->seg_ctx_index;
686   cct->ct_seg_index = sct->ct_seg_index;
687   cct->client_rx_fifo = ss->tx_fifo;
688   cct->client_tx_fifo = ss->rx_fifo;
689   cct->client_rx_fifo->refcnt++;
690   cct->client_tx_fifo->refcnt++;
691   cct->segment_handle = sct->segment_handle;
692
693   ss->session_state = SESSION_STATE_ACCEPTING;
694   if (app_worker_accept_notify (server_wrk, ss))
695     {
696       ct_session_connect_notify (ss, SESSION_E_REFUSED);
697       ct_session_dealloc_fifos (sct, ss->rx_fifo, ss->tx_fifo);
698       ct_connection_free (sct);
699       session_free (ss);
700     }
701 }
702
703 static int
704 ct_connect (app_worker_t * client_wrk, session_t * ll,
705             session_endpoint_cfg_t * sep)
706 {
707   u32 thread_index, ho_index;
708   ct_main_t *cm = &ct_main;
709   ct_connection_t *ho;
710
711   /* Simple round-robin policy for spreading sessions over workers. We skip
712    * thread index 0, i.e., offset the index by 1, when we have workers as it
713    * is the one dedicated to main thread. Note that n_workers does not include
714    * main thread */
715   cm->n_sessions += 1;
716   thread_index = cm->n_workers ? (cm->n_sessions % cm->n_workers) + 1 : 0;
717
718   /*
719    * Alloc and init client half-open transport
720    */
721
722   ho = ct_half_open_alloc ();
723   ho_index = ho->c_c_index;
724   ho->c_rmt_port = sep->port;
725   ho->c_lcl_port = 0;
726   ho->c_is_ip4 = sep->is_ip4;
727   ho->client_opaque = sep->opaque;
728   ho->client_wrk = client_wrk->wrk_index;
729   ho->peer_index = ll->session_index;
730   ho->c_proto = TRANSPORT_PROTO_NONE;
731   ho->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP;
732   clib_memcpy (&ho->c_rmt_ip, &sep->ip, sizeof (sep->ip));
733   ho->flags |= CT_CONN_F_CLIENT;
734   ho->c_s_index = ~0;
735   ho->actual_tp = sep->original_tp;
736
737   /*
738    * Accept connection on thread selected above. Connected reply comes
739    * after server accepts the connection.
740    */
741
742   session_send_rpc_evt_to_thread_force (thread_index,
743                                         ct_accept_rpc_wrk_handler,
744                                         uword_to_pointer (ho_index, void *));
745
746   return ho_index;
747 }
748
749 static u32
750 ct_start_listen (u32 app_listener_index, transport_endpoint_t * tep)
751 {
752   session_endpoint_cfg_t *sep;
753   ct_connection_t *ct;
754
755   sep = (session_endpoint_cfg_t *) tep;
756   ct = ct_connection_alloc (0);
757   ct->server_wrk = sep->app_wrk_index;
758   ct->c_is_ip4 = sep->is_ip4;
759   clib_memcpy (&ct->c_lcl_ip, &sep->ip, sizeof (sep->ip));
760   ct->c_lcl_port = sep->port;
761   ct->c_s_index = app_listener_index;
762   ct->actual_tp = sep->transport_proto;
763   return ct->c_c_index;
764 }
765
766 static u32
767 ct_stop_listen (u32 ct_index)
768 {
769   ct_connection_t *ct;
770   ct = ct_connection_get (ct_index, 0);
771   ct_connection_free (ct);
772   return 0;
773 }
774
775 static transport_connection_t *
776 ct_listener_get (u32 ct_index)
777 {
778   return (transport_connection_t *) ct_connection_get (ct_index, 0);
779 }
780
781 static transport_connection_t *
782 ct_half_open_get (u32 ct_index)
783 {
784   return (transport_connection_t *) ct_connection_get (ct_index, 0);
785 }
786
787 static void
788 ct_session_cleanup (u32 conn_index, u32 thread_index)
789 {
790   ct_connection_t *ct, *peer_ct;
791
792   ct = ct_connection_get (conn_index, thread_index);
793   if (!ct)
794     return;
795
796   peer_ct = ct_connection_get (ct->peer_index, thread_index);
797   if (peer_ct)
798     peer_ct->peer_index = ~0;
799
800   ct_connection_free (ct);
801 }
802
803 static void
804 ct_cleanup_ho (u32 ho_index)
805 {
806   ct_connection_free (ct_connection_get (ho_index, 0));
807 }
808
809 static int
810 ct_session_connect (transport_endpoint_cfg_t * tep)
811 {
812   session_endpoint_cfg_t *sep_ext;
813   session_endpoint_t _sep, *sep = &_sep;
814   app_worker_t *app_wrk;
815   session_handle_t lh;
816   application_t *app;
817   app_listener_t *al;
818   u32 table_index;
819   session_t *ll;
820   u8 fib_proto;
821
822   sep_ext = (session_endpoint_cfg_t *) tep;
823   _sep = *(session_endpoint_t *) tep;
824   app_wrk = app_worker_get (sep_ext->app_wrk_index);
825   app = application_get (app_wrk->app_index);
826
827   sep->transport_proto = sep_ext->original_tp;
828   table_index = application_local_session_table (app);
829   lh = session_lookup_local_endpoint (table_index, sep);
830   if (lh == SESSION_DROP_HANDLE)
831     return SESSION_E_FILTERED;
832
833   if (lh == SESSION_INVALID_HANDLE)
834     goto global_scope;
835
836   ll = listen_session_get_from_handle (lh);
837   al = app_listener_get_w_session (ll);
838
839   /*
840    * Break loop if rule in local table points to connecting app. This
841    * can happen if client is a generic proxy. Route connect through
842    * global table instead.
843    */
844   if (al->app_index == app->app_index)
845     goto global_scope;
846
847   return ct_connect (app_wrk, ll, sep_ext);
848
849   /*
850    * If nothing found, check the global scope for locally attached
851    * destinations. Make sure first that we're allowed to.
852    */
853
854 global_scope:
855   if (session_endpoint_is_local (sep))
856     return SESSION_E_NOROUTE;
857
858   if (!application_has_global_scope (app))
859     return SESSION_E_SCOPE;
860
861   fib_proto = session_endpoint_fib_proto (sep);
862   table_index = session_lookup_get_index_for_fib (fib_proto, sep->fib_index);
863   ll = session_lookup_listener_wildcard (table_index, sep);
864
865   /* Avoid connecting app to own listener */
866   if (ll && ll->app_index != app->app_index)
867     return ct_connect (app_wrk, ll, sep_ext);
868
869   /* Failed to connect but no error */
870   return SESSION_E_LOCAL_CONNECT;
871 }
872
873 static void
874 ct_session_postponed_cleanup (ct_connection_t *ct)
875 {
876   app_worker_t *app_wrk;
877   session_t *s;
878
879   s = session_get (ct->c_s_index, ct->c_thread_index);
880   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
881
882   if (ct->flags & CT_CONN_F_CLIENT)
883     {
884       if (app_wrk)
885         app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
886
887       /* Normal free for client session as the fifos are allocated through
888        * the connects segment manager in a segment that's not shared with
889        * the server */
890       session_free_w_fifos (s);
891       ct_session_dealloc_fifos (ct, ct->client_rx_fifo, ct->client_tx_fifo);
892     }
893   else
894     {
895       /* Manual session and fifo segment cleanup to avoid implicit
896        * segment manager cleanups and notifications */
897       app_wrk = app_worker_get_if_valid (s->app_wrk_index);
898       if (app_wrk)
899         {
900           app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
901           app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_SESSION);
902         }
903
904       ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo);
905       session_free (s);
906     }
907
908   ct_connection_free (ct);
909 }
910
911 static void
912 ct_handle_cleanups (void *args)
913 {
914   uword thread_index = pointer_to_uword (args);
915   const u32 max_cleanups = 100;
916   ct_main_t *cm = &ct_main;
917   ct_cleanup_req_t *req;
918   ct_connection_t *ct;
919   u32 n_to_handle = 0;
920   session_t *s;
921
922   cm->heave_cleanups[thread_index] = 0;
923   n_to_handle = clib_fifo_elts (cm->pending_cleanups[thread_index]);
924   n_to_handle = clib_min (n_to_handle, max_cleanups);
925
926   while (n_to_handle)
927     {
928       clib_fifo_sub2 (cm->pending_cleanups[thread_index], req);
929       ct = ct_connection_get (req->ct_index, thread_index);
930       s = session_get (ct->c_s_index, ct->c_thread_index);
931       if (!svm_fifo_has_event (s->tx_fifo))
932         ct_session_postponed_cleanup (ct);
933       else
934         clib_fifo_add1 (cm->pending_cleanups[thread_index], *req);
935       n_to_handle -= 1;
936     }
937
938   if (clib_fifo_elts (cm->pending_cleanups[thread_index]))
939     {
940       cm->heave_cleanups[thread_index] = 1;
941       session_send_rpc_evt_to_thread_force (
942         thread_index, ct_handle_cleanups,
943         uword_to_pointer (thread_index, void *));
944     }
945 }
946
947 static void
948 ct_program_cleanup (ct_connection_t *ct)
949 {
950   ct_main_t *cm = &ct_main;
951   ct_cleanup_req_t *req;
952   uword thread_index;
953
954   thread_index = ct->c_thread_index;
955   clib_fifo_add2 (cm->pending_cleanups[thread_index], req);
956   req->ct_index = ct->c_c_index;
957
958   if (cm->heave_cleanups[thread_index])
959     return;
960
961   cm->heave_cleanups[thread_index] = 1;
962   session_send_rpc_evt_to_thread_force (
963     thread_index, ct_handle_cleanups, uword_to_pointer (thread_index, void *));
964 }
965
966 static inline int
967 ct_close_is_reset (ct_connection_t *ct, session_t *s)
968 {
969   if (ct->flags & CT_CONN_F_CLIENT)
970     return (svm_fifo_max_dequeue (ct->client_rx_fifo) > 0);
971   else
972     return (svm_fifo_max_dequeue (s->rx_fifo) > 0);
973 }
974
975 static void
976 ct_session_close (u32 ct_index, u32 thread_index)
977 {
978   ct_connection_t *ct, *peer_ct;
979   session_t *s;
980
981   ct = ct_connection_get (ct_index, thread_index);
982   s = session_get (ct->c_s_index, ct->c_thread_index);
983   peer_ct = ct_connection_get (ct->peer_index, thread_index);
984   if (peer_ct)
985     {
986       peer_ct->peer_index = ~0;
987       /* Make sure session was allocated */
988       if (peer_ct->flags & CT_CONN_F_HALF_OPEN)
989         {
990           ct_session_connect_notify (s, SESSION_E_REFUSED);
991         }
992       else if (peer_ct->c_s_index != ~0)
993         {
994           if (ct_close_is_reset (ct, s))
995             session_transport_reset_notify (&peer_ct->connection);
996           else
997             session_transport_closing_notify (&peer_ct->connection);
998         }
999       else
1000         {
1001           /* should not happen */
1002           clib_warning ("ct peer without session");
1003           ct_connection_free (peer_ct);
1004         }
1005     }
1006
1007   /* Do not send closed notify to make sure pending tx events are
1008    * still delivered and program cleanup */
1009   ct_program_cleanup (ct);
1010 }
1011
1012 static transport_connection_t *
1013 ct_session_get (u32 ct_index, u32 thread_index)
1014 {
1015   return (transport_connection_t *) ct_connection_get (ct_index,
1016                                                        thread_index);
1017 }
1018
1019 static u8 *
1020 format_ct_connection_id (u8 * s, va_list * args)
1021 {
1022   ct_connection_t *ct = va_arg (*args, ct_connection_t *);
1023   if (!ct)
1024     return s;
1025   if (ct->c_is_ip4)
1026     {
1027       s = format (s, "[%d:%d][CT:%U] %U:%d->%U:%d", ct->c_thread_index,
1028                   ct->c_s_index, format_transport_proto_short, ct->actual_tp,
1029                   format_ip4_address, &ct->c_lcl_ip4,
1030                   clib_net_to_host_u16 (ct->c_lcl_port), format_ip4_address,
1031                   &ct->c_rmt_ip4, clib_net_to_host_u16 (ct->c_rmt_port));
1032     }
1033   else
1034     {
1035       s = format (s, "[%d:%d][CT:%U] %U:%d->%U:%d", ct->c_thread_index,
1036                   ct->c_s_index, format_transport_proto_short, ct->actual_tp,
1037                   format_ip6_address, &ct->c_lcl_ip6,
1038                   clib_net_to_host_u16 (ct->c_lcl_port), format_ip6_address,
1039                   &ct->c_rmt_ip6, clib_net_to_host_u16 (ct->c_rmt_port));
1040     }
1041
1042   return s;
1043 }
1044
1045 static int
1046 ct_custom_tx (void *session, transport_send_params_t * sp)
1047 {
1048   session_t *s = (session_t *) session;
1049   if (session_has_transport (s))
1050     return 0;
1051   /* If event enqueued towards peer, remove from scheduler and remove
1052    * session tx flag, i.e., accept new tx events. Unset fifo flag now to
1053    * avoid missing events if peer did not clear fifo flag yet, which is
1054    * interpreted as successful notification and session is descheduled. */
1055   svm_fifo_unset_event (s->tx_fifo);
1056   if (!ct_session_tx (s))
1057     sp->flags = TRANSPORT_SND_F_DESCHED;
1058
1059   /* The scheduler uses packet count as a means of upper bounding the amount
1060    * of work done per dispatch. So make it look like we have sent something */
1061   return 1;
1062 }
1063
1064 static int
1065 ct_app_rx_evt (transport_connection_t * tc)
1066 {
1067   ct_connection_t *ct = (ct_connection_t *) tc, *peer_ct;
1068   session_t *ps;
1069
1070   peer_ct = ct_connection_get (ct->peer_index, tc->thread_index);
1071   if (!peer_ct)
1072     return -1;
1073   ps = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
1074   return session_dequeue_notify (ps);
1075 }
1076
1077 static u8 *
1078 format_ct_listener (u8 * s, va_list * args)
1079 {
1080   u32 tc_index = va_arg (*args, u32);
1081   u32 __clib_unused thread_index = va_arg (*args, u32);
1082   u32 __clib_unused verbose = va_arg (*args, u32);
1083   ct_connection_t *ct = ct_connection_get (tc_index, 0);
1084   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
1085   if (verbose)
1086     s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "LISTEN");
1087   return s;
1088 }
1089
1090 static u8 *
1091 format_ct_half_open (u8 *s, va_list *args)
1092 {
1093   u32 ho_index = va_arg (*args, u32);
1094   u32 verbose = va_arg (*args, u32);
1095   ct_connection_t *ct = ct_connection_get (ho_index, 0);
1096   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
1097   if (verbose)
1098     s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "HALF-OPEN");
1099   return s;
1100 }
1101
1102 static u8 *
1103 format_ct_connection (u8 * s, va_list * args)
1104 {
1105   ct_connection_t *ct = va_arg (*args, ct_connection_t *);
1106   u32 verbose = va_arg (*args, u32);
1107
1108   if (!ct)
1109     return s;
1110   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
1111   if (verbose)
1112     {
1113       s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "ESTABLISHED");
1114       if (verbose > 1)
1115         {
1116           s = format (s, "\n");
1117         }
1118     }
1119   return s;
1120 }
1121
1122 static u8 *
1123 format_ct_session (u8 * s, va_list * args)
1124 {
1125   u32 ct_index = va_arg (*args, u32);
1126   u32 thread_index = va_arg (*args, u32);
1127   u32 verbose = va_arg (*args, u32);
1128   ct_connection_t *ct;
1129
1130   ct = ct_connection_get (ct_index, thread_index);
1131   if (!ct)
1132     {
1133       s = format (s, "empty\n");
1134       return s;
1135     }
1136
1137   s = format (s, "%U", format_ct_connection, ct, verbose);
1138   return s;
1139 }
1140
1141 clib_error_t *
1142 ct_enable_disable (vlib_main_t * vm, u8 is_en)
1143 {
1144   ct_main_t *cm = &ct_main;
1145
1146   cm->n_workers = vlib_num_workers ();
1147   vec_validate (cm->connections, cm->n_workers);
1148   vec_validate (cm->pending_cleanups, cm->n_workers);
1149   vec_validate (cm->heave_cleanups, cm->n_workers);
1150   clib_spinlock_init (&cm->ho_reuseable_lock);
1151   clib_rwlock_init (&cm->app_segs_lock);
1152   return 0;
1153 }
1154
1155 /* *INDENT-OFF* */
1156 static const transport_proto_vft_t cut_thru_proto = {
1157   .enable = ct_enable_disable,
1158   .start_listen = ct_start_listen,
1159   .stop_listen = ct_stop_listen,
1160   .get_connection = ct_session_get,
1161   .get_listener = ct_listener_get,
1162   .get_half_open = ct_half_open_get,
1163   .cleanup = ct_session_cleanup,
1164   .cleanup_ho = ct_cleanup_ho,
1165   .connect = ct_session_connect,
1166   .close = ct_session_close,
1167   .custom_tx = ct_custom_tx,
1168   .app_rx_evt = ct_app_rx_evt,
1169   .format_listener = format_ct_listener,
1170   .format_half_open = format_ct_half_open,
1171   .format_connection = format_ct_session,
1172   .transport_options = {
1173     .name = "ct",
1174     .short_name = "C",
1175     .tx_type = TRANSPORT_TX_INTERNAL,
1176     .service_type = TRANSPORT_SERVICE_VC,
1177   },
1178 };
1179 /* *INDENT-ON* */
1180
1181 int
1182 ct_session_tx (session_t * s)
1183 {
1184   ct_connection_t *ct, *peer_ct;
1185   session_t *peer_s;
1186
1187   ct = (ct_connection_t *) session_get_transport (s);
1188   peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
1189   if (!peer_ct)
1190     return 0;
1191   peer_s = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
1192   if (peer_s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1193     return 0;
1194   return session_enqueue_notify (peer_s);
1195 }
1196
1197 static clib_error_t *
1198 ct_transport_init (vlib_main_t * vm)
1199 {
1200   transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto,
1201                                FIB_PROTOCOL_IP4, ~0);
1202   transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto,
1203                                FIB_PROTOCOL_IP6, ~0);
1204   return 0;
1205 }
1206
1207 VLIB_INIT_FUNCTION (ct_transport_init);
1208
1209 /*
1210  * fd.io coding-style-patch-verification: ON
1211  *
1212  * Local Variables:
1213  * eval: (c-set-style "gnu")
1214  * End:
1215  */