Initial commit of vpp code.
[vpp.git] / vlib / vlib / mc.h
1 /*
2  * mc.h: vlib reliable sequenced multicast distributed applications
3  *
4  * Copyright (c) 2010 Cisco and/or its affiliates.
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at:
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #ifndef included_vlib_mc_h
19 #define included_vlib_mc_h
20
21 #include <vppinfra/elog.h>
22 #include <vppinfra/fifo.h>
23 #include <vppinfra/mhash.h>
24 #include <vlib/node.h>
25
26 #ifndef MC_EVENT_LOGGING
27 #define MC_EVENT_LOGGING 1
28 #endif
29
30 always_inline uword
31 mc_need_byte_swap (void)
32 { return CLIB_ARCH_IS_LITTLE_ENDIAN; }
33
34 /* 
35  * Used to uniquely identify hosts.  
36  * For IP4 this would be ip4_address plus tcp/udp port. 
37  */
38 typedef union {
39   u8 as_u8[8];
40   u64 as_u64;
41 } mc_peer_id_t;
42
43 always_inline mc_peer_id_t
44 mc_byte_swap_peer_id (mc_peer_id_t i)
45
46   /* Peer id is already in network byte order. */
47   return i;
48 }
49
50 always_inline int
51 mc_peer_id_compare (mc_peer_id_t a, mc_peer_id_t b)
52 {
53   return memcmp (a.as_u8, b.as_u8, sizeof (a.as_u8));
54 }
55
56 /* Assert mastership.  Lowest peer_id amount all peers wins mastership.
57    Only sent/received over mastership channel (MC_TRANSPORT_MASTERSHIP).
58    So, we don't need a message opcode. */
59 typedef CLIB_PACKED (struct {
60   /* Peer id asserting mastership. */
61   mc_peer_id_t peer_id;
62
63   /* Global sequence number asserted. */
64   u32 global_sequence;
65 }) mc_msg_master_assert_t;
66
67 always_inline void
68 mc_byte_swap_msg_master_assert (mc_msg_master_assert_t * r)
69 {
70   if (mc_need_byte_swap ())
71     {
72       r->peer_id = mc_byte_swap_peer_id (r->peer_id);
73       r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
74     }
75 }
76
77 #define foreach_mc_msg_type                     \
78   _ (master_assert)                             \
79   _ (join_or_leave_request)                     \
80   _ (join_reply)                                \
81   _ (user_request)                              \
82   _ (user_ack)                                  \
83   _ (catchup_request)                           \
84   _ (catchup_reply)
85
86 typedef enum {
87 #define _(f) MC_MSG_TYPE_##f,
88   foreach_mc_msg_type
89 #undef _
90 } mc_relay_msg_type_t;
91
92 /* Request to join a given stream.  Multicast over MC_TRANSPORT_JOIN. */
93 typedef CLIB_PACKED (struct {
94   mc_peer_id_t peer_id;
95
96   mc_relay_msg_type_t type : 32; /* MC_MSG_TYPE_join_or_leave_request */
97
98   /* Stream to join or leave. */
99   u32 stream_index;
100
101   /* join = 1, leave = 0 */
102   u8 is_join;
103 }) mc_msg_join_or_leave_request_t;
104
105 always_inline void
106 mc_byte_swap_msg_join_or_leave_request (mc_msg_join_or_leave_request_t * r)
107 {
108   if (mc_need_byte_swap ())
109     {
110       r->peer_id = mc_byte_swap_peer_id (r->peer_id);
111       r->type = clib_byte_swap_u32 (r->type);
112       r->stream_index = clib_byte_swap_u32 (r->stream_index);
113     }
114 }
115
116 /* Join reply.  Multicast over MC_TRANSPORT_JOIN. */
117 typedef CLIB_PACKED (struct {
118   mc_peer_id_t peer_id;
119
120   mc_relay_msg_type_t type : 32; /* MC_MSG_TYPE_join_reply */
121
122   u32 stream_index;
123
124   /* Peer ID to contact to catchup with this stream. */
125   mc_peer_id_t catchup_peer_id;
126 }) mc_msg_join_reply_t;
127
128 always_inline void
129 mc_byte_swap_msg_join_reply (mc_msg_join_reply_t * r)
130 {
131   if (mc_need_byte_swap ())
132     {
133       r->peer_id = mc_byte_swap_peer_id (r->peer_id);
134       r->type = clib_byte_swap_u32 (r->type);
135       r->stream_index = clib_byte_swap_u32 (r->stream_index);
136       r->catchup_peer_id = mc_byte_swap_peer_id (r->catchup_peer_id);
137     }
138 }
139
140 /* Generic (application) request.  Multicast over MC_TRANSPORT_USER_REQUEST_TO_RELAY and then
141    relayed by relay master after filling in global sequence number. */
142 typedef CLIB_PACKED (struct {
143   mc_peer_id_t peer_id;
144
145   u32 stream_index;
146
147   /* Global sequence number as filled in by relay master. */
148   u32 global_sequence;
149
150   /* Local sequence number as filled in by peer sending message. */
151   u32 local_sequence;
152
153   /* Size of request data. */
154   u32 n_data_bytes; 
155
156   /* Opaque request data. */
157   u8 data[0];
158 }) mc_msg_user_request_t;
159
160 always_inline void
161 mc_byte_swap_msg_user_request (mc_msg_user_request_t * r)
162 {
163   if (mc_need_byte_swap ())
164     {
165       r->peer_id = mc_byte_swap_peer_id (r->peer_id);
166       r->stream_index = clib_byte_swap_u32 (r->stream_index);
167       r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
168       r->local_sequence = clib_byte_swap_u32 (r->local_sequence);
169       r->n_data_bytes = clib_byte_swap_u32 (r->n_data_bytes);
170     }
171 }
172
173 /* Sent unicast over ACK channel. */
174 typedef CLIB_PACKED (struct {
175   mc_peer_id_t peer_id;
176   u32 global_sequence;
177   u32 stream_index;
178   u32 local_sequence;
179   i32 seq_cmp_result;
180 }) mc_msg_user_ack_t;
181
182 always_inline void
183 mc_byte_swap_msg_user_ack (mc_msg_user_ack_t * r)
184 {
185   if (mc_need_byte_swap ())
186     {
187       r->peer_id = mc_byte_swap_peer_id (r->peer_id);
188       r->stream_index = clib_byte_swap_u32 (r->stream_index);
189       r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
190       r->local_sequence = clib_byte_swap_u32 (r->local_sequence);
191       r->seq_cmp_result = clib_byte_swap_i32 (r->seq_cmp_result);
192     }
193 }
194
195 /* Sent/received unicast over catchup channel (e.g. using TCP). */
196 typedef CLIB_PACKED (struct {
197   mc_peer_id_t peer_id;
198   u32 stream_index;
199 }) mc_msg_catchup_request_t;
200
201 always_inline void
202 mc_byte_swap_msg_catchup_request (mc_msg_catchup_request_t * r)
203 {
204   if (mc_need_byte_swap ())
205     {
206       r->peer_id = mc_byte_swap_peer_id (r->peer_id);
207       r->stream_index = clib_byte_swap_u32 (r->stream_index);
208     }
209 }
210
211 /* Sent/received unicast over catchup channel. */
212 typedef CLIB_PACKED (struct {
213   mc_peer_id_t peer_id;
214
215   u32 stream_index;
216
217   /* Last global sequence number included in catchup data. */
218   u32 last_global_sequence_included;
219
220   /* Size of catchup data. */
221   u32 n_data_bytes;
222
223   /* Catchup data. */
224   u8 data[0];
225 }) mc_msg_catchup_reply_t;
226
227 always_inline void
228 mc_byte_swap_msg_catchup_reply (mc_msg_catchup_reply_t * r)
229 {
230   if (mc_need_byte_swap ())
231     {
232       r->peer_id = mc_byte_swap_peer_id (r->peer_id);
233       r->stream_index = clib_byte_swap_u32 (r->stream_index);
234       r->last_global_sequence_included = clib_byte_swap_u32 (r->last_global_sequence_included);
235       r->n_data_bytes = clib_byte_swap_u32 (r->n_data_bytes);
236     }
237 }
238
239 typedef struct _mc_serialize_msg {
240   /* Name for this type. */
241   char * name;
242
243   /* Functions to serialize/unserialize data. */
244   serialize_function_t * serialize;
245   serialize_function_t * unserialize;
246
247   /* Maximum message size in bytes when serialized.
248      If zero then this will be set to the largest sent message. */
249   u32 max_n_bytes_serialized;
250
251   /* Opaque to use for first argument to serialize/unserialize function. */
252   u32 opaque;
253
254   /* Index in global message vector. */
255   u32 global_index;
256
257   /* Registration list */
258   struct _mc_serialize_msg * next_registration;
259 } mc_serialize_msg_t;
260
261 typedef struct {
262   /* Index into global message vector. */
263   u32 global_index;
264 } mc_serialize_stream_msg_t;
265
266 #define MC_SERIALIZE_MSG(x,...)                                 \
267     __VA_ARGS__ mc_serialize_msg_t x;                           \
268 static void __mc_serialize_msg_registration_##x (void)          \
269     __attribute__((__constructor__)) ;                          \
270 static void __mc_serialize_msg_registration_##x (void)          \
271 {                                                               \
272     vlib_main_t * vm = vlib_get_main();                         \
273     x.next_registration = vm->mc_msg_registrations;             \
274     vm->mc_msg_registrations = &x;                              \
275 }                                                               \
276 __VA_ARGS__ mc_serialize_msg_t x 
277
278 typedef enum {
279   MC_TRANSPORT_MASTERSHIP,
280   MC_TRANSPORT_JOIN,
281   MC_TRANSPORT_USER_REQUEST_TO_RELAY,
282   MC_TRANSPORT_USER_REQUEST_FROM_RELAY,
283   MC_N_TRANSPORT_TYPE,
284 } mc_transport_type_t;
285
286 typedef struct {
287   clib_error_t * (* tx_buffer) (void * opaque, mc_transport_type_t type, u32 buffer_index);
288
289   clib_error_t * (* tx_ack) (void * opaque, mc_peer_id_t peer_id, u32 buffer_index);
290
291   /* Returns catchup opaque. */
292   uword (* catchup_request_fun) (void * opaque, u32 stream_index, mc_peer_id_t catchup_peer_id);
293
294   void (* catchup_send_fun) (void * opaque, uword catchup_opaque, u8 * data_vector);
295
296   /* Opaque passed to callbacks. */
297   void * opaque;
298
299   mc_peer_id_t our_ack_peer_id;
300   mc_peer_id_t our_catchup_peer_id;
301
302   /* Max packet size (MTU) for this transport.
303      For IP this is interface MTU less IP + UDP header size. */
304   u32 max_packet_size;
305
306   format_function_t * format_peer_id;
307 } mc_transport_t;
308
309 typedef struct {
310   /* Count of messages received from this peer from the past/future
311      (with seq_cmp != 0). */
312   u64 n_msgs_from_past;
313   u64 n_msgs_from_future;
314 } mc_stream_peer_stats_t;
315
316 typedef struct {
317   /* ID of this peer. */
318   mc_peer_id_t id;
319
320   /* The last sequence we received from this peer. */
321   u32 last_sequence_received;
322
323   mc_stream_peer_stats_t stats, stats_last_clear;
324 } mc_stream_peer_t;
325
326 typedef struct {
327   u32 buffer_index;
328
329   /* Cached copy of local sequence number from buffer. */
330   u32 local_sequence;
331
332   /* Number of times this buffer has been sent (retried). */
333   u32 n_retries;
334
335   /* Previous/next retries in doubly-linked list. */
336   u32 prev_index, next_index;
337
338   /* Bitmap of all peers which have acked this msg */
339   uword * unacked_by_peer_bitmap;
340
341   /* Message send or resend time */
342   f64 sent_at;
343 } mc_retry_t;
344
345 typedef struct {
346   /* Number of retries sent for this stream. */
347   u64 n_retries;
348 } mc_stream_stats_t;
349
350 struct mc_main_t;
351 struct mc_stream_t;
352
353 typedef struct {
354   /* Stream name. */
355   char * name;
356
357   /* Number of outstanding messages. */
358   u32 window_size;
359
360   /* Retry interval, in seconds */
361   f64 retry_interval;
362
363   /* Retry limit */
364   u32 retry_limit;
365
366   /* User rx buffer callback */
367   void (* rx_buffer) (struct mc_main_t * mc_main,
368                       struct mc_stream_t * stream,
369                       mc_peer_id_t peer_id,
370                       u32 buffer_index);
371
372   /* User callback to create a snapshot */
373   u8 * (* catchup_snapshot) (struct mc_main_t *mc_main,
374                              u8 * snapshot_vector,
375                              u32 last_global_sequence_included);
376
377   /* User callback to replay a snapshot */
378   void (* catchup) (struct mc_main_t *mc_main,
379                     u8 * snapshot_data,
380                     u32 n_snapshot_data_bytes);
381
382   /* Callback to save a snapshot for offline replay */
383   void (* save_snapshot) (struct mc_main_t *mc_main,
384                           u32 is_catchup,
385                           u8 * snapshot_data,
386                           u32 n_snapshot_data_bytes);
387
388   /* Called when a peer dies */
389   void (* peer_died) (struct mc_main_t * mc_main,
390                       struct mc_stream_t * stream,
391                       mc_peer_id_t peer_id);
392 } mc_stream_config_t;
393
394 #define foreach_mc_stream_state                 \
395   _ (invalid)                                   \
396   _ (name_known)                                \
397   _ (join_in_progress)                          \
398   _ (catchup)                                   \
399   _ (ready)
400
401 typedef enum {
402 #define _(f) MC_STREAM_STATE_##f,
403   foreach_mc_stream_state
404 #undef _
405 } mc_stream_state_t;
406
407 typedef struct mc_stream_t {
408   mc_stream_config_t config;
409
410   mc_stream_state_t state;
411
412   /* Index in stream pool. */
413   u32 index;
414
415   /* Stream index 0 is always for MC internal use. */
416 #define MC_STREAM_INDEX_INTERNAL 0
417
418   mc_retry_t * retry_pool;
419
420   /* Head and tail index of retry pool. */
421   u32 retry_head_index, retry_tail_index;
422
423   /* 
424    * Country club for recently retired messages
425    * If the set of peers is expanding and a new peer
426    * misses a message, we can easily retire the FIFO
427    * element before we even know about the new peer
428    */
429   mc_retry_t * retired_fifo;
430
431   /* Hash mapping local sequence to retry pool index. */
432   uword * retry_index_by_local_sequence;
433
434   /* catch-up fifo of VLIB buffer indices.
435      start recording when catching up. */
436   u32 * catchup_fifo;
437
438   mc_stream_stats_t stats, stats_last_clear;
439
440   /* Peer pool. */
441   mc_stream_peer_t * peers;
442
443   /* Bitmap with ones for all peers in peer pool. */
444   uword * all_peer_bitmap;
445
446   /* Map of 64 bit id to index in stream pool. */
447   mhash_t peer_index_by_id;
448
449   /* Timeout, in case we're alone in the world */
450   f64 join_timeout;
451
452   vlib_one_time_waiting_process_t * procs_waiting_for_join_done;
453
454   vlib_one_time_waiting_process_t * procs_waiting_for_open_window;
455
456   /* Next sequence number to use */
457   u32 our_local_sequence;
458
459   /* 
460    * Last global sequence we processed.
461    * When supplying catchup data, we need to tell
462    * the client precisely where to start replaying
463    */
464   u32 last_global_sequence_processed;
465
466   /* Vector of unique messages we've sent on this stream. */
467   mc_serialize_stream_msg_t * stream_msgs;
468
469   /* Vector global message index into per stream message index. */
470   u32 * stream_msg_index_by_global_index;
471
472   /* Hashed by message name. */
473   uword * stream_msg_index_by_name;
474
475   u64 user_requests_sent;
476   u64 user_requests_received;
477 } mc_stream_t;
478
479 always_inline void
480 mc_stream_free (mc_stream_t * s)
481 {
482   pool_free (s->retry_pool);
483   hash_free (s->retry_index_by_local_sequence);
484   clib_fifo_free (s->catchup_fifo);
485   pool_free (s->peers);
486   mhash_free (&s->peer_index_by_id);
487   vec_free (s->procs_waiting_for_join_done);
488   vec_free (s->procs_waiting_for_open_window);
489 }
490
491 always_inline void
492 mc_stream_init (mc_stream_t * s)
493 {
494   memset (s, 0, sizeof (s[0]));
495   s->retry_head_index = s->retry_tail_index = ~0;
496 }
497
498 typedef struct {
499   u32 stream_index;
500   u32 catchup_opaque;
501   u8 *catchup_snapshot;
502 } mc_catchup_process_arg_t;
503
504 typedef enum {
505   MC_RELAY_STATE_NEGOTIATE,
506   MC_RELAY_STATE_MASTER,
507   MC_RELAY_STATE_SLAVE,
508 } mc_relay_state_t;
509
510 typedef struct {
511   mc_peer_id_t peer_id;
512
513   f64 time_last_master_assert_received;
514 } mc_mastership_peer_t;
515
516 typedef struct {
517   u32 stream_index;
518   u32 buffer_index;
519 } mc_stream_and_buffer_t;
520
521 typedef struct mc_main_t {
522   mc_relay_state_t relay_state;
523
524   /* Mastership */
525   u32 we_can_be_relay_master;
526
527   u64 relay_master_peer_id;
528
529   mc_mastership_peer_t * mastership_peers;
530
531   /* Map of 64 bit id to index in stream pool. */
532   mhash_t mastership_peer_index_by_id;
533
534   /* The transport we're using. */
535   mc_transport_t transport;
536     
537   /* Last-used global sequence number. */
538   u32 relay_global_sequence;
539
540   /* Vector of streams. */
541   mc_stream_t * stream_vector;
542
543   /* Hash table mapping stream name to pool index. */
544   uword * stream_index_by_name;
545
546   uword * procs_waiting_for_stream_name_by_name;
547
548   vlib_one_time_waiting_process_t ** procs_waiting_for_stream_name_pool;
549
550   int joins_in_progress;
551
552   mc_catchup_process_arg_t * catchup_process_args;
553
554   /* Node indices for mastership, join ager,
555      retry and catchup processes. */
556   u32 mastership_process;
557   u32 join_ager_process;
558   u32 retry_process;
559   u32 catchup_process;
560   u32 unserialize_process;
561
562   /* Global vector of messages. */
563   mc_serialize_msg_t ** global_msgs;
564
565   /* Hash table mapping message name to index. */
566   uword * global_msg_index_by_name;
567
568   /* Shared serialize/unserialize main. */
569   serialize_main_t serialize_mains[VLIB_N_RX_TX];
570
571   vlib_serialize_buffer_main_t serialize_buffer_mains[VLIB_N_RX_TX];
572
573   /* Convenience variables */
574   struct vlib_main_t * vlib_main;
575   elog_main_t * elog_main;
576
577   /* Maps 64 bit peer id to elog string table offset for this formatted peer id. */
578   mhash_t elog_id_by_peer_id;
579
580   uword *elog_id_by_msg_name;
581
582   /* For mc_unserialize. */
583   mc_stream_and_buffer_t * mc_unserialize_stream_and_buffers;
584 } mc_main_t;
585
586 always_inline mc_stream_t *
587 mc_stream_by_name (mc_main_t * m, char * name)
588 {
589   uword * p = hash_get (m->stream_index_by_name, name);
590   return p ? vec_elt_at_index (m->stream_vector, p[0]) : 0;
591 }
592
593 always_inline mc_stream_t *
594 mc_stream_by_index (mc_main_t * m, u32 i)
595 {
596   return i < vec_len (m->stream_vector) ? m->stream_vector + i : 0;
597 }
598
599 always_inline void
600 mc_clear_stream_stats (mc_main_t * m)
601 {
602   mc_stream_t * s;
603   mc_stream_peer_t * p;
604   vec_foreach (s, m->stream_vector)
605     {
606       s->stats_last_clear = s->stats;
607       pool_foreach (p, s->peers, ({
608         p->stats_last_clear = p->stats;
609       }));
610     }
611 }
612
613 /* Declare all message handlers. */
614 #define _(f) void mc_msg_##f##_handler (mc_main_t * mcm, mc_msg_##f##_t * msg, u32 buffer_index);
615 foreach_mc_msg_type
616 #undef _
617
618 u32 mc_stream_join (mc_main_t * mcm, mc_stream_config_t *);
619
620 void mc_stream_leave (mc_main_t * mcm, u32 stream_index);
621
622 void mc_wait_for_stream_ready (mc_main_t * m, char * stream_name);
623
624 u32 mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index);
625
626 void mc_main_init (mc_main_t * mcm, char * tag);
627
628 void mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master);
629
630 void * mc_get_vlib_buffer (struct vlib_main_t * vm, u32 n_bytes, u32 * bi_return);
631
632 format_function_t format_mc_main;
633
634 clib_error_t *
635 mc_serialize_internal (mc_main_t * mc,
636                        u32 stream_index,
637                        u32 multiple_messages_per_vlib_buffer,
638                        mc_serialize_msg_t * msg,
639                        ...);
640
641 clib_error_t *
642 mc_serialize_va (mc_main_t * mc,
643                  u32 stream_index,
644                  u32 multiple_messages_per_vlib_buffer,
645                  mc_serialize_msg_t * msg,
646                  va_list * va);
647
648 #define mc_serialize_stream(mc,si,msg,args...)                  \
649   mc_serialize_internal((mc),(si),(0),(msg),(msg)->serialize,args)
650
651 #define mc_serialize(mc,msg,args...)                            \
652   mc_serialize_internal((mc),(~0),(0),(msg),(msg)->serialize,args)
653
654 #define mc_serialize2(mc,add,msg,args...)                               \
655   mc_serialize_internal((mc),(~0),(add),(msg),(msg)->serialize,args)
656
657 void mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index);
658 uword mc_unserialize_message (mc_main_t * mcm, mc_stream_t * s,
659                               serialize_main_t * m);
660
661 serialize_function_t serialize_mc_main, unserialize_mc_main;
662
663 always_inline uword
664 mc_max_message_size_in_bytes (mc_main_t * mcm)
665 { return mcm->transport.max_packet_size - sizeof (mc_msg_user_request_t); }
666
667 always_inline word
668 mc_serialize_n_bytes_left (mc_main_t * mcm, serialize_main_t * m)
669 { return mc_max_message_size_in_bytes (mcm) - serialize_vlib_buffer_n_bytes (m); }
670
671 void unserialize_mc_stream (serialize_main_t * m, va_list * va);
672 void mc_stream_join_process_hold (void);
673
674 #endif /* included_vlib_mc_h */