import (
"errors"
"fmt"
+ "reflect"
"sync/atomic"
"time"
logger "github.com/sirupsen/logrus"
+
+ "go.fd.io/govpp/api"
)
+var ReplyChannelTimeout = time.Millisecond * 100
+
var (
ErrNotConnected = errors.New("not connected to VPP, ignoring the request")
ErrProbeTimeout = errors.New("probe reply not received within timeout period")
c.releaseAPIChannel(ch)
return
}
- c.processRequest(ch, req)
+ if err := c.processRequest(ch, req); err != nil {
+ sendReply(ch, &vppReply{
+ seqNum: req.seqNum,
+ err: fmt.Errorf("unable to process request: %w", err),
+ })
+ }
}
}
}
// processRequest processes a single request received on the request channel.
func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
// check whether we are connected to VPP
- if atomic.LoadUint32(&c.connected) == 0 {
+ if atomic.LoadUint32(&c.vppConnected) == 0 {
err := ErrNotConnected
- log.Errorf("processing request failed: %v", err)
- sendReplyError(ch, req, err)
+ log.WithFields(logger.Fields{
+ "channel": ch.id,
+ "seq_num": req.seqNum,
+ "msg_name": req.msg.GetMessageName(),
+ "msg_crc": req.msg.GetCrcString(),
+ "error": err,
+ }).Warnf("Unable to process request")
return err
}
// retrieve message ID
msgID, err := c.GetMessageID(req.msg)
if err != nil {
- err = fmt.Errorf("unable to retrieve message ID: %v", err)
log.WithFields(logger.Fields{
+ "channel": ch.id,
"msg_name": req.msg.GetMessageName(),
"msg_crc": req.msg.GetCrcString(),
"seq_num": req.seqNum,
- }).Error(err)
- sendReplyError(ch, req, err)
+ "error": err,
+ }).Warnf("Unable to retrieve message ID")
return err
}
// encode the message into binary
data, err := c.codec.EncodeMsg(req.msg, msgID)
if err != nil {
- err = fmt.Errorf("unable to encode the messge: %v", err)
log.WithFields(logger.Fields{
"channel": ch.id,
"msg_id": msgID,
"msg_name": req.msg.GetMessageName(),
+ "msg_crc": req.msg.GetCrcString(),
"seq_num": req.seqNum,
- }).Error(err)
- sendReplyError(ch, req, err)
+ "error": err,
+ }).Warnf("Unable to encode message: %T %+v", req.msg, req.msg)
return err
}
- // get context
context := packRequestContext(ch.id, req.multi, req.seqNum)
- if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
+
+ if log.Level >= logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
log.WithFields(logger.Fields{
"channel": ch.id,
- "context": context,
- "is_multi": req.multi,
"msg_id": msgID,
"msg_name": req.msg.GetMessageName(),
- "msg_size": len(data),
+ "msg_crc": req.msg.GetCrcString(),
"seq_num": req.seqNum,
- }).Debug(" -> Sending a message to VPP.")
+ "is_multi": req.multi,
+ "context": context,
+ "data_len": len(data),
+ }).Debugf("--> SEND MSG: %T %+v", req.msg, req.msg)
}
// send the request to VPP
- err = c.vpp.SendMsg(context, data)
+ t := time.Now()
+ err = c.vppClient.SendMsg(context, data)
if err != nil {
- err = fmt.Errorf("unable to send the message: %v", err)
log.WithFields(logger.Fields{
- "context": context,
- "msg_id": msgID,
- "seq_num": req.seqNum,
- }).Error(err)
- sendReplyError(ch, req, err)
+ "channel": ch.id,
+ "msg_id": msgID,
+ "msg_name": req.msg.GetMessageName(),
+ "msg_crc": req.msg.GetCrcString(),
+ "seq_num": req.seqNum,
+ "is_multi": req.multi,
+ "context": context,
+ "data_len": len(data),
+ "error": err,
+ }).Warnf("Unable to send message")
return err
}
+ c.trace(req.msg, ch.id, t, false)
if req.multi {
// send a control ping to determine end of the multipart response
- pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID)
+ pingData, _ := c.codec.EncodeMsg(c.msgControlPing, c.pingReqID)
- log.WithFields(logger.Fields{
- "channel": ch.id,
- "context": context,
- "msg_id": c.pingReqID,
- "msg_size": len(pingData),
- "seq_num": req.seqNum,
- }).Debug(" -> Sending a control ping to VPP.")
+ if log.Level >= logger.DebugLevel {
+ log.WithFields(logger.Fields{
+ "channel": ch.id,
+ "msg_id": c.pingReqID,
+ "msg_name": c.msgControlPing.GetMessageName(),
+ "msg_crc": c.msgControlPing.GetCrcString(),
+ "seq_num": req.seqNum,
+ "context": context,
+ "data_len": len(pingData),
+ }).Debugf(" -> SEND MSG: %T", c.msgControlPing)
+ }
- if err := c.vpp.SendMsg(context, pingData); err != nil {
+ t = time.Now()
+ if err := c.vppClient.SendMsg(context, pingData); err != nil {
log.WithFields(logger.Fields{
"context": context,
- "msg_id": msgID,
"seq_num": req.seqNum,
- }).Warnf("unable to send control ping: %v", err)
+ "error": err,
+ }).Warnf("unable to send control ping")
}
+ c.trace(c.msgControlPing, ch.id, t, false)
}
return nil
// msgCallback is called whenever any binary API message comes from VPP.
func (c *Connection) msgCallback(msgID uint16, data []byte) {
- connLock.RLock()
- defer connLock.RUnlock()
-
if c == nil {
- log.Warn("Already disconnected, ignoring the message.")
+ log.WithField(
+ "msg_id", msgID,
+ ).Warn("Connection already disconnected, ignoring the message.")
return
}
- msg, ok := c.msgMap[msgID]
- if !ok {
- log.Warnf("Unknown message received, ID: %d", msgID)
+ msg, err := c.getMessageByID(msgID)
+ if err != nil {
+ log.Warnln(err)
return
}
// - replies that don't have context as first field (comes as zero)
// - events that don't have context at all (comes as non zero)
//
- context, err := c.codec.DecodeMsgContext(data, msg)
+ context, err := c.codec.DecodeMsgContext(data, msg.GetMessageType())
if err != nil {
- log.Errorf("decoding context failed: %v", err)
+ log.WithField("msg_id", msgID).Warnf("Unable to decode message context: %v", err)
+ return
}
chanID, isMulti, seqNum := unpackRequestContext(context)
+
+ // decode and trace the message
+ msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
+ if err = c.codec.DecodeMsg(data, msg); err != nil {
+ log.WithField("msg", msg).Warnf("Unable to decode message: %v", err)
+ return
+ }
+ c.trace(msg, chanID, time.Now(), true)
+
if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
log.WithFields(logger.Fields{
"context": context,
"msg_id": msgID,
- "msg_name": msg.GetMessageName(),
"msg_size": len(data),
"channel": chanID,
"is_multi": isMulti,
"seq_num": seqNum,
- }).Debug(" <- Received a message from VPP.")
+ "msg_crc": msg.GetCrcString(),
+ }).Debugf("<-- govpp RECEIVE: %s", msg.GetMessageName())
}
if context == 0 || c.isNotificationMessage(msgID) {
// treat this as a last part of the reply
lastReplyReceived := isMulti && msgID == c.pingReplyID
- // send the data to the channel
+ // send the data to the channel, it needs to be copied,
+ // because it will be freed after this function returns
sendReply(ch, &vppReply{
msgID: msgID,
seqNum: seqNum,
- data: data,
+ data: append([]byte(nil), data...),
lastReceived: lastReplyReceived,
})
// sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
// it logs the error and do not send the message.
func sendReply(ch *Channel, reply *vppReply) {
+ // first try to avoid creating timer
select {
case ch.replyChan <- reply:
- // reply sent successfully
- case <-time.After(time.Millisecond * 100):
+ return // reply sent ok
+ default:
+ // reply channel full
+ }
+ if ch.receiveReplyTimeout == 0 {
+ log.WithFields(logger.Fields{
+ "channel": ch.id,
+ "msg_id": reply.msgID,
+ "seq_num": reply.seqNum,
+ "err": reply.err,
+ }).Warn("Reply channel full, dropping reply.")
+ return
+ }
+ select {
+ case ch.replyChan <- reply:
+ return // reply sent ok
+ case <-time.After(ch.receiveReplyTimeout):
// receiver still not ready
log.WithFields(logger.Fields{
- "channel": ch,
+ "channel": ch.id,
"msg_id": reply.msgID,
"seq_num": reply.seqNum,
- }).Warn("Unable to send the reply, reciever end not ready.")
+ "err": reply.err,
+ }).Warnf("Unable to send reply (reciever end not ready in %v).", ch.receiveReplyTimeout)
}
}
-func sendReplyError(ch *Channel, req *vppRequest, err error) {
- sendReply(ch, &vppReply{seqNum: req.seqNum, err: err})
-}
-
// isNotificationMessage returns true if someone has subscribed to provided message ID.
func (c *Connection) isNotificationMessage(msgID uint16) bool {
c.subscriptionsLock.RLock()
"msg_name": sub.event.GetMessageName(),
"msg_id": msgID,
"msg_size": len(data),
- }).Errorf("Unable to decode the notification message: %v", err)
+ "error": err,
+ }).Warnf("Unable to decode the notification message")
continue
}
}
return 1
}
+
+// Returns message based on the message ID not depending on message path
+func (c *Connection) getMessageByID(msgID uint16) (msg api.Message, err error) {
+ var ok bool
+ for _, messages := range c.msgMapByPath {
+ if msg, ok = messages[msgID]; ok {
+ return msg, nil
+ }
+ }
+ return nil, fmt.Errorf("unknown message received, ID: %d", msgID)
+}