Initial commit of vpp code.
[vpp.git] / vlib / example / mc_test.c
1 /*
2  * mc_test.c: test program for vlib mc
3  *
4  * Copyright (c) 2010 Cisco and/or its affiliates.
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at:
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #include <vlib/vlib.h>
19 #include <vlib/unix/mc_socket.h>
20 #include <vppinfra/random.h>
21
22 typedef struct {
23   u32 min_n_msg_bytes;
24   u32 max_n_msg_bytes;
25   u32 tx_serial;
26   u32 rx_serial;
27   u32 seed;
28   u32 verbose;
29   u32 validate;
30   u32 window_size;
31   f64 min_delay, max_delay;
32   f64 n_packets_to_send;
33 } mc_test_main_t;
34
35 always_inline u32
36 choose_msg_size (mc_test_main_t * tm)
37 {
38   u32 r = tm->min_n_msg_bytes;
39   if (tm->max_n_msg_bytes > tm->min_n_msg_bytes)
40     r += random_u32 (&tm->seed) % (1 + tm->max_n_msg_bytes - tm->min_n_msg_bytes);
41   return r;
42 }
43
44 static mc_test_main_t mc_test_main;
45
46 static void serialize_test_msg (serialize_main_t * m, va_list * va)
47 {
48   mc_test_main_t * tm = &mc_test_main;
49   u32 n_bytes = choose_msg_size (tm);
50   u8 * msg;
51   int i;
52   serialize_integer (m, n_bytes, sizeof (n_bytes));
53   msg = serialize_get (m, n_bytes);
54   for (i = 0; i < n_bytes; i++)
55     msg[i] = i + tm->tx_serial;
56   tm->tx_serial += n_bytes;
57 }
58
59 static void unserialize_test_msg (serialize_main_t * m, va_list * va)
60 {
61   mc_test_main_t * tm = &mc_test_main;
62   u32 i, n_bytes, dump_msg = tm->verbose;
63   u8 * p;
64   unserialize_integer (m, &n_bytes, sizeof (n_bytes));
65   p = unserialize_get (m, n_bytes);
66   if (tm->validate)
67     for (i = 0; i < n_bytes; i++)
68       if (p[i] != ((tm->rx_serial + i) & 0xff))
69         {
70           clib_warning ("corrupt msg at offset %d", i);
71           dump_msg = 1;
72           break;
73         }
74   if (dump_msg)
75     clib_warning ("got %d bytes, %U", n_bytes, format_hex_bytes, p, n_bytes);
76   tm->rx_serial += n_bytes;
77 }
78
79 MC_SERIALIZE_MSG (test_msg, static) = {
80   .name = "test_msg",
81   .serialize = serialize_test_msg,
82   .unserialize = unserialize_test_msg,
83 };
84
85 #define SERIALIZE 1
86
87 #define EVENT_JOIN_STREAM       10
88 #define EVENT_SEND_DATA         11
89
90 static void test_rx_callback (mc_main_t * mcm,
91                               mc_stream_t * stream,
92                               mc_peer_id_t peer_id,
93                               u32 buffer_index)
94 {
95   if (SERIALIZE)
96     {
97         return mc_unserialize (mcm, stream, buffer_index);
98     }
99   else
100     {
101 #if DEBUG > 1
102       vlib_main_t * vm = mcm->vlib_main;
103       vlib_buffer_t * b = vlib_get_buffer (vm, buffer_index);
104       u8 * dp = vlib_buffer_get_current (b);
105
106       fformat(stdout, "RX from %U %U\n",
107               stream->transport->format_peer_id, peer_id,
108               format_hex_bytes, dp, tm->n_msg_bytes);
109             
110 #endif
111     }
112 }
113
114 static u8 *
115 test_snapshot_callback (mc_main_t * mcm,
116                         u8 * data_vector,
117                         u32 last_global_sequence_processed)
118 {
119   if (SERIALIZE)
120     {
121       serialize_main_t m;
122
123       /* Append serialized data to data vector. */
124       serialize_open_vector (&m, data_vector);
125       m.stream.current_buffer_index = vec_len (data_vector);
126
127       return serialize_close_vector (&m);
128     }
129   else
130     return format (data_vector,
131                    "snapshot, last global seq 0x%x",
132                    last_global_sequence_processed);
133 }
134
135 static void
136 test_handle_snapshot_callback (mc_main_t * mcm,
137                                u8 * data,
138                                u32 n_data_bytes)
139 {
140   if (SERIALIZE)
141     {
142       serialize_main_t s;
143       unserialize_open_data (&s, data, n_data_bytes);
144     }
145   else
146     clib_warning ("snapshot `%*s'", n_data_bytes, data);
147 }
148
149 static mc_socket_main_t mc_socket_main;
150
151 static uword
152 mc_test_process (vlib_main_t * vm,
153                  vlib_node_runtime_t * node,
154                  vlib_frame_t * f)
155 {
156   mc_test_main_t * tm = &mc_test_main;
157   mc_socket_main_t * msm = &mc_socket_main;
158   mc_main_t *mcm = &msm->mc_main;
159   uword event_type, *event_data = 0;
160   u32 data_serial=0, stream_index;
161   f64 delay;
162   mc_stream_config_t config;
163   clib_error_t * error;
164   int i;
165   char *intfcs[] =  { "eth1", "eth0", "ce" };
166
167   memset (&config, 0, sizeof (config));
168   config.name = "test";
169   config.window_size = tm->window_size;
170   config.rx_buffer = test_rx_callback;
171   config.catchup_snapshot = test_snapshot_callback;
172   config.catchup = test_handle_snapshot_callback;
173   stream_index = ~0;
174
175   msm->multicast_tx_ip4_address_host_byte_order = 0xefff0100;
176   msm->base_multicast_udp_port_host_byte_order = 0xffab;
177
178   error = mc_socket_main_init (&mc_socket_main, intfcs, ARRAY_LEN(intfcs));
179   if (error)
180     {
181       clib_error_report (error);
182       exit (1);
183     }
184
185   mcm->we_can_be_relay_master = 1;
186
187   while (1)
188     {
189       vlib_process_wait_for_event (vm);
190       event_type = vlib_process_get_events (vm, &event_data);
191
192       switch (event_type)
193         {
194         case EVENT_JOIN_STREAM:
195           stream_index = mc_stream_join (mcm, &config);
196           break;
197
198         case EVENT_SEND_DATA: {
199           f64 times[2];
200
201           if (stream_index == ~0)
202             stream_index = mc_stream_join (mcm, &config);
203
204           times[0] = vlib_time_now (vm);
205           for (i = 0; i < event_data[0]; i++)
206             {
207               u32 bi;
208               if (SERIALIZE)
209                 {
210                   mc_serialize_stream (mcm, stream_index, &test_msg, data_serial);
211                 }
212               else
213                 {
214                   u8 * mp;
215                   mp = mc_get_vlib_buffer (vm, sizeof (mp[0]), &bi);
216                   mp[0] = data_serial;
217                   mc_stream_send (mcm, stream_index, bi);
218                 }
219               if (tm->min_delay > 0)
220                 {
221                   delay = tm->min_delay + random_f64 (&tm->seed) * (tm->max_delay - tm->min_delay);
222                   vlib_process_suspend (vm, delay);
223                 }
224               data_serial++;
225             }
226           times[1] = vlib_time_now (vm);
227           clib_warning ("done sending %d; %.4e per sec",
228                         event_data[0],
229                         (f64) event_data[0] / (times[1] - times[0]));
230           break;
231         }
232
233         default:
234           clib_warning ("bug");
235           break;
236         }
237             
238       if (event_data)
239         _vec_len (event_data) = 0;
240     }
241 }
242
243 VLIB_REGISTER_NODE (mc_test_process_node,static) = {
244   .function = mc_test_process,
245   .type = VLIB_NODE_TYPE_PROCESS,
246   .name = "mc-test-process",
247 };
248
249 static clib_error_t *
250 mc_test_command (vlib_main_t * vm,
251                  unformat_input_t * input,
252                  vlib_cli_command_t * cmd)
253 {
254   f64 npkts = 10;
255
256   if (unformat (input, "join"))
257     {
258       vlib_cli_output (vm, "Join stream...\n");
259       vlib_process_signal_event (vm, mc_test_process_node.index,
260                                  EVENT_JOIN_STREAM, 0);
261       return 0;
262     }
263   else if (unformat (input, "send %f", &npkts) 
264            || unformat(input, "send"))
265     {
266       vlib_process_signal_event (vm, mc_test_process_node.index,
267                                  EVENT_SEND_DATA, (uword) npkts);
268       vlib_cli_output (vm, "Send %.0f pkts...\n", npkts);
269     
270       return 0;
271     }
272   else
273     return unformat_parse_error (input);
274 }
275
276 VLIB_CLI_COMMAND (test_mc_command, static) = {
277   .path = "test mc",
278   .short_help = "Test mc command",
279   .function = mc_test_command,
280 };
281
282 static clib_error_t *
283 mc_show_command (vlib_main_t * vm,
284                  unformat_input_t * input,
285                  vlib_cli_command_t * cmd)
286 {
287   mc_main_t *mcm = &mc_socket_main.mc_main;
288   vlib_cli_output (vm, "%U", format_mc_main, mcm);
289   return 0;
290 }
291
292 VLIB_CLI_COMMAND (show_mc_command, static) = {
293   .path = "show mc",
294   .short_help = "Show mc command",
295   .function = mc_show_command,
296 };
297
298 static clib_error_t *
299 mc_clear_command (vlib_main_t * vm,
300                   unformat_input_t * input,
301                   vlib_cli_command_t * cmd)
302 {
303   mc_main_t * mcm = &mc_socket_main.mc_main;
304   mc_clear_stream_stats (mcm);
305   return 0;
306 }
307
308 VLIB_CLI_COMMAND (clear_mc_command, static) = {
309   .path = "clear mc",
310   .short_help = "Clear mc command",
311   .function = mc_clear_command,
312 };
313
314 static clib_error_t *
315 mc_config (vlib_main_t * vm, unformat_input_t * input)
316 {
317   mc_test_main_t * tm = &mc_test_main;
318   mc_socket_main_t * msm = &mc_socket_main;
319   clib_error_t * error = 0;
320
321   tm->min_n_msg_bytes = 4;
322   tm->max_n_msg_bytes = 4;
323   tm->window_size = 8;
324   tm->seed = getpid ();
325   tm->verbose = 0;
326   tm->validate = 1;
327   tm->min_delay = 10e-6;
328   tm->max_delay = 10e-3;
329   tm->n_packets_to_send = 0;
330   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
331     {
332       if (unformat (input, "interface %s", &msm->multicast_interface_name))
333         ;
334
335       else if (unformat (input, "n-bytes %d", &tm->max_n_msg_bytes))
336         tm->min_n_msg_bytes = tm->max_n_msg_bytes;
337       else if (unformat (input, "max-n-bytes %d", &tm->max_n_msg_bytes))
338         ;
339       else if (unformat (input, "min-n-bytes %d", &tm->min_n_msg_bytes))
340         ;
341       else if (unformat (input, "seed %d", &tm->seed))
342         ;
343       else if (unformat (input, "window %d", &tm->window_size))
344         ;
345       else if (unformat (input, "verbose"))
346         tm->verbose = 1;
347       else if (unformat (input, "no-validate"))
348         tm->validate = 0;
349       else if (unformat (input, "min-delay %f", &tm->min_delay))
350         ;
351       else if (unformat (input, "max-delay %f", &tm->max_delay))
352         ;
353       else if (unformat (input, "no-delay"))
354         tm->min_delay = tm->max_delay = 0;
355       else if (unformat (input, "n-packets %f", &tm->n_packets_to_send))
356         ;
357
358       else
359         return clib_error_return (0, "unknown input `%U'",
360                                   format_unformat_error, input);
361     }
362
363   if (tm->n_packets_to_send > 0)
364     vlib_process_signal_event (vm, mc_test_process_node.index,
365                                EVENT_SEND_DATA, (uword) tm->n_packets_to_send);
366
367   return error;
368 }
369
370 VLIB_CONFIG_FUNCTION (mc_config, "mc");