Initial commit of vpp code.
[vpp.git] / vlib / vlib / mc.c
1 /*
2  * mc.c: vlib reliable sequenced multicast distributed applications
3  *
4  * Copyright (c) 2010 Cisco and/or its affiliates.
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at:
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #include <vlib/vlib.h>
19
20 /* 
21  * 1 to enable msg id training wheels, which are useful for tracking
22  * down catchup and/or partitioned network problems
23  */
24 #define MSG_ID_DEBUG 0
25
26 static format_function_t format_mc_stream_state;
27
28 static u32 elog_id_for_peer_id (mc_main_t * m, u64 peer_id)
29 {
30   uword * p, r;
31   mhash_t * h = &m->elog_id_by_peer_id;
32
33   if (! m->elog_id_by_peer_id.hash)
34     mhash_init (h, sizeof (uword), sizeof (mc_peer_id_t));
35   
36   p = mhash_get (h, &peer_id);
37   if (p)
38     return p[0];
39   r = elog_string (m->elog_main, "%U",
40                    m->transport.format_peer_id, peer_id);
41   mhash_set (h, &peer_id, r, /* old_value */ 0);
42   return r;
43 }
44
45 static u32 elog_id_for_msg_name (mc_main_t * m, char *msg_name)
46 {
47   uword * p, r;
48   uword * h = m->elog_id_by_msg_name;
49   u8 *name_copy;
50
51   if (! h)
52       h = m->elog_id_by_msg_name 
53         = hash_create_string (0, sizeof (uword));
54
55   p = hash_get_mem (h, msg_name);
56   if (p)
57     return p[0];
58   r = elog_string (m->elog_main, "%s", msg_name);
59
60   name_copy = format (0, "%s%c", msg_name, 0);
61
62   hash_set_mem (h, name_copy, r);
63   m->elog_id_by_msg_name = h;
64
65   return r;
66 }
67
68 static void elog_tx_msg (mc_main_t * m, u32 stream_id, u32 local_sequence, u32 retry_count)
69 {
70   if (MC_EVENT_LOGGING > 0)
71     {
72       ELOG_TYPE_DECLARE (e) = {
73         .format = "tx-msg: stream %d local seq %d attempt %d",
74         .format_args = "i4i4i4",
75       };
76       struct { u32 stream_id, local_sequence, retry_count; } * ed;
77       ed = ELOG_DATA (m->elog_main, e);
78       ed->stream_id = stream_id;
79       ed->local_sequence = local_sequence;
80       ed->retry_count = retry_count;
81     }
82 }
83
84 /*
85  * seq_cmp
86  * correctly compare two unsigned sequence numbers. 
87  * This function works so long as x and y are within 2**(n-1) of each
88  * other, where n = bits(x, y).
89  *
90  * Magic decoder ring:
91  * seq_cmp == 0 => x and y are equal
92  * seq_cmp < 0 => x is "in the past" with respect to y
93  * seq_cmp > 0 => x is "in the future" with respect to y
94  */
95 always_inline i32 mc_seq_cmp (u32 x, u32 y)
96 { return (i32) x - (i32) y;}
97
98 void * mc_get_vlib_buffer (vlib_main_t * vm, u32 n_bytes, u32 * bi_return)
99 {
100   u32 n_alloc, bi;
101   vlib_buffer_t * b;
102
103   n_alloc = vlib_buffer_alloc (vm, &bi, 1);
104   ASSERT (n_alloc == 1);
105
106   b = vlib_get_buffer (vm, bi);
107   b->current_length = n_bytes;
108   *bi_return = bi;
109   return (void *) b->data;
110 }
111
112 static void
113 delete_peer_with_index (mc_main_t * mcm, mc_stream_t * s,
114                         uword index,
115                         int notify_application)
116 {
117   mc_stream_peer_t * p = pool_elt_at_index (s->peers, index);
118   ASSERT (p != 0);
119   if (s->config.peer_died && notify_application)
120     s->config.peer_died (mcm, s, p->id);
121
122   s->all_peer_bitmap = clib_bitmap_andnoti (s->all_peer_bitmap, p - s->peers);
123
124   if (MC_EVENT_LOGGING > 0)
125     {
126       ELOG_TYPE_DECLARE (e) = {
127         .format = "delete peer %s from all_peer_bitmap",
128         .format_args = "T4",
129       };
130       struct { u32 peer; } * ed = 0;
131
132       ed = ELOG_DATA (mcm->elog_main, e);
133       ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
134     }
135   /* Do not delete the pool / hash table entries, or we lose sequence number state */
136 }
137
138 static mc_stream_peer_t *
139 get_or_create_peer_with_id (mc_main_t * mcm,
140                             mc_stream_t * s, mc_peer_id_t id,
141                             int * created)
142 {
143   uword * q = mhash_get (&s->peer_index_by_id, &id);
144   mc_stream_peer_t * p;
145
146   if (q)
147     {
148       p = pool_elt_at_index (s->peers, q[0]);
149       goto done;
150     }
151
152   pool_get (s->peers, p);
153   memset (p, 0, sizeof (p[0]));
154   p->id = id;
155   p->last_sequence_received = ~0;
156   mhash_set (&s->peer_index_by_id, &id, p - s->peers, /* old_value */ 0);
157   if (created)
158     *created = 1;
159
160  done:
161   if (MC_EVENT_LOGGING > 0)
162     {
163       ELOG_TYPE_DECLARE (e) = {
164         .format = "get_or_create %s peer %s stream %d seq %d",
165         .format_args = "t4T4i4i4",
166         .n_enum_strings = 2,
167         .enum_strings = { "old", "new", },
168       };
169       struct { u32 is_new, peer, stream_index, rx_sequence; } * ed = 0;
170
171       ed = ELOG_DATA (mcm->elog_main, e);
172       ed->is_new = q ? 0 : 1;
173       ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
174       ed->stream_index = s->index;
175       ed->rx_sequence = p->last_sequence_received;
176     }
177   /* $$$$ Enable or reenable this peer */
178   s->all_peer_bitmap = clib_bitmap_ori (s->all_peer_bitmap, p - s->peers);
179   return p;
180 }
181
182 static void maybe_send_window_open_event (vlib_main_t * vm, mc_stream_t * stream)
183 {
184   vlib_one_time_waiting_process_t * p;
185
186   if (pool_elts (stream->retry_pool) >= stream->config.window_size)
187     return;
188
189   vec_foreach (p, stream->procs_waiting_for_open_window)
190     vlib_signal_one_time_waiting_process (vm, p);
191
192   if (stream->procs_waiting_for_open_window)
193     _vec_len (stream->procs_waiting_for_open_window) = 0;
194 }
195
196 static void mc_retry_free (mc_main_t * mcm, mc_stream_t *s, mc_retry_t * r)
197 {
198   mc_retry_t record, *retp;
199   
200   if (r->unacked_by_peer_bitmap)
201     _vec_len (r->unacked_by_peer_bitmap) = 0;
202
203   if (clib_fifo_elts (s->retired_fifo) >= 2 * s->config.window_size) 
204     {
205       clib_fifo_sub1 (s->retired_fifo, record);
206       vlib_buffer_free_one (mcm->vlib_main, record.buffer_index);
207     }
208   
209   clib_fifo_add2 (s->retired_fifo, retp);
210   
211   retp->buffer_index = r->buffer_index;
212   retp->local_sequence = r->local_sequence;
213
214   r->buffer_index = ~0;         /* poison buffer index in this retry */
215 }
216
217 static void mc_resend_retired (mc_main_t *mcm, mc_stream_t *s, u32 local_sequence)
218 {
219   mc_retry_t *retry;
220
221   if (MC_EVENT_LOGGING > 0)
222     {
223       ELOG_TYPE_DECLARE (e) = {
224         .format = "resend-retired: search for local seq %d",
225         .format_args = "i4",
226       };
227       struct { u32 local_sequence; } * ed;
228       ed = ELOG_DATA (mcm->elog_main, e);
229       ed->local_sequence = local_sequence;
230     }
231
232   clib_fifo_foreach 
233     (retry, s->retired_fifo,
234      ({
235        if (retry->local_sequence == local_sequence)
236          {
237            elog_tx_msg (mcm, s->index, retry->local_sequence, -13);
238
239            mcm->transport.tx_buffer
240              (mcm->transport.opaque,
241               MC_TRANSPORT_USER_REQUEST_TO_RELAY,
242               retry->buffer_index);
243            return;
244          }
245      }));
246
247   if (MC_EVENT_LOGGING > 0)
248     {
249       ELOG_TYPE_DECLARE (e) = {
250         .format = "resend-retired: FAILED search for local seq %d",
251         .format_args = "i4",
252       };
253       struct { u32 local_sequence; } * ed;
254       ed = ELOG_DATA (mcm->elog_main, e);
255       ed->local_sequence = local_sequence;
256     }
257 }
258
259 static uword *
260 delete_retry_fifo_elt (mc_main_t * mcm,
261                        mc_stream_t * stream,
262                        mc_retry_t * r,
263                        uword * dead_peer_bitmap)
264 {
265   mc_stream_peer_t * p;
266
267   pool_foreach (p, stream->peers, ({
268     uword pi = p - stream->peers;
269     uword is_alive = 0 == clib_bitmap_get (r->unacked_by_peer_bitmap, pi);
270
271     if (! is_alive)
272       dead_peer_bitmap = clib_bitmap_ori (dead_peer_bitmap, pi);
273
274     if (MC_EVENT_LOGGING > 0)
275       {
276         ELOG_TYPE_DECLARE (e) = {
277           .format = "delete_retry_fifo_elt: peer %s is %s",
278           .format_args = "T4t4",
279           .n_enum_strings = 2,
280           .enum_strings = { "alive", "dead", },
281         };
282         struct { u32 peer, is_alive; } * ed;
283         ed = ELOG_DATA (mcm->elog_main, e);
284         ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
285         ed->is_alive = is_alive;
286       }
287   }));
288     
289   hash_unset (stream->retry_index_by_local_sequence, r->local_sequence);
290   mc_retry_free (mcm, stream, r);
291
292   return dead_peer_bitmap;
293 }
294
295 always_inline mc_retry_t *
296 prev_retry (mc_stream_t * s, mc_retry_t * r)
297 {
298   return (r->prev_index != ~0
299           ? pool_elt_at_index (s->retry_pool, r->prev_index)
300           : 0);
301 }
302
303 always_inline mc_retry_t *
304 next_retry (mc_stream_t * s, mc_retry_t * r)
305 {
306   return (r->next_index != ~0
307           ? pool_elt_at_index (s->retry_pool, r->next_index)
308           : 0);
309 }
310
311 always_inline void
312 remove_retry_from_pool (mc_stream_t * s, mc_retry_t * r)
313 {
314   mc_retry_t * p = prev_retry (s, r);
315   mc_retry_t * n = next_retry (s, r);
316
317   if (p)
318     p->next_index = r->next_index;
319   else
320     s->retry_head_index = r->next_index;
321   if (n)
322     n->prev_index = r->prev_index;
323   else
324     s->retry_tail_index = r->prev_index;
325
326   pool_put_index (s->retry_pool, r - s->retry_pool);
327 }
328
329 static void check_retry (mc_main_t * mcm, mc_stream_t * s)
330 {
331   mc_retry_t * r;
332   vlib_main_t * vm = mcm->vlib_main;
333   f64 now = vlib_time_now(vm);
334   uword * dead_peer_bitmap = 0;
335   u32 ri, ri_next;
336
337   for (ri = s->retry_head_index; ri != ~0; ri = ri_next)
338     {
339       r = pool_elt_at_index (s->retry_pool, ri);
340       ri_next = r->next_index;
341
342       if (now < r->sent_at + s->config.retry_interval)
343         continue;
344
345       r->n_retries += 1;
346       if (r->n_retries > s->config.retry_limit)
347         {
348           dead_peer_bitmap =
349             delete_retry_fifo_elt (mcm, s, r, dead_peer_bitmap);
350           remove_retry_from_pool (s, r);
351         }
352       else
353         {
354           if (MC_EVENT_LOGGING > 0)
355             {
356               mc_stream_peer_t * p;
357               ELOG_TYPE_DECLARE (t) = {
358                 .format = "resend local seq %d attempt %d",
359                 .format_args = "i4i4",
360               };
361
362               pool_foreach (p, s->peers, ({
363                 if (clib_bitmap_get (r->unacked_by_peer_bitmap, p - s->peers))
364                   {
365                     ELOG_TYPE_DECLARE (ev) = {
366                       .format = "resend: needed by peer %s local seq %d",
367                       .format_args = "T4i4",
368                     };
369                     struct { u32 peer, rx_sequence; } * ed;
370                     ed = ELOG_DATA (mcm->elog_main, ev);
371                     ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
372                     ed->rx_sequence = r->local_sequence;
373                   }
374               }));
375
376               struct { u32 sequence; u32 trail; } * ed;
377               ed = ELOG_DATA (mcm->elog_main, t);
378               ed->sequence = r->local_sequence;
379               ed->trail = r->n_retries;
380             }
381
382           r->sent_at = vlib_time_now (vm);
383           s->stats.n_retries += 1;
384
385           elog_tx_msg (mcm, s->index, r->local_sequence, r->n_retries);
386
387           mcm->transport.tx_buffer
388           (mcm->transport.opaque,
389            MC_TRANSPORT_USER_REQUEST_TO_RELAY,
390            r->buffer_index);
391         }
392     }
393
394   maybe_send_window_open_event (mcm->vlib_main, s);
395
396   /* Delete any dead peers we've found. */
397   if (! clib_bitmap_is_zero (dead_peer_bitmap))
398     {
399       uword i;
400
401       clib_bitmap_foreach (i, dead_peer_bitmap, ({
402         delete_peer_with_index (mcm, s, i, /* notify_application */ 1);
403
404         /* Delete any references to just deleted peer in retry pool. */
405         pool_foreach (r, s->retry_pool, ({
406           r->unacked_by_peer_bitmap =
407             clib_bitmap_andnoti (r->unacked_by_peer_bitmap, i);
408         }));
409       }));
410       clib_bitmap_free (dead_peer_bitmap);
411     }
412 }
413
414 always_inline mc_main_t *
415 mc_node_get_main (vlib_node_runtime_t * node)
416 {
417   mc_main_t ** p = (void *) node->runtime_data;
418   return p[0];
419 }
420
421 static uword
422 mc_retry_process (vlib_main_t * vm,
423                   vlib_node_runtime_t * node,
424                   vlib_frame_t * f)
425 {
426   mc_main_t * mcm = mc_node_get_main (node);
427   mc_stream_t * s;
428     
429   while (1)
430     {
431       vlib_process_suspend (vm, 1.0);
432       vec_foreach (s, mcm->stream_vector)
433         {
434           if (s->state != MC_STREAM_STATE_invalid)
435             check_retry (mcm, s);
436         }
437     }
438   return 0; /* not likely */
439 }
440
441 static void send_join_or_leave_request (mc_main_t * mcm, u32 stream_index, u32 is_join)
442 {
443   vlib_main_t * vm = mcm->vlib_main;
444   mc_msg_join_or_leave_request_t * mp;
445   u32 bi;
446
447   mp = mc_get_vlib_buffer (vm, sizeof (mp[0]), &bi);
448   memset(mp, 0, sizeof (*mp));
449   mp->type = MC_MSG_TYPE_join_or_leave_request;
450   mp->peer_id = mcm->transport.our_ack_peer_id;
451   mp->stream_index = stream_index;
452   mp->is_join = is_join;
453
454   mc_byte_swap_msg_join_or_leave_request (mp);
455
456   /* 
457    * These msgs are unnumbered, unordered so send on the from-relay
458    * channel. 
459    */
460   mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi);
461 }    
462
463 static uword
464 mc_join_ager_process (vlib_main_t * vm,
465                       vlib_node_runtime_t * node,
466                       vlib_frame_t * f)
467 {
468   mc_main_t * mcm = mc_node_get_main (node);
469     
470   while (1)
471     {
472       if (mcm->joins_in_progress)
473         {
474           mc_stream_t * s;
475           vlib_one_time_waiting_process_t * p;
476           f64 now = vlib_time_now (vm);
477
478           vec_foreach (s, mcm->stream_vector)
479             {
480               if (s->state != MC_STREAM_STATE_join_in_progress)
481                 continue;
482
483               if (now > s->join_timeout)
484                 {
485                   s->state = MC_STREAM_STATE_ready;
486
487                   if (MC_EVENT_LOGGING > 0)
488                     {
489                       ELOG_TYPE_DECLARE (e) = {
490                         .format = "stream %d join timeout",
491                       };
492                       ELOG (mcm->elog_main, e, s->index);
493                     }
494                   /* Make sure that this app instance exists as a stream peer,
495                      or we may answer a catchup request with a NULL
496                      all_peer_bitmap... */
497                   (void) get_or_create_peer_with_id 
498                       (mcm, s, mcm->transport.our_ack_peer_id, /* created */ 0);
499
500                   vec_foreach (p, s->procs_waiting_for_join_done)
501                     vlib_signal_one_time_waiting_process (vm, p);
502                   if (s->procs_waiting_for_join_done)
503                     _vec_len (s->procs_waiting_for_join_done) = 0;
504
505                   mcm->joins_in_progress--;
506                   ASSERT (mcm->joins_in_progress >= 0);
507                 }
508               else
509                 {
510                   /* Resent join request which may have been lost. */
511                   send_join_or_leave_request (mcm, s->index, 
512                                               1 /* is_join */);
513  
514                   /* We're *not* alone, retry for as long as it takes */
515                   if (mcm->relay_state == MC_RELAY_STATE_SLAVE)
516                     s->join_timeout = vlib_time_now (vm) + 2.0;
517
518
519                   if (MC_EVENT_LOGGING > 0)
520                     {
521                       ELOG_TYPE_DECLARE (e) = {
522                         .format = "stream %d resend join request",
523                       };
524                       ELOG (mcm->elog_main, e, s->index);
525                     }
526                 }
527             }
528         }
529
530       vlib_process_suspend (vm, .5);
531     }
532
533   return 0; /* not likely */
534 }
535
536 static void serialize_mc_register_stream_name (serialize_main_t * m, va_list * va)
537 {
538   char * name = va_arg (*va, char *);
539   serialize_cstring (m, name);
540 }
541
542 static void elog_stream_name (char * buf, int n_buf_bytes, char * v)
543 {
544   memcpy (buf, v, clib_min (n_buf_bytes - 1, vec_len (v)));
545   buf[n_buf_bytes - 1] = 0;
546 }
547
548 static void unserialize_mc_register_stream_name (serialize_main_t * m, va_list * va)
549 {
550   mc_main_t * mcm = va_arg (*va, mc_main_t *);
551   char * name;
552   mc_stream_t * s;
553   uword * p;
554
555   unserialize_cstring (m, &name);
556
557   if ((p = hash_get_mem (mcm->stream_index_by_name, name)))
558     {
559       if (MC_EVENT_LOGGING > 0)
560         {
561           ELOG_TYPE_DECLARE (e) = {
562             .format = "stream index %d already named %s",
563             .format_args = "i4s16",
564           };
565           struct { u32 stream_index; char name[16]; } * ed;
566           ed = ELOG_DATA (mcm->elog_main, e);
567           ed->stream_index = p[0];
568           elog_stream_name (ed->name, sizeof (ed->name), name);
569         }
570
571       vec_free (name);
572       return;
573     }
574
575   vec_add2 (mcm->stream_vector, s, 1);
576   mc_stream_init (s);
577   s->state = MC_STREAM_STATE_name_known;
578   s->index = s - mcm->stream_vector;
579   s->config.name = name;
580
581   if (MC_EVENT_LOGGING > 0)
582     {
583       ELOG_TYPE_DECLARE (e) = {
584         .format = "stream index %d named %s",
585         .format_args = "i4s16",
586       };
587       struct { u32 stream_index; char name[16]; } * ed;
588       ed = ELOG_DATA (mcm->elog_main, e);
589       ed->stream_index = s->index;
590       elog_stream_name (ed->name, sizeof (ed->name), name);
591     }
592
593   hash_set_mem (mcm->stream_index_by_name, name, s->index);
594
595   p = hash_get (mcm->procs_waiting_for_stream_name_by_name, name);
596   if (p)
597     {
598       vlib_one_time_waiting_process_t * wp, ** w;
599       w = pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool, p[0]);
600       vec_foreach (wp, w[0])
601         vlib_signal_one_time_waiting_process (mcm->vlib_main, wp);
602       pool_put (mcm->procs_waiting_for_stream_name_pool, w);
603       hash_unset_mem (mcm->procs_waiting_for_stream_name_by_name, name);
604     }
605 }
606
607 MC_SERIALIZE_MSG (mc_register_stream_name_msg, static) = {
608   .name = "mc_register_stream_name",
609   .serialize = serialize_mc_register_stream_name,
610   .unserialize = unserialize_mc_register_stream_name,
611 };
612
613 void
614 mc_rx_buffer_unserialize (mc_main_t * mcm,
615                           mc_stream_t * stream,
616                           mc_peer_id_t peer_id,
617                           u32 buffer_index)
618 { return mc_unserialize (mcm, stream, buffer_index); }
619
620 static u8 *
621 mc_internal_catchup_snapshot (mc_main_t * mcm,
622                               u8 * data_vector,
623                               u32 last_global_sequence_processed)
624 {
625   serialize_main_t m;
626
627   /* Append serialized data to data vector. */
628   serialize_open_vector (&m, data_vector);
629   m.stream.current_buffer_index = vec_len (data_vector);
630
631   serialize (&m, serialize_mc_main, mcm);
632   return serialize_close_vector (&m);
633 }
634
635 static void
636 mc_internal_catchup (mc_main_t * mcm,
637                      u8 * data,
638                      u32 n_data_bytes)
639 {
640   serialize_main_t s;
641
642   unserialize_open_data (&s, data, n_data_bytes);
643
644   unserialize (&s, unserialize_mc_main, mcm);
645 }
646
647 /* Overridden from the application layer, not actually used here */
648 void mc_stream_join_process_hold (void) __attribute__ ((weak));
649 void mc_stream_join_process_hold (void) { }
650
651 static u32
652 mc_stream_join_helper (mc_main_t * mcm,
653                        mc_stream_config_t * config,
654                        u32 is_internal)
655 {
656   mc_stream_t * s;
657   vlib_main_t * vm = mcm->vlib_main;
658     
659   s = 0;
660   if (! is_internal)
661     {
662       uword * p;
663
664       /* Already have a stream with given name? */
665       if ((s = mc_stream_by_name (mcm, config->name)))
666         {
667           /* Already joined and ready? */
668           if (s->state == MC_STREAM_STATE_ready)
669             return s->index;
670         }
671
672       /* First join MC internal stream. */
673       if (! mcm->stream_vector
674           || (mcm->stream_vector[MC_STREAM_INDEX_INTERNAL].state
675               == MC_STREAM_STATE_invalid))
676         {
677           static mc_stream_config_t c = {
678             .name = "mc-internal",
679             .rx_buffer = mc_rx_buffer_unserialize,
680             .catchup = mc_internal_catchup,
681             .catchup_snapshot = mc_internal_catchup_snapshot,
682           };
683
684           c.save_snapshot = config->save_snapshot;
685
686           mc_stream_join_helper (mcm, &c, /* is_internal */ 1);
687         }
688
689       /* If stream is still unknown register this name and wait for
690          sequenced message to name stream.  This way all peers agree
691          on stream name to index mappings. */
692       s = mc_stream_by_name (mcm, config->name);
693       if (! s)
694         {
695           vlib_one_time_waiting_process_t * wp, ** w;
696           u8 * name_copy = format (0, "%s", config->name);
697
698           mc_serialize_stream (mcm,
699                                MC_STREAM_INDEX_INTERNAL,
700                                &mc_register_stream_name_msg,
701                                config->name);
702
703           /* Wait for this stream to be named. */
704           p = hash_get_mem (mcm->procs_waiting_for_stream_name_by_name, name_copy);
705           if (p)
706             w = pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool, p[0]);
707           else
708             {
709               pool_get (mcm->procs_waiting_for_stream_name_pool, w);
710               if (! mcm->procs_waiting_for_stream_name_by_name)
711                 mcm->procs_waiting_for_stream_name_by_name
712                   = hash_create_string (/* elts */ 0, /* value size */ sizeof (uword));
713               hash_set_mem (mcm->procs_waiting_for_stream_name_by_name,
714                             name_copy,
715                             w - mcm->procs_waiting_for_stream_name_pool);
716               w[0] = 0;
717             }
718
719           vec_add2 (w[0], wp, 1);
720           vlib_current_process_wait_for_one_time_event (vm, wp);
721           vec_free (name_copy);
722         }
723
724       /* Name should be known now. */
725       s = mc_stream_by_name (mcm, config->name);
726       ASSERT (s != 0);
727       ASSERT (s->state == MC_STREAM_STATE_name_known);
728     }
729
730   if (! s)
731     {
732       vec_add2 (mcm->stream_vector, s, 1);
733       mc_stream_init (s);
734       s->index = s - mcm->stream_vector;
735     }
736
737   {
738     /* Save name since we could have already used it as hash key. */
739     char * name_save = s->config.name;
740
741     s->config = config[0];
742
743     if (name_save)
744       s->config.name = name_save;
745   }
746
747   if (s->config.window_size == 0)
748     s->config.window_size = 8;
749
750   if (s->config.retry_interval == 0.0)
751       s->config.retry_interval = 1.0;
752
753   /* Sanity. */
754   ASSERT (s->config.retry_interval < 30);
755
756   if (s->config.retry_limit == 0)
757       s->config.retry_limit = 7;
758
759   s->state = MC_STREAM_STATE_join_in_progress;
760   if (! s->peer_index_by_id.hash)
761     mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
762
763   /* If we don't hear from someone in 5 seconds, we're alone */
764   s->join_timeout = vlib_time_now (vm) + 5.0;
765   mcm->joins_in_progress++;
766
767   if (MC_EVENT_LOGGING > 0)
768     {
769       ELOG_TYPE_DECLARE (e) = {
770         .format = "stream index %d join request %s",
771         .format_args = "i4s16",
772       };
773       struct { u32 stream_index; char name[16]; } * ed;
774       ed = ELOG_DATA (mcm->elog_main, e);
775       ed->stream_index = s->index;
776       elog_stream_name (ed->name, sizeof (ed->name), s->config.name);
777     }
778
779   send_join_or_leave_request (mcm, s->index, 1 /* join */);
780     
781   vlib_current_process_wait_for_one_time_event_vector
782     (vm, &s->procs_waiting_for_join_done);
783     
784   if (MC_EVENT_LOGGING)
785     {
786       ELOG_TYPE (e, "join complete stream %d");
787       ELOG (mcm->elog_main, e, s->index);
788     }
789
790   return s->index;
791 }
792
793 u32 mc_stream_join (mc_main_t * mcm, mc_stream_config_t * config)
794 { return mc_stream_join_helper (mcm, config, /* is_internal */ 0); }
795
796 void mc_stream_leave (mc_main_t * mcm, u32 stream_index)
797 {
798   mc_stream_t * s = mc_stream_by_index (mcm, stream_index);
799   
800   if (! s)
801     return;
802
803   if (MC_EVENT_LOGGING)
804     {
805       ELOG_TYPE_DECLARE (t) = {
806         .format = "leave-stream: %d",
807         .format_args = "i4",
808       };
809       struct { u32 index; } * ed;
810       ed = ELOG_DATA (mcm->elog_main, t);
811       ed->index = stream_index;
812     }
813
814   send_join_or_leave_request (mcm, stream_index, 0 /* is_join */);
815   mc_stream_free (s);
816   s->state = MC_STREAM_STATE_name_known;
817 }
818
819 void mc_msg_join_or_leave_request_handler (mc_main_t * mcm, 
820                                            mc_msg_join_or_leave_request_t * req, 
821                                            u32 buffer_index)
822 {
823   mc_stream_t * s;
824   mc_msg_join_reply_t * rep;
825   u32 bi;
826
827   mc_byte_swap_msg_join_or_leave_request (req);
828
829   s = mc_stream_by_index (mcm, req->stream_index);
830   if (! s || s->state != MC_STREAM_STATE_ready)
831     return;
832
833   /* If the peer is joining, create it */
834   if (req->is_join) 
835     {
836       mc_stream_t * this_s;
837
838       /* We're not in a position to catch up a peer until all
839          stream joins are complete. */
840       if (0)
841         {
842           /* XXX This is hard to test so we've. */
843           vec_foreach (this_s, mcm->stream_vector)
844             {
845               if (this_s->state != MC_STREAM_STATE_ready
846                   && this_s->state != MC_STREAM_STATE_name_known)
847                 return;
848             }
849         }
850       else
851         if (mcm->joins_in_progress > 0)
852           return;
853       
854       (void) get_or_create_peer_with_id (mcm,
855                                          s,
856                                          req->peer_id,
857                                          /* created */ 0);
858
859       rep = mc_get_vlib_buffer (mcm->vlib_main, sizeof (rep[0]), &bi);
860       memset (rep, 0, sizeof (rep[0]));
861       rep->type = MC_MSG_TYPE_join_reply;
862       rep->stream_index = req->stream_index;
863       
864       mc_byte_swap_msg_join_reply (rep);
865       /* These two are already in network byte order... */
866       rep->peer_id = mcm->transport.our_ack_peer_id;
867       rep->catchup_peer_id = mcm->transport.our_catchup_peer_id;
868
869       mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi);
870     }
871   else
872     {
873       if (s->config.peer_died)
874         s->config.peer_died (mcm, s, req->peer_id);
875     }
876 }
877
878 void mc_msg_join_reply_handler (mc_main_t * mcm,
879                                 mc_msg_join_reply_t * mp,
880                                 u32 buffer_index)
881 {
882   mc_stream_t * s;
883
884   mc_byte_swap_msg_join_reply (mp);
885
886   s = mc_stream_by_index (mcm, mp->stream_index);
887
888   if (! s || s->state != MC_STREAM_STATE_join_in_progress)
889     return;
890
891   /* Switch to catchup state; next join reply 
892      for this stream will be ignored. */
893   s->state = MC_STREAM_STATE_catchup;
894
895   mcm->joins_in_progress--;
896   mcm->transport.catchup_request_fun (mcm->transport.opaque,
897                                       mp->stream_index,
898                                       mp->catchup_peer_id);
899 }
900
901 void mc_wait_for_stream_ready (mc_main_t * m, char * stream_name)
902 {
903   mc_stream_t * s;
904
905   while (1)
906     {
907       s = mc_stream_by_name (m, stream_name);
908       if (s)
909         break;
910       vlib_process_suspend (m->vlib_main, .1);
911     }
912
913   /* It's OK to send a message in catchup and ready states. */
914   if (s->state == MC_STREAM_STATE_catchup
915       || s->state == MC_STREAM_STATE_ready)
916     return;
917
918   /* Otherwise we are waiting for a join to finish. */
919   vlib_current_process_wait_for_one_time_event_vector
920     (m->vlib_main, &s->procs_waiting_for_join_done);
921 }
922
923 u32 mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index)
924 {
925   mc_stream_t * s = mc_stream_by_index (mcm, stream_index);
926   vlib_main_t * vm = mcm->vlib_main;
927   mc_retry_t * r;
928   mc_msg_user_request_t * mp;    
929   vlib_buffer_t * b = vlib_get_buffer (vm, buffer_index);
930   u32 ri;
931
932   if (! s)
933     return 0;
934
935   if (s->state != MC_STREAM_STATE_ready)
936     vlib_current_process_wait_for_one_time_event_vector
937       (vm, &s->procs_waiting_for_join_done);
938
939   while (pool_elts (s->retry_pool) >= s->config.window_size)
940     {
941       vlib_current_process_wait_for_one_time_event_vector
942         (vm, &s->procs_waiting_for_open_window);
943     }
944
945   pool_get (s->retry_pool, r);
946   ri = r - s->retry_pool;
947
948   r->prev_index = s->retry_tail_index;
949   r->next_index = ~0;
950   s->retry_tail_index = ri;
951
952   if (r->prev_index == ~0)
953     s->retry_head_index = ri;
954   else
955     {
956       mc_retry_t * p = pool_elt_at_index (s->retry_pool, r->prev_index);
957       p->next_index = ri;
958     }
959
960   vlib_buffer_advance (b, -sizeof (mp[0]));
961   mp = vlib_buffer_get_current (b);
962
963   mp->peer_id = mcm->transport.our_ack_peer_id;
964   /* mp->transport.global_sequence set by relay agent. */
965   mp->global_sequence = 0xdeadbeef;
966   mp->stream_index = s->index;
967   mp->local_sequence = s->our_local_sequence++;
968   mp->n_data_bytes = vlib_buffer_index_length_in_chain (vm, buffer_index) - sizeof (mp[0]);
969
970   r->buffer_index = buffer_index;
971   r->local_sequence = mp->local_sequence;
972   r->sent_at = vlib_time_now(vm);
973   r->n_retries = 0;
974
975   /* Retry will be freed when all currently known peers have acked. */
976   vec_validate (r->unacked_by_peer_bitmap, vec_len (s->all_peer_bitmap) - 1);
977   vec_copy (r->unacked_by_peer_bitmap, s->all_peer_bitmap);
978
979   hash_set (s->retry_index_by_local_sequence, r->local_sequence, r - s->retry_pool);
980
981   elog_tx_msg (mcm, s->index, mp->local_sequence, r->n_retries);
982
983   mc_byte_swap_msg_user_request (mp);
984
985   mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_USER_REQUEST_TO_RELAY, buffer_index);
986
987   s->user_requests_sent++;
988
989   /* return amount of window remaining */
990   return s->config.window_size - pool_elts (s->retry_pool);
991 }
992
993 void mc_msg_user_request_handler (mc_main_t * mcm, mc_msg_user_request_t * mp, u32 buffer_index)
994 {
995   vlib_main_t * vm = mcm->vlib_main;
996   mc_stream_t * s;
997   mc_stream_peer_t * peer;
998   i32 seq_cmp_result;
999   static int once=0;
1000
1001   mc_byte_swap_msg_user_request (mp);
1002
1003   s = mc_stream_by_index (mcm, mp->stream_index);
1004
1005   /* Not signed up for this stream? Turf-o-matic */
1006   if (! s || s->state != MC_STREAM_STATE_ready)
1007     {
1008       vlib_buffer_free_one (vm, buffer_index);
1009       return;
1010     }
1011
1012   /* Find peer, including ourselves. */
1013   peer = get_or_create_peer_with_id (mcm,
1014                                      s, mp->peer_id,
1015                                      /* created */ 0);
1016
1017   seq_cmp_result = mc_seq_cmp (mp->local_sequence, 
1018                                peer->last_sequence_received + 1);
1019
1020   if (MC_EVENT_LOGGING > 0)
1021     {
1022       ELOG_TYPE_DECLARE (e) = {
1023         .format = "rx-msg: peer %s stream %d rx seq %d seq_cmp %d",
1024         .format_args = "T4i4i4i4",
1025       };
1026       struct { u32 peer, stream_index, rx_sequence; i32 seq_cmp_result; } * ed;
1027       ed = ELOG_DATA (mcm->elog_main, e);
1028       ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
1029       ed->stream_index = mp->stream_index;
1030       ed->rx_sequence = mp->local_sequence;
1031       ed->seq_cmp_result = seq_cmp_result;
1032     }
1033
1034   if (0 && mp->stream_index == 1 && once == 0) 
1035     {
1036       once = 1;
1037       ELOG_TYPE (e, "FAKE lost msg on stream 1");
1038       ELOG (mcm->elog_main,e,0);
1039       return;
1040     }
1041
1042   peer->last_sequence_received += seq_cmp_result == 0;
1043   s->user_requests_received++;
1044
1045   if (seq_cmp_result > 0)
1046       peer->stats.n_msgs_from_future += 1;
1047
1048   /* Send ack even if msg from future */
1049   if (1)
1050     {
1051       mc_msg_user_ack_t * rp;
1052       u32 bi;
1053
1054       rp = mc_get_vlib_buffer (vm, sizeof (rp[0]), &bi);
1055       rp->peer_id = mcm->transport.our_ack_peer_id;
1056       rp->stream_index = s->index;
1057       rp->local_sequence = mp->local_sequence;
1058       rp->seq_cmp_result = seq_cmp_result;
1059
1060       if (MC_EVENT_LOGGING > 0)
1061         {
1062           ELOG_TYPE_DECLARE (e) = {
1063             .format = "tx-ack: stream %d local seq %d",
1064             .format_args = "i4i4",
1065           };
1066           struct { u32 stream_index; u32 local_sequence; } * ed;
1067           ed = ELOG_DATA (mcm->elog_main, e);
1068           ed->stream_index = rp->stream_index;
1069           ed->local_sequence = rp->local_sequence;
1070         }
1071
1072       mc_byte_swap_msg_user_ack (rp);
1073
1074       mcm->transport.tx_ack (mcm->transport.opaque, mp->peer_id, bi);
1075       /* Msg from past? If so, free the buffer... */
1076       if (seq_cmp_result < 0) 
1077         {
1078           vlib_buffer_free_one (vm, buffer_index);
1079           peer->stats.n_msgs_from_past += 1;
1080         }
1081     }
1082     
1083   if (seq_cmp_result == 0)
1084     {
1085       vlib_buffer_t * b = vlib_get_buffer (vm, buffer_index);
1086       switch (s->state)
1087         {
1088         case MC_STREAM_STATE_ready:
1089           vlib_buffer_advance (b, sizeof (mp[0]));
1090           s->config.rx_buffer(mcm, s, mp->peer_id, buffer_index);
1091
1092           /* Stream vector can change address via rx callback for mc-internal
1093              stream. */
1094           s = mc_stream_by_index (mcm, mp->stream_index);
1095           ASSERT (s != 0);
1096           s->last_global_sequence_processed = mp->global_sequence;
1097           break;
1098
1099         case MC_STREAM_STATE_catchup:
1100           clib_fifo_add1 (s->catchup_fifo, buffer_index);
1101           break;
1102
1103         default:
1104           clib_warning ("stream in unknown state %U",
1105                         format_mc_stream_state, s->state);
1106           break;
1107         }
1108     }
1109 }
1110
1111 void mc_msg_user_ack_handler (mc_main_t * mcm, mc_msg_user_ack_t * mp, u32 buffer_index)
1112 {
1113   vlib_main_t * vm = mcm->vlib_main;
1114   uword *p;
1115   mc_stream_t * s;
1116   mc_stream_peer_t * peer;
1117   mc_retry_t * r;
1118   int peer_created = 0;
1119
1120   mc_byte_swap_msg_user_ack (mp);
1121
1122   s = mc_stream_by_index (mcm, mp->stream_index);
1123
1124   if (MC_EVENT_LOGGING > 0)
1125     {
1126       ELOG_TYPE_DECLARE (t) = {
1127         .format = "rx-ack: local seq %d peer %s seq_cmp_result %d",
1128         .format_args = "i4T4i4",
1129       };
1130       struct { u32 local_sequence; u32 peer; i32 seq_cmp_result;} * ed;
1131       ed = ELOG_DATA (mcm->elog_main, t);
1132       ed->local_sequence = mp->local_sequence;
1133       ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
1134       ed->seq_cmp_result = mp->seq_cmp_result;
1135     }
1136
1137   /* Unknown stream? */
1138   if (! s)
1139     return;
1140
1141   /* Find the peer which just ack'ed. */
1142   peer = get_or_create_peer_with_id (mcm, s, mp->peer_id,
1143                                      /* created */ &peer_created);
1144
1145   /* 
1146    * Peer reports message from the future. If it's not in the retry
1147    * fifo, look for a retired message. 
1148    */
1149   if (mp->seq_cmp_result > 0)
1150     {
1151       p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence - 
1152                     mp->seq_cmp_result);
1153       if (p == 0)
1154           mc_resend_retired (mcm, s, mp->local_sequence - mp->seq_cmp_result);
1155
1156       /* Normal retry should fix it... */
1157       return;
1158     }
1159
1160   /* 
1161    * Pointer to the indicated retry fifo entry.
1162    * Worth hashing because we could use a window size of 100 or 1000.
1163    */
1164   p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence);
1165
1166   /* 
1167    * Is this a duplicate ACK, received after we've retired the
1168    * fifo entry. This can happen when learning about new
1169    * peers.
1170    */
1171   if (p == 0)
1172     {
1173       if (MC_EVENT_LOGGING > 0)
1174         {
1175           ELOG_TYPE_DECLARE (t) = 
1176             {
1177               .format = "ack: for seq %d from peer %s no fifo elt",
1178               .format_args = "i4T4",
1179             };
1180           struct { u32 seq; u32 peer; } * ed;
1181           ed = ELOG_DATA (mcm->elog_main, t);
1182           ed->seq = mp->local_sequence;
1183           ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
1184         }
1185
1186       return;
1187     }
1188   
1189   r = pool_elt_at_index (s->retry_pool, p[0]);
1190
1191   /* Make sure that this new peer ACKs our msgs from now on */
1192   if (peer_created)
1193     {
1194       mc_retry_t *later_retry = next_retry (s, r);
1195
1196       while (later_retry)
1197         {
1198           later_retry->unacked_by_peer_bitmap =
1199             clib_bitmap_ori (later_retry->unacked_by_peer_bitmap,
1200                              peer - s->peers);
1201           later_retry = next_retry (s, later_retry);
1202         }
1203     }
1204
1205   ASSERT (mp->local_sequence == r->local_sequence);
1206   
1207   /* If we weren't expecting to hear from this peer */
1208   if (!peer_created && 
1209       ! clib_bitmap_get (r->unacked_by_peer_bitmap, peer - s->peers))
1210     {
1211       if (MC_EVENT_LOGGING > 0)
1212         {
1213           ELOG_TYPE_DECLARE (t) = 
1214             {
1215               .format = "dup-ack: for seq %d from peer %s",
1216               .format_args = "i4T4",
1217             };
1218           struct { u32 seq; u32 peer; } * ed;
1219           ed = ELOG_DATA (mcm->elog_main, t);
1220           ed->seq = r->local_sequence;
1221           ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
1222         }
1223       if (! clib_bitmap_is_zero (r->unacked_by_peer_bitmap))
1224         return;
1225     }
1226
1227   if (MC_EVENT_LOGGING > 0)
1228     {
1229       ELOG_TYPE_DECLARE (t) = 
1230         {
1231           .format = "ack: for seq %d from peer %s",
1232           .format_args = "i4T4",
1233         };
1234       struct { u32 seq; u32 peer; } * ed;
1235       ed = ELOG_DATA (mcm->elog_main, t);
1236       ed->seq = mp->local_sequence;
1237       ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
1238     }
1239
1240   r->unacked_by_peer_bitmap = 
1241     clib_bitmap_andnoti (r->unacked_by_peer_bitmap, peer - s->peers);
1242   
1243   /* Not all clients have ack'ed */
1244   if (! clib_bitmap_is_zero (r->unacked_by_peer_bitmap))
1245     {
1246       return;
1247     }
1248   if (MC_EVENT_LOGGING > 0)
1249     {
1250       ELOG_TYPE_DECLARE (t) = 
1251         {
1252           .format = "ack: retire fifo elt loc seq %d after %d acks",
1253           .format_args = "i4i4",
1254         };
1255       struct { u32 seq; u32 npeers; } * ed;
1256       ed = ELOG_DATA (mcm->elog_main, t);
1257       ed->seq = r->local_sequence;
1258       ed->npeers = pool_elts (s->peers);
1259     }
1260   
1261   hash_unset (s->retry_index_by_local_sequence, mp->local_sequence);
1262   mc_retry_free (mcm, s, r);
1263   remove_retry_from_pool (s, r);
1264   maybe_send_window_open_event (vm, s);
1265 }
1266
1267 #define EVENT_MC_SEND_CATCHUP_DATA 0
1268
1269 static uword
1270 mc_catchup_process (vlib_main_t * vm,
1271                     vlib_node_runtime_t * node,
1272                     vlib_frame_t * f)
1273 {
1274   mc_main_t * mcm = mc_node_get_main (node);
1275   uword *event_data = 0;
1276   mc_catchup_process_arg_t * args;
1277   int i;
1278     
1279   while (1)
1280     {
1281       if (event_data)
1282         _vec_len(event_data) = 0;
1283       vlib_process_wait_for_event_with_type (vm, &event_data, EVENT_MC_SEND_CATCHUP_DATA);
1284
1285       for (i = 0; i < vec_len(event_data); i++)
1286         {
1287           args = pool_elt_at_index (mcm->catchup_process_args,
1288                                     event_data[i]);
1289             
1290           mcm->transport.catchup_send_fun (mcm->transport.opaque,
1291                                            args->catchup_opaque,
1292                                            args->catchup_snapshot);
1293
1294           /* Send function will free snapshot data vector. */
1295           pool_put (mcm->catchup_process_args, args);
1296         }
1297     }
1298
1299   return 0; /* not likely */
1300 }
1301
1302 static void serialize_mc_stream (serialize_main_t * m, va_list * va)
1303 {
1304   mc_stream_t * s = va_arg (*va, mc_stream_t *);
1305   mc_stream_peer_t * p;
1306
1307   serialize_integer (m, pool_elts (s->peers), sizeof (u32));
1308   pool_foreach (p, s->peers, ({
1309     u8 * x = serialize_get (m, sizeof (p->id));
1310     memcpy (x, p->id.as_u8, sizeof (p->id));
1311     serialize_integer (m, p->last_sequence_received, 
1312                        sizeof (p->last_sequence_received));
1313   }));
1314   serialize_bitmap (m, s->all_peer_bitmap);
1315 }
1316
1317 void unserialize_mc_stream (serialize_main_t * m, va_list * va)
1318 {
1319   mc_stream_t * s = va_arg (*va, mc_stream_t *);
1320   u32 i, n_peers;
1321   mc_stream_peer_t * p;
1322
1323   unserialize_integer (m, &n_peers, sizeof (u32));
1324   mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
1325   for (i = 0; i < n_peers; i++)
1326     {
1327       u8 * x;
1328       pool_get (s->peers, p);
1329       x = unserialize_get (m, sizeof (p->id));
1330       memcpy (p->id.as_u8, x, sizeof (p->id));
1331       unserialize_integer (m, &p->last_sequence_received, sizeof (p->last_sequence_received));
1332       mhash_set (&s->peer_index_by_id, &p->id, p - s->peers, /* old_value */ 0);
1333     }
1334   s->all_peer_bitmap = unserialize_bitmap (m);
1335
1336   /* This is really bad. */
1337   if (!s->all_peer_bitmap)
1338     clib_warning ("BUG: stream %s all_peer_bitmap NULL", s->config.name);
1339 }
1340
1341 void mc_msg_catchup_request_handler (mc_main_t * mcm, mc_msg_catchup_request_t * req, u32 catchup_opaque)
1342 {
1343   vlib_main_t * vm = mcm->vlib_main;
1344   mc_stream_t * s;
1345   mc_catchup_process_arg_t * args;
1346
1347   mc_byte_swap_msg_catchup_request (req);
1348
1349   s = mc_stream_by_index (mcm, req->stream_index);
1350   if (! s || s->state != MC_STREAM_STATE_ready)
1351     return;
1352     
1353   if (MC_EVENT_LOGGING > 0)
1354     {
1355       ELOG_TYPE_DECLARE (t) = 
1356         {
1357           .format = "catchup-request: from %s stream %d",
1358           .format_args = "T4i4",
1359         };
1360       struct { u32 peer, stream; } * ed;
1361       ed = ELOG_DATA (mcm->elog_main, t);
1362       ed->peer = elog_id_for_peer_id (mcm, req->peer_id.as_u64);
1363       ed->stream = req->stream_index;
1364     }
1365
1366   /* 
1367    * The application has to snapshoot its data structures right 
1368    * here, right now. If we process any messages after 
1369    * noting the last global sequence we've processed, the client 
1370    * won't be able to accurately reconstruct our data structures.
1371    *
1372    * Once the data structures are e.g. vec_dup()'ed, we 
1373    * send the resulting messages from a separate process, to
1374    * make sure that we don't cause a bunch of message retransmissions
1375    */
1376   pool_get (mcm->catchup_process_args, args);
1377
1378   args->stream_index = s - mcm->stream_vector;
1379   args->catchup_opaque = catchup_opaque;
1380   args->catchup_snapshot = 0;
1381
1382   /* Construct catchup reply and snapshot state for stream to send as
1383      catchup reply payload. */
1384   {
1385     mc_msg_catchup_reply_t * rep;
1386     serialize_main_t m;
1387
1388     vec_resize (args->catchup_snapshot, sizeof (rep[0]));
1389
1390     rep = (void *) args->catchup_snapshot;
1391
1392     rep->peer_id = req->peer_id;
1393     rep->stream_index = req->stream_index;
1394     rep->last_global_sequence_included = s->last_global_sequence_processed;
1395
1396     /* Setup for serialize to append to catchup snapshot. */
1397     serialize_open_vector (&m, args->catchup_snapshot);
1398     m.stream.current_buffer_index = vec_len (m.stream.buffer);
1399
1400     serialize (&m, serialize_mc_stream, s);
1401
1402     args->catchup_snapshot = serialize_close_vector (&m);
1403
1404     /* Actually copy internal state */
1405     args->catchup_snapshot = s->config.catchup_snapshot
1406       (mcm,
1407        args->catchup_snapshot,
1408        rep->last_global_sequence_included);
1409
1410     rep = (void *) args->catchup_snapshot;
1411     rep->n_data_bytes = vec_len (args->catchup_snapshot) - sizeof (rep[0]);
1412
1413     mc_byte_swap_msg_catchup_reply (rep);
1414   }
1415
1416   /* now go send it... */
1417   vlib_process_signal_event (vm, mcm->catchup_process,
1418                              EVENT_MC_SEND_CATCHUP_DATA, 
1419                              args - mcm->catchup_process_args);
1420 }
1421
1422 #define EVENT_MC_UNSERIALIZE_BUFFER 0
1423 #define EVENT_MC_UNSERIALIZE_CATCHUP 1
1424
1425 void mc_msg_catchup_reply_handler (mc_main_t * mcm, mc_msg_catchup_reply_t * mp, u32 catchup_opaque)
1426 {
1427   vlib_process_signal_event (mcm->vlib_main,
1428                              mcm->unserialize_process,
1429                              EVENT_MC_UNSERIALIZE_CATCHUP,
1430                              pointer_to_uword (mp));
1431 }
1432
1433 static void perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp)
1434 {
1435   mc_stream_t * s;
1436   i32 seq_cmp_result;
1437     
1438   mc_byte_swap_msg_catchup_reply (mp);
1439
1440   s = mc_stream_by_index (mcm, mp->stream_index);
1441
1442   /* Never heard of this stream or already caught up. */
1443   if (! s || s->state == MC_STREAM_STATE_ready)
1444     return;
1445     
1446   {
1447     serialize_main_t m;
1448     mc_stream_peer_t * p;
1449     u32 n_stream_bytes;
1450
1451     /* For offline sim replay: save the entire catchup snapshot... */
1452     if (s->config.save_snapshot)
1453       s->config.save_snapshot (mcm, /* is_catchup */ 1, mp->data, mp->n_data_bytes);
1454
1455     unserialize_open_data (&m, mp->data, mp->n_data_bytes);
1456     unserialize (&m, unserialize_mc_stream, s);
1457
1458     /* Make sure we start numbering our messages as expected */
1459     pool_foreach (p, s->peers, ({
1460       if (p->id.as_u64 == mcm->transport.our_ack_peer_id.as_u64)
1461         s->our_local_sequence = p->last_sequence_received + 1;
1462     }));
1463
1464     n_stream_bytes = m.stream.current_buffer_index;
1465
1466     /* No need to unserialize close; nothing to free. */
1467
1468     /* After serialized stream is user's catchup data. */
1469     s->config.catchup (mcm, mp->data + n_stream_bytes,
1470                        mp->n_data_bytes - n_stream_bytes);
1471   }
1472
1473   /* Vector could have been moved by catchup.
1474      This can only happen for mc-internal stream. */
1475   s = mc_stream_by_index (mcm, mp->stream_index);
1476
1477   s->last_global_sequence_processed = mp->last_global_sequence_included;
1478
1479   while (clib_fifo_elts (s->catchup_fifo))
1480     {
1481       mc_msg_user_request_t * gp;
1482       u32 bi;
1483       vlib_buffer_t * b;
1484
1485       clib_fifo_sub1(s->catchup_fifo, bi);
1486         
1487       b = vlib_get_buffer (mcm->vlib_main, bi);
1488       gp = vlib_buffer_get_current (b);
1489
1490       /* Make sure we're replaying "new" news */
1491       seq_cmp_result = mc_seq_cmp (gp->global_sequence,
1492                                    mp->last_global_sequence_included);
1493
1494       if (seq_cmp_result > 0)
1495         {
1496           vlib_buffer_advance (b, sizeof (gp[0]));
1497           s->config.rx_buffer (mcm, s, gp->peer_id, bi);
1498           s->last_global_sequence_processed = gp->global_sequence;
1499
1500           if (MC_EVENT_LOGGING)
1501             {
1502               ELOG_TYPE_DECLARE (t) = {
1503                 .format = "catchup replay local sequence 0x%x",
1504                 .format_args = "i4",
1505               };
1506               struct { u32 local_sequence; } * ed;
1507               ed = ELOG_DATA (mcm->elog_main, t);
1508               ed->local_sequence = gp->local_sequence;
1509             }
1510         }
1511       else
1512         {
1513           if (MC_EVENT_LOGGING)
1514             {
1515               ELOG_TYPE_DECLARE (t) = {
1516                 .format = "catchup discard local sequence 0x%x",
1517                 .format_args = "i4",
1518               };
1519               struct { u32 local_sequence; } * ed;
1520               ed = ELOG_DATA (mcm->elog_main, t);
1521               ed->local_sequence = gp->local_sequence;
1522             }
1523
1524           vlib_buffer_free_one (mcm->vlib_main, bi);
1525         }
1526     }
1527
1528   s->state = MC_STREAM_STATE_ready;
1529
1530   /* Now that we are caught up wake up joining process. */
1531   {
1532     vlib_one_time_waiting_process_t * wp;
1533     vec_foreach (wp, s->procs_waiting_for_join_done)
1534       vlib_signal_one_time_waiting_process (mcm->vlib_main, wp);
1535     if (s->procs_waiting_for_join_done)
1536       _vec_len (s->procs_waiting_for_join_done) = 0;
1537   }
1538 }
1539
1540 static void this_node_maybe_master (mc_main_t * mcm)
1541 {
1542   vlib_main_t * vm = mcm->vlib_main;
1543   mc_msg_master_assert_t * mp;
1544   uword event_type;
1545   int timeouts = 0;
1546   int is_master = mcm->relay_state == MC_RELAY_STATE_MASTER;
1547   clib_error_t * error;
1548   f64 now, time_last_master_assert = -1;
1549   u32 bi;
1550
1551   while (1)
1552     {
1553       if (! mcm->we_can_be_relay_master)
1554         {
1555           mcm->relay_state = MC_RELAY_STATE_SLAVE;
1556           if (MC_EVENT_LOGGING)
1557             {
1558               ELOG_TYPE (e, "become slave (config)");
1559               ELOG (mcm->elog_main, e, 0);
1560             }
1561           return;
1562         }
1563
1564       now = vlib_time_now (vm);
1565       if (now >= time_last_master_assert + 1)
1566         {
1567           time_last_master_assert = now;
1568           mp = mc_get_vlib_buffer (mcm->vlib_main, sizeof (mp[0]), &bi);
1569
1570           mp->peer_id = mcm->transport.our_ack_peer_id;
1571           mp->global_sequence = mcm->relay_global_sequence;
1572
1573           /* 
1574            * these messages clog the event log, set MC_EVENT_LOGGING higher
1575            * if you want them
1576            */
1577           if (MC_EVENT_LOGGING > 1)
1578             {
1579               ELOG_TYPE_DECLARE (e) = {
1580                 .format = "tx-massert: peer %s global seq %u",
1581                 .format_args = "T4i4",
1582               };
1583               struct { u32 peer, global_sequence; } * ed;
1584               ed = ELOG_DATA (mcm->elog_main, e);
1585               ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
1586               ed->global_sequence = mp->global_sequence;
1587             }
1588
1589           mc_byte_swap_msg_master_assert (mp);
1590
1591           error = mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_MASTERSHIP, bi);
1592           if (error)
1593             clib_error_report (error);
1594         }
1595
1596       vlib_process_wait_for_event_or_clock (vm, 1.0);
1597       event_type = vlib_process_get_events (vm, /* no event data */ 0);
1598
1599       switch (event_type)
1600         {
1601         case ~0:
1602           if (! is_master && timeouts++ > 2)
1603             {
1604               mcm->relay_state = MC_RELAY_STATE_MASTER;
1605               mcm->relay_master_peer_id = mcm->transport.our_ack_peer_id.as_u64;
1606               if (MC_EVENT_LOGGING)
1607                 {
1608                   ELOG_TYPE (e, "become master (was maybe_master)");
1609                   ELOG (mcm->elog_main, e, 0);
1610                 }
1611               return;
1612             }
1613           break;
1614             
1615         case MC_RELAY_STATE_SLAVE:
1616           mcm->relay_state = MC_RELAY_STATE_SLAVE;
1617           if (MC_EVENT_LOGGING && mcm->relay_state != MC_RELAY_STATE_SLAVE)
1618             {
1619               ELOG_TYPE (e, "become slave (was maybe_master)");
1620               ELOG (mcm->elog_main, e, 0);
1621             }
1622           return;
1623         }
1624     }
1625 }
1626
1627 static void this_node_slave (mc_main_t * mcm)
1628 {
1629   vlib_main_t * vm = mcm->vlib_main;
1630   uword event_type;
1631   int timeouts = 0;
1632
1633   if (MC_EVENT_LOGGING)
1634     {
1635       ELOG_TYPE (e, "become slave");
1636       ELOG (mcm->elog_main, e, 0);
1637     }
1638
1639   while (1)
1640     {
1641       vlib_process_wait_for_event_or_clock (vm, 1.0);
1642       event_type = vlib_process_get_events (vm, /* no event data */ 0);
1643
1644       switch (event_type)
1645         {
1646         case ~0:
1647           if (timeouts++ > 2)
1648             {
1649               mcm->relay_state = MC_RELAY_STATE_NEGOTIATE;
1650               mcm->relay_master_peer_id = ~0ULL;
1651               if (MC_EVENT_LOGGING)
1652                 {
1653                   ELOG_TYPE (e, "timeouts; negoitate mastership");
1654                   ELOG (mcm->elog_main, e, 0);
1655                 }
1656               return;
1657             }
1658           break;
1659
1660         case MC_RELAY_STATE_SLAVE:
1661           mcm->relay_state = MC_RELAY_STATE_SLAVE;
1662           timeouts = 0;
1663           break;
1664         }
1665     }
1666 }
1667
1668 static uword
1669 mc_mastership_process (vlib_main_t * vm,
1670                        vlib_node_runtime_t * node,
1671                        vlib_frame_t * f)
1672 {
1673   mc_main_t * mcm = mc_node_get_main (node);
1674     
1675   while (1)
1676     {
1677       switch (mcm->relay_state)
1678         {
1679         case MC_RELAY_STATE_NEGOTIATE:
1680         case MC_RELAY_STATE_MASTER:
1681           this_node_maybe_master(mcm);
1682           break;
1683
1684         case MC_RELAY_STATE_SLAVE:
1685           this_node_slave (mcm);
1686           break;
1687         }
1688     }
1689   return 0; /* not likely */
1690 }
1691
1692 void mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master)
1693 {
1694   if (we_can_be_master != mcm->we_can_be_relay_master)
1695     {
1696       mcm->we_can_be_relay_master = we_can_be_master;
1697       vlib_process_signal_event (mcm->vlib_main, 
1698                                  mcm->mastership_process,
1699                                  MC_RELAY_STATE_NEGOTIATE, 0);
1700     }
1701 }
1702
1703 void mc_msg_master_assert_handler (mc_main_t * mcm, mc_msg_master_assert_t * mp, u32 buffer_index)
1704 {
1705   mc_peer_id_t his_peer_id, our_peer_id;
1706   i32 seq_cmp_result;
1707   u8 signal_slave = 0;
1708   u8 update_global_sequence = 0;
1709
1710   mc_byte_swap_msg_master_assert (mp);
1711
1712   his_peer_id = mp->peer_id;
1713   our_peer_id = mcm->transport.our_ack_peer_id;
1714
1715   /* compare the incoming global sequence with ours */
1716   seq_cmp_result = mc_seq_cmp (mp->global_sequence,
1717                                mcm->relay_global_sequence);
1718
1719   /* If the sender has a lower peer id and the sender's sequence >=
1720      our global sequence, we become a slave.  Otherwise we are master. */
1721   if (mc_peer_id_compare (his_peer_id, our_peer_id) < 0 && seq_cmp_result >= 0)
1722     {
1723       vlib_process_signal_event (mcm->vlib_main, 
1724                                  mcm->mastership_process,
1725                                  MC_RELAY_STATE_SLAVE, 0);
1726       signal_slave = 1;
1727     }
1728
1729   /* Update our global sequence. */
1730   if (seq_cmp_result > 0)
1731     {
1732       mcm->relay_global_sequence = mp->global_sequence;
1733       update_global_sequence = 1;
1734     }
1735
1736   {
1737     uword * q = mhash_get (&mcm->mastership_peer_index_by_id, &his_peer_id);
1738     mc_mastership_peer_t * p;
1739
1740     if (q)
1741       p = vec_elt_at_index (mcm->mastership_peers, q[0]);
1742     else
1743       {
1744         vec_add2 (mcm->mastership_peers, p, 1);
1745         p->peer_id = his_peer_id;
1746         mhash_set (&mcm->mastership_peer_index_by_id, &p->peer_id, p - mcm->mastership_peers,
1747                    /* old_value */ 0);
1748       }
1749     p->time_last_master_assert_received = vlib_time_now (mcm->vlib_main);
1750   }
1751
1752   /* 
1753    * these messages clog the event log, set MC_EVENT_LOGGING higher
1754    * if you want them.
1755    */
1756   if (MC_EVENT_LOGGING > 1)
1757     {
1758       ELOG_TYPE_DECLARE (e) = {
1759         .format = "rx-massert: peer %s global seq %u upd %d slave %d",
1760         .format_args = "T4i4i1i1",
1761       };
1762       struct { 
1763         u32 peer;
1764         u32 global_sequence; 
1765         u8 update_sequence;
1766         u8 slave;
1767       } * ed;
1768       ed = ELOG_DATA (mcm->elog_main, e);
1769       ed->peer = elog_id_for_peer_id (mcm, his_peer_id.as_u64);
1770       ed->global_sequence = mp->global_sequence;
1771       ed->update_sequence = update_global_sequence;
1772       ed->slave = signal_slave;
1773     }
1774 }
1775
1776 static void
1777 mc_serialize_init (mc_main_t * mcm)
1778 {
1779   mc_serialize_msg_t * m;
1780   vlib_main_t * vm = vlib_get_main();
1781
1782   mcm->global_msg_index_by_name
1783       = hash_create_string (/* elts */ 0, sizeof (uword));
1784
1785   m = vm->mc_msg_registrations;
1786   
1787   while (m)
1788     {
1789       m->global_index = vec_len (mcm->global_msgs);
1790       hash_set_mem (mcm->global_msg_index_by_name,
1791                     m->name,
1792                     m->global_index);
1793       vec_add1 (mcm->global_msgs, m);
1794       m = m->next_registration;
1795     }
1796 }
1797
1798 clib_error_t *
1799 mc_serialize_va (mc_main_t * mc,
1800                  u32 stream_index,
1801                  u32 multiple_messages_per_vlib_buffer,
1802                  mc_serialize_msg_t * msg,
1803                  va_list * va)
1804 {
1805   mc_stream_t * s;
1806   clib_error_t * error;
1807   serialize_main_t * m = &mc->serialize_mains[VLIB_TX];
1808   vlib_serialize_buffer_main_t * sbm = &mc->serialize_buffer_mains[VLIB_TX];
1809   u32 bi, n_before, n_after, n_total, n_this_msg;
1810   u32 si, gi;
1811
1812   if (! sbm->vlib_main)
1813     {
1814       sbm->tx.max_n_data_bytes_per_chain = 4096;
1815       sbm->tx.free_list_index = VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX;
1816     }
1817
1818   if (sbm->first_buffer == 0)
1819     serialize_open_vlib_buffer (m, mc->vlib_main, sbm);
1820
1821   n_before = serialize_vlib_buffer_n_bytes (m);
1822
1823   s = mc_stream_by_index (mc, stream_index);
1824   gi = msg->global_index;
1825   ASSERT (msg == vec_elt (mc->global_msgs, gi));
1826
1827   si = ~0;
1828   if (gi < vec_len (s->stream_msg_index_by_global_index))
1829     si = s->stream_msg_index_by_global_index[gi];
1830
1831   serialize_likely_small_unsigned_integer (m, si);
1832
1833   /* For first time message is sent, use name to identify message. */
1834   if (si == ~0 || MSG_ID_DEBUG) 
1835     serialize_cstring (m, msg->name);
1836
1837   if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
1838     {
1839       ELOG_TYPE_DECLARE (e) = {
1840         .format = "serialize-msg: %s index %d",
1841         .format_args = "T4i4",
1842       };
1843       struct { u32 c[2]; } * ed;
1844       ed = ELOG_DATA (mc->elog_main, e);
1845       ed->c[0] = elog_id_for_msg_name (mc, msg->name);
1846       ed->c[1] = si;
1847     }
1848
1849   error = va_serialize (m, va);
1850
1851   n_after = serialize_vlib_buffer_n_bytes (m);
1852   n_this_msg = n_after - n_before;
1853   n_total = n_after + sizeof (mc_msg_user_request_t);
1854
1855   /* For max message size ignore first message where string name is sent. */
1856   if (si != ~0)
1857     msg->max_n_bytes_serialized = clib_max (msg->max_n_bytes_serialized, n_this_msg);
1858
1859   if (! multiple_messages_per_vlib_buffer
1860       || si == ~0
1861       || n_total + msg->max_n_bytes_serialized > mc->transport.max_packet_size)
1862     {
1863       bi = serialize_close_vlib_buffer (m);
1864       sbm->first_buffer = 0;
1865       if (! error)
1866         mc_stream_send (mc, stream_index, bi);
1867       else if (bi != ~0)
1868         vlib_buffer_free_one (mc->vlib_main, bi);
1869     }
1870
1871   return error;
1872 }
1873
1874 clib_error_t *
1875 mc_serialize_internal (mc_main_t * mc,
1876                        u32 stream_index,
1877                        u32 multiple_messages_per_vlib_buffer,
1878                        mc_serialize_msg_t * msg,
1879                        ...)
1880 {
1881   vlib_main_t * vm = mc->vlib_main;
1882   va_list va;
1883   clib_error_t * error;
1884
1885   if (stream_index == ~0)
1886     {
1887       if (vm->mc_main && vm->mc_stream_index == ~0)
1888         vlib_current_process_wait_for_one_time_event_vector
1889           (vm, &vm->procs_waiting_for_mc_stream_join);
1890       stream_index = vm->mc_stream_index;
1891     }
1892
1893   va_start (va, msg);
1894   error = mc_serialize_va (mc, stream_index,
1895                            multiple_messages_per_vlib_buffer,
1896                            msg, &va);
1897   va_end (va);
1898   return error;
1899 }
1900
1901 uword mc_unserialize_message (mc_main_t * mcm,
1902                               mc_stream_t * s,
1903                               serialize_main_t * m)
1904 {
1905   mc_serialize_stream_msg_t * sm;
1906   u32 gi, si;
1907
1908   si = unserialize_likely_small_unsigned_integer (m);
1909
1910   if (! (si == ~0 || MSG_ID_DEBUG))
1911     {
1912       sm = vec_elt_at_index (s->stream_msgs, si);
1913       gi = sm->global_index;
1914     }
1915   else
1916     {
1917       char * name;
1918
1919       unserialize_cstring (m, &name);
1920
1921       if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
1922         {
1923           ELOG_TYPE_DECLARE (e) = {
1924             .format = "unserialize-msg: %s rx index %d",
1925             .format_args = "T4i4",
1926           };
1927           struct { u32 c[2]; } * ed;
1928           ed = ELOG_DATA (mcm->elog_main, e);
1929           ed->c[0] = elog_id_for_msg_name (mcm, name);
1930           ed->c[1] = si;
1931         }
1932
1933       {
1934         uword * p = hash_get_mem (mcm->global_msg_index_by_name, name);
1935         gi = p ? p[0] : ~0;
1936       }
1937
1938       /* Unknown message? */
1939       if (gi == ~0)
1940         {
1941           vec_free (name);
1942           goto done;
1943         }
1944
1945       vec_validate_init_empty (s->stream_msg_index_by_global_index, gi, ~0);
1946       si = s->stream_msg_index_by_global_index[gi];
1947
1948       /* Stream local index unknown?  Create it. */
1949       if (si == ~0)
1950         {
1951           vec_add2 (s->stream_msgs, sm, 1);
1952
1953           si = sm - s->stream_msgs;
1954           sm->global_index = gi;
1955           s->stream_msg_index_by_global_index[gi] = si;
1956
1957           if (MC_EVENT_LOGGING > 0)
1958             {
1959               ELOG_TYPE_DECLARE (e) = {
1960                 .format = "msg-bind: stream %d %s to index %d",
1961                 .format_args = "i4T4i4",
1962               };
1963               struct { u32 c[3]; } * ed;
1964               ed = ELOG_DATA (mcm->elog_main, e);
1965               ed->c[0] = s->index;
1966               ed->c[1] = elog_id_for_msg_name (mcm, name);
1967               ed->c[2] = si;
1968             }
1969         }
1970       else
1971         {
1972           sm = vec_elt_at_index (s->stream_msgs, si);
1973           if (gi != sm->global_index && MC_EVENT_LOGGING > 0)
1974             {
1975               ELOG_TYPE_DECLARE (e) = {
1976                 .format = "msg-id-ERROR: %s index %d expected %d",
1977                 .format_args = "T4i4i4",
1978               };
1979               struct { u32 c[3]; } * ed;
1980               ed = ELOG_DATA (mcm->elog_main, e);
1981               ed->c[0] = elog_id_for_msg_name (mcm, name);
1982               ed->c[1] = si;
1983               ed->c[2] = ~0;
1984               if (sm->global_index < vec_len (s->stream_msg_index_by_global_index))
1985                 ed->c[2] = s->stream_msg_index_by_global_index[sm->global_index];
1986             }
1987         }
1988
1989       vec_free (name);
1990     }
1991
1992   if (gi != ~0)
1993     {
1994       mc_serialize_msg_t * msg;
1995       msg = vec_elt (mcm->global_msgs, gi);
1996       unserialize (m, msg->unserialize, mcm);
1997     }
1998
1999  done:
2000   return gi != ~0;
2001 }
2002
2003 void
2004 mc_unserialize_internal (mc_main_t * mcm, u32 stream_and_buffer_index)
2005 {
2006   vlib_main_t * vm = mcm->vlib_main;
2007   serialize_main_t * m = &mcm->serialize_mains[VLIB_RX];
2008   vlib_serialize_buffer_main_t * sbm = &mcm->serialize_buffer_mains[VLIB_RX];
2009   mc_stream_and_buffer_t * sb;
2010   mc_stream_t * stream;
2011   u32 buffer_index;
2012
2013   sb = pool_elt_at_index (mcm->mc_unserialize_stream_and_buffers, stream_and_buffer_index);
2014   buffer_index = sb->buffer_index;
2015   stream = vec_elt_at_index (mcm->stream_vector, sb->stream_index);
2016   pool_put (mcm->mc_unserialize_stream_and_buffers, sb);
2017
2018   if (stream->config.save_snapshot)
2019     {
2020       u32 n_bytes = vlib_buffer_index_length_in_chain (vm, buffer_index);
2021       static u8 * contents;
2022       vec_reset_length (contents);
2023       vec_validate (contents, n_bytes - 1);
2024       vlib_buffer_contents (vm, buffer_index, contents);
2025       stream->config.save_snapshot (mcm, /* is_catchup */ 0, contents, n_bytes);
2026     }
2027
2028   ASSERT (vlib_in_process_context (vm));
2029
2030   unserialize_open_vlib_buffer (m, vm, sbm);
2031
2032   clib_fifo_add1 (sbm->rx.buffer_fifo, buffer_index);
2033
2034   while (unserialize_vlib_buffer_n_bytes (m) > 0)
2035     mc_unserialize_message (mcm, stream, m);
2036
2037   /* Frees buffer. */
2038   unserialize_close_vlib_buffer (m);
2039 }
2040
2041 void
2042 mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index)
2043 {
2044   vlib_main_t * vm = mcm->vlib_main;
2045   mc_stream_and_buffer_t * sb;
2046   pool_get (mcm->mc_unserialize_stream_and_buffers, sb);
2047   sb->stream_index = s->index;
2048   sb->buffer_index = buffer_index;
2049   vlib_process_signal_event (vm, mcm->unserialize_process, 
2050                              EVENT_MC_UNSERIALIZE_BUFFER, sb - mcm->mc_unserialize_stream_and_buffers);
2051 }
2052
2053 static uword
2054 mc_unserialize_process (vlib_main_t * vm,
2055                         vlib_node_runtime_t * node,
2056                         vlib_frame_t * f)
2057 {
2058   mc_main_t * mcm = mc_node_get_main (node);
2059   uword event_type, * event_data = 0;
2060   int i;
2061     
2062   while (1)
2063     {
2064       if (event_data)
2065         _vec_len(event_data) = 0;
2066
2067       vlib_process_wait_for_event (vm);
2068       event_type = vlib_process_get_events (vm, &event_data);
2069       switch (event_type)
2070         {
2071         case EVENT_MC_UNSERIALIZE_BUFFER:
2072           for (i = 0; i < vec_len (event_data); i++)
2073             mc_unserialize_internal (mcm, event_data[i]);
2074           break;
2075
2076         case EVENT_MC_UNSERIALIZE_CATCHUP:
2077           for (i = 0; i < vec_len (event_data); i++)
2078             {
2079               u8 * mp = uword_to_pointer (event_data[i], u8 *);
2080               perform_catchup (mcm, (void *) mp);
2081               vec_free (mp);
2082             }
2083           break;
2084
2085         default:
2086           break;
2087         }
2088     }
2089
2090   return 0; /* not likely */
2091 }
2092
2093 void serialize_mc_main (serialize_main_t * m, va_list * va)
2094 {
2095   mc_main_t * mcm = va_arg (*va, mc_main_t *);
2096   mc_stream_t * s;
2097   mc_serialize_stream_msg_t * sm;
2098   mc_serialize_msg_t * msg;
2099
2100   serialize_integer (m, vec_len (mcm->stream_vector), sizeof (u32));
2101   vec_foreach (s, mcm->stream_vector)
2102     {
2103       /* Stream name. */
2104       serialize_cstring (m, s->config.name);
2105
2106       /* Serialize global names for all sent messages. */
2107       serialize_integer (m, vec_len (s->stream_msgs), sizeof (u32));
2108       vec_foreach (sm, s->stream_msgs)
2109         {
2110           msg = vec_elt (mcm->global_msgs, sm->global_index);
2111           serialize_cstring (m, msg->name);
2112         }
2113     }
2114 }
2115
2116 void unserialize_mc_main (serialize_main_t * m, va_list * va)
2117 {
2118   mc_main_t * mcm = va_arg (*va, mc_main_t *);
2119   u32 i, n_streams, n_stream_msgs;
2120   char * name;
2121   mc_stream_t * s;
2122   mc_serialize_stream_msg_t * sm;
2123
2124   unserialize_integer (m, &n_streams, sizeof (u32));
2125   for (i = 0; i < n_streams; i++)
2126     {
2127       unserialize_cstring (m, &name);
2128       if (i != MC_STREAM_INDEX_INTERNAL
2129           && ! mc_stream_by_name (mcm, name))
2130         {
2131           vec_validate (mcm->stream_vector, i);
2132           s = vec_elt_at_index (mcm->stream_vector, i);
2133           mc_stream_init (s);
2134           s->index = s - mcm->stream_vector;
2135           s->config.name = name;
2136           s->state = MC_STREAM_STATE_name_known;
2137           hash_set_mem (mcm->stream_index_by_name, s->config.name, s->index);
2138         }
2139       else
2140         vec_free (name);
2141
2142       s = vec_elt_at_index (mcm->stream_vector, i);
2143
2144       vec_free (s->stream_msgs);
2145       vec_free (s->stream_msg_index_by_global_index);
2146
2147       unserialize_integer (m, &n_stream_msgs, sizeof (u32));
2148       vec_resize (s->stream_msgs, n_stream_msgs);
2149       vec_foreach (sm, s->stream_msgs)
2150         {
2151           uword * p;
2152           u32 si, gi;
2153
2154           unserialize_cstring (m, &name);
2155           p = hash_get (mcm->global_msg_index_by_name, name);
2156           gi = p ? p[0] : ~0;
2157           si = sm - s->stream_msgs;
2158
2159           if (MC_EVENT_LOGGING > 0)
2160             {
2161               ELOG_TYPE_DECLARE (e) = {
2162                 .format = "catchup-bind: %s to %d global index %d stream %d",
2163                 .format_args = "T4i4i4i4",
2164               };
2165               struct { u32 c[4]; } * ed;
2166               ed = ELOG_DATA (mcm->elog_main, e);
2167               ed->c[0] = elog_id_for_msg_name (mcm, name);
2168               ed->c[1] = si;
2169               ed->c[2] = gi;
2170               ed->c[3] = s->index;
2171             }
2172
2173           vec_free (name);
2174
2175           sm->global_index = gi;
2176           if (gi != ~0)
2177             {
2178               vec_validate_init_empty (s->stream_msg_index_by_global_index,
2179                                        gi, ~0);
2180               s->stream_msg_index_by_global_index[gi] = si;
2181             }
2182         }
2183     }
2184 }
2185
2186 void mc_main_init (mc_main_t * mcm, char * tag)
2187 {
2188   vlib_main_t * vm = vlib_get_main();
2189
2190   mcm->vlib_main = vm;
2191   mcm->elog_main = &vm->elog_main;
2192
2193   mcm->relay_master_peer_id = ~0ULL;
2194   mcm->relay_state = MC_RELAY_STATE_NEGOTIATE;
2195
2196   mcm->stream_index_by_name
2197     = hash_create_string (/* elts */ 0, /* value size */ sizeof (uword));
2198
2199   {
2200     vlib_node_registration_t r;
2201
2202     memset (&r, 0, sizeof (r));
2203
2204     r.type = VLIB_NODE_TYPE_PROCESS;
2205
2206     /* Point runtime data to main instance. */
2207     r.runtime_data = &mcm;
2208     r.runtime_data_bytes = sizeof (&mcm);
2209
2210     r.name = (char *) format (0, "mc-mastership-%s", tag);
2211     r.function = mc_mastership_process;
2212     mcm->mastership_process = vlib_register_node (vm, &r);
2213
2214     r.name = (char *) format (0, "mc-join-ager-%s", tag);
2215     r.function = mc_join_ager_process;
2216     mcm->join_ager_process = vlib_register_node (vm, &r);
2217
2218     r.name = (char *) format (0, "mc-retry-%s", tag);
2219     r.function = mc_retry_process;
2220     mcm->retry_process = vlib_register_node (vm, &r);
2221
2222     r.name = (char *) format (0, "mc-catchup-%s", tag);
2223     r.function = mc_catchup_process;
2224     mcm->catchup_process = vlib_register_node (vm, &r);
2225
2226     r.name = (char *) format (0, "mc-unserialize-%s", tag);
2227     r.function = mc_unserialize_process;
2228     mcm->unserialize_process = vlib_register_node (vm, &r);
2229   }
2230
2231   if (MC_EVENT_LOGGING > 0)
2232     mhash_init (&mcm->elog_id_by_peer_id, sizeof (uword), sizeof (mc_peer_id_t));
2233
2234   mhash_init (&mcm->mastership_peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
2235   mc_serialize_init (mcm);
2236 }
2237
2238 static u8 * format_mc_relay_state (u8 * s, va_list * args)
2239 {
2240   mc_relay_state_t state = va_arg (*args, mc_relay_state_t);
2241   char * t = 0;
2242   switch (state)
2243     {
2244     case MC_RELAY_STATE_NEGOTIATE:
2245       t = "negotiate";
2246       break;
2247     case MC_RELAY_STATE_MASTER:
2248       t = "master";
2249       break;
2250     case MC_RELAY_STATE_SLAVE:
2251       t = "slave";
2252       break;
2253     default:
2254       return format (s, "unknown 0x%x", state);
2255     }
2256
2257   return format (s, "%s", t);
2258 }
2259
2260 static u8 * format_mc_stream_state (u8 * s, va_list * args)
2261 {
2262   mc_stream_state_t state = va_arg (*args, mc_stream_state_t);
2263   char * t = 0;
2264   switch (state)
2265     {
2266 #define _(f) case MC_STREAM_STATE_##f: t = #f; break;
2267       foreach_mc_stream_state
2268 #undef _
2269     default:
2270       return format (s, "unknown 0x%x", state);
2271     }
2272
2273   return format (s, "%s", t);
2274 }
2275
2276 u8 * format_mc_main (u8 * s, va_list * args)
2277 {
2278   mc_main_t * mcm = va_arg (*args, mc_main_t *);
2279   mc_stream_t * t;
2280   mc_stream_peer_t * p, * ps;
2281   uword indent = format_get_indent (s);
2282
2283   s = format (s, "MC state %U, %d streams joined, global sequence 0x%x", 
2284               format_mc_relay_state, mcm->relay_state, 
2285               vec_len (mcm->stream_vector),
2286               mcm->relay_global_sequence);
2287
2288   {
2289     mc_mastership_peer_t * mp;
2290     f64 now = vlib_time_now (mcm->vlib_main);
2291     s = format (s, "\n%UMost recent mastership peers:",
2292                 format_white_space, indent + 2);
2293     vec_foreach (mp, mcm->mastership_peers)
2294       {
2295         s = format (s, "\n%U%-30U%.4e",
2296                     format_white_space, indent + 4,
2297                     mcm->transport.format_peer_id, mp->peer_id,
2298                     now - mp->time_last_master_assert_received);
2299       }
2300   }
2301
2302   vec_foreach (t, mcm->stream_vector)
2303     {
2304       s = format (s, "\n%Ustream `%s' index %d",
2305                   format_white_space, indent + 2,
2306                   t->config.name, t->index);
2307
2308       s = format (s, "\n%Ustate %U",
2309                   format_white_space, indent + 4,
2310                   format_mc_stream_state, t->state);
2311
2312       s = format (s, "\n%Uretries: interval %.0f sec, limit %d, pool elts %d, %Ld sent",
2313                   format_white_space, indent + 4, t->config.retry_interval, 
2314                   t->config.retry_limit,
2315                   pool_elts (t->retry_pool),
2316                   t->stats.n_retries - t->stats_last_clear.n_retries);
2317
2318       s = format (s, "\n%U%Ld/%Ld user requests sent/received",
2319                   format_white_space, indent + 4,
2320                   t->user_requests_sent, t->user_requests_received);
2321
2322       s = format (s, "\n%U%d peers, local/global sequence 0x%x/0x%x",
2323                   format_white_space, indent + 4,
2324                   pool_elts (t->peers),
2325                   t->our_local_sequence,
2326                   t->last_global_sequence_processed);
2327
2328       ps = 0;
2329       pool_foreach (p, t->peers, 
2330       ({ 
2331         if (clib_bitmap_get (t->all_peer_bitmap, p - t->peers))
2332           vec_add1 (ps, p[0]); 
2333       }));
2334       vec_sort (ps, p1, p2, mc_peer_id_compare (p1->id, p2->id));
2335       s = format (s, "\n%U%=30s%10s%16s%16s",
2336                   format_white_space, indent + 6,
2337                   "Peer", "Last seq", "Retries", "Future");
2338                   
2339       vec_foreach (p, ps)
2340         {
2341           s = format (s, "\n%U%-30U0x%08x%16Ld%16Ld%s",
2342                       format_white_space, indent + 6,
2343                       mcm->transport.format_peer_id, p->id.as_u64,
2344                       p->last_sequence_received,
2345                       p->stats.n_msgs_from_past - p->stats_last_clear.n_msgs_from_past,
2346                       p->stats.n_msgs_from_future - p->stats_last_clear.n_msgs_from_future,
2347                       (mcm->transport.our_ack_peer_id.as_u64 == p->id.as_u64
2348                        ? " (self)" : ""));
2349         }
2350       vec_free (ps);
2351     }
2352
2353   return s;
2354 }