session: optimize ct fifo segment allocations
[vpp.git] / src / vnet / session / application_local.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/session/application_local.h>
17 #include <vnet/session/session.h>
18
19 typedef enum ct_segment_flags_
20 {
21   CT_SEGMENT_F_CLIENT_DETACHED = 1 << 0,
22   CT_SEGMENT_F_SERVER_DETACHED = 1 << 1,
23 } ct_segment_flags_t;
24
25 typedef struct ct_segment_
26 {
27   u32 segment_index;
28   u32 client_n_sessions;
29   u32 server_n_sessions;
30   ct_segment_flags_t flags;
31 } ct_segment_t;
32
33 typedef struct ct_segments_
34 {
35   u32 sm_index;
36   u32 server_wrk;
37   u32 client_wrk;
38   ct_segment_t *segments;
39 } ct_segments_ctx_t;
40
41 typedef struct ct_main_
42 {
43   ct_connection_t **connections;        /**< Per-worker connection pools */
44   u32 n_workers;                        /**< Number of vpp workers */
45   u32 n_sessions;                       /**< Cumulative sessions counter */
46   u32 *ho_reusable;                     /**< Vector of reusable ho indices */
47   clib_spinlock_t ho_reuseable_lock;    /**< Lock for reusable ho indices */
48   clib_rwlock_t app_segs_lock;          /**< RW lock for seg contexts */
49   uword *app_segs_ctxs_table;           /**< App handle to segment pool map */
50   ct_segments_ctx_t *app_seg_ctxs;      /**< Pool of ct segment contexts */
51 } ct_main_t;
52
53 static ct_main_t ct_main;
54
55 static ct_connection_t *
56 ct_connection_alloc (u32 thread_index)
57 {
58   ct_connection_t *ct;
59
60   pool_get_zero (ct_main.connections[thread_index], ct);
61   ct->c_c_index = ct - ct_main.connections[thread_index];
62   ct->c_thread_index = thread_index;
63   ct->client_wrk = ~0;
64   ct->server_wrk = ~0;
65   ct->seg_ctx_index = ~0;
66   ct->ct_seg_index = ~0;
67   return ct;
68 }
69
70 static ct_connection_t *
71 ct_connection_get (u32 ct_index, u32 thread_index)
72 {
73   if (pool_is_free_index (ct_main.connections[thread_index], ct_index))
74     return 0;
75   return pool_elt_at_index (ct_main.connections[thread_index], ct_index);
76 }
77
78 static void
79 ct_connection_free (ct_connection_t * ct)
80 {
81   if (CLIB_DEBUG)
82     {
83       u32 thread_index = ct->c_thread_index;
84       memset (ct, 0xfc, sizeof (*ct));
85       pool_put (ct_main.connections[thread_index], ct);
86       return;
87     }
88   pool_put (ct_main.connections[ct->c_thread_index], ct);
89 }
90
91 static ct_connection_t *
92 ct_half_open_alloc (void)
93 {
94   ct_main_t *cm = &ct_main;
95   u32 *hip;
96
97   clib_spinlock_lock (&cm->ho_reuseable_lock);
98   vec_foreach (hip, cm->ho_reusable)
99     pool_put_index (cm->connections[0], *hip);
100   vec_reset_length (cm->ho_reusable);
101   clib_spinlock_unlock (&cm->ho_reuseable_lock);
102
103   return ct_connection_alloc (0);
104 }
105
106 void
107 ct_half_open_add_reusable (u32 ho_index)
108 {
109   ct_main_t *cm = &ct_main;
110
111   clib_spinlock_lock (&cm->ho_reuseable_lock);
112   vec_add1 (cm->ho_reusable, ho_index);
113   clib_spinlock_unlock (&cm->ho_reuseable_lock);
114 }
115
116 session_t *
117 ct_session_get_peer (session_t * s)
118 {
119   ct_connection_t *ct, *peer_ct;
120   ct = ct_connection_get (s->connection_index, s->thread_index);
121   peer_ct = ct_connection_get (ct->peer_index, s->thread_index);
122   return session_get (peer_ct->c_s_index, s->thread_index);
123 }
124
125 void
126 ct_session_endpoint (session_t * ll, session_endpoint_t * sep)
127 {
128   ct_connection_t *ct;
129   ct = (ct_connection_t *) session_get_transport (ll);
130   sep->transport_proto = ct->actual_tp;
131   sep->port = ct->c_lcl_port;
132   sep->is_ip4 = ct->c_is_ip4;
133   ip_copy (&sep->ip, &ct->c_lcl_ip, ct->c_is_ip4);
134 }
135
136 static void
137 ct_session_dealloc_fifos (ct_connection_t *ct, svm_fifo_t *rx_fifo,
138                           svm_fifo_t *tx_fifo)
139 {
140   ct_segments_ctx_t *seg_ctx;
141   ct_main_t *cm = &ct_main;
142   ct_segment_flags_t flags;
143   segment_manager_t *sm;
144   app_worker_t *app_wrk;
145   ct_segment_t *ct_seg;
146   fifo_segment_t *fs;
147   u32 seg_index;
148   u8 cnt;
149
150   /*
151    * Cleanup fifos
152    */
153
154   sm = segment_manager_get (rx_fifo->segment_manager);
155   seg_index = rx_fifo->segment_index;
156
157   fs = segment_manager_get_segment_w_lock (sm, seg_index);
158   fifo_segment_free_fifo (fs, rx_fifo);
159   fifo_segment_free_fifo (fs, tx_fifo);
160   segment_manager_segment_reader_unlock (sm);
161
162   /*
163    * Update segment context
164    */
165
166   clib_rwlock_reader_lock (&cm->app_segs_lock);
167
168   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
169   ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
170
171   if (ct->flags & CT_CONN_F_CLIENT)
172     {
173       cnt =
174         __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
175       if (!cnt)
176         ct_seg->flags |= CT_SEGMENT_F_CLIENT_DETACHED;
177     }
178   else
179     {
180       cnt =
181         __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
182       if (!cnt)
183         ct_seg->flags |= CT_SEGMENT_F_SERVER_DETACHED;
184     }
185
186   flags = ct_seg->flags;
187
188   clib_rwlock_reader_unlock (&cm->app_segs_lock);
189
190   /*
191    * No need to do any app updates, return
192    */
193   if (cnt)
194     return;
195
196   if (ct->flags & CT_CONN_F_CLIENT)
197     {
198       app_wrk = app_worker_get_if_valid (ct->client_wrk);
199       /* Determine if client app still needs notification, i.e., if it is
200        * still attached. If client detached and this is the last ct session
201        * on this segment, then its connects segment manager should also be
202        * detached, so do not send notification */
203       if (app_wrk)
204         {
205           segment_manager_t *csm;
206           csm = app_worker_get_connect_segment_manager (app_wrk);
207           if (!segment_manager_app_detached (csm))
208             app_worker_del_segment_notify (app_wrk, ct->segment_handle);
209         }
210     }
211   else if (!segment_manager_app_detached (sm))
212     {
213       app_wrk = app_worker_get (ct->server_wrk);
214       app_worker_del_segment_notify (app_wrk, ct->segment_handle);
215     }
216
217   if (!(flags & CT_SEGMENT_F_CLIENT_DETACHED) ||
218       !(flags & CT_SEGMENT_F_SERVER_DETACHED))
219     return;
220
221   /*
222    * Remove segment context because both client and server detached
223    */
224
225   clib_rwlock_writer_lock (&cm->app_segs_lock);
226
227   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
228   pool_put_index (seg_ctx->segments, ct->ct_seg_index);
229
230   /*
231    * No more segment indices left, remove the segments context
232    */
233   if (!pool_elts (seg_ctx->segments))
234     {
235       u64 table_handle = seg_ctx->client_wrk << 16 | seg_ctx->server_wrk;
236       table_handle = (u64) seg_ctx->sm_index << 32 | table_handle;
237       hash_unset (cm->app_segs_ctxs_table, table_handle);
238       pool_free (seg_ctx->segments);
239       pool_put_index (cm->app_seg_ctxs, ct->seg_ctx_index);
240     }
241
242   clib_rwlock_writer_unlock (&cm->app_segs_lock);
243
244   segment_manager_lock_and_del_segment (sm, seg_index);
245
246   /* Cleanup segment manager if needed. If server detaches there's a chance
247    * the client's sessions will hold up segment removal */
248   if (segment_manager_app_detached (sm) && !segment_manager_has_fifos (sm))
249     segment_manager_free_safe (sm);
250 }
251
252 int
253 ct_session_connect_notify (session_t *ss)
254 {
255   u32 ss_index, opaque, thread_index, cnt;
256   ct_connection_t *sct, *cct;
257   ct_segments_ctx_t *seg_ctx;
258   app_worker_t *client_wrk;
259   ct_main_t *cm = &ct_main;
260   ct_segment_t *ct_seg;
261   session_t *cs;
262   int err = 0;
263
264   ss_index = ss->session_index;
265   thread_index = ss->thread_index;
266   sct = (ct_connection_t *) session_get_transport (ss);
267   client_wrk = app_worker_get (sct->client_wrk);
268   opaque = sct->client_opaque;
269
270   cct = ct_connection_get (sct->peer_index, thread_index);
271
272   /* Client closed while waiting for reply from server */
273   if (!cct)
274     {
275       session_transport_closing_notify (&sct->connection);
276       session_transport_delete_notify (&sct->connection);
277       ct_connection_free (sct);
278       return 0;
279     }
280
281   session_half_open_delete_notify (&cct->connection);
282   cct->flags &= ~CT_CONN_F_HALF_OPEN;
283
284   /*
285    * Update ct segment context
286    */
287
288   clib_rwlock_reader_lock (&cm->app_segs_lock);
289
290   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, sct->seg_ctx_index);
291   ct_seg = pool_elt_at_index (seg_ctx->segments, sct->ct_seg_index);
292
293   cnt = __atomic_add_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
294   if (cnt == 1)
295     {
296       err = app_worker_add_segment_notify (client_wrk, cct->segment_handle);
297       if (err)
298         {
299           clib_rwlock_reader_unlock (&cm->app_segs_lock);
300           session_close (ss);
301           goto error;
302         }
303     }
304
305   clib_rwlock_reader_unlock (&cm->app_segs_lock);
306
307   /*
308    * Alloc client session
309    */
310
311   cs = session_alloc (thread_index);
312   ss = session_get (ss_index, thread_index);
313   cs->session_type = ss->session_type;
314   cs->listener_handle = SESSION_INVALID_HANDLE;
315   cs->session_state = SESSION_STATE_CONNECTING;
316   cs->app_wrk_index = client_wrk->wrk_index;
317   cs->connection_index = cct->c_c_index;
318   cct->seg_ctx_index = sct->seg_ctx_index;
319   cct->ct_seg_index = sct->ct_seg_index;
320
321   cct->c_s_index = cs->session_index;
322   cct->client_rx_fifo = ss->tx_fifo;
323   cct->client_tx_fifo = ss->rx_fifo;
324
325   cct->client_rx_fifo->refcnt++;
326   cct->client_tx_fifo->refcnt++;
327
328   /* This will allocate fifos for the session. They won't be used for
329    * exchanging data but they will be used to close the connection if
330    * the segment manager/worker is freed */
331   if ((err = app_worker_init_connected (client_wrk, cs)))
332     {
333       session_close (ss);
334       session_free (cs);
335       goto error;
336     }
337
338   cs->session_state = SESSION_STATE_CONNECTING;
339
340   if (app_worker_connect_notify (client_wrk, cs, err, opaque))
341     {
342       session_close (ss);
343       ct_session_dealloc_fifos (cct, cs->rx_fifo, cs->tx_fifo);
344       session_free (cs);
345       return -1;
346     }
347
348   cs = session_get (cct->c_s_index, cct->c_thread_index);
349   cs->session_state = SESSION_STATE_READY;
350
351   return 0;
352
353 error:
354   app_worker_connect_notify (client_wrk, 0, err, opaque);
355   return -1;
356 }
357
358 static ct_segment_t *
359 ct_lookup_free_segment (segment_manager_t *sm, ct_segments_ctx_t *seg_ctx,
360                         u32 pair_bytes)
361 {
362   uword free_bytes, max_free_bytes;
363   ct_segment_t *ct_seg, *res = 0;
364   fifo_segment_t *fs;
365   u32 max_fifos;
366
367   max_free_bytes = pair_bytes;
368   pool_foreach (ct_seg, seg_ctx->segments)
369     {
370       /* Client or server has detached so segment cannot be used */
371       if ((ct_seg->flags & CT_SEGMENT_F_SERVER_DETACHED) ||
372           (ct_seg->flags & CT_SEGMENT_F_CLIENT_DETACHED))
373         continue;
374       fs = segment_manager_get_segment (sm, ct_seg->segment_index);
375       free_bytes = fifo_segment_available_bytes (fs);
376       max_fifos = fifo_segment_size (fs) / pair_bytes;
377       if (free_bytes > max_free_bytes &&
378           fifo_segment_num_fifos (fs) / 2 < max_fifos)
379         {
380           max_free_bytes = free_bytes;
381           res = ct_seg;
382         }
383     }
384
385   return res;
386 }
387
388 static int
389 ct_init_accepted_session (app_worker_t *server_wrk, ct_connection_t *ct,
390                           session_t *ls, session_t *ll)
391 {
392   u32 sm_index, pair_bytes, seg_ctx_index = ~0, ct_seg_index = ~0;
393   u64 seg_handle, table_handle, seg_size;
394   segment_manager_props_t *props;
395   const u32 margin = 16 << 10;
396   ct_segments_ctx_t *seg_ctx;
397   ct_main_t *cm = &ct_main;
398   application_t *server;
399   segment_manager_t *sm;
400   ct_segment_t *ct_seg;
401   fifo_segment_t *fs;
402   int rv, fs_index;
403   uword *spp;
404
405   sm = app_worker_get_listen_segment_manager (server_wrk, ll);
406   sm_index = segment_manager_index (sm);
407   server = application_get (server_wrk->app_index);
408   props = application_segment_manager_properties (server);
409
410   table_handle = ct->client_wrk << 16 | server_wrk->wrk_index;
411   table_handle = (u64) segment_manager_index (sm) << 32 | table_handle;
412
413   /*
414    * Check if we already have a segment that can hold the fifos
415    */
416
417   clib_rwlock_reader_lock (&cm->app_segs_lock);
418
419   spp = hash_get (cm->app_segs_ctxs_table, table_handle);
420   if (spp)
421     {
422       seg_ctx_index = *spp;
423       seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
424       pair_bytes = props->rx_fifo_size + props->tx_fifo_size + margin;
425       ct_seg = ct_lookup_free_segment (sm, seg_ctx, pair_bytes);
426       if (ct_seg)
427         {
428           ct_seg_index = ct_seg - seg_ctx->segments;
429           fs_index = ct_seg->segment_index;
430           __atomic_add_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
431         }
432     }
433
434   clib_rwlock_reader_unlock (&cm->app_segs_lock);
435
436   /*
437    * No segment, try to alloc one and notify the server
438    */
439
440   if (ct_seg_index == ~0)
441     {
442       seg_size = clib_max (props->segment_size, 128 << 20);
443       fs_index = segment_manager_add_segment (sm, seg_size, 0);
444       if (fs_index < 0)
445         {
446           rv = -1;
447           goto failed;
448         }
449
450       /* Make sure the segment is not used for other fifos */
451       fs = segment_manager_get_segment_w_lock (sm, fs_index);
452       fifo_segment_flags (fs) |= FIFO_SEGMENT_F_CUSTOM_USE;
453       segment_manager_segment_reader_unlock (sm);
454
455       clib_rwlock_writer_lock (&cm->app_segs_lock);
456
457       if (seg_ctx_index == ~0)
458         {
459           pool_get_zero (cm->app_seg_ctxs, seg_ctx);
460           seg_ctx_index = seg_ctx - cm->app_seg_ctxs;
461           hash_set (cm->app_segs_ctxs_table, table_handle, seg_ctx_index);
462           seg_ctx->server_wrk = server_wrk->wrk_index;
463           seg_ctx->client_wrk = ct->client_wrk;
464           seg_ctx->sm_index = sm_index;
465         }
466       else
467         seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
468
469       pool_get_zero (seg_ctx->segments, ct_seg);
470       ct_seg->segment_index = fs_index;
471       ct_seg->server_n_sessions += 1;
472       ct_seg_index = ct_seg - seg_ctx->segments;
473
474       clib_rwlock_writer_unlock (&cm->app_segs_lock);
475
476       /* New segment, notify the server. Client notification sent after
477        * server accepts the connection */
478       seg_handle = segment_manager_make_segment_handle (sm_index, fs_index);
479       if ((rv = app_worker_add_segment_notify (server_wrk, seg_handle)))
480         {
481           segment_manager_lock_and_del_segment (sm, fs_index);
482
483           clib_rwlock_writer_lock (&cm->app_segs_lock);
484           pool_put_index (seg_ctx->segments, ct_seg_index);
485           clib_rwlock_writer_unlock (&cm->app_segs_lock);
486
487           goto failed_fix_count;
488         }
489     }
490
491   /*
492    * Allocate and initialize the fifos
493    */
494   fs = segment_manager_get_segment_w_lock (sm, fs_index);
495   rv = segment_manager_try_alloc_fifos (
496     fs, ls->thread_index, props->rx_fifo_size, props->tx_fifo_size,
497     &ls->rx_fifo, &ls->tx_fifo);
498   if (rv)
499     {
500       segment_manager_segment_reader_unlock (sm);
501       goto failed_fix_count;
502     }
503
504   ls->rx_fifo->shr->master_session_index = ls->session_index;
505   ls->tx_fifo->shr->master_session_index = ls->session_index;
506   ls->rx_fifo->master_thread_index = ls->thread_index;
507   ls->tx_fifo->master_thread_index = ls->thread_index;
508   ls->rx_fifo->segment_manager = sm_index;
509   ls->tx_fifo->segment_manager = sm_index;
510   ls->rx_fifo->segment_index = fs_index;
511   ls->tx_fifo->segment_index = fs_index;
512
513   seg_handle = segment_manager_segment_handle (sm, fs);
514   segment_manager_segment_reader_unlock (sm);
515
516   ct->segment_handle = seg_handle;
517   ct->seg_ctx_index = seg_ctx_index;
518   ct->ct_seg_index = ct_seg_index;
519
520   return 0;
521
522 failed_fix_count:
523
524   clib_rwlock_reader_lock (&cm->app_segs_lock);
525
526   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
527   ct_seg = pool_elt_at_index (seg_ctx->segments, ct_seg_index);
528   __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
529
530   clib_rwlock_reader_unlock (&cm->app_segs_lock);
531
532 failed:
533   return rv;
534 }
535
536 static void
537 ct_accept_rpc_wrk_handler (void *accept_args)
538 {
539   u32 cct_index, ho_index, thread_index, ll_index;
540   ct_connection_t *sct, *cct, *ho;
541   transport_connection_t *ll_ct;
542   app_worker_t *server_wrk;
543   session_t *ss, *ll;
544
545   /*
546    * Alloc client ct and initialize from ho
547    */
548   thread_index = vlib_get_thread_index ();
549   cct = ct_connection_alloc (thread_index);
550   cct_index = cct->c_c_index;
551
552   ho_index = pointer_to_uword (accept_args);
553   ho = ct_connection_get (ho_index, 0);
554
555   /* Unlikely but half-open session and transport could have been freed */
556   if (PREDICT_FALSE (!ho))
557     {
558       ct_connection_free (cct);
559       return;
560     }
561
562   clib_memcpy (cct, ho, sizeof (*ho));
563   cct->c_c_index = cct_index;
564   cct->c_thread_index = thread_index;
565   cct->flags |= CT_CONN_F_HALF_OPEN;
566
567   /* Notify session layer that half-open is on a different thread
568    * and mark ho connection index reusable. Avoids another rpc
569    */
570   session_half_open_migrate_notify (&cct->connection);
571   session_half_open_migrated_notify (&cct->connection);
572   ct_half_open_add_reusable (ho_index);
573
574   /*
575    * Alloc and init server transport
576    */
577
578   ll_index = cct->peer_index;
579   ll = listen_session_get (ll_index);
580   sct = ct_connection_alloc (thread_index);
581   /* Transport not necessarily ct but it might, so grab after sct alloc */
582   ll_ct = listen_session_get_transport (ll);
583
584   /* Make sure cct is valid after sct alloc */
585   cct = ct_connection_get (cct_index, thread_index);
586
587   sct->c_rmt_port = 0;
588   sct->c_lcl_port = ll_ct->lcl_port;
589   sct->c_is_ip4 = cct->c_is_ip4;
590   clib_memcpy (&sct->c_lcl_ip, &ll_ct->lcl_ip, sizeof (ll_ct->lcl_ip));
591   sct->client_wrk = cct->client_wrk;
592   sct->c_proto = TRANSPORT_PROTO_NONE;
593   sct->client_opaque = cct->client_opaque;
594   sct->actual_tp = cct->actual_tp;
595
596   sct->peer_index = cct->c_c_index;
597   cct->peer_index = sct->c_c_index;
598
599   /*
600    * Accept server session. Client session is created only after
601    * server confirms accept.
602    */
603   ss = session_alloc (thread_index);
604   ll = listen_session_get (ll_index);
605   ss->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE,
606                                                      sct->c_is_ip4);
607   ss->connection_index = sct->c_c_index;
608   ss->listener_handle = listen_session_get_handle (ll);
609   ss->session_state = SESSION_STATE_CREATED;
610
611   server_wrk = application_listener_select_worker (ll);
612   ss->app_wrk_index = server_wrk->wrk_index;
613
614   sct->c_s_index = ss->session_index;
615   sct->server_wrk = ss->app_wrk_index;
616
617   if (ct_init_accepted_session (server_wrk, sct, ss, ll))
618     {
619       ct_connection_free (sct);
620       session_free (ss);
621       return;
622     }
623
624   ss->session_state = SESSION_STATE_ACCEPTING;
625   if (app_worker_accept_notify (server_wrk, ss))
626     {
627       ct_session_dealloc_fifos (sct, ss->rx_fifo, ss->tx_fifo);
628       ct_connection_free (sct);
629       session_free (ss);
630       return;
631     }
632
633   cct->segment_handle = sct->segment_handle;
634 }
635
636 static int
637 ct_connect (app_worker_t * client_wrk, session_t * ll,
638             session_endpoint_cfg_t * sep)
639 {
640   u32 thread_index, ho_index;
641   ct_main_t *cm = &ct_main;
642   ct_connection_t *ho;
643
644   /* Simple round-robin policy for spreading sessions over workers. We skip
645    * thread index 0, i.e., offset the index by 1, when we have workers as it
646    * is the one dedicated to main thread. Note that n_workers does not include
647    * main thread */
648   cm->n_sessions += 1;
649   thread_index = cm->n_workers ? (cm->n_sessions % cm->n_workers) + 1 : 0;
650
651   /*
652    * Alloc and init client half-open transport
653    */
654
655   ho = ct_half_open_alloc ();
656   ho_index = ho->c_c_index;
657   ho->c_rmt_port = sep->port;
658   ho->c_lcl_port = 0;
659   ho->c_is_ip4 = sep->is_ip4;
660   ho->client_opaque = sep->opaque;
661   ho->client_wrk = client_wrk->wrk_index;
662   ho->peer_index = ll->session_index;
663   ho->c_proto = TRANSPORT_PROTO_NONE;
664   ho->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP;
665   clib_memcpy (&ho->c_rmt_ip, &sep->ip, sizeof (sep->ip));
666   ho->flags |= CT_CONN_F_CLIENT;
667   ho->c_s_index = ~0;
668   ho->actual_tp = sep->transport_proto;
669
670   /*
671    * Accept connection on thread selected above. Connected reply comes
672    * after server accepts the connection.
673    */
674
675   session_send_rpc_evt_to_thread_force (thread_index,
676                                         ct_accept_rpc_wrk_handler,
677                                         uword_to_pointer (ho_index, void *));
678
679   return ho_index;
680 }
681
682 static u32
683 ct_start_listen (u32 app_listener_index, transport_endpoint_t * tep)
684 {
685   session_endpoint_cfg_t *sep;
686   ct_connection_t *ct;
687
688   sep = (session_endpoint_cfg_t *) tep;
689   ct = ct_connection_alloc (0);
690   ct->server_wrk = sep->app_wrk_index;
691   ct->c_is_ip4 = sep->is_ip4;
692   clib_memcpy (&ct->c_lcl_ip, &sep->ip, sizeof (sep->ip));
693   ct->c_lcl_port = sep->port;
694   ct->c_s_index = app_listener_index;
695   ct->actual_tp = sep->transport_proto;
696   return ct->c_c_index;
697 }
698
699 static u32
700 ct_stop_listen (u32 ct_index)
701 {
702   ct_connection_t *ct;
703   ct = ct_connection_get (ct_index, 0);
704   ct_connection_free (ct);
705   return 0;
706 }
707
708 static transport_connection_t *
709 ct_listener_get (u32 ct_index)
710 {
711   return (transport_connection_t *) ct_connection_get (ct_index, 0);
712 }
713
714 static transport_connection_t *
715 ct_half_open_get (u32 ct_index)
716 {
717   return (transport_connection_t *) ct_connection_get (ct_index, 0);
718 }
719
720 static void
721 ct_session_cleanup (u32 conn_index, u32 thread_index)
722 {
723   ct_connection_t *ct, *peer_ct;
724
725   ct = ct_connection_get (conn_index, thread_index);
726   if (!ct)
727     return;
728
729   peer_ct = ct_connection_get (ct->peer_index, thread_index);
730   if (peer_ct)
731     peer_ct->peer_index = ~0;
732
733   ct_connection_free (ct);
734 }
735
736 static void
737 ct_cleanup_ho (u32 ho_index)
738 {
739   ct_connection_free (ct_connection_get (ho_index, 0));
740 }
741
742 static int
743 ct_session_connect (transport_endpoint_cfg_t * tep)
744 {
745   session_endpoint_cfg_t *sep_ext;
746   session_endpoint_t _sep, *sep = &_sep;
747   app_worker_t *app_wrk;
748   session_handle_t lh;
749   application_t *app;
750   app_listener_t *al;
751   u32 table_index;
752   session_t *ll;
753   u8 fib_proto;
754
755   sep_ext = (session_endpoint_cfg_t *) tep;
756   _sep = *(session_endpoint_t *) tep;
757   app_wrk = app_worker_get (sep_ext->app_wrk_index);
758   app = application_get (app_wrk->app_index);
759
760   sep->transport_proto = sep_ext->original_tp;
761   table_index = application_local_session_table (app);
762   lh = session_lookup_local_endpoint (table_index, sep);
763   if (lh == SESSION_DROP_HANDLE)
764     return SESSION_E_FILTERED;
765
766   if (lh == SESSION_INVALID_HANDLE)
767     goto global_scope;
768
769   ll = listen_session_get_from_handle (lh);
770   al = app_listener_get_w_session (ll);
771
772   /*
773    * Break loop if rule in local table points to connecting app. This
774    * can happen if client is a generic proxy. Route connect through
775    * global table instead.
776    */
777   if (al->app_index == app->app_index)
778     goto global_scope;
779
780   return ct_connect (app_wrk, ll, sep_ext);
781
782   /*
783    * If nothing found, check the global scope for locally attached
784    * destinations. Make sure first that we're allowed to.
785    */
786
787 global_scope:
788   if (session_endpoint_is_local (sep))
789     return SESSION_E_NOROUTE;
790
791   if (!application_has_global_scope (app))
792     return SESSION_E_SCOPE;
793
794   fib_proto = session_endpoint_fib_proto (sep);
795   table_index = session_lookup_get_index_for_fib (fib_proto, sep->fib_index);
796   ll = session_lookup_listener_wildcard (table_index, sep);
797
798   /* Avoid connecting app to own listener */
799   if (ll && ll->app_index != app->app_index)
800     return ct_connect (app_wrk, ll, sep_ext);
801
802   /* Failed to connect but no error */
803   return SESSION_E_LOCAL_CONNECT;
804 }
805
806 static void
807 ct_session_close (u32 ct_index, u32 thread_index)
808 {
809   ct_connection_t *ct, *peer_ct;
810   app_worker_t *app_wrk;
811   session_t *s;
812
813   ct = ct_connection_get (ct_index, thread_index);
814   peer_ct = ct_connection_get (ct->peer_index, thread_index);
815   if (peer_ct)
816     {
817       peer_ct->peer_index = ~0;
818       /* Make sure session was allocated */
819       if (peer_ct->flags & CT_CONN_F_HALF_OPEN)
820         {
821           app_wrk = app_worker_get (peer_ct->client_wrk);
822           app_worker_connect_notify (app_wrk, 0, SESSION_E_REFUSED,
823                                      peer_ct->client_opaque);
824         }
825       else if (peer_ct->c_s_index != ~0)
826         session_transport_closing_notify (&peer_ct->connection);
827       else
828         ct_connection_free (peer_ct);
829     }
830
831   s = session_get (ct->c_s_index, ct->c_thread_index);
832
833   if (ct->flags & CT_CONN_F_CLIENT)
834     {
835       /* Normal free for client session as the fifos are allocated through
836        * the connects segment manager in a segment that's not shared with
837        * the server */
838       session_free_w_fifos (s);
839       ct_session_dealloc_fifos (ct, ct->client_rx_fifo, ct->client_tx_fifo);
840     }
841   else
842     {
843       /* Manual session and fifo segment cleanup to avoid implicit
844        * segment manager cleanups and notifications */
845       app_wrk = app_worker_get_if_valid (s->app_wrk_index);
846       if (app_wrk)
847         app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_SESSION);
848
849       ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo);
850       session_free (s);
851     }
852
853   ct_connection_free (ct);
854 }
855
856 static transport_connection_t *
857 ct_session_get (u32 ct_index, u32 thread_index)
858 {
859   return (transport_connection_t *) ct_connection_get (ct_index,
860                                                        thread_index);
861 }
862
863 static u8 *
864 format_ct_connection_id (u8 * s, va_list * args)
865 {
866   ct_connection_t *ct = va_arg (*args, ct_connection_t *);
867   if (!ct)
868     return s;
869   if (ct->c_is_ip4)
870     {
871       s = format (s, "[%d:%d][CT:%U] %U:%d->%U:%d", ct->c_thread_index,
872                   ct->c_s_index, format_transport_proto_short, ct->actual_tp,
873                   format_ip4_address, &ct->c_lcl_ip4,
874                   clib_net_to_host_u16 (ct->c_lcl_port), format_ip4_address,
875                   &ct->c_rmt_ip4, clib_net_to_host_u16 (ct->c_rmt_port));
876     }
877   else
878     {
879       s = format (s, "[%d:%d][CT:%U] %U:%d->%U:%d", ct->c_thread_index,
880                   ct->c_s_index, format_transport_proto_short, ct->actual_tp,
881                   format_ip6_address, &ct->c_lcl_ip6,
882                   clib_net_to_host_u16 (ct->c_lcl_port), format_ip6_address,
883                   &ct->c_rmt_ip6, clib_net_to_host_u16 (ct->c_rmt_port));
884     }
885
886   return s;
887 }
888
889 static int
890 ct_custom_tx (void *session, transport_send_params_t * sp)
891 {
892   session_t *s = (session_t *) session;
893   if (session_has_transport (s))
894     return 0;
895   /* If event enqueued towards peer, remove from scheduler and remove
896    * session tx flag, i.e., accept new tx events. Unset fifo flag now to
897    * avoid missing events if peer did not clear fifo flag yet, which is
898    * interpreted as successful notification and session is descheduled. */
899   svm_fifo_unset_event (s->tx_fifo);
900   if (!ct_session_tx (s))
901     sp->flags = TRANSPORT_SND_F_DESCHED;
902
903   /* The scheduler uses packet count as a means of upper bounding the amount
904    * of work done per dispatch. So make it look like we have sent something */
905   return 1;
906 }
907
908 static int
909 ct_app_rx_evt (transport_connection_t * tc)
910 {
911   ct_connection_t *ct = (ct_connection_t *) tc, *peer_ct;
912   session_t *ps;
913
914   peer_ct = ct_connection_get (ct->peer_index, tc->thread_index);
915   if (!peer_ct)
916     return -1;
917   ps = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
918   return session_dequeue_notify (ps);
919 }
920
921 static u8 *
922 format_ct_listener (u8 * s, va_list * args)
923 {
924   u32 tc_index = va_arg (*args, u32);
925   u32 __clib_unused thread_index = va_arg (*args, u32);
926   u32 __clib_unused verbose = va_arg (*args, u32);
927   ct_connection_t *ct = ct_connection_get (tc_index, 0);
928   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
929   if (verbose)
930     s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "LISTEN");
931   return s;
932 }
933
934 static u8 *
935 format_ct_half_open (u8 *s, va_list *args)
936 {
937   u32 ho_index = va_arg (*args, u32);
938   u32 verbose = va_arg (*args, u32);
939   ct_connection_t *ct = ct_connection_get (ho_index, 0);
940   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
941   if (verbose)
942     s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "HALF-OPEN");
943   return s;
944 }
945
946 static u8 *
947 format_ct_connection (u8 * s, va_list * args)
948 {
949   ct_connection_t *ct = va_arg (*args, ct_connection_t *);
950   u32 verbose = va_arg (*args, u32);
951
952   if (!ct)
953     return s;
954   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
955   if (verbose)
956     {
957       s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "ESTABLISHED");
958       if (verbose > 1)
959         {
960           s = format (s, "\n");
961         }
962     }
963   return s;
964 }
965
966 static u8 *
967 format_ct_session (u8 * s, va_list * args)
968 {
969   u32 ct_index = va_arg (*args, u32);
970   u32 thread_index = va_arg (*args, u32);
971   u32 verbose = va_arg (*args, u32);
972   ct_connection_t *ct;
973
974   ct = ct_connection_get (ct_index, thread_index);
975   if (!ct)
976     {
977       s = format (s, "empty\n");
978       return s;
979     }
980
981   s = format (s, "%U", format_ct_connection, ct, verbose);
982   return s;
983 }
984
985 clib_error_t *
986 ct_enable_disable (vlib_main_t * vm, u8 is_en)
987 {
988   ct_main_t *cm = &ct_main;
989
990   cm->n_workers = vlib_num_workers ();
991   vec_validate (cm->connections, cm->n_workers);
992   clib_spinlock_init (&cm->ho_reuseable_lock);
993   clib_rwlock_init (&cm->app_segs_lock);
994   return 0;
995 }
996
997 /* *INDENT-OFF* */
998 static const transport_proto_vft_t cut_thru_proto = {
999   .enable = ct_enable_disable,
1000   .start_listen = ct_start_listen,
1001   .stop_listen = ct_stop_listen,
1002   .get_connection = ct_session_get,
1003   .get_listener = ct_listener_get,
1004   .get_half_open = ct_half_open_get,
1005   .cleanup = ct_session_cleanup,
1006   .cleanup_ho = ct_cleanup_ho,
1007   .connect = ct_session_connect,
1008   .close = ct_session_close,
1009   .custom_tx = ct_custom_tx,
1010   .app_rx_evt = ct_app_rx_evt,
1011   .format_listener = format_ct_listener,
1012   .format_half_open = format_ct_half_open,
1013   .format_connection = format_ct_session,
1014   .transport_options = {
1015     .name = "ct",
1016     .short_name = "C",
1017     .tx_type = TRANSPORT_TX_INTERNAL,
1018     .service_type = TRANSPORT_SERVICE_VC,
1019   },
1020 };
1021 /* *INDENT-ON* */
1022
1023 int
1024 ct_session_tx (session_t * s)
1025 {
1026   ct_connection_t *ct, *peer_ct;
1027   session_t *peer_s;
1028
1029   ct = (ct_connection_t *) session_get_transport (s);
1030   peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
1031   if (!peer_ct)
1032     return 0;
1033   peer_s = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
1034   if (peer_s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1035     return 0;
1036   return session_enqueue_notify (peer_s);
1037 }
1038
1039 static clib_error_t *
1040 ct_transport_init (vlib_main_t * vm)
1041 {
1042   transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto,
1043                                FIB_PROTOCOL_IP4, ~0);
1044   transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto,
1045                                FIB_PROTOCOL_IP6, ~0);
1046   return 0;
1047 }
1048
1049 VLIB_INIT_FUNCTION (ct_transport_init);
1050
1051 /*
1052  * fd.io coding-style-patch-verification: ON
1053  *
1054  * Local Variables:
1055  * eval: (c-set-style "gnu")
1056  * End:
1057  */