Fix the path error inside vcl socket_test.sh
[vpp.git] / src / vlib / mc.h
1 /*
2  * mc.h: vlib reliable sequenced multicast distributed applications
3  *
4  * Copyright (c) 2010 Cisco and/or its affiliates.
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at:
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #ifndef included_vlib_mc_h
19 #define included_vlib_mc_h
20
21 #include <vppinfra/elog.h>
22 #include <vppinfra/fifo.h>
23 #include <vppinfra/mhash.h>
24 #include <vlib/node.h>
25
26 #ifndef MC_EVENT_LOGGING
27 #define MC_EVENT_LOGGING 1
28 #endif
29
30 always_inline uword
31 mc_need_byte_swap (void)
32 {
33   return CLIB_ARCH_IS_LITTLE_ENDIAN;
34 }
35
36 /*
37  * Used to uniquely identify hosts.
38  * For IP4 this would be ip4_address plus tcp/udp port.
39  */
40 typedef union
41 {
42   u8 as_u8[8];
43   u64 as_u64;
44 } mc_peer_id_t;
45
46 always_inline mc_peer_id_t
47 mc_byte_swap_peer_id (mc_peer_id_t i)
48 {
49   /* Peer id is already in network byte order. */
50   return i;
51 }
52
53 always_inline int
54 mc_peer_id_compare (mc_peer_id_t a, mc_peer_id_t b)
55 {
56   return memcmp (a.as_u8, b.as_u8, sizeof (a.as_u8));
57 }
58
59 /* Assert mastership.  Lowest peer_id amount all peers wins mastership.
60    Only sent/received over mastership channel (MC_TRANSPORT_MASTERSHIP).
61    So, we don't need a message opcode. */
62 typedef CLIB_PACKED (struct
63                      {
64                      /* Peer id asserting mastership. */
65                      mc_peer_id_t peer_id;
66                      /* Global sequence number asserted. */
67                      u32 global_sequence;}) mc_msg_master_assert_t;
68
69 always_inline void
70 mc_byte_swap_msg_master_assert (mc_msg_master_assert_t * r)
71 {
72   if (mc_need_byte_swap ())
73     {
74       r->peer_id = mc_byte_swap_peer_id (r->peer_id);
75       r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
76     }
77 }
78
79 #define foreach_mc_msg_type                     \
80   _ (master_assert)                             \
81   _ (join_or_leave_request)                     \
82   _ (join_reply)                                \
83   _ (user_request)                              \
84   _ (user_ack)                                  \
85   _ (catchup_request)                           \
86   _ (catchup_reply)
87
88 typedef enum
89 {
90 #define _(f) MC_MSG_TYPE_##f,
91   foreach_mc_msg_type
92 #undef _
93 } mc_relay_msg_type_t;
94
95 /* Request to join a given stream.  Multicast over MC_TRANSPORT_JOIN. */
96 typedef CLIB_PACKED (struct
97                      {
98 mc_peer_id_t peer_id; mc_relay_msg_type_t type:32;
99                      /* MC_MSG_TYPE_join_or_leave_request */
100                      /* Stream to join or leave. */
101                      u32 stream_index;
102                      /* join = 1, leave = 0 */
103                      u8 is_join;}) mc_msg_join_or_leave_request_t;
104
105 always_inline void
106 mc_byte_swap_msg_join_or_leave_request (mc_msg_join_or_leave_request_t * r)
107 {
108   if (mc_need_byte_swap ())
109     {
110       r->peer_id = mc_byte_swap_peer_id (r->peer_id);
111       r->type = clib_byte_swap_u32 (r->type);
112       r->stream_index = clib_byte_swap_u32 (r->stream_index);
113     }
114 }
115
116 /* Join reply.  Multicast over MC_TRANSPORT_JOIN. */
117 typedef CLIB_PACKED (struct
118                      {
119 mc_peer_id_t peer_id; mc_relay_msg_type_t type:32;
120                      /* MC_MSG_TYPE_join_reply */
121                      u32 stream_index;
122                      /* Peer ID to contact to catchup with this stream. */
123                      mc_peer_id_t catchup_peer_id;}) mc_msg_join_reply_t;
124
125 always_inline void
126 mc_byte_swap_msg_join_reply (mc_msg_join_reply_t * r)
127 {
128   if (mc_need_byte_swap ())
129     {
130       r->peer_id = mc_byte_swap_peer_id (r->peer_id);
131       r->type = clib_byte_swap_u32 (r->type);
132       r->stream_index = clib_byte_swap_u32 (r->stream_index);
133       r->catchup_peer_id = mc_byte_swap_peer_id (r->catchup_peer_id);
134     }
135 }
136
137 /* Generic (application) request.  Multicast over MC_TRANSPORT_USER_REQUEST_TO_RELAY and then
138    relayed by relay master after filling in global sequence number. */
139 typedef CLIB_PACKED (struct
140                      {
141                      mc_peer_id_t peer_id; u32 stream_index;
142                      /* Global sequence number as filled in by relay master. */
143                      u32 global_sequence;
144                      /* Local sequence number as filled in by peer sending message. */
145                      u32 local_sequence;
146                      /* Size of request data. */
147                      u32 n_data_bytes;
148                      /* Opaque request data. */
149                      u8 data[0];}) mc_msg_user_request_t;
150
151 always_inline void
152 mc_byte_swap_msg_user_request (mc_msg_user_request_t * r)
153 {
154   if (mc_need_byte_swap ())
155     {
156       r->peer_id = mc_byte_swap_peer_id (r->peer_id);
157       r->stream_index = clib_byte_swap_u32 (r->stream_index);
158       r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
159       r->local_sequence = clib_byte_swap_u32 (r->local_sequence);
160       r->n_data_bytes = clib_byte_swap_u32 (r->n_data_bytes);
161     }
162 }
163
164 /* Sent unicast over ACK channel. */
165 typedef CLIB_PACKED (struct
166                      {
167                      mc_peer_id_t peer_id;
168                      u32 global_sequence; u32 stream_index;
169                      u32 local_sequence;
170                      i32 seq_cmp_result;}) mc_msg_user_ack_t;
171
172 always_inline void
173 mc_byte_swap_msg_user_ack (mc_msg_user_ack_t * r)
174 {
175   if (mc_need_byte_swap ())
176     {
177       r->peer_id = mc_byte_swap_peer_id (r->peer_id);
178       r->stream_index = clib_byte_swap_u32 (r->stream_index);
179       r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
180       r->local_sequence = clib_byte_swap_u32 (r->local_sequence);
181       r->seq_cmp_result = clib_byte_swap_i32 (r->seq_cmp_result);
182     }
183 }
184
185 /* Sent/received unicast over catchup channel (e.g. using TCP). */
186 typedef CLIB_PACKED (struct
187                      {
188                      mc_peer_id_t peer_id;
189                      u32 stream_index;}) mc_msg_catchup_request_t;
190
191 always_inline void
192 mc_byte_swap_msg_catchup_request (mc_msg_catchup_request_t * r)
193 {
194   if (mc_need_byte_swap ())
195     {
196       r->peer_id = mc_byte_swap_peer_id (r->peer_id);
197       r->stream_index = clib_byte_swap_u32 (r->stream_index);
198     }
199 }
200
201 /* Sent/received unicast over catchup channel. */
202 typedef CLIB_PACKED (struct
203                      {
204                      mc_peer_id_t peer_id; u32 stream_index;
205                      /* Last global sequence number included in catchup data. */
206                      u32 last_global_sequence_included;
207                      /* Size of catchup data. */
208                      u32 n_data_bytes;
209                      /* Catchup data. */
210                      u8 data[0];}) mc_msg_catchup_reply_t;
211
212 always_inline void
213 mc_byte_swap_msg_catchup_reply (mc_msg_catchup_reply_t * r)
214 {
215   if (mc_need_byte_swap ())
216     {
217       r->peer_id = mc_byte_swap_peer_id (r->peer_id);
218       r->stream_index = clib_byte_swap_u32 (r->stream_index);
219       r->last_global_sequence_included =
220         clib_byte_swap_u32 (r->last_global_sequence_included);
221       r->n_data_bytes = clib_byte_swap_u32 (r->n_data_bytes);
222     }
223 }
224
225 typedef struct _mc_serialize_msg
226 {
227   /* Name for this type. */
228   char *name;
229
230   /* Functions to serialize/unserialize data. */
231   serialize_function_t *serialize;
232   serialize_function_t *unserialize;
233
234   /* Maximum message size in bytes when serialized.
235      If zero then this will be set to the largest sent message. */
236   u32 max_n_bytes_serialized;
237
238   /* Opaque to use for first argument to serialize/unserialize function. */
239   u32 opaque;
240
241   /* Index in global message vector. */
242   u32 global_index;
243
244   /* Registration list */
245   struct _mc_serialize_msg *next_registration;
246 } mc_serialize_msg_t;
247
248 typedef struct
249 {
250   /* Index into global message vector. */
251   u32 global_index;
252 } mc_serialize_stream_msg_t;
253
254 #define MC_SERIALIZE_MSG(x,...)                                 \
255     __VA_ARGS__ mc_serialize_msg_t x;                           \
256 static void __mc_serialize_msg_registration_##x (void)          \
257     __attribute__((__constructor__)) ;                          \
258 static void __mc_serialize_msg_registration_##x (void)          \
259 {                                                               \
260     vlib_main_t * vm = vlib_get_main();                         \
261     x.next_registration = vm->mc_msg_registrations;             \
262     vm->mc_msg_registrations = &x;                              \
263 }                                                               \
264 static void __mc_serialize_msg_unregistration_##x (void)        \
265     __attribute__((__destructor__)) ;                           \
266 static void __mc_serialize_msg_unregistration_##x (void)        \
267 {                                                               \
268     vlib_main_t * vm = vlib_get_main();                         \
269     VLIB_REMOVE_FROM_LINKED_LIST (vm->mc_msg_registrations, &x, \
270                                   next_registration);           \
271 }                                                               \
272 __VA_ARGS__ mc_serialize_msg_t x
273
274 typedef enum
275 {
276   MC_TRANSPORT_MASTERSHIP,
277   MC_TRANSPORT_JOIN,
278   MC_TRANSPORT_USER_REQUEST_TO_RELAY,
279   MC_TRANSPORT_USER_REQUEST_FROM_RELAY,
280   MC_N_TRANSPORT_TYPE,
281 } mc_transport_type_t;
282
283 typedef struct
284 {
285   clib_error_t *(*tx_buffer) (void *opaque, mc_transport_type_t type,
286                               u32 buffer_index);
287
288   clib_error_t *(*tx_ack) (void *opaque, mc_peer_id_t peer_id,
289                            u32 buffer_index);
290
291   /* Returns catchup opaque. */
292     uword (*catchup_request_fun) (void *opaque, u32 stream_index,
293                                   mc_peer_id_t catchup_peer_id);
294
295   void (*catchup_send_fun) (void *opaque, uword catchup_opaque,
296                             u8 * data_vector);
297
298   /* Opaque passed to callbacks. */
299   void *opaque;
300
301   mc_peer_id_t our_ack_peer_id;
302   mc_peer_id_t our_catchup_peer_id;
303
304   /* Max packet size (MTU) for this transport.
305      For IP this is interface MTU less IP + UDP header size. */
306   u32 max_packet_size;
307
308   format_function_t *format_peer_id;
309 } mc_transport_t;
310
311 typedef struct
312 {
313   /* Count of messages received from this peer from the past/future
314      (with seq_cmp != 0). */
315   u64 n_msgs_from_past;
316   u64 n_msgs_from_future;
317 } mc_stream_peer_stats_t;
318
319 typedef struct
320 {
321   /* ID of this peer. */
322   mc_peer_id_t id;
323
324   /* The last sequence we received from this peer. */
325   u32 last_sequence_received;
326
327   mc_stream_peer_stats_t stats, stats_last_clear;
328 } mc_stream_peer_t;
329
330 typedef struct
331 {
332   u32 buffer_index;
333
334   /* Cached copy of local sequence number from buffer. */
335   u32 local_sequence;
336
337   /* Number of times this buffer has been sent (retried). */
338   u32 n_retries;
339
340   /* Previous/next retries in doubly-linked list. */
341   u32 prev_index, next_index;
342
343   /* Bitmap of all peers which have acked this msg */
344   uword *unacked_by_peer_bitmap;
345
346   /* Message send or resend time */
347   f64 sent_at;
348 } mc_retry_t;
349
350 typedef struct
351 {
352   /* Number of retries sent for this stream. */
353   u64 n_retries;
354 } mc_stream_stats_t;
355
356 struct mc_main_t;
357 struct mc_stream_t;
358
359 typedef struct
360 {
361   /* Stream name. */
362   char *name;
363
364   /* Number of outstanding messages. */
365   u32 window_size;
366
367   /* Retry interval, in seconds */
368   f64 retry_interval;
369
370   /* Retry limit */
371   u32 retry_limit;
372
373   /* User rx buffer callback */
374   void (*rx_buffer) (struct mc_main_t * mc_main,
375                      struct mc_stream_t * stream,
376                      mc_peer_id_t peer_id, u32 buffer_index);
377
378   /* User callback to create a snapshot */
379   u8 *(*catchup_snapshot) (struct mc_main_t * mc_main,
380                            u8 * snapshot_vector,
381                            u32 last_global_sequence_included);
382
383   /* User callback to replay a snapshot */
384   void (*catchup) (struct mc_main_t * mc_main,
385                    u8 * snapshot_data, u32 n_snapshot_data_bytes);
386
387   /* Callback to save a snapshot for offline replay */
388   void (*save_snapshot) (struct mc_main_t * mc_main,
389                          u32 is_catchup,
390                          u8 * snapshot_data, u32 n_snapshot_data_bytes);
391
392   /* Called when a peer dies */
393   void (*peer_died) (struct mc_main_t * mc_main,
394                      struct mc_stream_t * stream, mc_peer_id_t peer_id);
395 } mc_stream_config_t;
396
397 #define foreach_mc_stream_state                 \
398   _ (invalid)                                   \
399   _ (name_known)                                \
400   _ (join_in_progress)                          \
401   _ (catchup)                                   \
402   _ (ready)
403
404 typedef enum
405 {
406 #define _(f) MC_STREAM_STATE_##f,
407   foreach_mc_stream_state
408 #undef _
409 } mc_stream_state_t;
410
411 typedef struct mc_stream_t
412 {
413   mc_stream_config_t config;
414
415   mc_stream_state_t state;
416
417   /* Index in stream pool. */
418   u32 index;
419
420   /* Stream index 0 is always for MC internal use. */
421 #define MC_STREAM_INDEX_INTERNAL 0
422
423   mc_retry_t *retry_pool;
424
425   /* Head and tail index of retry pool. */
426   u32 retry_head_index, retry_tail_index;
427
428   /*
429    * Country club for recently retired messages
430    * If the set of peers is expanding and a new peer
431    * misses a message, we can easily retire the FIFO
432    * element before we even know about the new peer
433    */
434   mc_retry_t *retired_fifo;
435
436   /* Hash mapping local sequence to retry pool index. */
437   uword *retry_index_by_local_sequence;
438
439   /* catch-up fifo of VLIB buffer indices.
440      start recording when catching up. */
441   u32 *catchup_fifo;
442
443   mc_stream_stats_t stats, stats_last_clear;
444
445   /* Peer pool. */
446   mc_stream_peer_t *peers;
447
448   /* Bitmap with ones for all peers in peer pool. */
449   uword *all_peer_bitmap;
450
451   /* Map of 64 bit id to index in stream pool. */
452   mhash_t peer_index_by_id;
453
454   /* Timeout, in case we're alone in the world */
455   f64 join_timeout;
456
457   vlib_one_time_waiting_process_t *procs_waiting_for_join_done;
458
459   vlib_one_time_waiting_process_t *procs_waiting_for_open_window;
460
461   /* Next sequence number to use */
462   u32 our_local_sequence;
463
464   /*
465    * Last global sequence we processed.
466    * When supplying catchup data, we need to tell
467    * the client precisely where to start replaying
468    */
469   u32 last_global_sequence_processed;
470
471   /* Vector of unique messages we've sent on this stream. */
472   mc_serialize_stream_msg_t *stream_msgs;
473
474   /* Vector global message index into per stream message index. */
475   u32 *stream_msg_index_by_global_index;
476
477   /* Hashed by message name. */
478   uword *stream_msg_index_by_name;
479
480   u64 user_requests_sent;
481   u64 user_requests_received;
482 } mc_stream_t;
483
484 always_inline void
485 mc_stream_free (mc_stream_t * s)
486 {
487   pool_free (s->retry_pool);
488   hash_free (s->retry_index_by_local_sequence);
489   clib_fifo_free (s->catchup_fifo);
490   pool_free (s->peers);
491   mhash_free (&s->peer_index_by_id);
492   vec_free (s->procs_waiting_for_join_done);
493   vec_free (s->procs_waiting_for_open_window);
494 }
495
496 always_inline void
497 mc_stream_init (mc_stream_t * s)
498 {
499   memset (s, 0, sizeof (s[0]));
500   s->retry_head_index = s->retry_tail_index = ~0;
501 }
502
503 typedef struct
504 {
505   u32 stream_index;
506   u32 catchup_opaque;
507   u8 *catchup_snapshot;
508 } mc_catchup_process_arg_t;
509
510 typedef enum
511 {
512   MC_RELAY_STATE_NEGOTIATE,
513   MC_RELAY_STATE_MASTER,
514   MC_RELAY_STATE_SLAVE,
515 } mc_relay_state_t;
516
517 typedef struct
518 {
519   mc_peer_id_t peer_id;
520
521   f64 time_last_master_assert_received;
522 } mc_mastership_peer_t;
523
524 typedef struct
525 {
526   u32 stream_index;
527   u32 buffer_index;
528 } mc_stream_and_buffer_t;
529
530 typedef struct mc_main_t
531 {
532   mc_relay_state_t relay_state;
533
534   /* Mastership */
535   u32 we_can_be_relay_master;
536
537   u64 relay_master_peer_id;
538
539   mc_mastership_peer_t *mastership_peers;
540
541   /* Map of 64 bit id to index in stream pool. */
542   mhash_t mastership_peer_index_by_id;
543
544   /* The transport we're using. */
545   mc_transport_t transport;
546
547   /* Last-used global sequence number. */
548   u32 relay_global_sequence;
549
550   /* Vector of streams. */
551   mc_stream_t *stream_vector;
552
553   /* Hash table mapping stream name to pool index. */
554   uword *stream_index_by_name;
555
556   uword *procs_waiting_for_stream_name_by_name;
557
558   vlib_one_time_waiting_process_t **procs_waiting_for_stream_name_pool;
559
560   int joins_in_progress;
561
562   mc_catchup_process_arg_t *catchup_process_args;
563
564   /* Node indices for mastership, join ager,
565      retry and catchup processes. */
566   u32 mastership_process;
567   u32 join_ager_process;
568   u32 retry_process;
569   u32 catchup_process;
570   u32 unserialize_process;
571
572   /* Global vector of messages. */
573   mc_serialize_msg_t **global_msgs;
574
575   /* Hash table mapping message name to index. */
576   uword *global_msg_index_by_name;
577
578   /* Shared serialize/unserialize main. */
579   serialize_main_t serialize_mains[VLIB_N_RX_TX];
580
581   vlib_serialize_buffer_main_t serialize_buffer_mains[VLIB_N_RX_TX];
582
583   /* Convenience variables */
584   struct vlib_main_t *vlib_main;
585   elog_main_t *elog_main;
586
587   /* Maps 64 bit peer id to elog string table offset for this formatted peer id. */
588   mhash_t elog_id_by_peer_id;
589
590   uword *elog_id_by_msg_name;
591
592   /* For mc_unserialize. */
593   mc_stream_and_buffer_t *mc_unserialize_stream_and_buffers;
594 } mc_main_t;
595
596 always_inline mc_stream_t *
597 mc_stream_by_name (mc_main_t * m, char *name)
598 {
599   uword *p = hash_get (m->stream_index_by_name, name);
600   return p ? vec_elt_at_index (m->stream_vector, p[0]) : 0;
601 }
602
603 always_inline mc_stream_t *
604 mc_stream_by_index (mc_main_t * m, u32 i)
605 {
606   return i < vec_len (m->stream_vector) ? m->stream_vector + i : 0;
607 }
608
609 always_inline void
610 mc_clear_stream_stats (mc_main_t * m)
611 {
612   mc_stream_t *s;
613   mc_stream_peer_t *p;
614   vec_foreach (s, m->stream_vector)
615   {
616     s->stats_last_clear = s->stats;
617       /* *INDENT-OFF* */
618       pool_foreach (p, s->peers, ({
619         p->stats_last_clear = p->stats;
620       }));
621       /* *INDENT-ON* */
622   }
623 }
624
625 /* Declare all message handlers. */
626 #define _(f) void mc_msg_##f##_handler (mc_main_t * mcm, mc_msg_##f##_t * msg, u32 buffer_index);
627 foreach_mc_msg_type
628 #undef _
629   u32 mc_stream_join (mc_main_t * mcm, mc_stream_config_t *);
630
631 void mc_stream_leave (mc_main_t * mcm, u32 stream_index);
632
633 void mc_wait_for_stream_ready (mc_main_t * m, char *stream_name);
634
635 u32 mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index);
636
637 void mc_main_init (mc_main_t * mcm, char *tag);
638
639 void mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master);
640
641 void *mc_get_vlib_buffer (struct vlib_main_t *vm, u32 n_bytes,
642                           u32 * bi_return);
643
644 format_function_t format_mc_main;
645
646 clib_error_t *mc_serialize_internal (mc_main_t * mc,
647                                      u32 stream_index,
648                                      u32 multiple_messages_per_vlib_buffer,
649                                      mc_serialize_msg_t * msg, ...);
650
651 clib_error_t *mc_serialize_va (mc_main_t * mc,
652                                u32 stream_index,
653                                u32 multiple_messages_per_vlib_buffer,
654                                mc_serialize_msg_t * msg, va_list * va);
655
656 #define mc_serialize_stream(mc,si,msg,args...)                  \
657   mc_serialize_internal((mc),(si),(0),(msg),(msg)->serialize,args)
658
659 #define mc_serialize(mc,msg,args...)                            \
660   mc_serialize_internal((mc),(~0),(0),(msg),(msg)->serialize,args)
661
662 #define mc_serialize2(mc,add,msg,args...)                               \
663   mc_serialize_internal((mc),(~0),(add),(msg),(msg)->serialize,args)
664
665 void mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index);
666 uword mc_unserialize_message (mc_main_t * mcm, mc_stream_t * s,
667                               serialize_main_t * m);
668
669 serialize_function_t serialize_mc_main, unserialize_mc_main;
670
671 always_inline uword
672 mc_max_message_size_in_bytes (mc_main_t * mcm)
673 {
674   return mcm->transport.max_packet_size - sizeof (mc_msg_user_request_t);
675 }
676
677 always_inline word
678 mc_serialize_n_bytes_left (mc_main_t * mcm, serialize_main_t * m)
679 {
680   return mc_max_message_size_in_bytes (mcm) -
681     serialize_vlib_buffer_n_bytes (m);
682 }
683
684 void unserialize_mc_stream (serialize_main_t * m, va_list * va);
685 void mc_stream_join_process_hold (void);
686
687 #endif /* included_vlib_mc_h */
688
689 /*
690  * fd.io coding-style-patch-verification: ON
691  *
692  * Local Variables:
693  * eval: (c-set-style "gnu")
694  * End:
695  */