- in VPP 19.08 the socket type has changed to STREAM and data has to
be writtento VPP with single flush, otherwise msg might get mixed
with next header and cause VPP to stop responding
- this also fixes WaitReady for socketclient and vppapiclient
Change-Id: I022724c0c09c9b92d4c695d1cf2be15994fff717
Signed-off-by: Ondrej Fabry <ofabry@cisco.com>
DefaultDisconnectTimeout = time.Millisecond * 100
// MaxWaitReady defines maximum duration before waiting for socket file
// times out
DefaultDisconnectTimeout = time.Millisecond * 100
// MaxWaitReady defines maximum duration before waiting for socket file
// times out
- MaxWaitReady = time.Second * 15
+ MaxWaitReady = time.Second * 10
// ClientName is used for identifying client in socket registration
ClientName = "govppsock"
)
// ClientName is used for identifying client in socket registration
ClientName = "govppsock"
)
type vppClient struct {
sockAddr string
type vppClient struct {
sockAddr string
- conn *net.UnixConn
- reader *bufio.Reader
- writer *bufio.Writer
+
+ conn *net.UnixConn
+ reader *bufio.Reader
+ writer *bufio.Writer
connectTimeout time.Duration
disconnectTimeout time.Duration
connectTimeout time.Duration
disconnectTimeout time.Duration
// WaitReady checks socket file existence and waits for it if necessary
func (c *vppClient) WaitReady() error {
// WaitReady checks socket file existence and waits for it if necessary
func (c *vppClient) WaitReady() error {
- // check if file at the path already exists
+ // check if socket already exists
if _, err := os.Stat(c.sockAddr); err == nil {
if _, err := os.Stat(c.sockAddr); err == nil {
- return nil
- } else if os.IsExist(err) {
- return err
+ return nil // socket exists, we are ready
+ } else if !os.IsNotExist(err) {
+ return err // some other error occurred
- // if not, watch for it
+ // socket does not exist, watch for it
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}
defer func() {
if err := watcher.Close(); err != nil {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}
defer func() {
if err := watcher.Close(); err != nil {
- Log.Errorf("failed to close file watcher: %v", err)
+ Log.Warnf("failed to close file watcher: %v", err)
- // start watching directory
+ // start directory watcher
if err := watcher.Add(filepath.Dir(c.sockAddr)); err != nil {
return err
}
if err := watcher.Add(filepath.Dir(c.sockAddr)); err != nil {
return err
}
+ timeout := time.NewTimer(MaxWaitReady)
- case <-time.After(MaxWaitReady):
- return fmt.Errorf("waiting for socket file timed out (%s)", MaxWaitReady)
+ case <-timeout.C:
+ return fmt.Errorf("timeout waiting (%s) for socket file: %s", MaxWaitReady, c.sockAddr)
+
case e := <-watcher.Errors:
return e
case e := <-watcher.Errors:
return e
case ev := <-watcher.Events:
Log.Debugf("watcher event: %+v", ev)
case ev := <-watcher.Events:
Log.Debugf("watcher event: %+v", ev)
- if ev.Name == c.sockAddr {
- if (ev.Op & fsnotify.Create) == fsnotify.Create {
- // socket was created, we are ready
- return nil
- }
+ if ev.Name == c.sockAddr && (ev.Op&fsnotify.Create) == fsnotify.Create {
+ // socket created, we are ready
+ return nil
}
if err := c.open(); err != nil {
}
if err := c.open(); err != nil {
+func (c *vppClient) Disconnect() error {
+ if c.conn == nil {
+ return nil
+ }
+ Log.Debugf("Disconnecting..")
+
+ close(c.quit)
+
+ if err := c.conn.CloseRead(); err != nil {
+ Log.Debugf("closing read failed: %v", err)
+ }
+
+ // wait for readerLoop to return
+ c.wg.Wait()
+
+ if err := c.close(); err != nil {
+ Log.Debugf("closing failed: %v", err)
+ }
+
+ if err := c.disconnect(); err != nil {
+ return err
+ }
+
+ return nil
+}
+
func (c *vppClient) connect(sockAddr string) error {
addr := &net.UnixAddr{Name: sockAddr, Net: "unix"}
func (c *vppClient) connect(sockAddr string) error {
addr := &net.UnixAddr{Name: sockAddr, Net: "unix"}
// we try different type of socket for backwards compatbility with VPP<=19.04
if strings.Contains(err.Error(), "wrong type for socket") {
addr.Net = "unixpacket"
// we try different type of socket for backwards compatbility with VPP<=19.04
if strings.Contains(err.Error(), "wrong type for socket") {
addr.Net = "unixpacket"
- Log.Debugf("%s, retrying connect with type unixpacket", err)
+ Log.Warnf("%s, retrying connect with type unixpacket", err)
conn, err = net.DialUnix("unixpacket", nil, addr)
}
if err != nil {
conn, err = net.DialUnix("unixpacket", nil, addr)
}
if err != nil {
+ Log.Debugf("Connected to socket (local addr: %v)", c.conn.LocalAddr().(*net.UnixAddr))
+
c.reader = bufio.NewReader(c.conn)
c.writer = bufio.NewWriter(c.conn)
c.reader = bufio.NewReader(c.conn)
c.writer = bufio.NewWriter(c.conn)
- Log.Debugf("Connected to socket: %v", addr)
+func (c *vppClient) disconnect() error {
+ Log.Debugf("Closing socket")
+ if err := c.conn.Close(); err != nil {
+ Log.Debugln("Closing socket failed:", err)
+ return err
+ }
-func (c *vppClient) Disconnect() error {
- if c.conn == nil {
- return nil
- }
- Log.Debugf("Disconnecting..")
-
- close(c.quit)
-
- // force readerLoop to timeout
- if err := c.conn.SetReadDeadline(time.Now()); err != nil {
- return err
- }
-
- // wait for readerLoop to return
- c.wg.Wait()
-
- if err := c.close(); err != nil {
- return err
- }
-
- if err := c.conn.Close(); err != nil {
- Log.Debugln("Closing socket failed:", err)
- return err
- }
-
- return nil
-}
-
func (c *vppClient) close() error {
msgCodec := new(codec.MsgCodec)
func (c *vppClient) close() error {
msgCodec := new(codec.MsgCodec)
}
copy(data[2:], buf.Bytes())
}
copy(data[2:], buf.Bytes())
- Log.Debugf("SendMsg (%d) context=%v client=%d: data: % 02X", len(data), context, c.clientIndex, data)
+ Log.Debugf("sendMsg (%d) context=%v client=%d: data: % 02X", len(data), context, c.clientIndex, data)
if err := c.write(data); err != nil {
Log.Debugln("write error: ", err)
if err := c.write(data); err != nil {
Log.Debugln("write error: ", err)
Log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header)
}
Log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header)
}
- if err := c.writer.Flush(); err != nil {
- return err
- }
-
- for i := 0; i <= len(msg)/c.writer.Size(); i++ {
- x := i*c.writer.Size() + c.writer.Size()
+ writerSize := c.writer.Size()
+ for i := 0; i <= len(msg)/writerSize; i++ {
+ x := i*writerSize + writerSize
if x > len(msg) {
x = len(msg)
}
if x > len(msg) {
x = len(msg)
}
- Log.Debugf("x=%v i=%v len=%v mod=%v", x, i, len(msg), len(msg)/c.writer.Size())
- if n, err := c.writer.Write(msg[i*c.writer.Size() : x]); err != nil {
+ Log.Debugf(" - x=%v i=%v len=%v mod=%v", x, i, len(msg), len(msg)/writerSize)
+ if n, err := c.writer.Write(msg[i*writerSize : x]); err != nil {
return err
} else {
Log.Debugf(" - msg sent x=%d (%d/%d): % 0X", x, n, len(msg), msg)
}
return err
} else {
Log.Debugf(" - msg sent x=%d (%d/%d): % 0X", x, n, len(msg), msg)
}
- if err := c.writer.Flush(); err != nil {
- return err
- }
-
+ }
+ if err := c.writer.Flush(); err != nil {
+ return err
+ Log.Debugf(" -- write done")
+
func (c *vppClient) readerLoop() {
defer c.wg.Done()
defer Log.Debugf("reader quit")
func (c *vppClient) readerLoop() {
defer c.wg.Done()
defer Log.Debugf("reader quit")
for {
select {
case <-c.quit:
for {
select {
case <-c.quit:
Log.Debugf("read failed: %v", err)
continue
}
Log.Debugf("read failed: %v", err)
continue
}
h := new(msgHeader)
if err := struc.Unpack(bytes.NewReader(msg), h); err != nil {
Log.Debugf("unpacking header failed: %v", err)
h := new(msgHeader)
if err := struc.Unpack(bytes.NewReader(msg), h); err != nil {
Log.Debugf("unpacking header failed: %v", err)
}
func (c *vppClient) read() ([]byte, error) {
}
func (c *vppClient) read() ([]byte, error) {
- Log.Debug("reading next msg..")
+ Log.Debug(" reading next msg..")
header := make([]byte, 16)
n, err := io.ReadAtLeast(c.reader, header, 16)
if err != nil {
return nil, err
header := make([]byte, 16)
n, err := io.ReadAtLeast(c.reader, header, 16)
if err != nil {
return nil, err
Log.Debugln("zero bytes header")
return nil, nil
Log.Debugln("zero bytes header")
return nil, nil
Log.Debugf("invalid header data (%d): % 0X", n, header[:n])
return nil, fmt.Errorf("invalid header (expected 16 bytes, got %d)", n)
}
Log.Debugf("invalid header data (%d): % 0X", n, header[:n])
return nil, fmt.Errorf("invalid header (expected 16 bytes, got %d)", n)
}
- Log.Debugf(" - read header %d bytes: % 0X", n, header)
+ Log.Debugf(" read header %d bytes: % 0X", n, header)
h := &msgheader{}
if err := struc.Unpack(bytes.NewReader(header[:]), h); err != nil {
h := &msgheader{}
if err := struc.Unpack(bytes.NewReader(header[:]), h); err != nil {
if err != nil {
return nil, err
}
if err != nil {
return nil, err
}
- Log.Debugf(" - read msg %d bytes (%d buffered)", n, c.reader.Buffered())
+ Log.Debugf(" - read msg %d bytes (%d buffered) % 0X", n, c.reader.Buffered(), msg[:n])
if msgLen > n {
remain := msgLen - n
if msgLen > n {
remain := msgLen - n
+ Log.Debugf(" -- read done (buffered: %d)", c.reader.Buffered())
+
var (
// MaxWaitReady defines maximum duration before waiting for shared memory
// segment times out
var (
// MaxWaitReady defines maximum duration before waiting for shared memory
// segment times out
- MaxWaitReady = time.Second * 15
+ MaxWaitReady = time.Second * 10
path = filepath.Join(shmDir, a.shmPrefix+"-"+vppShmFile)
}
path = filepath.Join(shmDir, a.shmPrefix+"-"+vppShmFile)
}
- // check if file at the path already exists
+ // check if file already exists
if _, err := os.Stat(path); err == nil {
if _, err := os.Stat(path); err == nil {
- // file exists, we are ready
- return nil
+ return nil // file exists, we are ready
} else if !os.IsNotExist(err) {
} else if !os.IsNotExist(err) {
+ return err // some other error occurred
}
// file does not exist, start watching folder
}
// file does not exist, start watching folder
+ timeout := time.NewTimer(MaxWaitReady)
- case <-time.After(MaxWaitReady):
- return fmt.Errorf("waiting for shared memory segment timed out (%s)", MaxWaitReady)
+ case <-timeout.C:
+ return fmt.Errorf("timeout waiting (%s) for shm file: %s", MaxWaitReady, path)
+
case e := <-watcher.Errors:
return e
case e := <-watcher.Errors:
return e
case ev := <-watcher.Events:
case ev := <-watcher.Events:
- if ev.Name == path {
- if (ev.Op & fsnotify.Create) == fsnotify.Create {
- // file was created, we are ready
- return nil
- }
+ if ev.Name == path && (ev.Op&fsnotify.Create) == fsnotify.Create {
+ // file created, we are ready
+ return nil
- "git.fd.io/govpp.git/api"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus"
+
+ "git.fd.io/govpp.git/api"
func (ch *Channel) CheckCompatiblity(msgs ...api.Message) error {
for _, msg := range msgs {
func (ch *Channel) CheckCompatiblity(msgs ...api.Message) error {
for _, msg := range msgs {
+ // TODO: collect all incompatible messages and return summarized error
_, err := ch.msgIdentifier.GetMessageID(msg)
if err != nil {
return err
_, err := ch.msgIdentifier.GetMessageID(msg)
if err != nil {
return err
case vppReply := <-ch.replyChan:
ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
if ignore {
case vppReply := <-ch.replyChan:
ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
if ignore {
- logrus.Warnf("ignoring reply: %+v", vppReply)
+ logrus.WithFields(logrus.Fields{
+ "expSeqNum": expSeqNum,
+ "channel": ch.id,
+ }).Warnf("ignoring received reply: %+v (expecting: %s)", vppReply, msg.GetMessageName())
continue
}
return lastReplyReceived, err
case <-timer.C:
continue
}
return lastReplyReceived, err
case <-timer.C:
+ logrus.WithFields(logrus.Fields{
+ "expSeqNum": expSeqNum,
+ "channel": ch.id,
+ }).Debugf("timeout (%v) waiting for reply: %s", ch.replyTimeout, msg.GetMessageName())
err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout)
return false, err
}
}
err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout)
return false, err
}
}
}
func (ch *Channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) {
}
func (ch *Channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) {
- DefaultReconnectInterval = time.Second // default interval between reconnect attempts
- DefaultMaxReconnectAttempts = 3 // default maximum number of reconnect attempts
+ DefaultReconnectInterval = time.Second / 2 // default interval between reconnect attempts
+ DefaultMaxReconnectAttempts = 3 // default maximum number of reconnect attempts
// connectLoop attempts to connect to VPP until it succeeds.
// Then it continues with healthCheckLoop.
func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
// connectLoop attempts to connect to VPP until it succeeds.
// Then it continues with healthCheckLoop.
func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
+ var reconnectAttempts int
// loop until connected
for {
// loop until connected
for {
break
} else if reconnectAttempts < c.maxAttempts {
reconnectAttempts++
break
} else if reconnectAttempts < c.maxAttempts {
reconnectAttempts++
- log.Errorf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
+ log.Warnf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
time.Sleep(c.recInterval)
} else {
connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}
time.Sleep(c.recInterval)
} else {
connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}
"msg_size": len(data),
"seq_num": req.seqNum,
"msg_crc": req.msg.GetCrcString(),
"msg_size": len(data),
"seq_num": req.seqNum,
"msg_crc": req.msg.GetCrcString(),
- }).Debugf("--> govpp send: %s: %+v", req.msg.GetMessageName(), req.msg)
+ }).Debugf("==> govpp send: %s: %+v", req.msg.GetMessageName(), req.msg)
}
// send the request to VPP
}
// send the request to VPP
"msg_id": c.pingReqID,
"msg_size": len(pingData),
"seq_num": req.seqNum,
"msg_id": c.pingReqID,
"msg_size": len(pingData),
"seq_num": req.seqNum,
- }).Debug(" -> sending control ping")
+ }).Debug("--> sending control ping")
if err := c.vppClient.SendMsg(context, pingData); err != nil {
log.WithFields(logger.Fields{
if err := c.vppClient.SendMsg(context, pingData); err != nil {
log.WithFields(logger.Fields{
"is_multi": isMulti,
"seq_num": seqNum,
"msg_crc": msg.GetCrcString(),
"is_multi": isMulti,
"seq_num": seqNum,
"msg_crc": msg.GetCrcString(),
- }).Debugf("<-- govpp recv: %s", msg.GetMessageName())
+ }).Debugf("<== govpp recv: %s", msg.GetMessageName())
}
if context == 0 || c.isNotificationMessage(msgID) {
}
if context == 0 || c.isNotificationMessage(msgID) {