Add statsclient - pure Go implementation for stats API
[govpp.git] / adapter / socketclient / socketclient.go
index 4c576c3..e56f89c 100644 (file)
@@ -1,3 +1,17 @@
+// Copyright (c) 2019 Cisco and/or its affiliates.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package socketclient
 
 import (
@@ -13,32 +27,35 @@ import (
        "time"
 
        "github.com/fsnotify/fsnotify"
-
        "github.com/lunixbochs/struc"
        logger "github.com/sirupsen/logrus"
 
        "git.fd.io/govpp.git/adapter"
        "git.fd.io/govpp.git/codec"
-       "git.fd.io/govpp.git/examples/bin_api/memclnt"
+       "git.fd.io/govpp.git/examples/binapi/memclnt"
 )
 
 const (
        // DefaultSocketName is default VPP API socket file name
        DefaultSocketName = "/run/vpp-api.sock"
-
-       sockCreateMsgId = 15          // hard-coded id for sockclnt_create message
-       govppClientName = "govppsock" // client name used for socket registration
 )
 
 var (
-       ConnectTimeout    = time.Second * 3
-       DisconnectTimeout = time.Second
-
+       // DefaultConnectTimeout is default timeout for connecting
+       DefaultConnectTimeout = time.Second * 3
+       // DefaultDisconnectTimeout is default timeout for discconnecting
+       DefaultDisconnectTimeout = time.Second
        // MaxWaitReady defines maximum duration before waiting for socket file
        // times out
        MaxWaitReady = time.Second * 15
+       // ClientName is used for identifying client in socket registration
+       ClientName = "govppsock"
+)
 
-       Debug       = os.Getenv("DEBUG_GOVPP_SOCK") != ""
+var (
+       // Debug is global variable that determines debug mode
+       Debug = os.Getenv("DEBUG_GOVPP_SOCK") != ""
+       // DebugMsgIds is global variable that determines debug mode for msg ids
        DebugMsgIds = os.Getenv("DEBUG_GOVPP_SOCKMSG") != ""
 
        Log = logger.New() // global logger
@@ -53,16 +70,22 @@ func init() {
 }
 
 type vppClient struct {
-       sockAddr     string
-       conn         *net.UnixConn
-       reader       *bufio.Reader
+       sockAddr string
+       conn     *net.UnixConn
+       reader   *bufio.Reader
+       writer   *bufio.Writer
+
+       connectTimeout    time.Duration
+       disconnectTimeout time.Duration
+
        cb           adapter.MsgCallback
        clientIndex  uint32
        msgTable     map[string]uint16
        sockDelMsgId uint16
        writeMu      sync.Mutex
-       quit         chan struct{}
-       wg           sync.WaitGroup
+
+       quit chan struct{}
+       wg   sync.WaitGroup
 }
 
 func NewVppClient(sockAddr string) *vppClient {
@@ -70,13 +93,23 @@ func NewVppClient(sockAddr string) *vppClient {
                sockAddr = DefaultSocketName
        }
        return &vppClient{
-               sockAddr: sockAddr,
-               cb:       nilCallback,
+               sockAddr:          sockAddr,
+               connectTimeout:    DefaultConnectTimeout,
+               disconnectTimeout: DefaultDisconnectTimeout,
+               cb: func(msgID uint16, data []byte) {
+                       Log.Warnf("no callback set, dropping message: ID=%v len=%d", msgID, len(data))
+               },
        }
 }
 
-func nilCallback(msgID uint16, data []byte) {
-       Log.Warnf("no callback set, dropping message: ID=%v len=%d", msgID, len(data))
+// SetConnectTimeout sets timeout used during connecting.
+func (c *vppClient) SetConnectTimeout(t time.Duration) {
+       c.connectTimeout = t
+}
+
+// SetDisconnectTimeout sets timeout used during disconnecting.
+func (c *vppClient) SetDisconnectTimeout(t time.Duration) {
+       c.disconnectTimeout = t
 }
 
 // WaitReady checks socket file existence and waits for it if necessary
@@ -146,31 +179,42 @@ func (c *vppClient) Connect() error {
 }
 
 func (c *vppClient) connect(sockAddr string) error {
-       addr, err := net.ResolveUnixAddr("unixpacket", sockAddr)
-       if err != nil {
-               Log.Debugln("ResolveUnixAddr error:", err)
-               return err
-       }
+       addr := &net.UnixAddr{Name: sockAddr, Net: "unix"}
 
-       conn, err := net.DialUnix("unixpacket", nil, addr)
+       conn, err := net.DialUnix("unix", nil, addr)
        if err != nil {
-               Log.Debugln("Dial error:", err)
-               return err
+               // we try different type of socket for backwards compatbility with VPP<=19.04
+               if strings.Contains(err.Error(), "wrong type for socket") {
+                       addr.Net = "unixpacket"
+                       Log.Debugf("%s, retrying connect with type unixpacket", err)
+                       conn, err = net.DialUnix("unixpacket", nil, addr)
+               }
+               if err != nil {
+                       Log.Debugf("Connecting to socket %s failed: %s", addr, err)
+                       return err
+               }
        }
 
        c.conn = conn
        c.reader = bufio.NewReader(c.conn)
+       c.writer = bufio.NewWriter(c.conn)
 
        Log.Debugf("Connected to socket: %v", addr)
 
        return nil
 }
 
+const (
+       sockCreateMsgId  = 15 // hard-coded sockclnt_create message ID
+       createMsgContext = byte(123)
+       deleteMsgContext = byte(124)
+)
+
 func (c *vppClient) open() error {
        msgCodec := new(codec.MsgCodec)
 
        req := &memclnt.SockclntCreate{
-               Name: []byte(govppClientName),
+               Name: []byte(ClientName),
        }
        msg, err := msgCodec.EncodeMsg(req, sockCreateMsgId)
        if err != nil {
@@ -178,14 +222,14 @@ func (c *vppClient) open() error {
                return err
        }
        // set non-0 context
-       msg[5] = 123
+       msg[5] = createMsgContext
 
        if err := c.write(msg); err != nil {
                Log.Debugln("Write error: ", err)
                return err
        }
 
-       readDeadline := time.Now().Add(ConnectTimeout)
+       readDeadline := time.Now().Add(c.connectTimeout)
        if err := c.conn.SetReadDeadline(readDeadline); err != nil {
                return err
        }
@@ -199,8 +243,6 @@ func (c *vppClient) open() error {
                return err
        }
 
-       //log.Printf("Client got (%d): % 0X", len(msgReply), msgReply)
-
        reply := new(memclnt.SockclntCreateReply)
        if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
                Log.Println("Decode error:", err)
@@ -247,7 +289,7 @@ func (c *vppClient) Disconnect() error {
        }
 
        if err := c.conn.Close(); err != nil {
-               Log.Debugln("Close socket conn failed:", err)
+               Log.Debugln("Closing socket failed:", err)
                return err
        }
 
@@ -266,15 +308,15 @@ func (c *vppClient) close() error {
                return err
        }
        // set non-0 context
-       msg[5] = 124
+       msg[5] = deleteMsgContext
 
-       Log.Debugf("sending socklntDel (%d byes): % 0X\n", len(msg), msg)
+       Log.Debugf("sending socklntDel (%d byes): % 0X", len(msg), msg)
        if err := c.write(msg); err != nil {
                Log.Debugln("Write error: ", err)
                return err
        }
 
-       readDeadline := time.Now().Add(DisconnectTimeout)
+       readDeadline := time.Now().Add(c.disconnectTimeout)
        if err := c.conn.SetReadDeadline(readDeadline); err != nil {
                return err
        }
@@ -313,7 +355,7 @@ func (c *vppClient) GetMsgID(msgName string, msgCrc string) (uint16, error) {
 }
 
 type reqHeader struct {
-       //MsgID uint16
+       // MsgID uint16
        ClientIndex uint32
        Context     uint32
 }
@@ -341,7 +383,7 @@ func (c *vppClient) SendMsg(context uint32, data []byte) error {
 
 func (c *vppClient) write(msg []byte) error {
        h := &msgheader{
-               Data_len: uint32(len(msg)),
+               DataLen: uint32(len(msg)),
        }
        buf := new(bytes.Buffer)
        if err := struc.Pack(buf, h); err != nil {
@@ -353,17 +395,31 @@ func (c *vppClient) write(msg []byte) error {
        c.writeMu.Lock()
        defer c.writeMu.Unlock()
 
-       var w io.Writer = c.conn
-
-       if n, err := w.Write(header); err != nil {
+       if n, err := c.writer.Write(header); err != nil {
                return err
        } else {
                Log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header)
        }
-       if n, err := w.Write(msg); err != nil {
+
+       if err := c.writer.Flush(); err != nil {
                return err
-       } else {
-               Log.Debugf(" - msg sent (%d/%d): % 0X", n, len(msg), msg)
+       }
+
+       for i := 0; i <= len(msg)/c.writer.Size(); i++ {
+               x := i*c.writer.Size() + c.writer.Size()
+               if x > len(msg) {
+                       x = len(msg)
+               }
+               Log.Debugf("x=%v i=%v len=%v mod=%v", x, i, len(msg), len(msg)/c.writer.Size())
+               if n, err := c.writer.Write(msg[i*c.writer.Size() : x]); err != nil {
+                       return err
+               } else {
+                       Log.Debugf(" - msg sent x=%d (%d/%d): % 0X", x, n, len(msg), msg)
+               }
+               if err := c.writer.Flush(); err != nil {
+                       return err
+               }
+
        }
 
        return nil
@@ -376,10 +432,10 @@ type msgHeader struct {
 
 func (c *vppClient) readerLoop() {
        defer c.wg.Done()
+       defer Log.Debugf("reader quit")
        for {
                select {
                case <-c.quit:
-                       Log.Debugf("reader quit")
                        return
                default:
                }
@@ -389,7 +445,7 @@ func (c *vppClient) readerLoop() {
                        if isClosedError(err) {
                                return
                        }
-                       Log.Debugf("READ FAILED: %v", err)
+                       Log.Debugf("read failed: %v", err)
                        continue
                }
                h := new(msgHeader)
@@ -404,10 +460,9 @@ func (c *vppClient) readerLoop() {
 }
 
 type msgheader struct {
-       Q                 int    `struc:"uint64"`
-       Data_len          uint32 `struc:"uint32"`
-       Gc_mark_timestamp uint32 `struc:"uint32"`
-       //data              [0]uint8
+       Q               int    `struc:"uint64"`
+       DataLen         uint32 `struc:"uint32"`
+       GcMarkTimestamp uint32 `struc:"uint32"`
 }
 
 func (c *vppClient) read() ([]byte, error) {
@@ -423,7 +478,7 @@ func (c *vppClient) read() ([]byte, error) {
                return nil, nil
        }
        if n != 16 {
-               Log.Debug("invalid header data (%d): % 0X", n, header[:n])
+               Log.Debugf("invalid header data (%d): % 0X", n, header[:n])
                return nil, fmt.Errorf("invalid header (expected 16 bytes, got %d)", n)
        }
        Log.Debugf(" - read header %d bytes: % 0X", n, header)
@@ -434,7 +489,7 @@ func (c *vppClient) read() ([]byte, error) {
        }
        Log.Debugf(" - decoded header: %+v", h)
 
-       msgLen := int(h.Data_len)
+       msgLen := int(h.DataLen)
        msg := make([]byte, msgLen)
 
        n, err = c.reader.Read(msg)
@@ -449,7 +504,6 @@ func (c *vppClient) read() ([]byte, error) {
                view := msg[n:]
 
                for remain > 0 {
-
                        nbytes, err := c.reader.Read(view)
                        if err != nil {
                                return nil, err