2 * mc_socket.c: socket based multicast for vlib mc
4 * Copyright (c) 2010 Cisco and/or its affiliates.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 #include <vlib/vlib.h>
19 #include <vlib/unix/mc_socket.h>
21 #include <sys/ioctl.h> /* for FIONBIO */
22 #include <netinet/tcp.h> /* for TCP_NODELAY */
23 #include <net/if.h> /* for struct ifreq */
/* format() callback for a socket peer id: takes the id as a u64 vararg and
   appends "<ip4 address>:<port as 4 hex digits>" to s.
   NOTE(review): the declaration of peer_id and the tail of the format() call
   are not visible in this listing. */
25 static u8 * format_socket_peer_id (u8 * s, va_list * args)
27 u64 peer_id_as_u64 = va_arg (*args, u64);
29 peer_id.as_u64 = peer_id_as_u64;
/* Split the packed id into its address and port parts. */
30 u32 a = mc_socket_peer_id_get_address (peer_id);
31 u32 p = mc_socket_peer_id_get_port (peer_id);
33 s = format (s, "%U:%04x", format_network_address, AF_INET, &a,
/* Signature of the message handlers dispatched through msg_handler():
   they receive the mc main, a pointer to the message bytes and the index of
   the vlib buffer that holds the message. */
39 typedef void (mc_msg_handler_t) (mc_main_t * mcm, void * msg, u32 buffer_index);
/* Hand the message stored in buffer_index to the handler _h.
   The handler is called with the buffer's current data pointer.  Unless
   handler_frees_buffer is non-zero, the buffer is freed here once the
   handler returns. */
41 always_inline void msg_handler (mc_main_t * mcm,
43 u32 handler_frees_buffer,
46 vlib_main_t * vm = mcm->vlib_main;
/* _h arrives untyped; give it its real function type. */
47 mc_msg_handler_t * h = _h;
48 vlib_buffer_t * b = vlib_get_buffer (vm, buffer_index);
49 void * the_msg = vlib_buffer_get_current (b);
51 h (mcm, the_msg, buffer_index);
/* The buffer is still ours unless the handler said it takes ownership. */
52 if (! handler_frees_buffer)
53 vlib_buffer_free_one (vm, buffer_index);
/* Append one iovec per buffer of the chain starting at buffer_index to
   *iovs_return.  Each iovec covers the buffer's current data and
   current_length.  The walk ends at the first buffer that does not have
   VLIB_BUFFER_NEXT_PRESENT set.
   NOTE(review): the loop header and the return statement are not visible
   here; sendmsg_helper() uses the result as a total byte count -- confirm. */
57 append_buffer_index_to_iovec (vlib_main_t * vm,
59 struct iovec ** iovs_return)
63 u32 bi = buffer_index;
68 b = vlib_get_buffer (vm, bi);
/* Add one element to the caller's vector; i is then used as the new slot. */
69 vec_add2 (*iovs_return, i, 1);
70 i->iov_base = vlib_buffer_get_current (b);
71 i->iov_len = b->current_length;
/* Last buffer of the chain reached. */
73 if (! (b->flags & VLIB_BUFFER_NEXT_PRESENT))
/* Send the buffer chain buffer_index with sendmsg() on `socket`.
   The chain is gathered into the scratch vector msm->iovecs, which is reused
   between calls.  A send whose byte count differs from the chain length is
   reported with clib_unix_warning.
   NOTE(review): several lines (msg_name setup, the rest of the retry loop,
   the return statements) are not visible in this listing. */
82 sendmsg_helper (mc_socket_main_t * msm,
84 struct sockaddr_in * tx_addr,
87 vlib_main_t * vm = msm->mc_main.vlib_main;
89 word n_bytes, n_bytes_tx, n_retries;
91 memset (&h, 0, sizeof (h));
93 h.msg_namelen = sizeof (tx_addr[0]);
/* Reset the scratch iovec vector to length zero before refilling it. */
96 _vec_len (msm->iovecs) = 0;
98 n_bytes = append_buffer_index_to_iovec (vm, buffer_index, &msm->iovecs);
/* The size limit is checked twice: by ASSERT and by an explicit test that
   raises clib_error.
   NOTE(review): "interace" in the message below is a typo for "interface". */
99 ASSERT (n_bytes <= msm->mc_main.transport.max_packet_size);
100 if (n_bytes > msm->mc_main.transport.max_packet_size)
101 clib_error ("sending packet larger than interace MTU %d bytes", n_bytes);
103 h.msg_iov = msm->iovecs;
104 h.msg_iovlen = vec_len (msm->iovecs);
/* Repeat sendmsg() while it does not send the whole chain; the remainder of
   the loop condition and its body are not visible here. */
107 while ((n_bytes_tx = sendmsg (socket, &h, /* flags */ 0)) != n_bytes
110 if (n_bytes_tx != n_bytes)
112 clib_unix_warning ("sendmsg");
/* Record the retry count in the event log. */
117 ELOG_TYPE_DECLARE (e) = {
118 .format = "sendmsg-helper: %d retries",
121 struct { u32 retries; } * ed = 0;
123 ed = ELOG_DATA (&vm->elog_main, e);
124 ed->retries = n_retries;
/* Transport hook installed as transport.tx_buffer: send buffer_index on the
   multicast socket selected by `type`, using that socket's tx_addr.
   The buffer is freed here for every type except
   MC_TRANSPORT_USER_REQUEST_TO_RELAY, where the caller keeps it. */
129 static clib_error_t *
130 tx_buffer (void * transport, mc_transport_type_t type, u32 buffer_index)
132 mc_socket_main_t *msm = (mc_socket_main_t *)transport;
133 vlib_main_t * vm = msm->mc_main.vlib_main;
/* `type` indexes straight into the per-transport socket table. */
134 mc_multicast_socket_t * ms = &msm->multicast_sockets[type];
135 clib_error_t * error;
136 error = sendmsg_helper (msm, ms->socket, &ms->tx_addr, buffer_index);
137 if (type != MC_TRANSPORT_USER_REQUEST_TO_RELAY)
138 vlib_buffer_free_one (vm, buffer_index);
/* Transport hook installed as transport.tx_ack: send buffer_index on the
   ack socket to the address and port carried in dest_peer_id.  The buffer is
   always freed afterwards. */
142 static clib_error_t *
143 tx_ack (void *transport, mc_peer_id_t dest_peer_id, u32 buffer_index)
145 struct sockaddr_in tx_addr;
146 mc_socket_main_t *msm = (mc_socket_main_t *)transport;
147 vlib_main_t * vm = msm->mc_main.vlib_main;
148 clib_error_t * error;
150 memset (&tx_addr, 0, sizeof (tx_addr));
151 tx_addr.sin_family = AF_INET;
/* The peer id fields are stored without any byte swap, so they are presumably
   kept in network byte order -- confirm against mc_socket_set_peer_id. */
152 tx_addr.sin_addr.s_addr = mc_socket_peer_id_get_address (dest_peer_id);
153 tx_addr.sin_port = mc_socket_peer_id_get_port (dest_peer_id);
155 error = sendmsg_helper (msm, msm->ack_socket, &tx_addr, buffer_index);
156 vlib_buffer_free_one (vm, buffer_index);
/* Receive one message from `socket` with recvmsg() into vlib buffers taken
   from the cache msm->rx_buffers, and store the index of the first buffer in
   *buffer_index.  If the message is larger than one buffer the following
   buffers are linked through next_buffer.
   Returns a unix error if recvmsg() fails, 0 otherwise.
   NOTE(review): several lines are not visible in this listing, including the
   remaining parameters (callers pass a drop_message flag), the refill
   condition and the chaining loop header. */
160 static clib_error_t *
161 recvmsg_helper (mc_socket_main_t * msm,
163 struct sockaddr_in * rx_addr,
167 vlib_main_t * vm = msm->mc_main.vlib_main;
169 uword n_left, n_alloc, n_mtu, i, i_rx;
170 const uword buffer_size = VLIB_BUFFER_DEFAULT_FREE_LIST_BYTES;
173 /* Make sure we have at least a MTU worth of buffers. */
174 n_mtu = msm->rx_mtu_n_buffers;
175 n_left = vec_len (msm->rx_buffers);
/* Refill the cache up to 8 MTUs worth of buffers; the vector length is then
   set to what was actually allocated. */
178 uword max_alloc = 8 * n_mtu;
179 vec_validate (msm->rx_buffers, max_alloc - 1);
180 n_alloc = vlib_buffer_alloc (vm, msm->rx_buffers + n_left, max_alloc - n_left);
181 _vec_len (msm->rx_buffers) = n_left + n_alloc;
184 ASSERT (vec_len (msm->rx_buffers) >= n_mtu);
185 vec_validate (msm->iovecs, n_mtu - 1);
187 /* Allocate RX buffers from end of rx_buffers.
188 Turn them into iovecs to pass to readv. */
189 i_rx = vec_len (msm->rx_buffers) - 1;
190 for (i = 0; i < n_mtu; i++)
192 b = vlib_get_buffer (vm, msm->rx_buffers[i_rx - i]);
193 msm->iovecs[i].iov_base = b->data;
194 msm->iovecs[i].iov_len = buffer_size;
196 _vec_len (msm->iovecs) = n_mtu;
201 memset (&h, 0, sizeof (h));
/* Some callers pass rx_addr == 0; msg_namelen is still set to the sockaddr
   size in that case. */
204 h.msg_name = rx_addr;
205 h.msg_namelen = sizeof (rx_addr[0]);
207 h.msg_iov = msm->iovecs;
208 h.msg_iovlen = vec_len (msm->iovecs);
210 n_bytes_left = recvmsg (socket, &h, 0);
211 if (n_bytes_left < 0)
212 return clib_error_return_unix (0, "recvmsg");
/* The last cached buffer holds the start of the message. */
221 *buffer_index = msm->rx_buffers[i_rx];
224 b = vlib_get_buffer (vm, msm->rx_buffers[i_rx]);
/* Each buffer carries at most buffer_size bytes of the message. */
228 b->current_length = n_bytes_left < buffer_size ? n_bytes_left : buffer_size;
230 n_bytes_left -= buffer_size;
232 if (n_bytes_left <= 0)
/* More data remains: link the next cached buffer onto the chain. */
236 b->flags |= VLIB_BUFFER_NEXT_PRESENT;
237 b->next_buffer = msm->rx_buffers[i_rx];
/* Shrink the cache so the buffers handed out are no longer part of it. */
240 _vec_len (msm->rx_buffers) = i_rx;
242 return 0 /* no error */;
/* Read callback for the mastership multicast socket: receive one message and
   pass it to mc_msg_master_assert_handler.  msg_handler() frees the buffer
   afterwards (handler_frees_buffer is 0).
   NOTE(review): the check of `error` before dispatch is not visible here. */
245 static clib_error_t * mastership_socket_read_ready (unix_file_t * uf)
247 mc_socket_main_t *msm = (mc_socket_main_t *)uf->private_data;
248 mc_main_t * mcm = &msm->mc_main;
249 mc_multicast_socket_t * ms = &msm->multicast_sockets[MC_TRANSPORT_MASTERSHIP];
250 clib_error_t * error;
253 error = recvmsg_helper (msm, ms->socket, /* rx_addr */ 0, &bi, /* drop_message */ 0);
255 msg_handler (mcm, bi,
256 /* handler_frees_buffer */ 0,
257 mc_msg_master_assert_handler);
/* Read callback for the "to relay" multicast socket.
   When this node is the relay master, the received user request is stamped
   with the next global sequence number and re-sent on the "from relay"
   socket.  Otherwise recvmsg_helper() is asked to drop the message. */
262 static clib_error_t * to_relay_socket_read_ready (unix_file_t * uf)
264 mc_socket_main_t *msm = (mc_socket_main_t *)uf->private_data;
265 mc_main_t *mcm = &msm->mc_main;
266 vlib_main_t * vm = msm->mc_main.vlib_main;
267 mc_multicast_socket_t * ms_to_relay = &msm->multicast_sockets[MC_TRANSPORT_USER_REQUEST_TO_RELAY];
268 mc_multicast_socket_t * ms_from_relay = &msm->multicast_sockets[MC_TRANSPORT_USER_REQUEST_FROM_RELAY];
269 clib_error_t * error;
271 u32 is_master = mcm->relay_state == MC_RELAY_STATE_MASTER;
273 /* Not the ordering master? Turf the msg */
274 error = recvmsg_helper (msm, ms_to_relay->socket, /* rx_addr */ 0, &bi,
275 /* drop_message */ ! is_master);
277 /* If we are the master, number and rebroadcast the msg. */
278 if (! error && is_master)
280 vlib_buffer_t * b = vlib_get_buffer (vm, bi);
281 mc_msg_user_request_t * mp = vlib_buffer_get_current (b);
/* The sequence number is written into the message in network byte order. */
282 mp->global_sequence = clib_host_to_net_u32 (mcm->relay_global_sequence);
283 mcm->relay_global_sequence++;
284 error = sendmsg_helper (msm, ms_from_relay->socket, &ms_from_relay->tx_addr, bi);
/* The buffer is released whether or not the send succeeded. */
285 vlib_buffer_free_one (vm, bi);
/* Read callback for the "from relay" multicast socket: receive one message
   and pass it to mc_msg_user_request_handler.  handler_frees_buffer is 1, so
   the handler is responsible for the buffer.
   NOTE(review): the check of `error` before dispatch is not visible here. */
291 static clib_error_t * from_relay_socket_read_ready (unix_file_t * uf)
293 mc_socket_main_t *msm = (mc_socket_main_t *)uf->private_data;
294 mc_main_t * mcm = &msm->mc_main;
295 mc_multicast_socket_t * ms = &msm->multicast_sockets[MC_TRANSPORT_USER_REQUEST_FROM_RELAY];
296 clib_error_t * error;
299 error = recvmsg_helper (msm, ms->socket, /* rx_addr */ 0, &bi, /* drop_message */ 0);
302 msg_handler (mcm, bi, /* handler_frees_buffer */ 1,
303 mc_msg_user_request_handler);
/* Read callback for the join multicast socket: receive one message and
   dispatch on its type field to the join/leave request handler or the join
   reply handler.  In both visible cases msg_handler() frees the buffer.
   NOTE(review): the default case of the switch is not visible here. */
308 static clib_error_t * join_socket_read_ready (unix_file_t * uf)
310 mc_socket_main_t *msm = (mc_socket_main_t *)uf->private_data;
311 mc_main_t * mcm = &msm->mc_main;
312 vlib_main_t * vm = mcm->vlib_main;
313 mc_multicast_socket_t * ms = &msm->multicast_sockets[MC_TRANSPORT_JOIN];
314 clib_error_t * error;
317 error = recvmsg_helper (msm, ms->socket, /* rx_addr */ 0, &bi, /* drop_message */ 0);
320 vlib_buffer_t * b = vlib_get_buffer (vm, bi);
321 mc_msg_join_or_leave_request_t * mp = vlib_buffer_get_current (b);
/* NOTE(review): a received field is converted with the host-to-net helper;
   presumably the same 32-bit swap as net-to-host -- confirm. */
323 switch (clib_host_to_net_u32 (mp->type))
325 case MC_MSG_TYPE_join_or_leave_request:
326 msg_handler (mcm, bi, /* handler_frees_buffer */ 0,
327 mc_msg_join_or_leave_request_handler);
330 case MC_MSG_TYPE_join_reply:
331 msg_handler (mcm, bi, /* handler_frees_buffer */ 0,
332 mc_msg_join_reply_handler);
/* Read callback for the ack socket: receive one message and pass it to
   mc_msg_user_ack_handler.  msg_handler() frees the buffer afterwards.
   NOTE(review): the check of `error` before dispatch is not visible here. */
343 static clib_error_t * ack_socket_read_ready (unix_file_t * uf)
345 mc_socket_main_t *msm = (mc_socket_main_t *)uf->private_data;
346 mc_main_t * mcm = &msm->mc_main;
347 clib_error_t * error;
350 error = recvmsg_helper (msm, msm->ack_socket, /* rx_addr */ 0, &bi, /* drop_message */ 0);
352 msg_handler (mcm, bi, /* handler_frees_buffer */ 0,
353 mc_msg_user_ack_handler);
357 static void catchup_cleanup (mc_socket_main_t *msm,
358 mc_socket_catchup_t *c,
359 unix_main_t *um, unix_file_t *uf)
361 hash_unset (msm->catchup_index_by_file_descriptor, uf->file_descriptor);
362 unix_file_del (um, uf);
363 vec_free (c->input_vector);
364 vec_free (c->output_vector);
365 pool_put (msm->catchups, c);
368 static mc_socket_catchup_t *
369 find_catchup_from_file_descriptor (mc_socket_main_t * msm, int file_descriptor)
371 uword * p = hash_get (msm->catchup_index_by_file_descriptor, file_descriptor);
372 return p ? pool_elt_at_index (msm->catchups, p[0]) : 0;
/* Shared read callback for catchup TCP connections.
   Incoming bytes are accumulated in c->input_vector.  When EOF is seen and
   data has been collected, the whole vector is passed to the catchup request
   handler (is_server != 0) or the catchup reply handler (is_server == 0).
   NOTE(review): c is dereferenced without a NULL check although
   find_catchup_from_file_descriptor() can return 0.  The declarations of
   l, n and is_eof and the error test around the read are not visible here. */
375 static clib_error_t * catchup_socket_read_ready (unix_file_t * uf, int is_server)
377 unix_main_t * um = &unix_main;
378 mc_socket_main_t *msm = (mc_socket_main_t *)uf->private_data;
379 mc_main_t *mcm = &msm->mc_main;
380 mc_socket_catchup_t * c = find_catchup_from_file_descriptor (msm, uf->file_descriptor);
/* Make 4096 more bytes of room and read into the new tail. */
383 l = vec_len (c->input_vector);
384 vec_resize (c->input_vector, 4096);
385 n = read (uf->file_descriptor, c->input_vector + l, vec_len (c->input_vector) - l);
/* Failed read: drop the connection and report the error. */
394 catchup_cleanup (msm, c, um, uf);
395 return clib_error_return_unix (0, "read");
/* Trim the vector back to the bytes actually received so far. */
399 _vec_len (c->input_vector) = l + n;
401 if (is_eof && vec_len (c->input_vector) > 0)
/* Server side: handle the request, then reset the input vector length to zero. */
405 mc_msg_catchup_request_handler (mcm, (void *) c->input_vector, c - msm->catchups);
406 _vec_len (c->input_vector) = 0;
/* Client side: the reply handler takes the vector, so detach it before
   cleanup frees the record's vectors. */
410 mc_msg_catchup_reply_handler (mcm, (void *) c->input_vector, c - msm->catchups);
411 c->input_vector = 0; /* reply handler is responsible for freeing vector */
412 catchup_cleanup (msm, c, um, uf);
416 return 0 /* no error */;
419 static clib_error_t * catchup_server_read_ready (unix_file_t * uf)
420 { return catchup_socket_read_ready (uf, /* is_server */ 1); }
/* Read callback for outgoing (client side) catchup connections.
   Optionally records an event-log entry, then defers to the shared reader
   with is_server = 0. */
422 static clib_error_t * catchup_client_read_ready (unix_file_t * uf)
/* Event logging is compiled in or out through MC_EVENT_LOGGING. */
424 if (MC_EVENT_LOGGING)
426 mc_socket_main_t *msm = (mc_socket_main_t *)uf->private_data;
427 vlib_main_t * vm = msm->mc_main.vlib_main;
429 ELOG_TYPE (e, "catchup_client_read_ready");
430 ELOG (&vm->elog_main, e, 0);
432 return catchup_socket_read_ready (uf, /* is_server */ 0);
/* Shared write callback for catchup TCP connections.
   If a connect is still pending, its result is read with SO_ERROR.  Then a
   bounded slice of c->output_vector is written.  Once everything has been
   written, write notifications are switched off and the write side of the
   socket is shut down so the peer sees EOF.
   NOTE(review): c is dereferenced without a NULL check although
   find_catchup_from_file_descriptor() can return 0.  Several lines (local
   declarations, branch conditions, the return) are not visible here. */
435 static clib_error_t *
436 catchup_socket_write_ready (unix_file_t * uf, int is_server)
438 unix_main_t * um = &unix_main;
439 mc_socket_main_t *msm = (mc_socket_main_t *)uf->private_data;
440 mc_socket_catchup_t *c = find_catchup_from_file_descriptor (msm, uf->file_descriptor);
441 clib_error_t * error = 0;
/* First writable event after a non-blocking connect: collect its outcome. */
444 if (c->connect_in_progress)
448 c->connect_in_progress = 0;
449 len = sizeof (value);
450 if (getsockopt (c->socket, SOL_SOCKET,
451 SO_ERROR, &value, &len) < 0)
453 error = clib_error_return_unix (0, "getsockopt SO_ERROR");
/* `value` carries the errno of the failed connect. */
458 error = clib_error_return_code (0, value, CLIB_ERROR_ERRNO_VALID, "connect fails");
468 clib_min (vec_len (c->output_vector) - c->output_vector_n_written,
469 msm->rx_mtu_n_bytes - 64 /* ip + tcp + option allowance */);
471 if (n_this_write <= 0)
/* Retry the write for as long as it fails with EAGAIN. */
475 n = write (uf->file_descriptor,
476 c->output_vector + c->output_vector_n_written,
478 } while (n < 0 && errno == EAGAIN);
482 error = clib_error_return_unix (0, "write");
485 c->output_vector_n_written += n;
/* Everything queued has been sent: stop asking for write events. */
488 if (c->output_vector_n_written >= vec_len (c->output_vector))
492 uf->flags &= ~UNIX_FILE_DATA_AVAILABLE_TO_WRITE;
493 unix_main.file_update (uf, UNIX_FILE_UPDATE_MODIFY);
494 /* Send EOF to other side. */
495 shutdown (uf->file_descriptor, SHUT_WR);
501 catchup_cleanup (msm, c, um, uf);
507 static clib_error_t *
508 catchup_server_write_ready (unix_file_t * uf)
509 { return catchup_socket_write_ready (uf, /* is_server */ 1); }
511 static clib_error_t *
512 catchup_client_write_ready (unix_file_t * uf)
513 { return catchup_socket_write_ready (uf, /* is_server */ 0); }
/* Error callback for catchup connections: tear the connection down and
   return a generic error.
   NOTE(review): the catchup record is looked up by descriptor and passed to
   catchup_cleanup() without a NULL check. */
515 static clib_error_t *catchup_socket_error_ready (unix_file_t *uf)
517 unix_main_t *um = &unix_main;
518 mc_socket_main_t *msm = (mc_socket_main_t *)uf->private_data;
519 mc_socket_catchup_t *c = find_catchup_from_file_descriptor (msm, uf->file_descriptor);
520 catchup_cleanup (msm, c, um, uf);
521 return clib_error_return (0, "error");
/* Read callback for the listening catchup TCP socket: accept one connection,
   create a catchup record for it, register its read/write/error callbacks
   and index the record by the new descriptor.
   NOTE(review): some lines (declarations of client_len and one, the accept
   failure test, the return) are not visible in this listing. */
524 static clib_error_t *catchup_listen_read_ready (unix_file_t * uf)
526 mc_socket_main_t *msm = (mc_socket_main_t *)uf->private_data;
527 struct sockaddr_in client_addr;
529 mc_socket_catchup_t *c;
530 unix_file_t template = {0};
/* Take a zeroed record from the pool before accepting. */
532 pool_get (msm->catchups, c);
533 memset(c, 0, sizeof (c[0]));
535 client_len = sizeof(client_addr);
537 /* Acquires the non-blocking attrib from the server socket. */
538 c->socket = accept (uf->file_descriptor,
539 (struct sockaddr *)&client_addr,
540 (socklen_t *)&client_len);
/* accept failed: give the record back and report the error. */
544 pool_put (msm->catchups, c);
545 return clib_error_return_unix (0, "accept");
548 if (MC_EVENT_LOGGING)
550 mc_main_t * mcm = &msm->mc_main;
551 vlib_main_t * vm = mcm->vlib_main;
553 ELOG_TYPE_DECLARE (e) = {
554 .format = "catchup accepted from 0x%lx",
557 struct { u32 addr; } * ed = 0;
559 ed = ELOG_DATA (&vm->elog_main, e);
/* The client address is logged in host byte order. */
560 ed->addr = ntohl(client_addr.sin_addr.s_addr);
563 /* Disable the Nagle algorithm, ship catchup pkts immediately */
/* A failure to set TCP_NODELAY is only warned about. */
566 if ((setsockopt(c->socket, IPPROTO_TCP,
567 TCP_NODELAY, (void *)&one, sizeof(one))) < 0) {
568 clib_unix_warning("catchup socket: set TCP_NODELAY");
/* Register the accepted socket with the server-side callbacks. */
572 template.read_function = catchup_server_read_ready;
573 template.write_function = catchup_server_write_ready;
574 template.error_function = catchup_socket_error_ready;
575 template.file_descriptor = c->socket;
576 template.private_data = pointer_to_uword (msm);
577 c->unix_file_index = unix_file_add (&unix_main, &template);
/* Callbacks later find the record again through this descriptor -> index map. */
578 hash_set (msm->catchup_index_by_file_descriptor, c->socket, c - msm->catchups);
583 /* Return and bind to an unused port. */
584 static word find_and_bind_to_free_port (word sock, word port)
586 for (; port < 1 << 16; port++)
588 struct sockaddr_in a;
590 memset (&a, 0, sizeof(a)); /* Warnings be gone */
592 a.sin_family = PF_INET;
593 a.sin_addr.s_addr = INADDR_ANY;
594 a.sin_port = htons (port);
596 if (bind (sock, (struct sockaddr *) &a, sizeof (a)) >= 0)
600 return port < 1 << 16 ? port : -1;
/* Create and configure one UDP multicast socket in ms.
   Steps: create the socket, set the multicast TTL (taken as 1 when
   msm->multicast_ttl is 0), enable SO_REUSEADDR, bind to the multicast group
   address and udp_port, join the group on the interface address, make the
   socket non-blocking and set its send buffer size.
   `type` is a name used only in error messages.
   Returns a unix error for the first step that fails.
   NOTE(review): the error returns after socket() succeeds do not close
   ms->socket.  The remaining parameters and the declarations of one and len
   are not visible in this listing. */
603 static clib_error_t *
604 setup_mutlicast_socket (mc_socket_main_t * msm,
605 mc_multicast_socket_t * ms,
610 struct ip_mreq mcast_req;
/* Default the TTL to 1 when none has been configured. */
612 if (! msm->multicast_ttl)
613 msm->multicast_ttl = 1;
615 /* UDP socket for this multicast group (socket_setup() calls this once per transport type) */
616 if ((ms->socket = socket (PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0)
617 return clib_error_return_unix(0, "%s socket", type);
/* The TTL option is passed as a single byte. */
620 u8 ttl = msm->multicast_ttl;
622 if ((setsockopt(ms->socket, IPPROTO_IP,
623 IP_MULTICAST_TTL, (void *)&ttl, sizeof(ttl))) < 0)
624 return clib_error_return_unix(0, "%s set multicast ttl", type);
627 if (setsockopt(ms->socket, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0)
628 return clib_error_return_unix (0, "%s setsockopt SO_REUSEADDR", type);
/* tx_addr is both the destination for sends and the address bound below. */
630 memset (&ms->tx_addr, 0, sizeof (ms->tx_addr));
631 ms->tx_addr.sin_family = AF_INET;
632 ms->tx_addr.sin_addr.s_addr = htonl (msm->multicast_tx_ip4_address_host_byte_order);
633 ms->tx_addr.sin_port = htons (udp_port);
635 if (bind(ms->socket, (struct sockaddr *)&ms->tx_addr,
636 sizeof (ms->tx_addr)) < 0)
637 return clib_error_return_unix(0, "%s bind", type);
/* Join the multicast group on the configured interface address. */
639 memset (&mcast_req, 0, sizeof (mcast_req));
640 mcast_req.imr_multiaddr.s_addr = htonl (msm->multicast_tx_ip4_address_host_byte_order);
641 mcast_req.imr_interface.s_addr = msm->if_ip4_address_net_byte_order;
643 if ((setsockopt(ms->socket, IPPROTO_IP,
644 IP_ADD_MEMBERSHIP, (void *)&mcast_req,
645 sizeof (mcast_req))) < 0)
646 return clib_error_return_unix(0, "%s IP_ADD_MEMBERSHIP setsockopt", type);
/* Put the socket into non-blocking mode. */
648 if (ioctl (ms->socket, FIONBIO, &one) < 0)
649 return clib_error_return_unix (0, "%s set FIONBIO", type);
651 /* FIXME remove this when we support tx_ready. */
/* NOTE(review): unlike the steps above, this failure is raised through
   clib_unix_error rather than returned to the caller. */
654 socklen_t sl = sizeof (len);
655 if (setsockopt(ms->socket, SOL_SOCKET, SO_SNDBUF, &len, sl) < 0)
656 clib_unix_error ("setsockopt");
/* Create every socket the transport needs and register its read callback:
   four multicast sockets (mastership, join, to-relay, from-relay), the UDP
   ack socket and the listening TCP catchup socket.
   NOTE(review): the results of find_and_bind_to_free_port() are stored
   without checking for -1.  The name and port arguments of the
   setup_mutlicast_socket() calls and the error checks after them are not
   visible in this listing. */
662 static clib_error_t *
663 socket_setup (mc_socket_main_t *msm)
666 clib_error_t * error;
/* No base port configured: derive a default near the top of the port range. */
669 if (! msm->base_multicast_udp_port_host_byte_order)
670 msm->base_multicast_udp_port_host_byte_order =
671 0xffff - ((MC_N_TRANSPORT_TYPE + 2 /* ack socket, catchup socket */)
674 port = msm->base_multicast_udp_port_host_byte_order;
676 error = setup_mutlicast_socket (msm,
677 &msm->multicast_sockets[MC_TRANSPORT_MASTERSHIP],
683 error = setup_mutlicast_socket (msm,
684 &msm->multicast_sockets[MC_TRANSPORT_JOIN],
690 error = setup_mutlicast_socket (msm,
691 &msm->multicast_sockets[MC_TRANSPORT_USER_REQUEST_TO_RELAY],
697 error = setup_mutlicast_socket (msm,
698 &msm->multicast_sockets[MC_TRANSPORT_USER_REQUEST_FROM_RELAY],
/* UDP socket for acks, bound to the first free port at or above `port`. */
705 msm->ack_socket = socket (PF_INET, SOCK_DGRAM, IPPROTO_UDP);
706 if (msm->ack_socket < 0)
707 return clib_error_return_unix(0, "ack socket");
709 msm->ack_udp_port = find_and_bind_to_free_port (msm->ack_socket, port++);
711 if (ioctl (msm->ack_socket, FIONBIO, &one) < 0)
712 return clib_error_return_unix (0, "ack socket FIONBIO");
/* Non-blocking listening TCP socket for catchup connections. */
714 msm->catchup_server_socket = socket(AF_INET, SOCK_STREAM, 0);
715 if (msm->catchup_server_socket < 0)
716 return clib_error_return_unix (0, "catchup server socket");
718 msm->catchup_tcp_port = find_and_bind_to_free_port (msm->catchup_server_socket, port++);
720 if (ioctl (msm->catchup_server_socket, FIONBIO, &one) < 0)
721 return clib_error_return_unix (0, "catchup server socket FIONBIO");
723 if (listen(msm->catchup_server_socket, 5) < 0)
724 return clib_error_return_unix (0, "catchup server socket listen");
726 /* epoll setup for multicast mastership socket */
/* One template is reused for every registration below; only the read
   callback and descriptor change. */
728 unix_file_t template = {0};
730 template.read_function = mastership_socket_read_ready;
731 template.file_descriptor = msm->multicast_sockets[MC_TRANSPORT_MASTERSHIP].socket;
732 template.private_data = (uword) msm;
733 unix_file_add (&unix_main, &template);
735 /* epoll setup for multicast to_relay socket */
736 template.read_function = to_relay_socket_read_ready;
737 template.file_descriptor = msm->multicast_sockets[MC_TRANSPORT_USER_REQUEST_TO_RELAY].socket;
738 template.private_data = (uword) msm;
739 unix_file_add (&unix_main, &template);
741 /* epoll setup for multicast from_relay socket */
742 template.read_function = from_relay_socket_read_ready;
743 template.file_descriptor = msm->multicast_sockets[MC_TRANSPORT_USER_REQUEST_FROM_RELAY].socket;
744 template.private_data = (uword) msm;
745 unix_file_add (&unix_main, &template);
/* epoll setup for multicast join socket */
747 template.read_function = join_socket_read_ready;
748 template.file_descriptor = msm->multicast_sockets[MC_TRANSPORT_JOIN].socket;
749 template.private_data = (uword) msm;
750 unix_file_add (&unix_main, &template);
752 /* epoll setup for ack rx socket */
753 template.read_function = ack_socket_read_ready;
754 template.file_descriptor = msm->ack_socket;
755 template.private_data = (uword) msm;
756 unix_file_add (&unix_main, &template);
758 /* epoll setup for TCP catchup server */
759 template.read_function = catchup_listen_read_ready;
760 template.file_descriptor = msm->catchup_server_socket;
761 template.private_data = (uword) msm;
762 unix_file_add (&unix_main, &template);
/* Queue output on catchup connection c and make sure write events are
   requested for its socket.
   If set_output_vector is non-zero it replaces c->output_vector.  n_bytes
   elements are then added to the output vector, with `result` referring to
   them.
   NOTE(review): the return type line, the declaration of `result`, the use
   of skip_update and the return statement are not visible here;
   catchup_request_fun() uses the returned pointer as the newly added
   space -- confirm. */
769 catchup_add_pending_output (mc_socket_catchup_t * c, uword n_bytes, u8 * set_output_vector)
771 unix_file_t * uf = pool_elt_at_index (unix_main.file_pool,
775 if (set_output_vector)
776 c->output_vector = set_output_vector;
778 vec_add2 (c->output_vector, result, n_bytes);
/* Data is pending: flag the file as having data to write. */
779 if (vec_len (c->output_vector) > 0)
/* Remember whether the flag was already set before this call. */
781 int skip_update = 0 != (uf->flags & UNIX_FILE_DATA_AVAILABLE_TO_WRITE);
782 uf->flags |= UNIX_FILE_DATA_AVAILABLE_TO_WRITE;
784 unix_main.file_update (uf, UNIX_FILE_UPDATE_MODIFY);
789 static uword catchup_request_fun (void *transport_main,
791 mc_peer_id_t catchup_peer_id)
793 mc_socket_main_t *msm = (mc_socket_main_t *)transport_main;
794 mc_main_t * mcm = &msm->mc_main;
795 vlib_main_t * vm = mcm->vlib_main;
796 mc_socket_catchup_t *c;
797 struct sockaddr_in addr;
798 unix_main_t *um = &unix_main;
801 pool_get (msm->catchups, c);
802 memset (c, 0, sizeof (*c));
804 c->socket = socket(AF_INET, SOCK_STREAM, 0);
807 clib_unix_warning ("socket");
811 if (ioctl (c->socket, FIONBIO, &one) < 0)
813 clib_unix_warning ("FIONBIO");
817 memset(&addr, 0, sizeof(addr));
818 addr.sin_family = AF_INET;
819 addr.sin_addr.s_addr = mc_socket_peer_id_get_address (catchup_peer_id);
820 addr.sin_port = mc_socket_peer_id_get_port (catchup_peer_id);
822 c->connect_in_progress = 1;
824 if (MC_EVENT_LOGGING)
826 ELOG_TYPE_DECLARE (e) = {
827 .format = "connecting to peer 0x%Lx",
830 struct { u64 peer; } * ed;
831 ed = ELOG_DATA (&vm->elog_main, e);
832 ed->peer = catchup_peer_id.as_u64;
835 if (connect(c->socket, (const void *)&addr,sizeof(addr))
836 < 0 && errno != EINPROGRESS)
838 clib_unix_warning ("connect to %U fails",
839 format_socket_peer_id, catchup_peer_id);
844 unix_file_t template = {0};
846 template.read_function = catchup_client_read_ready;
847 template.write_function = catchup_client_write_ready;
848 template.error_function = catchup_socket_error_ready;
849 template.file_descriptor = c->socket;
850 template.private_data = (uword) msm;
851 c->unix_file_index = unix_file_add (um, &template);
853 hash_set (msm->catchup_index_by_file_descriptor, c->socket, c - msm->catchups);
857 mc_msg_catchup_request_t * mp;
858 mp = catchup_add_pending_output (c, sizeof (mp[0]), /* set_output_vector */ 0);
859 mp->peer_id = msm->mc_main.transport.our_catchup_peer_id;
860 mp->stream_index = stream_index;
861 mc_byte_swap_msg_catchup_request (mp);
864 return c - msm->catchups;
867 static void catchup_send_fun (void *transport_main, uword opaque, u8 * data)
869 mc_socket_main_t *msm = (mc_socket_main_t *)transport_main;
870 mc_socket_catchup_t *c = pool_elt_at_index (msm->catchups, opaque);
871 catchup_add_pending_output (c, 0, data);
875 find_interface_ip4_address (char * if_name, u32 * ip4_address, u32 * mtu)
879 struct sockaddr_in * sa;
881 /* Dig up our IP address */
882 fd = socket (PF_INET, AF_INET, 0);
884 clib_unix_error ("socket");
888 ifr.ifr_addr.sa_family = AF_INET;
889 strncpy (ifr.ifr_name, if_name, sizeof(ifr.ifr_name)-1);
890 if (ioctl (fd, SIOCGIFADDR, &ifr) < 0) {
891 clib_unix_error ("ioctl(SIOCFIGADDR)");
895 sa = (void *) &ifr.ifr_addr;
896 clib_memcpy (ip4_address, &sa->sin_addr.s_addr, sizeof (ip4_address[0]));
898 if (ioctl (fd, SIOCGIFMTU, &ifr) < 0)
901 *mtu = ifr.ifr_mtu - (/* IP4 header */ 20 + /* UDP header */ 8);
/* Initialise the socket based multicast transport.
   Picks the interface to use (msm->multicast_interface_name if set,
   otherwise the first entry of intfc_probe_list whose IPv4 address can be
   found), sizes the receive path from that interface's MTU, creates all
   sockets and installs the transport callbacks before calling
   mc_main_init().
   Returns an error when no interface address is found.
   NOTE(review): the return type line, local declarations and the checks
   around `win` and `error` are not visible in this listing. */
909 mc_socket_main_init (mc_socket_main_t * msm, char **intfc_probe_list,
910 int n_intfcs_to_probe)
912 clib_error_t * error;
/* Default multicast group when none is configured: 0xefff0007 (239.255.0.7). */
919 if (! msm->multicast_tx_ip4_address_host_byte_order)
920 msm->multicast_tx_ip4_address_host_byte_order = 0xefff0007;
/* An explicitly configured interface is tried on its own. */
926 if (msm->multicast_interface_name)
928 win = ! find_interface_ip4_address (msm->multicast_interface_name, &a, &mtu);
/* Otherwise probe the candidate list and remember the one that answered. */
932 for (i = 0; i < n_intfcs_to_probe; i++)
933 if (! find_interface_ip4_address (intfc_probe_list[i], &a, &mtu))
936 msm->multicast_interface_name = intfc_probe_list[i];
942 return clib_error_return (0, "can't find interface ip4 address");
944 msm->if_ip4_address_net_byte_order = a;
/* Number of default-size buffers needed for one MTU, rounded up. */
947 msm->rx_mtu_n_bytes = mtu;
948 msm->rx_mtu_n_buffers = msm->rx_mtu_n_bytes / VLIB_BUFFER_DEFAULT_FREE_LIST_BYTES;
949 msm->rx_mtu_n_buffers += (msm->rx_mtu_n_bytes % VLIB_BUFFER_DEFAULT_FREE_LIST_BYTES) != 0;
951 error = socket_setup (msm);
/* Our peer ids combine the interface address with the ack and catchup ports
   chosen by socket_setup(). */
955 mcm->transport.our_ack_peer_id =
956 mc_socket_set_peer_id (msm->if_ip4_address_net_byte_order, msm->ack_udp_port);
958 mcm->transport.our_catchup_peer_id =
959 mc_socket_set_peer_id (msm->if_ip4_address_net_byte_order, msm->catchup_tcp_port);
/* Install this file's functions as the transport callbacks. */
961 mcm->transport.tx_buffer = tx_buffer;
962 mcm->transport.tx_ack = tx_ack;
963 mcm->transport.catchup_request_fun = catchup_request_fun;
964 mcm->transport.catchup_send_fun = catchup_send_fun;
965 mcm->transport.format_peer_id = format_socket_peer_id;
966 mcm->transport.opaque = msm;
967 mcm->transport.max_packet_size = mtu;
969 mc_main_init (mcm, "socket");