2 * mc.h: vlib reliable sequenced multicast distributed applications
4 * Copyright (c) 2010 Cisco and/or its affiliates.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 #ifndef included_vlib_mc_h
19 #define included_vlib_mc_h
21 #include <vppinfra/elog.h>
22 #include <vppinfra/fifo.h>
23 #include <vppinfra/mhash.h>
24 #include <vlib/node.h>
/* Event logging defaults to enabled (1) unless the build already
   defines MC_EVENT_LOGGING. */
#ifndef MC_EVENT_LOGGING
#define MC_EVENT_LOGGING 1
#endif
31 mc_need_byte_swap (void)
32 { return CLIB_ARCH_IS_LITTLE_ENDIAN; }
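/*
 * Used to uniquely identify hosts.
 * For IP4 this would be ip4_address plus tcp/udp port.
 *
 * NOTE(review): the mc_peer_id_t type definition this comment introduces
 * appears to be missing from this copy (code below uses its as_u8
 * member); restore it from the original source.
 */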
43 always_inline mc_peer_id_t
44 mc_byte_swap_peer_id (mc_peer_id_t i)
46 /* Peer id is already in network byte order. */
51 mc_peer_id_compare (mc_peer_id_t a, mc_peer_id_t b)
53 return memcmp (a.as_u8, b.as_u8, sizeof (a.as_u8));
56 /* Assert mastership. Lowest peer_id amount all peers wins mastership.
57 Only sent/received over mastership channel (MC_TRANSPORT_MASTERSHIP).
58 So, we don't need a message opcode. */
59 typedef CLIB_PACKED (struct {
60 /* Peer id asserting mastership. */
63 /* Global sequence number asserted. */
65 }) mc_msg_master_assert_t;
68 mc_byte_swap_msg_master_assert (mc_msg_master_assert_t * r)
70 if (mc_need_byte_swap ())
72 r->peer_id = mc_byte_swap_peer_id (r->peer_id);
73 r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
/* NOTE(review): this X-macro list and the mc_relay_msg_type_t enum built
   from it are incomplete in this copy.  Only the join_or_leave_request
   entry, the per-entry #define and the enum's closing line survive.  The
   "typedef enum {" opener, the foreach_mc_msg_type expansion, "#undef _"
   and the other entries are missing.  Each entry f needs a matching
   mc_msg_<f>_t (see the handler declarations near the end of this
   header).  Entry order determines the MC_MSG_TYPE_* values written into
   the 32-bit "type" field of messages, so restore the list from the
   original source rather than reconstructing it. */
77 #define foreach_mc_msg_type \
79 _ (join_or_leave_request) \
87 #define _(f) MC_MSG_TYPE_##f,
90 } mc_relay_msg_type_t;
92 /* Request to join a given stream. Multicast over MC_TRANSPORT_JOIN. */
/* NOTE(review): member declarations are missing from this copy.
   mc_byte_swap_msg_join_or_leave_request () swaps peer_id, type and
   stream_index, and the comments below describe a stream index and a
   join = 1 / leave = 0 flag, but only the "type" bitfield is declared.
   This packed struct is sent over the network, so restore the exact
   members and their order from the original source. */
93 typedef CLIB_PACKED (struct {
96 mc_relay_msg_type_t type : 32; /* MC_MSG_TYPE_join_or_leave_request */
98 /* Stream to join or leave. */
101 /* join = 1, leave = 0 */
103 }) mc_msg_join_or_leave_request_t;
106 mc_byte_swap_msg_join_or_leave_request (mc_msg_join_or_leave_request_t * r)
108 if (mc_need_byte_swap ())
110 r->peer_id = mc_byte_swap_peer_id (r->peer_id);
111 r->type = clib_byte_swap_u32 (r->type);
112 r->stream_index = clib_byte_swap_u32 (r->stream_index);
116 /* Join reply. Multicast over MC_TRANSPORT_JOIN. */
/* NOTE(review): the stream_index member (swapped by
   mc_byte_swap_msg_join_reply ()) is not declared in this copy.  Restore
   it, in its original position between type and catchup_peer_id, from
   the original source. */
117 typedef CLIB_PACKED (struct {
118 mc_peer_id_t peer_id;
120 mc_relay_msg_type_t type : 32; /* MC_MSG_TYPE_join_reply */
124 /* Peer ID to contact to catchup with this stream. */
125 mc_peer_id_t catchup_peer_id;
126 }) mc_msg_join_reply_t;
129 mc_byte_swap_msg_join_reply (mc_msg_join_reply_t * r)
131 if (mc_need_byte_swap ())
133 r->peer_id = mc_byte_swap_peer_id (r->peer_id);
134 r->type = clib_byte_swap_u32 (r->type);
135 r->stream_index = clib_byte_swap_u32 (r->stream_index);
136 r->catchup_peer_id = mc_byte_swap_peer_id (r->catchup_peer_id);
140 /* Generic (application) request. Multicast over MC_TRANSPORT_USER_REQUEST_TO_RELAY and then
141 relayed by relay master after filling in global sequence number. */
/* NOTE(review): only peer_id is declared in this copy.  The comments
   below and mc_byte_swap_msg_user_request () refer to stream_index,
   global_sequence, local_sequence and n_data_bytes, plus the opaque
   request data.  Those member declarations are missing and must be
   restored from the original source. */
142 typedef CLIB_PACKED (struct {
143 mc_peer_id_t peer_id;
147 /* Global sequence number as filled in by relay master. */
150 /* Local sequence number as filled in by peer sending message. */
153 /* Size of request data. */
156 /* Opaque request data. */
158 }) mc_msg_user_request_t;
161 mc_byte_swap_msg_user_request (mc_msg_user_request_t * r)
163 if (mc_need_byte_swap ())
165 r->peer_id = mc_byte_swap_peer_id (r->peer_id);
166 r->stream_index = clib_byte_swap_u32 (r->stream_index);
167 r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
168 r->local_sequence = clib_byte_swap_u32 (r->local_sequence);
169 r->n_data_bytes = clib_byte_swap_u32 (r->n_data_bytes);
173 /* Sent unicast over ACK channel. */
/* NOTE(review): only peer_id is declared in this copy.
   mc_byte_swap_msg_user_ack () also swaps stream_index, global_sequence,
   local_sequence and seq_cmp_result, whose declarations are missing;
   restore them from the original source. */
174 typedef CLIB_PACKED (struct {
175 mc_peer_id_t peer_id;
180 }) mc_msg_user_ack_t;
183 mc_byte_swap_msg_user_ack (mc_msg_user_ack_t * r)
185 if (mc_need_byte_swap ())
187 r->peer_id = mc_byte_swap_peer_id (r->peer_id);
188 r->stream_index = clib_byte_swap_u32 (r->stream_index);
189 r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
190 r->local_sequence = clib_byte_swap_u32 (r->local_sequence);
191 r->seq_cmp_result = clib_byte_swap_i32 (r->seq_cmp_result);
195 /* Sent/received unicast over catchup channel (e.g. using TCP). */
/* NOTE(review): the stream_index member swapped by
   mc_byte_swap_msg_catchup_request () is not declared in this copy;
   restore it from the original source. */
196 typedef CLIB_PACKED (struct {
197 mc_peer_id_t peer_id;
199 }) mc_msg_catchup_request_t;
202 mc_byte_swap_msg_catchup_request (mc_msg_catchup_request_t * r)
204 if (mc_need_byte_swap ())
206 r->peer_id = mc_byte_swap_peer_id (r->peer_id);
207 r->stream_index = clib_byte_swap_u32 (r->stream_index);
211 /* Sent/received unicast over catchup channel. */
/* NOTE(review): stream_index and n_data_bytes (both swapped by
   mc_byte_swap_msg_catchup_reply ()), and any trailing catchup data
   member, are not declared in this copy; restore them from the original
   source. */
212 typedef CLIB_PACKED (struct {
213 mc_peer_id_t peer_id;
217 /* Last global sequence number included in catchup data. */
218 u32 last_global_sequence_included;
220 /* Size of catchup data. */
225 }) mc_msg_catchup_reply_t;
228 mc_byte_swap_msg_catchup_reply (mc_msg_catchup_reply_t * r)
230 if (mc_need_byte_swap ())
232 r->peer_id = mc_byte_swap_peer_id (r->peer_id);
233 r->stream_index = clib_byte_swap_u32 (r->stream_index);
234 r->last_global_sequence_included = clib_byte_swap_u32 (r->last_global_sequence_included);
235 r->n_data_bytes = clib_byte_swap_u32 (r->n_data_bytes);
/* Registration record for one serializable message type.  Records are
   chained through next_registration (see MC_SERIALIZE_MSG).
   NOTE(review): the members documented as "Name for this type.", the
   opaque first argument and the global message index are not declared
   in this copy; restore them from the original source. */
239 typedef struct _mc_serialize_msg {
240 /* Name for this type. */
243 /* Functions to serialize/unserialize data. */
244 serialize_function_t * serialize;
245 serialize_function_t * unserialize;
247 /* Maximum message size in bytes when serialized.
248 If zero then this will be set to the largest sent message. */
249 u32 max_n_bytes_serialized;
251 /* Opaque to use for first argument to serialize/unserialize function. */
254 /* Index in global message vector. */
257 /* Registration list */
258 struct _mc_serialize_msg * next_registration;
259 } mc_serialize_msg_t;
/* Per-stream message entry (mc_stream_t.stream_msgs holds these).
   NOTE(review): the "typedef struct {" opener and the index member
   described below are missing from this copy. */
262 /* Index into global message vector. */
264 } mc_serialize_stream_msg_t;
/* Declare a mc_serialize_msg_t named x and register it at program
   start-up.  A constructor function pushes &x onto
   vm->mc_msg_registrations.  Any extra macro arguments are emitted in
   front of both declarations of x.  The expansion ends with a
   declaration of x and no semicolon, presumably so the caller can
   attach an initializer. */
#define MC_SERIALIZE_MSG(x,...)                                 \
    __VA_ARGS__ mc_serialize_msg_t x;                           \
static void __mc_serialize_msg_registration_##x (void)          \
    __attribute__((__constructor__)) ;                          \
static void __mc_serialize_msg_registration_##x (void)          \
{                                                               \
    vlib_main_t * vm = vlib_get_main();                         \
    x.next_registration = vm->mc_msg_registrations;             \
    vm->mc_msg_registrations = &x;                              \
}                                                               \
    __VA_ARGS__ mc_serialize_msg_t x
/* Channels a transport carries (see mc_transport_t.tx_buffer).
   NOTE(review): the "typedef enum {" opener is missing from this copy.
   Judging by gaps in the original numbering, two enumerators are also
   missing: one between MC_TRANSPORT_MASTERSHIP and
   MC_TRANSPORT_USER_REQUEST_TO_RELAY, and one after
   MC_TRANSPORT_USER_REQUEST_FROM_RELAY.  Comments elsewhere in this
   header mention MC_TRANSPORT_JOIN and an ACK channel.  Enumerator order
   fixes the values, so restore them from the original source. */
279 MC_TRANSPORT_MASTERSHIP,
281 MC_TRANSPORT_USER_REQUEST_TO_RELAY,
282 MC_TRANSPORT_USER_REQUEST_FROM_RELAY,
284 } mc_transport_type_t;
/* Callbacks and parameters of the transport used by mc_main_t.transport.
   NOTE(review): this copy is missing the "typedef struct {" opener, the
   closing "} mc_transport_t;" line and several members.  These include
   the opaque pointer described below and max_packet_size, which
   mc_max_message_size_in_bytes () reads.  Restore them from the original
   source. */
287 clib_error_t * (* tx_buffer) (void * opaque, mc_transport_type_t type, u32 buffer_index);
289 clib_error_t * (* tx_ack) (void * opaque, mc_peer_id_t peer_id, u32 buffer_index);
291 /* Returns catchup opaque. */
292 uword (* catchup_request_fun) (void * opaque, u32 stream_index, mc_peer_id_t catchup_peer_id);
294 void (* catchup_send_fun) (void * opaque, uword catchup_opaque, u8 * data_vector);
296 /* Opaque passed to callbacks. */
299 mc_peer_id_t our_ack_peer_id;
300 mc_peer_id_t our_catchup_peer_id;
302 /* Max packet size (MTU) for this transport.
303 For IP this is interface MTU less IP + UDP header size. */
306 format_function_t * format_peer_id;
310 /* Count of messages received from this peer from the past/future
311 (with seq_cmp != 0). */
312 u64 n_msgs_from_past;
313 u64 n_msgs_from_future;
314 } mc_stream_peer_stats_t;
/* Per-peer state of a stream (elements of mc_stream_t.peers).
   NOTE(review): the "typedef struct {" opener, the id member described by
   the first comment and the closing line (presumably
   "} mc_stream_peer_t;") are missing from this copy. */
317 /* ID of this peer. */
320 /* The last sequence we received from this peer. */
321 u32 last_sequence_received;
323 mc_stream_peer_stats_t stats, stats_last_clear;
/* Bookkeeping for one sent message that may be retried; presumably the
   mc_retry_t element type of mc_stream_t.retry_pool (confirm).
   NOTE(review): the struct opener, the cached local sequence member, the
   send-count member, the send/resend time member and the closing typedef
   line are missing from this copy.
   NOTE(review): the member is named unacked_by_peer_bitmap but its
   comment says "peers which have acked"; one of the two is presumably
   wrong, so confirm which. */
329 /* Cached copy of local sequence number from buffer. */
332 /* Number of times this buffer has been sent (retried). */
335 /* Previous/next retries in doubly-linked list. */
336 u32 prev_index, next_index;
338 /* Bitmap of all peers which have acked this msg */
339 uword * unacked_by_peer_bitmap;
341 /* Message send or resend time */
/* NOTE(review): only this comment survives from the per-stream statistics
   struct in this copy.  That struct is presumably mc_stream_stats_t, the
   type of mc_stream_t.stats.  Its opener, counter member and closing line
   are missing. */
346 /* Number of retries sent for this stream. */
/* Caller-supplied configuration of a stream, passed to mc_stream_join ()
   and kept in mc_stream_t.config.
   NOTE(review): this copy is missing the struct opener and the members
   the window-size and retry-interval comments describe.  Some callback
   parameters were also dropped: rx_buffer's trailing parameter(s), one
   parameter of catchup, and save_snapshot's middle parameters.  Restore
   them from the original source. */
357 /* Number of outstanding messages. */
360 /* Retry interval, in seconds */
366 /* User rx buffer callback */
367 void (* rx_buffer) (struct mc_main_t * mc_main,
368 struct mc_stream_t * stream,
369 mc_peer_id_t peer_id,
372 /* User callback to create a snapshot */
373 u8 * (* catchup_snapshot) (struct mc_main_t *mc_main,
374 u8 * snapshot_vector,
375 u32 last_global_sequence_included);
377 /* User callback to replay a snapshot */
378 void (* catchup) (struct mc_main_t *mc_main,
380 u32 n_snapshot_data_bytes);
382 /* Callback to save a snapshot for offline replay */
383 void (* save_snapshot) (struct mc_main_t *mc_main,
386 u32 n_snapshot_data_bytes);
388 /* Called when a peer dies */
389 void (* peer_died) (struct mc_main_t * mc_main,
390 struct mc_stream_t * stream,
391 mc_peer_id_t peer_id);
392 } mc_stream_config_t;
/* Stream states (type of mc_stream_t.state).
   NOTE(review): incomplete in this copy.  Only the join_in_progress entry,
   the per-entry #define and the expansion line survive.  Missing are:
   - the other entries;
   - the "typedef enum {" opener;
   - "#undef _";
   - the closing "} mc_stream_state_t;" line.
   Restore them from the original source. */
394 #define foreach_mc_stream_state \
397 _ (join_in_progress) \
402 #define _(f) MC_STREAM_STATE_##f,
403 foreach_mc_stream_state
/* State of one stream: configuration, retry bookkeeping, peers, sequence
   numbers and per-stream message tables.
   NOTE(review): this copy is missing several lines of the definition:
   - the members described by the "Index in stream pool." and
     "Timeout ..." comments;
   - catchup_fifo, which mc_stream_free () releases;
   - the delimiters of the multi-line comments on retired_fifo and
     last_global_sequence_processed;
   - the closing "} mc_stream_t;" line.
   Restore them from the original source. */
407 typedef struct mc_stream_t {
408 mc_stream_config_t config;
410 mc_stream_state_t state;
412 /* Index in stream pool. */
415 /* Stream index 0 is always for MC internal use. */
416 #define MC_STREAM_INDEX_INTERNAL 0
418 mc_retry_t * retry_pool;
420 /* Head and tail index of retry pool. */
421 u32 retry_head_index, retry_tail_index;
424 * Country club for recently retired messages
425 * If the set of peers is expanding and a new peer
426 * misses a message, we can easily retire the FIFO
427 * element before we even know about the new peer
429 mc_retry_t * retired_fifo;
431 /* Hash mapping local sequence to retry pool index. */
432 uword * retry_index_by_local_sequence;
434 /* catch-up fifo of VLIB buffer indices.
435 start recording when catching up. */
438 mc_stream_stats_t stats, stats_last_clear;
441 mc_stream_peer_t * peers;
443 /* Bitmap with ones for all peers in peer pool. */
444 uword * all_peer_bitmap;
446 /* Map of peer id to index in the peers pool. */
447 mhash_t peer_index_by_id;
449 /* Timeout, in case we're alone in the world */
452 vlib_one_time_waiting_process_t * procs_waiting_for_join_done;
454 vlib_one_time_waiting_process_t * procs_waiting_for_open_window;
456 /* Next sequence number to use */
457 u32 our_local_sequence;
460 * Last global sequence we processed.
461 * When supplying catchup data, we need to tell
462 * the client precisely where to start replaying
464 u32 last_global_sequence_processed;
466 /* Vector of unique messages we've sent on this stream. */
467 mc_serialize_stream_msg_t * stream_msgs;
469 /* Vector mapping global message index to per-stream message index. */
470 u32 * stream_msg_index_by_global_index;
472 /* Hashed by message name. */
473 uword * stream_msg_index_by_name;
475 u64 user_requests_sent;
476 u64 user_requests_received;
480 mc_stream_free (mc_stream_t * s)
482 pool_free (s->retry_pool);
483 hash_free (s->retry_index_by_local_sequence);
484 clib_fifo_free (s->catchup_fifo);
485 pool_free (s->peers);
486 mhash_free (&s->peer_index_by_id);
487 vec_free (s->procs_waiting_for_join_done);
488 vec_free (s->procs_waiting_for_open_window);
492 mc_stream_init (mc_stream_t * s)
494 memset (s, 0, sizeof (s[0]));
495 s->retry_head_index = s->retry_tail_index = ~0;
/* Arguments for a catchup process (mc_main_t.catchup_process_args).
   NOTE(review): the struct opener and any members before catchup_snapshot
   are missing from this copy. */
501 u8 *catchup_snapshot;
502 } mc_catchup_process_arg_t;
/* Relay-master negotiation state (type of mc_main_t.relay_state).
   NOTE(review): the "typedef enum {" opener and the closing line
   (presumably "} mc_relay_state_t;") are missing from this copy. */
505 MC_RELAY_STATE_NEGOTIATE,
506 MC_RELAY_STATE_MASTER,
507 MC_RELAY_STATE_SLAVE,
/* One known mastership peer (elements of mc_main_t.mastership_peers),
   with the time its last master assertion was received.
   NOTE(review): the struct opener is missing from this copy. */
511 mc_peer_id_t peer_id;
513 f64 time_last_master_assert_received;
514 } mc_mastership_peer_t;
/* NOTE(review): only the closing line of mc_stream_and_buffer_t survives
   in this copy; its opener and members are missing.  mc_main_t keeps a
   pointer to these, commented "For mc_unserialize." */
519 } mc_stream_and_buffer_t;
/* Main mc state: relay/mastership negotiation, transport, streams,
   serialization state and event-log helpers.
   NOTE(review): this copy is missing the closing "} mc_main_t;" line and
   several members, including the retry and catchup process indices that
   the "Node indices" comment mentions.  Restore them from the original
   source. */
521 typedef struct mc_main_t {
522 mc_relay_state_t relay_state;
525 u32 we_can_be_relay_master;
527 u64 relay_master_peer_id;
529 mc_mastership_peer_t * mastership_peers;
531 /* Map of peer id to index in mastership_peers (presumably; confirm). */
532 mhash_t mastership_peer_index_by_id;
534 /* The transport we're using. */
535 mc_transport_t transport;
537 /* Last-used global sequence number. */
538 u32 relay_global_sequence;
540 /* Vector of streams. */
541 mc_stream_t * stream_vector;
543 /* Hash table mapping stream name to index in stream_vector. */
544 uword * stream_index_by_name;
546 uword * procs_waiting_for_stream_name_by_name;
548 vlib_one_time_waiting_process_t ** procs_waiting_for_stream_name_pool;
550 int joins_in_progress;
552 mc_catchup_process_arg_t * catchup_process_args;
554 /* Node indices for mastership, join ager,
555 retry and catchup processes. */
556 u32 mastership_process;
557 u32 join_ager_process;
560 u32 unserialize_process;
562 /* Global vector of messages. */
563 mc_serialize_msg_t ** global_msgs;
565 /* Hash table mapping message name to index. */
566 uword * global_msg_index_by_name;
568 /* Shared serialize/unserialize main. */
569 serialize_main_t serialize_mains[VLIB_N_RX_TX];
571 vlib_serialize_buffer_main_t serialize_buffer_mains[VLIB_N_RX_TX];
573 /* Convenience variables */
574 struct vlib_main_t * vlib_main;
575 elog_main_t * elog_main;
577 /* Maps 64 bit peer id to elog string table offset for this formatted peer id. */
578 mhash_t elog_id_by_peer_id;
580 uword *elog_id_by_msg_name;
582 /* For mc_unserialize. */
583 mc_stream_and_buffer_t * mc_unserialize_stream_and_buffers;
586 always_inline mc_stream_t *
587 mc_stream_by_name (mc_main_t * m, char * name)
589 uword * p = hash_get (m->stream_index_by_name, name);
590 return p ? vec_elt_at_index (m->stream_vector, p[0]) : 0;
593 always_inline mc_stream_t *
594 mc_stream_by_index (mc_main_t * m, u32 i)
596 return i < vec_len (m->stream_vector) ? m->stream_vector + i : 0;
600 mc_clear_stream_stats (mc_main_t * m)
603 mc_stream_peer_t * p;
604 vec_foreach (s, m->stream_vector)
606 s->stats_last_clear = s->stats;
607 pool_foreach (p, s->peers, ({
608 p->stats_last_clear = p->stats;
613 /* Declare all message handlers. */
614 #define _(f) void mc_msg_##f##_handler (mc_main_t * mcm, mc_msg_##f##_t * msg, u32 buffer_index);
618 u32 mc_stream_join (mc_main_t * mcm, mc_stream_config_t *);
620 void mc_stream_leave (mc_main_t * mcm, u32 stream_index);
622 void mc_wait_for_stream_ready (mc_main_t * m, char * stream_name);
624 u32 mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index);
626 void mc_main_init (mc_main_t * mcm, char * tag);
628 void mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master);
630 void * mc_get_vlib_buffer (struct vlib_main_t * vm, u32 n_bytes, u32 * bi_return);
632 format_function_t format_mc_main;
/* NOTE(review): these two prototypes are incomplete in this copy; the
   return-type lines and some parameters are missing.  The mc_serialize*
   macros below call mc_serialize_internal with five fixed arguments
   before the variadic ones: mc, stream index,
   multiple_messages_per_vlib_buffer, msg, and msg->serialize.  So a
   stream-index parameter and a serialize-function parameter followed by
   "..." appear to have been dropped.  mc_serialize_va presumably takes a
   va_list in that last position.  Restore both from the original
   source. */
635 mc_serialize_internal (mc_main_t * mc,
637 u32 multiple_messages_per_vlib_buffer,
638 mc_serialize_msg_t * msg,
642 mc_serialize_va (mc_main_t * mc,
644 u32 multiple_messages_per_vlib_buffer,
645 mc_serialize_msg_t * msg,
/* Serialize msg with msg->serialize onto stream si, one message per vlib
   buffer (multiple_messages_per_vlib_buffer = 0). */
#define mc_serialize_stream(mc,si,msg,args...) \
  mc_serialize_internal((mc),(si),(0),(msg),(msg)->serialize,args)

/* As mc_serialize_stream, but passes ~0 as the stream index. */
#define mc_serialize(mc,msg,args...) \
  mc_serialize_internal((mc),(~0),(0),(msg),(msg)->serialize,args)

/* As mc_serialize, but passes add as multiple_messages_per_vlib_buffer. */
#define mc_serialize2(mc,add,msg,args...) \
  mc_serialize_internal((mc),(~0),(add),(msg),(msg)->serialize,args)
657 void mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index);
658 uword mc_unserialize_message (mc_main_t * mcm, mc_stream_t * s,
659 serialize_main_t * m);
661 serialize_function_t serialize_mc_main, unserialize_mc_main;
664 mc_max_message_size_in_bytes (mc_main_t * mcm)
665 { return mcm->transport.max_packet_size - sizeof (mc_msg_user_request_t); }
668 mc_serialize_n_bytes_left (mc_main_t * mcm, serialize_main_t * m)
669 { return mc_max_message_size_in_bytes (mcm) - serialize_vlib_buffer_n_bytes (m); }
671 void unserialize_mc_stream (serialize_main_t * m, va_list * va);
672 void mc_stream_join_process_hold (void);
674 #endif /* included_vlib_mc_h */