Initial commit of vpp code.
[vpp.git] / vlib / vlib / unix / mc_socket.c
1 /*
2  * mc_socket.c: socket based multicast for vlib mc
3  *
4  * Copyright (c) 2010 Cisco and/or its affiliates.
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at:
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #include <vlib/vlib.h>
19 #include <vlib/unix/mc_socket.h>
20
21 #include <sys/ioctl.h>          /* for FIONBIO */
22 #include <netinet/tcp.h>        /* for TCP_NODELAY */
23 #include <net/if.h>             /* for struct ifreq */
24
25 static u8 * format_socket_peer_id (u8 * s, va_list * args)
26 {
27   u64 peer_id_as_u64 = va_arg (*args, u64);
28   mc_peer_id_t peer_id;
29   peer_id.as_u64 = peer_id_as_u64;
30   u32 a = mc_socket_peer_id_get_address (peer_id);
31   u32 p = mc_socket_peer_id_get_port (peer_id);
32
33   s = format (s, "%U:%04x", format_network_address, AF_INET, &a,
34               ntohs (p));
35
36   return s;
37 }
38
39 typedef void (mc_msg_handler_t) (mc_main_t * mcm, void * msg, u32 buffer_index);
40
41 always_inline void msg_handler (mc_main_t * mcm,
42                                 u32 buffer_index,
43                                 u32 handler_frees_buffer,
44                                 void * _h)
45 {
46   vlib_main_t * vm = mcm->vlib_main;
47   mc_msg_handler_t * h = _h;
48   vlib_buffer_t * b = vlib_get_buffer (vm, buffer_index);
49   void * the_msg = vlib_buffer_get_current (b);
50
51   h (mcm, the_msg, buffer_index);
52   if (! handler_frees_buffer)
53     vlib_buffer_free_one (vm, buffer_index);
54 }
55
56 static uword
57 append_buffer_index_to_iovec (vlib_main_t * vm,
58                               u32 buffer_index,
59                               struct iovec ** iovs_return)
60 {
61   struct iovec * i;
62   vlib_buffer_t * b;
63   u32 bi = buffer_index;
64   u32 l = 0;
65
66   while (1)
67     {
68       b = vlib_get_buffer (vm, bi);
69       vec_add2 (*iovs_return, i, 1);
70       i->iov_base = vlib_buffer_get_current (b);
71       i->iov_len = b->current_length;
72       l += i->iov_len;
73       if (! (b->flags & VLIB_BUFFER_NEXT_PRESENT))
74         break;
75       bi = b->next_buffer;
76     }
77
78   return l;
79 }
80
81 static clib_error_t *
82 sendmsg_helper (mc_socket_main_t * msm,
83                 int socket,
84                 struct sockaddr_in * tx_addr,
85                 u32 buffer_index)
86 {
87   vlib_main_t * vm = msm->mc_main.vlib_main;
88   struct msghdr h;
89   word n_bytes, n_bytes_tx, n_retries;
90
91   memset (&h, 0, sizeof (h));
92   h.msg_name = tx_addr;
93   h.msg_namelen = sizeof (tx_addr[0]);
94
95   if (msm->iovecs)
96     _vec_len (msm->iovecs) = 0;
97
98   n_bytes = append_buffer_index_to_iovec (vm, buffer_index, &msm->iovecs);
99   ASSERT (n_bytes <= msm->mc_main.transport.max_packet_size);
100   if (n_bytes > msm->mc_main.transport.max_packet_size)
101     clib_error ("sending packet larger than interace MTU %d bytes", n_bytes);
102
103   h.msg_iov = msm->iovecs;
104   h.msg_iovlen = vec_len (msm->iovecs);
105
106   n_retries = 0;
107   while ((n_bytes_tx = sendmsg (socket, &h, /* flags */ 0)) != n_bytes
108          && errno == EAGAIN)
109     n_retries++;
110   if (n_bytes_tx != n_bytes)
111     {
112       clib_unix_warning ("sendmsg");
113       return 0;
114     }
115   if (n_retries)
116     {
117       ELOG_TYPE_DECLARE (e) = {
118         .format = "sendmsg-helper: %d retries",
119         .format_args = "i4",
120       };
121       struct { u32 retries; } * ed = 0;
122
123       ed = ELOG_DATA (&vm->elog_main, e);
124       ed->retries = n_retries;
125     }
126   return 0;
127 }
128
129 static clib_error_t *
130 tx_buffer (void * transport, mc_transport_type_t type, u32 buffer_index)
131 {
132   mc_socket_main_t *msm = (mc_socket_main_t *)transport;
133   vlib_main_t * vm = msm->mc_main.vlib_main;
134   mc_multicast_socket_t * ms = &msm->multicast_sockets[type];
135   clib_error_t * error;
136   error = sendmsg_helper (msm, ms->socket, &ms->tx_addr, buffer_index);
137   if (type != MC_TRANSPORT_USER_REQUEST_TO_RELAY)
138     vlib_buffer_free_one (vm, buffer_index);
139   return error;
140 }
141
142 static clib_error_t *
143 tx_ack (void *transport, mc_peer_id_t dest_peer_id, u32 buffer_index)
144 {
145   struct sockaddr_in tx_addr;
146   mc_socket_main_t *msm = (mc_socket_main_t *)transport;
147   vlib_main_t * vm = msm->mc_main.vlib_main;
148   clib_error_t * error;
149
150   memset (&tx_addr, 0, sizeof (tx_addr));
151   tx_addr.sin_family = AF_INET;
152   tx_addr.sin_addr.s_addr = mc_socket_peer_id_get_address (dest_peer_id);
153   tx_addr.sin_port = mc_socket_peer_id_get_port (dest_peer_id);
154
155   error = sendmsg_helper (msm, msm->ack_socket, &tx_addr, buffer_index);
156   vlib_buffer_free_one (vm, buffer_index);
157   return error;
158 }
159
160 static clib_error_t *
161 recvmsg_helper (mc_socket_main_t * msm,
162                 int socket,
163                 struct sockaddr_in * rx_addr,
164                 u32 * buffer_index,
165                 u32 drop_message)
166 {
167   vlib_main_t * vm = msm->mc_main.vlib_main;
168   vlib_buffer_t * b;
169   uword n_left, n_alloc, n_mtu, i, i_rx;
170   const uword buffer_size = VLIB_BUFFER_DEFAULT_FREE_LIST_BYTES;
171   word n_bytes_left;
172
173   /* Make sure we have at least a MTU worth of buffers. */
174   n_mtu = msm->rx_mtu_n_buffers;
175   n_left = vec_len (msm->rx_buffers);
176   if (n_left < n_mtu)
177     {
178       uword max_alloc = 8 * n_mtu;
179       vec_validate (msm->rx_buffers, max_alloc - 1);
180       n_alloc = vlib_buffer_alloc (vm, msm->rx_buffers + n_left, max_alloc - n_left);
181       _vec_len (msm->rx_buffers) = n_left + n_alloc;
182     }
183
184   ASSERT (vec_len (msm->rx_buffers) >= n_mtu);
185   vec_validate (msm->iovecs, n_mtu - 1);
186
187   /* Allocate RX buffers from end of rx_buffers.
188      Turn them into iovecs to pass to readv. */
189   i_rx = vec_len (msm->rx_buffers) - 1;
190   for (i = 0; i < n_mtu; i++)
191     {
192       b = vlib_get_buffer (vm, msm->rx_buffers[i_rx - i]);
193       msm->iovecs[i].iov_base = b->data;
194       msm->iovecs[i].iov_len = buffer_size;
195     }
196   _vec_len (msm->iovecs) = n_mtu;
197
198   {
199     struct msghdr h;
200
201     memset (&h, 0, sizeof (h));
202     if (rx_addr)
203       {
204         h.msg_name = rx_addr;
205         h.msg_namelen = sizeof (rx_addr[0]);
206       }
207     h.msg_iov = msm->iovecs;
208     h.msg_iovlen = vec_len (msm->iovecs);
209
210     n_bytes_left = recvmsg (socket, &h, 0);
211     if (n_bytes_left < 0)
212       return clib_error_return_unix (0, "recvmsg");
213   }
214
215   if (drop_message)
216     {
217       *buffer_index = ~0;
218       return 0;
219     }
220
221   *buffer_index = msm->rx_buffers[i_rx];
222   while (1)
223     {
224       b = vlib_get_buffer (vm, msm->rx_buffers[i_rx]);
225
226       b->flags = 0;
227       b->current_data = 0;
228       b->current_length = n_bytes_left < buffer_size ? n_bytes_left : buffer_size;
229
230       n_bytes_left -= buffer_size;
231
232       if (n_bytes_left <= 0)
233         break;
234
235       i_rx--;
236       b->flags |= VLIB_BUFFER_NEXT_PRESENT;
237       b->next_buffer = msm->rx_buffers[i_rx];
238     }
239
240   _vec_len (msm->rx_buffers) = i_rx;
241
242   return 0 /* no error */;
243 }
244
245 static clib_error_t * mastership_socket_read_ready (unix_file_t * uf)
246 {
247   mc_socket_main_t *msm = (mc_socket_main_t *)uf->private_data;
248   mc_main_t * mcm = &msm->mc_main;
249   mc_multicast_socket_t * ms = &msm->multicast_sockets[MC_TRANSPORT_MASTERSHIP];
250   clib_error_t * error;
251   u32 bi;
252
253   error = recvmsg_helper (msm, ms->socket, /* rx_addr */ 0, &bi, /* drop_message */ 0);
254   if (! error)
255     msg_handler (mcm, bi,
256                  /* handler_frees_buffer */ 0,
257                  mc_msg_master_assert_handler);
258
259   return error;
260 }
261
262 static clib_error_t * to_relay_socket_read_ready (unix_file_t * uf)
263 {
264   mc_socket_main_t *msm = (mc_socket_main_t *)uf->private_data;
265   mc_main_t *mcm = &msm->mc_main;
266   vlib_main_t * vm = msm->mc_main.vlib_main;
267   mc_multicast_socket_t * ms_to_relay = &msm->multicast_sockets[MC_TRANSPORT_USER_REQUEST_TO_RELAY];
268   mc_multicast_socket_t * ms_from_relay = &msm->multicast_sockets[MC_TRANSPORT_USER_REQUEST_FROM_RELAY];
269   clib_error_t * error;
270   u32 bi;
271   u32 is_master = mcm->relay_state == MC_RELAY_STATE_MASTER;
272
273   /* Not the ordering master? Turf the msg */
274   error = recvmsg_helper (msm, ms_to_relay->socket, /* rx_addr */ 0, &bi,
275                           /* drop_message */ ! is_master);
276
277   /* If we are the master, number and rebroadcast the msg. */
278   if (! error && is_master)
279     {
280       vlib_buffer_t * b = vlib_get_buffer (vm, bi);
281       mc_msg_user_request_t * mp = vlib_buffer_get_current (b);
282       mp->global_sequence = clib_host_to_net_u32 (mcm->relay_global_sequence);
283       mcm->relay_global_sequence++;
284       error = sendmsg_helper (msm, ms_from_relay->socket, &ms_from_relay->tx_addr, bi);
285       vlib_buffer_free_one (vm, bi);
286     }
287
288   return error;
289 }
290
291 static clib_error_t * from_relay_socket_read_ready (unix_file_t * uf)
292 {
293   mc_socket_main_t *msm = (mc_socket_main_t *)uf->private_data;
294   mc_main_t * mcm = &msm->mc_main;
295   mc_multicast_socket_t * ms = &msm->multicast_sockets[MC_TRANSPORT_USER_REQUEST_FROM_RELAY];
296   clib_error_t * error;
297   u32 bi;
298
299   error = recvmsg_helper (msm, ms->socket, /* rx_addr */ 0, &bi, /* drop_message */ 0);
300   if (! error)
301     {
302       msg_handler (mcm, bi, /* handler_frees_buffer */ 1,
303                    mc_msg_user_request_handler);
304     }
305   return error;
306 }
307
308 static clib_error_t * join_socket_read_ready (unix_file_t * uf)
309 {
310   mc_socket_main_t *msm = (mc_socket_main_t *)uf->private_data;
311   mc_main_t * mcm = &msm->mc_main;
312   vlib_main_t * vm = mcm->vlib_main;
313   mc_multicast_socket_t * ms = &msm->multicast_sockets[MC_TRANSPORT_JOIN];
314   clib_error_t * error;
315   u32 bi;
316
317   error = recvmsg_helper (msm, ms->socket, /* rx_addr */ 0, &bi, /* drop_message */ 0);
318   if (! error)
319     {
320       vlib_buffer_t * b = vlib_get_buffer (vm, bi);
321       mc_msg_join_or_leave_request_t * mp = vlib_buffer_get_current (b);
322
323       switch (clib_host_to_net_u32 (mp->type))
324         {
325         case MC_MSG_TYPE_join_or_leave_request:
326           msg_handler (mcm, bi, /* handler_frees_buffer */ 0,
327                        mc_msg_join_or_leave_request_handler);
328           break;
329
330         case MC_MSG_TYPE_join_reply:
331           msg_handler (mcm, bi, /* handler_frees_buffer */ 0,
332                        mc_msg_join_reply_handler);
333           break;
334
335         default:
336           ASSERT (0);
337           break;
338         }
339     }
340   return error;
341 }
342
343 static clib_error_t * ack_socket_read_ready (unix_file_t * uf)
344 {
345   mc_socket_main_t *msm = (mc_socket_main_t *)uf->private_data;
346   mc_main_t * mcm = &msm->mc_main;
347   clib_error_t * error;
348   u32 bi;
349
350   error = recvmsg_helper (msm, msm->ack_socket, /* rx_addr */ 0, &bi, /* drop_message */ 0);
351   if (! error)
352     msg_handler (mcm, bi, /* handler_frees_buffer */ 0,
353                  mc_msg_user_ack_handler);
354   return error;
355 }
356
357 static void catchup_cleanup (mc_socket_main_t *msm,
358                              mc_socket_catchup_t *c,
359                              unix_main_t *um, unix_file_t *uf)
360 {
361   hash_unset (msm->catchup_index_by_file_descriptor, uf->file_descriptor);
362   unix_file_del (um, uf);
363   vec_free (c->input_vector);
364   vec_free (c->output_vector);
365   pool_put (msm->catchups, c);
366 }
367
368 static mc_socket_catchup_t *
369 find_catchup_from_file_descriptor (mc_socket_main_t * msm, int file_descriptor)
370 {
371   uword * p = hash_get (msm->catchup_index_by_file_descriptor, file_descriptor);
372   return p ? pool_elt_at_index (msm->catchups, p[0]) : 0;
373 }
374
375 static clib_error_t * catchup_socket_read_ready (unix_file_t * uf, int is_server)
376 {
377   unix_main_t * um = &unix_main;
378   mc_socket_main_t *msm = (mc_socket_main_t *)uf->private_data;
379   mc_main_t *mcm = &msm->mc_main;
380   mc_socket_catchup_t * c = find_catchup_from_file_descriptor (msm, uf->file_descriptor);
381   word l, n, is_eof;
382
383   l = vec_len (c->input_vector);
384   vec_resize (c->input_vector, 4096);
385   n = read (uf->file_descriptor, c->input_vector + l, vec_len (c->input_vector) - l);
386   is_eof = n == 0;
387
388   if (n < 0)
389     {
390       if (errno == EAGAIN)
391         n = 0;
392       else
393         {
394           catchup_cleanup (msm, c, um, uf);
395           return clib_error_return_unix (0, "read");
396         }
397     }
398
399   _vec_len (c->input_vector) = l + n;
400
401   if (is_eof && vec_len (c->input_vector) > 0)
402     {
403       if (is_server)
404         {
405           mc_msg_catchup_request_handler (mcm, (void *) c->input_vector, c - msm->catchups);
406           _vec_len (c->input_vector) = 0;
407         }
408       else
409         {
410           mc_msg_catchup_reply_handler (mcm, (void *) c->input_vector, c - msm->catchups);
411           c->input_vector = 0;  /* reply handler is responsible for freeing vector */
412           catchup_cleanup (msm, c, um, uf);
413         }
414     }
415
416   return 0 /* no error */;
417 }
418
419 static clib_error_t * catchup_server_read_ready (unix_file_t * uf)
420 { return catchup_socket_read_ready (uf, /* is_server */ 1); }
421
422 static clib_error_t * catchup_client_read_ready (unix_file_t * uf)
423
424     if (MC_EVENT_LOGGING)
425       {
426         mc_socket_main_t *msm = (mc_socket_main_t *)uf->private_data;        
427         vlib_main_t * vm = msm->mc_main.vlib_main;
428         
429         ELOG_TYPE (e, "catchup_client_read_ready");
430         ELOG (&vm->elog_main, e, 0);
431       }
432     return catchup_socket_read_ready (uf, /* is_server */ 0); 
433 }
434
435 static clib_error_t * 
436 catchup_socket_write_ready (unix_file_t * uf, int is_server)
437 {
438   unix_main_t * um = &unix_main;
439   mc_socket_main_t *msm = (mc_socket_main_t *)uf->private_data;
440   mc_socket_catchup_t *c = find_catchup_from_file_descriptor (msm, uf->file_descriptor);
441   clib_error_t * error = 0;
442   int n;
443
444   if (c->connect_in_progress)
445     {
446       u32 len, value;
447
448       c->connect_in_progress = 0;
449       len = sizeof (value);
450       if (getsockopt (c->socket, SOL_SOCKET,
451                       SO_ERROR, &value, &len) < 0)
452         {
453           error = clib_error_return_unix (0, "getsockopt SO_ERROR");
454           goto error_quit;
455         }
456       if (value != 0)
457         {
458           error = clib_error_return_code (0, value, CLIB_ERROR_ERRNO_VALID, "connect fails");
459           goto error_quit;
460         }
461     }
462
463   while (1) 
464     {
465       u32 n_this_write;
466       
467       n_this_write = 
468         clib_min (vec_len (c->output_vector) - c->output_vector_n_written,
469                   msm->rx_mtu_n_bytes - 64 /* ip + tcp + option allowance */);
470
471       if (n_this_write <= 0)
472         break;
473
474       do {
475         n = write (uf->file_descriptor,
476                    c->output_vector + c->output_vector_n_written,
477                    n_this_write);
478       } while (n < 0 && errno == EAGAIN);
479       
480       if (n < 0)
481         {
482           error = clib_error_return_unix (0, "write");
483           goto error_quit;
484         }
485       c->output_vector_n_written += n;
486   }
487
488   if (c->output_vector_n_written >= vec_len (c->output_vector))
489     {
490       if (! is_server)
491         {
492           uf->flags &= ~UNIX_FILE_DATA_AVAILABLE_TO_WRITE;
493           unix_main.file_update (uf, UNIX_FILE_UPDATE_MODIFY);
494           /* Send EOF to other side. */
495           shutdown (uf->file_descriptor, SHUT_WR);
496           return error;
497         }
498       else
499         {
500         error_quit:
501           catchup_cleanup (msm, c, um, uf);
502         }
503     }
504   return error;
505 }
506
507 static clib_error_t * 
508 catchup_server_write_ready (unix_file_t * uf)
509 { return catchup_socket_write_ready (uf, /* is_server */ 1); }
510
511 static clib_error_t * 
512 catchup_client_write_ready (unix_file_t * uf)
513 { return catchup_socket_write_ready (uf, /* is_server */ 0); }
514
515 static clib_error_t *catchup_socket_error_ready (unix_file_t *uf)
516 {
517   unix_main_t *um = &unix_main;
518   mc_socket_main_t *msm = (mc_socket_main_t *)uf->private_data;
519   mc_socket_catchup_t *c = find_catchup_from_file_descriptor (msm, uf->file_descriptor);
520   catchup_cleanup (msm, c, um, uf);
521   return clib_error_return (0, "error");
522 }
523
524 static clib_error_t *catchup_listen_read_ready (unix_file_t * uf)
525 {
526   mc_socket_main_t *msm = (mc_socket_main_t *)uf->private_data;
527   struct sockaddr_in client_addr;
528   int client_len;
529   mc_socket_catchup_t *c;
530   unix_file_t template = {0};
531
532   pool_get (msm->catchups, c);
533   memset(c, 0, sizeof (c[0]));
534
535   client_len = sizeof(client_addr);
536     
537   /* Acquires the non-blocking attrib from the server socket. */
538   c->socket = accept (uf->file_descriptor, 
539                       (struct sockaddr *)&client_addr, 
540                       (socklen_t *)&client_len);
541     
542   if (c->socket < 0)
543     {
544       pool_put (msm->catchups, c);
545       return clib_error_return_unix (0, "accept");
546     }
547
548   if (MC_EVENT_LOGGING)
549     {
550       mc_main_t * mcm = &msm->mc_main;
551       vlib_main_t * vm = mcm->vlib_main;
552
553       ELOG_TYPE_DECLARE (e) = {
554         .format = "catchup accepted from 0x%lx",
555         .format_args = "i4",
556       };
557       struct { u32 addr; } * ed = 0;
558
559       ed = ELOG_DATA (&vm->elog_main, e);
560       ed->addr = ntohl(client_addr.sin_addr.s_addr);
561     }
562
563   /* Disable the Nagle algorithm, ship catchup pkts immediately */
564   {
565     int one = 1;
566     if ((setsockopt(c->socket, IPPROTO_TCP, 
567                     TCP_NODELAY, (void *)&one, sizeof(one))) < 0) {
568       clib_unix_warning("catchup socket: set TCP_NODELAY");
569     }
570   }
571
572   template.read_function = catchup_server_read_ready;
573   template.write_function = catchup_server_write_ready;
574   template.error_function = catchup_socket_error_ready;
575   template.file_descriptor = c->socket;
576   template.private_data = pointer_to_uword (msm);
577   c->unix_file_index = unix_file_add (&unix_main, &template);
578   hash_set (msm->catchup_index_by_file_descriptor, c->socket, c - msm->catchups);
579
580   return 0;
581 }
582
583 /* Return and bind to an unused port. */
584 static word find_and_bind_to_free_port (word sock, word port)
585 {
586   for (; port < 1 << 16; port++)
587     {
588       struct sockaddr_in a;
589
590       memset (&a, 0, sizeof(a)); /* Warnings be gone */
591
592       a.sin_family = PF_INET;
593       a.sin_addr.s_addr = INADDR_ANY;
594       a.sin_port = htons (port);
595
596       if (bind (sock, (struct sockaddr *) &a, sizeof (a)) >= 0)
597         break;
598     }
599         
600   return port < 1 << 16 ? port : -1;
601 }
602
603 static clib_error_t *
604 setup_mutlicast_socket (mc_socket_main_t * msm,
605                         mc_multicast_socket_t * ms,
606                         char * type,
607                         uword udp_port)
608 {
609   int one = 1;
610   struct ip_mreq mcast_req;
611
612   if (! msm->multicast_ttl)
613     msm->multicast_ttl = 1;
614
615   /* mastership (multicast) TX socket */
616   if ((ms->socket = socket (PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0)
617     return clib_error_return_unix(0, "%s socket", type);
618
619   {
620     u8 ttl = msm->multicast_ttl;
621
622     if ((setsockopt(ms->socket, IPPROTO_IP, 
623                     IP_MULTICAST_TTL, (void *)&ttl, sizeof(ttl))) < 0)
624       return clib_error_return_unix(0, "%s set multicast ttl", type);
625   }
626
627   if (setsockopt(ms->socket, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0)
628     return clib_error_return_unix (0, "%s setsockopt SO_REUSEADDR", type);
629
630   memset (&ms->tx_addr, 0, sizeof (ms->tx_addr));
631   ms->tx_addr.sin_family = AF_INET;
632   ms->tx_addr.sin_addr.s_addr = htonl (msm->multicast_tx_ip4_address_host_byte_order);
633   ms->tx_addr.sin_port = htons (udp_port);
634     
635   if (bind(ms->socket, (struct sockaddr *)&ms->tx_addr, 
636            sizeof (ms->tx_addr)) < 0)
637     return clib_error_return_unix(0, "%s bind", type);
638
639   memset (&mcast_req, 0, sizeof (mcast_req));
640   mcast_req.imr_multiaddr.s_addr = htonl (msm->multicast_tx_ip4_address_host_byte_order);
641   mcast_req.imr_interface.s_addr = msm->if_ip4_address_net_byte_order;
642
643   if ((setsockopt(ms->socket, IPPROTO_IP, 
644                   IP_ADD_MEMBERSHIP, (void *)&mcast_req, 
645                   sizeof (mcast_req))) < 0)
646     return clib_error_return_unix(0, "%s IP_ADD_MEMBERSHIP setsockopt", type);
647
648   if (ioctl (ms->socket, FIONBIO, &one) < 0)
649     return clib_error_return_unix (0, "%s set FIONBIO", type);
650
651   /* FIXME remove this when we support tx_ready. */
652   {
653     u32 len = 1 << 20;
654     socklen_t sl = sizeof (len);
655     if (setsockopt(ms->socket, SOL_SOCKET, SO_SNDBUF, &len, sl) < 0)
656       clib_unix_error ("setsockopt");
657   }
658
659   return 0;
660 }
661
662 static clib_error_t *
663 socket_setup (mc_socket_main_t *msm)
664 {
665   int one = 1;
666   clib_error_t * error;
667   u32 port;
668
669   if (! msm->base_multicast_udp_port_host_byte_order)
670     msm->base_multicast_udp_port_host_byte_order = 
671         0xffff - ((MC_N_TRANSPORT_TYPE + 2 /* ack socket, catchup socket */) 
672                   - 1);
673
674   port = msm->base_multicast_udp_port_host_byte_order;
675
676   error = setup_mutlicast_socket (msm,
677                                   &msm->multicast_sockets[MC_TRANSPORT_MASTERSHIP],
678                                   "mastership",
679                                   port++);
680   if (error)
681     return error;
682
683   error = setup_mutlicast_socket (msm,
684                                   &msm->multicast_sockets[MC_TRANSPORT_JOIN],
685                                   "join",
686                                   port++);
687   if (error)
688     return error;
689
690   error = setup_mutlicast_socket (msm,
691                                   &msm->multicast_sockets[MC_TRANSPORT_USER_REQUEST_TO_RELAY],
692                                   "to relay",
693                                   port++);
694   if (error)
695     return error;
696
697   error = setup_mutlicast_socket (msm,
698                                   &msm->multicast_sockets[MC_TRANSPORT_USER_REQUEST_FROM_RELAY],
699                                   "from relay",
700                                   port++);
701   if (error)
702     return error;
703
704   /* ACK rx socket */
705   msm->ack_socket = socket (PF_INET, SOCK_DGRAM, IPPROTO_UDP);
706   if (msm->ack_socket < 0)
707     return clib_error_return_unix(0, "ack socket");
708
709   msm->ack_udp_port = find_and_bind_to_free_port (msm->ack_socket, port++);
710
711   if (ioctl (msm->ack_socket, FIONBIO, &one) < 0)
712     return clib_error_return_unix (0, "ack socket FIONBIO");
713
714   msm->catchup_server_socket = socket(AF_INET, SOCK_STREAM, 0);
715   if (msm->catchup_server_socket < 0)
716     return clib_error_return_unix (0, "catchup server socket");
717     
718   msm->catchup_tcp_port = find_and_bind_to_free_port (msm->catchup_server_socket, port++);
719
720   if (ioctl (msm->catchup_server_socket, FIONBIO, &one) < 0)
721     return clib_error_return_unix (0, "catchup server socket FIONBIO");
722
723   if (listen(msm->catchup_server_socket, 5) < 0)
724     return clib_error_return_unix (0, "catchup server socket listen");
725     
726   /* epoll setup for multicast mastership socket */
727   {
728     unix_file_t template = {0};
729
730     template.read_function = mastership_socket_read_ready;
731     template.file_descriptor = msm->multicast_sockets[MC_TRANSPORT_MASTERSHIP].socket;
732     template.private_data = (uword) msm;
733     unix_file_add (&unix_main, &template);
734
735     /* epoll setup for multicast to_relay socket */
736     template.read_function = to_relay_socket_read_ready;
737     template.file_descriptor = msm->multicast_sockets[MC_TRANSPORT_USER_REQUEST_TO_RELAY].socket;
738     template.private_data = (uword) msm;
739     unix_file_add (&unix_main, &template);
740
741     /* epoll setup for multicast from_relay socket */
742     template.read_function = from_relay_socket_read_ready;
743     template.file_descriptor = msm->multicast_sockets[MC_TRANSPORT_USER_REQUEST_FROM_RELAY].socket;
744     template.private_data = (uword) msm;
745     unix_file_add (&unix_main, &template);
746
747     template.read_function = join_socket_read_ready;
748     template.file_descriptor = msm->multicast_sockets[MC_TRANSPORT_JOIN].socket;
749     template.private_data = (uword) msm;
750     unix_file_add (&unix_main, &template);
751
752     /* epoll setup for ack rx socket */
753     template.read_function = ack_socket_read_ready;
754     template.file_descriptor = msm->ack_socket;
755     template.private_data = (uword) msm;
756     unix_file_add (&unix_main, &template);
757
758     /* epoll setup for TCP catchup server */
759     template.read_function = catchup_listen_read_ready;
760     template.file_descriptor = msm->catchup_server_socket;
761     template.private_data = (uword) msm;
762     unix_file_add (&unix_main, &template);
763   }
764
765   return 0;
766 }
767
768 static void *
769 catchup_add_pending_output (mc_socket_catchup_t * c, uword n_bytes, u8 * set_output_vector)
770 {
771   unix_file_t * uf = pool_elt_at_index (unix_main.file_pool,
772                                         c->unix_file_index);
773   u8 * result=0;
774
775   if (set_output_vector)
776     c->output_vector = set_output_vector;
777   else
778     vec_add2 (c->output_vector, result, n_bytes);
779   if (vec_len (c->output_vector) > 0)
780     {
781       int skip_update = 0 != (uf->flags & UNIX_FILE_DATA_AVAILABLE_TO_WRITE);
782       uf->flags |= UNIX_FILE_DATA_AVAILABLE_TO_WRITE;
783       if (! skip_update)
784         unix_main.file_update (uf, UNIX_FILE_UPDATE_MODIFY);
785     }
786   return result;
787 }
788
789 static uword catchup_request_fun (void *transport_main, 
790                                   u32 stream_index,
791                                   mc_peer_id_t catchup_peer_id)
792 {
793   mc_socket_main_t *msm = (mc_socket_main_t *)transport_main;
794   mc_main_t * mcm = &msm->mc_main;
795   vlib_main_t * vm = mcm->vlib_main;
796   mc_socket_catchup_t *c;
797   struct sockaddr_in addr;
798   unix_main_t *um = &unix_main;
799   int one = 1;
800
801   pool_get (msm->catchups, c);
802   memset (c, 0, sizeof (*c));
803
804   c->socket = socket(AF_INET, SOCK_STREAM, 0);
805   if (c->socket < 0)
806     {
807       clib_unix_warning ("socket");
808       return 0;
809     }
810
811   if (ioctl (c->socket, FIONBIO, &one) < 0)
812     {
813       clib_unix_warning ("FIONBIO");
814       return 0;
815     }
816
817   memset(&addr, 0, sizeof(addr));
818   addr.sin_family = AF_INET;
819   addr.sin_addr.s_addr = mc_socket_peer_id_get_address (catchup_peer_id);
820   addr.sin_port = mc_socket_peer_id_get_port (catchup_peer_id);
821     
822   c->connect_in_progress = 1;
823
824   if (MC_EVENT_LOGGING)
825     {
826       ELOG_TYPE_DECLARE (e) = {
827         .format = "connecting to peer 0x%Lx",
828         .format_args = "i8",
829       };
830       struct { u64 peer; } * ed;
831       ed = ELOG_DATA (&vm->elog_main, e);
832       ed->peer = catchup_peer_id.as_u64;
833     }
834       
835   if (connect(c->socket, (const void *)&addr,sizeof(addr)) 
836       < 0 && errno != EINPROGRESS)
837     {
838       clib_unix_warning ("connect to %U fails",
839                          format_socket_peer_id, catchup_peer_id);
840       return 0;
841     }
842   
843   {
844     unix_file_t template = {0};
845     
846     template.read_function = catchup_client_read_ready;
847     template.write_function = catchup_client_write_ready;
848     template.error_function = catchup_socket_error_ready;
849     template.file_descriptor = c->socket;
850     template.private_data = (uword) msm;
851     c->unix_file_index = unix_file_add (um, &template);
852
853     hash_set (msm->catchup_index_by_file_descriptor, c->socket, c - msm->catchups);
854   }
855
856   {
857     mc_msg_catchup_request_t * mp;
858     mp = catchup_add_pending_output (c, sizeof (mp[0]), /* set_output_vector */ 0);
859     mp->peer_id = msm->mc_main.transport.our_catchup_peer_id; 
860     mp->stream_index = stream_index;
861     mc_byte_swap_msg_catchup_request (mp);
862   }
863
864   return c - msm->catchups;
865 }
866
867 static void catchup_send_fun (void *transport_main, uword opaque, u8 * data)
868 {
869   mc_socket_main_t *msm = (mc_socket_main_t *)transport_main;
870   mc_socket_catchup_t *c = pool_elt_at_index (msm->catchups, opaque);
871   catchup_add_pending_output (c, 0, data);
872 }
873
874 static int
875 find_interface_ip4_address (char * if_name, u32 * ip4_address, u32 * mtu)
876 {
877   int fd;
878   struct ifreq ifr;
879   struct sockaddr_in * sa;
880
881   /* Dig up our IP address */
882   fd = socket (PF_INET, AF_INET, 0);
883   if (fd < 0) {
884     clib_unix_error ("socket");
885     return -1;
886   }
887
888   ifr.ifr_addr.sa_family = AF_INET;
889   strncpy (ifr.ifr_name, if_name, sizeof(ifr.ifr_name)-1);
890   if (ioctl (fd, SIOCGIFADDR, &ifr) < 0) {
891     clib_unix_error ("ioctl(SIOCFIGADDR)");
892     return -1;
893   }
894
895   sa = (void *) &ifr.ifr_addr;
896   memcpy (ip4_address, &sa->sin_addr.s_addr, sizeof (ip4_address[0]));
897
898   if (ioctl (fd, SIOCGIFMTU, &ifr) < 0)
899     return -1;
900   if (mtu)
901     *mtu = ifr.ifr_mtu - (/* IP4 header */ 20 + /* UDP header */ 8);
902
903   close (fd);
904
905   return 0;
906 }
907
908 clib_error_t *
909 mc_socket_main_init (mc_socket_main_t * msm, char **intfc_probe_list,
910                      int n_intfcs_to_probe)
911 {
912   clib_error_t * error;
913   mc_main_t * mcm;
914   u32 mtu;
915
916   mcm = &msm->mc_main;
917
918   /* 239.255.0.7 */
919   if (! msm->multicast_tx_ip4_address_host_byte_order)
920     msm->multicast_tx_ip4_address_host_byte_order = 0xefff0007;
921
922   {
923     u32 i, a, win;
924
925     win = 0;
926     if (msm->multicast_interface_name)
927       {
928         win = ! find_interface_ip4_address (msm->multicast_interface_name, &a, &mtu);
929       }
930     else
931       {
932         for (i = 0; i < n_intfcs_to_probe; i++)
933           if (! find_interface_ip4_address (intfc_probe_list[i], &a, &mtu))
934             {
935               win = 1;
936               msm->multicast_interface_name = intfc_probe_list[i];
937               break;
938             }
939       }
940
941     if (! win)
942       return clib_error_return (0, "can't find interface ip4 address");
943
944     msm->if_ip4_address_net_byte_order = a;
945   }
946
947   msm->rx_mtu_n_bytes = mtu;
948   msm->rx_mtu_n_buffers = msm->rx_mtu_n_bytes / VLIB_BUFFER_DEFAULT_FREE_LIST_BYTES;
949   msm->rx_mtu_n_buffers += (msm->rx_mtu_n_bytes % VLIB_BUFFER_DEFAULT_FREE_LIST_BYTES) != 0;
950
951   error = socket_setup (msm);
952   if (error)
953     return error;
954
955   mcm->transport.our_ack_peer_id =
956     mc_socket_set_peer_id (msm->if_ip4_address_net_byte_order, msm->ack_udp_port);
957
958   mcm->transport.our_catchup_peer_id =
959     mc_socket_set_peer_id (msm->if_ip4_address_net_byte_order, msm->catchup_tcp_port);
960
961   mcm->transport.tx_buffer = tx_buffer;
962   mcm->transport.tx_ack = tx_ack;
963   mcm->transport.catchup_request_fun = catchup_request_fun;
964   mcm->transport.catchup_send_fun = catchup_send_fun;
965   mcm->transport.format_peer_id = format_socket_peer_id;
966   mcm->transport.opaque = msm;
967   mcm->transport.max_packet_size = mtu;
968
969   mc_main_init (mcm, "socket");
970
971   return error;
972 }