Fix socketclient for VPP 19.08
[govpp.git] / adapter / socketclient / socketclient.go
index 2e6d0af..96f23e6 100644 (file)
@@ -47,7 +47,7 @@ var (
        DefaultDisconnectTimeout = time.Millisecond * 100
        // MaxWaitReady defines maximum duration before waiting for socket file
        // times out
-       MaxWaitReady = time.Second * 15
+       MaxWaitReady = time.Second * 10
        // ClientName is used for identifying client in socket registration
        ClientName = "govppsock"
 )
@@ -73,9 +73,10 @@ func init() {
 
 type vppClient struct {
        sockAddr string
-       conn     *net.UnixConn
-       reader   *bufio.Reader
-       writer   *bufio.Writer
+
+       conn   *net.UnixConn
+       reader *bufio.Reader
+       writer *bufio.Writer
 
        connectTimeout    time.Duration
        disconnectTimeout time.Duration
@@ -116,42 +117,43 @@ func (c *vppClient) SetDisconnectTimeout(t time.Duration) {
 
 // WaitReady checks socket file existence and waits for it if necessary
 func (c *vppClient) WaitReady() error {
-       // check if file at the path already exists
+       // check if socket already exists
        if _, err := os.Stat(c.sockAddr); err == nil {
-               return nil
-       } else if os.IsExist(err) {
-               return err
+               return nil // socket exists, we are ready
+       } else if !os.IsNotExist(err) {
+               return err // some other error occurred
        }
 
-       // if not, watch for it
+       // socket does not exist, watch for it
        watcher, err := fsnotify.NewWatcher()
        if err != nil {
                return err
        }
        defer func() {
                if err := watcher.Close(); err != nil {
-                       Log.Errorf("failed to close file watcher: %v", err)
+                       Log.Warnf("failed to close file watcher: %v", err)
                }
        }()
 
-       // start watching directory
+       // start directory watcher
        if err := watcher.Add(filepath.Dir(c.sockAddr)); err != nil {
                return err
        }
 
+       timeout := time.NewTimer(MaxWaitReady)
        for {
                select {
-               case <-time.After(MaxWaitReady):
-                       return fmt.Errorf("waiting for socket file timed out (%s)", MaxWaitReady)
+               case <-timeout.C:
+                       return fmt.Errorf("timeout waiting (%s) for socket file: %s", MaxWaitReady, c.sockAddr)
+
                case e := <-watcher.Errors:
                        return e
+
                case ev := <-watcher.Events:
                        Log.Debugf("watcher event: %+v", ev)
-                       if ev.Name == c.sockAddr {
-                               if (ev.Op & fsnotify.Create) == fsnotify.Create {
-                                       // socket was created, we are ready
-                                       return nil
-                               }
+                       if ev.Name == c.sockAddr && (ev.Op&fsnotify.Create) == fsnotify.Create {
+                               // socket created, we are ready
+                               return nil
                        }
                }
        }
@@ -170,6 +172,7 @@ func (c *vppClient) Connect() error {
        }
 
        if err := c.open(); err != nil {
+               c.disconnect()
                return err
        }
 
@@ -180,6 +183,32 @@ func (c *vppClient) Connect() error {
        return nil
 }
 
+func (c *vppClient) Disconnect() error {
+       if c.conn == nil {
+               return nil
+       }
+       Log.Debugf("Disconnecting..")
+
+       close(c.quit)
+
+       if err := c.conn.CloseRead(); err != nil {
+               Log.Debugf("closing read failed: %v", err)
+       }
+
+       // wait for readerLoop to return
+       c.wg.Wait()
+
+       if err := c.close(); err != nil {
+               Log.Debugf("closing failed: %v", err)
+       }
+
+       if err := c.disconnect(); err != nil {
+               return err
+       }
+
+       return nil
+}
+
 func (c *vppClient) connect(sockAddr string) error {
        addr := &net.UnixAddr{Name: sockAddr, Net: "unix"}
 
@@ -188,7 +217,7 @@ func (c *vppClient) connect(sockAddr string) error {
                // we try different type of socket for backwards compatbility with VPP<=19.04
                if strings.Contains(err.Error(), "wrong type for socket") {
                        addr.Net = "unixpacket"
-                       Log.Debugf("%s, retrying connect with type unixpacket", err)
+                       Log.Warnf("%s, retrying connect with type unixpacket", err)
                        conn, err = net.DialUnix("unixpacket", nil, addr)
                }
                if err != nil {
@@ -198,11 +227,20 @@ func (c *vppClient) connect(sockAddr string) error {
        }
 
        c.conn = conn
+       Log.Debugf("Connected to socket (local addr: %v)", c.conn.LocalAddr().(*net.UnixAddr))
+
        c.reader = bufio.NewReader(c.conn)
        c.writer = bufio.NewWriter(c.conn)
 
-       Log.Debugf("Connected to socket: %v", addr)
+       return nil
+}
 
+func (c *vppClient) disconnect() error {
+       Log.Debugf("Closing socket")
+       if err := c.conn.Close(); err != nil {
+               Log.Debugln("Closing socket failed:", err)
+               return err
+       }
        return nil
 }
 
@@ -270,34 +308,6 @@ func (c *vppClient) open() error {
        return nil
 }
 
-func (c *vppClient) Disconnect() error {
-       if c.conn == nil {
-               return nil
-       }
-       Log.Debugf("Disconnecting..")
-
-       close(c.quit)
-
-       // force readerLoop to timeout
-       if err := c.conn.SetReadDeadline(time.Now()); err != nil {
-               return err
-       }
-
-       // wait for readerLoop to return
-       c.wg.Wait()
-
-       if err := c.close(); err != nil {
-               return err
-       }
-
-       if err := c.conn.Close(); err != nil {
-               Log.Debugln("Closing socket failed:", err)
-               return err
-       }
-
-       return nil
-}
-
 func (c *vppClient) close() error {
        msgCodec := new(codec.MsgCodec)
 
@@ -373,7 +383,7 @@ func (c *vppClient) SendMsg(context uint32, data []byte) error {
        }
        copy(data[2:], buf.Bytes())
 
-       Log.Debugf("SendMsg (%d) context=%v client=%d: data: % 02X", len(data), context, c.clientIndex, data)
+       Log.Debugf("sendMsg (%d) context=%v client=%d: data: % 02X", len(data), context, c.clientIndex, data)
 
        if err := c.write(data); err != nil {
                Log.Debugln("write error: ", err)
@@ -403,27 +413,25 @@ func (c *vppClient) write(msg []byte) error {
                Log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header)
        }
 
-       if err := c.writer.Flush(); err != nil {
-               return err
-       }
-
-       for i := 0; i <= len(msg)/c.writer.Size(); i++ {
-               x := i*c.writer.Size() + c.writer.Size()
+       writerSize := c.writer.Size()
+       for i := 0; i <= len(msg)/writerSize; i++ {
+               x := i*writerSize + writerSize
                if x > len(msg) {
                        x = len(msg)
                }
-               Log.Debugf("x=%v i=%v len=%v mod=%v", x, i, len(msg), len(msg)/c.writer.Size())
-               if n, err := c.writer.Write(msg[i*c.writer.Size() : x]); err != nil {
+               Log.Debugf(" - x=%v i=%v len=%v mod=%v", x, i, len(msg), len(msg)/writerSize)
+               if n, err := c.writer.Write(msg[i*writerSize : x]); err != nil {
                        return err
                } else {
                        Log.Debugf(" - msg sent x=%d (%d/%d): % 0X", x, n, len(msg), msg)
                }
-               if err := c.writer.Flush(); err != nil {
-                       return err
-               }
-
+       }
+       if err := c.writer.Flush(); err != nil {
+               return err
        }
 
+       Log.Debugf(" -- write done")
+
        return nil
 }
 
@@ -435,6 +443,7 @@ type msgHeader struct {
 func (c *vppClient) readerLoop() {
        defer c.wg.Done()
        defer Log.Debugf("reader quit")
+
        for {
                select {
                case <-c.quit:
@@ -450,6 +459,7 @@ func (c *vppClient) readerLoop() {
                        Log.Debugf("read failed: %v", err)
                        continue
                }
+
                h := new(msgHeader)
                if err := struc.Unpack(bytes.NewReader(msg), h); err != nil {
                        Log.Debugf("unpacking header failed: %v", err)
@@ -468,22 +478,22 @@ type msgheader struct {
 }
 
 func (c *vppClient) read() ([]byte, error) {
-       Log.Debug("reading next msg..")
+       Log.Debug(" reading next msg..")
 
        header := make([]byte, 16)
 
        n, err := io.ReadAtLeast(c.reader, header, 16)
        if err != nil {
                return nil, err
-       } else if n == 0 {
+       }
+       if n == 0 {
                Log.Debugln("zero bytes header")
                return nil, nil
-       }
-       if n != 16 {
+       } else if n != 16 {
                Log.Debugf("invalid header data (%d): % 0X", n, header[:n])
                return nil, fmt.Errorf("invalid header (expected 16 bytes, got %d)", n)
        }
-       Log.Debugf(" read header %d bytes: % 0X", n, header)
+       Log.Debugf(" read header %d bytes: % 0X", n, header)
 
        h := &msgheader{}
        if err := struc.Unpack(bytes.NewReader(header[:]), h); err != nil {
@@ -498,7 +508,7 @@ func (c *vppClient) read() ([]byte, error) {
        if err != nil {
                return nil, err
        }
-       Log.Debugf(" - read msg %d bytes (%d buffered)", n, c.reader.Buffered())
+       Log.Debugf(" - read msg %d bytes (%d buffered) % 0X", n, c.reader.Buffered(), msg[:n])
 
        if msgLen > n {
                remain := msgLen - n
@@ -520,6 +530,8 @@ func (c *vppClient) read() ([]byte, error) {
                }
        }
 
+       Log.Debugf(" -- read done (buffered: %d)", c.reader.Buffered())
+
        return msg, nil
 }