api: retry sending fd on EAGAIN
[vpp.git] / extras / gomemif / memif / control_channel.go
1 /*
2  *------------------------------------------------------------------
3  * Copyright (c) 2020 Cisco and/or its affiliates.
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *------------------------------------------------------------------
16  */
17
18 package memif
19
20 import (
21         "bytes"
22         "container/list"
23         "encoding/binary"
24         "fmt"
25         "os"
26         "sync"
27         "syscall"
28 )
29
30 const maxEpollEvents = 1
31 const maxControlLen = 256
32
33 const errorFdNotFound = "fd not found"
34
35 // controlMsg represents a message used in communication between memif peers
36 type controlMsg struct {
37         Buffer *bytes.Buffer
38         Fd     int
39 }
40
41 // listener represents a listener functionality of UNIX domain socket
42 type listener struct {
43         socket *Socket
44         event  syscall.EpollEvent
45 }
46
47 // controlChannel represents a communication channel between memif peers
48 // backed by UNIX domain socket
49 type controlChannel struct {
50         listRef     *list.Element
51         socket      *Socket
52         i           *Interface
53         event       syscall.EpollEvent
54         data        [msgSize]byte
55         control     [maxControlLen]byte
56         controlLen  int
57         msgQueue    []controlMsg
58         isConnected bool
59 }
60
61 // Socket represents a UNIX domain socket used for communication
62 // between memif peers
63 type Socket struct {
64         appName       string
65         filename      string
66         listener      *listener
67         interfaceList *list.List
68         ccList        *list.List
69         epfd          int
70         wakeEvent     syscall.EpollEvent
71         stopPollChan  chan struct{}
72         wg            sync.WaitGroup
73 }
74
75 // StopPolling stops polling events on the socket
76 func (socket *Socket) StopPolling() error {
77         if socket.stopPollChan != nil {
78                 // stop polling msg
79                 close(socket.stopPollChan)
80                 // wake epoll
81                 buf := make([]byte, 8)
82                 binary.PutUvarint(buf, 1)
83                 n, err := syscall.Write(int(socket.wakeEvent.Fd), buf[:])
84                 if err != nil {
85                         return err
86                 }
87                 if n != 8 {
88                         return fmt.Errorf("Faild to write to eventfd")
89                 }
90                 // wait until polling is stopped
91                 socket.wg.Wait()
92         }
93
94         return nil
95 }
96
97 // StartPolling starts polling and handling events on the socket,
98 // enabling communication between memif peers
99 func (socket *Socket) StartPolling(errChan chan<- error) {
100         socket.stopPollChan = make(chan struct{})
101         socket.wg.Add(1)
102         go func() {
103                 var events [maxEpollEvents]syscall.EpollEvent
104                 defer socket.wg.Done()
105
106                 for {
107                         select {
108                         case <-socket.stopPollChan:
109                                 return
110                         default:
111                                 num, err := syscall.EpollWait(socket.epfd, events[:], -1)
112                                 if err != nil {
113                                         errChan <- fmt.Errorf("EpollWait: ", err)
114                                         return
115                                 }
116
117                                 for ev := 0; ev < num; ev++ {
118                                         if events[0].Fd == socket.wakeEvent.Fd {
119                                                 continue
120                                         }
121                                         err = socket.handleEvent(&events[0])
122                                         if err != nil {
123                                                 errChan <- fmt.Errorf("handleEvent: ", err)
124                                         }
125                                 }
126                         }
127                 }
128         }()
129 }
130
131 // addEvent adds event to epoll instance associated with the socket
132 func (socket *Socket) addEvent(event *syscall.EpollEvent) error {
133         err := syscall.EpollCtl(socket.epfd, syscall.EPOLL_CTL_ADD, int(event.Fd), event)
134         if err != nil {
135                 return fmt.Errorf("EpollCtl: %s", err)
136         }
137         return nil
138 }
139
140 // addEvent deletes event to epoll instance associated with the socket
141 func (socket *Socket) delEvent(event *syscall.EpollEvent) error {
142         err := syscall.EpollCtl(socket.epfd, syscall.EPOLL_CTL_DEL, int(event.Fd), event)
143         if err != nil {
144                 return fmt.Errorf("EpollCtl: %s", err)
145         }
146         return nil
147 }
148
149 // Delete deletes the socket
150 func (socket *Socket) Delete() (err error) {
151         for elt := socket.ccList.Front(); elt != nil; elt = elt.Next() {
152                 cc, ok := elt.Value.(*controlChannel)
153                 if ok {
154                         err = cc.close(true, "Socket deleted")
155                         if err != nil {
156                                 return nil
157                         }
158                 }
159         }
160         for elt := socket.interfaceList.Front(); elt != nil; elt = elt.Next() {
161                 i, ok := elt.Value.(*Interface)
162                 if ok {
163                         err = i.Delete()
164                         if err != nil {
165                                 return err
166                         }
167                 }
168         }
169
170         if socket.listener != nil {
171                 err = socket.listener.close()
172                 if err != nil {
173                         return err
174                 }
175                 err = os.Remove(socket.filename)
176                 if err != nil {
177                         return nil
178                 }
179         }
180
181         err = socket.delEvent(&socket.wakeEvent)
182         if err != nil {
183                 return fmt.Errorf("Failed to delete event: ", err)
184         }
185
186         syscall.Close(socket.epfd)
187
188         return nil
189 }
190
191 // NewSocket returns a new Socket
192 func NewSocket(appName string, filename string) (socket *Socket, err error) {
193         socket = &Socket{
194                 appName:       appName,
195                 filename:      filename,
196                 interfaceList: list.New(),
197                 ccList:        list.New(),
198         }
199         if socket.filename == "" {
200                 socket.filename = DefaultSocketFilename
201         }
202
203         socket.epfd, _ = syscall.EpollCreate1(0)
204
205         efd, err := eventFd()
206         socket.wakeEvent = syscall.EpollEvent{
207                 Events: syscall.EPOLLIN | syscall.EPOLLERR | syscall.EPOLLHUP,
208                 Fd:     int32(efd),
209         }
210         err = socket.addEvent(&socket.wakeEvent)
211         if err != nil {
212                 return nil, fmt.Errorf("Failed to add event: ", err)
213         }
214
215         return socket, nil
216 }
217
218 // handleEvent handles epoll event
219 func (socket *Socket) handleEvent(event *syscall.EpollEvent) error {
220         if socket.listener != nil && socket.listener.event.Fd == event.Fd {
221                 return socket.listener.handleEvent(event)
222         }
223
224         for elt := socket.ccList.Front(); elt != nil; elt = elt.Next() {
225                 cc, ok := elt.Value.(*controlChannel)
226                 if ok {
227                         if cc.event.Fd == event.Fd {
228                                 return cc.handleEvent(event)
229                         }
230                 }
231         }
232
233         return fmt.Errorf(errorFdNotFound)
234 }
235
236 // handleEvent handles epoll event for listener
237 func (l *listener) handleEvent(event *syscall.EpollEvent) error {
238         // hang up
239         if (event.Events & syscall.EPOLLHUP) == syscall.EPOLLHUP {
240                 err := l.close()
241                 if err != nil {
242                         return fmt.Errorf("Failed to close listener after hang up event: ", err)
243                 }
244                 return fmt.Errorf("Hang up: ", l.socket.filename)
245         }
246
247         // error
248         if (event.Events & syscall.EPOLLERR) == syscall.EPOLLERR {
249                 err := l.close()
250                 if err != nil {
251                         return fmt.Errorf("Failed to close listener after receiving an error event: ", err)
252                 }
253                 return fmt.Errorf("Received error event on listener ", l.socket.filename)
254         }
255
256         // read message
257         if (event.Events & syscall.EPOLLIN) == syscall.EPOLLIN {
258                 newFd, _, err := syscall.Accept(int(l.event.Fd))
259                 if err != nil {
260                         return fmt.Errorf("Accept: %s", err)
261                 }
262
263                 cc, err := l.socket.addControlChannel(newFd, nil)
264                 if err != nil {
265                         return fmt.Errorf("Failed to add control channel: %s", err)
266                 }
267
268                 err = cc.msgEnqHello()
269                 if err != nil {
270                         return fmt.Errorf("msgEnqHello: %s", err)
271                 }
272
273                 err = cc.sendMsg()
274                 if err != nil {
275                         return err
276                 }
277
278                 return nil
279         }
280
281         return fmt.Errorf("Unexpected event: ", event.Events)
282 }
283
284 // handleEvent handles epoll event for control channel
285 func (cc *controlChannel) handleEvent(event *syscall.EpollEvent) error {
286         var size int
287         var err error
288
289         // hang up
290         if (event.Events & syscall.EPOLLHUP) == syscall.EPOLLHUP {
291                 // close cc, don't send msg
292                 err := cc.close(false, "")
293                 if err != nil {
294                         return fmt.Errorf("Failed to close control channel after hang up event: ", err)
295                 }
296                 return fmt.Errorf("Hang up: ", cc.i.GetName())
297         }
298
299         if (event.Events & syscall.EPOLLERR) == syscall.EPOLLERR {
300                 // close cc, don't send msg
301                 err := cc.close(false, "")
302                 if err != nil {
303                         return fmt.Errorf("Failed to close control channel after receiving an error event: ", err)
304                 }
305                 return fmt.Errorf("Received error event on control channel ", cc.i.GetName())
306         }
307
308         if (event.Events & syscall.EPOLLIN) == syscall.EPOLLIN {
309                 size, cc.controlLen, _, _, err = syscall.Recvmsg(int(cc.event.Fd), cc.data[:], cc.control[:], 0)
310                 if err != nil {
311                         return fmt.Errorf("recvmsg: %s", err)
312                 }
313                 if size != msgSize {
314                         return fmt.Errorf("invalid message size %d", size)
315                 }
316
317                 err = cc.parseMsg()
318                 if err != nil {
319                         return err
320                 }
321
322                 err = cc.sendMsg()
323                 if err != nil {
324                         return err
325                 }
326
327                 return nil
328         }
329
330         return fmt.Errorf("Unexpected event: ", event.Events)
331 }
332
333 // close closes the listener
334 func (l *listener) close() error {
335         err := l.socket.delEvent(&l.event)
336         if err != nil {
337                 return fmt.Errorf("Failed to del event: ", err)
338         }
339         err = syscall.Close(int(l.event.Fd))
340         if err != nil {
341                 return fmt.Errorf("Failed to close socket: ", err)
342         }
343         return nil
344 }
345
346 // AddListener adds a lisntener to the socket. The fd must describe a
347 // UNIX domain socket already bound to a UNIX domain filename and
348 // marked as listener
349 func (socket *Socket) AddListener(fd int) (err error) {
350         l := &listener{
351                 // we will need this to look up master interface by id
352                 socket: socket,
353         }
354
355         l.event = syscall.EpollEvent{
356                 Events: syscall.EPOLLIN | syscall.EPOLLERR | syscall.EPOLLHUP,
357                 Fd:     int32(fd),
358         }
359         err = socket.addEvent(&l.event)
360         if err != nil {
361                 return fmt.Errorf("Failed to add event: ", err)
362         }
363
364         socket.listener = l
365
366         return nil
367 }
368
369 // addListener creates new UNIX domain socket, binds it to the address
370 // and marks it as listener
371 func (socket *Socket) addListener() (err error) {
372         // create socket
373         fd, err := syscall.Socket(syscall.AF_UNIX, syscall.SOCK_SEQPACKET, 0)
374         if err != nil {
375                 return fmt.Errorf("Failed to create UNIX domain socket")
376         }
377         usa := &syscall.SockaddrUnix{Name: socket.filename}
378
379         // Bind to address and start listening
380         err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_PASSCRED, 1)
381         if err != nil {
382                 return fmt.Errorf("Failed to set socket option %s : %v", socket.filename, err)
383         }
384         err = syscall.Bind(fd, usa)
385         if err != nil {
386                 return fmt.Errorf("Failed to bind socket %s : %v", socket.filename, err)
387         }
388         err = syscall.Listen(fd, syscall.SOMAXCONN)
389         if err != nil {
390                 return fmt.Errorf("Failed to listen on socket %s : %v", socket.filename, err)
391         }
392
393         return socket.AddListener(fd)
394 }
395
396 // close closes a control channel, if the control channel is assigned an
397 // interface, the interface is disconnected
398 func (cc *controlChannel) close(sendMsg bool, str string) (err error) {
399         if sendMsg == true {
400                 // first clear message queue so that the disconnect
401                 // message is the only message in queue
402                 cc.msgQueue = []controlMsg{}
403                 cc.msgEnqDisconnect(str)
404
405                 err = cc.sendMsg()
406                 if err != nil {
407                         return err
408                 }
409         }
410
411         err = cc.socket.delEvent(&cc.event)
412         if err != nil {
413                 return fmt.Errorf("Failed to del event: ", err)
414         }
415
416         // remove referance form socket
417         cc.socket.ccList.Remove(cc.listRef)
418
419         if cc.i != nil {
420                 err = cc.i.disconnect()
421                 if err != nil {
422                         return fmt.Errorf("Interface Disconnect: ", err)
423                 }
424         }
425
426         return nil
427 }
428
429 //addControlChannel returns a new controlChannel and adds it to the socket
430 func (socket *Socket) addControlChannel(fd int, i *Interface) (*controlChannel, error) {
431         cc := &controlChannel{
432                 socket:      socket,
433                 i:           i,
434                 isConnected: false,
435         }
436
437         var err error
438
439         cc.event = syscall.EpollEvent{
440                 Events: syscall.EPOLLIN | syscall.EPOLLERR | syscall.EPOLLHUP,
441                 Fd:     int32(fd),
442         }
443         err = socket.addEvent(&cc.event)
444         if err != nil {
445                 return nil, fmt.Errorf("Failed to add event: ", err)
446         }
447
448         cc.listRef = socket.ccList.PushBack(cc)
449
450         return cc, nil
451 }
452
453 func (cc *controlChannel) msgEnqAck() (err error) {
454         buf := new(bytes.Buffer)
455         err = binary.Write(buf, binary.LittleEndian, msgTypeAck)
456
457         msg := controlMsg{
458                 Buffer: buf,
459                 Fd:     -1,
460         }
461
462         cc.msgQueue = append(cc.msgQueue, msg)
463
464         return nil
465 }
466
467 func (cc *controlChannel) msgEnqHello() (err error) {
468         hello := MsgHello{
469                 VersionMin:      Version,
470                 VersionMax:      Version,
471                 MaxRegion:       255,
472                 MaxRingM2S:      255,
473                 MaxRingS2M:      255,
474                 MaxLog2RingSize: 14,
475         }
476
477         copy(hello.Name[:], []byte(cc.socket.appName))
478
479         buf := new(bytes.Buffer)
480         err = binary.Write(buf, binary.LittleEndian, msgTypeHello)
481         err = binary.Write(buf, binary.LittleEndian, hello)
482
483         msg := controlMsg{
484                 Buffer: buf,
485                 Fd:     -1,
486         }
487
488         cc.msgQueue = append(cc.msgQueue, msg)
489
490         return nil
491 }
492
493 func (cc *controlChannel) parseHello() (err error) {
494         var hello MsgHello
495
496         buf := bytes.NewReader(cc.data[msgTypeSize:])
497         err = binary.Read(buf, binary.LittleEndian, &hello)
498         if err != nil {
499                 return
500         }
501
502         if hello.VersionMin > Version || hello.VersionMax < Version {
503                 return fmt.Errorf("Incompatible memif version")
504         }
505
506         cc.i.run = cc.i.args.MemoryConfig
507
508         cc.i.run.NumQueuePairs = min16(cc.i.args.MemoryConfig.NumQueuePairs, hello.MaxRingS2M)
509         cc.i.run.NumQueuePairs = min16(cc.i.args.MemoryConfig.NumQueuePairs, hello.MaxRingM2S)
510         cc.i.run.Log2RingSize = min8(cc.i.args.MemoryConfig.Log2RingSize, hello.MaxLog2RingSize)
511
512         cc.i.remoteName = string(hello.Name[:])
513
514         return nil
515 }
516
517 func (cc *controlChannel) msgEnqInit() (err error) {
518         init := MsgInit{
519                 Version: Version,
520                 Id:      cc.i.args.Id,
521                 Mode:    interfaceModeEthernet,
522         }
523
524         copy(init.Name[:], []byte(cc.socket.appName))
525
526         buf := new(bytes.Buffer)
527         err = binary.Write(buf, binary.LittleEndian, msgTypeInit)
528         err = binary.Write(buf, binary.LittleEndian, init)
529
530         msg := controlMsg{
531                 Buffer: buf,
532                 Fd:     -1,
533         }
534
535         cc.msgQueue = append(cc.msgQueue, msg)
536
537         return nil
538 }
539
540 func (cc *controlChannel) parseInit() (err error) {
541         var init MsgInit
542
543         buf := bytes.NewReader(cc.data[msgTypeSize:])
544         err = binary.Read(buf, binary.LittleEndian, &init)
545         if err != nil {
546                 return
547         }
548
549         if init.Version != Version {
550                 return fmt.Errorf("Incompatible memif driver version")
551         }
552
553         // find peer interface
554         for elt := cc.socket.interfaceList.Front(); elt != nil; elt = elt.Next() {
555                 i, ok := elt.Value.(*Interface)
556                 if ok {
557                         if i.args.Id == init.Id && i.args.IsMaster && i.cc == nil {
558                                 // verify secret
559                                 if i.args.Secret != init.Secret {
560                                         return fmt.Errorf("Invalid secret")
561                                 }
562                                 // interface is assigned to control channel
563                                 i.cc = cc
564                                 cc.i = i
565                                 cc.i.run = cc.i.args.MemoryConfig
566                                 cc.i.remoteName = string(init.Name[:])
567
568                                 return nil
569                         }
570                 }
571         }
572
573         return fmt.Errorf("Invalid interface id")
574 }
575
576 func (cc *controlChannel) msgEnqAddRegion(regionIndex uint16) (err error) {
577         if len(cc.i.regions) <= int(regionIndex) {
578                 return fmt.Errorf("Invalid region index")
579         }
580
581         addRegion := MsgAddRegion{
582                 Index: regionIndex,
583                 Size:  cc.i.regions[regionIndex].size,
584         }
585
586         buf := new(bytes.Buffer)
587         err = binary.Write(buf, binary.LittleEndian, msgTypeAddRegion)
588         err = binary.Write(buf, binary.LittleEndian, addRegion)
589
590         msg := controlMsg{
591                 Buffer: buf,
592                 Fd:     cc.i.regions[regionIndex].fd,
593         }
594
595         cc.msgQueue = append(cc.msgQueue, msg)
596
597         return nil
598 }
599
600 func (cc *controlChannel) parseAddRegion() (err error) {
601         var addRegion MsgAddRegion
602
603         buf := bytes.NewReader(cc.data[msgTypeSize:])
604         err = binary.Read(buf, binary.LittleEndian, &addRegion)
605         if err != nil {
606                 return
607         }
608
609         fd, err := cc.parseControlMsg()
610         if err != nil {
611                 return fmt.Errorf("parseControlMsg: %s", err)
612         }
613
614         if addRegion.Index > 255 {
615                 return fmt.Errorf("Invalid memory region index")
616         }
617
618         region := memoryRegion{
619                 size: addRegion.Size,
620                 fd:   fd,
621         }
622
623         cc.i.regions = append(cc.i.regions, region)
624
625         return nil
626 }
627
628 func (cc *controlChannel) msgEnqAddRing(ringType ringType, ringIndex uint16) (err error) {
629         var q Queue
630         var flags uint16 = 0
631
632         if ringType == ringTypeS2M {
633                 q = cc.i.txQueues[ringIndex]
634                 flags = msgAddRingFlagS2M
635         } else {
636                 q = cc.i.rxQueues[ringIndex]
637         }
638
639         addRing := MsgAddRing{
640                 Index:          ringIndex,
641                 Offset:         uint32(q.ring.offset),
642                 Region:         uint16(q.ring.region),
643                 RingSizeLog2:   uint8(q.ring.log2Size),
644                 Flags:          flags,
645                 PrivateHdrSize: 0,
646         }
647
648         buf := new(bytes.Buffer)
649         err = binary.Write(buf, binary.LittleEndian, msgTypeAddRing)
650         err = binary.Write(buf, binary.LittleEndian, addRing)
651
652         msg := controlMsg{
653                 Buffer: buf,
654                 Fd:     q.interruptFd,
655         }
656
657         cc.msgQueue = append(cc.msgQueue, msg)
658
659         return nil
660 }
661
662 func (cc *controlChannel) parseAddRing() (err error) {
663         var addRing MsgAddRing
664
665         buf := bytes.NewReader(cc.data[msgTypeSize:])
666         err = binary.Read(buf, binary.LittleEndian, &addRing)
667         if err != nil {
668                 return
669         }
670
671         fd, err := cc.parseControlMsg()
672         if err != nil {
673                 return err
674         }
675
676         if addRing.Index >= cc.i.run.NumQueuePairs {
677                 return fmt.Errorf("invalid ring index")
678         }
679
680         q := Queue{
681                 i:           cc.i,
682                 interruptFd: fd,
683         }
684
685         if (addRing.Flags & msgAddRingFlagS2M) == msgAddRingFlagS2M {
686                 q.ring = newRing(int(addRing.Region), ringTypeS2M, int(addRing.Offset), int(addRing.RingSizeLog2))
687                 cc.i.rxQueues = append(cc.i.rxQueues, q)
688         } else {
689                 q.ring = newRing(int(addRing.Region), ringTypeM2S, int(addRing.Offset), int(addRing.RingSizeLog2))
690                 cc.i.txQueues = append(cc.i.txQueues, q)
691         }
692
693         return nil
694 }
695
696 func (cc *controlChannel) msgEnqConnect() (err error) {
697         var connect MsgConnect
698         copy(connect.Name[:], []byte(cc.i.args.Name))
699
700         buf := new(bytes.Buffer)
701         err = binary.Write(buf, binary.LittleEndian, msgTypeConnect)
702         err = binary.Write(buf, binary.LittleEndian, connect)
703
704         msg := controlMsg{
705                 Buffer: buf,
706                 Fd:     -1,
707         }
708
709         cc.msgQueue = append(cc.msgQueue, msg)
710
711         return nil
712 }
713
714 func (cc *controlChannel) parseConnect() (err error) {
715         var connect MsgConnect
716
717         buf := bytes.NewReader(cc.data[msgTypeSize:])
718         err = binary.Read(buf, binary.LittleEndian, &connect)
719         if err != nil {
720                 return
721         }
722
723         cc.i.peerName = string(connect.Name[:])
724
725         err = cc.i.connect()
726         if err != nil {
727                 return err
728         }
729
730         cc.isConnected = true
731
732         return nil
733 }
734
735 func (cc *controlChannel) msgEnqConnected() (err error) {
736         var connected MsgConnected
737         copy(connected.Name[:], []byte(cc.i.args.Name))
738
739         buf := new(bytes.Buffer)
740         err = binary.Write(buf, binary.LittleEndian, msgTypeConnected)
741         err = binary.Write(buf, binary.LittleEndian, connected)
742
743         msg := controlMsg{
744                 Buffer: buf,
745                 Fd:     -1,
746         }
747
748         cc.msgQueue = append(cc.msgQueue, msg)
749
750         return nil
751 }
752
753 func (cc *controlChannel) parseConnected() (err error) {
754         var conn MsgConnected
755
756         buf := bytes.NewReader(cc.data[msgTypeSize:])
757         err = binary.Read(buf, binary.LittleEndian, &conn)
758         if err != nil {
759                 return
760         }
761
762         cc.i.peerName = string(conn.Name[:])
763
764         err = cc.i.connect()
765         if err != nil {
766                 return err
767         }
768
769         cc.isConnected = true
770
771         return nil
772 }
773
774 func (cc *controlChannel) msgEnqDisconnect(str string) (err error) {
775         dc := MsgDisconnect{
776                 // not implemented
777                 Code: 0,
778         }
779         copy(dc.String[:], str)
780
781         buf := new(bytes.Buffer)
782         err = binary.Write(buf, binary.LittleEndian, msgTypeDisconnect)
783         err = binary.Write(buf, binary.LittleEndian, dc)
784
785         msg := controlMsg{
786                 Buffer: buf,
787                 Fd:     -1,
788         }
789
790         cc.msgQueue = append(cc.msgQueue, msg)
791
792         return nil
793 }
794
795 func (cc *controlChannel) parseDisconnect() (err error) {
796         var dc MsgDisconnect
797
798         buf := bytes.NewReader(cc.data[msgTypeSize:])
799         err = binary.Read(buf, binary.LittleEndian, &dc)
800         if err != nil {
801                 return
802         }
803
804         err = cc.close(false, string(dc.String[:]))
805         if err != nil {
806                 return fmt.Errorf("Failed to disconnect control channel: ", err)
807         }
808
809         return nil
810 }
811
812 func (cc *controlChannel) parseMsg() error {
813         var msgType msgType
814         var err error
815
816         buf := bytes.NewReader(cc.data[:])
817         err = binary.Read(buf, binary.LittleEndian, &msgType)
818
819         if msgType == msgTypeAck {
820                 return nil
821         } else if msgType == msgTypeHello {
822                 // Configure
823                 err = cc.parseHello()
824                 if err != nil {
825                         goto error
826                 }
827                 // Initialize slave memif
828                 err = cc.i.initializeRegions()
829                 if err != nil {
830                         goto error
831                 }
832                 err = cc.i.initializeQueues()
833                 if err != nil {
834                         goto error
835                 }
836                 // Enqueue messages
837                 err = cc.msgEnqInit()
838                 if err != nil {
839                         goto error
840                 }
841                 for i := 0; i < len(cc.i.regions); i++ {
842                         err = cc.msgEnqAddRegion(uint16(i))
843                         if err != nil {
844                                 goto error
845                         }
846                 }
847                 for i := 0; uint16(i) < cc.i.run.NumQueuePairs; i++ {
848                         err = cc.msgEnqAddRing(ringTypeS2M, uint16(i))
849                         if err != nil {
850                                 goto error
851                         }
852                 }
853                 for i := 0; uint16(i) < cc.i.run.NumQueuePairs; i++ {
854                         err = cc.msgEnqAddRing(ringTypeM2S, uint16(i))
855                         if err != nil {
856                                 goto error
857                         }
858                 }
859                 err = cc.msgEnqConnect()
860                 if err != nil {
861                         goto error
862                 }
863         } else if msgType == msgTypeInit {
864                 err = cc.parseInit()
865                 if err != nil {
866                         goto error
867                 }
868
869                 err = cc.msgEnqAck()
870                 if err != nil {
871                         goto error
872                 }
873         } else if msgType == msgTypeAddRegion {
874                 err = cc.parseAddRegion()
875                 if err != nil {
876                         goto error
877                 }
878
879                 err = cc.msgEnqAck()
880                 if err != nil {
881                         goto error
882                 }
883         } else if msgType == msgTypeAddRing {
884                 err = cc.parseAddRing()
885                 if err != nil {
886                         goto error
887                 }
888
889                 err = cc.msgEnqAck()
890                 if err != nil {
891                         goto error
892                 }
893         } else if msgType == msgTypeConnect {
894                 err = cc.parseConnect()
895                 if err != nil {
896                         goto error
897                 }
898
899                 err = cc.msgEnqConnected()
900                 if err != nil {
901                         goto error
902                 }
903         } else if msgType == msgTypeConnected {
904                 err = cc.parseConnected()
905                 if err != nil {
906                         goto error
907                 }
908         } else if msgType == msgTypeDisconnect {
909                 err = cc.parseDisconnect()
910                 if err != nil {
911                         goto error
912                 }
913         } else {
914                 err = fmt.Errorf("unknown message %d", msgType)
915                 goto error
916         }
917
918         return nil
919
920 error:
921         err1 := cc.close(true, err.Error())
922         if err1 != nil {
923                 return fmt.Errorf(err.Error(), ": Failed to close control channel: ", err1)
924         }
925
926         return err
927 }
928
929 // parseControlMsg parses control message and returns file descriptor
930 // if any
931 func (cc *controlChannel) parseControlMsg() (fd int, err error) {
932         // Assert only called when we require FD
933         fd = -1
934
935         controlMsgs, err := syscall.ParseSocketControlMessage(cc.control[:cc.controlLen])
936         if err != nil {
937                 return -1, fmt.Errorf("syscall.ParseSocketControlMessage: %s", err)
938         }
939
940         if len(controlMsgs) == 0 {
941                 return -1, fmt.Errorf("Missing control message")
942         }
943
944         for _, cmsg := range controlMsgs {
945                 if cmsg.Header.Level == syscall.SOL_SOCKET {
946                         if cmsg.Header.Type == syscall.SCM_RIGHTS {
947                                 FDs, err := syscall.ParseUnixRights(&cmsg)
948                                 if err != nil {
949                                         return -1, fmt.Errorf("syscall.ParseUnixRights: %s", err)
950                                 }
951                                 if len(FDs) == 0 {
952                                         continue
953                                 }
954                                 // Only expect single FD
955                                 fd = FDs[0]
956                         }
957                 }
958         }
959
960         if fd == -1 {
961                 return -1, fmt.Errorf("Missing file descriptor")
962         }
963
964         return fd, nil
965 }