session svm: support for segments larger than 4GB
[vpp.git] / src / vnet / session / segment_manager.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/session/segment_manager.h>
17 #include <vnet/session/session.h>
18 #include <vnet/session/application.h>
19
20 typedef struct segment_manager_main_
21 {
22   segment_manager_t *segment_managers;  /**< Pool of segment managers */
23   clib_valloc_main_t va_allocator;      /**< Virtual address allocator */
24   u32 seg_name_counter;                 /**< Counter for segment names */
25
26   /*
27    * Configuration
28    */
29   u32 default_fifo_size;                /**< default rx/tx fifo size */
30   u32 default_segment_size;             /**< default fifo segment size */
31   u32 default_app_mq_size;              /**< default app msg q size */
32 } segment_manager_main_t;
33
34 static segment_manager_main_t sm_main;
35
36 #define segment_manager_foreach_segment_w_lock(VAR, SM, BODY)           \
37 do {                                                                    \
38     clib_rwlock_reader_lock (&(SM)->segments_rwlock);                   \
39     pool_foreach((VAR), ((SM)->segments), (BODY));                      \
40     clib_rwlock_reader_unlock (&(SM)->segments_rwlock);                 \
41 } while (0)
42
43 static segment_manager_props_t *
44 segment_manager_properties_get (segment_manager_t * sm)
45 {
46   app_worker_t *app_wrk = app_worker_get (sm->app_wrk_index);
47   return application_get_segment_manager_properties (app_wrk->app_index);
48 }
49
50 segment_manager_props_t *
51 segment_manager_props_init (segment_manager_props_t * props)
52 {
53   props->add_segment_size = sm_main.default_segment_size;
54   props->rx_fifo_size = sm_main.default_fifo_size;
55   props->tx_fifo_size = sm_main.default_fifo_size;
56   props->evt_q_size = sm_main.default_app_mq_size;
57   props->n_slices = vlib_num_workers () + 1;
58   return props;
59 }
60
61 static u8
62 segment_manager_app_detached (segment_manager_t * sm)
63 {
64   return (sm->app_wrk_index == SEGMENT_MANAGER_INVALID_APP_INDEX);
65 }
66
67 void
68 segment_manager_app_detach (segment_manager_t * sm)
69 {
70   sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
71 }
72
73 always_inline u32
74 segment_manager_segment_index (segment_manager_t * sm, fifo_segment_t * seg)
75 {
76   return (seg - sm->segments);
77 }
78
79 /**
80  * Adds segment to segment manager's pool
81  *
82  * If needed a writer's lock is acquired before allocating a new segment
83  * to avoid affecting any of the segments pool readers.
84  */
85 int
86 segment_manager_add_segment (segment_manager_t * sm, uword segment_size)
87 {
88   uword baseva = (uword) ~ 0ULL, alloc_size, page_size;
89   u32 rnd_margin = 128 << 10, fs_index = ~0;
90   segment_manager_main_t *smm = &sm_main;
91   segment_manager_props_t *props;
92   fifo_segment_t *fs;
93   u8 *seg_name;
94   int rv;
95
96   props = segment_manager_properties_get (sm);
97
98   /* Not configured for addition of new segments and not first */
99   if (!props->add_segment && !segment_size)
100     {
101       clib_warning ("cannot allocate new segment");
102       return VNET_API_ERROR_INVALID_VALUE;
103     }
104
105   /*
106    * Allocate fifo segment and grab lock if needed
107    */
108   if (vlib_num_workers ())
109     clib_rwlock_writer_lock (&sm->segments_rwlock);
110
111   pool_get_zero (sm->segments, fs);
112
113   /*
114    * Allocate ssvm segment
115    */
116   segment_size = segment_size ? segment_size : props->add_segment_size;
117   page_size = clib_mem_get_page_size ();
118   /* Protect against segment size u32 wrap */
119   segment_size = clib_max (segment_size + page_size - 1, segment_size);
120   segment_size = segment_size & ~(page_size - 1);
121
122   if (props->segment_type != SSVM_SEGMENT_PRIVATE)
123     {
124       seg_name = format (0, "%d-%d%c", getpid (), smm->seg_name_counter++, 0);
125       alloc_size = (uword) segment_size + rnd_margin;
126       baseva = clib_valloc_alloc (&smm->va_allocator, alloc_size, 0);
127       if (!baseva)
128         {
129           clib_warning ("out of space for segments");
130           pool_put (sm->segments, fs);
131           goto done;
132         }
133     }
134   else
135     seg_name = format (0, "%s%c", "process-private", 0);
136
137   fs->ssvm.ssvm_size = segment_size;
138   fs->ssvm.name = seg_name;
139   fs->ssvm.requested_va = baseva;
140
141   if ((rv = ssvm_master_init (&fs->ssvm, props->segment_type)))
142     {
143       clib_warning ("svm_master_init ('%v', %u) failed", seg_name,
144                     segment_size);
145
146       if (props->segment_type != SSVM_SEGMENT_PRIVATE)
147         clib_valloc_free (&smm->va_allocator, baseva);
148       pool_put (sm->segments, fs);
149       goto done;
150     }
151
152   /*
153    * Initialize fifo segment
154    */
155   fs->n_slices = props->n_slices;
156   fifo_segment_init (fs);
157
158   /*
159    * Save segment index before dropping lock, if any held
160    */
161   fs_index = fs - sm->segments;
162
163 done:
164
165   if (vlib_num_workers ())
166     clib_rwlock_writer_unlock (&sm->segments_rwlock);
167
168   return fs_index;
169 }
170
171 /**
172  * Remove segment without lock
173  */
174 void
175 segment_manager_del_segment (segment_manager_t * sm, fifo_segment_t * fs)
176 {
177   segment_manager_main_t *smm = &sm_main;
178
179   if (ssvm_type (&fs->ssvm) != SSVM_SEGMENT_PRIVATE)
180     {
181       clib_valloc_free (&smm->va_allocator, fs->ssvm.requested_va);
182
183       if (sm->app_wrk_index != SEGMENT_MANAGER_INVALID_APP_INDEX)
184         {
185           app_worker_t *app_wrk;
186           u64 segment_handle;
187           app_wrk = app_worker_get (sm->app_wrk_index);
188           segment_handle = segment_manager_segment_handle (sm, fs);
189           app_worker_del_segment_notify (app_wrk, segment_handle);
190         }
191     }
192
193   ssvm_delete (&fs->ssvm);
194
195   if (CLIB_DEBUG)
196     clib_memset (fs, 0xfb, sizeof (*fs));
197   pool_put (sm->segments, fs);
198 }
199
200 /**
201  * Removes segment after acquiring writer lock
202  */
203 static inline void
204 segment_manager_lock_and_del_segment (segment_manager_t * sm, u32 fs_index)
205 {
206   fifo_segment_t *fs;
207   u8 is_prealloc;
208
209   clib_rwlock_writer_lock (&sm->segments_rwlock);
210   fs = segment_manager_get_segment (sm, fs_index);
211   is_prealloc = fifo_segment_flags (fs) & FIFO_SEGMENT_F_IS_PREALLOCATED;
212   if (is_prealloc && !segment_manager_app_detached (sm))
213     {
214       clib_rwlock_writer_unlock (&sm->segments_rwlock);
215       return;
216     }
217
218   segment_manager_del_segment (sm, fs);
219   clib_rwlock_writer_unlock (&sm->segments_rwlock);
220 }
221
222 /**
223  * Reads a segment from the segment manager's pool without lock
224  */
225 fifo_segment_t *
226 segment_manager_get_segment (segment_manager_t * sm, u32 segment_index)
227 {
228   return pool_elt_at_index (sm->segments, segment_index);
229 }
230
231 u64
232 segment_manager_segment_handle (segment_manager_t * sm,
233                                 fifo_segment_t * segment)
234 {
235   u32 segment_index = segment_manager_segment_index (sm, segment);
236   return (((u64) segment_manager_index (sm) << 32) | segment_index);
237 }
238
239 static void
240 segment_manager_parse_segment_handle (u64 segment_handle, u32 * sm_index,
241                                       u32 * segment_index)
242 {
243   *sm_index = segment_handle >> 32;
244   *segment_index = segment_handle & 0xFFFFFFFF;
245 }
246
247 u64
248 segment_manager_make_segment_handle (u32 segment_manager_index,
249                                      u32 segment_index)
250 {
251   return (((u64) segment_manager_index << 32) | segment_index);
252 }
253
254 fifo_segment_t *
255 segment_manager_get_segment_w_handle (u64 segment_handle)
256 {
257   u32 sm_index, segment_index;
258   segment_manager_t *sm;
259
260   segment_manager_parse_segment_handle (segment_handle, &sm_index,
261                                         &segment_index);
262   sm = segment_manager_get (sm_index);
263   if (!sm || pool_is_free_index (sm->segments, segment_index))
264     return 0;
265   return pool_elt_at_index (sm->segments, segment_index);
266 }
267
268 /**
269  * Reads a segment from the segment manager's pool and acquires reader lock
270  *
271  * Caller must drop the reader's lock by calling
272  * @ref segment_manager_segment_reader_unlock once it finishes working with
273  * the segment.
274  */
275 fifo_segment_t *
276 segment_manager_get_segment_w_lock (segment_manager_t * sm, u32 segment_index)
277 {
278   clib_rwlock_reader_lock (&sm->segments_rwlock);
279   return pool_elt_at_index (sm->segments, segment_index);
280 }
281
282 void
283 segment_manager_segment_reader_unlock (segment_manager_t * sm)
284 {
285   clib_rwlock_reader_unlock (&sm->segments_rwlock);
286 }
287
288 void
289 segment_manager_segment_writer_unlock (segment_manager_t * sm)
290 {
291   clib_rwlock_writer_unlock (&sm->segments_rwlock);
292 }
293
294 segment_manager_t *
295 segment_manager_alloc (void)
296 {
297   segment_manager_main_t *smm = &sm_main;
298   segment_manager_t *sm;
299
300   pool_get_zero (smm->segment_managers, sm);
301   clib_rwlock_init (&sm->segments_rwlock);
302   return sm;
303 }
304
305 /**
306  * Initializes segment manager based on options provided.
307  * Returns error if ssvm segment(s) allocation fails.
308  */
309 int
310 segment_manager_init (segment_manager_t * sm, uword first_seg_size,
311                       u32 prealloc_fifo_pairs)
312 {
313   u32 rx_fifo_size, tx_fifo_size, pair_size;
314   u32 rx_rounded_data_size, tx_rounded_data_size;
315   u64 approx_total_size, max_seg_size = ((u64) 1 << 32) - (128 << 10);
316   segment_manager_props_t *props;
317   fifo_segment_t *segment;
318   u32 approx_segment_count;
319   int seg_index, i;
320
321   props = segment_manager_properties_get (sm);
322   first_seg_size = clib_max (first_seg_size, sm_main.default_segment_size);
323
324   if (prealloc_fifo_pairs)
325     {
326       /* Figure out how many segments should be preallocated */
327       rx_rounded_data_size = (1 << (max_log2 (props->rx_fifo_size)));
328       tx_rounded_data_size = (1 << (max_log2 (props->tx_fifo_size)));
329
330       rx_fifo_size = sizeof (svm_fifo_t) + rx_rounded_data_size;
331       tx_fifo_size = sizeof (svm_fifo_t) + tx_rounded_data_size;
332       pair_size = rx_fifo_size + tx_fifo_size;
333
334       approx_total_size = (u64) prealloc_fifo_pairs *pair_size;
335       if (first_seg_size > approx_total_size)
336         max_seg_size = first_seg_size;
337       approx_segment_count = (approx_total_size + (max_seg_size - 1))
338         / max_seg_size;
339
340       /* Allocate the segments */
341       for (i = 0; i < approx_segment_count + 1; i++)
342         {
343           seg_index = segment_manager_add_segment (sm, max_seg_size);
344           if (seg_index < 0)
345             {
346               clib_warning ("Failed to preallocate segment %d", i);
347               return seg_index;
348             }
349
350           segment = segment_manager_get_segment (sm, seg_index);
351           if (i == 0)
352             sm->event_queue = segment_manager_alloc_queue (segment, props);
353
354           fifo_segment_preallocate_fifo_pairs (segment,
355                                                props->rx_fifo_size,
356                                                props->tx_fifo_size,
357                                                &prealloc_fifo_pairs);
358           fifo_segment_flags (segment) = FIFO_SEGMENT_F_IS_PREALLOCATED;
359           if (prealloc_fifo_pairs == 0)
360             break;
361         }
362     }
363   else
364     {
365       seg_index = segment_manager_add_segment (sm, first_seg_size);
366       if (seg_index < 0)
367         {
368           clib_warning ("Failed to allocate segment");
369           return seg_index;
370         }
371       segment = segment_manager_get_segment (sm, seg_index);
372       sm->event_queue = segment_manager_alloc_queue (segment, props);
373     }
374
375   return 0;
376 }
377
378 /**
379  * Cleanup segment manager.
380  */
381 void
382 segment_manager_free (segment_manager_t * sm)
383 {
384   segment_manager_main_t *smm = &sm_main;
385   fifo_segment_t *fifo_segment;
386
387   ASSERT (!segment_manager_has_fifos (sm)
388           && segment_manager_app_detached (sm));
389
390   /* If we have empty preallocated segments that haven't been removed, remove
391    * them now. Apart from that, the first segment in the first segment manager
392    * is not removed when all fifos are removed. It can only be removed when
393    * the manager is explicitly deleted/detached by the app. */
394   clib_rwlock_writer_lock (&sm->segments_rwlock);
395
396   /* *INDENT-OFF* */
397   pool_foreach (fifo_segment, sm->segments, ({
398     segment_manager_del_segment (sm, fifo_segment);
399   }));
400   /* *INDENT-ON* */
401
402   clib_rwlock_writer_unlock (&sm->segments_rwlock);
403
404   clib_rwlock_free (&sm->segments_rwlock);
405   if (CLIB_DEBUG)
406     clib_memset (sm, 0xfe, sizeof (*sm));
407   pool_put (smm->segment_managers, sm);
408 }
409
410 void
411 segment_manager_init_free (segment_manager_t * sm)
412 {
413   segment_manager_app_detach (sm);
414   if (segment_manager_has_fifos (sm))
415     segment_manager_del_sessions (sm);
416   else
417     {
418       ASSERT (!sm->first_is_protected || segment_manager_app_detached (sm));
419       segment_manager_free (sm);
420     }
421 }
422
423 segment_manager_t *
424 segment_manager_get (u32 index)
425 {
426   return pool_elt_at_index (sm_main.segment_managers, index);
427 }
428
429 segment_manager_t *
430 segment_manager_get_if_valid (u32 index)
431 {
432   if (pool_is_free_index (sm_main.segment_managers, index))
433     return 0;
434   return pool_elt_at_index (sm_main.segment_managers, index);
435 }
436
437 u32
438 segment_manager_index (segment_manager_t * sm)
439 {
440   return sm - sm_main.segment_managers;
441 }
442
443 u8
444 segment_manager_has_fifos (segment_manager_t * sm)
445 {
446   fifo_segment_t *seg;
447   u8 first = 1;
448
449   /* *INDENT-OFF* */
450   segment_manager_foreach_segment_w_lock (seg, sm, ({
451     if (CLIB_DEBUG && !first && !fifo_segment_has_fifos (seg)
452         && !(fifo_segment_flags (seg) & FIFO_SEGMENT_F_IS_PREALLOCATED))
453       {
454         clib_warning ("segment %d has no fifos!",
455                       segment_manager_segment_index (sm, seg));
456         first = 0;
457       }
458     if (fifo_segment_has_fifos (seg))
459       {
460         segment_manager_segment_reader_unlock (sm);
461         return 1;
462       }
463   }));
464   /* *INDENT-ON* */
465
466   return 0;
467 }
468
469 /**
470  * Initiate disconnects for all sessions 'owned' by a segment manager
471  */
472 void
473 segment_manager_del_sessions (segment_manager_t * sm)
474 {
475   session_handle_t *handles = 0, *handle;
476   fifo_segment_t *fs;
477   session_t *session;
478   int slice_index;
479   svm_fifo_t *f;
480
481   ASSERT (pool_elts (sm->segments) != 0);
482
483   /* Across all fifo segments used by the server */
484   /* *INDENT-OFF* */
485   segment_manager_foreach_segment_w_lock (fs, sm, ({
486     for (slice_index = 0; slice_index < fs->n_slices; slice_index++)
487       {
488         f = fifo_segment_get_slice_fifo_list (fs, slice_index);
489
490         /*
491          * Remove any residual sessions from the session lookup table
492          * Don't bother deleting the individual fifos, we're going to
493          * throw away the fifo segment in a minute.
494          */
495         while (f)
496           {
497             session = session_get_if_valid (f->master_session_index,
498                                             f->master_thread_index);
499             if (session)
500               vec_add1 (handles, session_handle (session));
501             f = f->next;
502           }
503       }
504
505     /* Instead of removing the segment, test when cleaning up disconnected
506      * sessions if the segment can be removed.
507      */
508   }));
509   /* *INDENT-ON* */
510
511   vec_foreach (handle, handles)
512     session_close (session_get_from_handle (*handle));
513 }
514
515 int
516 segment_manager_try_alloc_fifos (fifo_segment_t * fifo_segment,
517                                  u32 thread_index,
518                                  u32 rx_fifo_size, u32 tx_fifo_size,
519                                  svm_fifo_t ** rx_fifo, svm_fifo_t ** tx_fifo)
520 {
521   rx_fifo_size = clib_max (rx_fifo_size, sm_main.default_fifo_size);
522   *rx_fifo = fifo_segment_alloc_fifo_w_slice (fifo_segment, thread_index,
523                                               rx_fifo_size,
524                                               FIFO_SEGMENT_RX_FIFO);
525
526   tx_fifo_size = clib_max (tx_fifo_size, sm_main.default_fifo_size);
527   *tx_fifo = fifo_segment_alloc_fifo_w_slice (fifo_segment, thread_index,
528                                               tx_fifo_size,
529                                               FIFO_SEGMENT_TX_FIFO);
530
531   if (*rx_fifo == 0)
532     {
533       /* This would be very odd, but handle it... */
534       if (*tx_fifo != 0)
535         {
536           fifo_segment_free_fifo (fifo_segment, *tx_fifo);
537           *tx_fifo = 0;
538         }
539       return -1;
540     }
541   if (*tx_fifo == 0)
542     {
543       if (*rx_fifo != 0)
544         {
545           fifo_segment_free_fifo (fifo_segment, *rx_fifo);
546           *rx_fifo = 0;
547         }
548       return -1;
549     }
550
551   return 0;
552 }
553
554 int
555 segment_manager_alloc_session_fifos (segment_manager_t * sm,
556                                      u32 thread_index,
557                                      svm_fifo_t ** rx_fifo,
558                                      svm_fifo_t ** tx_fifo)
559 {
560   int alloc_fail = 1, rv = 0, new_fs_index;
561   segment_manager_props_t *props;
562   fifo_segment_t *fs = 0;
563   u32 sm_index, fs_index;
564   u8 added_a_segment = 0;
565   u64 fs_handle;
566
567   props = segment_manager_properties_get (sm);
568
569   /*
570    * Find the first free segment to allocate the fifos in
571    */
572
573   /* *INDENT-OFF* */
574   segment_manager_foreach_segment_w_lock (fs, sm, ({
575     alloc_fail = segment_manager_try_alloc_fifos (fs,
576                                                   thread_index,
577                                                   props->rx_fifo_size,
578                                                   props->tx_fifo_size,
579                                                   rx_fifo, tx_fifo);
580     /* Exit with lock held, drop it after notifying app */
581     if (!alloc_fail)
582       goto alloc_success;
583   }));
584   /* *INDENT-ON* */
585
586 alloc_check:
587
588   if (!alloc_fail)
589     {
590
591     alloc_success:
592
593       ASSERT (rx_fifo && tx_fifo);
594       sm_index = segment_manager_index (sm);
595       fs_index = segment_manager_segment_index (sm, fs);
596       (*tx_fifo)->segment_manager = sm_index;
597       (*rx_fifo)->segment_manager = sm_index;
598       (*tx_fifo)->segment_index = fs_index;
599       (*rx_fifo)->segment_index = fs_index;
600
601       if (added_a_segment)
602         {
603           app_worker_t *app_wrk;
604           fs_handle = segment_manager_segment_handle (sm, fs);
605           app_wrk = app_worker_get (sm->app_wrk_index);
606           rv = app_worker_add_segment_notify (app_wrk, fs_handle);
607         }
608       /* Drop the lock after app is notified */
609       segment_manager_segment_reader_unlock (sm);
610       return rv;
611     }
612
613   /*
614    * Allocation failed, see if we can add a new segment
615    */
616   if (props->add_segment)
617     {
618       if (added_a_segment)
619         {
620           clib_warning ("Added a segment, still can't allocate a fifo");
621           segment_manager_segment_reader_unlock (sm);
622           return SESSION_ERROR_NEW_SEG_NO_SPACE;
623         }
624       if ((new_fs_index = segment_manager_add_segment (sm, 0)) < 0)
625         {
626           clib_warning ("Failed to add new segment");
627           return SESSION_ERROR_SEG_CREATE;
628         }
629       fs = segment_manager_get_segment_w_lock (sm, new_fs_index);
630       alloc_fail = segment_manager_try_alloc_fifos (fs, thread_index,
631                                                     props->rx_fifo_size,
632                                                     props->tx_fifo_size,
633                                                     rx_fifo, tx_fifo);
634       added_a_segment = 1;
635       goto alloc_check;
636     }
637   else
638     {
639       clib_warning ("Can't add new seg and no space to allocate fifos!");
640       return SESSION_ERROR_NO_SPACE;
641     }
642 }
643
644 void
645 segment_manager_dealloc_fifos (svm_fifo_t * rx_fifo, svm_fifo_t * tx_fifo)
646 {
647   segment_manager_t *sm;
648   fifo_segment_t *fs;
649   u32 segment_index;
650
651   if (!rx_fifo || !tx_fifo)
652     return;
653
654   /* It's possible to have no segment manager if the session was removed
655    * as result of a detach. */
656   if (!(sm = segment_manager_get_if_valid (rx_fifo->segment_manager)))
657     return;
658
659   segment_index = rx_fifo->segment_index;
660   fs = segment_manager_get_segment_w_lock (sm, segment_index);
661   fifo_segment_free_fifo (fs, rx_fifo);
662   fifo_segment_free_fifo (fs, tx_fifo);
663
664   /*
665    * Try to remove svm segment if it has no fifos. This can be done only if
666    * the segment is not the first in the segment manager or if it is first
667    * and it is not protected. Moreover, if the segment is first and the app
668    * has detached from the segment manager, remove the segment manager.
669    */
670   if (!fifo_segment_has_fifos (fs))
671     {
672       segment_manager_segment_reader_unlock (sm);
673
674       /* Remove segment if it holds no fifos or first but not protected */
675       if (segment_index != 0 || !sm->first_is_protected)
676         segment_manager_lock_and_del_segment (sm, segment_index);
677
678       /* Remove segment manager if no sessions and detached from app */
679       if (segment_manager_app_detached (sm)
680           && !segment_manager_has_fifos (sm))
681         {
682           segment_manager_free (sm);
683         }
684     }
685   else
686     segment_manager_segment_reader_unlock (sm);
687 }
688
689 int
690 segment_manager_grow_fifo (segment_manager_t * sm, svm_fifo_t * f, u32 size)
691 {
692   fifo_segment_t *fs;
693   int rv;
694
695   fs = segment_manager_get_segment_w_lock (sm, f->segment_index);
696   rv = fifo_segment_grow_fifo (fs, f, size);
697   segment_manager_segment_reader_unlock (sm);
698
699   return rv;
700 }
701
702 int
703 segment_manager_collect_fifo_chunks (segment_manager_t * sm, svm_fifo_t * f)
704 {
705   fifo_segment_t *fs;
706   int rv;
707
708   fs = segment_manager_get_segment_w_lock (sm, f->segment_index);
709   rv = fifo_segment_collect_fifo_chunks (fs, f);
710   segment_manager_segment_reader_unlock (sm);
711
712   return rv;
713 }
714
715 int
716 segment_manager_shrink_fifo (segment_manager_t * sm, svm_fifo_t * f, u32 size,
717                              u8 is_producer)
718 {
719   int rv;
720
721   rv = svm_fifo_reduce_size (f, size, is_producer);
722
723   /* Nothing to collect at this point */
724   if (!is_producer)
725     return rv;
726
727   if (f->flags & SVM_FIFO_F_COLLECT_CHUNKS)
728     segment_manager_collect_fifo_chunks (sm, f);
729
730   return rv;
731 }
732
733 u32
734 segment_manager_evt_q_expected_size (u32 q_len)
735 {
736   u32 fifo_evt_size, notif_q_size, q_hdrs;
737   u32 msg_q_sz, fifo_evt_ring_sz, session_ntf_ring_sz;
738
739   fifo_evt_size = 1 << max_log2 (sizeof (session_event_t));
740   notif_q_size = clib_max (16, q_len >> 4);
741
742   msg_q_sz = q_len * sizeof (svm_msg_q_msg_t);
743   fifo_evt_ring_sz = q_len * fifo_evt_size;
744   session_ntf_ring_sz = notif_q_size * 256;
745   q_hdrs = sizeof (svm_queue_t) + sizeof (svm_msg_q_t);
746
747   return (msg_q_sz + fifo_evt_ring_sz + session_ntf_ring_sz + q_hdrs);
748 }
749
750 /**
751  * Allocates shm queue in the first segment
752  *
753  * Must be called with lock held
754  */
755 svm_msg_q_t *
756 segment_manager_alloc_queue (fifo_segment_t * segment,
757                              segment_manager_props_t * props)
758 {
759   u32 fifo_evt_size, session_evt_size = 256, notif_q_size;
760   svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
761   svm_msg_q_t *q;
762   void *oldheap;
763
764   fifo_evt_size = sizeof (session_event_t);
765   notif_q_size = clib_max (16, props->evt_q_size >> 4);
766   /* *INDENT-OFF* */
767   svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
768     {props->evt_q_size, fifo_evt_size, 0},
769     {notif_q_size, session_evt_size, 0}
770   };
771   /* *INDENT-ON* */
772   cfg->consumer_pid = 0;
773   cfg->n_rings = 2;
774   cfg->q_nitems = props->evt_q_size;
775   cfg->ring_cfgs = rc;
776
777   oldheap = ssvm_push_heap (segment->ssvm.sh);
778   q = svm_msg_q_alloc (cfg);
779   fifo_segment_update_free_bytes (segment);
780   ssvm_pop_heap (oldheap);
781
782   if (props->use_mq_eventfd)
783     {
784       if (svm_msg_q_alloc_producer_eventfd (q))
785         clib_warning ("failed to alloc eventfd");
786     }
787   return q;
788 }
789
790 svm_msg_q_t *
791 segment_manager_event_queue (segment_manager_t * sm)
792 {
793   return sm->event_queue;
794 }
795
796 /**
797  * Frees shm queue allocated in the first segment
798  */
799 void
800 segment_manager_dealloc_queue (segment_manager_t * sm, svm_queue_t * q)
801 {
802   fifo_segment_t *segment;
803   ssvm_shared_header_t *sh;
804   void *oldheap;
805
806   ASSERT (!pool_is_free_index (sm->segments, 0));
807
808   segment = segment_manager_get_segment_w_lock (sm, 0);
809   sh = segment->ssvm.sh;
810
811   oldheap = ssvm_push_heap (sh);
812   svm_queue_free (q);
813   ssvm_pop_heap (oldheap);
814   segment_manager_segment_reader_unlock (sm);
815 }
816
817 /*
818  * Init segment vm address allocator
819  */
820 void
821 segment_manager_main_init (segment_manager_main_init_args_t * a)
822 {
823   segment_manager_main_t *sm = &sm_main;
824   clib_valloc_chunk_t _ip, *ip = &_ip;
825
826   ip->baseva = a->baseva;
827   ip->size = a->size;
828
829   clib_valloc_init (&sm->va_allocator, ip, 1 /* lock */ );
830
831   sm->default_fifo_size = 1 << 12;
832   sm->default_segment_size = 1 << 20;
833   sm->default_app_mq_size = 128;
834 }
835
836 static clib_error_t *
837 segment_manager_show_fn (vlib_main_t * vm, unformat_input_t * input,
838                          vlib_cli_command_t * cmd)
839 {
840   segment_manager_main_t *smm = &sm_main;
841   u8 show_segments = 0, verbose = 0;
842   segment_manager_t *sm;
843   fifo_segment_t *seg;
844
845   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
846     {
847       if (unformat (input, "segments"))
848         show_segments = 1;
849       else if (unformat (input, "verbose"))
850         verbose = 1;
851       else
852         return clib_error_return (0, "unknown input `%U'",
853                                   format_unformat_error, input);
854     }
855   vlib_cli_output (vm, "%d segment managers allocated",
856                    pool_elts (smm->segment_managers));
857   if (verbose && pool_elts (smm->segment_managers))
858     {
859       vlib_cli_output (vm, "%-10s%=15s%=12s", "Index", "App Index",
860                        "Segments");
861
862       /* *INDENT-OFF* */
863       pool_foreach (sm, smm->segment_managers, ({
864         vlib_cli_output (vm, "%-10d%=15d%=12d", segment_manager_index (sm),
865                            sm->app_wrk_index, pool_elts (sm->segments));
866       }));
867       /* *INDENT-ON* */
868
869     }
870   if (show_segments)
871     {
872       vlib_cli_output (vm, "%U", format_fifo_segment, 0, verbose);
873
874       /* *INDENT-OFF* */
875       pool_foreach (sm, smm->segment_managers, ({
876           segment_manager_foreach_segment_w_lock (seg, sm, ({
877             vlib_cli_output (vm, "%U", format_fifo_segment, seg, verbose);
878           }));
879       }));
880       /* *INDENT-ON* */
881
882     }
883   return 0;
884 }
885
886 /* *INDENT-OFF* */
887 VLIB_CLI_COMMAND (segment_manager_show_command, static) =
888 {
889   .path = "show segment-manager",
890   .short_help = "show segment-manager [segments][verbose]",
891   .function = segment_manager_show_fn,
892 };
893 /* *INDENT-ON* */
894
895 void
896 segment_manager_format_sessions (segment_manager_t * sm, int verbose)
897 {
898   vlib_main_t *vm = vlib_get_main ();
899   app_worker_t *app_wrk;
900   fifo_segment_t *fs;
901   const u8 *app_name;
902   int slice_index;
903   u8 *s = 0, *str;
904   svm_fifo_t *f;
905
906   if (!sm)
907     {
908       if (verbose)
909         vlib_cli_output (vm, "%-40s%-20s%-15s%-10s", "Connection", "App",
910                          "API Client", "SegManager");
911       else
912         vlib_cli_output (vm, "%-40s%-20s", "Connection", "App");
913       return;
914     }
915
916   app_wrk = app_worker_get (sm->app_wrk_index);
917   app_name = application_name_from_index (app_wrk->app_index);
918
919   clib_rwlock_reader_lock (&sm->segments_rwlock);
920
921   /* *INDENT-OFF* */
922   pool_foreach (fs, sm->segments, ({
923     for (slice_index = 0; slice_index < fs->n_slices; slice_index++)
924       {
925         f = fifo_segment_get_slice_fifo_list (fs, slice_index);
926         while (f)
927           {
928             u32 session_index, thread_index;
929             session_t *session;
930
931             session_index = f->master_session_index;
932             thread_index = f->master_thread_index;
933
934             session = session_get (session_index, thread_index);
935             str = format (0, "%U", format_session, session, verbose);
936
937             if (verbose)
938               s = format (s, "%-40s%-20s%-15u%-10u", str, app_name,
939                           app_wrk->api_client_index, app_wrk->connects_seg_manager);
940             else
941               s = format (s, "%-40s%-20s", str, app_name);
942
943             vlib_cli_output (vm, "%v", s);
944             vec_reset_length (s);
945             vec_free (str);
946
947             f = f->next;
948           }
949         vec_free (s);
950       }
951   }));
952   /* *INDENT-ON* */
953
954   clib_rwlock_reader_unlock (&sm->segments_rwlock);
955 }
956
957 /*
958  * fd.io coding-style-patch-verification: ON
959  *
960  * Local Variables:
961  * eval: (c-set-style "gnu")
962  * End:
963  */