a9db7705fe965531d3ad1904958768caf041831a
[vpp.git] / extras / libmemif / src / socket.c
1 /*
2  *------------------------------------------------------------------
3  * Copyright (c) 2017 Cisco and/or its affiliates.
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *------------------------------------------------------------------
16  */
17
18 #define _GNU_SOURCE
19 #include <sys/socket.h>
20 #include <sys/types.h>
21 #include <sys/un.h>
22 #include <string.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <net/if.h>
26 #include <sys/ioctl.h>
27 #include <sys/uio.h>
28 #include <sys/mman.h>
29 #include <sys/prctl.h>
30 #include <fcntl.h>
31 #include <errno.h>
32
33 #include <socket.h>
34 #include <memif.h>
35 #include <memif_private.h>
36
37 /* sends msg to socket */
38 static int
39 memif_msg_send_from_queue (memif_control_channel_t *cc)
40 {
41   struct msghdr mh = { 0 };
42   struct iovec iov[1];
43   char ctl[CMSG_SPACE (sizeof (int))];
44   int rv, err = MEMIF_ERR_SUCCESS;      /* 0 */
45   memif_msg_queue_elt_t *e;
46
47   /* Pick the first message */
48   e = TAILQ_FIRST (&cc->msg_queue);
49   if (e == NULL)
50     return MEMIF_ERR_SUCCESS;
51
52   /* Construct the message */
53   iov[0].iov_base = (void *) &e->msg;
54   iov[0].iov_len = sizeof (memif_msg_t);
55   mh.msg_iov = iov;
56   mh.msg_iovlen = 1;
57
58   if (e->fd > 0)
59     {
60       struct cmsghdr *cmsg;
61       memset (&ctl, 0, sizeof (ctl));
62       mh.msg_control = ctl;
63       mh.msg_controllen = sizeof (ctl);
64       cmsg = CMSG_FIRSTHDR (&mh);
65       cmsg->cmsg_len = CMSG_LEN (sizeof (int));
66       cmsg->cmsg_level = SOL_SOCKET;
67       cmsg->cmsg_type = SCM_RIGHTS;
68       memcpy (CMSG_DATA (cmsg), &e->fd, sizeof (int));
69     }
70   rv = sendmsg (cc->fd, &mh, 0);
71   if (rv < 0)
72     err = memif_syscall_error_handler (errno);
73   DBG ("Message type %u sent", e->msg.type);
74
75   /* If sent successfully, remove the msg from queue */
76   if (err == MEMIF_ERR_SUCCESS)
77     {
78       TAILQ_REMOVE (&cc->msg_queue, e, next);
79       cc->sock->args.free (e);
80     }
81
82   return err;
83 }
84
85 static memif_msg_queue_elt_t *
86 memif_msg_enq (memif_control_channel_t *cc)
87 {
88   memif_msg_queue_elt_t *e;
89
90   e = cc->sock->args.alloc (sizeof (*e));
91   if (e == NULL)
92     return NULL;
93
94   e->fd = -1;
95   TAILQ_INSERT_TAIL (&cc->msg_queue, e, next);
96
97   return e;
98 }
99
100 static int
101 memif_msg_enq_hello (memif_control_channel_t *cc)
102 {
103   memif_msg_hello_t *h;
104   memif_msg_queue_elt_t *e = memif_msg_enq (cc);
105
106   if (e == NULL)
107     return MEMIF_ERR_NOMEM;
108
109   e->msg.type = MEMIF_MSG_TYPE_HELLO;
110
111   h = &e->msg.hello;
112   h->min_version = MEMIF_VERSION;
113   h->max_version = MEMIF_VERSION;
114   h->max_s2m_ring = MEMIF_MAX_S2M_RING;
115   h->max_m2s_ring = MEMIF_MAX_M2S_RING;
116   h->max_region = MEMIF_MAX_REGION;
117   h->max_log2_ring_size = MEMIF_MAX_LOG2_RING_SIZE;
118
119   strlcpy ((char *) h->name, (char *) cc->sock->args.app_name,
120            sizeof (h->name));
121
122   return MEMIF_ERR_SUCCESS;
123 }
124
125 /* response from memif master - master is ready to handle next message */
126 static int
127 memif_msg_enq_ack (memif_control_channel_t *cc)
128 {
129   memif_msg_queue_elt_t *e = memif_msg_enq (cc);
130
131   if (e == NULL)
132     return MEMIF_ERR_NOMEM;
133
134   e->msg.type = MEMIF_MSG_TYPE_ACK;
135   e->fd = -1;
136
137   return MEMIF_ERR_SUCCESS; /* 0 */
138 }
139
140 /* send id and secret (optional) for interface identification */
141 static int
142 memif_msg_enq_init (memif_control_channel_t *cc)
143 {
144   memif_msg_queue_elt_t *e = memif_msg_enq (cc);
145
146   if (e == NULL)
147     return MEMIF_ERR_NOMEM;
148
149   memif_msg_init_t *i = &e->msg.init;
150
151   e->msg.type = MEMIF_MSG_TYPE_INIT;
152   e->fd = -1;
153   i->version = MEMIF_VERSION;
154   i->id = cc->conn->args.interface_id;
155   i->mode = cc->conn->args.mode;
156
157   strlcpy ((char *) i->name, (char *) cc->sock->args.app_name,
158            sizeof (i->name));
159   if (strlen ((char *) cc->conn->args.secret) > 0)
160     strlcpy ((char *) i->secret, (char *) cc->conn->args.secret,
161              sizeof (i->secret));
162
163   return MEMIF_ERR_SUCCESS;     /* 0 */
164 }
165
166 /* send information about region specified by region_index */
167 static int
168 memif_msg_enq_add_region (memif_control_channel_t *cc, uint8_t region_index)
169 {
170   memif_region_t *mr = &cc->conn->regions[region_index];
171   memif_msg_queue_elt_t *e = memif_msg_enq (cc);
172
173   if (e == NULL)
174     return MEMIF_ERR_NOMEM;
175
176   memif_msg_add_region_t *ar = &e->msg.add_region;
177
178   e->msg.type = MEMIF_MSG_TYPE_ADD_REGION;
179   e->fd = mr->fd;
180   ar->index = region_index;
181   ar->size = mr->region_size;
182
183   return MEMIF_ERR_SUCCESS;     /* 0 */
184 }
185
186 /* send information about ring specified by direction (S2M | M2S) and index */
187 static int
188 memif_msg_enq_add_ring (memif_control_channel_t *cc, uint8_t index,
189                         uint8_t dir)
190 {
191   memif_msg_queue_elt_t *e = memif_msg_enq (cc);
192
193   if (e == NULL)
194     return MEMIF_ERR_NOMEM;
195
196   memif_msg_add_ring_t *ar = &e->msg.add_ring;
197
198   e->msg.type = MEMIF_MSG_TYPE_ADD_RING;
199
200   /* TODO: support multiple rings */
201   memif_queue_t *mq;
202   if (dir == MEMIF_RING_M2S)
203     mq = &cc->conn->rx_queues[index];
204   else
205     mq = &cc->conn->tx_queues[index];
206
207   e->fd = mq->int_fd;
208   ar->index = index;
209   ar->offset = mq->offset;
210   ar->region = mq->region;
211   ar->log2_ring_size = mq->log2_ring_size;
212   ar->flags = (dir == MEMIF_RING_S2M) ? MEMIF_MSG_ADD_RING_FLAG_S2M : 0;
213   ar->private_hdr_size = 0;
214
215   return MEMIF_ERR_SUCCESS;     /* 0 */
216 }
217
218 /* used as connection request from slave */
219 static int
220 memif_msg_enq_connect (memif_control_channel_t *cc)
221 {
222   memif_msg_queue_elt_t *e = memif_msg_enq (cc);
223
224   if (e == NULL)
225     return MEMIF_ERR_NOMEM;
226
227   memif_msg_connect_t *cm = &e->msg.connect;
228
229   e->msg.type = MEMIF_MSG_TYPE_CONNECT;
230   e->fd = -1;
231   strlcpy ((char *) cm->if_name, (char *) cc->conn->args.interface_name,
232            sizeof (cm->if_name));
233
234   return MEMIF_ERR_SUCCESS;     /* 0 */
235 }
236
237 /* used as confirmation of connection by master */
238 static int
239 memif_msg_enq_connected (memif_control_channel_t *cc)
240 {
241   memif_msg_queue_elt_t *e = memif_msg_enq (cc);
242
243   if (e == NULL)
244     return MEMIF_ERR_NOMEM;
245
246   memif_msg_connected_t *cm = &e->msg.connected;
247
248   e->msg.type = MEMIF_MSG_TYPE_CONNECTED;
249   e->fd = -1;
250   strlcpy ((char *) cm->if_name, (char *) cc->conn->args.interface_name,
251            sizeof (cm->if_name));
252
253   return MEMIF_ERR_SUCCESS;     /* 0 */
254 }
255
256 int
257 memif_msg_enq_disconnect (memif_control_channel_t *cc, uint8_t *err_string,
258                           uint32_t err_code)
259 {
260   memif_msg_queue_elt_t *e;
261
262   e = cc->sock->args.alloc (sizeof (*e));
263   if (e == NULL)
264     return MEMIF_ERR_NOMEM;
265
266   e->fd = -1;
267   /* Insert disconenct message at the top of the msg queue */
268   TAILQ_INSERT_HEAD (&cc->msg_queue, e, next);
269
270   memif_msg_disconnect_t *d = &e->msg.disconnect;
271
272   e->msg.type = MEMIF_MSG_TYPE_DISCONNECT;
273   d->code = err_code;
274   uint16_t l = sizeof (d->string);
275   if (l > 96)
276     {
277       DBG ("Disconnect string too long. Sending the first %d characters.",
278            sizeof (d->string) - 1);
279     }
280   strlcpy ((char *) d->string, (char *) err_string, sizeof (d->string));
281
282   return MEMIF_ERR_SUCCESS;
283 }
284
285 static int
286 memif_msg_parse_hello (memif_control_channel_t *cc, memif_msg_t *msg)
287 {
288   memif_msg_hello_t *h = &msg->hello;
289   memif_connection_t *c = cc->conn;
290
291   if (msg->hello.min_version > MEMIF_VERSION ||
292       msg->hello.max_version < MEMIF_VERSION)
293     {
294       DBG ("incompatible protocol version");
295       return MEMIF_ERR_PROTO;
296     }
297
298   c->run_args.num_s2m_rings = memif_min (h->max_s2m_ring + 1,
299                                          c->args.num_s2m_rings);
300   c->run_args.num_m2s_rings = memif_min (h->max_m2s_ring + 1,
301                                          c->args.num_m2s_rings);
302   c->run_args.log2_ring_size = memif_min (h->max_log2_ring_size,
303                                           c->args.log2_ring_size);
304   c->run_args.buffer_size = c->args.buffer_size;
305   strlcpy ((char *) c->remote_name, (char *) h->name, sizeof (c->remote_name));
306
307   return MEMIF_ERR_SUCCESS;     /* 0 */
308 }
309
310 /* handle interface identification (id, secret (optional)) */
311 static int
312 memif_msg_parse_init (memif_control_channel_t *cc, memif_msg_t *msg)
313 {
314   memif_msg_init_t *i = &msg->init;
315   memif_connection_t *c = NULL;
316   uint8_t err_string[96];
317   memset (err_string, 0, sizeof (char) * 96);
318   int err = MEMIF_ERR_SUCCESS;  /* 0 */
319
320   /* Check compatible meimf version */
321   if (i->version != MEMIF_VERSION)
322     {
323       DBG ("MEMIF_VER_ERR");
324       memif_msg_enq_disconnect (cc, MEMIF_VER_ERR, 0);
325       return MEMIF_ERR_PROTO;
326     }
327
328   /* Find endpoint on the socket */
329   TAILQ_FOREACH (c, &cc->sock->master_interfaces, next)
330   {
331     /* Match interface id */
332     if (c->args.interface_id != i->id)
333       continue;
334     /* If control channel is present, interface is connected (or connecting) */
335     if (c->control_channel != NULL)
336       {
337         memif_msg_enq_disconnect (cc, "Already connected", 0);
338         return MEMIF_ERR_ALRCONN;
339       }
340     /* Verify secret */
341     if (c->args.secret[0] != '\0')
342       {
343         if (strncmp ((char *) c->args.secret, (char *) i->secret, 24) != 0)
344           {
345             memif_msg_enq_disconnect (cc, "Incorrect secret", 0);
346             return MEMIF_ERR_SECRET;
347           }
348       }
349
350     /* Assign the control channel to this interface */
351     c->control_channel = cc;
352     cc->conn = c;
353
354     strlcpy ((char *) c->remote_name, (char *) i->name,
355              sizeof (c->remote_name));
356   }
357
358   return err;
359 }
360
361 /* receive region information and add new region to connection (if possible) */
362 static int
363 memif_msg_parse_add_region (memif_control_channel_t *cc, memif_msg_t *msg,
364                             int fd)
365 {
366   memif_msg_add_region_t *ar = &msg->add_region;
367   memif_region_t *mr;
368   memif_connection_t *c = cc->conn;
369
370   if (fd < 0)
371     return MEMIF_ERR_NO_SHMFD;
372
373   if (ar->index > MEMIF_MAX_REGION)
374     return MEMIF_ERR_MAXREG;
375
376   mr = (memif_region_t *) cc->sock->args.realloc (
377     c->regions, sizeof (memif_region_t) * (++c->regions_num));
378   if (mr == NULL)
379     return memif_syscall_error_handler (errno);
380   memset (mr + ar->index, 0, sizeof (memif_region_t));
381   c->regions = mr;
382   c->regions[ar->index].fd = fd;
383   c->regions[ar->index].region_size = ar->size;
384   c->regions[ar->index].addr = NULL;
385   /* region 0 is never external */
386   if (cc->sock->get_external_region_addr && (ar->index != 0))
387     c->regions[ar->index].is_external = 1;
388
389   return MEMIF_ERR_SUCCESS;     /* 0 */
390 }
391
392 /* receive ring information and add new ring to connection queue
393    (based on direction S2M | M2S) */
394 static int
395 memif_msg_parse_add_ring (memif_control_channel_t *cc, memif_msg_t *msg,
396                           int fd)
397 {
398   memif_msg_add_ring_t *ar = &msg->add_ring;
399   memif_connection_t *c = cc->conn;
400
401   memif_queue_t *mq;
402
403   if (fd < 0)
404     return MEMIF_ERR_NO_INTFD;
405
406   if (ar->private_hdr_size != 0)
407     return MEMIF_ERR_PRIVHDR;
408
409   if (ar->flags & MEMIF_MSG_ADD_RING_FLAG_S2M)
410     {
411       if (ar->index > MEMIF_MAX_S2M_RING)
412         return MEMIF_ERR_MAXRING;
413       if (ar->index >= c->args.num_s2m_rings)
414         return MEMIF_ERR_MAXRING;
415
416       mq = (memif_queue_t *) cc->sock->args.realloc (
417         c->rx_queues, sizeof (memif_queue_t) * (++c->rx_queues_num));
418       memset (mq + ar->index, 0, sizeof (memif_queue_t));
419       if (mq == NULL)
420         return memif_syscall_error_handler (errno);
421       c->rx_queues = mq;
422       c->rx_queues[ar->index].int_fd = fd;
423       c->rx_queues[ar->index].log2_ring_size = ar->log2_ring_size;
424       c->rx_queues[ar->index].region = ar->region;
425       c->rx_queues[ar->index].offset = ar->offset;
426       c->run_args.num_s2m_rings++;
427     }
428   else
429     {
430       if (ar->index > MEMIF_MAX_M2S_RING)
431         return MEMIF_ERR_MAXRING;
432       if (ar->index >= c->args.num_m2s_rings)
433         return MEMIF_ERR_MAXRING;
434
435       mq = (memif_queue_t *) cc->sock->args.realloc (
436         c->tx_queues, sizeof (memif_queue_t) * (++c->tx_queues_num));
437       memset (mq + ar->index, 0, sizeof (memif_queue_t));
438       if (mq == NULL)
439         return memif_syscall_error_handler (errno);
440       c->tx_queues = mq;
441       c->tx_queues[ar->index].int_fd = fd;
442       c->tx_queues[ar->index].log2_ring_size = ar->log2_ring_size;
443       c->tx_queues[ar->index].region = ar->region;
444       c->tx_queues[ar->index].offset = ar->offset;
445       c->run_args.num_m2s_rings++;
446     }
447
448   return MEMIF_ERR_SUCCESS;     /* 0 */
449 }
450
451 static int
452 memif_configure_rx_interrupt (memif_connection_t *c)
453 {
454   memif_socket_t *ms = (memif_socket_t *) c->args.socket;
455   memif_interrupt_t *idata;
456   memif_fd_event_t fde;
457   memif_fd_event_data_t *fdata;
458   void *ctx;
459   int i;
460
461   if (c->on_interrupt != NULL)
462     {
463       for (i = 0; i < c->run_args.num_m2s_rings; i++)
464         {
465           /* Allocate fd event data */
466           fdata = ms->args.alloc (sizeof (*fdata));
467           if (fdata == NULL)
468             {
469               memif_msg_enq_disconnect (c->control_channel, "Internal error",
470                                         0);
471               return MEMIF_ERR_NOMEM;
472             }
473           /* Allocate interrupt data */
474           idata = ms->args.alloc (sizeof (*fdata));
475           if (idata == NULL)
476             {
477               ms->args.free (fdata);
478               memif_msg_enq_disconnect (c->control_channel, "Internal error",
479                                         0);
480               return MEMIF_ERR_NOMEM;
481             }
482
483           /* configure interrupt data */
484           idata->c = c;
485           idata->qid = i;
486           /* configure fd event data */
487           fdata->event_handler = memif_interrupt_handler;
488           fdata->private_ctx = idata;
489           fde.fd = c->rx_queues[i].int_fd;
490           fde.type = MEMIF_FD_EVENT_READ;
491           fde.private_ctx = fdata;
492
493           /* Start listening for events */
494           ctx = ms->epfd != -1 ? ms : ms->private_ctx;
495           ms->args.on_control_fd_update (fde, ctx);
496         }
497     }
498
499   return MEMIF_ERR_SUCCESS;
500 }
501
502 /* slave -> master */
503 static int
504 memif_msg_parse_connect (memif_control_channel_t *cc, memif_msg_t *msg)
505 {
506   memif_msg_connect_t *cm = &msg->connect;
507   memif_connection_t *c = cc->conn;
508   int err;
509
510   err = memif_connect1 (c);
511   if (err != MEMIF_ERR_SUCCESS)
512     return err;
513
514   strlcpy ((char *) c->remote_if_name, (char *) cm->if_name,
515            sizeof (c->remote_if_name));
516
517   err = memif_configure_rx_interrupt (c);
518   if (err != MEMIF_ERR_SUCCESS)
519     return err;
520
521   c->on_connect ((void *) c, c->private_ctx);
522
523   return err;
524 }
525
526 /* master -> slave */
527 static int
528 memif_msg_parse_connected (memif_control_channel_t *cc, memif_msg_t *msg)
529 {
530   memif_msg_connect_t *cm = &msg->connect;
531   memif_connection_t *c = cc->conn;
532
533   int err;
534   err = memif_connect1 (c);
535   if (err != MEMIF_ERR_SUCCESS)
536     return err;
537
538   strlcpy ((char *) c->remote_if_name, (char *) cm->if_name,
539            sizeof (c->remote_if_name));
540
541   err = memif_configure_rx_interrupt (c);
542   if (err != MEMIF_ERR_SUCCESS)
543     return err;
544
545   c->on_connect ((void *) c, c->private_ctx);
546
547   return err;
548 }
549
550 static int
551 memif_msg_parse_disconnect (memif_control_channel_t *cc, memif_msg_t *msg)
552 {
553   memif_msg_disconnect_t *d = &msg->disconnect;
554   memif_connection_t *c = cc->conn;
555
556   memset (c->remote_disconnect_string, 0,
557           sizeof (c->remote_disconnect_string));
558   strlcpy ((char *) c->remote_disconnect_string, (char *) d->string,
559            sizeof (c->remote_disconnect_string));
560
561   /* on returning error, handle function will call memif_disconnect () */
562   DBG ("disconnect received: %s, mode: %d",
563        c->remote_disconnect_string, c->args.mode);
564   return MEMIF_ERR_DISCONNECT;
565 }
566
567 static int
568 memif_msg_receive_and_parse (memif_control_channel_t *cc)
569 {
570   char ctl[CMSG_SPACE (sizeof (int)) +
571            CMSG_SPACE (sizeof (struct ucred))] = { 0 };
572   struct msghdr mh = { 0 };
573   struct iovec iov[1];
574   memif_msg_t msg = { 0 };
575   ssize_t size;
576   int err = MEMIF_ERR_SUCCESS;  /* 0 */
577   int fd = -1;
578   int i;
579   memif_socket_t *ms = NULL;
580
581   iov[0].iov_base = (void *) &msg;
582   iov[0].iov_len = sizeof (memif_msg_t);
583   mh.msg_iov = iov;
584   mh.msg_iovlen = 1;
585   mh.msg_control = ctl;
586   mh.msg_controllen = sizeof (ctl);
587
588   DBG ("recvmsg fd %d", cc->fd);
589   size = recvmsg (cc->fd, &mh, 0);
590   if (size != sizeof (memif_msg_t))
591     {
592       if (size == 0)
593         return MEMIF_ERR_DISCONNECTED;
594       else
595         return MEMIF_ERR_MFMSG;
596     }
597
598   struct cmsghdr *cmsg;
599
600   cmsg = CMSG_FIRSTHDR (&mh);
601   while (cmsg)
602     {
603       if (cmsg->cmsg_level == SOL_SOCKET)
604         {
605           if (cmsg->cmsg_type == SCM_CREDENTIALS)
606             {
607               /* Do nothing */ ;
608             }
609           else if (cmsg->cmsg_type == SCM_RIGHTS)
610             {
611               int *fdp = (int *) CMSG_DATA (cmsg);
612               fd = *fdp;
613             }
614         }
615       cmsg = CMSG_NXTHDR (&mh, cmsg);
616     }
617
618   DBG ("Message type %u received", msg.type);
619
620   switch (msg.type)
621     {
622     case MEMIF_MSG_TYPE_ACK:
623       break;
624
625     case MEMIF_MSG_TYPE_HELLO:
626       if ((err = memif_msg_parse_hello (cc, &msg)) != MEMIF_ERR_SUCCESS)
627         return err;
628       if ((err = memif_init_regions_and_queues (cc->conn)) !=
629           MEMIF_ERR_SUCCESS)
630         return err;
631       if ((err = memif_msg_enq_init (cc)) != MEMIF_ERR_SUCCESS)
632         return err;
633       for (i = 0; i < cc->conn->regions_num; i++)
634         {
635           if ((err = memif_msg_enq_add_region (cc, i)) != MEMIF_ERR_SUCCESS)
636             return err;
637         }
638       for (i = 0; i < cc->conn->run_args.num_s2m_rings; i++)
639         {
640           if ((err = memif_msg_enq_add_ring (cc, i, MEMIF_RING_S2M)) !=
641               MEMIF_ERR_SUCCESS)
642             return err;
643         }
644       for (i = 0; i < cc->conn->run_args.num_m2s_rings; i++)
645         {
646           if ((err = memif_msg_enq_add_ring (cc, i, MEMIF_RING_M2S)) !=
647               MEMIF_ERR_SUCCESS)
648             return err;
649         }
650       if ((err = memif_msg_enq_connect (cc)) != MEMIF_ERR_SUCCESS)
651         return err;
652       break;
653
654     case MEMIF_MSG_TYPE_INIT:
655       if ((err = memif_msg_parse_init (cc, &msg)) != MEMIF_ERR_SUCCESS)
656         return err;
657       /* c->remote_pid = cr->pid */
658       /* c->remote_uid = cr->uid */
659       /* c->remote_gid = cr->gid */
660       if ((err = memif_msg_enq_ack (cc)) != MEMIF_ERR_SUCCESS)
661         return err;
662       break;
663
664     case MEMIF_MSG_TYPE_ADD_REGION:
665       if ((err = memif_msg_parse_add_region (cc, &msg, fd)) !=
666           MEMIF_ERR_SUCCESS)
667         return err;
668       if ((err = memif_msg_enq_ack (cc)) != MEMIF_ERR_SUCCESS)
669         return err;
670       break;
671
672     case MEMIF_MSG_TYPE_ADD_RING:
673       if ((err = memif_msg_parse_add_ring (cc, &msg, fd)) != MEMIF_ERR_SUCCESS)
674         return err;
675       if ((err = memif_msg_enq_ack (cc)) != MEMIF_ERR_SUCCESS)
676         return err;
677       break;
678
679     case MEMIF_MSG_TYPE_CONNECT:
680       if ((err = memif_msg_parse_connect (cc, &msg)) != MEMIF_ERR_SUCCESS)
681         return err;
682       if ((err = memif_msg_enq_connected (cc)) != MEMIF_ERR_SUCCESS)
683         return err;
684       break;
685
686     case MEMIF_MSG_TYPE_CONNECTED:
687       if ((err = memif_msg_parse_connected (cc, &msg)) != MEMIF_ERR_SUCCESS)
688         return err;
689       break;
690
691     case MEMIF_MSG_TYPE_DISCONNECT:
692       if ((err = memif_msg_parse_disconnect (cc, &msg)) != MEMIF_ERR_SUCCESS)
693         return err;
694       break;
695
696     default:
697       return MEMIF_ERR_UNKNOWN_MSG;;
698       break;
699     }
700
701   return MEMIF_ERR_SUCCESS;     /* 0 */
702 }
703
704 void
705 memif_delete_control_channel (memif_control_channel_t *cc)
706 {
707   memif_msg_queue_elt_t *e, *next;
708   memif_socket_t *ms = cc->sock;
709   memif_fd_event_t fde;
710   void *ctx;
711
712   fde.fd = cc->fd;
713   fde.type = MEMIF_FD_EVENT_DEL;
714   ctx = ms->epfd != -1 ? ms : ms->private_ctx;
715   cc->sock->args.on_control_fd_update (fde, ctx);
716
717   if (cc->fd > 0)
718     close (cc->fd);
719
720   /* Clear control message queue */
721   for (e = TAILQ_FIRST (&cc->msg_queue); e != NULL; e = next)
722     {
723       next = TAILQ_NEXT (e, next);
724       TAILQ_REMOVE (&cc->msg_queue, e, next);
725       cc->sock->args.free (e);
726     }
727
728   /* remove reference */
729   if (cc->conn != NULL)
730     cc->conn->control_channel = NULL;
731   cc->conn = NULL;
732   cc->sock->args.free (cc);
733
734   return;
735 }
736
737 int
738 memif_control_channel_handler (memif_fd_event_type_t type, void *private_ctx)
739 {
740   memif_control_channel_t *cc = (memif_control_channel_t *) private_ctx;
741   int err;
742
743   /* Receive the message, parse the message and
744    * enqueue next message(s).
745    */
746   err = memif_msg_receive_and_parse (cc);
747   /* Can't assign to endpoint */
748   if (cc->conn == NULL)
749     {
750       /* A disconnect message is already in the queue */
751       memif_msg_send_from_queue (cc);
752       memif_delete_control_channel (cc);
753
754       return MEMIF_ERR_SUCCESS;
755     }
756   /* error in memif_msg_receive */
757   if (err != MEMIF_ERR_SUCCESS)
758     goto disconnect;
759
760   /* Continue connecting, send next message from the queue */
761   err = memif_msg_send_from_queue (cc);
762   if (err != MEMIF_ERR_SUCCESS)
763     goto disconnect;
764
765   return MEMIF_ERR_SUCCESS;
766
767 disconnect:
768   memif_disconnect_internal (cc->conn);
769   return MEMIF_ERR_SUCCESS;
770 }
771
772 int
773 memif_listener_handler (memif_fd_event_type_t type, void *private_ctx)
774 {
775   memif_socket_t *ms = (memif_socket_t *) private_ctx;
776   memif_control_channel_t *cc;
777   memif_fd_event_t fde;
778   memif_fd_event_data_t *fdata;
779   struct sockaddr_un un;
780   int err, sockfd, addr_len = sizeof (un);
781   void *ctx;
782
783   if (ms == NULL)
784     return MEMIF_ERR_INVAL_ARG;
785
786   if (type & MEMIF_FD_EVENT_READ)
787     {
788       /* Accept connection to the listener socket */
789       sockfd = accept (ms->listener_fd, (struct sockaddr *) &un,
790                        (socklen_t *) &addr_len);
791       if (sockfd < 0)
792         {
793           return memif_syscall_error_handler (errno);
794         }
795
796       /* Create new control channel */
797       cc = ms->args.alloc (sizeof (*cc));
798       if (cc == NULL)
799         {
800           err = MEMIF_ERR_NOMEM;
801           goto error;
802         }
803
804       cc->fd = sockfd;
805       /* The connection will be assigned after parsing MEMIF_MSG_TYPE_INIT msg
806        */
807       cc->conn = NULL;
808       cc->sock = ms;
809       TAILQ_INIT (&cc->msg_queue);
810
811       /* Create memif fd event */
812       fdata = ms->args.alloc (sizeof (*fdata));
813       if (fdata == NULL)
814         {
815           err = MEMIF_ERR_NOMEM;
816           goto error;
817         }
818
819       fdata->event_handler = memif_control_channel_handler;
820       fdata->private_ctx = cc;
821
822       fde.fd = sockfd;
823       fde.type = MEMIF_FD_EVENT_READ;
824       fde.private_ctx = fdata;
825
826       /* Start listenning for events on the new control channel */
827       ctx = ms->epfd != -1 ? ms : ms->private_ctx;
828       ms->args.on_control_fd_update (fde, ctx);
829
830       /* enqueue HELLO msg */
831       err = memif_msg_enq_hello (cc);
832       if (err != MEMIF_ERR_SUCCESS)
833         goto error;
834
835       /* send HELLO msg */
836       err = memif_msg_send_from_queue (cc);
837       if (err != MEMIF_ERR_SUCCESS)
838         goto error;
839     }
840
841   return MEMIF_ERR_SUCCESS;
842
843 error:
844   if (sockfd > 0)
845     close (sockfd);
846   if (cc != NULL)
847     ms->args.free (cc);
848   if (fdata != NULL)
849     ms->args.free (fdata);
850
851   return err;
852 }