2 * mc.c: vlib reliable sequenced multicast distributed applications
4 * Copyright (c) 2010 Cisco and/or its affiliates.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 #include <vlib/vlib.h>
21 * 1 to enable msg id training wheels, which are useful for tracking
22 * down catchup and/or partitioned network problems
24 #define MSG_ID_DEBUG 0
26 static format_function_t format_mc_stream_state;
29 elog_id_for_peer_id (mc_main_t * m, u64 peer_id)
32 mhash_t *h = &m->elog_id_by_peer_id;
34 if (!m->elog_id_by_peer_id.hash)
35 mhash_init (h, sizeof (uword), sizeof (mc_peer_id_t));
37 p = mhash_get (h, &peer_id);
40 r = elog_string (m->elog_main, "%U", m->transport.format_peer_id, peer_id);
41 mhash_set (h, &peer_id, r, /* old_value */ 0);
46 elog_id_for_msg_name (mc_main_t * m, char *msg_name)
49 uword *h = m->elog_id_by_msg_name;
53 h = m->elog_id_by_msg_name = hash_create_string (0, sizeof (uword));
55 p = hash_get_mem (h, msg_name);
58 r = elog_string (m->elog_main, "%s", msg_name);
60 name_copy = format (0, "%s%c", msg_name, 0);
62 hash_set_mem (h, name_copy, r);
63 m->elog_id_by_msg_name = h;
69 elog_tx_msg (mc_main_t * m, u32 stream_id, u32 local_sequence,
72 if (MC_EVENT_LOGGING > 0)
75 ELOG_TYPE_DECLARE (e) =
77 .format = "tx-msg: stream %d local seq %d attempt %d",
78 .format_args = "i4i4i4",
83 u32 stream_id, local_sequence, retry_count;
85 ed = ELOG_DATA (m->elog_main, e);
86 ed->stream_id = stream_id;
87 ed->local_sequence = local_sequence;
88 ed->retry_count = retry_count;
94 * correctly compare two unsigned sequence numbers.
95 * This function works so long as x and y are within 2**(n-1) of each
96 * other, where n = bits(x, y).
99 * seq_cmp == 0 => x and y are equal
100 * seq_cmp < 0 => x is "in the past" with respect to y
101 * seq_cmp > 0 => x is "in the future" with respect to y
104 mc_seq_cmp (u32 x, u32 y)
106 return (i32) x - (i32) y;
110 mc_get_vlib_buffer (vlib_main_t * vm, u32 n_bytes, u32 * bi_return)
115 n_alloc = vlib_buffer_alloc (vm, &bi, 1);
116 ASSERT (n_alloc == 1);
118 b = vlib_get_buffer (vm, bi);
119 b->current_length = n_bytes;
121 return (void *) b->data;
125 delete_peer_with_index (mc_main_t * mcm, mc_stream_t * s,
126 uword index, int notify_application)
128 mc_stream_peer_t *p = pool_elt_at_index (s->peers, index);
130 if (s->config.peer_died && notify_application)
131 s->config.peer_died (mcm, s, p->id);
133 s->all_peer_bitmap = clib_bitmap_andnoti (s->all_peer_bitmap, p - s->peers);
135 if (MC_EVENT_LOGGING > 0)
138 ELOG_TYPE_DECLARE (e) =
140 .format = "delete peer %s from all_peer_bitmap",
149 ed = ELOG_DATA (mcm->elog_main, e);
150 ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
152 /* Do not delete the pool / hash table entries, or we lose sequence number state */
155 static mc_stream_peer_t *
156 get_or_create_peer_with_id (mc_main_t * mcm,
157 mc_stream_t * s, mc_peer_id_t id, int *created)
159 uword *q = mhash_get (&s->peer_index_by_id, &id);
164 p = pool_elt_at_index (s->peers, q[0]);
168 pool_get (s->peers, p);
169 memset (p, 0, sizeof (p[0]));
171 p->last_sequence_received = ~0;
172 mhash_set (&s->peer_index_by_id, &id, p - s->peers, /* old_value */ 0);
177 if (MC_EVENT_LOGGING > 0)
180 ELOG_TYPE_DECLARE (e) =
182 .format = "get_or_create %s peer %s stream %d seq %d",
183 .format_args = "t4T4i4i4",
192 u32 is_new, peer, stream_index, rx_sequence;
195 ed = ELOG_DATA (mcm->elog_main, e);
196 ed->is_new = q ? 0 : 1;
197 ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
198 ed->stream_index = s->index;
199 ed->rx_sequence = p->last_sequence_received;
201 /* $$$$ Enable or reenable this peer */
202 s->all_peer_bitmap = clib_bitmap_ori (s->all_peer_bitmap, p - s->peers);
207 maybe_send_window_open_event (vlib_main_t * vm, mc_stream_t * stream)
209 vlib_one_time_waiting_process_t *p;
211 if (pool_elts (stream->retry_pool) >= stream->config.window_size)
214 vec_foreach (p, stream->procs_waiting_for_open_window)
215 vlib_signal_one_time_waiting_process (vm, p);
217 if (stream->procs_waiting_for_open_window)
218 _vec_len (stream->procs_waiting_for_open_window) = 0;
222 mc_retry_free (mc_main_t * mcm, mc_stream_t * s, mc_retry_t * r)
224 mc_retry_t record, *retp;
226 if (r->unacked_by_peer_bitmap)
227 _vec_len (r->unacked_by_peer_bitmap) = 0;
229 if (clib_fifo_elts (s->retired_fifo) >= 2 * s->config.window_size)
231 clib_fifo_sub1 (s->retired_fifo, record);
232 vlib_buffer_free_one (mcm->vlib_main, record.buffer_index);
235 clib_fifo_add2 (s->retired_fifo, retp);
237 retp->buffer_index = r->buffer_index;
238 retp->local_sequence = r->local_sequence;
240 r->buffer_index = ~0; /* poison buffer index in this retry */
244 mc_resend_retired (mc_main_t * mcm, mc_stream_t * s, u32 local_sequence)
248 if (MC_EVENT_LOGGING > 0)
251 ELOG_TYPE_DECLARE (e) =
253 .format = "resend-retired: search for local seq %d",
261 ed = ELOG_DATA (mcm->elog_main, e);
262 ed->local_sequence = local_sequence;
266 clib_fifo_foreach (retry, s->retired_fifo,
268 if (retry->local_sequence == local_sequence)
270 elog_tx_msg (mcm, s->index, retry-> local_sequence, -13);
271 mcm->transport.tx_buffer (mcm->transport.opaque,
272 MC_TRANSPORT_USER_REQUEST_TO_RELAY,
273 retry->buffer_index);
279 if (MC_EVENT_LOGGING > 0)
282 ELOG_TYPE_DECLARE (e) =
284 .format = "resend-retired: FAILED search for local seq %d",
292 ed = ELOG_DATA (mcm->elog_main, e);
293 ed->local_sequence = local_sequence;
298 delete_retry_fifo_elt (mc_main_t * mcm,
299 mc_stream_t * stream,
300 mc_retry_t * r, uword * dead_peer_bitmap)
305 pool_foreach (p, stream->peers, ({
306 uword pi = p - stream->peers;
307 uword is_alive = 0 == clib_bitmap_get (r->unacked_by_peer_bitmap, pi);
310 dead_peer_bitmap = clib_bitmap_ori (dead_peer_bitmap, pi);
312 if (MC_EVENT_LOGGING > 0)
314 ELOG_TYPE_DECLARE (e) = {
315 .format = "delete_retry_fifo_elt: peer %s is %s",
316 .format_args = "T4t4",
318 .enum_strings = { "alive", "dead", },
320 struct { u32 peer, is_alive; } * ed;
321 ed = ELOG_DATA (mcm->elog_main, e);
322 ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
323 ed->is_alive = is_alive;
328 hash_unset (stream->retry_index_by_local_sequence, r->local_sequence);
329 mc_retry_free (mcm, stream, r);
331 return dead_peer_bitmap;
334 always_inline mc_retry_t *
335 prev_retry (mc_stream_t * s, mc_retry_t * r)
337 return (r->prev_index != ~0
338 ? pool_elt_at_index (s->retry_pool, r->prev_index) : 0);
341 always_inline mc_retry_t *
342 next_retry (mc_stream_t * s, mc_retry_t * r)
344 return (r->next_index != ~0
345 ? pool_elt_at_index (s->retry_pool, r->next_index) : 0);
349 remove_retry_from_pool (mc_stream_t * s, mc_retry_t * r)
351 mc_retry_t *p = prev_retry (s, r);
352 mc_retry_t *n = next_retry (s, r);
355 p->next_index = r->next_index;
357 s->retry_head_index = r->next_index;
359 n->prev_index = r->prev_index;
361 s->retry_tail_index = r->prev_index;
363 pool_put_index (s->retry_pool, r - s->retry_pool);
367 check_retry (mc_main_t * mcm, mc_stream_t * s)
370 vlib_main_t *vm = mcm->vlib_main;
371 f64 now = vlib_time_now (vm);
372 uword *dead_peer_bitmap = 0;
375 for (ri = s->retry_head_index; ri != ~0; ri = ri_next)
377 r = pool_elt_at_index (s->retry_pool, ri);
378 ri_next = r->next_index;
380 if (now < r->sent_at + s->config.retry_interval)
384 if (r->n_retries > s->config.retry_limit)
387 delete_retry_fifo_elt (mcm, s, r, dead_peer_bitmap);
388 remove_retry_from_pool (s, r);
392 if (MC_EVENT_LOGGING > 0)
397 ELOG_TYPE_DECLARE (t) =
399 .format = "resend local seq %d attempt %d",
400 .format_args = "i4i4",
405 pool_foreach (p, s->peers, ({
406 if (clib_bitmap_get (r->unacked_by_peer_bitmap, p - s->peers))
408 ELOG_TYPE_DECLARE (ev) = {
409 .format = "resend: needed by peer %s local seq %d",
410 .format_args = "T4i4",
412 struct { u32 peer, rx_sequence; } * ed;
413 ed = ELOG_DATA (mcm->elog_main, ev);
414 ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
415 ed->rx_sequence = r->local_sequence;
425 ed = ELOG_DATA (mcm->elog_main, t);
426 ed->sequence = r->local_sequence;
427 ed->trail = r->n_retries;
430 r->sent_at = vlib_time_now (vm);
431 s->stats.n_retries += 1;
433 elog_tx_msg (mcm, s->index, r->local_sequence, r->n_retries);
435 mcm->transport.tx_buffer
436 (mcm->transport.opaque,
437 MC_TRANSPORT_USER_REQUEST_TO_RELAY, r->buffer_index);
441 maybe_send_window_open_event (mcm->vlib_main, s);
443 /* Delete any dead peers we've found. */
444 if (!clib_bitmap_is_zero (dead_peer_bitmap))
449 clib_bitmap_foreach (i, dead_peer_bitmap, ({
450 delete_peer_with_index (mcm, s, i, /* notify_application */ 1);
452 /* Delete any references to just deleted peer in retry pool. */
453 pool_foreach (r, s->retry_pool, ({
454 r->unacked_by_peer_bitmap =
455 clib_bitmap_andnoti (r->unacked_by_peer_bitmap, i);
459 clib_bitmap_free (dead_peer_bitmap);
463 always_inline mc_main_t *
464 mc_node_get_main (vlib_node_runtime_t * node)
466 mc_main_t **p = (void *) node->runtime_data;
471 mc_retry_process (vlib_main_t * vm,
472 vlib_node_runtime_t * node, vlib_frame_t * f)
474 mc_main_t *mcm = mc_node_get_main (node);
479 vlib_process_suspend (vm, 1.0);
480 vec_foreach (s, mcm->stream_vector)
482 if (s->state != MC_STREAM_STATE_invalid)
483 check_retry (mcm, s);
486 return 0; /* not likely */
490 send_join_or_leave_request (mc_main_t * mcm, u32 stream_index, u32 is_join)
492 vlib_main_t *vm = mcm->vlib_main;
493 mc_msg_join_or_leave_request_t *mp;
496 mp = mc_get_vlib_buffer (vm, sizeof (mp[0]), &bi);
497 memset (mp, 0, sizeof (*mp));
498 mp->type = MC_MSG_TYPE_join_or_leave_request;
499 mp->peer_id = mcm->transport.our_ack_peer_id;
500 mp->stream_index = stream_index;
501 mp->is_join = is_join;
503 mc_byte_swap_msg_join_or_leave_request (mp);
506 * These msgs are unnumbered, unordered so send on the from-relay
509 mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi);
513 mc_join_ager_process (vlib_main_t * vm,
514 vlib_node_runtime_t * node, vlib_frame_t * f)
516 mc_main_t *mcm = mc_node_get_main (node);
520 if (mcm->joins_in_progress)
523 vlib_one_time_waiting_process_t *p;
524 f64 now = vlib_time_now (vm);
526 vec_foreach (s, mcm->stream_vector)
528 if (s->state != MC_STREAM_STATE_join_in_progress)
531 if (now > s->join_timeout)
533 s->state = MC_STREAM_STATE_ready;
535 if (MC_EVENT_LOGGING > 0)
538 ELOG_TYPE_DECLARE (e) =
540 .format = "stream %d join timeout",
543 ELOG (mcm->elog_main, e, s->index);
545 /* Make sure that this app instance exists as a stream peer,
546 or we may answer a catchup request with a NULL
547 all_peer_bitmap... */
548 (void) get_or_create_peer_with_id
549 (mcm, s, mcm->transport.our_ack_peer_id, /* created */ 0);
551 vec_foreach (p, s->procs_waiting_for_join_done)
552 vlib_signal_one_time_waiting_process (vm, p);
553 if (s->procs_waiting_for_join_done)
554 _vec_len (s->procs_waiting_for_join_done) = 0;
556 mcm->joins_in_progress--;
557 ASSERT (mcm->joins_in_progress >= 0);
561 /* Resent join request which may have been lost. */
562 send_join_or_leave_request (mcm, s->index, 1 /* is_join */ );
564 /* We're *not* alone, retry for as long as it takes */
565 if (mcm->relay_state == MC_RELAY_STATE_SLAVE)
566 s->join_timeout = vlib_time_now (vm) + 2.0;
569 if (MC_EVENT_LOGGING > 0)
572 ELOG_TYPE_DECLARE (e) =
574 .format = "stream %d resend join request",
577 ELOG (mcm->elog_main, e, s->index);
583 vlib_process_suspend (vm, .5);
586 return 0; /* not likely */
590 serialize_mc_register_stream_name (serialize_main_t * m, va_list * va)
592 char *name = va_arg (*va, char *);
593 serialize_cstring (m, name);
597 elog_stream_name (char *buf, int n_buf_bytes, char *v)
599 clib_memcpy (buf, v, clib_min (n_buf_bytes - 1, vec_len (v)));
600 buf[n_buf_bytes - 1] = 0;
604 unserialize_mc_register_stream_name (serialize_main_t * m, va_list * va)
606 mc_main_t *mcm = va_arg (*va, mc_main_t *);
611 unserialize_cstring (m, &name);
613 if ((p = hash_get_mem (mcm->stream_index_by_name, name)))
615 if (MC_EVENT_LOGGING > 0)
618 ELOG_TYPE_DECLARE (e) =
620 .format = "stream index %d already named %s",
621 .format_args = "i4s16",
629 ed = ELOG_DATA (mcm->elog_main, e);
630 ed->stream_index = p[0];
631 elog_stream_name (ed->name, sizeof (ed->name), name);
638 vec_add2 (mcm->stream_vector, s, 1);
640 s->state = MC_STREAM_STATE_name_known;
641 s->index = s - mcm->stream_vector;
642 s->config.name = name;
644 if (MC_EVENT_LOGGING > 0)
647 ELOG_TYPE_DECLARE (e) =
649 .format = "stream index %d named %s",
650 .format_args = "i4s16",
658 ed = ELOG_DATA (mcm->elog_main, e);
659 ed->stream_index = s->index;
660 elog_stream_name (ed->name, sizeof (ed->name), name);
663 hash_set_mem (mcm->stream_index_by_name, name, s->index);
665 p = hash_get (mcm->procs_waiting_for_stream_name_by_name, name);
668 vlib_one_time_waiting_process_t *wp, **w;
669 w = pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool, p[0]);
670 vec_foreach (wp, w[0])
671 vlib_signal_one_time_waiting_process (mcm->vlib_main, wp);
672 pool_put (mcm->procs_waiting_for_stream_name_pool, w);
673 hash_unset_mem (mcm->procs_waiting_for_stream_name_by_name, name);
678 MC_SERIALIZE_MSG (mc_register_stream_name_msg, static) =
680 .name = "mc_register_stream_name",
681 .serialize = serialize_mc_register_stream_name,
682 .unserialize = unserialize_mc_register_stream_name,
687 mc_rx_buffer_unserialize (mc_main_t * mcm,
688 mc_stream_t * stream,
689 mc_peer_id_t peer_id, u32 buffer_index)
691 return mc_unserialize (mcm, stream, buffer_index);
695 mc_internal_catchup_snapshot (mc_main_t * mcm,
697 u32 last_global_sequence_processed)
701 /* Append serialized data to data vector. */
702 serialize_open_vector (&m, data_vector);
703 m.stream.current_buffer_index = vec_len (data_vector);
705 serialize (&m, serialize_mc_main, mcm);
706 return serialize_close_vector (&m);
710 mc_internal_catchup (mc_main_t * mcm, u8 * data, u32 n_data_bytes)
714 unserialize_open_data (&s, data, n_data_bytes);
716 unserialize (&s, unserialize_mc_main, mcm);
719 /* Overridden from the application layer, not actually used here */
720 void mc_stream_join_process_hold (void) __attribute__ ((weak));
722 mc_stream_join_process_hold (void)
727 mc_stream_join_helper (mc_main_t * mcm,
728 mc_stream_config_t * config, u32 is_internal)
731 vlib_main_t *vm = mcm->vlib_main;
738 /* Already have a stream with given name? */
739 if ((s = mc_stream_by_name (mcm, config->name)))
741 /* Already joined and ready? */
742 if (s->state == MC_STREAM_STATE_ready)
746 /* First join MC internal stream. */
747 if (!mcm->stream_vector
748 || (mcm->stream_vector[MC_STREAM_INDEX_INTERNAL].state
749 == MC_STREAM_STATE_invalid))
751 static mc_stream_config_t c = {
752 .name = "mc-internal",
753 .rx_buffer = mc_rx_buffer_unserialize,
754 .catchup = mc_internal_catchup,
755 .catchup_snapshot = mc_internal_catchup_snapshot,
758 c.save_snapshot = config->save_snapshot;
760 mc_stream_join_helper (mcm, &c, /* is_internal */ 1);
763 /* If stream is still unknown register this name and wait for
764 sequenced message to name stream. This way all peers agree
765 on stream name to index mappings. */
766 s = mc_stream_by_name (mcm, config->name);
769 vlib_one_time_waiting_process_t *wp, **w;
770 u8 *name_copy = format (0, "%s", config->name);
772 mc_serialize_stream (mcm,
773 MC_STREAM_INDEX_INTERNAL,
774 &mc_register_stream_name_msg, config->name);
776 /* Wait for this stream to be named. */
778 hash_get_mem (mcm->procs_waiting_for_stream_name_by_name,
782 pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool,
786 pool_get (mcm->procs_waiting_for_stream_name_pool, w);
787 if (!mcm->procs_waiting_for_stream_name_by_name)
788 mcm->procs_waiting_for_stream_name_by_name = hash_create_string ( /* elts */ 0, /* value size */
791 hash_set_mem (mcm->procs_waiting_for_stream_name_by_name,
793 w - mcm->procs_waiting_for_stream_name_pool);
797 vec_add2 (w[0], wp, 1);
798 vlib_current_process_wait_for_one_time_event (vm, wp);
799 vec_free (name_copy);
802 /* Name should be known now. */
803 s = mc_stream_by_name (mcm, config->name);
805 ASSERT (s->state == MC_STREAM_STATE_name_known);
810 vec_add2 (mcm->stream_vector, s, 1);
812 s->index = s - mcm->stream_vector;
816 /* Save name since we could have already used it as hash key. */
817 char *name_save = s->config.name;
819 s->config = config[0];
822 s->config.name = name_save;
825 if (s->config.window_size == 0)
826 s->config.window_size = 8;
828 if (s->config.retry_interval == 0.0)
829 s->config.retry_interval = 1.0;
832 ASSERT (s->config.retry_interval < 30);
834 if (s->config.retry_limit == 0)
835 s->config.retry_limit = 7;
837 s->state = MC_STREAM_STATE_join_in_progress;
838 if (!s->peer_index_by_id.hash)
839 mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
841 /* If we don't hear from someone in 5 seconds, we're alone */
842 s->join_timeout = vlib_time_now (vm) + 5.0;
843 mcm->joins_in_progress++;
845 if (MC_EVENT_LOGGING > 0)
848 ELOG_TYPE_DECLARE (e) =
850 .format = "stream index %d join request %s",
851 .format_args = "i4s16",
859 ed = ELOG_DATA (mcm->elog_main, e);
860 ed->stream_index = s->index;
861 elog_stream_name (ed->name, sizeof (ed->name), s->config.name);
864 send_join_or_leave_request (mcm, s->index, 1 /* join */ );
866 vlib_current_process_wait_for_one_time_event_vector
867 (vm, &s->procs_waiting_for_join_done);
869 if (MC_EVENT_LOGGING)
871 ELOG_TYPE (e, "join complete stream %d");
872 ELOG (mcm->elog_main, e, s->index);
879 mc_stream_join (mc_main_t * mcm, mc_stream_config_t * config)
881 return mc_stream_join_helper (mcm, config, /* is_internal */ 0);
885 mc_stream_leave (mc_main_t * mcm, u32 stream_index)
887 mc_stream_t *s = mc_stream_by_index (mcm, stream_index);
892 if (MC_EVENT_LOGGING)
895 ELOG_TYPE_DECLARE (t) =
897 .format = "leave-stream: %d",.format_args = "i4",
904 ed = ELOG_DATA (mcm->elog_main, t);
905 ed->index = stream_index;
908 send_join_or_leave_request (mcm, stream_index, 0 /* is_join */ );
910 s->state = MC_STREAM_STATE_name_known;
914 mc_msg_join_or_leave_request_handler (mc_main_t * mcm,
915 mc_msg_join_or_leave_request_t * req,
919 mc_msg_join_reply_t *rep;
922 mc_byte_swap_msg_join_or_leave_request (req);
924 s = mc_stream_by_index (mcm, req->stream_index);
925 if (!s || s->state != MC_STREAM_STATE_ready)
928 /* If the peer is joining, create it */
933 /* We're not in a position to catch up a peer until all
934 stream joins are complete. */
937 /* XXX This is hard to test so we've. */
938 vec_foreach (this_s, mcm->stream_vector)
940 if (this_s->state != MC_STREAM_STATE_ready
941 && this_s->state != MC_STREAM_STATE_name_known)
945 else if (mcm->joins_in_progress > 0)
948 (void) get_or_create_peer_with_id (mcm, s, req->peer_id,
951 rep = mc_get_vlib_buffer (mcm->vlib_main, sizeof (rep[0]), &bi);
952 memset (rep, 0, sizeof (rep[0]));
953 rep->type = MC_MSG_TYPE_join_reply;
954 rep->stream_index = req->stream_index;
956 mc_byte_swap_msg_join_reply (rep);
957 /* These two are already in network byte order... */
958 rep->peer_id = mcm->transport.our_ack_peer_id;
959 rep->catchup_peer_id = mcm->transport.our_catchup_peer_id;
961 mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi);
965 if (s->config.peer_died)
966 s->config.peer_died (mcm, s, req->peer_id);
971 mc_msg_join_reply_handler (mc_main_t * mcm,
972 mc_msg_join_reply_t * mp, u32 buffer_index)
976 mc_byte_swap_msg_join_reply (mp);
978 s = mc_stream_by_index (mcm, mp->stream_index);
980 if (!s || s->state != MC_STREAM_STATE_join_in_progress)
983 /* Switch to catchup state; next join reply
984 for this stream will be ignored. */
985 s->state = MC_STREAM_STATE_catchup;
987 mcm->joins_in_progress--;
988 mcm->transport.catchup_request_fun (mcm->transport.opaque,
989 mp->stream_index, mp->catchup_peer_id);
993 mc_wait_for_stream_ready (mc_main_t * m, char *stream_name)
999 s = mc_stream_by_name (m, stream_name);
1002 vlib_process_suspend (m->vlib_main, .1);
1005 /* It's OK to send a message in catchup and ready states. */
1006 if (s->state == MC_STREAM_STATE_catchup
1007 || s->state == MC_STREAM_STATE_ready)
1010 /* Otherwise we are waiting for a join to finish. */
1011 vlib_current_process_wait_for_one_time_event_vector
1012 (m->vlib_main, &s->procs_waiting_for_join_done);
1016 mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index)
1018 mc_stream_t *s = mc_stream_by_index (mcm, stream_index);
1019 vlib_main_t *vm = mcm->vlib_main;
1021 mc_msg_user_request_t *mp;
1022 vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index);
1028 if (s->state != MC_STREAM_STATE_ready)
1029 vlib_current_process_wait_for_one_time_event_vector
1030 (vm, &s->procs_waiting_for_join_done);
1032 while (pool_elts (s->retry_pool) >= s->config.window_size)
1034 vlib_current_process_wait_for_one_time_event_vector
1035 (vm, &s->procs_waiting_for_open_window);
1038 pool_get (s->retry_pool, r);
1039 ri = r - s->retry_pool;
1041 r->prev_index = s->retry_tail_index;
1043 s->retry_tail_index = ri;
1045 if (r->prev_index == ~0)
1046 s->retry_head_index = ri;
1049 mc_retry_t *p = pool_elt_at_index (s->retry_pool, r->prev_index);
1053 vlib_buffer_advance (b, -sizeof (mp[0]));
1054 mp = vlib_buffer_get_current (b);
1056 mp->peer_id = mcm->transport.our_ack_peer_id;
1057 /* mp->transport.global_sequence set by relay agent. */
1058 mp->global_sequence = 0xdeadbeef;
1059 mp->stream_index = s->index;
1060 mp->local_sequence = s->our_local_sequence++;
1062 vlib_buffer_index_length_in_chain (vm, buffer_index) - sizeof (mp[0]);
1064 r->buffer_index = buffer_index;
1065 r->local_sequence = mp->local_sequence;
1066 r->sent_at = vlib_time_now (vm);
1069 /* Retry will be freed when all currently known peers have acked. */
1070 vec_validate (r->unacked_by_peer_bitmap, vec_len (s->all_peer_bitmap) - 1);
1071 vec_copy (r->unacked_by_peer_bitmap, s->all_peer_bitmap);
1073 hash_set (s->retry_index_by_local_sequence, r->local_sequence,
1076 elog_tx_msg (mcm, s->index, mp->local_sequence, r->n_retries);
1078 mc_byte_swap_msg_user_request (mp);
1080 mcm->transport.tx_buffer (mcm->transport.opaque,
1081 MC_TRANSPORT_USER_REQUEST_TO_RELAY, buffer_index);
1083 s->user_requests_sent++;
1085 /* return amount of window remaining */
1086 return s->config.window_size - pool_elts (s->retry_pool);
1090 mc_msg_user_request_handler (mc_main_t * mcm, mc_msg_user_request_t * mp,
1093 vlib_main_t *vm = mcm->vlib_main;
1095 mc_stream_peer_t *peer;
1097 static int once = 0;
1099 mc_byte_swap_msg_user_request (mp);
1101 s = mc_stream_by_index (mcm, mp->stream_index);
1103 /* Not signed up for this stream? Turf-o-matic */
1104 if (!s || s->state != MC_STREAM_STATE_ready)
1106 vlib_buffer_free_one (vm, buffer_index);
1110 /* Find peer, including ourselves. */
1111 peer = get_or_create_peer_with_id (mcm, s, mp->peer_id,
1114 seq_cmp_result = mc_seq_cmp (mp->local_sequence,
1115 peer->last_sequence_received + 1);
1117 if (MC_EVENT_LOGGING > 0)
1120 ELOG_TYPE_DECLARE (e) =
1122 .format = "rx-msg: peer %s stream %d rx seq %d seq_cmp %d",
1123 .format_args = "T4i4i4i4",
1128 u32 peer, stream_index, rx_sequence;
1131 ed = ELOG_DATA (mcm->elog_main, e);
1132 ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
1133 ed->stream_index = mp->stream_index;
1134 ed->rx_sequence = mp->local_sequence;
1135 ed->seq_cmp_result = seq_cmp_result;
1138 if (0 && mp->stream_index == 1 && once == 0)
1141 ELOG_TYPE (e, "FAKE lost msg on stream 1");
1142 ELOG (mcm->elog_main, e, 0);
1146 peer->last_sequence_received += seq_cmp_result == 0;
1147 s->user_requests_received++;
1149 if (seq_cmp_result > 0)
1150 peer->stats.n_msgs_from_future += 1;
1152 /* Send ack even if msg from future */
1155 mc_msg_user_ack_t *rp;
1158 rp = mc_get_vlib_buffer (vm, sizeof (rp[0]), &bi);
1159 rp->peer_id = mcm->transport.our_ack_peer_id;
1160 rp->stream_index = s->index;
1161 rp->local_sequence = mp->local_sequence;
1162 rp->seq_cmp_result = seq_cmp_result;
1164 if (MC_EVENT_LOGGING > 0)
1167 ELOG_TYPE_DECLARE (e) =
1169 .format = "tx-ack: stream %d local seq %d",
1170 .format_args = "i4i4",
1178 ed = ELOG_DATA (mcm->elog_main, e);
1179 ed->stream_index = rp->stream_index;
1180 ed->local_sequence = rp->local_sequence;
1183 mc_byte_swap_msg_user_ack (rp);
1185 mcm->transport.tx_ack (mcm->transport.opaque, mp->peer_id, bi);
1186 /* Msg from past? If so, free the buffer... */
1187 if (seq_cmp_result < 0)
1189 vlib_buffer_free_one (vm, buffer_index);
1190 peer->stats.n_msgs_from_past += 1;
1194 if (seq_cmp_result == 0)
1196 vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index);
1199 case MC_STREAM_STATE_ready:
1200 vlib_buffer_advance (b, sizeof (mp[0]));
1201 s->config.rx_buffer (mcm, s, mp->peer_id, buffer_index);
1203 /* Stream vector can change address via rx callback for mc-internal
1205 s = mc_stream_by_index (mcm, mp->stream_index);
1207 s->last_global_sequence_processed = mp->global_sequence;
1210 case MC_STREAM_STATE_catchup:
1211 clib_fifo_add1 (s->catchup_fifo, buffer_index);
1215 clib_warning ("stream in unknown state %U",
1216 format_mc_stream_state, s->state);
1223 mc_msg_user_ack_handler (mc_main_t * mcm, mc_msg_user_ack_t * mp,
1226 vlib_main_t *vm = mcm->vlib_main;
1229 mc_stream_peer_t *peer;
1231 int peer_created = 0;
1233 mc_byte_swap_msg_user_ack (mp);
1235 s = mc_stream_by_index (mcm, mp->stream_index);
1237 if (MC_EVENT_LOGGING > 0)
1240 ELOG_TYPE_DECLARE (t) =
1242 .format = "rx-ack: local seq %d peer %s seq_cmp_result %d",
1243 .format_args = "i4T4i4",
1253 ed = ELOG_DATA (mcm->elog_main, t);
1254 ed->local_sequence = mp->local_sequence;
1255 ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
1256 ed->seq_cmp_result = mp->seq_cmp_result;
1259 /* Unknown stream? */
1263 /* Find the peer which just ack'ed. */
1264 peer = get_or_create_peer_with_id (mcm, s, mp->peer_id,
1265 /* created */ &peer_created);
1268 * Peer reports message from the future. If it's not in the retry
1269 * fifo, look for a retired message.
1271 if (mp->seq_cmp_result > 0)
1273 p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence -
1274 mp->seq_cmp_result);
1276 mc_resend_retired (mcm, s, mp->local_sequence - mp->seq_cmp_result);
1278 /* Normal retry should fix it... */
1283 * Pointer to the indicated retry fifo entry.
1284 * Worth hashing because we could use a window size of 100 or 1000.
1286 p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence);
1289 * Is this a duplicate ACK, received after we've retired the
1290 * fifo entry. This can happen when learning about new
1295 if (MC_EVENT_LOGGING > 0)
1298 ELOG_TYPE_DECLARE (t) =
1300 .format = "ack: for seq %d from peer %s no fifo elt",
1301 .format_args = "i4T4",
1310 ed = ELOG_DATA (mcm->elog_main, t);
1311 ed->seq = mp->local_sequence;
1312 ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
1318 r = pool_elt_at_index (s->retry_pool, p[0]);
1320 /* Make sure that this new peer ACKs our msgs from now on */
1323 mc_retry_t *later_retry = next_retry (s, r);
1327 later_retry->unacked_by_peer_bitmap =
1328 clib_bitmap_ori (later_retry->unacked_by_peer_bitmap,
1330 later_retry = next_retry (s, later_retry);
1334 ASSERT (mp->local_sequence == r->local_sequence);
1336 /* If we weren't expecting to hear from this peer */
1337 if (!peer_created &&
1338 !clib_bitmap_get (r->unacked_by_peer_bitmap, peer - s->peers))
1340 if (MC_EVENT_LOGGING > 0)
1343 ELOG_TYPE_DECLARE (t) =
1345 .format = "dup-ack: for seq %d from peer %s",
1346 .format_args = "i4T4",
1354 ed = ELOG_DATA (mcm->elog_main, t);
1355 ed->seq = r->local_sequence;
1356 ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
1358 if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap))
1362 if (MC_EVENT_LOGGING > 0)
1365 ELOG_TYPE_DECLARE (t) =
1367 .format = "ack: for seq %d from peer %s",
1368 .format_args = "i4T4",
1376 ed = ELOG_DATA (mcm->elog_main, t);
1377 ed->seq = mp->local_sequence;
1378 ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
1381 r->unacked_by_peer_bitmap =
1382 clib_bitmap_andnoti (r->unacked_by_peer_bitmap, peer - s->peers);
1384 /* Not all clients have ack'ed */
1385 if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap))
1389 if (MC_EVENT_LOGGING > 0)
1392 ELOG_TYPE_DECLARE (t) =
1394 .format = "ack: retire fifo elt loc seq %d after %d acks",
1395 .format_args = "i4i4",
1403 ed = ELOG_DATA (mcm->elog_main, t);
1404 ed->seq = r->local_sequence;
1405 ed->npeers = pool_elts (s->peers);
1408 hash_unset (s->retry_index_by_local_sequence, mp->local_sequence);
1409 mc_retry_free (mcm, s, r);
1410 remove_retry_from_pool (s, r);
1411 maybe_send_window_open_event (vm, s);
1414 #define EVENT_MC_SEND_CATCHUP_DATA 0
1417 mc_catchup_process (vlib_main_t * vm,
1418 vlib_node_runtime_t * node, vlib_frame_t * f)
1420 mc_main_t *mcm = mc_node_get_main (node);
1421 uword *event_data = 0;
1422 mc_catchup_process_arg_t *args;
1428 _vec_len (event_data) = 0;
1429 vlib_process_wait_for_event_with_type (vm, &event_data,
1430 EVENT_MC_SEND_CATCHUP_DATA);
1432 for (i = 0; i < vec_len (event_data); i++)
1434 args = pool_elt_at_index (mcm->catchup_process_args, event_data[i]);
1436 mcm->transport.catchup_send_fun (mcm->transport.opaque,
1437 args->catchup_opaque,
1438 args->catchup_snapshot);
1440 /* Send function will free snapshot data vector. */
1441 pool_put (mcm->catchup_process_args, args);
1445 return 0; /* not likely */
1449 serialize_mc_stream (serialize_main_t * m, va_list * va)
1451 mc_stream_t *s = va_arg (*va, mc_stream_t *);
1452 mc_stream_peer_t *p;
1454 serialize_integer (m, pool_elts (s->peers), sizeof (u32));
1456 pool_foreach (p, s->peers, ({
1457 u8 * x = serialize_get (m, sizeof (p->id));
1458 clib_memcpy (x, p->id.as_u8, sizeof (p->id));
1459 serialize_integer (m, p->last_sequence_received,
1460 sizeof (p->last_sequence_received));
1463 serialize_bitmap (m, s->all_peer_bitmap);
1467 unserialize_mc_stream (serialize_main_t * m, va_list * va)
1469 mc_stream_t *s = va_arg (*va, mc_stream_t *);
1471 mc_stream_peer_t *p;
1473 unserialize_integer (m, &n_peers, sizeof (u32));
1474 mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
1475 for (i = 0; i < n_peers; i++)
1478 pool_get (s->peers, p);
1479 x = unserialize_get (m, sizeof (p->id));
1480 clib_memcpy (p->id.as_u8, x, sizeof (p->id));
1481 unserialize_integer (m, &p->last_sequence_received,
1482 sizeof (p->last_sequence_received));
1483 mhash_set (&s->peer_index_by_id, &p->id, p - s->peers, /* old_value */
1486 s->all_peer_bitmap = unserialize_bitmap (m);
1488 /* This is really bad. */
1489 if (!s->all_peer_bitmap)
1490 clib_warning ("BUG: stream %s all_peer_bitmap NULL", s->config.name);
1494 mc_msg_catchup_request_handler (mc_main_t * mcm,
1495 mc_msg_catchup_request_t * req,
1498 vlib_main_t *vm = mcm->vlib_main;
1500 mc_catchup_process_arg_t *args;
1502 mc_byte_swap_msg_catchup_request (req);
1504 s = mc_stream_by_index (mcm, req->stream_index);
1505 if (!s || s->state != MC_STREAM_STATE_ready)
1508 if (MC_EVENT_LOGGING > 0)
1511 ELOG_TYPE_DECLARE (t) =
1513 .format = "catchup-request: from %s stream %d",
1514 .format_args = "T4i4",
1521 ed = ELOG_DATA (mcm->elog_main, t);
1522 ed->peer = elog_id_for_peer_id (mcm, req->peer_id.as_u64);
1523 ed->stream = req->stream_index;
1527 * The application has to snapshoot its data structures right
1528 * here, right now. If we process any messages after
1529 * noting the last global sequence we've processed, the client
1530 * won't be able to accurately reconstruct our data structures.
1532 * Once the data structures are e.g. vec_dup()'ed, we
1533 * send the resulting messages from a separate process, to
1534 * make sure that we don't cause a bunch of message retransmissions
1536 pool_get (mcm->catchup_process_args, args);
1538 args->stream_index = s - mcm->stream_vector;
1539 args->catchup_opaque = catchup_opaque;
1540 args->catchup_snapshot = 0;
1542 /* Construct catchup reply and snapshot state for stream to send as
1543 catchup reply payload. */
1545 mc_msg_catchup_reply_t *rep;
1548 vec_resize (args->catchup_snapshot, sizeof (rep[0]));
1550 rep = (void *) args->catchup_snapshot;
1552 rep->peer_id = req->peer_id;
1553 rep->stream_index = req->stream_index;
1554 rep->last_global_sequence_included = s->last_global_sequence_processed;
1556 /* Setup for serialize to append to catchup snapshot. */
1557 serialize_open_vector (&m, args->catchup_snapshot);
1558 m.stream.current_buffer_index = vec_len (m.stream.buffer);
1560 serialize (&m, serialize_mc_stream, s);
1562 args->catchup_snapshot = serialize_close_vector (&m);
1564 /* Actually copy internal state */
1565 args->catchup_snapshot = s->config.catchup_snapshot
1566 (mcm, args->catchup_snapshot, rep->last_global_sequence_included);
1568 rep = (void *) args->catchup_snapshot;
1569 rep->n_data_bytes = vec_len (args->catchup_snapshot) - sizeof (rep[0]);
1571 mc_byte_swap_msg_catchup_reply (rep);
1574 /* now go send it... */
1575 vlib_process_signal_event (vm, mcm->catchup_process,
1576 EVENT_MC_SEND_CATCHUP_DATA,
1577 args - mcm->catchup_process_args);
/* Event types signalled to the mc-unserialize process node. */
enum
{
  EVENT_MC_UNSERIALIZE_BUFFER = 0,      /* data: stream-and-buffer pool index */
  EVENT_MC_UNSERIALIZE_CATCHUP = 1,     /* data: catchup reply pointer */
};
1584 mc_msg_catchup_reply_handler (mc_main_t * mcm, mc_msg_catchup_reply_t * mp,
1587 vlib_process_signal_event (mcm->vlib_main,
1588 mcm->unserialize_process,
1589 EVENT_MC_UNSERIALIZE_CATCHUP,
1590 pointer_to_uword (mp));
1594 perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp)
1599 mc_byte_swap_msg_catchup_reply (mp);
1601 s = mc_stream_by_index (mcm, mp->stream_index);
1603 /* Never heard of this stream or already caught up. */
1604 if (!s || s->state == MC_STREAM_STATE_ready)
1609 mc_stream_peer_t *p;
1612 /* For offline sim replay: save the entire catchup snapshot... */
1613 if (s->config.save_snapshot)
1614 s->config.save_snapshot (mcm, /* is_catchup */ 1, mp->data,
1617 unserialize_open_data (&m, mp->data, mp->n_data_bytes);
1618 unserialize (&m, unserialize_mc_stream, s);
1620 /* Make sure we start numbering our messages as expected */
1622 pool_foreach (p, s->peers, ({
1623 if (p->id.as_u64 == mcm->transport.our_ack_peer_id.as_u64)
1624 s->our_local_sequence = p->last_sequence_received + 1;
1628 n_stream_bytes = m.stream.current_buffer_index;
1630 /* No need to unserialize close; nothing to free. */
1632 /* After serialized stream is user's catchup data. */
1633 s->config.catchup (mcm, mp->data + n_stream_bytes,
1634 mp->n_data_bytes - n_stream_bytes);
1637 /* Vector could have been moved by catchup.
1638 This can only happen for mc-internal stream. */
1639 s = mc_stream_by_index (mcm, mp->stream_index);
1641 s->last_global_sequence_processed = mp->last_global_sequence_included;
1643 while (clib_fifo_elts (s->catchup_fifo))
1645 mc_msg_user_request_t *gp;
1649 clib_fifo_sub1 (s->catchup_fifo, bi);
1651 b = vlib_get_buffer (mcm->vlib_main, bi);
1652 gp = vlib_buffer_get_current (b);
1654 /* Make sure we're replaying "new" news */
1655 seq_cmp_result = mc_seq_cmp (gp->global_sequence,
1656 mp->last_global_sequence_included);
1658 if (seq_cmp_result > 0)
1660 vlib_buffer_advance (b, sizeof (gp[0]));
1661 s->config.rx_buffer (mcm, s, gp->peer_id, bi);
1662 s->last_global_sequence_processed = gp->global_sequence;
1664 if (MC_EVENT_LOGGING)
1667 ELOG_TYPE_DECLARE (t) =
1669 .format = "catchup replay local sequence 0x%x",
1670 .format_args = "i4",
1677 ed = ELOG_DATA (mcm->elog_main, t);
1678 ed->local_sequence = gp->local_sequence;
1683 if (MC_EVENT_LOGGING)
1686 ELOG_TYPE_DECLARE (t) =
1688 .format = "catchup discard local sequence 0x%x",
1689 .format_args = "i4",
1696 ed = ELOG_DATA (mcm->elog_main, t);
1697 ed->local_sequence = gp->local_sequence;
1700 vlib_buffer_free_one (mcm->vlib_main, bi);
1704 s->state = MC_STREAM_STATE_ready;
1706 /* Now that we are caught up wake up joining process. */
1708 vlib_one_time_waiting_process_t *wp;
1709 vec_foreach (wp, s->procs_waiting_for_join_done)
1710 vlib_signal_one_time_waiting_process (mcm->vlib_main, wp);
1711 if (s->procs_waiting_for_join_done)
1712 _vec_len (s->procs_waiting_for_join_done) = 0;
1717 this_node_maybe_master (mc_main_t * mcm)
1719 vlib_main_t *vm = mcm->vlib_main;
1720 mc_msg_master_assert_t *mp;
1723 int is_master = mcm->relay_state == MC_RELAY_STATE_MASTER;
1724 clib_error_t *error;
1725 f64 now, time_last_master_assert = -1;
1730 if (!mcm->we_can_be_relay_master)
1732 mcm->relay_state = MC_RELAY_STATE_SLAVE;
1733 if (MC_EVENT_LOGGING)
1735 ELOG_TYPE (e, "become slave (config)");
1736 ELOG (mcm->elog_main, e, 0);
1741 now = vlib_time_now (vm);
1742 if (now >= time_last_master_assert + 1)
1744 time_last_master_assert = now;
1745 mp = mc_get_vlib_buffer (mcm->vlib_main, sizeof (mp[0]), &bi);
1747 mp->peer_id = mcm->transport.our_ack_peer_id;
1748 mp->global_sequence = mcm->relay_global_sequence;
1751 * these messages clog the event log, set MC_EVENT_LOGGING higher
1754 if (MC_EVENT_LOGGING > 1)
1757 ELOG_TYPE_DECLARE (e) =
1759 .format = "tx-massert: peer %s global seq %u",
1760 .format_args = "T4i4",
1765 u32 peer, global_sequence;
1767 ed = ELOG_DATA (mcm->elog_main, e);
1768 ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
1769 ed->global_sequence = mp->global_sequence;
1772 mc_byte_swap_msg_master_assert (mp);
1775 mcm->transport.tx_buffer (mcm->transport.opaque,
1776 MC_TRANSPORT_MASTERSHIP, bi);
1778 clib_error_report (error);
1781 vlib_process_wait_for_event_or_clock (vm, 1.0);
1782 event_type = vlib_process_get_events (vm, /* no event data */ 0);
1787 if (!is_master && timeouts++ > 2)
1789 mcm->relay_state = MC_RELAY_STATE_MASTER;
1790 mcm->relay_master_peer_id =
1791 mcm->transport.our_ack_peer_id.as_u64;
1792 if (MC_EVENT_LOGGING)
1794 ELOG_TYPE (e, "become master (was maybe_master)");
1795 ELOG (mcm->elog_main, e, 0);
1801 case MC_RELAY_STATE_SLAVE:
1802 mcm->relay_state = MC_RELAY_STATE_SLAVE;
1803 if (MC_EVENT_LOGGING && mcm->relay_state != MC_RELAY_STATE_SLAVE)
1805 ELOG_TYPE (e, "become slave (was maybe_master)");
1806 ELOG (mcm->elog_main, e, 0);
1814 this_node_slave (mc_main_t * mcm)
1816 vlib_main_t *vm = mcm->vlib_main;
1820 if (MC_EVENT_LOGGING)
1822 ELOG_TYPE (e, "become slave");
1823 ELOG (mcm->elog_main, e, 0);
1828 vlib_process_wait_for_event_or_clock (vm, 1.0);
1829 event_type = vlib_process_get_events (vm, /* no event data */ 0);
1836 mcm->relay_state = MC_RELAY_STATE_NEGOTIATE;
1837 mcm->relay_master_peer_id = ~0ULL;
1838 if (MC_EVENT_LOGGING)
1840 ELOG_TYPE (e, "timeouts; negoitate mastership");
1841 ELOG (mcm->elog_main, e, 0);
1847 case MC_RELAY_STATE_SLAVE:
1848 mcm->relay_state = MC_RELAY_STATE_SLAVE;
1856 mc_mastership_process (vlib_main_t * vm,
1857 vlib_node_runtime_t * node, vlib_frame_t * f)
1859 mc_main_t *mcm = mc_node_get_main (node);
1863 switch (mcm->relay_state)
1865 case MC_RELAY_STATE_NEGOTIATE:
1866 case MC_RELAY_STATE_MASTER:
1867 this_node_maybe_master (mcm);
1870 case MC_RELAY_STATE_SLAVE:
1871 this_node_slave (mcm);
1875 return 0; /* not likely */
1879 mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master)
1881 if (we_can_be_master != mcm->we_can_be_relay_master)
1883 mcm->we_can_be_relay_master = we_can_be_master;
1884 vlib_process_signal_event (mcm->vlib_main,
1885 mcm->mastership_process,
1886 MC_RELAY_STATE_NEGOTIATE, 0);
1891 mc_msg_master_assert_handler (mc_main_t * mcm, mc_msg_master_assert_t * mp,
1894 mc_peer_id_t his_peer_id, our_peer_id;
1896 u8 signal_slave = 0;
1897 u8 update_global_sequence = 0;
1899 mc_byte_swap_msg_master_assert (mp);
1901 his_peer_id = mp->peer_id;
1902 our_peer_id = mcm->transport.our_ack_peer_id;
1904 /* compare the incoming global sequence with ours */
1905 seq_cmp_result = mc_seq_cmp (mp->global_sequence,
1906 mcm->relay_global_sequence);
1908 /* If the sender has a lower peer id and the sender's sequence >=
1909 our global sequence, we become a slave. Otherwise we are master. */
1910 if (mc_peer_id_compare (his_peer_id, our_peer_id) < 0
1911 && seq_cmp_result >= 0)
1913 vlib_process_signal_event (mcm->vlib_main,
1914 mcm->mastership_process,
1915 MC_RELAY_STATE_SLAVE, 0);
1919 /* Update our global sequence. */
1920 if (seq_cmp_result > 0)
1922 mcm->relay_global_sequence = mp->global_sequence;
1923 update_global_sequence = 1;
1927 uword *q = mhash_get (&mcm->mastership_peer_index_by_id, &his_peer_id);
1928 mc_mastership_peer_t *p;
1931 p = vec_elt_at_index (mcm->mastership_peers, q[0]);
1934 vec_add2 (mcm->mastership_peers, p, 1);
1935 p->peer_id = his_peer_id;
1936 mhash_set (&mcm->mastership_peer_index_by_id, &p->peer_id,
1937 p - mcm->mastership_peers,
1940 p->time_last_master_assert_received = vlib_time_now (mcm->vlib_main);
1944 * these messages clog the event log, set MC_EVENT_LOGGING higher
1947 if (MC_EVENT_LOGGING > 1)
1950 ELOG_TYPE_DECLARE (e) =
1952 .format = "rx-massert: peer %s global seq %u upd %d slave %d",
1953 .format_args = "T4i4i1i1",
1960 u32 global_sequence;
1964 ed = ELOG_DATA (mcm->elog_main, e);
1965 ed->peer = elog_id_for_peer_id (mcm, his_peer_id.as_u64);
1966 ed->global_sequence = mp->global_sequence;
1967 ed->update_sequence = update_global_sequence;
1968 ed->slave = signal_slave;
1973 mc_serialize_init (mc_main_t * mcm)
1975 mc_serialize_msg_t *m;
1976 vlib_main_t *vm = vlib_get_main ();
1978 mcm->global_msg_index_by_name
1979 = hash_create_string ( /* elts */ 0, sizeof (uword));
1981 m = vm->mc_msg_registrations;
1985 m->global_index = vec_len (mcm->global_msgs);
1986 hash_set_mem (mcm->global_msg_index_by_name, m->name, m->global_index);
1987 vec_add1 (mcm->global_msgs, m);
1988 m = m->next_registration;
1993 mc_serialize_va (mc_main_t * mc,
1995 u32 multiple_messages_per_vlib_buffer,
1996 mc_serialize_msg_t * msg, va_list * va)
1999 clib_error_t *error;
2000 serialize_main_t *m = &mc->serialize_mains[VLIB_TX];
2001 vlib_serialize_buffer_main_t *sbm = &mc->serialize_buffer_mains[VLIB_TX];
2002 u32 bi, n_before, n_after, n_total, n_this_msg;
2005 if (!sbm->vlib_main)
2007 sbm->tx.max_n_data_bytes_per_chain = 4096;
2008 sbm->tx.free_list_index = VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX;
2011 if (sbm->first_buffer == 0)
2012 serialize_open_vlib_buffer (m, mc->vlib_main, sbm);
2014 n_before = serialize_vlib_buffer_n_bytes (m);
2016 s = mc_stream_by_index (mc, stream_index);
2017 gi = msg->global_index;
2018 ASSERT (msg == vec_elt (mc->global_msgs, gi));
2021 if (gi < vec_len (s->stream_msg_index_by_global_index))
2022 si = s->stream_msg_index_by_global_index[gi];
2024 serialize_likely_small_unsigned_integer (m, si);
2026 /* For first time message is sent, use name to identify message. */
2027 if (si == ~0 || MSG_ID_DEBUG)
2028 serialize_cstring (m, msg->name);
2030 if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
2033 ELOG_TYPE_DECLARE (e) =
2035 .format = "serialize-msg: %s index %d",
2036 .format_args = "T4i4",
2043 ed = ELOG_DATA (mc->elog_main, e);
2044 ed->c[0] = elog_id_for_msg_name (mc, msg->name);
2048 error = va_serialize (m, va);
2050 n_after = serialize_vlib_buffer_n_bytes (m);
2051 n_this_msg = n_after - n_before;
2052 n_total = n_after + sizeof (mc_msg_user_request_t);
2054 /* For max message size ignore first message where string name is sent. */
2056 msg->max_n_bytes_serialized =
2057 clib_max (msg->max_n_bytes_serialized, n_this_msg);
2059 if (!multiple_messages_per_vlib_buffer
2061 || n_total + msg->max_n_bytes_serialized >
2062 mc->transport.max_packet_size)
2064 bi = serialize_close_vlib_buffer (m);
2065 sbm->first_buffer = 0;
2067 mc_stream_send (mc, stream_index, bi);
2069 vlib_buffer_free_one (mc->vlib_main, bi);
2076 mc_serialize_internal (mc_main_t * mc,
2078 u32 multiple_messages_per_vlib_buffer,
2079 mc_serialize_msg_t * msg, ...)
2081 vlib_main_t *vm = mc->vlib_main;
2083 clib_error_t *error;
2085 if (stream_index == ~0)
2087 if (vm->mc_main && vm->mc_stream_index == ~0)
2088 vlib_current_process_wait_for_one_time_event_vector
2089 (vm, &vm->procs_waiting_for_mc_stream_join);
2090 stream_index = vm->mc_stream_index;
2094 error = mc_serialize_va (mc, stream_index,
2095 multiple_messages_per_vlib_buffer, msg, &va);
2101 mc_unserialize_message (mc_main_t * mcm,
2102 mc_stream_t * s, serialize_main_t * m)
2104 mc_serialize_stream_msg_t *sm;
2107 si = unserialize_likely_small_unsigned_integer (m);
2109 if (!(si == ~0 || MSG_ID_DEBUG))
2111 sm = vec_elt_at_index (s->stream_msgs, si);
2112 gi = sm->global_index;
2118 unserialize_cstring (m, &name);
2120 if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
2123 ELOG_TYPE_DECLARE (e) =
2125 .format = "unserialize-msg: %s rx index %d",
2126 .format_args = "T4i4",
2133 ed = ELOG_DATA (mcm->elog_main, e);
2134 ed->c[0] = elog_id_for_msg_name (mcm, name);
2139 uword *p = hash_get_mem (mcm->global_msg_index_by_name, name);
2143 /* Unknown message? */
2150 vec_validate_init_empty (s->stream_msg_index_by_global_index, gi, ~0);
2151 si = s->stream_msg_index_by_global_index[gi];
2153 /* Stream local index unknown? Create it. */
2156 vec_add2 (s->stream_msgs, sm, 1);
2158 si = sm - s->stream_msgs;
2159 sm->global_index = gi;
2160 s->stream_msg_index_by_global_index[gi] = si;
2162 if (MC_EVENT_LOGGING > 0)
2165 ELOG_TYPE_DECLARE (e) =
2167 .format = "msg-bind: stream %d %s to index %d",
2168 .format_args = "i4T4i4",
2175 ed = ELOG_DATA (mcm->elog_main, e);
2176 ed->c[0] = s->index;
2177 ed->c[1] = elog_id_for_msg_name (mcm, name);
2183 sm = vec_elt_at_index (s->stream_msgs, si);
2184 if (gi != sm->global_index && MC_EVENT_LOGGING > 0)
2187 ELOG_TYPE_DECLARE (e) =
2189 .format = "msg-id-ERROR: %s index %d expected %d",
2190 .format_args = "T4i4i4",
2197 ed = ELOG_DATA (mcm->elog_main, e);
2198 ed->c[0] = elog_id_for_msg_name (mcm, name);
2201 if (sm->global_index <
2202 vec_len (s->stream_msg_index_by_global_index))
2204 s->stream_msg_index_by_global_index[sm->global_index];
2213 mc_serialize_msg_t *msg;
2214 msg = vec_elt (mcm->global_msgs, gi);
2215 unserialize (m, msg->unserialize, mcm);
2223 mc_unserialize_internal (mc_main_t * mcm, u32 stream_and_buffer_index)
2225 vlib_main_t *vm = mcm->vlib_main;
2226 serialize_main_t *m = &mcm->serialize_mains[VLIB_RX];
2227 vlib_serialize_buffer_main_t *sbm = &mcm->serialize_buffer_mains[VLIB_RX];
2228 mc_stream_and_buffer_t *sb;
2229 mc_stream_t *stream;
2233 pool_elt_at_index (mcm->mc_unserialize_stream_and_buffers,
2234 stream_and_buffer_index);
2235 buffer_index = sb->buffer_index;
2236 stream = vec_elt_at_index (mcm->stream_vector, sb->stream_index);
2237 pool_put (mcm->mc_unserialize_stream_and_buffers, sb);
2239 if (stream->config.save_snapshot)
2241 u32 n_bytes = vlib_buffer_index_length_in_chain (vm, buffer_index);
2242 static u8 *contents;
2243 vec_reset_length (contents);
2244 vec_validate (contents, n_bytes - 1);
2245 vlib_buffer_contents (vm, buffer_index, contents);
2246 stream->config.save_snapshot (mcm, /* is_catchup */ 0, contents,
2250 ASSERT (vlib_in_process_context (vm));
2252 unserialize_open_vlib_buffer (m, vm, sbm);
2254 clib_fifo_add1 (sbm->rx.buffer_fifo, buffer_index);
2256 while (unserialize_vlib_buffer_n_bytes (m) > 0)
2257 mc_unserialize_message (mcm, stream, m);
2260 unserialize_close_vlib_buffer (m);
2264 mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index)
2266 vlib_main_t *vm = mcm->vlib_main;
2267 mc_stream_and_buffer_t *sb;
2268 pool_get (mcm->mc_unserialize_stream_and_buffers, sb);
2269 sb->stream_index = s->index;
2270 sb->buffer_index = buffer_index;
2271 vlib_process_signal_event (vm, mcm->unserialize_process,
2272 EVENT_MC_UNSERIALIZE_BUFFER,
2273 sb - mcm->mc_unserialize_stream_and_buffers);
2277 mc_unserialize_process (vlib_main_t * vm,
2278 vlib_node_runtime_t * node, vlib_frame_t * f)
2280 mc_main_t *mcm = mc_node_get_main (node);
2281 uword event_type, *event_data = 0;
2287 _vec_len (event_data) = 0;
2289 vlib_process_wait_for_event (vm);
2290 event_type = vlib_process_get_events (vm, &event_data);
2293 case EVENT_MC_UNSERIALIZE_BUFFER:
2294 for (i = 0; i < vec_len (event_data); i++)
2295 mc_unserialize_internal (mcm, event_data[i]);
2298 case EVENT_MC_UNSERIALIZE_CATCHUP:
2299 for (i = 0; i < vec_len (event_data); i++)
2301 u8 *mp = uword_to_pointer (event_data[i], u8 *);
2302 perform_catchup (mcm, (void *) mp);
2312 return 0; /* not likely */
2316 serialize_mc_main (serialize_main_t * m, va_list * va)
2318 mc_main_t *mcm = va_arg (*va, mc_main_t *);
2320 mc_serialize_stream_msg_t *sm;
2321 mc_serialize_msg_t *msg;
2323 serialize_integer (m, vec_len (mcm->stream_vector), sizeof (u32));
2324 vec_foreach (s, mcm->stream_vector)
2327 serialize_cstring (m, s->config.name);
2329 /* Serialize global names for all sent messages. */
2330 serialize_integer (m, vec_len (s->stream_msgs), sizeof (u32));
2331 vec_foreach (sm, s->stream_msgs)
2333 msg = vec_elt (mcm->global_msgs, sm->global_index);
2334 serialize_cstring (m, msg->name);
2340 unserialize_mc_main (serialize_main_t * m, va_list * va)
2342 mc_main_t *mcm = va_arg (*va, mc_main_t *);
2343 u32 i, n_streams, n_stream_msgs;
2346 mc_serialize_stream_msg_t *sm;
2348 unserialize_integer (m, &n_streams, sizeof (u32));
2349 for (i = 0; i < n_streams; i++)
2351 unserialize_cstring (m, &name);
2352 if (i != MC_STREAM_INDEX_INTERNAL && !mc_stream_by_name (mcm, name))
2354 vec_validate (mcm->stream_vector, i);
2355 s = vec_elt_at_index (mcm->stream_vector, i);
2357 s->index = s - mcm->stream_vector;
2358 s->config.name = name;
2359 s->state = MC_STREAM_STATE_name_known;
2360 hash_set_mem (mcm->stream_index_by_name, s->config.name, s->index);
2365 s = vec_elt_at_index (mcm->stream_vector, i);
2367 vec_free (s->stream_msgs);
2368 vec_free (s->stream_msg_index_by_global_index);
2370 unserialize_integer (m, &n_stream_msgs, sizeof (u32));
2371 vec_resize (s->stream_msgs, n_stream_msgs);
2372 vec_foreach (sm, s->stream_msgs)
2377 unserialize_cstring (m, &name);
2378 p = hash_get (mcm->global_msg_index_by_name, name);
2380 si = sm - s->stream_msgs;
2382 if (MC_EVENT_LOGGING > 0)
2385 ELOG_TYPE_DECLARE (e) =
2387 .format = "catchup-bind: %s to %d global index %d stream %d",
2388 .format_args = "T4i4i4i4",
2396 ed = ELOG_DATA (mcm->elog_main, e);
2397 ed->c[0] = elog_id_for_msg_name (mcm, name);
2400 ed->c[3] = s->index;
2405 sm->global_index = gi;
2408 vec_validate_init_empty (s->stream_msg_index_by_global_index,
2410 s->stream_msg_index_by_global_index[gi] = si;
2417 mc_main_init (mc_main_t * mcm, char *tag)
2419 vlib_main_t *vm = vlib_get_main ();
2421 mcm->vlib_main = vm;
2422 mcm->elog_main = &vm->elog_main;
2424 mcm->relay_master_peer_id = ~0ULL;
2425 mcm->relay_state = MC_RELAY_STATE_NEGOTIATE;
2427 mcm->stream_index_by_name
2428 = hash_create_string ( /* elts */ 0, /* value size */ sizeof (uword));
2431 vlib_node_registration_t r;
2433 memset (&r, 0, sizeof (r));
2435 r.type = VLIB_NODE_TYPE_PROCESS;
2437 /* Point runtime data to main instance. */
2438 r.runtime_data = &mcm;
2439 r.runtime_data_bytes = sizeof (&mcm);
2441 r.name = (char *) format (0, "mc-mastership-%s", tag);
2442 r.function = mc_mastership_process;
2443 mcm->mastership_process = vlib_register_node (vm, &r);
2445 r.name = (char *) format (0, "mc-join-ager-%s", tag);
2446 r.function = mc_join_ager_process;
2447 mcm->join_ager_process = vlib_register_node (vm, &r);
2449 r.name = (char *) format (0, "mc-retry-%s", tag);
2450 r.function = mc_retry_process;
2451 mcm->retry_process = vlib_register_node (vm, &r);
2453 r.name = (char *) format (0, "mc-catchup-%s", tag);
2454 r.function = mc_catchup_process;
2455 mcm->catchup_process = vlib_register_node (vm, &r);
2457 r.name = (char *) format (0, "mc-unserialize-%s", tag);
2458 r.function = mc_unserialize_process;
2459 mcm->unserialize_process = vlib_register_node (vm, &r);
2462 if (MC_EVENT_LOGGING > 0)
2463 mhash_init (&mcm->elog_id_by_peer_id, sizeof (uword),
2464 sizeof (mc_peer_id_t));
2466 mhash_init (&mcm->mastership_peer_index_by_id, sizeof (uword),
2467 sizeof (mc_peer_id_t));
2468 mc_serialize_init (mcm);
2472 format_mc_relay_state (u8 * s, va_list * args)
2474 mc_relay_state_t state = va_arg (*args, mc_relay_state_t);
2478 case MC_RELAY_STATE_NEGOTIATE:
2481 case MC_RELAY_STATE_MASTER:
2484 case MC_RELAY_STATE_SLAVE:
2488 return format (s, "unknown 0x%x", state);
2491 return format (s, "%s", t);
2495 format_mc_stream_state (u8 * s, va_list * args)
2497 mc_stream_state_t state = va_arg (*args, mc_stream_state_t);
2501 #define _(f) case MC_STREAM_STATE_##f: t = #f; break;
2502 foreach_mc_stream_state
2505 return format (s, "unknown 0x%x", state);
2508 return format (s, "%s", t);
2512 mc_peer_comp (void *a1, void *a2)
2514 mc_stream_peer_t *p1 = a1;
2515 mc_stream_peer_t *p2 = a2;
2517 return mc_peer_id_compare (p1->id, p2->id);
2521 format_mc_main (u8 * s, va_list * args)
2523 mc_main_t *mcm = va_arg (*args, mc_main_t *);
2525 mc_stream_peer_t *p, *ps;
2526 u32 indent = format_get_indent (s);
2528 s = format (s, "MC state %U, %d streams joined, global sequence 0x%x",
2529 format_mc_relay_state, mcm->relay_state,
2530 vec_len (mcm->stream_vector), mcm->relay_global_sequence);
2533 mc_mastership_peer_t *mp;
2534 f64 now = vlib_time_now (mcm->vlib_main);
2535 s = format (s, "\n%UMost recent mastership peers:",
2536 format_white_space, indent + 2);
2537 vec_foreach (mp, mcm->mastership_peers)
2539 s = format (s, "\n%U%-30U%.4e",
2540 format_white_space, indent + 4,
2541 mcm->transport.format_peer_id, mp->peer_id,
2542 now - mp->time_last_master_assert_received);
2546 vec_foreach (t, mcm->stream_vector)
2548 s = format (s, "\n%Ustream `%s' index %d",
2549 format_white_space, indent + 2, t->config.name, t->index);
2551 s = format (s, "\n%Ustate %U",
2552 format_white_space, indent + 4,
2553 format_mc_stream_state, t->state);
2557 "\n%Uretries: interval %.0f sec, limit %d, pool elts %d, %Ld sent",
2558 format_white_space, indent + 4, t->config.retry_interval,
2559 t->config.retry_limit, pool_elts (t->retry_pool),
2560 t->stats.n_retries - t->stats_last_clear.n_retries);
2562 s = format (s, "\n%U%Ld/%Ld user requests sent/received",
2563 format_white_space, indent + 4,
2564 t->user_requests_sent, t->user_requests_received);
2566 s = format (s, "\n%U%d peers, local/global sequence 0x%x/0x%x",
2567 format_white_space, indent + 4,
2568 pool_elts (t->peers),
2569 t->our_local_sequence, t->last_global_sequence_processed);
2573 pool_foreach (p, t->peers,
2575 if (clib_bitmap_get (t->all_peer_bitmap, p - t->peers))
2576 vec_add1 (ps, p[0]);
2579 vec_sort_with_function (ps, mc_peer_comp);
2580 s = format (s, "\n%U%=30s%10s%16s%16s",
2581 format_white_space, indent + 6,
2582 "Peer", "Last seq", "Retries", "Future");
2586 s = format (s, "\n%U%-30U0x%08x%16Ld%16Ld%s",
2587 format_white_space, indent + 6,
2588 mcm->transport.format_peer_id, p->id.as_u64,
2589 p->last_sequence_received,
2590 p->stats.n_msgs_from_past -
2591 p->stats_last_clear.n_msgs_from_past,
2592 p->stats.n_msgs_from_future -
2593 p->stats_last_clear.n_msgs_from_future,
2594 (mcm->transport.our_ack_peer_id.as_u64 ==
2595 p->id.as_u64 ? " (self)" : ""));
2604 * fd.io coding-style-patch-verification: ON
2607 * eval: (c-set-style "gnu")