8fde091389e17311a513683665423d8bc54ad4d6
[vpp.git] / vlib / vlib / mc.c
1 /*
2  * mc.c: vlib reliable sequenced multicast distributed applications
3  *
4  * Copyright (c) 2010 Cisco and/or its affiliates.
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at:
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #include <vlib/vlib.h>
19
20 /*
21  * 1 to enable msg id training wheels, which are useful for tracking
22  * down catchup and/or partitioned network problems
23  */
24 #define MSG_ID_DEBUG 0
25
26 static format_function_t format_mc_stream_state;
27
28 static u32
29 elog_id_for_peer_id (mc_main_t * m, u64 peer_id)
30 {
31   uword *p, r;
32   mhash_t *h = &m->elog_id_by_peer_id;
33
34   if (!m->elog_id_by_peer_id.hash)
35     mhash_init (h, sizeof (uword), sizeof (mc_peer_id_t));
36
37   p = mhash_get (h, &peer_id);
38   if (p)
39     return p[0];
40   r = elog_string (m->elog_main, "%U", m->transport.format_peer_id, peer_id);
41   mhash_set (h, &peer_id, r, /* old_value */ 0);
42   return r;
43 }
44
45 static u32
46 elog_id_for_msg_name (mc_main_t * m, char *msg_name)
47 {
48   uword *p, r;
49   uword *h = m->elog_id_by_msg_name;
50   u8 *name_copy;
51
52   if (!h)
53     h = m->elog_id_by_msg_name = hash_create_string (0, sizeof (uword));
54
55   p = hash_get_mem (h, msg_name);
56   if (p)
57     return p[0];
58   r = elog_string (m->elog_main, "%s", msg_name);
59
60   name_copy = format (0, "%s%c", msg_name, 0);
61
62   hash_set_mem (h, name_copy, r);
63   m->elog_id_by_msg_name = h;
64
65   return r;
66 }
67
68 static void
69 elog_tx_msg (mc_main_t * m, u32 stream_id, u32 local_sequence,
70              u32 retry_count)
71 {
72   if (MC_EVENT_LOGGING > 0)
73     {
74       /* *INDENT-OFF* */
75       ELOG_TYPE_DECLARE (e) =
76         {
77           .format = "tx-msg: stream %d local seq %d attempt %d",
78           .format_args = "i4i4i4",
79         };
80       /* *INDENT-ON* */
81       struct
82       {
83         u32 stream_id, local_sequence, retry_count;
84       } *ed;
85       ed = ELOG_DATA (m->elog_main, e);
86       ed->stream_id = stream_id;
87       ed->local_sequence = local_sequence;
88       ed->retry_count = retry_count;
89     }
90 }
91
92 /*
93  * seq_cmp
94  * correctly compare two unsigned sequence numbers.
95  * This function works so long as x and y are within 2**(n-1) of each
96  * other, where n = bits(x, y).
97  *
98  * Magic decoder ring:
99  * seq_cmp == 0 => x and y are equal
100  * seq_cmp < 0 => x is "in the past" with respect to y
101  * seq_cmp > 0 => x is "in the future" with respect to y
102  */
103 always_inline i32
104 mc_seq_cmp (u32 x, u32 y)
105 {
106   return (i32) x - (i32) y;
107 }
108
109 void *
110 mc_get_vlib_buffer (vlib_main_t * vm, u32 n_bytes, u32 * bi_return)
111 {
112   u32 n_alloc, bi;
113   vlib_buffer_t *b;
114
115   n_alloc = vlib_buffer_alloc (vm, &bi, 1);
116   ASSERT (n_alloc == 1);
117
118   b = vlib_get_buffer (vm, bi);
119   b->current_length = n_bytes;
120   *bi_return = bi;
121   return (void *) b->data;
122 }
123
124 static void
125 delete_peer_with_index (mc_main_t * mcm, mc_stream_t * s,
126                         uword index, int notify_application)
127 {
128   mc_stream_peer_t *p = pool_elt_at_index (s->peers, index);
129   ASSERT (p != 0);
130   if (s->config.peer_died && notify_application)
131     s->config.peer_died (mcm, s, p->id);
132
133   s->all_peer_bitmap = clib_bitmap_andnoti (s->all_peer_bitmap, p - s->peers);
134
135   if (MC_EVENT_LOGGING > 0)
136     {
137       /* *INDENT-OFF* */
138       ELOG_TYPE_DECLARE (e) =
139         {
140           .format = "delete peer %s from all_peer_bitmap",
141           .format_args = "T4",
142         };
143       /* *INDENT-ON* */
144       struct
145       {
146         u32 peer;
147       } *ed = 0;
148
149       ed = ELOG_DATA (mcm->elog_main, e);
150       ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
151     }
152   /* Do not delete the pool / hash table entries, or we lose sequence number state */
153 }
154
155 static mc_stream_peer_t *
156 get_or_create_peer_with_id (mc_main_t * mcm,
157                             mc_stream_t * s, mc_peer_id_t id, int *created)
158 {
159   uword *q = mhash_get (&s->peer_index_by_id, &id);
160   mc_stream_peer_t *p;
161
162   if (q)
163     {
164       p = pool_elt_at_index (s->peers, q[0]);
165       goto done;
166     }
167
168   pool_get (s->peers, p);
169   memset (p, 0, sizeof (p[0]));
170   p->id = id;
171   p->last_sequence_received = ~0;
172   mhash_set (&s->peer_index_by_id, &id, p - s->peers, /* old_value */ 0);
173   if (created)
174     *created = 1;
175
176 done:
177   if (MC_EVENT_LOGGING > 0)
178     {
179       /* *INDENT-OFF* */
180       ELOG_TYPE_DECLARE (e) =
181         {
182           .format = "get_or_create %s peer %s stream %d seq %d",
183           .format_args = "t4T4i4i4",
184           .n_enum_strings = 2,
185           .enum_strings = {
186             "old", "new",
187           },
188         };
189       /* *INDENT-ON* */
190       struct
191       {
192         u32 is_new, peer, stream_index, rx_sequence;
193       } *ed = 0;
194
195       ed = ELOG_DATA (mcm->elog_main, e);
196       ed->is_new = q ? 0 : 1;
197       ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
198       ed->stream_index = s->index;
199       ed->rx_sequence = p->last_sequence_received;
200     }
201   /* $$$$ Enable or reenable this peer */
202   s->all_peer_bitmap = clib_bitmap_ori (s->all_peer_bitmap, p - s->peers);
203   return p;
204 }
205
206 static void
207 maybe_send_window_open_event (vlib_main_t * vm, mc_stream_t * stream)
208 {
209   vlib_one_time_waiting_process_t *p;
210
211   if (pool_elts (stream->retry_pool) >= stream->config.window_size)
212     return;
213
214   vec_foreach (p, stream->procs_waiting_for_open_window)
215     vlib_signal_one_time_waiting_process (vm, p);
216
217   if (stream->procs_waiting_for_open_window)
218     _vec_len (stream->procs_waiting_for_open_window) = 0;
219 }
220
221 static void
222 mc_retry_free (mc_main_t * mcm, mc_stream_t * s, mc_retry_t * r)
223 {
224   mc_retry_t record, *retp;
225
226   if (r->unacked_by_peer_bitmap)
227     _vec_len (r->unacked_by_peer_bitmap) = 0;
228
229   if (clib_fifo_elts (s->retired_fifo) >= 2 * s->config.window_size)
230     {
231       clib_fifo_sub1 (s->retired_fifo, record);
232       vlib_buffer_free_one (mcm->vlib_main, record.buffer_index);
233     }
234
235   clib_fifo_add2 (s->retired_fifo, retp);
236
237   retp->buffer_index = r->buffer_index;
238   retp->local_sequence = r->local_sequence;
239
240   r->buffer_index = ~0;         /* poison buffer index in this retry */
241 }
242
243 static void
244 mc_resend_retired (mc_main_t * mcm, mc_stream_t * s, u32 local_sequence)
245 {
246   mc_retry_t *retry;
247
248   if (MC_EVENT_LOGGING > 0)
249     {
250       /* *INDENT-OFF* */
251       ELOG_TYPE_DECLARE (e) =
252         {
253           .format = "resend-retired: search for local seq %d",
254           .format_args = "i4",
255         };
256       /* *INDENT-ON* */
257       struct
258       {
259         u32 local_sequence;
260       } *ed;
261       ed = ELOG_DATA (mcm->elog_main, e);
262       ed->local_sequence = local_sequence;
263     }
264
265   /* *INDENT-OFF* */
266   clib_fifo_foreach (retry, s->retired_fifo,
267   ({
268     if (retry->local_sequence == local_sequence)
269       {
270         elog_tx_msg (mcm, s->index, retry-> local_sequence, -13);
271         mcm->transport.tx_buffer (mcm->transport.opaque,
272                                   MC_TRANSPORT_USER_REQUEST_TO_RELAY,
273                                   retry->buffer_index);
274         return;
275       }
276   }));
277   /* *INDENT-ON* */
278
279   if (MC_EVENT_LOGGING > 0)
280     {
281       /* *INDENT-OFF* */
282       ELOG_TYPE_DECLARE (e) =
283         {
284           .format = "resend-retired: FAILED search for local seq %d",
285           .format_args = "i4",
286         };
287       /* *INDENT-ON* */
288       struct
289       {
290         u32 local_sequence;
291       } *ed;
292       ed = ELOG_DATA (mcm->elog_main, e);
293       ed->local_sequence = local_sequence;
294     }
295 }
296
297 static uword *
298 delete_retry_fifo_elt (mc_main_t * mcm,
299                        mc_stream_t * stream,
300                        mc_retry_t * r, uword * dead_peer_bitmap)
301 {
302   mc_stream_peer_t *p;
303
304   /* *INDENT-OFF* */
305   pool_foreach (p, stream->peers, ({
306     uword pi = p - stream->peers;
307     uword is_alive = 0 == clib_bitmap_get (r->unacked_by_peer_bitmap, pi);
308
309     if (! is_alive)
310       dead_peer_bitmap = clib_bitmap_ori (dead_peer_bitmap, pi);
311
312     if (MC_EVENT_LOGGING > 0)
313       {
314         ELOG_TYPE_DECLARE (e) = {
315           .format = "delete_retry_fifo_elt: peer %s is %s",
316           .format_args = "T4t4",
317           .n_enum_strings = 2,
318           .enum_strings = { "alive", "dead", },
319         };
320         struct { u32 peer, is_alive; } * ed;
321         ed = ELOG_DATA (mcm->elog_main, e);
322         ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
323         ed->is_alive = is_alive;
324       }
325   }));
326   /* *INDENT-ON* */
327
328   hash_unset (stream->retry_index_by_local_sequence, r->local_sequence);
329   mc_retry_free (mcm, stream, r);
330
331   return dead_peer_bitmap;
332 }
333
334 always_inline mc_retry_t *
335 prev_retry (mc_stream_t * s, mc_retry_t * r)
336 {
337   return (r->prev_index != ~0
338           ? pool_elt_at_index (s->retry_pool, r->prev_index) : 0);
339 }
340
341 always_inline mc_retry_t *
342 next_retry (mc_stream_t * s, mc_retry_t * r)
343 {
344   return (r->next_index != ~0
345           ? pool_elt_at_index (s->retry_pool, r->next_index) : 0);
346 }
347
348 always_inline void
349 remove_retry_from_pool (mc_stream_t * s, mc_retry_t * r)
350 {
351   mc_retry_t *p = prev_retry (s, r);
352   mc_retry_t *n = next_retry (s, r);
353
354   if (p)
355     p->next_index = r->next_index;
356   else
357     s->retry_head_index = r->next_index;
358   if (n)
359     n->prev_index = r->prev_index;
360   else
361     s->retry_tail_index = r->prev_index;
362
363   pool_put_index (s->retry_pool, r - s->retry_pool);
364 }
365
366 static void
367 check_retry (mc_main_t * mcm, mc_stream_t * s)
368 {
369   mc_retry_t *r;
370   vlib_main_t *vm = mcm->vlib_main;
371   f64 now = vlib_time_now (vm);
372   uword *dead_peer_bitmap = 0;
373   u32 ri, ri_next;
374
375   for (ri = s->retry_head_index; ri != ~0; ri = ri_next)
376     {
377       r = pool_elt_at_index (s->retry_pool, ri);
378       ri_next = r->next_index;
379
380       if (now < r->sent_at + s->config.retry_interval)
381         continue;
382
383       r->n_retries += 1;
384       if (r->n_retries > s->config.retry_limit)
385         {
386           dead_peer_bitmap =
387             delete_retry_fifo_elt (mcm, s, r, dead_peer_bitmap);
388           remove_retry_from_pool (s, r);
389         }
390       else
391         {
392           if (MC_EVENT_LOGGING > 0)
393             {
394               mc_stream_peer_t *p;
395
396               /* *INDENT-OFF* */
397               ELOG_TYPE_DECLARE (t) =
398                 {
399                   .format = "resend local seq %d attempt %d",
400                   .format_args = "i4i4",
401                 };
402               /* *INDENT-ON* */
403
404               /* *INDENT-OFF* */
405               pool_foreach (p, s->peers, ({
406                 if (clib_bitmap_get (r->unacked_by_peer_bitmap, p - s->peers))
407                   {
408                     ELOG_TYPE_DECLARE (ev) = {
409                       .format = "resend: needed by peer %s local seq %d",
410                       .format_args = "T4i4",
411                     };
412                     struct { u32 peer, rx_sequence; } * ed;
413                     ed = ELOG_DATA (mcm->elog_main, ev);
414                     ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
415                     ed->rx_sequence = r->local_sequence;
416                   }
417               }));
418               /* *INDENT-ON* */
419
420               struct
421               {
422                 u32 sequence;
423                 u32 trail;
424               } *ed;
425               ed = ELOG_DATA (mcm->elog_main, t);
426               ed->sequence = r->local_sequence;
427               ed->trail = r->n_retries;
428             }
429
430           r->sent_at = vlib_time_now (vm);
431           s->stats.n_retries += 1;
432
433           elog_tx_msg (mcm, s->index, r->local_sequence, r->n_retries);
434
435           mcm->transport.tx_buffer
436             (mcm->transport.opaque,
437              MC_TRANSPORT_USER_REQUEST_TO_RELAY, r->buffer_index);
438         }
439     }
440
441   maybe_send_window_open_event (mcm->vlib_main, s);
442
443   /* Delete any dead peers we've found. */
444   if (!clib_bitmap_is_zero (dead_peer_bitmap))
445     {
446       uword i;
447
448       /* *INDENT-OFF* */
449       clib_bitmap_foreach (i, dead_peer_bitmap, ({
450         delete_peer_with_index (mcm, s, i, /* notify_application */ 1);
451
452         /* Delete any references to just deleted peer in retry pool. */
453         pool_foreach (r, s->retry_pool, ({
454           r->unacked_by_peer_bitmap =
455             clib_bitmap_andnoti (r->unacked_by_peer_bitmap, i);
456         }));
457       }));
458 /* *INDENT-ON* */
459       clib_bitmap_free (dead_peer_bitmap);
460     }
461 }
462
463 always_inline mc_main_t *
464 mc_node_get_main (vlib_node_runtime_t * node)
465 {
466   mc_main_t **p = (void *) node->runtime_data;
467   return p[0];
468 }
469
470 static uword
471 mc_retry_process (vlib_main_t * vm,
472                   vlib_node_runtime_t * node, vlib_frame_t * f)
473 {
474   mc_main_t *mcm = mc_node_get_main (node);
475   mc_stream_t *s;
476
477   while (1)
478     {
479       vlib_process_suspend (vm, 1.0);
480       vec_foreach (s, mcm->stream_vector)
481       {
482         if (s->state != MC_STREAM_STATE_invalid)
483           check_retry (mcm, s);
484       }
485     }
486   return 0;                     /* not likely */
487 }
488
489 static void
490 send_join_or_leave_request (mc_main_t * mcm, u32 stream_index, u32 is_join)
491 {
492   vlib_main_t *vm = mcm->vlib_main;
493   mc_msg_join_or_leave_request_t *mp;
494   u32 bi;
495
496   mp = mc_get_vlib_buffer (vm, sizeof (mp[0]), &bi);
497   memset (mp, 0, sizeof (*mp));
498   mp->type = MC_MSG_TYPE_join_or_leave_request;
499   mp->peer_id = mcm->transport.our_ack_peer_id;
500   mp->stream_index = stream_index;
501   mp->is_join = is_join;
502
503   mc_byte_swap_msg_join_or_leave_request (mp);
504
505   /*
506    * These msgs are unnumbered, unordered so send on the from-relay
507    * channel.
508    */
509   mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi);
510 }
511
512 static uword
513 mc_join_ager_process (vlib_main_t * vm,
514                       vlib_node_runtime_t * node, vlib_frame_t * f)
515 {
516   mc_main_t *mcm = mc_node_get_main (node);
517
518   while (1)
519     {
520       if (mcm->joins_in_progress)
521         {
522           mc_stream_t *s;
523           vlib_one_time_waiting_process_t *p;
524           f64 now = vlib_time_now (vm);
525
526           vec_foreach (s, mcm->stream_vector)
527           {
528             if (s->state != MC_STREAM_STATE_join_in_progress)
529               continue;
530
531             if (now > s->join_timeout)
532               {
533                 s->state = MC_STREAM_STATE_ready;
534
535                 if (MC_EVENT_LOGGING > 0)
536                   {
537                     /* *INDENT-OFF* */
538                     ELOG_TYPE_DECLARE (e) =
539                       {
540                         .format = "stream %d join timeout",
541                       };
542                     /* *INDENT-ON* */
543                     ELOG (mcm->elog_main, e, s->index);
544                   }
545                 /* Make sure that this app instance exists as a stream peer,
546                    or we may answer a catchup request with a NULL
547                    all_peer_bitmap... */
548                 (void) get_or_create_peer_with_id
549                   (mcm, s, mcm->transport.our_ack_peer_id, /* created */ 0);
550
551                 vec_foreach (p, s->procs_waiting_for_join_done)
552                   vlib_signal_one_time_waiting_process (vm, p);
553                 if (s->procs_waiting_for_join_done)
554                   _vec_len (s->procs_waiting_for_join_done) = 0;
555
556                 mcm->joins_in_progress--;
557                 ASSERT (mcm->joins_in_progress >= 0);
558               }
559             else
560               {
561                 /* Resent join request which may have been lost. */
562                 send_join_or_leave_request (mcm, s->index, 1 /* is_join */ );
563
564                 /* We're *not* alone, retry for as long as it takes */
565                 if (mcm->relay_state == MC_RELAY_STATE_SLAVE)
566                   s->join_timeout = vlib_time_now (vm) + 2.0;
567
568
569                 if (MC_EVENT_LOGGING > 0)
570                   {
571                     /* *INDENT-OFF* */
572                     ELOG_TYPE_DECLARE (e) =
573                       {
574                         .format = "stream %d resend join request",
575                       };
576                     /* *INDENT-ON* */
577                     ELOG (mcm->elog_main, e, s->index);
578                   }
579               }
580           }
581         }
582
583       vlib_process_suspend (vm, .5);
584     }
585
586   return 0;                     /* not likely */
587 }
588
589 static void
590 serialize_mc_register_stream_name (serialize_main_t * m, va_list * va)
591 {
592   char *name = va_arg (*va, char *);
593   serialize_cstring (m, name);
594 }
595
596 static void
597 elog_stream_name (char *buf, int n_buf_bytes, char *v)
598 {
599   clib_memcpy (buf, v, clib_min (n_buf_bytes - 1, vec_len (v)));
600   buf[n_buf_bytes - 1] = 0;
601 }
602
603 static void
604 unserialize_mc_register_stream_name (serialize_main_t * m, va_list * va)
605 {
606   mc_main_t *mcm = va_arg (*va, mc_main_t *);
607   char *name;
608   mc_stream_t *s;
609   uword *p;
610
611   unserialize_cstring (m, &name);
612
613   if ((p = hash_get_mem (mcm->stream_index_by_name, name)))
614     {
615       if (MC_EVENT_LOGGING > 0)
616         {
617           /* *INDENT-OFF* */
618           ELOG_TYPE_DECLARE (e) =
619             {
620               .format = "stream index %d already named %s",
621               .format_args = "i4s16",
622             };
623           /* *INDENT-ON* */
624           struct
625           {
626             u32 stream_index;
627             char name[16];
628           } *ed;
629           ed = ELOG_DATA (mcm->elog_main, e);
630           ed->stream_index = p[0];
631           elog_stream_name (ed->name, sizeof (ed->name), name);
632         }
633
634       vec_free (name);
635       return;
636     }
637
638   vec_add2 (mcm->stream_vector, s, 1);
639   mc_stream_init (s);
640   s->state = MC_STREAM_STATE_name_known;
641   s->index = s - mcm->stream_vector;
642   s->config.name = name;
643
644   if (MC_EVENT_LOGGING > 0)
645     {
646       /* *INDENT-OFF* */
647       ELOG_TYPE_DECLARE (e) =
648         {
649           .format = "stream index %d named %s",
650           .format_args = "i4s16",
651         };
652       /* *INDENT-ON* */
653       struct
654       {
655         u32 stream_index;
656         char name[16];
657       } *ed;
658       ed = ELOG_DATA (mcm->elog_main, e);
659       ed->stream_index = s->index;
660       elog_stream_name (ed->name, sizeof (ed->name), name);
661     }
662
663   hash_set_mem (mcm->stream_index_by_name, name, s->index);
664
665   p = hash_get (mcm->procs_waiting_for_stream_name_by_name, name);
666   if (p)
667     {
668       vlib_one_time_waiting_process_t *wp, **w;
669       w = pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool, p[0]);
670       vec_foreach (wp, w[0])
671         vlib_signal_one_time_waiting_process (mcm->vlib_main, wp);
672       pool_put (mcm->procs_waiting_for_stream_name_pool, w);
673       hash_unset_mem (mcm->procs_waiting_for_stream_name_by_name, name);
674     }
675 }
676
677 /* *INDENT-OFF* */
678 MC_SERIALIZE_MSG (mc_register_stream_name_msg, static) =
679 {
680   .name = "mc_register_stream_name",
681   .serialize = serialize_mc_register_stream_name,
682   .unserialize = unserialize_mc_register_stream_name,
683 };
684 /* *INDENT-ON* */
685
686 void
687 mc_rx_buffer_unserialize (mc_main_t * mcm,
688                           mc_stream_t * stream,
689                           mc_peer_id_t peer_id, u32 buffer_index)
690 {
691   return mc_unserialize (mcm, stream, buffer_index);
692 }
693
694 static u8 *
695 mc_internal_catchup_snapshot (mc_main_t * mcm,
696                               u8 * data_vector,
697                               u32 last_global_sequence_processed)
698 {
699   serialize_main_t m;
700
701   /* Append serialized data to data vector. */
702   serialize_open_vector (&m, data_vector);
703   m.stream.current_buffer_index = vec_len (data_vector);
704
705   serialize (&m, serialize_mc_main, mcm);
706   return serialize_close_vector (&m);
707 }
708
709 static void
710 mc_internal_catchup (mc_main_t * mcm, u8 * data, u32 n_data_bytes)
711 {
712   serialize_main_t s;
713
714   unserialize_open_data (&s, data, n_data_bytes);
715
716   unserialize (&s, unserialize_mc_main, mcm);
717 }
718
719 /* Overridden from the application layer, not actually used here */
720 void mc_stream_join_process_hold (void) __attribute__ ((weak));
721 void
722 mc_stream_join_process_hold (void)
723 {
724 }
725
726 static u32
727 mc_stream_join_helper (mc_main_t * mcm,
728                        mc_stream_config_t * config, u32 is_internal)
729 {
730   mc_stream_t *s;
731   vlib_main_t *vm = mcm->vlib_main;
732
733   s = 0;
734   if (!is_internal)
735     {
736       uword *p;
737
738       /* Already have a stream with given name? */
739       if ((s = mc_stream_by_name (mcm, config->name)))
740         {
741           /* Already joined and ready? */
742           if (s->state == MC_STREAM_STATE_ready)
743             return s->index;
744         }
745
746       /* First join MC internal stream. */
747       if (!mcm->stream_vector
748           || (mcm->stream_vector[MC_STREAM_INDEX_INTERNAL].state
749               == MC_STREAM_STATE_invalid))
750         {
751           static mc_stream_config_t c = {
752             .name = "mc-internal",
753             .rx_buffer = mc_rx_buffer_unserialize,
754             .catchup = mc_internal_catchup,
755             .catchup_snapshot = mc_internal_catchup_snapshot,
756           };
757
758           c.save_snapshot = config->save_snapshot;
759
760           mc_stream_join_helper (mcm, &c, /* is_internal */ 1);
761         }
762
763       /* If stream is still unknown register this name and wait for
764          sequenced message to name stream.  This way all peers agree
765          on stream name to index mappings. */
766       s = mc_stream_by_name (mcm, config->name);
767       if (!s)
768         {
769           vlib_one_time_waiting_process_t *wp, **w;
770           u8 *name_copy = format (0, "%s", config->name);
771
772           mc_serialize_stream (mcm,
773                                MC_STREAM_INDEX_INTERNAL,
774                                &mc_register_stream_name_msg, config->name);
775
776           /* Wait for this stream to be named. */
777           p =
778             hash_get_mem (mcm->procs_waiting_for_stream_name_by_name,
779                           name_copy);
780           if (p)
781             w =
782               pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool,
783                                  p[0]);
784           else
785             {
786               pool_get (mcm->procs_waiting_for_stream_name_pool, w);
787               if (!mcm->procs_waiting_for_stream_name_by_name)
788                 mcm->procs_waiting_for_stream_name_by_name = hash_create_string ( /* elts */ 0, /* value size */
789                                                                                  sizeof
790                                                                                  (uword));
791               hash_set_mem (mcm->procs_waiting_for_stream_name_by_name,
792                             name_copy,
793                             w - mcm->procs_waiting_for_stream_name_pool);
794               w[0] = 0;
795             }
796
797           vec_add2 (w[0], wp, 1);
798           vlib_current_process_wait_for_one_time_event (vm, wp);
799           vec_free (name_copy);
800         }
801
802       /* Name should be known now. */
803       s = mc_stream_by_name (mcm, config->name);
804       ASSERT (s != 0);
805       ASSERT (s->state == MC_STREAM_STATE_name_known);
806     }
807
808   if (!s)
809     {
810       vec_add2 (mcm->stream_vector, s, 1);
811       mc_stream_init (s);
812       s->index = s - mcm->stream_vector;
813     }
814
815   {
816     /* Save name since we could have already used it as hash key. */
817     char *name_save = s->config.name;
818
819     s->config = config[0];
820
821     if (name_save)
822       s->config.name = name_save;
823   }
824
825   if (s->config.window_size == 0)
826     s->config.window_size = 8;
827
828   if (s->config.retry_interval == 0.0)
829     s->config.retry_interval = 1.0;
830
831   /* Sanity. */
832   ASSERT (s->config.retry_interval < 30);
833
834   if (s->config.retry_limit == 0)
835     s->config.retry_limit = 7;
836
837   s->state = MC_STREAM_STATE_join_in_progress;
838   if (!s->peer_index_by_id.hash)
839     mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
840
841   /* If we don't hear from someone in 5 seconds, we're alone */
842   s->join_timeout = vlib_time_now (vm) + 5.0;
843   mcm->joins_in_progress++;
844
845   if (MC_EVENT_LOGGING > 0)
846     {
847       /* *INDENT-OFF* */
848       ELOG_TYPE_DECLARE (e) =
849       {
850         .format = "stream index %d join request %s",
851         .format_args = "i4s16",
852       };
853       /* *INDENT-ON* */
854       struct
855       {
856         u32 stream_index;
857         char name[16];
858       } *ed;
859       ed = ELOG_DATA (mcm->elog_main, e);
860       ed->stream_index = s->index;
861       elog_stream_name (ed->name, sizeof (ed->name), s->config.name);
862     }
863
864   send_join_or_leave_request (mcm, s->index, 1 /* join */ );
865
866   vlib_current_process_wait_for_one_time_event_vector
867     (vm, &s->procs_waiting_for_join_done);
868
869   if (MC_EVENT_LOGGING)
870     {
871       ELOG_TYPE (e, "join complete stream %d");
872       ELOG (mcm->elog_main, e, s->index);
873     }
874
875   return s->index;
876 }
877
878 u32
879 mc_stream_join (mc_main_t * mcm, mc_stream_config_t * config)
880 {
881   return mc_stream_join_helper (mcm, config, /* is_internal */ 0);
882 }
883
884 void
885 mc_stream_leave (mc_main_t * mcm, u32 stream_index)
886 {
887   mc_stream_t *s = mc_stream_by_index (mcm, stream_index);
888
889   if (!s)
890     return;
891
892   if (MC_EVENT_LOGGING)
893     {
894       /* *INDENT-OFF* */
895       ELOG_TYPE_DECLARE (t) =
896         {
897           .format = "leave-stream: %d",.format_args = "i4",
898         };
899       /* *INDENT-ON* */
900       struct
901       {
902         u32 index;
903       } *ed;
904       ed = ELOG_DATA (mcm->elog_main, t);
905       ed->index = stream_index;
906     }
907
908   send_join_or_leave_request (mcm, stream_index, 0 /* is_join */ );
909   mc_stream_free (s);
910   s->state = MC_STREAM_STATE_name_known;
911 }
912
913 void
914 mc_msg_join_or_leave_request_handler (mc_main_t * mcm,
915                                       mc_msg_join_or_leave_request_t * req,
916                                       u32 buffer_index)
917 {
918   mc_stream_t *s;
919   mc_msg_join_reply_t *rep;
920   u32 bi;
921
922   mc_byte_swap_msg_join_or_leave_request (req);
923
924   s = mc_stream_by_index (mcm, req->stream_index);
925   if (!s || s->state != MC_STREAM_STATE_ready)
926     return;
927
928   /* If the peer is joining, create it */
929   if (req->is_join)
930     {
931       mc_stream_t *this_s;
932
933       /* We're not in a position to catch up a peer until all
934          stream joins are complete. */
935       if (0)
936         {
937           /* XXX This is hard to test so we've. */
938           vec_foreach (this_s, mcm->stream_vector)
939           {
940             if (this_s->state != MC_STREAM_STATE_ready
941                 && this_s->state != MC_STREAM_STATE_name_known)
942               return;
943           }
944         }
945       else if (mcm->joins_in_progress > 0)
946         return;
947
948       (void) get_or_create_peer_with_id (mcm, s, req->peer_id,
949                                          /* created */ 0);
950
951       rep = mc_get_vlib_buffer (mcm->vlib_main, sizeof (rep[0]), &bi);
952       memset (rep, 0, sizeof (rep[0]));
953       rep->type = MC_MSG_TYPE_join_reply;
954       rep->stream_index = req->stream_index;
955
956       mc_byte_swap_msg_join_reply (rep);
957       /* These two are already in network byte order... */
958       rep->peer_id = mcm->transport.our_ack_peer_id;
959       rep->catchup_peer_id = mcm->transport.our_catchup_peer_id;
960
961       mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi);
962     }
963   else
964     {
965       if (s->config.peer_died)
966         s->config.peer_died (mcm, s, req->peer_id);
967     }
968 }
969
970 void
971 mc_msg_join_reply_handler (mc_main_t * mcm,
972                            mc_msg_join_reply_t * mp, u32 buffer_index)
973 {
974   mc_stream_t *s;
975
976   mc_byte_swap_msg_join_reply (mp);
977
978   s = mc_stream_by_index (mcm, mp->stream_index);
979
980   if (!s || s->state != MC_STREAM_STATE_join_in_progress)
981     return;
982
983   /* Switch to catchup state; next join reply
984      for this stream will be ignored. */
985   s->state = MC_STREAM_STATE_catchup;
986
987   mcm->joins_in_progress--;
988   mcm->transport.catchup_request_fun (mcm->transport.opaque,
989                                       mp->stream_index, mp->catchup_peer_id);
990 }
991
992 void
993 mc_wait_for_stream_ready (mc_main_t * m, char *stream_name)
994 {
995   mc_stream_t *s;
996
997   while (1)
998     {
999       s = mc_stream_by_name (m, stream_name);
1000       if (s)
1001         break;
1002       vlib_process_suspend (m->vlib_main, .1);
1003     }
1004
1005   /* It's OK to send a message in catchup and ready states. */
1006   if (s->state == MC_STREAM_STATE_catchup
1007       || s->state == MC_STREAM_STATE_ready)
1008     return;
1009
1010   /* Otherwise we are waiting for a join to finish. */
1011   vlib_current_process_wait_for_one_time_event_vector
1012     (m->vlib_main, &s->procs_waiting_for_join_done);
1013 }
1014
1015 u32
1016 mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index)
1017 {
1018   mc_stream_t *s = mc_stream_by_index (mcm, stream_index);
1019   vlib_main_t *vm = mcm->vlib_main;
1020   mc_retry_t *r;
1021   mc_msg_user_request_t *mp;
1022   vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index);
1023   u32 ri;
1024
1025   if (!s)
1026     return 0;
1027
1028   if (s->state != MC_STREAM_STATE_ready)
1029     vlib_current_process_wait_for_one_time_event_vector
1030       (vm, &s->procs_waiting_for_join_done);
1031
1032   while (pool_elts (s->retry_pool) >= s->config.window_size)
1033     {
1034       vlib_current_process_wait_for_one_time_event_vector
1035         (vm, &s->procs_waiting_for_open_window);
1036     }
1037
1038   pool_get (s->retry_pool, r);
1039   ri = r - s->retry_pool;
1040
1041   r->prev_index = s->retry_tail_index;
1042   r->next_index = ~0;
1043   s->retry_tail_index = ri;
1044
1045   if (r->prev_index == ~0)
1046     s->retry_head_index = ri;
1047   else
1048     {
1049       mc_retry_t *p = pool_elt_at_index (s->retry_pool, r->prev_index);
1050       p->next_index = ri;
1051     }
1052
1053   vlib_buffer_advance (b, -sizeof (mp[0]));
1054   mp = vlib_buffer_get_current (b);
1055
1056   mp->peer_id = mcm->transport.our_ack_peer_id;
1057   /* mp->transport.global_sequence set by relay agent. */
1058   mp->global_sequence = 0xdeadbeef;
1059   mp->stream_index = s->index;
1060   mp->local_sequence = s->our_local_sequence++;
1061   mp->n_data_bytes =
1062     vlib_buffer_index_length_in_chain (vm, buffer_index) - sizeof (mp[0]);
1063
1064   r->buffer_index = buffer_index;
1065   r->local_sequence = mp->local_sequence;
1066   r->sent_at = vlib_time_now (vm);
1067   r->n_retries = 0;
1068
1069   /* Retry will be freed when all currently known peers have acked. */
1070   vec_validate (r->unacked_by_peer_bitmap, vec_len (s->all_peer_bitmap) - 1);
1071   vec_copy (r->unacked_by_peer_bitmap, s->all_peer_bitmap);
1072
1073   hash_set (s->retry_index_by_local_sequence, r->local_sequence,
1074             r - s->retry_pool);
1075
1076   elog_tx_msg (mcm, s->index, mp->local_sequence, r->n_retries);
1077
1078   mc_byte_swap_msg_user_request (mp);
1079
1080   mcm->transport.tx_buffer (mcm->transport.opaque,
1081                             MC_TRANSPORT_USER_REQUEST_TO_RELAY, buffer_index);
1082
1083   s->user_requests_sent++;
1084
1085   /* return amount of window remaining */
1086   return s->config.window_size - pool_elts (s->retry_pool);
1087 }
1088
1089 void
1090 mc_msg_user_request_handler (mc_main_t * mcm, mc_msg_user_request_t * mp,
1091                              u32 buffer_index)
1092 {
1093   vlib_main_t *vm = mcm->vlib_main;
1094   mc_stream_t *s;
1095   mc_stream_peer_t *peer;
1096   i32 seq_cmp_result;
1097   static int once = 0;
1098
1099   mc_byte_swap_msg_user_request (mp);
1100
1101   s = mc_stream_by_index (mcm, mp->stream_index);
1102
1103   /* Not signed up for this stream? Turf-o-matic */
1104   if (!s || s->state != MC_STREAM_STATE_ready)
1105     {
1106       vlib_buffer_free_one (vm, buffer_index);
1107       return;
1108     }
1109
1110   /* Find peer, including ourselves. */
1111   peer = get_or_create_peer_with_id (mcm, s, mp->peer_id,
1112                                      /* created */ 0);
1113
1114   seq_cmp_result = mc_seq_cmp (mp->local_sequence,
1115                                peer->last_sequence_received + 1);
1116
1117   if (MC_EVENT_LOGGING > 0)
1118     {
1119       /* *INDENT-OFF* */
1120       ELOG_TYPE_DECLARE (e) =
1121         {
1122           .format = "rx-msg: peer %s stream %d rx seq %d seq_cmp %d",
1123           .format_args = "T4i4i4i4",
1124         };
1125       /* *INDENT-ON* */
1126       struct
1127       {
1128         u32 peer, stream_index, rx_sequence;
1129         i32 seq_cmp_result;
1130       } *ed;
1131       ed = ELOG_DATA (mcm->elog_main, e);
1132       ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
1133       ed->stream_index = mp->stream_index;
1134       ed->rx_sequence = mp->local_sequence;
1135       ed->seq_cmp_result = seq_cmp_result;
1136     }
1137
1138   if (0 && mp->stream_index == 1 && once == 0)
1139     {
1140       once = 1;
1141       ELOG_TYPE (e, "FAKE lost msg on stream 1");
1142       ELOG (mcm->elog_main, e, 0);
1143       return;
1144     }
1145
1146   peer->last_sequence_received += seq_cmp_result == 0;
1147   s->user_requests_received++;
1148
1149   if (seq_cmp_result > 0)
1150     peer->stats.n_msgs_from_future += 1;
1151
1152   /* Send ack even if msg from future */
1153   if (1)
1154     {
1155       mc_msg_user_ack_t *rp;
1156       u32 bi;
1157
1158       rp = mc_get_vlib_buffer (vm, sizeof (rp[0]), &bi);
1159       rp->peer_id = mcm->transport.our_ack_peer_id;
1160       rp->stream_index = s->index;
1161       rp->local_sequence = mp->local_sequence;
1162       rp->seq_cmp_result = seq_cmp_result;
1163
1164       if (MC_EVENT_LOGGING > 0)
1165         {
1166           /* *INDENT-OFF* */
1167           ELOG_TYPE_DECLARE (e) =
1168             {
1169               .format = "tx-ack: stream %d local seq %d",
1170               .format_args = "i4i4",
1171             };
1172           /* *INDENT-ON* */
1173           struct
1174           {
1175             u32 stream_index;
1176             u32 local_sequence;
1177           } *ed;
1178           ed = ELOG_DATA (mcm->elog_main, e);
1179           ed->stream_index = rp->stream_index;
1180           ed->local_sequence = rp->local_sequence;
1181         }
1182
1183       mc_byte_swap_msg_user_ack (rp);
1184
1185       mcm->transport.tx_ack (mcm->transport.opaque, mp->peer_id, bi);
1186       /* Msg from past? If so, free the buffer... */
1187       if (seq_cmp_result < 0)
1188         {
1189           vlib_buffer_free_one (vm, buffer_index);
1190           peer->stats.n_msgs_from_past += 1;
1191         }
1192     }
1193
1194   if (seq_cmp_result == 0)
1195     {
1196       vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index);
1197       switch (s->state)
1198         {
1199         case MC_STREAM_STATE_ready:
1200           vlib_buffer_advance (b, sizeof (mp[0]));
1201           s->config.rx_buffer (mcm, s, mp->peer_id, buffer_index);
1202
1203           /* Stream vector can change address via rx callback for mc-internal
1204              stream. */
1205           s = mc_stream_by_index (mcm, mp->stream_index);
1206           ASSERT (s != 0);
1207           s->last_global_sequence_processed = mp->global_sequence;
1208           break;
1209
1210         case MC_STREAM_STATE_catchup:
1211           clib_fifo_add1 (s->catchup_fifo, buffer_index);
1212           break;
1213
1214         default:
1215           clib_warning ("stream in unknown state %U",
1216                         format_mc_stream_state, s->state);
1217           break;
1218         }
1219     }
1220 }
1221
1222 void
1223 mc_msg_user_ack_handler (mc_main_t * mcm, mc_msg_user_ack_t * mp,
1224                          u32 buffer_index)
1225 {
1226   vlib_main_t *vm = mcm->vlib_main;
1227   uword *p;
1228   mc_stream_t *s;
1229   mc_stream_peer_t *peer;
1230   mc_retry_t *r;
1231   int peer_created = 0;
1232
1233   mc_byte_swap_msg_user_ack (mp);
1234
1235   s = mc_stream_by_index (mcm, mp->stream_index);
1236
1237   if (MC_EVENT_LOGGING > 0)
1238     {
1239       /* *INDENT-OFF* */
1240       ELOG_TYPE_DECLARE (t) =
1241         {
1242           .format = "rx-ack: local seq %d peer %s seq_cmp_result %d",
1243           .format_args = "i4T4i4",
1244         };
1245       /* *INDENT-ON* */
1246
1247       struct
1248       {
1249         u32 local_sequence;
1250         u32 peer;
1251         i32 seq_cmp_result;
1252       } *ed;
1253       ed = ELOG_DATA (mcm->elog_main, t);
1254       ed->local_sequence = mp->local_sequence;
1255       ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
1256       ed->seq_cmp_result = mp->seq_cmp_result;
1257     }
1258
1259   /* Unknown stream? */
1260   if (!s)
1261     return;
1262
1263   /* Find the peer which just ack'ed. */
1264   peer = get_or_create_peer_with_id (mcm, s, mp->peer_id,
1265                                      /* created */ &peer_created);
1266
1267   /*
1268    * Peer reports message from the future. If it's not in the retry
1269    * fifo, look for a retired message.
1270    */
1271   if (mp->seq_cmp_result > 0)
1272     {
1273       p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence -
1274                     mp->seq_cmp_result);
1275       if (p == 0)
1276         mc_resend_retired (mcm, s, mp->local_sequence - mp->seq_cmp_result);
1277
1278       /* Normal retry should fix it... */
1279       return;
1280     }
1281
1282   /*
1283    * Pointer to the indicated retry fifo entry.
1284    * Worth hashing because we could use a window size of 100 or 1000.
1285    */
1286   p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence);
1287
1288   /*
1289    * Is this a duplicate ACK, received after we've retired the
1290    * fifo entry. This can happen when learning about new
1291    * peers.
1292    */
1293   if (p == 0)
1294     {
1295       if (MC_EVENT_LOGGING > 0)
1296         {
1297           /* *INDENT-OFF* */
1298           ELOG_TYPE_DECLARE (t) =
1299             {
1300               .format = "ack: for seq %d from peer %s no fifo elt",
1301               .format_args = "i4T4",
1302             };
1303           /* *INDENT-ON* */
1304
1305           struct
1306           {
1307             u32 seq;
1308             u32 peer;
1309           } *ed;
1310           ed = ELOG_DATA (mcm->elog_main, t);
1311           ed->seq = mp->local_sequence;
1312           ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
1313         }
1314
1315       return;
1316     }
1317
1318   r = pool_elt_at_index (s->retry_pool, p[0]);
1319
1320   /* Make sure that this new peer ACKs our msgs from now on */
1321   if (peer_created)
1322     {
1323       mc_retry_t *later_retry = next_retry (s, r);
1324
1325       while (later_retry)
1326         {
1327           later_retry->unacked_by_peer_bitmap =
1328             clib_bitmap_ori (later_retry->unacked_by_peer_bitmap,
1329                              peer - s->peers);
1330           later_retry = next_retry (s, later_retry);
1331         }
1332     }
1333
1334   ASSERT (mp->local_sequence == r->local_sequence);
1335
1336   /* If we weren't expecting to hear from this peer */
1337   if (!peer_created &&
1338       !clib_bitmap_get (r->unacked_by_peer_bitmap, peer - s->peers))
1339     {
1340       if (MC_EVENT_LOGGING > 0)
1341         {
1342           /* *INDENT-OFF* */
1343           ELOG_TYPE_DECLARE (t) =
1344             {
1345               .format = "dup-ack: for seq %d from peer %s",
1346               .format_args = "i4T4",
1347             };
1348           /* *INDENT-ON* */
1349           struct
1350           {
1351             u32 seq;
1352             u32 peer;
1353           } *ed;
1354           ed = ELOG_DATA (mcm->elog_main, t);
1355           ed->seq = r->local_sequence;
1356           ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
1357         }
1358       if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap))
1359         return;
1360     }
1361
1362   if (MC_EVENT_LOGGING > 0)
1363     {
1364       /* *INDENT-OFF* */
1365       ELOG_TYPE_DECLARE (t) =
1366         {
1367           .format = "ack: for seq %d from peer %s",
1368           .format_args = "i4T4",
1369         };
1370       /* *INDENT-ON* */
1371       struct
1372       {
1373         u32 seq;
1374         u32 peer;
1375       } *ed;
1376       ed = ELOG_DATA (mcm->elog_main, t);
1377       ed->seq = mp->local_sequence;
1378       ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
1379     }
1380
1381   r->unacked_by_peer_bitmap =
1382     clib_bitmap_andnoti (r->unacked_by_peer_bitmap, peer - s->peers);
1383
1384   /* Not all clients have ack'ed */
1385   if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap))
1386     {
1387       return;
1388     }
1389   if (MC_EVENT_LOGGING > 0)
1390     {
1391       /* *INDENT-OFF* */
1392       ELOG_TYPE_DECLARE (t) =
1393         {
1394           .format = "ack: retire fifo elt loc seq %d after %d acks",
1395           .format_args = "i4i4",
1396         };
1397       /* *INDENT-ON* */
1398       struct
1399       {
1400         u32 seq;
1401         u32 npeers;
1402       } *ed;
1403       ed = ELOG_DATA (mcm->elog_main, t);
1404       ed->seq = r->local_sequence;
1405       ed->npeers = pool_elts (s->peers);
1406     }
1407
1408   hash_unset (s->retry_index_by_local_sequence, mp->local_sequence);
1409   mc_retry_free (mcm, s, r);
1410   remove_retry_from_pool (s, r);
1411   maybe_send_window_open_event (vm, s);
1412 }
1413
1414 #define EVENT_MC_SEND_CATCHUP_DATA 0
1415
1416 static uword
1417 mc_catchup_process (vlib_main_t * vm,
1418                     vlib_node_runtime_t * node, vlib_frame_t * f)
1419 {
1420   mc_main_t *mcm = mc_node_get_main (node);
1421   uword *event_data = 0;
1422   mc_catchup_process_arg_t *args;
1423   int i;
1424
1425   while (1)
1426     {
1427       if (event_data)
1428         _vec_len (event_data) = 0;
1429       vlib_process_wait_for_event_with_type (vm, &event_data,
1430                                              EVENT_MC_SEND_CATCHUP_DATA);
1431
1432       for (i = 0; i < vec_len (event_data); i++)
1433         {
1434           args = pool_elt_at_index (mcm->catchup_process_args, event_data[i]);
1435
1436           mcm->transport.catchup_send_fun (mcm->transport.opaque,
1437                                            args->catchup_opaque,
1438                                            args->catchup_snapshot);
1439
1440           /* Send function will free snapshot data vector. */
1441           pool_put (mcm->catchup_process_args, args);
1442         }
1443     }
1444
1445   return 0;                     /* not likely */
1446 }
1447
1448 static void
1449 serialize_mc_stream (serialize_main_t * m, va_list * va)
1450 {
1451   mc_stream_t *s = va_arg (*va, mc_stream_t *);
1452   mc_stream_peer_t *p;
1453
1454   serialize_integer (m, pool_elts (s->peers), sizeof (u32));
1455   /* *INDENT-OFF* */
1456   pool_foreach (p, s->peers, ({
1457     u8 * x = serialize_get (m, sizeof (p->id));
1458     clib_memcpy (x, p->id.as_u8, sizeof (p->id));
1459     serialize_integer (m, p->last_sequence_received,
1460                        sizeof (p->last_sequence_received));
1461   }));
1462 /* *INDENT-ON* */
1463   serialize_bitmap (m, s->all_peer_bitmap);
1464 }
1465
1466 void
1467 unserialize_mc_stream (serialize_main_t * m, va_list * va)
1468 {
1469   mc_stream_t *s = va_arg (*va, mc_stream_t *);
1470   u32 i, n_peers;
1471   mc_stream_peer_t *p;
1472
1473   unserialize_integer (m, &n_peers, sizeof (u32));
1474   mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
1475   for (i = 0; i < n_peers; i++)
1476     {
1477       u8 *x;
1478       pool_get (s->peers, p);
1479       x = unserialize_get (m, sizeof (p->id));
1480       clib_memcpy (p->id.as_u8, x, sizeof (p->id));
1481       unserialize_integer (m, &p->last_sequence_received,
1482                            sizeof (p->last_sequence_received));
1483       mhash_set (&s->peer_index_by_id, &p->id, p - s->peers,    /* old_value */
1484                  0);
1485     }
1486   s->all_peer_bitmap = unserialize_bitmap (m);
1487
1488   /* This is really bad. */
1489   if (!s->all_peer_bitmap)
1490     clib_warning ("BUG: stream %s all_peer_bitmap NULL", s->config.name);
1491 }
1492
1493 void
1494 mc_msg_catchup_request_handler (mc_main_t * mcm,
1495                                 mc_msg_catchup_request_t * req,
1496                                 u32 catchup_opaque)
1497 {
1498   vlib_main_t *vm = mcm->vlib_main;
1499   mc_stream_t *s;
1500   mc_catchup_process_arg_t *args;
1501
1502   mc_byte_swap_msg_catchup_request (req);
1503
1504   s = mc_stream_by_index (mcm, req->stream_index);
1505   if (!s || s->state != MC_STREAM_STATE_ready)
1506     return;
1507
1508   if (MC_EVENT_LOGGING > 0)
1509     {
1510       /* *INDENT-OFF* */
1511       ELOG_TYPE_DECLARE (t) =
1512         {
1513           .format = "catchup-request: from %s stream %d",
1514           .format_args = "T4i4",
1515         };
1516       /* *INDENT-ON* */
1517       struct
1518       {
1519         u32 peer, stream;
1520       } *ed;
1521       ed = ELOG_DATA (mcm->elog_main, t);
1522       ed->peer = elog_id_for_peer_id (mcm, req->peer_id.as_u64);
1523       ed->stream = req->stream_index;
1524     }
1525
1526   /*
1527    * The application has to snapshoot its data structures right
1528    * here, right now. If we process any messages after
1529    * noting the last global sequence we've processed, the client
1530    * won't be able to accurately reconstruct our data structures.
1531    *
1532    * Once the data structures are e.g. vec_dup()'ed, we
1533    * send the resulting messages from a separate process, to
1534    * make sure that we don't cause a bunch of message retransmissions
1535    */
1536   pool_get (mcm->catchup_process_args, args);
1537
1538   args->stream_index = s - mcm->stream_vector;
1539   args->catchup_opaque = catchup_opaque;
1540   args->catchup_snapshot = 0;
1541
1542   /* Construct catchup reply and snapshot state for stream to send as
1543      catchup reply payload. */
1544   {
1545     mc_msg_catchup_reply_t *rep;
1546     serialize_main_t m;
1547
1548     vec_resize (args->catchup_snapshot, sizeof (rep[0]));
1549
1550     rep = (void *) args->catchup_snapshot;
1551
1552     rep->peer_id = req->peer_id;
1553     rep->stream_index = req->stream_index;
1554     rep->last_global_sequence_included = s->last_global_sequence_processed;
1555
1556     /* Setup for serialize to append to catchup snapshot. */
1557     serialize_open_vector (&m, args->catchup_snapshot);
1558     m.stream.current_buffer_index = vec_len (m.stream.buffer);
1559
1560     serialize (&m, serialize_mc_stream, s);
1561
1562     args->catchup_snapshot = serialize_close_vector (&m);
1563
1564     /* Actually copy internal state */
1565     args->catchup_snapshot = s->config.catchup_snapshot
1566       (mcm, args->catchup_snapshot, rep->last_global_sequence_included);
1567
1568     rep = (void *) args->catchup_snapshot;
1569     rep->n_data_bytes = vec_len (args->catchup_snapshot) - sizeof (rep[0]);
1570
1571     mc_byte_swap_msg_catchup_reply (rep);
1572   }
1573
1574   /* now go send it... */
1575   vlib_process_signal_event (vm, mcm->catchup_process,
1576                              EVENT_MC_SEND_CATCHUP_DATA,
1577                              args - mcm->catchup_process_args);
1578 }
1579
1580 #define EVENT_MC_UNSERIALIZE_BUFFER 0
1581 #define EVENT_MC_UNSERIALIZE_CATCHUP 1
1582
1583 void
1584 mc_msg_catchup_reply_handler (mc_main_t * mcm, mc_msg_catchup_reply_t * mp,
1585                               u32 catchup_opaque)
1586 {
1587   vlib_process_signal_event (mcm->vlib_main,
1588                              mcm->unserialize_process,
1589                              EVENT_MC_UNSERIALIZE_CATCHUP,
1590                              pointer_to_uword (mp));
1591 }
1592
1593 static void
1594 perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp)
1595 {
1596   mc_stream_t *s;
1597   i32 seq_cmp_result;
1598
1599   mc_byte_swap_msg_catchup_reply (mp);
1600
1601   s = mc_stream_by_index (mcm, mp->stream_index);
1602
1603   /* Never heard of this stream or already caught up. */
1604   if (!s || s->state == MC_STREAM_STATE_ready)
1605     return;
1606
1607   {
1608     serialize_main_t m;
1609     mc_stream_peer_t *p;
1610     u32 n_stream_bytes;
1611
1612     /* For offline sim replay: save the entire catchup snapshot... */
1613     if (s->config.save_snapshot)
1614       s->config.save_snapshot (mcm, /* is_catchup */ 1, mp->data,
1615                                mp->n_data_bytes);
1616
1617     unserialize_open_data (&m, mp->data, mp->n_data_bytes);
1618     unserialize (&m, unserialize_mc_stream, s);
1619
1620     /* Make sure we start numbering our messages as expected */
1621     /* *INDENT-OFF* */
1622     pool_foreach (p, s->peers, ({
1623       if (p->id.as_u64 == mcm->transport.our_ack_peer_id.as_u64)
1624         s->our_local_sequence = p->last_sequence_received + 1;
1625     }));
1626 /* *INDENT-ON* */
1627
1628     n_stream_bytes = m.stream.current_buffer_index;
1629
1630     /* No need to unserialize close; nothing to free. */
1631
1632     /* After serialized stream is user's catchup data. */
1633     s->config.catchup (mcm, mp->data + n_stream_bytes,
1634                        mp->n_data_bytes - n_stream_bytes);
1635   }
1636
1637   /* Vector could have been moved by catchup.
1638      This can only happen for mc-internal stream. */
1639   s = mc_stream_by_index (mcm, mp->stream_index);
1640
1641   s->last_global_sequence_processed = mp->last_global_sequence_included;
1642
1643   while (clib_fifo_elts (s->catchup_fifo))
1644     {
1645       mc_msg_user_request_t *gp;
1646       u32 bi;
1647       vlib_buffer_t *b;
1648
1649       clib_fifo_sub1 (s->catchup_fifo, bi);
1650
1651       b = vlib_get_buffer (mcm->vlib_main, bi);
1652       gp = vlib_buffer_get_current (b);
1653
1654       /* Make sure we're replaying "new" news */
1655       seq_cmp_result = mc_seq_cmp (gp->global_sequence,
1656                                    mp->last_global_sequence_included);
1657
1658       if (seq_cmp_result > 0)
1659         {
1660           vlib_buffer_advance (b, sizeof (gp[0]));
1661           s->config.rx_buffer (mcm, s, gp->peer_id, bi);
1662           s->last_global_sequence_processed = gp->global_sequence;
1663
1664           if (MC_EVENT_LOGGING)
1665             {
1666               /* *INDENT-OFF* */
1667               ELOG_TYPE_DECLARE (t) =
1668                 {
1669                   .format = "catchup replay local sequence 0x%x",
1670                   .format_args = "i4",
1671                 };
1672               /* *INDENT-ON* */
1673               struct
1674               {
1675                 u32 local_sequence;
1676               } *ed;
1677               ed = ELOG_DATA (mcm->elog_main, t);
1678               ed->local_sequence = gp->local_sequence;
1679             }
1680         }
1681       else
1682         {
1683           if (MC_EVENT_LOGGING)
1684             {
1685               /* *INDENT-OFF* */
1686               ELOG_TYPE_DECLARE (t) =
1687                 {
1688                   .format = "catchup discard local sequence 0x%x",
1689                   .format_args = "i4",
1690                 };
1691               /* *INDENT-ON* */
1692               struct
1693               {
1694                 u32 local_sequence;
1695               } *ed;
1696               ed = ELOG_DATA (mcm->elog_main, t);
1697               ed->local_sequence = gp->local_sequence;
1698             }
1699
1700           vlib_buffer_free_one (mcm->vlib_main, bi);
1701         }
1702     }
1703
1704   s->state = MC_STREAM_STATE_ready;
1705
1706   /* Now that we are caught up wake up joining process. */
1707   {
1708     vlib_one_time_waiting_process_t *wp;
1709     vec_foreach (wp, s->procs_waiting_for_join_done)
1710       vlib_signal_one_time_waiting_process (mcm->vlib_main, wp);
1711     if (s->procs_waiting_for_join_done)
1712       _vec_len (s->procs_waiting_for_join_done) = 0;
1713   }
1714 }
1715
1716 static void
1717 this_node_maybe_master (mc_main_t * mcm)
1718 {
1719   vlib_main_t *vm = mcm->vlib_main;
1720   mc_msg_master_assert_t *mp;
1721   uword event_type;
1722   int timeouts = 0;
1723   int is_master = mcm->relay_state == MC_RELAY_STATE_MASTER;
1724   clib_error_t *error;
1725   f64 now, time_last_master_assert = -1;
1726   u32 bi;
1727
1728   while (1)
1729     {
1730       if (!mcm->we_can_be_relay_master)
1731         {
1732           mcm->relay_state = MC_RELAY_STATE_SLAVE;
1733           if (MC_EVENT_LOGGING)
1734             {
1735               ELOG_TYPE (e, "become slave (config)");
1736               ELOG (mcm->elog_main, e, 0);
1737             }
1738           return;
1739         }
1740
1741       now = vlib_time_now (vm);
1742       if (now >= time_last_master_assert + 1)
1743         {
1744           time_last_master_assert = now;
1745           mp = mc_get_vlib_buffer (mcm->vlib_main, sizeof (mp[0]), &bi);
1746
1747           mp->peer_id = mcm->transport.our_ack_peer_id;
1748           mp->global_sequence = mcm->relay_global_sequence;
1749
1750           /*
1751            * these messages clog the event log, set MC_EVENT_LOGGING higher
1752            * if you want them
1753            */
1754           if (MC_EVENT_LOGGING > 1)
1755             {
1756               /* *INDENT-OFF* */
1757               ELOG_TYPE_DECLARE (e) =
1758                 {
1759                   .format = "tx-massert: peer %s global seq %u",
1760                   .format_args = "T4i4",
1761                 };
1762               /* *INDENT-ON* */
1763               struct
1764               {
1765                 u32 peer, global_sequence;
1766               } *ed;
1767               ed = ELOG_DATA (mcm->elog_main, e);
1768               ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
1769               ed->global_sequence = mp->global_sequence;
1770             }
1771
1772           mc_byte_swap_msg_master_assert (mp);
1773
1774           error =
1775             mcm->transport.tx_buffer (mcm->transport.opaque,
1776                                       MC_TRANSPORT_MASTERSHIP, bi);
1777           if (error)
1778             clib_error_report (error);
1779         }
1780
1781       vlib_process_wait_for_event_or_clock (vm, 1.0);
1782       event_type = vlib_process_get_events (vm, /* no event data */ 0);
1783
1784       switch (event_type)
1785         {
1786         case ~0:
1787           if (!is_master && timeouts++ > 2)
1788             {
1789               mcm->relay_state = MC_RELAY_STATE_MASTER;
1790               mcm->relay_master_peer_id =
1791                 mcm->transport.our_ack_peer_id.as_u64;
1792               if (MC_EVENT_LOGGING)
1793                 {
1794                   ELOG_TYPE (e, "become master (was maybe_master)");
1795                   ELOG (mcm->elog_main, e, 0);
1796                 }
1797               return;
1798             }
1799           break;
1800
1801         case MC_RELAY_STATE_SLAVE:
1802           mcm->relay_state = MC_RELAY_STATE_SLAVE;
1803           if (MC_EVENT_LOGGING && mcm->relay_state != MC_RELAY_STATE_SLAVE)
1804             {
1805               ELOG_TYPE (e, "become slave (was maybe_master)");
1806               ELOG (mcm->elog_main, e, 0);
1807             }
1808           return;
1809         }
1810     }
1811 }
1812
1813 static void
1814 this_node_slave (mc_main_t * mcm)
1815 {
1816   vlib_main_t *vm = mcm->vlib_main;
1817   uword event_type;
1818   int timeouts = 0;
1819
1820   if (MC_EVENT_LOGGING)
1821     {
1822       ELOG_TYPE (e, "become slave");
1823       ELOG (mcm->elog_main, e, 0);
1824     }
1825
1826   while (1)
1827     {
1828       vlib_process_wait_for_event_or_clock (vm, 1.0);
1829       event_type = vlib_process_get_events (vm, /* no event data */ 0);
1830
1831       switch (event_type)
1832         {
1833         case ~0:
1834           if (timeouts++ > 2)
1835             {
1836               mcm->relay_state = MC_RELAY_STATE_NEGOTIATE;
1837               mcm->relay_master_peer_id = ~0ULL;
1838               if (MC_EVENT_LOGGING)
1839                 {
1840                   ELOG_TYPE (e, "timeouts; negoitate mastership");
1841                   ELOG (mcm->elog_main, e, 0);
1842                 }
1843               return;
1844             }
1845           break;
1846
1847         case MC_RELAY_STATE_SLAVE:
1848           mcm->relay_state = MC_RELAY_STATE_SLAVE;
1849           timeouts = 0;
1850           break;
1851         }
1852     }
1853 }
1854
1855 static uword
1856 mc_mastership_process (vlib_main_t * vm,
1857                        vlib_node_runtime_t * node, vlib_frame_t * f)
1858 {
1859   mc_main_t *mcm = mc_node_get_main (node);
1860
1861   while (1)
1862     {
1863       switch (mcm->relay_state)
1864         {
1865         case MC_RELAY_STATE_NEGOTIATE:
1866         case MC_RELAY_STATE_MASTER:
1867           this_node_maybe_master (mcm);
1868           break;
1869
1870         case MC_RELAY_STATE_SLAVE:
1871           this_node_slave (mcm);
1872           break;
1873         }
1874     }
1875   return 0;                     /* not likely */
1876 }
1877
1878 void
1879 mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master)
1880 {
1881   if (we_can_be_master != mcm->we_can_be_relay_master)
1882     {
1883       mcm->we_can_be_relay_master = we_can_be_master;
1884       vlib_process_signal_event (mcm->vlib_main,
1885                                  mcm->mastership_process,
1886                                  MC_RELAY_STATE_NEGOTIATE, 0);
1887     }
1888 }
1889
1890 void
1891 mc_msg_master_assert_handler (mc_main_t * mcm, mc_msg_master_assert_t * mp,
1892                               u32 buffer_index)
1893 {
1894   mc_peer_id_t his_peer_id, our_peer_id;
1895   i32 seq_cmp_result;
1896   u8 signal_slave = 0;
1897   u8 update_global_sequence = 0;
1898
1899   mc_byte_swap_msg_master_assert (mp);
1900
1901   his_peer_id = mp->peer_id;
1902   our_peer_id = mcm->transport.our_ack_peer_id;
1903
1904   /* compare the incoming global sequence with ours */
1905   seq_cmp_result = mc_seq_cmp (mp->global_sequence,
1906                                mcm->relay_global_sequence);
1907
1908   /* If the sender has a lower peer id and the sender's sequence >=
1909      our global sequence, we become a slave.  Otherwise we are master. */
1910   if (mc_peer_id_compare (his_peer_id, our_peer_id) < 0
1911       && seq_cmp_result >= 0)
1912     {
1913       vlib_process_signal_event (mcm->vlib_main,
1914                                  mcm->mastership_process,
1915                                  MC_RELAY_STATE_SLAVE, 0);
1916       signal_slave = 1;
1917     }
1918
1919   /* Update our global sequence. */
1920   if (seq_cmp_result > 0)
1921     {
1922       mcm->relay_global_sequence = mp->global_sequence;
1923       update_global_sequence = 1;
1924     }
1925
1926   {
1927     uword *q = mhash_get (&mcm->mastership_peer_index_by_id, &his_peer_id);
1928     mc_mastership_peer_t *p;
1929
1930     if (q)
1931       p = vec_elt_at_index (mcm->mastership_peers, q[0]);
1932     else
1933       {
1934         vec_add2 (mcm->mastership_peers, p, 1);
1935         p->peer_id = his_peer_id;
1936         mhash_set (&mcm->mastership_peer_index_by_id, &p->peer_id,
1937                    p - mcm->mastership_peers,
1938                    /* old_value */ 0);
1939       }
1940     p->time_last_master_assert_received = vlib_time_now (mcm->vlib_main);
1941   }
1942
1943   /*
1944    * these messages clog the event log, set MC_EVENT_LOGGING higher
1945    * if you want them.
1946    */
1947   if (MC_EVENT_LOGGING > 1)
1948     {
1949       /* *INDENT-OFF* */
1950       ELOG_TYPE_DECLARE (e) =
1951         {
1952           .format = "rx-massert: peer %s global seq %u upd %d slave %d",
1953           .format_args = "T4i4i1i1",
1954         };
1955       /* *INDENT-ON* */
1956
1957       struct
1958       {
1959         u32 peer;
1960         u32 global_sequence;
1961         u8 update_sequence;
1962         u8 slave;
1963       } *ed;
1964       ed = ELOG_DATA (mcm->elog_main, e);
1965       ed->peer = elog_id_for_peer_id (mcm, his_peer_id.as_u64);
1966       ed->global_sequence = mp->global_sequence;
1967       ed->update_sequence = update_global_sequence;
1968       ed->slave = signal_slave;
1969     }
1970 }
1971
1972 static void
1973 mc_serialize_init (mc_main_t * mcm)
1974 {
1975   mc_serialize_msg_t *m;
1976   vlib_main_t *vm = vlib_get_main ();
1977
1978   mcm->global_msg_index_by_name
1979     = hash_create_string ( /* elts */ 0, sizeof (uword));
1980
1981   m = vm->mc_msg_registrations;
1982
1983   while (m)
1984     {
1985       m->global_index = vec_len (mcm->global_msgs);
1986       hash_set_mem (mcm->global_msg_index_by_name, m->name, m->global_index);
1987       vec_add1 (mcm->global_msgs, m);
1988       m = m->next_registration;
1989     }
1990 }
1991
1992 clib_error_t *
1993 mc_serialize_va (mc_main_t * mc,
1994                  u32 stream_index,
1995                  u32 multiple_messages_per_vlib_buffer,
1996                  mc_serialize_msg_t * msg, va_list * va)
1997 {
1998   mc_stream_t *s;
1999   clib_error_t *error;
2000   serialize_main_t *m = &mc->serialize_mains[VLIB_TX];
2001   vlib_serialize_buffer_main_t *sbm = &mc->serialize_buffer_mains[VLIB_TX];
2002   u32 bi, n_before, n_after, n_total, n_this_msg;
2003   u32 si, gi;
2004
2005   if (!sbm->vlib_main)
2006     {
2007       sbm->tx.max_n_data_bytes_per_chain = 4096;
2008       sbm->tx.free_list_index = VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX;
2009     }
2010
2011   if (sbm->first_buffer == 0)
2012     serialize_open_vlib_buffer (m, mc->vlib_main, sbm);
2013
2014   n_before = serialize_vlib_buffer_n_bytes (m);
2015
2016   s = mc_stream_by_index (mc, stream_index);
2017   gi = msg->global_index;
2018   ASSERT (msg == vec_elt (mc->global_msgs, gi));
2019
2020   si = ~0;
2021   if (gi < vec_len (s->stream_msg_index_by_global_index))
2022     si = s->stream_msg_index_by_global_index[gi];
2023
2024   serialize_likely_small_unsigned_integer (m, si);
2025
2026   /* For first time message is sent, use name to identify message. */
2027   if (si == ~0 || MSG_ID_DEBUG)
2028     serialize_cstring (m, msg->name);
2029
2030   if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
2031     {
2032       /* *INDENT-OFF* */
2033       ELOG_TYPE_DECLARE (e) =
2034         {
2035           .format = "serialize-msg: %s index %d",
2036           .format_args = "T4i4",
2037         };
2038       /* *INDENT-ON* */
2039       struct
2040       {
2041         u32 c[2];
2042       } *ed;
2043       ed = ELOG_DATA (mc->elog_main, e);
2044       ed->c[0] = elog_id_for_msg_name (mc, msg->name);
2045       ed->c[1] = si;
2046     }
2047
2048   error = va_serialize (m, va);
2049
2050   n_after = serialize_vlib_buffer_n_bytes (m);
2051   n_this_msg = n_after - n_before;
2052   n_total = n_after + sizeof (mc_msg_user_request_t);
2053
2054   /* For max message size ignore first message where string name is sent. */
2055   if (si != ~0)
2056     msg->max_n_bytes_serialized =
2057       clib_max (msg->max_n_bytes_serialized, n_this_msg);
2058
2059   if (!multiple_messages_per_vlib_buffer
2060       || si == ~0
2061       || n_total + msg->max_n_bytes_serialized >
2062       mc->transport.max_packet_size)
2063     {
2064       bi = serialize_close_vlib_buffer (m);
2065       sbm->first_buffer = 0;
2066       if (!error)
2067         mc_stream_send (mc, stream_index, bi);
2068       else if (bi != ~0)
2069         vlib_buffer_free_one (mc->vlib_main, bi);
2070     }
2071
2072   return error;
2073 }
2074
2075 clib_error_t *
2076 mc_serialize_internal (mc_main_t * mc,
2077                        u32 stream_index,
2078                        u32 multiple_messages_per_vlib_buffer,
2079                        mc_serialize_msg_t * msg, ...)
2080 {
2081   vlib_main_t *vm = mc->vlib_main;
2082   va_list va;
2083   clib_error_t *error;
2084
2085   if (stream_index == ~0)
2086     {
2087       if (vm->mc_main && vm->mc_stream_index == ~0)
2088         vlib_current_process_wait_for_one_time_event_vector
2089           (vm, &vm->procs_waiting_for_mc_stream_join);
2090       stream_index = vm->mc_stream_index;
2091     }
2092
2093   va_start (va, msg);
2094   error = mc_serialize_va (mc, stream_index,
2095                            multiple_messages_per_vlib_buffer, msg, &va);
2096   va_end (va);
2097   return error;
2098 }
2099
2100 uword
2101 mc_unserialize_message (mc_main_t * mcm,
2102                         mc_stream_t * s, serialize_main_t * m)
2103 {
2104   mc_serialize_stream_msg_t *sm;
2105   u32 gi, si;
2106
2107   si = unserialize_likely_small_unsigned_integer (m);
2108
2109   if (!(si == ~0 || MSG_ID_DEBUG))
2110     {
2111       sm = vec_elt_at_index (s->stream_msgs, si);
2112       gi = sm->global_index;
2113     }
2114   else
2115     {
2116       char *name;
2117
2118       unserialize_cstring (m, &name);
2119
2120       if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
2121         {
2122           /* *INDENT-OFF* */
2123           ELOG_TYPE_DECLARE (e) =
2124             {
2125               .format = "unserialize-msg: %s rx index %d",
2126               .format_args = "T4i4",
2127             };
2128           /* *INDENT-ON* */
2129           struct
2130           {
2131             u32 c[2];
2132           } *ed;
2133           ed = ELOG_DATA (mcm->elog_main, e);
2134           ed->c[0] = elog_id_for_msg_name (mcm, name);
2135           ed->c[1] = si;
2136         }
2137
2138       {
2139         uword *p = hash_get_mem (mcm->global_msg_index_by_name, name);
2140         gi = p ? p[0] : ~0;
2141       }
2142
2143       /* Unknown message? */
2144       if (gi == ~0)
2145         {
2146           vec_free (name);
2147           goto done;
2148         }
2149
2150       vec_validate_init_empty (s->stream_msg_index_by_global_index, gi, ~0);
2151       si = s->stream_msg_index_by_global_index[gi];
2152
2153       /* Stream local index unknown?  Create it. */
2154       if (si == ~0)
2155         {
2156           vec_add2 (s->stream_msgs, sm, 1);
2157
2158           si = sm - s->stream_msgs;
2159           sm->global_index = gi;
2160           s->stream_msg_index_by_global_index[gi] = si;
2161
2162           if (MC_EVENT_LOGGING > 0)
2163             {
2164               /* *INDENT-OFF* */
2165               ELOG_TYPE_DECLARE (e) =
2166                 {
2167                   .format = "msg-bind: stream %d %s to index %d",
2168                   .format_args = "i4T4i4",
2169                 };
2170               /* *INDENT-ON* */
2171               struct
2172               {
2173                 u32 c[3];
2174               } *ed;
2175               ed = ELOG_DATA (mcm->elog_main, e);
2176               ed->c[0] = s->index;
2177               ed->c[1] = elog_id_for_msg_name (mcm, name);
2178               ed->c[2] = si;
2179             }
2180         }
2181       else
2182         {
2183           sm = vec_elt_at_index (s->stream_msgs, si);
2184           if (gi != sm->global_index && MC_EVENT_LOGGING > 0)
2185             {
2186               /* *INDENT-OFF* */
2187               ELOG_TYPE_DECLARE (e) =
2188                 {
2189                   .format = "msg-id-ERROR: %s index %d expected %d",
2190                   .format_args = "T4i4i4",
2191                 };
2192               /* *INDENT-ON* */
2193               struct
2194               {
2195                 u32 c[3];
2196               } *ed;
2197               ed = ELOG_DATA (mcm->elog_main, e);
2198               ed->c[0] = elog_id_for_msg_name (mcm, name);
2199               ed->c[1] = si;
2200               ed->c[2] = ~0;
2201               if (sm->global_index <
2202                   vec_len (s->stream_msg_index_by_global_index))
2203                 ed->c[2] =
2204                   s->stream_msg_index_by_global_index[sm->global_index];
2205             }
2206         }
2207
2208       vec_free (name);
2209     }
2210
2211   if (gi != ~0)
2212     {
2213       mc_serialize_msg_t *msg;
2214       msg = vec_elt (mcm->global_msgs, gi);
2215       unserialize (m, msg->unserialize, mcm);
2216     }
2217
2218 done:
2219   return gi != ~0;
2220 }
2221
2222 void
2223 mc_unserialize_internal (mc_main_t * mcm, u32 stream_and_buffer_index)
2224 {
2225   vlib_main_t *vm = mcm->vlib_main;
2226   serialize_main_t *m = &mcm->serialize_mains[VLIB_RX];
2227   vlib_serialize_buffer_main_t *sbm = &mcm->serialize_buffer_mains[VLIB_RX];
2228   mc_stream_and_buffer_t *sb;
2229   mc_stream_t *stream;
2230   u32 buffer_index;
2231
2232   sb =
2233     pool_elt_at_index (mcm->mc_unserialize_stream_and_buffers,
2234                        stream_and_buffer_index);
2235   buffer_index = sb->buffer_index;
2236   stream = vec_elt_at_index (mcm->stream_vector, sb->stream_index);
2237   pool_put (mcm->mc_unserialize_stream_and_buffers, sb);
2238
2239   if (stream->config.save_snapshot)
2240     {
2241       u32 n_bytes = vlib_buffer_index_length_in_chain (vm, buffer_index);
2242       static u8 *contents;
2243       vec_reset_length (contents);
2244       vec_validate (contents, n_bytes - 1);
2245       vlib_buffer_contents (vm, buffer_index, contents);
2246       stream->config.save_snapshot (mcm, /* is_catchup */ 0, contents,
2247                                     n_bytes);
2248     }
2249
2250   ASSERT (vlib_in_process_context (vm));
2251
2252   unserialize_open_vlib_buffer (m, vm, sbm);
2253
2254   clib_fifo_add1 (sbm->rx.buffer_fifo, buffer_index);
2255
2256   while (unserialize_vlib_buffer_n_bytes (m) > 0)
2257     mc_unserialize_message (mcm, stream, m);
2258
2259   /* Frees buffer. */
2260   unserialize_close_vlib_buffer (m);
2261 }
2262
2263 void
2264 mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index)
2265 {
2266   vlib_main_t *vm = mcm->vlib_main;
2267   mc_stream_and_buffer_t *sb;
2268   pool_get (mcm->mc_unserialize_stream_and_buffers, sb);
2269   sb->stream_index = s->index;
2270   sb->buffer_index = buffer_index;
2271   vlib_process_signal_event (vm, mcm->unserialize_process,
2272                              EVENT_MC_UNSERIALIZE_BUFFER,
2273                              sb - mcm->mc_unserialize_stream_and_buffers);
2274 }
2275
2276 static uword
2277 mc_unserialize_process (vlib_main_t * vm,
2278                         vlib_node_runtime_t * node, vlib_frame_t * f)
2279 {
2280   mc_main_t *mcm = mc_node_get_main (node);
2281   uword event_type, *event_data = 0;
2282   int i;
2283
2284   while (1)
2285     {
2286       if (event_data)
2287         _vec_len (event_data) = 0;
2288
2289       vlib_process_wait_for_event (vm);
2290       event_type = vlib_process_get_events (vm, &event_data);
2291       switch (event_type)
2292         {
2293         case EVENT_MC_UNSERIALIZE_BUFFER:
2294           for (i = 0; i < vec_len (event_data); i++)
2295             mc_unserialize_internal (mcm, event_data[i]);
2296           break;
2297
2298         case EVENT_MC_UNSERIALIZE_CATCHUP:
2299           for (i = 0; i < vec_len (event_data); i++)
2300             {
2301               u8 *mp = uword_to_pointer (event_data[i], u8 *);
2302               perform_catchup (mcm, (void *) mp);
2303               vec_free (mp);
2304             }
2305           break;
2306
2307         default:
2308           break;
2309         }
2310     }
2311
2312   return 0;                     /* not likely */
2313 }
2314
2315 void
2316 serialize_mc_main (serialize_main_t * m, va_list * va)
2317 {
2318   mc_main_t *mcm = va_arg (*va, mc_main_t *);
2319   mc_stream_t *s;
2320   mc_serialize_stream_msg_t *sm;
2321   mc_serialize_msg_t *msg;
2322
2323   serialize_integer (m, vec_len (mcm->stream_vector), sizeof (u32));
2324   vec_foreach (s, mcm->stream_vector)
2325   {
2326     /* Stream name. */
2327     serialize_cstring (m, s->config.name);
2328
2329     /* Serialize global names for all sent messages. */
2330     serialize_integer (m, vec_len (s->stream_msgs), sizeof (u32));
2331     vec_foreach (sm, s->stream_msgs)
2332     {
2333       msg = vec_elt (mcm->global_msgs, sm->global_index);
2334       serialize_cstring (m, msg->name);
2335     }
2336   }
2337 }
2338
2339 void
2340 unserialize_mc_main (serialize_main_t * m, va_list * va)
2341 {
2342   mc_main_t *mcm = va_arg (*va, mc_main_t *);
2343   u32 i, n_streams, n_stream_msgs;
2344   char *name;
2345   mc_stream_t *s;
2346   mc_serialize_stream_msg_t *sm;
2347
2348   unserialize_integer (m, &n_streams, sizeof (u32));
2349   for (i = 0; i < n_streams; i++)
2350     {
2351       unserialize_cstring (m, &name);
2352       if (i != MC_STREAM_INDEX_INTERNAL && !mc_stream_by_name (mcm, name))
2353         {
2354           vec_validate (mcm->stream_vector, i);
2355           s = vec_elt_at_index (mcm->stream_vector, i);
2356           mc_stream_init (s);
2357           s->index = s - mcm->stream_vector;
2358           s->config.name = name;
2359           s->state = MC_STREAM_STATE_name_known;
2360           hash_set_mem (mcm->stream_index_by_name, s->config.name, s->index);
2361         }
2362       else
2363         vec_free (name);
2364
2365       s = vec_elt_at_index (mcm->stream_vector, i);
2366
2367       vec_free (s->stream_msgs);
2368       vec_free (s->stream_msg_index_by_global_index);
2369
2370       unserialize_integer (m, &n_stream_msgs, sizeof (u32));
2371       vec_resize (s->stream_msgs, n_stream_msgs);
2372       vec_foreach (sm, s->stream_msgs)
2373       {
2374         uword *p;
2375         u32 si, gi;
2376
2377         unserialize_cstring (m, &name);
2378         p = hash_get (mcm->global_msg_index_by_name, name);
2379         gi = p ? p[0] : ~0;
2380         si = sm - s->stream_msgs;
2381
2382         if (MC_EVENT_LOGGING > 0)
2383           {
2384             /* *INDENT-OFF* */
2385             ELOG_TYPE_DECLARE (e) =
2386               {
2387                 .format = "catchup-bind: %s to %d global index %d stream %d",
2388                 .format_args = "T4i4i4i4",
2389               };
2390             /* *INDENT-ON* */
2391
2392             struct
2393             {
2394               u32 c[4];
2395             } *ed;
2396             ed = ELOG_DATA (mcm->elog_main, e);
2397             ed->c[0] = elog_id_for_msg_name (mcm, name);
2398             ed->c[1] = si;
2399             ed->c[2] = gi;
2400             ed->c[3] = s->index;
2401           }
2402
2403         vec_free (name);
2404
2405         sm->global_index = gi;
2406         if (gi != ~0)
2407           {
2408             vec_validate_init_empty (s->stream_msg_index_by_global_index,
2409                                      gi, ~0);
2410             s->stream_msg_index_by_global_index[gi] = si;
2411           }
2412       }
2413     }
2414 }
2415
2416 void
2417 mc_main_init (mc_main_t * mcm, char *tag)
2418 {
2419   vlib_main_t *vm = vlib_get_main ();
2420
2421   mcm->vlib_main = vm;
2422   mcm->elog_main = &vm->elog_main;
2423
2424   mcm->relay_master_peer_id = ~0ULL;
2425   mcm->relay_state = MC_RELAY_STATE_NEGOTIATE;
2426
2427   mcm->stream_index_by_name
2428     = hash_create_string ( /* elts */ 0, /* value size */ sizeof (uword));
2429
2430   {
2431     vlib_node_registration_t r;
2432
2433     memset (&r, 0, sizeof (r));
2434
2435     r.type = VLIB_NODE_TYPE_PROCESS;
2436
2437     /* Point runtime data to main instance. */
2438     r.runtime_data = &mcm;
2439     r.runtime_data_bytes = sizeof (&mcm);
2440
2441     r.name = (char *) format (0, "mc-mastership-%s", tag);
2442     r.function = mc_mastership_process;
2443     mcm->mastership_process = vlib_register_node (vm, &r);
2444
2445     r.name = (char *) format (0, "mc-join-ager-%s", tag);
2446     r.function = mc_join_ager_process;
2447     mcm->join_ager_process = vlib_register_node (vm, &r);
2448
2449     r.name = (char *) format (0, "mc-retry-%s", tag);
2450     r.function = mc_retry_process;
2451     mcm->retry_process = vlib_register_node (vm, &r);
2452
2453     r.name = (char *) format (0, "mc-catchup-%s", tag);
2454     r.function = mc_catchup_process;
2455     mcm->catchup_process = vlib_register_node (vm, &r);
2456
2457     r.name = (char *) format (0, "mc-unserialize-%s", tag);
2458     r.function = mc_unserialize_process;
2459     mcm->unserialize_process = vlib_register_node (vm, &r);
2460   }
2461
2462   if (MC_EVENT_LOGGING > 0)
2463     mhash_init (&mcm->elog_id_by_peer_id, sizeof (uword),
2464                 sizeof (mc_peer_id_t));
2465
2466   mhash_init (&mcm->mastership_peer_index_by_id, sizeof (uword),
2467               sizeof (mc_peer_id_t));
2468   mc_serialize_init (mcm);
2469 }
2470
2471 static u8 *
2472 format_mc_relay_state (u8 * s, va_list * args)
2473 {
2474   mc_relay_state_t state = va_arg (*args, mc_relay_state_t);
2475   char *t = 0;
2476   switch (state)
2477     {
2478     case MC_RELAY_STATE_NEGOTIATE:
2479       t = "negotiate";
2480       break;
2481     case MC_RELAY_STATE_MASTER:
2482       t = "master";
2483       break;
2484     case MC_RELAY_STATE_SLAVE:
2485       t = "slave";
2486       break;
2487     default:
2488       return format (s, "unknown 0x%x", state);
2489     }
2490
2491   return format (s, "%s", t);
2492 }
2493
2494 static u8 *
2495 format_mc_stream_state (u8 * s, va_list * args)
2496 {
2497   mc_stream_state_t state = va_arg (*args, mc_stream_state_t);
2498   char *t = 0;
2499   switch (state)
2500     {
2501 #define _(f) case MC_STREAM_STATE_##f: t = #f; break;
2502       foreach_mc_stream_state
2503 #undef _
2504     default:
2505       return format (s, "unknown 0x%x", state);
2506     }
2507
2508   return format (s, "%s", t);
2509 }
2510
2511 static int
2512 mc_peer_comp (void *a1, void *a2)
2513 {
2514   mc_stream_peer_t *p1 = a1;
2515   mc_stream_peer_t *p2 = a2;
2516
2517   return mc_peer_id_compare (p1->id, p2->id);
2518 }
2519
2520 u8 *
2521 format_mc_main (u8 * s, va_list * args)
2522 {
2523   mc_main_t *mcm = va_arg (*args, mc_main_t *);
2524   mc_stream_t *t;
2525   mc_stream_peer_t *p, *ps;
2526   uword indent = format_get_indent (s);
2527
2528   s = format (s, "MC state %U, %d streams joined, global sequence 0x%x",
2529               format_mc_relay_state, mcm->relay_state,
2530               vec_len (mcm->stream_vector), mcm->relay_global_sequence);
2531
2532   {
2533     mc_mastership_peer_t *mp;
2534     f64 now = vlib_time_now (mcm->vlib_main);
2535     s = format (s, "\n%UMost recent mastership peers:",
2536                 format_white_space, indent + 2);
2537     vec_foreach (mp, mcm->mastership_peers)
2538     {
2539       s = format (s, "\n%U%-30U%.4e",
2540                   format_white_space, indent + 4,
2541                   mcm->transport.format_peer_id, mp->peer_id,
2542                   now - mp->time_last_master_assert_received);
2543     }
2544   }
2545
2546   vec_foreach (t, mcm->stream_vector)
2547   {
2548     s = format (s, "\n%Ustream `%s' index %d",
2549                 format_white_space, indent + 2, t->config.name, t->index);
2550
2551     s = format (s, "\n%Ustate %U",
2552                 format_white_space, indent + 4,
2553                 format_mc_stream_state, t->state);
2554
2555     s =
2556       format (s,
2557               "\n%Uretries: interval %.0f sec, limit %d, pool elts %d, %Ld sent",
2558               format_white_space, indent + 4, t->config.retry_interval,
2559               t->config.retry_limit, pool_elts (t->retry_pool),
2560               t->stats.n_retries - t->stats_last_clear.n_retries);
2561
2562     s = format (s, "\n%U%Ld/%Ld user requests sent/received",
2563                 format_white_space, indent + 4,
2564                 t->user_requests_sent, t->user_requests_received);
2565
2566     s = format (s, "\n%U%d peers, local/global sequence 0x%x/0x%x",
2567                 format_white_space, indent + 4,
2568                 pool_elts (t->peers),
2569                 t->our_local_sequence, t->last_global_sequence_processed);
2570
2571     ps = 0;
2572     /* *INDENT-OFF* */
2573     pool_foreach (p, t->peers,
2574     ({
2575       if (clib_bitmap_get (t->all_peer_bitmap, p - t->peers))
2576         vec_add1 (ps, p[0]);
2577     }));
2578     /* *INDENT-ON* */
2579     vec_sort_with_function (ps, mc_peer_comp);
2580     s = format (s, "\n%U%=30s%10s%16s%16s",
2581                 format_white_space, indent + 6,
2582                 "Peer", "Last seq", "Retries", "Future");
2583
2584     vec_foreach (p, ps)
2585     {
2586       s = format (s, "\n%U%-30U0x%08x%16Ld%16Ld%s",
2587                   format_white_space, indent + 6,
2588                   mcm->transport.format_peer_id, p->id.as_u64,
2589                   p->last_sequence_received,
2590                   p->stats.n_msgs_from_past -
2591                   p->stats_last_clear.n_msgs_from_past,
2592                   p->stats.n_msgs_from_future -
2593                   p->stats_last_clear.n_msgs_from_future,
2594                   (mcm->transport.our_ack_peer_id.as_u64 ==
2595                    p->id.as_u64 ? " (self)" : ""));
2596     }
2597     vec_free (ps);
2598   }
2599
2600   return s;
2601 }
2602
2603 /*
2604  * fd.io coding-style-patch-verification: ON
2605  *
2606  * Local Variables:
2607  * eval: (c-set-style "gnu")
2608  * End:
2609  */