d9798966c2b09846f806c7148a1cacd4ab5012b3
[vpp.git] / src / vnet / session / application_local.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/session/application_local.h>
17 #include <vnet/session/session.h>
18
19 typedef enum ct_segment_flags_
20 {
21   CT_SEGMENT_F_CLIENT_DETACHED = 1 << 0,
22   CT_SEGMENT_F_SERVER_DETACHED = 1 << 1,
23 } ct_segment_flags_t;
24
25 typedef struct ct_segment_
26 {
27   u32 client_n_sessions;
28   u32 server_n_sessions;
29   u32 seg_ctx_index;
30   u32 ct_seg_index;
31   u32 segment_index;
32   ct_segment_flags_t flags;
33 } ct_segment_t;
34
35 typedef struct ct_segments_
36 {
37   u32 sm_index;
38   u32 server_wrk;
39   u32 client_wrk;
40   u32 fifo_pair_bytes;
41   ct_segment_t *segments;
42 } ct_segments_ctx_t;
43
44 typedef struct ct_cleanup_req_
45 {
46   u32 ct_index;
47 } ct_cleanup_req_t;
48
49 typedef struct ct_main_
50 {
51   ct_connection_t **connections;        /**< Per-worker connection pools */
52   ct_cleanup_req_t **pending_cleanups;
53   u8 *heave_cleanups;
54   u32 n_workers;                        /**< Number of vpp workers */
55   u32 n_sessions;                       /**< Cumulative sessions counter */
56   u32 *ho_reusable;                     /**< Vector of reusable ho indices */
57   clib_spinlock_t ho_reuseable_lock;    /**< Lock for reusable ho indices */
58   clib_rwlock_t app_segs_lock;          /**< RW lock for seg contexts */
59   uword *app_segs_ctxs_table;           /**< App handle to segment pool map */
60   ct_segments_ctx_t *app_seg_ctxs;      /**< Pool of ct segment contexts */
61 } ct_main_t;
62
63 static ct_main_t ct_main;
64
65 static ct_connection_t *
66 ct_connection_alloc (u32 thread_index)
67 {
68   ct_connection_t *ct;
69
70   pool_get_zero (ct_main.connections[thread_index], ct);
71   ct->c_c_index = ct - ct_main.connections[thread_index];
72   ct->c_thread_index = thread_index;
73   ct->client_wrk = ~0;
74   ct->server_wrk = ~0;
75   ct->seg_ctx_index = ~0;
76   ct->ct_seg_index = ~0;
77   return ct;
78 }
79
80 static ct_connection_t *
81 ct_connection_get (u32 ct_index, u32 thread_index)
82 {
83   if (pool_is_free_index (ct_main.connections[thread_index], ct_index))
84     return 0;
85   return pool_elt_at_index (ct_main.connections[thread_index], ct_index);
86 }
87
88 static void
89 ct_connection_free (ct_connection_t * ct)
90 {
91   if (CLIB_DEBUG)
92     {
93       u32 thread_index = ct->c_thread_index;
94       memset (ct, 0xfc, sizeof (*ct));
95       pool_put (ct_main.connections[thread_index], ct);
96       return;
97     }
98   pool_put (ct_main.connections[ct->c_thread_index], ct);
99 }
100
101 static ct_connection_t *
102 ct_half_open_alloc (void)
103 {
104   ct_main_t *cm = &ct_main;
105   u32 *hip;
106
107   clib_spinlock_lock (&cm->ho_reuseable_lock);
108   vec_foreach (hip, cm->ho_reusable)
109     pool_put_index (cm->connections[0], *hip);
110   vec_reset_length (cm->ho_reusable);
111   clib_spinlock_unlock (&cm->ho_reuseable_lock);
112
113   return ct_connection_alloc (0);
114 }
115
116 void
117 ct_half_open_add_reusable (u32 ho_index)
118 {
119   ct_main_t *cm = &ct_main;
120
121   clib_spinlock_lock (&cm->ho_reuseable_lock);
122   vec_add1 (cm->ho_reusable, ho_index);
123   clib_spinlock_unlock (&cm->ho_reuseable_lock);
124 }
125
126 session_t *
127 ct_session_get_peer (session_t * s)
128 {
129   ct_connection_t *ct, *peer_ct;
130   ct = ct_connection_get (s->connection_index, s->thread_index);
131   peer_ct = ct_connection_get (ct->peer_index, s->thread_index);
132   return session_get (peer_ct->c_s_index, s->thread_index);
133 }
134
135 void
136 ct_session_endpoint (session_t * ll, session_endpoint_t * sep)
137 {
138   ct_connection_t *ct;
139   ct = (ct_connection_t *) session_get_transport (ll);
140   sep->transport_proto = ct->actual_tp;
141   sep->port = ct->c_lcl_port;
142   sep->is_ip4 = ct->c_is_ip4;
143   ip_copy (&sep->ip, &ct->c_lcl_ip, ct->c_is_ip4);
144 }
145
146 static void
147 ct_set_invalid_app_wrk (ct_connection_t *ct, u8 is_client)
148 {
149   ct_connection_t *peer_ct;
150
151   peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
152
153   if (is_client)
154     {
155       ct->client_wrk = APP_INVALID_INDEX;
156       if (peer_ct)
157         ct->client_wrk = APP_INVALID_INDEX;
158     }
159   else
160     {
161       ct->server_wrk = APP_INVALID_INDEX;
162       if (peer_ct)
163         ct->server_wrk = APP_INVALID_INDEX;
164     }
165 }
166
167 static void
168 ct_session_dealloc_fifos (ct_connection_t *ct, svm_fifo_t *rx_fifo,
169                           svm_fifo_t *tx_fifo)
170 {
171   ct_segments_ctx_t *seg_ctx;
172   ct_main_t *cm = &ct_main;
173   segment_manager_t *sm;
174   app_worker_t *app_wrk;
175   ct_segment_t *ct_seg;
176   fifo_segment_t *fs;
177   u32 seg_index;
178   session_t *s;
179   int cnt;
180
181   /*
182    * Cleanup fifos
183    */
184
185   sm = segment_manager_get (rx_fifo->segment_manager);
186   seg_index = rx_fifo->segment_index;
187
188   fs = segment_manager_get_segment_w_lock (sm, seg_index);
189   fifo_segment_free_fifo (fs, rx_fifo);
190   fifo_segment_free_fifo (fs, tx_fifo);
191   segment_manager_segment_reader_unlock (sm);
192
193   /*
194    * Atomically update segment context with readers lock
195    */
196
197   clib_rwlock_reader_lock (&cm->app_segs_lock);
198
199   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
200   ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
201
202   if (ct->flags & CT_CONN_F_CLIENT)
203     {
204       cnt =
205         __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
206     }
207   else
208     {
209       cnt =
210         __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
211     }
212
213   clib_rwlock_reader_unlock (&cm->app_segs_lock);
214
215   /*
216    * No need to do any app updates, return
217    */
218   ASSERT (cnt >= 0);
219   if (cnt)
220     return;
221
222   /*
223    * Grab exclusive lock and update flags unless some other thread
224    * added more sessions
225    */
226   clib_rwlock_writer_lock (&cm->app_segs_lock);
227
228   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
229   ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
230   if (ct->flags & CT_CONN_F_CLIENT)
231     {
232       cnt = ct_seg->client_n_sessions;
233       if (cnt)
234         goto done;
235       ct_seg->flags |= CT_SEGMENT_F_CLIENT_DETACHED;
236       s = session_get (ct->c_s_index, ct->c_thread_index);
237       if (s->app_wrk_index == APP_INVALID_INDEX)
238         ct_set_invalid_app_wrk (ct, 1 /* is_client */);
239     }
240   else
241     {
242       cnt = ct_seg->server_n_sessions;
243       if (cnt)
244         goto done;
245       ct_seg->flags |= CT_SEGMENT_F_SERVER_DETACHED;
246       s = session_get (ct->c_s_index, ct->c_thread_index);
247       if (s->app_wrk_index == APP_INVALID_INDEX)
248         ct_set_invalid_app_wrk (ct, 0 /* is_client */);
249     }
250
251   if (!(ct_seg->flags & CT_SEGMENT_F_CLIENT_DETACHED) ||
252       !(ct_seg->flags & CT_SEGMENT_F_SERVER_DETACHED))
253     goto done;
254
255   /*
256    * Remove segment context because both client and server detached
257    */
258
259   pool_put_index (seg_ctx->segments, ct->ct_seg_index);
260
261   /*
262    * No more segment indices left, remove the segments context
263    */
264   if (!pool_elts (seg_ctx->segments))
265     {
266       u64 table_handle = seg_ctx->client_wrk << 16 | seg_ctx->server_wrk;
267       table_handle = (u64) seg_ctx->sm_index << 32 | table_handle;
268       hash_unset (cm->app_segs_ctxs_table, table_handle);
269       pool_free (seg_ctx->segments);
270       pool_put_index (cm->app_seg_ctxs, ct->seg_ctx_index);
271     }
272
273   /*
274    * Segment to be removed so notify both apps
275    */
276
277   app_wrk = app_worker_get_if_valid (ct->client_wrk);
278   /* Determine if client app still needs notification, i.e., if it is
279    * still attached. If client detached and this is the last ct session
280    * on this segment, then its connects segment manager should also be
281    * detached, so do not send notification */
282   if (app_wrk)
283     {
284       segment_manager_t *csm;
285       csm = app_worker_get_connect_segment_manager (app_wrk);
286       if (!segment_manager_app_detached (csm))
287         app_worker_del_segment_notify (app_wrk, ct->segment_handle);
288     }
289
290   if (!segment_manager_app_detached (sm))
291     {
292       app_wrk = app_worker_get (ct->server_wrk);
293       app_worker_del_segment_notify (app_wrk, ct->segment_handle);
294     }
295
296   segment_manager_lock_and_del_segment (sm, seg_index);
297
298   /* Cleanup segment manager if needed. If server detaches there's a chance
299    * the client's sessions will hold up segment removal */
300   if (segment_manager_app_detached (sm) && !segment_manager_has_fifos (sm))
301     segment_manager_free_safe (sm);
302
303 done:
304
305   clib_rwlock_writer_unlock (&cm->app_segs_lock);
306 }
307
308 int
309 ct_session_connect_notify (session_t *ss, session_error_t err)
310 {
311   u32 ss_index, opaque, thread_index;
312   ct_connection_t *sct, *cct;
313   app_worker_t *client_wrk;
314   session_t *cs;
315
316   ss_index = ss->session_index;
317   thread_index = ss->thread_index;
318   sct = (ct_connection_t *) session_get_transport (ss);
319   client_wrk = app_worker_get (sct->client_wrk);
320   opaque = sct->client_opaque;
321
322   cct = ct_connection_get (sct->peer_index, thread_index);
323
324   /* Client closed while waiting for reply from server */
325   if (PREDICT_FALSE (!cct))
326     {
327       session_transport_closing_notify (&sct->connection);
328       session_transport_delete_notify (&sct->connection);
329       ct_connection_free (sct);
330       return 0;
331     }
332
333   session_half_open_delete_notify (&cct->connection);
334   cct->flags &= ~CT_CONN_F_HALF_OPEN;
335
336   if (PREDICT_FALSE (err))
337     goto connect_error;
338
339   /*
340    * Alloc client session
341    */
342
343   cs = session_alloc (thread_index);
344   ss = session_get (ss_index, thread_index);
345   cs->session_type = ss->session_type;
346   cs->listener_handle = SESSION_INVALID_HANDLE;
347   cs->session_state = SESSION_STATE_CONNECTING;
348   cs->app_wrk_index = client_wrk->wrk_index;
349   cs->connection_index = cct->c_c_index;
350   cct->c_s_index = cs->session_index;
351
352   /* This will allocate fifos for the session. They won't be used for
353    * exchanging data but they will be used to close the connection if
354    * the segment manager/worker is freed */
355   if ((err = app_worker_init_connected (client_wrk, cs)))
356     {
357       session_free (cs);
358       session_close (ss);
359       err = SESSION_E_ALLOC;
360       goto connect_error;
361     }
362
363   cs->session_state = SESSION_STATE_CONNECTING;
364
365   if (app_worker_connect_notify (client_wrk, cs, 0, opaque))
366     {
367       segment_manager_dealloc_fifos (cs->rx_fifo, cs->tx_fifo);
368       session_free (cs);
369       session_close (ss);
370       goto cleanup_client;
371     }
372
373   cs = session_get (cct->c_s_index, cct->c_thread_index);
374   cs->session_state = SESSION_STATE_READY;
375
376   return 0;
377
378 connect_error:
379
380   app_worker_connect_notify (client_wrk, 0, err, cct->client_opaque);
381
382 cleanup_client:
383
384   if (cct->client_rx_fifo)
385     ct_session_dealloc_fifos (cct, cct->client_rx_fifo, cct->client_tx_fifo);
386   ct_connection_free (cct);
387   return -1;
388 }
389
390 static inline ct_segment_t *
391 ct_lookup_free_segment (ct_main_t *cm, segment_manager_t *sm,
392                         u32 seg_ctx_index)
393 {
394   uword free_bytes, max_free_bytes;
395   ct_segment_t *ct_seg, *res = 0;
396   ct_segments_ctx_t *seg_ctx;
397   fifo_segment_t *fs;
398   u32 max_fifos;
399
400   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
401   max_free_bytes = seg_ctx->fifo_pair_bytes;
402
403   pool_foreach (ct_seg, seg_ctx->segments)
404     {
405       /* Client or server has detached so segment cannot be used */
406       fs = segment_manager_get_segment (sm, ct_seg->segment_index);
407       free_bytes = fifo_segment_available_bytes (fs);
408       max_fifos = fifo_segment_size (fs) / seg_ctx->fifo_pair_bytes;
409       if (free_bytes > max_free_bytes &&
410           fifo_segment_num_fifos (fs) / 2 < max_fifos)
411         {
412           max_free_bytes = free_bytes;
413           res = ct_seg;
414         }
415     }
416
417   return res;
418 }
419
420 static ct_segment_t *
421 ct_alloc_segment (ct_main_t *cm, app_worker_t *server_wrk, u64 table_handle,
422                   segment_manager_t *sm, u32 client_wrk_index)
423 {
424   u32 seg_ctx_index = ~0, sm_index, pair_bytes;
425   segment_manager_props_t *props;
426   const u32 margin = 16 << 10;
427   ct_segments_ctx_t *seg_ctx;
428   app_worker_t *client_wrk;
429   u64 seg_size, seg_handle;
430   application_t *server;
431   ct_segment_t *ct_seg;
432   uword *spp;
433   int fs_index;
434
435   server = application_get (server_wrk->app_index);
436   props = application_segment_manager_properties (server);
437   sm_index = segment_manager_index (sm);
438   pair_bytes = props->rx_fifo_size + props->tx_fifo_size + margin;
439
440   /*
441    * Make sure another thread did not alloc a segment while acquiring the lock
442    */
443
444   spp = hash_get (cm->app_segs_ctxs_table, table_handle);
445   if (spp)
446     {
447       seg_ctx_index = *spp;
448       ct_seg = ct_lookup_free_segment (cm, sm, seg_ctx_index);
449       if (ct_seg)
450         return ct_seg;
451     }
452
453   /*
454    * No segment, try to alloc one and notify the server and the client.
455    * Make sure the segment is not used for other fifos
456    */
457   seg_size = clib_max (props->segment_size, 128 << 20);
458   fs_index =
459     segment_manager_add_segment2 (sm, seg_size, FIFO_SEGMENT_F_CUSTOM_USE);
460   if (fs_index < 0)
461     return 0;
462
463   if (seg_ctx_index == ~0)
464     {
465       pool_get_zero (cm->app_seg_ctxs, seg_ctx);
466       seg_ctx_index = seg_ctx - cm->app_seg_ctxs;
467       hash_set (cm->app_segs_ctxs_table, table_handle, seg_ctx_index);
468       seg_ctx->server_wrk = server_wrk->wrk_index;
469       seg_ctx->client_wrk = client_wrk_index;
470       seg_ctx->sm_index = sm_index;
471       seg_ctx->fifo_pair_bytes = pair_bytes;
472     }
473   else
474     {
475       seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
476     }
477
478   pool_get_zero (seg_ctx->segments, ct_seg);
479   ct_seg->segment_index = fs_index;
480   ct_seg->server_n_sessions = 0;
481   ct_seg->client_n_sessions = 0;
482   ct_seg->ct_seg_index = ct_seg - seg_ctx->segments;
483   ct_seg->seg_ctx_index = seg_ctx_index;
484
485   /* New segment, notify the server and client */
486   seg_handle = segment_manager_make_segment_handle (sm_index, fs_index);
487   if (app_worker_add_segment_notify (server_wrk, seg_handle))
488     goto error;
489
490   client_wrk = app_worker_get (client_wrk_index);
491   if (app_worker_add_segment_notify (client_wrk, seg_handle))
492     {
493       app_worker_del_segment_notify (server_wrk, seg_handle);
494       goto error;
495     }
496
497   return ct_seg;
498
499 error:
500
501   segment_manager_lock_and_del_segment (sm, fs_index);
502   pool_put_index (seg_ctx->segments, ct_seg->seg_ctx_index);
503   return 0;
504 }
505
506 static int
507 ct_init_accepted_session (app_worker_t *server_wrk, ct_connection_t *ct,
508                           session_t *ls, session_t *ll)
509 {
510   segment_manager_props_t *props;
511   u64 seg_handle, table_handle;
512   u32 sm_index, fs_index = ~0;
513   ct_segments_ctx_t *seg_ctx;
514   ct_main_t *cm = &ct_main;
515   application_t *server;
516   segment_manager_t *sm;
517   ct_segment_t *ct_seg;
518   fifo_segment_t *fs;
519   uword *spp;
520   int rv;
521
522   sm = app_worker_get_listen_segment_manager (server_wrk, ll);
523   sm_index = segment_manager_index (sm);
524   server = application_get (server_wrk->app_index);
525   props = application_segment_manager_properties (server);
526
527   table_handle = ct->client_wrk << 16 | server_wrk->wrk_index;
528   table_handle = (u64) sm_index << 32 | table_handle;
529
530   /*
531    * Check if we already have a segment that can hold the fifos
532    */
533
534   clib_rwlock_reader_lock (&cm->app_segs_lock);
535
536   spp = hash_get (cm->app_segs_ctxs_table, table_handle);
537   if (spp)
538     {
539       ct_seg = ct_lookup_free_segment (cm, sm, *spp);
540       if (ct_seg)
541         {
542           ct->seg_ctx_index = ct_seg->seg_ctx_index;
543           ct->ct_seg_index = ct_seg->ct_seg_index;
544           fs_index = ct_seg->segment_index;
545           ct_seg->flags &=
546             ~(CT_SEGMENT_F_SERVER_DETACHED | CT_SEGMENT_F_CLIENT_DETACHED);
547           __atomic_add_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
548           __atomic_add_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
549         }
550     }
551
552   clib_rwlock_reader_unlock (&cm->app_segs_lock);
553
554   /*
555    * If not, grab exclusive lock and allocate segment
556    */
557   if (fs_index == ~0)
558     {
559       clib_rwlock_writer_lock (&cm->app_segs_lock);
560
561       ct_seg =
562         ct_alloc_segment (cm, server_wrk, table_handle, sm, ct->client_wrk);
563       if (!ct_seg)
564         {
565           clib_rwlock_writer_unlock (&cm->app_segs_lock);
566           return -1;
567         }
568
569       ct->seg_ctx_index = ct_seg->seg_ctx_index;
570       ct->ct_seg_index = ct_seg->ct_seg_index;
571       ct_seg->server_n_sessions += 1;
572       ct_seg->client_n_sessions += 1;
573       fs_index = ct_seg->segment_index;
574
575       clib_rwlock_writer_unlock (&cm->app_segs_lock);
576     }
577
578   /*
579    * Allocate and initialize the fifos
580    */
581   fs = segment_manager_get_segment_w_lock (sm, fs_index);
582   rv = segment_manager_try_alloc_fifos (
583     fs, ls->thread_index, props->rx_fifo_size, props->tx_fifo_size,
584     &ls->rx_fifo, &ls->tx_fifo);
585   if (rv)
586     {
587       segment_manager_segment_reader_unlock (sm);
588
589       clib_rwlock_reader_lock (&cm->app_segs_lock);
590
591       seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
592       ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
593       __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
594       __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
595
596       clib_rwlock_reader_unlock (&cm->app_segs_lock);
597
598       return rv;
599     }
600
601   ls->rx_fifo->shr->master_session_index = ls->session_index;
602   ls->tx_fifo->shr->master_session_index = ls->session_index;
603   ls->rx_fifo->master_thread_index = ls->thread_index;
604   ls->tx_fifo->master_thread_index = ls->thread_index;
605   ls->rx_fifo->segment_manager = sm_index;
606   ls->tx_fifo->segment_manager = sm_index;
607   ls->rx_fifo->segment_index = fs_index;
608   ls->tx_fifo->segment_index = fs_index;
609
610   seg_handle = segment_manager_segment_handle (sm, fs);
611   segment_manager_segment_reader_unlock (sm);
612
613   ct->segment_handle = seg_handle;
614
615   return 0;
616 }
617
618 static void
619 ct_accept_rpc_wrk_handler (void *accept_args)
620 {
621   u32 cct_index, ho_index, thread_index, ll_index;
622   ct_connection_t *sct, *cct, *ho;
623   transport_connection_t *ll_ct;
624   app_worker_t *server_wrk;
625   session_t *ss, *ll;
626
627   /*
628    * Alloc client ct and initialize from ho
629    */
630   thread_index = vlib_get_thread_index ();
631   cct = ct_connection_alloc (thread_index);
632   cct_index = cct->c_c_index;
633
634   ho_index = pointer_to_uword (accept_args);
635   ho = ct_connection_get (ho_index, 0);
636
637   /* Unlikely but half-open session and transport could have been freed */
638   if (PREDICT_FALSE (!ho))
639     {
640       ct_connection_free (cct);
641       return;
642     }
643
644   clib_memcpy (cct, ho, sizeof (*ho));
645   cct->c_c_index = cct_index;
646   cct->c_thread_index = thread_index;
647   cct->flags |= CT_CONN_F_HALF_OPEN;
648
649   /* Notify session layer that half-open is on a different thread
650    * and mark ho connection index reusable. Avoids another rpc
651    */
652   session_half_open_migrate_notify (&cct->connection);
653   session_half_open_migrated_notify (&cct->connection);
654   ct_half_open_add_reusable (ho_index);
655
656   /*
657    * Alloc and init server transport
658    */
659
660   ll_index = cct->peer_index;
661   ll = listen_session_get (ll_index);
662   sct = ct_connection_alloc (thread_index);
663   /* Transport not necessarily ct but it might, so grab after sct alloc */
664   ll_ct = listen_session_get_transport (ll);
665
666   /* Make sure cct is valid after sct alloc */
667   cct = ct_connection_get (cct_index, thread_index);
668
669   sct->c_rmt_port = 0;
670   sct->c_lcl_port = ll_ct->lcl_port;
671   sct->c_is_ip4 = cct->c_is_ip4;
672   clib_memcpy (&sct->c_lcl_ip, &cct->c_rmt_ip, sizeof (cct->c_rmt_ip));
673   sct->client_wrk = cct->client_wrk;
674   sct->c_proto = TRANSPORT_PROTO_NONE;
675   sct->client_opaque = cct->client_opaque;
676   sct->actual_tp = cct->actual_tp;
677
678   sct->peer_index = cct->c_c_index;
679   cct->peer_index = sct->c_c_index;
680
681   /*
682    * Accept server session. Client session is created only after
683    * server confirms accept.
684    */
685   ss = session_alloc (thread_index);
686   ll = listen_session_get (ll_index);
687   ss->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE,
688                                                      sct->c_is_ip4);
689   ss->connection_index = sct->c_c_index;
690   ss->listener_handle = listen_session_get_handle (ll);
691   ss->session_state = SESSION_STATE_CREATED;
692
693   server_wrk = application_listener_select_worker (ll);
694   ss->app_wrk_index = server_wrk->wrk_index;
695
696   sct->c_s_index = ss->session_index;
697   sct->server_wrk = ss->app_wrk_index;
698
699   if (ct_init_accepted_session (server_wrk, sct, ss, ll))
700     {
701       ct_session_connect_notify (ss, SESSION_E_ALLOC);
702       ct_connection_free (sct);
703       session_free (ss);
704       return;
705     }
706
707   cct->server_wrk = sct->server_wrk;
708   cct->seg_ctx_index = sct->seg_ctx_index;
709   cct->ct_seg_index = sct->ct_seg_index;
710   cct->client_rx_fifo = ss->tx_fifo;
711   cct->client_tx_fifo = ss->rx_fifo;
712   cct->client_rx_fifo->refcnt++;
713   cct->client_tx_fifo->refcnt++;
714   cct->segment_handle = sct->segment_handle;
715
716   ss->session_state = SESSION_STATE_ACCEPTING;
717   if (app_worker_accept_notify (server_wrk, ss))
718     {
719       ct_session_connect_notify (ss, SESSION_E_REFUSED);
720       ct_session_dealloc_fifos (sct, ss->rx_fifo, ss->tx_fifo);
721       ct_connection_free (sct);
722       session_free (ss);
723     }
724 }
725
726 static int
727 ct_connect (app_worker_t * client_wrk, session_t * ll,
728             session_endpoint_cfg_t * sep)
729 {
730   u32 thread_index, ho_index;
731   ct_main_t *cm = &ct_main;
732   ct_connection_t *ho;
733
734   /* Simple round-robin policy for spreading sessions over workers. We skip
735    * thread index 0, i.e., offset the index by 1, when we have workers as it
736    * is the one dedicated to main thread. Note that n_workers does not include
737    * main thread */
738   cm->n_sessions += 1;
739   thread_index = cm->n_workers ? (cm->n_sessions % cm->n_workers) + 1 : 0;
740
741   /*
742    * Alloc and init client half-open transport
743    */
744
745   ho = ct_half_open_alloc ();
746   ho_index = ho->c_c_index;
747   ho->c_rmt_port = sep->port;
748   ho->c_lcl_port = 0;
749   ho->c_is_ip4 = sep->is_ip4;
750   ho->client_opaque = sep->opaque;
751   ho->client_wrk = client_wrk->wrk_index;
752   ho->peer_index = ll->session_index;
753   ho->c_proto = TRANSPORT_PROTO_NONE;
754   ho->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP;
755   clib_memcpy (&ho->c_rmt_ip, &sep->ip, sizeof (sep->ip));
756   ho->flags |= CT_CONN_F_CLIENT;
757   ho->c_s_index = ~0;
758   ho->actual_tp = sep->original_tp;
759
760   /*
761    * Accept connection on thread selected above. Connected reply comes
762    * after server accepts the connection.
763    */
764
765   session_send_rpc_evt_to_thread_force (thread_index,
766                                         ct_accept_rpc_wrk_handler,
767                                         uword_to_pointer (ho_index, void *));
768
769   return ho_index;
770 }
771
772 static u32
773 ct_start_listen (u32 app_listener_index, transport_endpoint_t * tep)
774 {
775   session_endpoint_cfg_t *sep;
776   ct_connection_t *ct;
777
778   sep = (session_endpoint_cfg_t *) tep;
779   ct = ct_connection_alloc (0);
780   ct->server_wrk = sep->app_wrk_index;
781   ct->c_is_ip4 = sep->is_ip4;
782   clib_memcpy (&ct->c_lcl_ip, &sep->ip, sizeof (sep->ip));
783   ct->c_lcl_port = sep->port;
784   ct->c_s_index = app_listener_index;
785   ct->actual_tp = sep->transport_proto;
786   return ct->c_c_index;
787 }
788
789 static u32
790 ct_stop_listen (u32 ct_index)
791 {
792   ct_connection_t *ct;
793   ct = ct_connection_get (ct_index, 0);
794   ct_connection_free (ct);
795   return 0;
796 }
797
798 static transport_connection_t *
799 ct_listener_get (u32 ct_index)
800 {
801   return (transport_connection_t *) ct_connection_get (ct_index, 0);
802 }
803
804 static transport_connection_t *
805 ct_half_open_get (u32 ct_index)
806 {
807   return (transport_connection_t *) ct_connection_get (ct_index, 0);
808 }
809
810 static void
811 ct_session_cleanup (u32 conn_index, u32 thread_index)
812 {
813   ct_connection_t *ct, *peer_ct;
814
815   ct = ct_connection_get (conn_index, thread_index);
816   if (!ct)
817     return;
818
819   peer_ct = ct_connection_get (ct->peer_index, thread_index);
820   if (peer_ct)
821     peer_ct->peer_index = ~0;
822
823   ct_connection_free (ct);
824 }
825
826 static void
827 ct_cleanup_ho (u32 ho_index)
828 {
829   ct_connection_free (ct_connection_get (ho_index, 0));
830 }
831
832 static int
833 ct_session_connect (transport_endpoint_cfg_t * tep)
834 {
835   session_endpoint_cfg_t *sep_ext;
836   session_endpoint_t _sep, *sep = &_sep;
837   app_worker_t *app_wrk;
838   session_handle_t lh;
839   application_t *app;
840   app_listener_t *al;
841   u32 table_index;
842   session_t *ll;
843   u8 fib_proto;
844
845   sep_ext = (session_endpoint_cfg_t *) tep;
846   _sep = *(session_endpoint_t *) tep;
847   app_wrk = app_worker_get (sep_ext->app_wrk_index);
848   app = application_get (app_wrk->app_index);
849
850   sep->transport_proto = sep_ext->original_tp;
851   table_index = application_local_session_table (app);
852   lh = session_lookup_local_endpoint (table_index, sep);
853   if (lh == SESSION_DROP_HANDLE)
854     return SESSION_E_FILTERED;
855
856   if (lh == SESSION_INVALID_HANDLE)
857     goto global_scope;
858
859   ll = listen_session_get_from_handle (lh);
860   al = app_listener_get_w_session (ll);
861
862   /*
863    * Break loop if rule in local table points to connecting app. This
864    * can happen if client is a generic proxy. Route connect through
865    * global table instead.
866    */
867   if (al->app_index == app->app_index)
868     goto global_scope;
869
870   return ct_connect (app_wrk, ll, sep_ext);
871
872   /*
873    * If nothing found, check the global scope for locally attached
874    * destinations. Make sure first that we're allowed to.
875    */
876
877 global_scope:
878   if (session_endpoint_is_local (sep))
879     return SESSION_E_NOROUTE;
880
881   if (!application_has_global_scope (app))
882     return SESSION_E_SCOPE;
883
884   fib_proto = session_endpoint_fib_proto (sep);
885   table_index = session_lookup_get_index_for_fib (fib_proto, sep->fib_index);
886   ll = session_lookup_listener_wildcard (table_index, sep);
887
888   /* Avoid connecting app to own listener */
889   if (ll && ll->app_index != app->app_index)
890     return ct_connect (app_wrk, ll, sep_ext);
891
892   /* Failed to connect but no error */
893   return SESSION_E_LOCAL_CONNECT;
894 }
895
896 static void
897 ct_session_postponed_cleanup (ct_connection_t *ct)
898 {
899   app_worker_t *app_wrk;
900   session_t *s;
901
902   s = session_get (ct->c_s_index, ct->c_thread_index);
903   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
904
905   if (ct->flags & CT_CONN_F_CLIENT)
906     {
907       if (app_wrk)
908         app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
909
910       /* Normal free for client session as the fifos are allocated through
911        * the connects segment manager in a segment that's not shared with
912        * the server */
913       ct_session_dealloc_fifos (ct, ct->client_rx_fifo, ct->client_tx_fifo);
914       session_free_w_fifos (s);
915     }
916   else
917     {
918       /* Manual session and fifo segment cleanup to avoid implicit
919        * segment manager cleanups and notifications */
920       app_wrk = app_worker_get_if_valid (s->app_wrk_index);
921       if (app_wrk)
922         {
923           app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
924           app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_SESSION);
925         }
926
927       ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo);
928       session_free (s);
929     }
930
931   ct_connection_free (ct);
932 }
933
934 static void
935 ct_handle_cleanups (void *args)
936 {
937   uword thread_index = pointer_to_uword (args);
938   const u32 max_cleanups = 100;
939   ct_main_t *cm = &ct_main;
940   ct_cleanup_req_t *req;
941   ct_connection_t *ct;
942   u32 n_to_handle = 0;
943   session_t *s;
944
945   cm->heave_cleanups[thread_index] = 0;
946   n_to_handle = clib_fifo_elts (cm->pending_cleanups[thread_index]);
947   n_to_handle = clib_min (n_to_handle, max_cleanups);
948
949   while (n_to_handle)
950     {
951       clib_fifo_sub2 (cm->pending_cleanups[thread_index], req);
952       ct = ct_connection_get (req->ct_index, thread_index);
953       s = session_get (ct->c_s_index, ct->c_thread_index);
954       if (!svm_fifo_has_event (s->tx_fifo))
955         ct_session_postponed_cleanup (ct);
956       else
957         clib_fifo_add1 (cm->pending_cleanups[thread_index], *req);
958       n_to_handle -= 1;
959     }
960
961   if (clib_fifo_elts (cm->pending_cleanups[thread_index]))
962     {
963       cm->heave_cleanups[thread_index] = 1;
964       session_send_rpc_evt_to_thread_force (
965         thread_index, ct_handle_cleanups,
966         uword_to_pointer (thread_index, void *));
967     }
968 }
969
970 static void
971 ct_program_cleanup (ct_connection_t *ct)
972 {
973   ct_main_t *cm = &ct_main;
974   ct_cleanup_req_t *req;
975   uword thread_index;
976
977   thread_index = ct->c_thread_index;
978   clib_fifo_add2 (cm->pending_cleanups[thread_index], req);
979   req->ct_index = ct->c_c_index;
980
981   if (cm->heave_cleanups[thread_index])
982     return;
983
984   cm->heave_cleanups[thread_index] = 1;
985   session_send_rpc_evt_to_thread_force (
986     thread_index, ct_handle_cleanups, uword_to_pointer (thread_index, void *));
987 }
988
989 static inline int
990 ct_close_is_reset (ct_connection_t *ct, session_t *s)
991 {
992   if (ct->flags & CT_CONN_F_CLIENT)
993     return (svm_fifo_max_dequeue (ct->client_rx_fifo) > 0);
994   else
995     return (svm_fifo_max_dequeue (s->rx_fifo) > 0);
996 }
997
998 static void
999 ct_session_close (u32 ct_index, u32 thread_index)
1000 {
1001   ct_connection_t *ct, *peer_ct;
1002   session_t *s;
1003
1004   ct = ct_connection_get (ct_index, thread_index);
1005   s = session_get (ct->c_s_index, ct->c_thread_index);
1006   peer_ct = ct_connection_get (ct->peer_index, thread_index);
1007   if (peer_ct)
1008     {
1009       peer_ct->peer_index = ~0;
1010       /* Make sure session was allocated */
1011       if (peer_ct->flags & CT_CONN_F_HALF_OPEN)
1012         {
1013           ct_session_connect_notify (s, SESSION_E_REFUSED);
1014         }
1015       else if (peer_ct->c_s_index != ~0)
1016         {
1017           if (ct_close_is_reset (ct, s))
1018             session_transport_reset_notify (&peer_ct->connection);
1019           else
1020             session_transport_closing_notify (&peer_ct->connection);
1021         }
1022       else
1023         {
1024           /* should not happen */
1025           clib_warning ("ct peer without session");
1026           ct_connection_free (peer_ct);
1027         }
1028     }
1029
1030   /* Do not send closed notify to make sure pending tx events are
1031    * still delivered and program cleanup */
1032   ct_program_cleanup (ct);
1033 }
1034
1035 static transport_connection_t *
1036 ct_session_get (u32 ct_index, u32 thread_index)
1037 {
1038   return (transport_connection_t *) ct_connection_get (ct_index,
1039                                                        thread_index);
1040 }
1041
1042 static u8 *
1043 format_ct_connection_id (u8 * s, va_list * args)
1044 {
1045   ct_connection_t *ct = va_arg (*args, ct_connection_t *);
1046   if (!ct)
1047     return s;
1048   if (ct->c_is_ip4)
1049     {
1050       s = format (s, "[%d:%d][CT:%U] %U:%d->%U:%d", ct->c_thread_index,
1051                   ct->c_s_index, format_transport_proto_short, ct->actual_tp,
1052                   format_ip4_address, &ct->c_lcl_ip4,
1053                   clib_net_to_host_u16 (ct->c_lcl_port), format_ip4_address,
1054                   &ct->c_rmt_ip4, clib_net_to_host_u16 (ct->c_rmt_port));
1055     }
1056   else
1057     {
1058       s = format (s, "[%d:%d][CT:%U] %U:%d->%U:%d", ct->c_thread_index,
1059                   ct->c_s_index, format_transport_proto_short, ct->actual_tp,
1060                   format_ip6_address, &ct->c_lcl_ip6,
1061                   clib_net_to_host_u16 (ct->c_lcl_port), format_ip6_address,
1062                   &ct->c_rmt_ip6, clib_net_to_host_u16 (ct->c_rmt_port));
1063     }
1064
1065   return s;
1066 }
1067
1068 static int
1069 ct_custom_tx (void *session, transport_send_params_t * sp)
1070 {
1071   session_t *s = (session_t *) session;
1072   if (session_has_transport (s))
1073     return 0;
1074   /* If event enqueued towards peer, remove from scheduler and remove
1075    * session tx flag, i.e., accept new tx events. Unset fifo flag now to
1076    * avoid missing events if peer did not clear fifo flag yet, which is
1077    * interpreted as successful notification and session is descheduled. */
1078   svm_fifo_unset_event (s->tx_fifo);
1079   if (!ct_session_tx (s))
1080     sp->flags = TRANSPORT_SND_F_DESCHED;
1081
1082   /* The scheduler uses packet count as a means of upper bounding the amount
1083    * of work done per dispatch. So make it look like we have sent something */
1084   return 1;
1085 }
1086
1087 static int
1088 ct_app_rx_evt (transport_connection_t * tc)
1089 {
1090   ct_connection_t *ct = (ct_connection_t *) tc, *peer_ct;
1091   session_t *ps;
1092
1093   peer_ct = ct_connection_get (ct->peer_index, tc->thread_index);
1094   if (!peer_ct)
1095     return -1;
1096   ps = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
1097   return session_dequeue_notify (ps);
1098 }
1099
1100 static u8 *
1101 format_ct_listener (u8 * s, va_list * args)
1102 {
1103   u32 tc_index = va_arg (*args, u32);
1104   u32 __clib_unused thread_index = va_arg (*args, u32);
1105   u32 __clib_unused verbose = va_arg (*args, u32);
1106   ct_connection_t *ct = ct_connection_get (tc_index, 0);
1107   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
1108   if (verbose)
1109     s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "LISTEN");
1110   return s;
1111 }
1112
1113 static u8 *
1114 format_ct_half_open (u8 *s, va_list *args)
1115 {
1116   u32 ho_index = va_arg (*args, u32);
1117   u32 verbose = va_arg (*args, u32);
1118   ct_connection_t *ct = ct_connection_get (ho_index, 0);
1119   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
1120   if (verbose)
1121     s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "HALF-OPEN");
1122   return s;
1123 }
1124
1125 static u8 *
1126 format_ct_connection (u8 * s, va_list * args)
1127 {
1128   ct_connection_t *ct = va_arg (*args, ct_connection_t *);
1129   u32 verbose = va_arg (*args, u32);
1130
1131   if (!ct)
1132     return s;
1133   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
1134   if (verbose)
1135     {
1136       s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "ESTABLISHED");
1137       if (verbose > 1)
1138         {
1139           s = format (s, "\n");
1140         }
1141     }
1142   return s;
1143 }
1144
1145 static u8 *
1146 format_ct_session (u8 * s, va_list * args)
1147 {
1148   u32 ct_index = va_arg (*args, u32);
1149   u32 thread_index = va_arg (*args, u32);
1150   u32 verbose = va_arg (*args, u32);
1151   ct_connection_t *ct;
1152
1153   ct = ct_connection_get (ct_index, thread_index);
1154   if (!ct)
1155     {
1156       s = format (s, "empty\n");
1157       return s;
1158     }
1159
1160   s = format (s, "%U", format_ct_connection, ct, verbose);
1161   return s;
1162 }
1163
1164 clib_error_t *
1165 ct_enable_disable (vlib_main_t * vm, u8 is_en)
1166 {
1167   ct_main_t *cm = &ct_main;
1168
1169   cm->n_workers = vlib_num_workers ();
1170   vec_validate (cm->connections, cm->n_workers);
1171   vec_validate (cm->pending_cleanups, cm->n_workers);
1172   vec_validate (cm->heave_cleanups, cm->n_workers);
1173   clib_spinlock_init (&cm->ho_reuseable_lock);
1174   clib_rwlock_init (&cm->app_segs_lock);
1175   return 0;
1176 }
1177
1178 /* *INDENT-OFF* */
1179 static const transport_proto_vft_t cut_thru_proto = {
1180   .enable = ct_enable_disable,
1181   .start_listen = ct_start_listen,
1182   .stop_listen = ct_stop_listen,
1183   .get_connection = ct_session_get,
1184   .get_listener = ct_listener_get,
1185   .get_half_open = ct_half_open_get,
1186   .cleanup = ct_session_cleanup,
1187   .cleanup_ho = ct_cleanup_ho,
1188   .connect = ct_session_connect,
1189   .close = ct_session_close,
1190   .custom_tx = ct_custom_tx,
1191   .app_rx_evt = ct_app_rx_evt,
1192   .format_listener = format_ct_listener,
1193   .format_half_open = format_ct_half_open,
1194   .format_connection = format_ct_session,
1195   .transport_options = {
1196     .name = "ct",
1197     .short_name = "C",
1198     .tx_type = TRANSPORT_TX_INTERNAL,
1199     .service_type = TRANSPORT_SERVICE_VC,
1200   },
1201 };
1202 /* *INDENT-ON* */
1203
1204 int
1205 ct_session_tx (session_t * s)
1206 {
1207   ct_connection_t *ct, *peer_ct;
1208   session_t *peer_s;
1209
1210   ct = (ct_connection_t *) session_get_transport (s);
1211   peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
1212   if (!peer_ct)
1213     return 0;
1214   peer_s = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
1215   if (peer_s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1216     return 0;
1217   return session_enqueue_notify (peer_s);
1218 }
1219
1220 static clib_error_t *
1221 ct_transport_init (vlib_main_t * vm)
1222 {
1223   transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto,
1224                                FIB_PROTOCOL_IP4, ~0);
1225   transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto,
1226                                FIB_PROTOCOL_IP6, ~0);
1227   return 0;
1228 }
1229
1230 VLIB_INIT_FUNCTION (ct_transport_init);
1231
1232 /*
1233  * fd.io coding-style-patch-verification: ON
1234  *
1235  * Local Variables:
1236  * eval: (c-set-style "gnu")
1237  * End:
1238  */