2 * mc.c: vlib reliable sequenced multicast distributed applications
4 * Copyright (c) 2010 Cisco and/or its affiliates.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 #include <vlib/vlib.h>
21 * 1 to enable msg id training wheels, which are useful for tracking
22 * down catchup and/or partitioned network problems
24 #define MSG_ID_DEBUG 0
26 static format_function_t format_mc_stream_state;
28 static u32 elog_id_for_peer_id (mc_main_t * m, u64 peer_id)
31 mhash_t * h = &m->elog_id_by_peer_id;
33 if (! m->elog_id_by_peer_id.hash)
34 mhash_init (h, sizeof (uword), sizeof (mc_peer_id_t));
36 p = mhash_get (h, &peer_id);
39 r = elog_string (m->elog_main, "%U",
40 m->transport.format_peer_id, peer_id);
41 mhash_set (h, &peer_id, r, /* old_value */ 0);
45 static u32 elog_id_for_msg_name (mc_main_t * m, char *msg_name)
48 uword * h = m->elog_id_by_msg_name;
52 h = m->elog_id_by_msg_name
53 = hash_create_string (0, sizeof (uword));
55 p = hash_get_mem (h, msg_name);
58 r = elog_string (m->elog_main, "%s", msg_name);
60 name_copy = format (0, "%s%c", msg_name, 0);
62 hash_set_mem (h, name_copy, r);
63 m->elog_id_by_msg_name = h;
68 static void elog_tx_msg (mc_main_t * m, u32 stream_id, u32 local_sequence, u32 retry_count)
70 if (MC_EVENT_LOGGING > 0)
72 ELOG_TYPE_DECLARE (e) = {
73 .format = "tx-msg: stream %d local seq %d attempt %d",
74 .format_args = "i4i4i4",
76 struct { u32 stream_id, local_sequence, retry_count; } * ed;
77 ed = ELOG_DATA (m->elog_main, e);
78 ed->stream_id = stream_id;
79 ed->local_sequence = local_sequence;
80 ed->retry_count = retry_count;
86 * correctly compare two unsigned sequence numbers.
87 * This function works so long as x and y are within 2**(n-1) of each
88 * other, where n = bits(x, y).
91 * seq_cmp == 0 => x and y are equal
92 * seq_cmp < 0 => x is "in the past" with respect to y
93 * seq_cmp > 0 => x is "in the future" with respect to y
95 always_inline i32 mc_seq_cmp (u32 x, u32 y)
96 { return (i32) x - (i32) y;}
98 void * mc_get_vlib_buffer (vlib_main_t * vm, u32 n_bytes, u32 * bi_return)
103 n_alloc = vlib_buffer_alloc (vm, &bi, 1);
104 ASSERT (n_alloc == 1);
106 b = vlib_get_buffer (vm, bi);
107 b->current_length = n_bytes;
109 return (void *) b->data;
113 delete_peer_with_index (mc_main_t * mcm, mc_stream_t * s,
115 int notify_application)
117 mc_stream_peer_t * p = pool_elt_at_index (s->peers, index);
119 if (s->config.peer_died && notify_application)
120 s->config.peer_died (mcm, s, p->id);
122 s->all_peer_bitmap = clib_bitmap_andnoti (s->all_peer_bitmap, p - s->peers);
124 if (MC_EVENT_LOGGING > 0)
126 ELOG_TYPE_DECLARE (e) = {
127 .format = "delete peer %s from all_peer_bitmap",
130 struct { u32 peer; } * ed = 0;
132 ed = ELOG_DATA (mcm->elog_main, e);
133 ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
135 /* Do not delete the pool / hash table entries, or we lose sequence number state */
138 static mc_stream_peer_t *
139 get_or_create_peer_with_id (mc_main_t * mcm,
140 mc_stream_t * s, mc_peer_id_t id,
143 uword * q = mhash_get (&s->peer_index_by_id, &id);
144 mc_stream_peer_t * p;
148 p = pool_elt_at_index (s->peers, q[0]);
152 pool_get (s->peers, p);
153 memset (p, 0, sizeof (p[0]));
155 p->last_sequence_received = ~0;
156 mhash_set (&s->peer_index_by_id, &id, p - s->peers, /* old_value */ 0);
161 if (MC_EVENT_LOGGING > 0)
163 ELOG_TYPE_DECLARE (e) = {
164 .format = "get_or_create %s peer %s stream %d seq %d",
165 .format_args = "t4T4i4i4",
167 .enum_strings = { "old", "new", },
169 struct { u32 is_new, peer, stream_index, rx_sequence; } * ed = 0;
171 ed = ELOG_DATA (mcm->elog_main, e);
172 ed->is_new = q ? 0 : 1;
173 ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
174 ed->stream_index = s->index;
175 ed->rx_sequence = p->last_sequence_received;
177 /* $$$$ Enable or reenable this peer */
178 s->all_peer_bitmap = clib_bitmap_ori (s->all_peer_bitmap, p - s->peers);
182 static void maybe_send_window_open_event (vlib_main_t * vm, mc_stream_t * stream)
184 vlib_one_time_waiting_process_t * p;
186 if (pool_elts (stream->retry_pool) >= stream->config.window_size)
189 vec_foreach (p, stream->procs_waiting_for_open_window)
190 vlib_signal_one_time_waiting_process (vm, p);
192 if (stream->procs_waiting_for_open_window)
193 _vec_len (stream->procs_waiting_for_open_window) = 0;
196 static void mc_retry_free (mc_main_t * mcm, mc_stream_t *s, mc_retry_t * r)
198 mc_retry_t record, *retp;
200 if (r->unacked_by_peer_bitmap)
201 _vec_len (r->unacked_by_peer_bitmap) = 0;
203 if (clib_fifo_elts (s->retired_fifo) >= 2 * s->config.window_size)
205 clib_fifo_sub1 (s->retired_fifo, record);
206 vlib_buffer_free_one (mcm->vlib_main, record.buffer_index);
209 clib_fifo_add2 (s->retired_fifo, retp);
211 retp->buffer_index = r->buffer_index;
212 retp->local_sequence = r->local_sequence;
214 r->buffer_index = ~0; /* poison buffer index in this retry */
217 static void mc_resend_retired (mc_main_t *mcm, mc_stream_t *s, u32 local_sequence)
221 if (MC_EVENT_LOGGING > 0)
223 ELOG_TYPE_DECLARE (e) = {
224 .format = "resend-retired: search for local seq %d",
227 struct { u32 local_sequence; } * ed;
228 ed = ELOG_DATA (mcm->elog_main, e);
229 ed->local_sequence = local_sequence;
233 (retry, s->retired_fifo,
235 if (retry->local_sequence == local_sequence)
237 elog_tx_msg (mcm, s->index, retry->local_sequence, -13);
239 mcm->transport.tx_buffer
240 (mcm->transport.opaque,
241 MC_TRANSPORT_USER_REQUEST_TO_RELAY,
242 retry->buffer_index);
247 if (MC_EVENT_LOGGING > 0)
249 ELOG_TYPE_DECLARE (e) = {
250 .format = "resend-retired: FAILED search for local seq %d",
253 struct { u32 local_sequence; } * ed;
254 ed = ELOG_DATA (mcm->elog_main, e);
255 ed->local_sequence = local_sequence;
260 delete_retry_fifo_elt (mc_main_t * mcm,
261 mc_stream_t * stream,
263 uword * dead_peer_bitmap)
265 mc_stream_peer_t * p;
267 pool_foreach (p, stream->peers, ({
268 uword pi = p - stream->peers;
269 uword is_alive = 0 == clib_bitmap_get (r->unacked_by_peer_bitmap, pi);
272 dead_peer_bitmap = clib_bitmap_ori (dead_peer_bitmap, pi);
274 if (MC_EVENT_LOGGING > 0)
276 ELOG_TYPE_DECLARE (e) = {
277 .format = "delete_retry_fifo_elt: peer %s is %s",
278 .format_args = "T4t4",
280 .enum_strings = { "alive", "dead", },
282 struct { u32 peer, is_alive; } * ed;
283 ed = ELOG_DATA (mcm->elog_main, e);
284 ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
285 ed->is_alive = is_alive;
289 hash_unset (stream->retry_index_by_local_sequence, r->local_sequence);
290 mc_retry_free (mcm, stream, r);
292 return dead_peer_bitmap;
295 always_inline mc_retry_t *
296 prev_retry (mc_stream_t * s, mc_retry_t * r)
298 return (r->prev_index != ~0
299 ? pool_elt_at_index (s->retry_pool, r->prev_index)
303 always_inline mc_retry_t *
304 next_retry (mc_stream_t * s, mc_retry_t * r)
306 return (r->next_index != ~0
307 ? pool_elt_at_index (s->retry_pool, r->next_index)
312 remove_retry_from_pool (mc_stream_t * s, mc_retry_t * r)
314 mc_retry_t * p = prev_retry (s, r);
315 mc_retry_t * n = next_retry (s, r);
318 p->next_index = r->next_index;
320 s->retry_head_index = r->next_index;
322 n->prev_index = r->prev_index;
324 s->retry_tail_index = r->prev_index;
326 pool_put_index (s->retry_pool, r - s->retry_pool);
/* check_retry: periodic per-stream scan of the retry list. Expired
   elements past the retry limit are retired (their non-acking peers
   become dead); still-live expired elements are retransmitted.
   NOTE(review): this capture is missing several original lines
   (declarations of ri/ri_next/r, braces, continue statements) —
   comments only, visible code left untouched. */
329 static void check_retry (mc_main_t * mcm, mc_stream_t * s)
332   vlib_main_t * vm = mcm->vlib_main;
333   f64 now = vlib_time_now(vm);
334   uword * dead_peer_bitmap = 0;
      /* Capture next index first: the current element may be deleted. */
337   for (ri = s->retry_head_index; ri != ~0; ri = ri_next)
339       r = pool_elt_at_index (s->retry_pool, ri);
340       ri_next = r->next_index;
      /* Not yet due for retransmission. */
342       if (now < r->sent_at + s->config.retry_interval)
      /* Retry budget exhausted: declare non-acking peers dead. */
346       if (r->n_retries > s->config.retry_limit)
      /* NOTE(review): delete_retry_fifo_elt returns a possibly
         reallocated bitmap; the result appears discarded here —
         confirm against the full source. */
349           delete_retry_fifo_elt (mcm, s, r, dead_peer_bitmap);
350           remove_retry_from_pool (s, r);
354       if (MC_EVENT_LOGGING > 0)
356           mc_stream_peer_t * p;
357           ELOG_TYPE_DECLARE (t) = {
358             .format = "resend local seq %d attempt %d",
359             .format_args = "i4i4",
362           pool_foreach (p, s->peers, ({
363             if (clib_bitmap_get (r->unacked_by_peer_bitmap, p - s->peers))
365                 ELOG_TYPE_DECLARE (ev) = {
366                   .format = "resend: needed by peer %s local seq %d",
367                   .format_args = "T4i4",
369                 struct { u32 peer, rx_sequence; } * ed;
370                 ed = ELOG_DATA (mcm->elog_main, ev);
371                 ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
372                 ed->rx_sequence = r->local_sequence;
376           struct { u32 sequence; u32 trail; } * ed;
377           ed = ELOG_DATA (mcm->elog_main, t);
378           ed->sequence = r->local_sequence;
379           ed->trail = r->n_retries;
      /* Retransmit via the relay and rearm this element's timer. */
382       r->sent_at = vlib_time_now (vm);
383       s->stats.n_retries += 1;
385       elog_tx_msg (mcm, s->index, r->local_sequence, r->n_retries);
387       mcm->transport.tx_buffer
388         (mcm->transport.opaque,
389          MC_TRANSPORT_USER_REQUEST_TO_RELAY,
394   maybe_send_window_open_event (mcm->vlib_main, s);
396   /* Delete any dead peers we've found. */
397   if (! clib_bitmap_is_zero (dead_peer_bitmap))
401       clib_bitmap_foreach (i, dead_peer_bitmap, ({
402         delete_peer_with_index (mcm, s, i, /* notify_application */ 1);
404         /* Delete any references to just deleted peer in retry pool. */
405         pool_foreach (r, s->retry_pool, ({
406           r->unacked_by_peer_bitmap =
407             clib_bitmap_andnoti (r->unacked_by_peer_bitmap, i);
410       clib_bitmap_free (dead_peer_bitmap);
414 always_inline mc_main_t *
415 mc_node_get_main (vlib_node_runtime_t * node)
417 mc_main_t ** p = (void *) node->runtime_data;
422 mc_retry_process (vlib_main_t * vm,
423 vlib_node_runtime_t * node,
426 mc_main_t * mcm = mc_node_get_main (node);
431 vlib_process_suspend (vm, 1.0);
432 vec_foreach (s, mcm->stream_vector)
434 if (s->state != MC_STREAM_STATE_invalid)
435 check_retry (mcm, s);
438 return 0; /* not likely */
441 static void send_join_or_leave_request (mc_main_t * mcm, u32 stream_index, u32 is_join)
443 vlib_main_t * vm = mcm->vlib_main;
444 mc_msg_join_or_leave_request_t * mp;
447 mp = mc_get_vlib_buffer (vm, sizeof (mp[0]), &bi);
448 memset(mp, 0, sizeof (*mp));
449 mp->type = MC_MSG_TYPE_join_or_leave_request;
450 mp->peer_id = mcm->transport.our_ack_peer_id;
451 mp->stream_index = stream_index;
452 mp->is_join = is_join;
454 mc_byte_swap_msg_join_or_leave_request (mp);
457 * These msgs are unnumbered, unordered so send on the from-relay
460 mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi);
/* mc_join_ager_process: background process which ages pending stream
   joins. A timed-out join means nobody answered, so the stream goes
   ready ("we're alone"); otherwise the join request is re-sent.
   NOTE(review): capture is missing the return type, while(1) loop and
   several braces — comments only, visible code left untouched. */
464 mc_join_ager_process (vlib_main_t * vm,
465                       vlib_node_runtime_t * node,
468   mc_main_t * mcm = mc_node_get_main (node);
472       if (mcm->joins_in_progress)
475           vlib_one_time_waiting_process_t * p;
476           f64 now = vlib_time_now (vm);
478           vec_foreach (s, mcm->stream_vector)
480               if (s->state != MC_STREAM_STATE_join_in_progress)
      /* Join timed out: assume we are alone and become ready. */
483               if (now > s->join_timeout)
485                   s->state = MC_STREAM_STATE_ready;
487                   if (MC_EVENT_LOGGING > 0)
489                       ELOG_TYPE_DECLARE (e) = {
490                         .format = "stream %d join timeout",
492                       ELOG (mcm->elog_main, e, s->index);
494                   /* Make sure that this app instance exists as a stream peer,
495                      or we may answer a catchup request with a NULL
496                      all_peer_bitmap... */
497                   (void) get_or_create_peer_with_id
498                     (mcm, s, mcm->transport.our_ack_peer_id, /* created */ 0);
      /* Wake every process blocked on this join, then reset waiters. */
500                   vec_foreach (p, s->procs_waiting_for_join_done)
501                     vlib_signal_one_time_waiting_process (vm, p);
502                   if (s->procs_waiting_for_join_done)
503                     _vec_len (s->procs_waiting_for_join_done) = 0;
505                   mcm->joins_in_progress--;
506                   ASSERT (mcm->joins_in_progress >= 0);
510               /* Resent join request which may have been lost. */
511               send_join_or_leave_request (mcm, s->index,
514               /* We're *not* alone, retry for as long as it takes */
515               if (mcm->relay_state == MC_RELAY_STATE_SLAVE)
516                 s->join_timeout = vlib_time_now (vm) + 2.0;
519               if (MC_EVENT_LOGGING > 0)
521                   ELOG_TYPE_DECLARE (e) = {
522                     .format = "stream %d resend join request",
524                   ELOG (mcm->elog_main, e, s->index);
      /* Poll twice a second. */
530       vlib_process_suspend (vm, .5);
533   return 0; /* not likely */
536 static void serialize_mc_register_stream_name (serialize_main_t * m, va_list * va)
538 char * name = va_arg (*va, char *);
539 serialize_cstring (m, name);
/* Copy a (possibly unterminated) vector of name bytes into a
   fixed-size event-log field, always NUL-terminating. */
static void elog_stream_name (char * buf, int n_buf_bytes, char * v)
{
  clib_memcpy (buf, v, clib_min (n_buf_bytes - 1, vec_len (v)));
  buf[n_buf_bytes - 1] = 0;
}
/* Unserialize handler for the stream-name registration message.
   Assigns the next stream index to a newly-seen name (all peers see
   these messages in the same order, so all agree on the mapping) and
   wakes any local processes waiting for that name.
   NOTE(review): capture is missing declarations (name, p, s) and
   several braces — comments only, visible code left untouched. */
548 static void unserialize_mc_register_stream_name (serialize_main_t * m, va_list * va)
550   mc_main_t * mcm = va_arg (*va, mc_main_t *);
555   unserialize_cstring (m, &name);
      /* Duplicate registration: log and bail. */
557   if ((p = hash_get_mem (mcm->stream_index_by_name, name)))
559       if (MC_EVENT_LOGGING > 0)
561           ELOG_TYPE_DECLARE (e) = {
562             .format = "stream index %d already named %s",
563             .format_args = "i4s16",
565           struct { u32 stream_index; char name[16]; } * ed;
566           ed = ELOG_DATA (mcm->elog_main, e);
567           ed->stream_index = p[0];
568           elog_stream_name (ed->name, sizeof (ed->name), name);
      /* New name: allocate the next stream slot and bind the name. */
575   vec_add2 (mcm->stream_vector, s, 1);
577   s->state = MC_STREAM_STATE_name_known;
578   s->index = s - mcm->stream_vector;
579   s->config.name = name;
581   if (MC_EVENT_LOGGING > 0)
583       ELOG_TYPE_DECLARE (e) = {
584         .format = "stream index %d named %s",
585         .format_args = "i4s16",
587       struct { u32 stream_index; char name[16]; } * ed;
588       ed = ELOG_DATA (mcm->elog_main, e);
589       ed->stream_index = s->index;
590       elog_stream_name (ed->name, sizeof (ed->name), name);
593   hash_set_mem (mcm->stream_index_by_name, name, s->index);
      /* Signal any local waiters blocked on this name, then drop the
         waiter pool entry. */
595   p = hash_get (mcm->procs_waiting_for_stream_name_by_name, name);
598       vlib_one_time_waiting_process_t * wp, ** w;
599       w = pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool, p[0]);
600       vec_foreach (wp, w[0])
601         vlib_signal_one_time_waiting_process (mcm->vlib_main, wp);
602       pool_put (mcm->procs_waiting_for_stream_name_pool, w);
603       hash_unset_mem (mcm->procs_waiting_for_stream_name_by_name, name);
/* Registration record binding the "mc_register_stream_name" message
   to its serialize/unserialize handlers above. */
607 MC_SERIALIZE_MSG (mc_register_stream_name_msg, static) = {
608   .name = "mc_register_stream_name",
609   .serialize = serialize_mc_register_stream_name,
610   .unserialize = unserialize_mc_register_stream_name,
614 mc_rx_buffer_unserialize (mc_main_t * mcm,
615 mc_stream_t * stream,
616 mc_peer_id_t peer_id,
618 { return mc_unserialize (mcm, stream, buffer_index); }
621 mc_internal_catchup_snapshot (mc_main_t * mcm,
623 u32 last_global_sequence_processed)
627 /* Append serialized data to data vector. */
628 serialize_open_vector (&m, data_vector);
629 m.stream.current_buffer_index = vec_len (data_vector);
631 serialize (&m, serialize_mc_main, mcm);
632 return serialize_close_vector (&m);
636 mc_internal_catchup (mc_main_t * mcm,
642 unserialize_open_data (&s, data, n_data_bytes);
644 unserialize (&s, unserialize_mc_main, mcm);
/* Hook called while a stream join is in progress. Declared weak so
   the application layer can override it; the default does nothing. */
void mc_stream_join_process_hold (void) __attribute__ ((weak));
void
mc_stream_join_process_hold (void)
{
}
/* mc_stream_join_helper: core join logic. Ensures the internal
   control stream is joined first, registers/waits for a globally
   agreed name->index mapping, applies config defaults, sends the join
   request and blocks until the join completes or times out.
   NOTE(review): capture is missing the return type, is_internal
   parameter, declarations (s, p) and many braces/returns — comments
   only, visible code left untouched. */
652 mc_stream_join_helper (mc_main_t * mcm,
653                        mc_stream_config_t * config,
657   vlib_main_t * vm = mcm->vlib_main;
664   /* Already have a stream with given name? */
665   if ((s = mc_stream_by_name (mcm, config->name)))
667       /* Already joined and ready? */
668       if (s->state == MC_STREAM_STATE_ready)
672   /* First join MC internal stream. */
673   if (! mcm->stream_vector
674       || (mcm->stream_vector[MC_STREAM_INDEX_INTERNAL].state
675           == MC_STREAM_STATE_invalid))
677       static mc_stream_config_t c = {
678         .name = "mc-internal",
679         .rx_buffer = mc_rx_buffer_unserialize,
680         .catchup = mc_internal_catchup,
681         .catchup_snapshot = mc_internal_catchup_snapshot,
684       c.save_snapshot = config->save_snapshot;
      /* Recursive join of the control stream. */
686       mc_stream_join_helper (mcm, &c, /* is_internal */ 1);
689   /* If stream is still unknown register this name and wait for
690      sequenced message to name stream.  This way all peers agree
691      on stream name to index mappings. */
692   s = mc_stream_by_name (mcm, config->name);
695       vlib_one_time_waiting_process_t * wp, ** w;
696       u8 * name_copy = format (0, "%s", config->name);
698       mc_serialize_stream (mcm,
699                            MC_STREAM_INDEX_INTERNAL,
700                            &mc_register_stream_name_msg,
703       /* Wait for this stream to be named. */
704       p = hash_get_mem (mcm->procs_waiting_for_stream_name_by_name, name_copy);
706           w = pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool, p[0]);
709           pool_get (mcm->procs_waiting_for_stream_name_pool, w);
710           if (! mcm->procs_waiting_for_stream_name_by_name)
711             mcm->procs_waiting_for_stream_name_by_name
712               = hash_create_string (/* elts */ 0, /* value size */ sizeof (uword));
713           hash_set_mem (mcm->procs_waiting_for_stream_name_by_name,
715                         w - mcm->procs_waiting_for_stream_name_pool);
719       vec_add2 (w[0], wp, 1);
720       vlib_current_process_wait_for_one_time_event (vm, wp);
721       vec_free (name_copy);
724   /* Name should be known now. */
725   s = mc_stream_by_name (mcm, config->name);
727   ASSERT (s->state == MC_STREAM_STATE_name_known);
732       vec_add2 (mcm->stream_vector, s, 1);
734       s->index = s - mcm->stream_vector;
738     /* Save name since we could have already used it as hash key. */
739     char * name_save = s->config.name;
741     s->config = config[0];
744     s->config.name = name_save;
      /* Apply config defaults: window 8, retry every 1s, 7 attempts. */
747   if (s->config.window_size == 0)
748     s->config.window_size = 8;
750   if (s->config.retry_interval == 0.0)
751     s->config.retry_interval = 1.0;
754   ASSERT (s->config.retry_interval < 30);
756   if (s->config.retry_limit == 0)
757     s->config.retry_limit = 7;
759   s->state = MC_STREAM_STATE_join_in_progress;
760   if (! s->peer_index_by_id.hash)
761     mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
763   /* If we don't hear from someone in 5 seconds, we're alone */
764   s->join_timeout = vlib_time_now (vm) + 5.0;
765   mcm->joins_in_progress++;
767   if (MC_EVENT_LOGGING > 0)
769       ELOG_TYPE_DECLARE (e) = {
770         .format = "stream index %d join request %s",
771         .format_args = "i4s16",
773       struct { u32 stream_index; char name[16]; } * ed;
774       ed = ELOG_DATA (mcm->elog_main, e);
775       ed->stream_index = s->index;
776       elog_stream_name (ed->name, sizeof (ed->name), s->config.name);
779   send_join_or_leave_request (mcm, s->index, 1 /* join */);
      /* Block until mc_join_ager_process or a join reply signals done. */
781   vlib_current_process_wait_for_one_time_event_vector
782     (vm, &s->procs_waiting_for_join_done);
784   if (MC_EVENT_LOGGING)
786       ELOG_TYPE (e, "join complete stream %d");
787       ELOG (mcm->elog_main, e, s->index);
793 u32 mc_stream_join (mc_main_t * mcm, mc_stream_config_t * config)
794 { return mc_stream_join_helper (mcm, config, /* is_internal */ 0); }
796 void mc_stream_leave (mc_main_t * mcm, u32 stream_index)
798 mc_stream_t * s = mc_stream_by_index (mcm, stream_index);
803 if (MC_EVENT_LOGGING)
805 ELOG_TYPE_DECLARE (t) = {
806 .format = "leave-stream: %d",
809 struct { u32 index; } * ed;
810 ed = ELOG_DATA (mcm->elog_main, t);
811 ed->index = stream_index;
814 send_join_or_leave_request (mcm, stream_index, 0 /* is_join */);
816 s->state = MC_STREAM_STATE_name_known;
/* Handle a peer's join/leave request. On join: refuse (silently) to
   reply while any of our own stream joins are still in progress, else
   create the peer and answer with a join reply naming our ack and
   catchup peer ids. On leave: notify the application via peer_died.
   NOTE(review): capture is missing declarations (s, bi), braces and
   returns — comments only, visible code left untouched. */
819 void mc_msg_join_or_leave_request_handler (mc_main_t * mcm,
820                                            mc_msg_join_or_leave_request_t * req,
824   mc_msg_join_reply_t * rep;
827   mc_byte_swap_msg_join_or_leave_request (req);
829   s = mc_stream_by_index (mcm, req->stream_index);
830   if (! s || s->state != MC_STREAM_STATE_ready)
833   /* If the peer is joining, create it */
836       mc_stream_t * this_s;
838       /* We're not in a position to catch up a peer until all
839          stream joins are complete. */
842       /* XXX This is hard to test, so this path has seen little
             exercise. */
843       vec_foreach (this_s, mcm->stream_vector)
845           if (this_s->state != MC_STREAM_STATE_ready
846               && this_s->state != MC_STREAM_STATE_name_known)
851       if (mcm->joins_in_progress > 0)
854       (void) get_or_create_peer_with_id (mcm,
859       rep = mc_get_vlib_buffer (mcm->vlib_main, sizeof (rep[0]), &bi);
860       memset (rep, 0, sizeof (rep[0]));
861       rep->type = MC_MSG_TYPE_join_reply;
862       rep->stream_index = req->stream_index;
      /* Swap the host-order fields first... */
864       mc_byte_swap_msg_join_reply (rep);
865       /* These two are already in network byte order... */
866       rep->peer_id = mcm->transport.our_ack_peer_id;
867       rep->catchup_peer_id = mcm->transport.our_catchup_peer_id;
869       mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi);
      /* Leave: tell the application this peer went away. */
873       if (s->config.peer_died)
874         s->config.peer_died (mcm, s, req->peer_id);
878 void mc_msg_join_reply_handler (mc_main_t * mcm,
879 mc_msg_join_reply_t * mp,
884 mc_byte_swap_msg_join_reply (mp);
886 s = mc_stream_by_index (mcm, mp->stream_index);
888 if (! s || s->state != MC_STREAM_STATE_join_in_progress)
891 /* Switch to catchup state; next join reply
892 for this stream will be ignored. */
893 s->state = MC_STREAM_STATE_catchup;
895 mcm->joins_in_progress--;
896 mcm->transport.catchup_request_fun (mcm->transport.opaque,
898 mp->catchup_peer_id);
901 void mc_wait_for_stream_ready (mc_main_t * m, char * stream_name)
907 s = mc_stream_by_name (m, stream_name);
910 vlib_process_suspend (m->vlib_main, .1);
913 /* It's OK to send a message in catchup and ready states. */
914 if (s->state == MC_STREAM_STATE_catchup
915 || s->state == MC_STREAM_STATE_ready)
918 /* Otherwise we are waiting for a join to finish. */
919 vlib_current_process_wait_for_one_time_event_vector
920 (m->vlib_main, &s->procs_waiting_for_join_done);
/* mc_stream_send: reliably multicast one buffer on a stream. Blocks
   until the stream is ready and the send window has room, links a new
   retry element at the list tail, prepends the user-request header,
   and transmits via the relay. Returns remaining window space.
   NOTE(review): capture is missing declarations (r, ri), braces and
   some linkage lines — comments only, visible code left untouched. */
923 u32 mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index)
925   mc_stream_t * s = mc_stream_by_index (mcm, stream_index);
926   vlib_main_t * vm = mcm->vlib_main;
928   mc_msg_user_request_t * mp;
929   vlib_buffer_t * b = vlib_get_buffer (vm, buffer_index);
      /* Block until the stream's join completes. */
935   if (s->state != MC_STREAM_STATE_ready)
936     vlib_current_process_wait_for_one_time_event_vector
937       (vm, &s->procs_waiting_for_join_done);
      /* Flow control: wait for send-window space. */
939   while (pool_elts (s->retry_pool) >= s->config.window_size)
941       vlib_current_process_wait_for_one_time_event_vector
942         (vm, &s->procs_waiting_for_open_window);
      /* Append a retry element at the tail of the doubly-linked list. */
945   pool_get (s->retry_pool, r);
946   ri = r - s->retry_pool;
948   r->prev_index = s->retry_tail_index;
950   s->retry_tail_index = ri;
952   if (r->prev_index == ~0)
953     s->retry_head_index = ri;
956       mc_retry_t * p = pool_elt_at_index (s->retry_pool, r->prev_index);
      /* Prepend the user-request header in front of the payload. */
960   vlib_buffer_advance (b, -sizeof (mp[0]));
961   mp = vlib_buffer_get_current (b);
963   mp->peer_id = mcm->transport.our_ack_peer_id;
964   /* mp->transport.global_sequence set by relay agent. */
965   mp->global_sequence = 0xdeadbeef;
966   mp->stream_index = s->index;
967   mp->local_sequence = s->our_local_sequence++;
968   mp->n_data_bytes = vlib_buffer_index_length_in_chain (vm, buffer_index) - sizeof (mp[0]);
970   r->buffer_index = buffer_index;
971   r->local_sequence = mp->local_sequence;
972   r->sent_at = vlib_time_now(vm);
975   /* Retry will be freed when all currently known peers have acked. */
976   vec_validate (r->unacked_by_peer_bitmap, vec_len (s->all_peer_bitmap) - 1);
977   vec_copy (r->unacked_by_peer_bitmap, s->all_peer_bitmap);
979   hash_set (s->retry_index_by_local_sequence, r->local_sequence, r - s->retry_pool);
981   elog_tx_msg (mcm, s->index, mp->local_sequence, r->n_retries);
983   mc_byte_swap_msg_user_request (mp);
985   mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_USER_REQUEST_TO_RELAY, buffer_index);
987   s->user_requests_sent++;
989   /* return amount of window remaining */
990   return s->config.window_size - pool_elts (s->retry_pool);
/* mc_msg_user_request_handler: receive path for sequenced user
   messages. Compares the message's local sequence against the peer's
   expected next sequence, always acks, then delivers in-order
   messages (or queues them during catchup) and drops duplicates.
   NOTE(review): capture is missing declarations (s, seq_cmp_result,
   once, bi), braces and returns — comments only, code untouched. */
993 void mc_msg_user_request_handler (mc_main_t * mcm, mc_msg_user_request_t * mp, u32 buffer_index)
995   vlib_main_t * vm = mcm->vlib_main;
997   mc_stream_peer_t * peer;
1001   mc_byte_swap_msg_user_request (mp);
1003   s = mc_stream_by_index (mcm, mp->stream_index);
1005   /* Not signed up for this stream? Turf-o-matic */
1006   if (! s || s->state != MC_STREAM_STATE_ready)
1008       vlib_buffer_free_one (vm, buffer_index);
1012   /* Find peer, including ourselves. */
1013   peer = get_or_create_peer_with_id (mcm,
       /* 0: in order, <0: duplicate/past, >0: gap (from the future). */
1017   seq_cmp_result = mc_seq_cmp (mp->local_sequence,
1018                                peer->last_sequence_received + 1);
1020   if (MC_EVENT_LOGGING > 0)
1022       ELOG_TYPE_DECLARE (e) = {
1023         .format = "rx-msg: peer %s stream %d rx seq %d seq_cmp %d",
1024         .format_args = "T4i4i4i4",
1026       struct { u32 peer, stream_index, rx_sequence; i32 seq_cmp_result; } * ed;
1027       ed = ELOG_DATA (mcm->elog_main, e);
1028       ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
1029       ed->stream_index = mp->stream_index;
1030       ed->rx_sequence = mp->local_sequence;
1031       ed->seq_cmp_result = seq_cmp_result;
       /* Disabled (0 &&) fault-injection hook for testing loss. */
1034   if (0 && mp->stream_index == 1 && once == 0)
1037       ELOG_TYPE (e, "FAKE lost msg on stream 1");
1038       ELOG (mcm->elog_main,e,0);
       /* Advance expected sequence only for exactly-in-order messages. */
1042   peer->last_sequence_received += seq_cmp_result == 0;
1043   s->user_requests_received++;
1045   if (seq_cmp_result > 0)
1046     peer->stats.n_msgs_from_future += 1;
1048   /* Send ack even if msg from future */
1051       mc_msg_user_ack_t * rp;
1054       rp = mc_get_vlib_buffer (vm, sizeof (rp[0]), &bi);
1055       rp->peer_id = mcm->transport.our_ack_peer_id;
1056       rp->stream_index = s->index;
1057       rp->local_sequence = mp->local_sequence;
1058       rp->seq_cmp_result = seq_cmp_result;
1060       if (MC_EVENT_LOGGING > 0)
1062           ELOG_TYPE_DECLARE (e) = {
1063             .format = "tx-ack: stream %d local seq %d",
1064             .format_args = "i4i4",
1066           struct { u32 stream_index; u32 local_sequence; } * ed;
1067           ed = ELOG_DATA (mcm->elog_main, e);
1068           ed->stream_index = rp->stream_index;
1069           ed->local_sequence = rp->local_sequence;
1072       mc_byte_swap_msg_user_ack (rp);
1074       mcm->transport.tx_ack (mcm->transport.opaque, mp->peer_id, bi);
1075   /* Msg from past? If so, free the buffer... */
1076   if (seq_cmp_result < 0)
1078       vlib_buffer_free_one (vm, buffer_index);
1079       peer->stats.n_msgs_from_past += 1;
       /* In-order: deliver (ready) or queue (catchup). */
1083   if (seq_cmp_result == 0)
1085       vlib_buffer_t * b = vlib_get_buffer (vm, buffer_index);
1088         case MC_STREAM_STATE_ready:
1089           vlib_buffer_advance (b, sizeof (mp[0]));
1090           s->config.rx_buffer(mcm, s, mp->peer_id, buffer_index);
1092           /* Stream vector can change address via rx callback for mc-internal
1094           s = mc_stream_by_index (mcm, mp->stream_index);
1096           s->last_global_sequence_processed = mp->global_sequence;
1099         case MC_STREAM_STATE_catchup:
1100           clib_fifo_add1 (s->catchup_fifo, buffer_index);
1104           clib_warning ("stream in unknown state %U",
1105                         format_mc_stream_state, s->state);
/* mc_msg_user_ack_handler: process a peer's ack. Positive
   seq_cmp_result means the peer is missing an earlier message, which
   we resend from the retry pool or the retired fifo. Otherwise clear
   the peer's bit in the retry's unacked bitmap; once all peers have
   acked, retire the element and maybe reopen the send window.
   NOTE(review): capture is missing declarations (s, p, r), braces and
   returns — comments only, visible code left untouched. */
1111 void mc_msg_user_ack_handler (mc_main_t * mcm, mc_msg_user_ack_t * mp, u32 buffer_index)
1113   vlib_main_t * vm = mcm->vlib_main;
1116   mc_stream_peer_t * peer;
1118   int peer_created = 0;
1120   mc_byte_swap_msg_user_ack (mp);
1122   s = mc_stream_by_index (mcm, mp->stream_index);
1124   if (MC_EVENT_LOGGING > 0)
1126       ELOG_TYPE_DECLARE (t) = {
1127         .format = "rx-ack: local seq %d peer %s seq_cmp_result %d",
1128         .format_args = "i4T4i4",
1130       struct { u32 local_sequence; u32 peer; i32 seq_cmp_result;} * ed;
1131       ed = ELOG_DATA (mcm->elog_main, t);
1132       ed->local_sequence = mp->local_sequence;
1133       ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
1134       ed->seq_cmp_result = mp->seq_cmp_result;
1137   /* Unknown stream? */
1141   /* Find the peer which just ack'ed. */
1142   peer = get_or_create_peer_with_id (mcm, s, mp->peer_id,
1143                                      /* created */ &peer_created);
1146    * Peer reports message from the future. If it's not in the retry
1147    * fifo, look for a retired message.
1149   if (mp->seq_cmp_result > 0)
        /* The sequence the peer is actually missing. */
1151       p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence -
1152                     mp->seq_cmp_result);
1154         mc_resend_retired (mcm, s, mp->local_sequence - mp->seq_cmp_result);
1156       /* Normal retry should fix it... */
1161    * Pointer to the indicated retry fifo entry.
1162    * Worth hashing because we could use a window size of 100 or 1000.
1164   p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence);
1167        * Is this a duplicate ACK, received after we've retired the
1168        * fifo entry. This can happen when learning about new
1173       if (MC_EVENT_LOGGING > 0)
1175           ELOG_TYPE_DECLARE (t) =
1177             .format = "ack: for seq %d from peer %s no fifo elt",
1178             .format_args = "i4T4",
1180           struct { u32 seq; u32 peer; } * ed;
1181           ed = ELOG_DATA (mcm->elog_main, t);
1182           ed->seq = mp->local_sequence;
1183           ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
1189   r = pool_elt_at_index (s->retry_pool, p[0]);
1191   /* Make sure that this new peer ACKs our msgs from now on */
1194       mc_retry_t *later_retry = next_retry (s, r);
1198           later_retry->unacked_by_peer_bitmap =
1199             clib_bitmap_ori (later_retry->unacked_by_peer_bitmap,
1201           later_retry = next_retry (s, later_retry);
1205   ASSERT (mp->local_sequence == r->local_sequence);
1207   /* If we weren't expecting to hear from this peer */
1208   if (!peer_created &&
1209       ! clib_bitmap_get (r->unacked_by_peer_bitmap, peer - s->peers))
1211       if (MC_EVENT_LOGGING > 0)
1213           ELOG_TYPE_DECLARE (t) =
1215             .format = "dup-ack: for seq %d from peer %s",
1216             .format_args = "i4T4",
1218           struct { u32 seq; u32 peer; } * ed;
1219           ed = ELOG_DATA (mcm->elog_main, t);
1220           ed->seq = r->local_sequence;
1221           ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
1223       if (! clib_bitmap_is_zero (r->unacked_by_peer_bitmap))
1227   if (MC_EVENT_LOGGING > 0)
1229       ELOG_TYPE_DECLARE (t) =
1231         .format = "ack: for seq %d from peer %s",
1232         .format_args = "i4T4",
1234       struct { u32 seq; u32 peer; } * ed;
1235       ed = ELOG_DATA (mcm->elog_main, t);
1236       ed->seq = mp->local_sequence;
1237       ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
       /* Clear this peer's unacked bit for the element. */
1240   r->unacked_by_peer_bitmap =
1241     clib_bitmap_andnoti (r->unacked_by_peer_bitmap, peer - s->peers);
1243   /* Not all clients have ack'ed */
1244   if (! clib_bitmap_is_zero (r->unacked_by_peer_bitmap))
1248   if (MC_EVENT_LOGGING > 0)
1250       ELOG_TYPE_DECLARE (t) =
1252         .format = "ack: retire fifo elt loc seq %d after %d acks",
1253         .format_args = "i4i4",
1255       struct { u32 seq; u32 npeers; } * ed;
1256       ed = ELOG_DATA (mcm->elog_main, t);
1257       ed->seq = r->local_sequence;
1258       ed->npeers = pool_elts (s->peers);
       /* Fully acked: retire the element and maybe reopen the window. */
1261   hash_unset (s->retry_index_by_local_sequence, mp->local_sequence);
1262   mc_retry_free (mcm, s, r);
1263   remove_retry_from_pool (s, r);
1264   maybe_send_window_open_event (vm, s);
1267 #define EVENT_MC_SEND_CATCHUP_DATA 0
/* mc_catchup_process: background process which ships prepared
   catchup snapshots to requesting peers, decoupling the (possibly
   slow) send from the handler that took the snapshot.
   NOTE(review): capture is missing the return type, while(1) loop,
   loop index declaration and braces — comments only, code untouched. */
1270 mc_catchup_process (vlib_main_t * vm,
1271                     vlib_node_runtime_t * node,
1274   mc_main_t * mcm = mc_node_get_main (node);
1275   uword *event_data = 0;
1276   mc_catchup_process_arg_t * args;
1282       _vec_len(event_data) = 0;
1283       vlib_process_wait_for_event_with_type (vm, &event_data, EVENT_MC_SEND_CATCHUP_DATA);
       /* Each event carries a pool index of a prepared snapshot. */
1285       for (i = 0; i < vec_len(event_data); i++)
1287           args = pool_elt_at_index (mcm->catchup_process_args,
1290           mcm->transport.catchup_send_fun (mcm->transport.opaque,
1291                                            args->catchup_opaque,
1292                                            args->catchup_snapshot);
1294           /* Send function will free snapshot data vector. */
1295           pool_put (mcm->catchup_process_args, args);
1299   return 0; /* not likely */
1302 static void serialize_mc_stream (serialize_main_t * m, va_list * va)
1304 mc_stream_t * s = va_arg (*va, mc_stream_t *);
1305 mc_stream_peer_t * p;
1307 serialize_integer (m, pool_elts (s->peers), sizeof (u32));
1308 pool_foreach (p, s->peers, ({
1309 u8 * x = serialize_get (m, sizeof (p->id));
1310 clib_memcpy (x, p->id.as_u8, sizeof (p->id));
1311 serialize_integer (m, p->last_sequence_received,
1312 sizeof (p->last_sequence_received));
1314 serialize_bitmap (m, s->all_peer_bitmap);
1317 void unserialize_mc_stream (serialize_main_t * m, va_list * va)
1319 mc_stream_t * s = va_arg (*va, mc_stream_t *);
1321 mc_stream_peer_t * p;
1323 unserialize_integer (m, &n_peers, sizeof (u32));
1324 mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
1325 for (i = 0; i < n_peers; i++)
1328 pool_get (s->peers, p);
1329 x = unserialize_get (m, sizeof (p->id));
1330 clib_memcpy (p->id.as_u8, x, sizeof (p->id));
1331 unserialize_integer (m, &p->last_sequence_received, sizeof (p->last_sequence_received));
1332 mhash_set (&s->peer_index_by_id, &p->id, p - s->peers, /* old_value */ 0);
1334 s->all_peer_bitmap = unserialize_bitmap (m);
1336 /* This is really bad. */
1337 if (!s->all_peer_bitmap)
1338 clib_warning ("BUG: stream %s all_peer_bitmap NULL", s->config.name);
/* Handle a peer's catchup request: snapshot this stream's state (and
   the application's state via config.catchup_snapshot) synchronously,
   then hand the snapshot to the catchup process for transmission. */
void mc_msg_catchup_request_handler (mc_main_t * mcm, mc_msg_catchup_request_t * req, u32 catchup_opaque)
  vlib_main_t * vm = mcm->vlib_main;
  mc_catchup_process_arg_t * args;

  mc_byte_swap_msg_catchup_request (req);

  /* Ignore requests for unknown streams or streams not yet ready. */
  s = mc_stream_by_index (mcm, req->stream_index);
  if (! s || s->state != MC_STREAM_STATE_ready)

  if (MC_EVENT_LOGGING > 0)
      ELOG_TYPE_DECLARE (t) =
	.format = "catchup-request: from %s stream %d",
	.format_args = "T4i4",
      struct { u32 peer, stream; } * ed;
      ed = ELOG_DATA (mcm->elog_main, t);
      ed->peer = elog_id_for_peer_id (mcm, req->peer_id.as_u64);
      ed->stream = req->stream_index;

  /*
   * The application has to snapshot its data structures right
   * here, right now. If we process any messages after
   * noting the last global sequence we've processed, the client
   * won't be able to accurately reconstruct our data structures.
   *
   * Once the data structures are e.g. vec_dup()'ed, we
   * send the resulting messages from a separate process, to
   * make sure that we don't cause a bunch of message retransmissions
   */
  pool_get (mcm->catchup_process_args, args);
  args->stream_index = s - mcm->stream_vector;
  args->catchup_opaque = catchup_opaque;
  args->catchup_snapshot = 0;

  /* Construct catchup reply and snapshot state for stream to send as
     catchup reply payload. */
    mc_msg_catchup_reply_t * rep;
    /* Reply header occupies the front of the snapshot vector. */
    vec_resize (args->catchup_snapshot, sizeof (rep[0]));
    rep = (void *) args->catchup_snapshot;
    rep->peer_id = req->peer_id;
    rep->stream_index = req->stream_index;
    rep->last_global_sequence_included = s->last_global_sequence_processed;

    /* Setup for serialize to append to catchup snapshot. */
    serialize_open_vector (&m, args->catchup_snapshot);
    m.stream.current_buffer_index = vec_len (m.stream.buffer);
    serialize (&m, serialize_mc_stream, s);
    args->catchup_snapshot = serialize_close_vector (&m);

    /* Actually copy internal state */
    args->catchup_snapshot = s->config.catchup_snapshot
	 args->catchup_snapshot,
	 rep->last_global_sequence_included);

    /* Snapshot vector may have been reallocated; refresh rep. */
    rep = (void *) args->catchup_snapshot;
    rep->n_data_bytes = vec_len (args->catchup_snapshot) - sizeof (rep[0]);
    mc_byte_swap_msg_catchup_reply (rep);

  /* now go send it... */
  vlib_process_signal_event (vm, mcm->catchup_process,
			     EVENT_MC_SEND_CATCHUP_DATA,
			     args - mcm->catchup_process_args);
1422 #define EVENT_MC_UNSERIALIZE_BUFFER 0
1423 #define EVENT_MC_UNSERIALIZE_CATCHUP 1
1425 void mc_msg_catchup_reply_handler (mc_main_t * mcm, mc_msg_catchup_reply_t * mp, u32 catchup_opaque)
1427 vlib_process_signal_event (mcm->vlib_main,
1428 mcm->unserialize_process,
1429 EVENT_MC_UNSERIALIZE_CATCHUP,
1430 pointer_to_uword (mp));
/* Apply a received catchup reply: restore the stream's serialized
   state, hand the trailing bytes to the application's catchup handler,
   replay any fifo-queued messages newer than the snapshot, then mark
   the stream ready and wake processes waiting for the join. */
static void perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp)
  mc_byte_swap_msg_catchup_reply (mp);

  s = mc_stream_by_index (mcm, mp->stream_index);

  /* Never heard of this stream or already caught up. */
  if (! s || s->state == MC_STREAM_STATE_ready)

    mc_stream_peer_t * p;

    /* For offline sim replay: save the entire catchup snapshot... */
    if (s->config.save_snapshot)
      s->config.save_snapshot (mcm, /* is_catchup */ 1, mp->data, mp->n_data_bytes);

    unserialize_open_data (&m, mp->data, mp->n_data_bytes);
    unserialize (&m, unserialize_mc_stream, s);

    /* Make sure we start numbering our messages as expected */
    pool_foreach (p, s->peers, ({
      if (p->id.as_u64 == mcm->transport.our_ack_peer_id.as_u64)
	s->our_local_sequence = p->last_sequence_received + 1;

    /* Bytes consumed by the stream state; the rest is user data. */
    n_stream_bytes = m.stream.current_buffer_index;

    /* No need to unserialize close; nothing to free. */

    /* After serialized stream is user's catchup data. */
    s->config.catchup (mcm, mp->data + n_stream_bytes,
		       mp->n_data_bytes - n_stream_bytes);

  /* Vector could have been moved by catchup.
     This can only happen for mc-internal stream. */
  s = mc_stream_by_index (mcm, mp->stream_index);

  s->last_global_sequence_processed = mp->last_global_sequence_included;

  /* Drain messages that were queued while we were catching up. */
  while (clib_fifo_elts (s->catchup_fifo))
      mc_msg_user_request_t * gp;

      clib_fifo_sub1(s->catchup_fifo, bi);

      b = vlib_get_buffer (mcm->vlib_main, bi);
      gp = vlib_buffer_get_current (b);

      /* Make sure we're replaying "new" news */
      seq_cmp_result = mc_seq_cmp (gp->global_sequence,
				   mp->last_global_sequence_included);

      if (seq_cmp_result > 0)
	  vlib_buffer_advance (b, sizeof (gp[0]));
	  s->config.rx_buffer (mcm, s, gp->peer_id, bi);
	  s->last_global_sequence_processed = gp->global_sequence;

	  if (MC_EVENT_LOGGING)
	      ELOG_TYPE_DECLARE (t) = {
		.format = "catchup replay local sequence 0x%x",
		.format_args = "i4",
	      struct { u32 local_sequence; } * ed;
	      ed = ELOG_DATA (mcm->elog_main, t);
	      ed->local_sequence = gp->local_sequence;

	  if (MC_EVENT_LOGGING)
	      ELOG_TYPE_DECLARE (t) = {
		.format = "catchup discard local sequence 0x%x",
		.format_args = "i4",
	      struct { u32 local_sequence; } * ed;
	      ed = ELOG_DATA (mcm->elog_main, t);
	      ed->local_sequence = gp->local_sequence;

	  /* Message predates the snapshot; drop it. */
	  vlib_buffer_free_one (mcm->vlib_main, bi);

  s->state = MC_STREAM_STATE_ready;

  /* Now that we are caught up wake up joining process. */
    vlib_one_time_waiting_process_t * wp;
    vec_foreach (wp, s->procs_waiting_for_join_done)
      vlib_signal_one_time_waiting_process (mcm->vlib_main, wp);
    if (s->procs_waiting_for_join_done)
      _vec_len (s->procs_waiting_for_join_done) = 0;
/* Mastership loop while this node is (or may become) relay master:
   broadcast periodic master-assert messages, and transition to
   master/slave based on received events and timeouts. */
static void this_node_maybe_master (mc_main_t * mcm)
  vlib_main_t * vm = mcm->vlib_main;
  mc_msg_master_assert_t * mp;
  int is_master = mcm->relay_state == MC_RELAY_STATE_MASTER;
  clib_error_t * error;
  f64 now, time_last_master_assert = -1;

  /* Configured to never be master: become slave immediately. */
  if (! mcm->we_can_be_relay_master)
      mcm->relay_state = MC_RELAY_STATE_SLAVE;
      if (MC_EVENT_LOGGING)
	  ELOG_TYPE (e, "become slave (config)");
	  ELOG (mcm->elog_main, e, 0);

  now = vlib_time_now (vm);
  /* Rate-limit master asserts to one per second. */
  if (now >= time_last_master_assert + 1)
      time_last_master_assert = now;
      mp = mc_get_vlib_buffer (mcm->vlib_main, sizeof (mp[0]), &bi);
      mp->peer_id = mcm->transport.our_ack_peer_id;
      mp->global_sequence = mcm->relay_global_sequence;

      /*
       * these messages clog the event log, set MC_EVENT_LOGGING higher
       */
      if (MC_EVENT_LOGGING > 1)
	  ELOG_TYPE_DECLARE (e) = {
	    .format = "tx-massert: peer %s global seq %u",
	    .format_args = "T4i4",
	  struct { u32 peer, global_sequence; } * ed;
	  ed = ELOG_DATA (mcm->elog_main, e);
	  ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
	  ed->global_sequence = mp->global_sequence;
      mc_byte_swap_msg_master_assert (mp);

      error = mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_MASTERSHIP, bi);
	clib_error_report (error);

  vlib_process_wait_for_event_or_clock (vm, 1.0);
  event_type = vlib_process_get_events (vm, /* no event data */ 0);

  /* No competing master heard for several periods: claim mastership. */
  if (! is_master && timeouts++ > 2)
      mcm->relay_state = MC_RELAY_STATE_MASTER;
      mcm->relay_master_peer_id = mcm->transport.our_ack_peer_id.as_u64;
      if (MC_EVENT_LOGGING)
	  ELOG_TYPE (e, "become master (was maybe_master)");
	  ELOG (mcm->elog_main, e, 0);

    case MC_RELAY_STATE_SLAVE:
      mcm->relay_state = MC_RELAY_STATE_SLAVE;
      /* NOTE(review): relay_state was just set to SLAVE on the line
	 above, so this condition can never be true and the event is
	 never logged.  The test was probably intended to run before
	 the assignment — confirm against upstream. */
      if (MC_EVENT_LOGGING && mcm->relay_state != MC_RELAY_STATE_SLAVE)
	  ELOG_TYPE (e, "become slave (was maybe_master)");
	  ELOG (mcm->elog_main, e, 0);
/* Slave-state loop: listen for master asserts; after repeated timeouts
   with no master heard, fall back to mastership negotiation. */
static void this_node_slave (mc_main_t * mcm)
  vlib_main_t * vm = mcm->vlib_main;

  if (MC_EVENT_LOGGING)
      ELOG_TYPE (e, "become slave");
      ELOG (mcm->elog_main, e, 0);

  vlib_process_wait_for_event_or_clock (vm, 1.0);
  event_type = vlib_process_get_events (vm, /* no event data */ 0);

  /* Master appears to be gone: renegotiate mastership. */
  mcm->relay_state = MC_RELAY_STATE_NEGOTIATE;
  mcm->relay_master_peer_id = ~0ULL;
  if (MC_EVENT_LOGGING)
      /* NOTE(review): "negoitate" is a typo for "negotiate" in this
	 event-log string; left unchanged here since it is runtime text. */
      ELOG_TYPE (e, "timeouts; negoitate mastership");
      ELOG (mcm->elog_main, e, 0);

    case MC_RELAY_STATE_SLAVE:
      mcm->relay_state = MC_RELAY_STATE_SLAVE;
/* Process node: runs the mastership state machine forever, dispatching
   on the current relay state.  NEGOTIATE and MASTER share the
   maybe-master loop; SLAVE runs the slave loop. */
mc_mastership_process (vlib_main_t * vm,
		       vlib_node_runtime_t * node,
  mc_main_t * mcm = mc_node_get_main (node);

  switch (mcm->relay_state)
    case MC_RELAY_STATE_NEGOTIATE:
    case MC_RELAY_STATE_MASTER:
      this_node_maybe_master(mcm);

    case MC_RELAY_STATE_SLAVE:
      this_node_slave (mcm);

  return 0; /* not likely */
1692 void mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master)
1694 if (we_can_be_master != mcm->we_can_be_relay_master)
1696 mcm->we_can_be_relay_master = we_can_be_master;
1697 vlib_process_signal_event (mcm->vlib_main,
1698 mcm->mastership_process,
1699 MC_RELAY_STATE_NEGOTIATE, 0);
/* Handle a received master-assert: decide whether to yield mastership,
   adopt the sender's global sequence if newer, and record the sender's
   liveness timestamp. */
void mc_msg_master_assert_handler (mc_main_t * mcm, mc_msg_master_assert_t * mp, u32 buffer_index)
  mc_peer_id_t his_peer_id, our_peer_id;
  u8 signal_slave = 0;
  u8 update_global_sequence = 0;

  mc_byte_swap_msg_master_assert (mp);

  his_peer_id = mp->peer_id;
  our_peer_id = mcm->transport.our_ack_peer_id;

  /* compare the incoming global sequence with ours */
  seq_cmp_result = mc_seq_cmp (mp->global_sequence,
			       mcm->relay_global_sequence);

  /* If the sender has a lower peer id and the sender's sequence >=
     our global sequence, we become a slave. Otherwise we are master. */
  if (mc_peer_id_compare (his_peer_id, our_peer_id) < 0 && seq_cmp_result >= 0)
      vlib_process_signal_event (mcm->vlib_main,
				 mcm->mastership_process,
				 MC_RELAY_STATE_SLAVE, 0);

  /* Update our global sequence. */
  if (seq_cmp_result > 0)
      mcm->relay_global_sequence = mp->global_sequence;
      update_global_sequence = 1;

  /* Find (or create) the mastership-peer record and stamp it. */
    uword * q = mhash_get (&mcm->mastership_peer_index_by_id, &his_peer_id);
    mc_mastership_peer_t * p;

      p = vec_elt_at_index (mcm->mastership_peers, q[0]);

      vec_add2 (mcm->mastership_peers, p, 1);
      p->peer_id = his_peer_id;
      mhash_set (&mcm->mastership_peer_index_by_id, &p->peer_id, p - mcm->mastership_peers,
    p->time_last_master_assert_received = vlib_time_now (mcm->vlib_main);

  /*
   * these messages clog the event log, set MC_EVENT_LOGGING higher
   */
  if (MC_EVENT_LOGGING > 1)
      ELOG_TYPE_DECLARE (e) = {
	.format = "rx-massert: peer %s global seq %u upd %d slave %d",
	.format_args = "T4i4i1i1",
	u32 global_sequence;
      ed = ELOG_DATA (mcm->elog_main, e);
      ed->peer = elog_id_for_peer_id (mcm, his_peer_id.as_u64);
      ed->global_sequence = mp->global_sequence;
      ed->update_sequence = update_global_sequence;
      ed->slave = signal_slave;
/* Build the global message registry: walk the vlib registration list,
   give each message a global index, and index messages by name. */
mc_serialize_init (mc_main_t * mcm)
  mc_serialize_msg_t * m;
  vlib_main_t * vm = vlib_get_main();

  mcm->global_msg_index_by_name
    = hash_create_string (/* elts */ 0, sizeof (uword));

  /* Registrations form a linked list built at link/load time. */
  m = vm->mc_msg_registrations;
      m->global_index = vec_len (mcm->global_msgs);
      hash_set_mem (mcm->global_msg_index_by_name,
      vec_add1 (mcm->global_msgs, m);
      m = m->next_registration;
/* Serialize one message into the TX vlib-buffer chain; flush (close
   the chain and send it on the stream) when batching is disabled or
   another message of this type might not fit in a packet. */
mc_serialize_va (mc_main_t * mc,
		 u32 multiple_messages_per_vlib_buffer,
		 mc_serialize_msg_t * msg,
  clib_error_t * error;
  serialize_main_t * m = &mc->serialize_mains[VLIB_TX];
  vlib_serialize_buffer_main_t * sbm = &mc->serialize_buffer_mains[VLIB_TX];
  u32 bi, n_before, n_after, n_total, n_this_msg;

  /* Lazy one-time init of the TX serialize buffer main. */
  if (! sbm->vlib_main)
      sbm->tx.max_n_data_bytes_per_chain = 4096;
      sbm->tx.free_list_index = VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX;

  if (sbm->first_buffer == 0)
    serialize_open_vlib_buffer (m, mc->vlib_main, sbm);

  n_before = serialize_vlib_buffer_n_bytes (m);

  s = mc_stream_by_index (mc, stream_index);
  gi = msg->global_index;
  ASSERT (msg == vec_elt (mc->global_msgs, gi));

  /* Once bound, messages travel by compact stream-local index. */
  if (gi < vec_len (s->stream_msg_index_by_global_index))
    si = s->stream_msg_index_by_global_index[gi];

  serialize_likely_small_unsigned_integer (m, si);

  /* For first time message is sent, use name to identify message. */
  if (si == ~0 || MSG_ID_DEBUG)
    serialize_cstring (m, msg->name);

  if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
      ELOG_TYPE_DECLARE (e) = {
	.format = "serialize-msg: %s index %d",
	.format_args = "T4i4",
      struct { u32 c[2]; } * ed;
      ed = ELOG_DATA (mc->elog_main, e);
      ed->c[0] = elog_id_for_msg_name (mc, msg->name);

  error = va_serialize (m, va);

  n_after = serialize_vlib_buffer_n_bytes (m);
  n_this_msg = n_after - n_before;
  n_total = n_after + sizeof (mc_msg_user_request_t);

  /* For max message size ignore first message where string name is sent. */
  msg->max_n_bytes_serialized = clib_max (msg->max_n_bytes_serialized, n_this_msg);

  /* Flush if batching is off or the next message of this size
     might exceed the transport's max packet size. */
  if (! multiple_messages_per_vlib_buffer
      || n_total + msg->max_n_bytes_serialized > mc->transport.max_packet_size)
      bi = serialize_close_vlib_buffer (m);
      sbm->first_buffer = 0;
      mc_stream_send (mc, stream_index, bi);
      vlib_buffer_free_one (mc->vlib_main, bi);
/* Wrapper around mc_serialize_va: resolves the default stream index
   (blocking until this process's stream join completes if necessary)
   and forwards the variadic arguments. */
mc_serialize_internal (mc_main_t * mc,
		       u32 multiple_messages_per_vlib_buffer,
		       mc_serialize_msg_t * msg,
  vlib_main_t * vm = mc->vlib_main;
  clib_error_t * error;

  /* stream_index ~0 means "use this process's joined stream". */
  if (stream_index == ~0)
      if (vm->mc_main && vm->mc_stream_index == ~0)
	vlib_current_process_wait_for_one_time_event_vector
	  (vm, &vm->procs_waiting_for_mc_stream_join);
      stream_index = vm->mc_stream_index;

  error = mc_serialize_va (mc, stream_index,
			   multiple_messages_per_vlib_buffer,
/* Unserialize and dispatch one message: resolve the stream-local
   message index (binding by name on first sight), sanity-check the
   binding, then invoke the registered unserialize handler. */
uword mc_unserialize_message (mc_main_t * mcm,
			      serialize_main_t * m)
  mc_serialize_stream_msg_t * sm;

  si = unserialize_likely_small_unsigned_integer (m);

  /* Fast path: index already bound for this stream. */
  if (! (si == ~0 || MSG_ID_DEBUG))
      sm = vec_elt_at_index (s->stream_msgs, si);
      gi = sm->global_index;

  /* Slow path: message identified by name on first use. */
  unserialize_cstring (m, &name);

  if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
      ELOG_TYPE_DECLARE (e) = {
	.format = "unserialize-msg: %s rx index %d",
	.format_args = "T4i4",
      struct { u32 c[2]; } * ed;
      ed = ELOG_DATA (mcm->elog_main, e);
      ed->c[0] = elog_id_for_msg_name (mcm, name);

    uword * p = hash_get_mem (mcm->global_msg_index_by_name, name);

    /* Unknown message? */

  vec_validate_init_empty (s->stream_msg_index_by_global_index, gi, ~0);
  si = s->stream_msg_index_by_global_index[gi];

  /* Stream local index unknown? Create it. */
      vec_add2 (s->stream_msgs, sm, 1);

      si = sm - s->stream_msgs;
      sm->global_index = gi;
      s->stream_msg_index_by_global_index[gi] = si;

      if (MC_EVENT_LOGGING > 0)
	  ELOG_TYPE_DECLARE (e) = {
	    .format = "msg-bind: stream %d %s to index %d",
	    .format_args = "i4T4i4",
	  struct { u32 c[3]; } * ed;
	  ed = ELOG_DATA (mcm->elog_main, e);
	  ed->c[0] = s->index;
	  ed->c[1] = elog_id_for_msg_name (mcm, name);

      sm = vec_elt_at_index (s->stream_msgs, si);
      /* Sanity check: the bound index must map back to the same
	 global message; log an error event on mismatch. */
      if (gi != sm->global_index && MC_EVENT_LOGGING > 0)
	  ELOG_TYPE_DECLARE (e) = {
	    .format = "msg-id-ERROR: %s index %d expected %d",
	    .format_args = "T4i4i4",
	  struct { u32 c[3]; } * ed;
	  ed = ELOG_DATA (mcm->elog_main, e);
	  ed->c[0] = elog_id_for_msg_name (mcm, name);

	  if (sm->global_index < vec_len (s->stream_msg_index_by_global_index))
	    ed->c[2] = s->stream_msg_index_by_global_index[sm->global_index];

  mc_serialize_msg_t * msg;
  msg = vec_elt (mcm->global_msgs, gi);
  /* Dispatch to the message's registered unserialize handler. */
  unserialize (m, msg->unserialize, mcm);
/* Dequeue one (stream, buffer) pair, optionally snapshot the raw
   buffer contents for offline replay, then unserialize every message
   contained in the buffer chain. */
mc_unserialize_internal (mc_main_t * mcm, u32 stream_and_buffer_index)
  vlib_main_t * vm = mcm->vlib_main;
  serialize_main_t * m = &mcm->serialize_mains[VLIB_RX];
  vlib_serialize_buffer_main_t * sbm = &mcm->serialize_buffer_mains[VLIB_RX];
  mc_stream_and_buffer_t * sb;
  mc_stream_t * stream;

  sb = pool_elt_at_index (mcm->mc_unserialize_stream_and_buffers, stream_and_buffer_index);
  buffer_index = sb->buffer_index;
  stream = vec_elt_at_index (mcm->stream_vector, sb->stream_index);
  pool_put (mcm->mc_unserialize_stream_and_buffers, sb);

  if (stream->config.save_snapshot)
      u32 n_bytes = vlib_buffer_index_length_in_chain (vm, buffer_index);
      /* static: scratch vector deliberately reused across calls;
	 only its length is reset, the allocation is kept. */
      static u8 * contents;
      vec_reset_length (contents);
      vec_validate (contents, n_bytes - 1);
      vlib_buffer_contents (vm, buffer_index, contents);
      stream->config.save_snapshot (mcm, /* is_catchup */ 0, contents, n_bytes);

  ASSERT (vlib_in_process_context (vm));

  unserialize_open_vlib_buffer (m, vm, sbm);

  clib_fifo_add1 (sbm->rx.buffer_fifo, buffer_index);

  /* A buffer chain may contain more than one serialized message. */
  while (unserialize_vlib_buffer_n_bytes (m) > 0)
    mc_unserialize_message (mcm, stream, m);

  unserialize_close_vlib_buffer (m);
/* Queue a received buffer for deserialization: record the
   (stream, buffer) pair in a pool and signal the unserialize process,
   which does the actual work in process context. */
mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index)
  vlib_main_t * vm = mcm->vlib_main;
  mc_stream_and_buffer_t * sb;
  pool_get (mcm->mc_unserialize_stream_and_buffers, sb);
  sb->stream_index = s->index;
  sb->buffer_index = buffer_index;
  vlib_process_signal_event (vm, mcm->unserialize_process,
			     EVENT_MC_UNSERIALIZE_BUFFER, sb - mcm->mc_unserialize_stream_and_buffers);
/* Process node: drains buffer-unserialize and catchup events queued by
   mc_unserialize() and mc_msg_catchup_reply_handler(). */
mc_unserialize_process (vlib_main_t * vm,
			vlib_node_runtime_t * node,
  mc_main_t * mcm = mc_node_get_main (node);
  uword event_type, * event_data = 0;

  /* Reuse the event-data vector across loop iterations. */
  _vec_len(event_data) = 0;

  vlib_process_wait_for_event (vm);
  event_type = vlib_process_get_events (vm, &event_data);

    case EVENT_MC_UNSERIALIZE_BUFFER:
      for (i = 0; i < vec_len (event_data); i++)
	mc_unserialize_internal (mcm, event_data[i]);

    case EVENT_MC_UNSERIALIZE_CATCHUP:
      for (i = 0; i < vec_len (event_data); i++)
	  /* Event datum is the catchup reply pointer (see handler). */
	  u8 * mp = uword_to_pointer (event_data[i], u8 *);
	  perform_catchup (mcm, (void *) mp);

  return 0; /* not likely */
/* Serialize the mc main: per stream, its name and the names of every
   message it has sent, so a peer can rebuild its index bindings. */
void serialize_mc_main (serialize_main_t * m, va_list * va)
  mc_main_t * mcm = va_arg (*va, mc_main_t *);
  mc_serialize_stream_msg_t * sm;
  mc_serialize_msg_t * msg;

  serialize_integer (m, vec_len (mcm->stream_vector), sizeof (u32));
  vec_foreach (s, mcm->stream_vector)
      serialize_cstring (m, s->config.name);

      /* Serialize global names for all sent messages. */
      serialize_integer (m, vec_len (s->stream_msgs), sizeof (u32));
      vec_foreach (sm, s->stream_msgs)
	  msg = vec_elt (mcm->global_msgs, sm->global_index);
	  serialize_cstring (m, msg->name);
/* Inverse of serialize_mc_main: create streams by name as needed and
   rebuild each stream's message-name -> index bindings. */
void unserialize_mc_main (serialize_main_t * m, va_list * va)
  mc_main_t * mcm = va_arg (*va, mc_main_t *);
  u32 i, n_streams, n_stream_msgs;
  mc_serialize_stream_msg_t * sm;

  unserialize_integer (m, &n_streams, sizeof (u32));
  for (i = 0; i < n_streams; i++)
      unserialize_cstring (m, &name);
      /* Create the stream record if we have not heard of it yet
	 (internal stream always exists). */
      if (i != MC_STREAM_INDEX_INTERNAL
	  && ! mc_stream_by_name (mcm, name))
	  vec_validate (mcm->stream_vector, i);
	  s = vec_elt_at_index (mcm->stream_vector, i);

	  s->index = s - mcm->stream_vector;
	  s->config.name = name;
	  s->state = MC_STREAM_STATE_name_known;
	  hash_set_mem (mcm->stream_index_by_name, s->config.name, s->index);

      s = vec_elt_at_index (mcm->stream_vector, i);

      /* Existing bindings are replaced wholesale. */
      vec_free (s->stream_msgs);
      vec_free (s->stream_msg_index_by_global_index);

      unserialize_integer (m, &n_stream_msgs, sizeof (u32));
      vec_resize (s->stream_msgs, n_stream_msgs);
      vec_foreach (sm, s->stream_msgs)
	  unserialize_cstring (m, &name);
	  p = hash_get (mcm->global_msg_index_by_name, name);

	  si = sm - s->stream_msgs;

	  if (MC_EVENT_LOGGING > 0)
	      ELOG_TYPE_DECLARE (e) = {
		.format = "catchup-bind: %s to %d global index %d stream %d",
		.format_args = "T4i4i4i4",
	      struct { u32 c[4]; } * ed;
	      ed = ELOG_DATA (mcm->elog_main, e);
	      ed->c[0] = elog_id_for_msg_name (mcm, name);

	      ed->c[3] = s->index;

	  sm->global_index = gi;

	  vec_validate_init_empty (s->stream_msg_index_by_global_index,
	  s->stream_msg_index_by_global_index[gi] = si;
/* Initialize an mc main: bind it to the vlib main, create the name
   hashes, and register the five process nodes (mastership, join ager,
   retry, catchup, unserialize), each named with the caller's tag. */
void mc_main_init (mc_main_t * mcm, char * tag)
  vlib_main_t * vm = vlib_get_main();

  mcm->vlib_main = vm;
  mcm->elog_main = &vm->elog_main;

  /* Unknown master until negotiation completes. */
  mcm->relay_master_peer_id = ~0ULL;
  mcm->relay_state = MC_RELAY_STATE_NEGOTIATE;

  mcm->stream_index_by_name
    = hash_create_string (/* elts */ 0, /* value size */ sizeof (uword));

    vlib_node_registration_t r;

    memset (&r, 0, sizeof (r));

    r.type = VLIB_NODE_TYPE_PROCESS;

    /* Point runtime data to main instance. */
    r.runtime_data = &mcm;
    r.runtime_data_bytes = sizeof (&mcm);

    r.name = (char *) format (0, "mc-mastership-%s", tag);
    r.function = mc_mastership_process;
    mcm->mastership_process = vlib_register_node (vm, &r);

    r.name = (char *) format (0, "mc-join-ager-%s", tag);
    r.function = mc_join_ager_process;
    mcm->join_ager_process = vlib_register_node (vm, &r);

    r.name = (char *) format (0, "mc-retry-%s", tag);
    r.function = mc_retry_process;
    mcm->retry_process = vlib_register_node (vm, &r);

    r.name = (char *) format (0, "mc-catchup-%s", tag);
    r.function = mc_catchup_process;
    mcm->catchup_process = vlib_register_node (vm, &r);

    r.name = (char *) format (0, "mc-unserialize-%s", tag);
    r.function = mc_unserialize_process;
    mcm->unserialize_process = vlib_register_node (vm, &r);

  if (MC_EVENT_LOGGING > 0)
    mhash_init (&mcm->elog_id_by_peer_id, sizeof (uword), sizeof (mc_peer_id_t));

  mhash_init (&mcm->mastership_peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
  mc_serialize_init (mcm);
/* Format a mc_relay_state_t as a human-readable string. */
static u8 * format_mc_relay_state (u8 * s, va_list * args)
  mc_relay_state_t state = va_arg (*args, mc_relay_state_t);

    case MC_RELAY_STATE_NEGOTIATE:

    case MC_RELAY_STATE_MASTER:

    case MC_RELAY_STATE_SLAVE:

      /* Unrecognized state value: print it in hex. */
      return format (s, "unknown 0x%x", state);

  return format (s, "%s", t);
/* Format a mc_stream_state_t as its enum name, via the
   foreach_mc_stream_state x-macro. */
static u8 * format_mc_stream_state (u8 * s, va_list * args)
  mc_stream_state_t state = va_arg (*args, mc_stream_state_t);

#define _(f) case MC_STREAM_STATE_##f: t = #f; break;
      foreach_mc_stream_state

      /* Unrecognized state value: print it in hex. */
      return format (s, "unknown 0x%x", state);

  return format (s, "%s", t);
/* qsort-style comparator: orders mc_stream_peer_t elements by peer id. */
mc_peer_comp (void * a1, void * a2)
  mc_stream_peer_t * p1 = a1;
  mc_stream_peer_t * p2 = a2;

  return mc_peer_id_compare (p1->id, p2->id);
2285 u8 * format_mc_main (u8 * s, va_list * args)
2287 mc_main_t * mcm = va_arg (*args, mc_main_t *);
2289 mc_stream_peer_t * p, * ps;
2290 uword indent = format_get_indent (s);
2292 s = format (s, "MC state %U, %d streams joined, global sequence 0x%x",
2293 format_mc_relay_state, mcm->relay_state,
2294 vec_len (mcm->stream_vector),
2295 mcm->relay_global_sequence);
2298 mc_mastership_peer_t * mp;
2299 f64 now = vlib_time_now (mcm->vlib_main);
2300 s = format (s, "\n%UMost recent mastership peers:",
2301 format_white_space, indent + 2);
2302 vec_foreach (mp, mcm->mastership_peers)
2304 s = format (s, "\n%U%-30U%.4e",
2305 format_white_space, indent + 4,
2306 mcm->transport.format_peer_id, mp->peer_id,
2307 now - mp->time_last_master_assert_received);
2311 vec_foreach (t, mcm->stream_vector)
2313 s = format (s, "\n%Ustream `%s' index %d",
2314 format_white_space, indent + 2,
2315 t->config.name, t->index);
2317 s = format (s, "\n%Ustate %U",
2318 format_white_space, indent + 4,
2319 format_mc_stream_state, t->state);
2321 s = format (s, "\n%Uretries: interval %.0f sec, limit %d, pool elts %d, %Ld sent",
2322 format_white_space, indent + 4, t->config.retry_interval,
2323 t->config.retry_limit,
2324 pool_elts (t->retry_pool),
2325 t->stats.n_retries - t->stats_last_clear.n_retries);
2327 s = format (s, "\n%U%Ld/%Ld user requests sent/received",
2328 format_white_space, indent + 4,
2329 t->user_requests_sent, t->user_requests_received);
2331 s = format (s, "\n%U%d peers, local/global sequence 0x%x/0x%x",
2332 format_white_space, indent + 4,
2333 pool_elts (t->peers),
2334 t->our_local_sequence,
2335 t->last_global_sequence_processed);
2338 pool_foreach (p, t->peers,
2340 if (clib_bitmap_get (t->all_peer_bitmap, p - t->peers))
2341 vec_add1 (ps, p[0]);
2343 vec_sort_with_function (ps, mc_peer_comp);
2344 s = format (s, "\n%U%=30s%10s%16s%16s",
2345 format_white_space, indent + 6,
2346 "Peer", "Last seq", "Retries", "Future");
2350 s = format (s, "\n%U%-30U0x%08x%16Ld%16Ld%s",
2351 format_white_space, indent + 6,
2352 mcm->transport.format_peer_id, p->id.as_u64,
2353 p->last_sequence_received,
2354 p->stats.n_msgs_from_past - p->stats_last_clear.n_msgs_from_past,
2355 p->stats.n_msgs_from_future - p->stats_last_clear.n_msgs_from_future,
2356 (mcm->transport.our_ack_peer_id.as_u64 == p->id.as_u64