Improve handling of probes on timeouts
[govpp.git] / core / request_handler.go
index e0235b9..8f793f5 100644 (file)
@@ -18,11 +18,16 @@ import (
        "errors"
        "fmt"
        "sync/atomic"
+       "time"
 
-       logger "github.com/Sirupsen/logrus"
+       logger "github.com/sirupsen/logrus"
 
        "git.fd.io/govpp.git/api"
-       "git.fd.io/govpp.git/core/bin_api/vpe"
+)
+
+var (
+       ErrNotConnected = errors.New("not connected to VPP, ignoring the request")
+       ErrProbeTimeout = errors.New("probe reply not received within timeout period")
 )
 
 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
@@ -49,34 +54,34 @@ func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
 func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
        // check whether we are connected to VPP
        if atomic.LoadUint32(&c.connected) == 0 {
-               error := errors.New("not connected to VPP, ignoring the request")
-               log.Error(error)
-               sendReply(ch, &api.VppReply{Error: error})
-               return error
+               err := ErrNotConnected
+               log.Error(err)
+               sendReply(ch, &api.VppReply{Error: err})
+               return err
        }
 
        // retrieve message ID
        msgID, err := c.GetMessageID(req.Message)
        if err != nil {
-               error := fmt.Errorf("unable to retrieve message ID: %v", err)
+               err = fmt.Errorf("unable to retrieve message ID: %v", err)
                log.WithFields(logger.Fields{
                        "msg_name": req.Message.GetMessageName(),
                        "msg_crc":  req.Message.GetCrcString(),
                }).Error(err)
-               sendReply(ch, &api.VppReply{Error: error})
-               return error
+               sendReply(ch, &api.VppReply{Error: err})
+               return err
        }
 
        // encode the message into binary
        data, err := c.codec.EncodeMsg(req.Message, msgID)
        if err != nil {
-               error := fmt.Errorf("unable to encode the messge: %v", err)
+               err = fmt.Errorf("unable to encode the messge: %v", err)
                log.WithFields(logger.Fields{
                        "context": chMeta.id,
                        "msg_id":  msgID,
-               }).Error(error)
-               sendReply(ch, &api.VppReply{Error: error})
-               return error
+               }).Error(err)
+               sendReply(ch, &api.VppReply{Error: err})
+               return err
        }
 
        if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
@@ -84,6 +89,7 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re
                        "context":  chMeta.id,
                        "msg_id":   msgID,
                        "msg_size": len(data),
+                       "msg_name": req.Message.GetMessageName(),
                }).Debug("Sending a message to VPP.")
        }
 
@@ -94,12 +100,20 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re
        }
 
        // send the request to VPP
-       c.vpp.SendMsg(chMeta.id, data)
+       err = c.vpp.SendMsg(chMeta.id, data)
+       if err != nil {
+               err = fmt.Errorf("unable to send the messge: %v", err)
+               log.WithFields(logger.Fields{
+                       "context": chMeta.id,
+                       "msg_id":  msgID,
+               }).Error(err)
+               sendReply(ch, &api.VppReply{Error: err})
+               return err
+       }
 
        if req.Multipart {
                // send a control ping to determine end of the multipart response
-               ping := &vpe.ControlPing{}
-               pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID)
+               pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID)
 
                log.WithFields(logger.Fields{
                        "context":  chMeta.id,
@@ -163,6 +177,11 @@ func msgCallback(context uint32, msgID uint16, data []byte) {
                Data:              data,
                LastReplyReceived: lastReplyReceived,
        })
+
+       // store actual time of this reply
+       conn.lastReplyLock.Lock()
+       conn.lastReply = time.Now()
+       conn.lastReplyLock.Unlock()
 }
 
 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
@@ -171,8 +190,8 @@ func sendReply(ch *api.Channel, reply *api.VppReply) {
        select {
        case ch.ReplyChan <- reply:
                // reply sent successfully
-       default:
-               // unable to write into the channel without blocking
+       case <-time.After(time.Millisecond * 100):
+               // receiver still not ready
                log.WithFields(logger.Fields{
                        "channel": ch,
                        "msg_id":  reply.MessageID,
@@ -190,9 +209,11 @@ func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
 
 // messageNameToID returns message ID of a message identified by its name and CRC.
 func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) {
+       msgKey := msgName + "_" + msgCrc
+
        // try to get the ID from the map
        c.msgIDsLock.RLock()
-       id, ok := c.msgIDs[msgName+msgCrc]
+       id, ok := c.msgIDs[msgKey]
        c.msgIDsLock.RUnlock()
        if ok {
                return id, nil
@@ -201,17 +222,35 @@ func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, err
        // get the ID using VPP API
        id, err := c.vpp.GetMsgID(msgName, msgCrc)
        if err != nil {
-               error := fmt.Errorf("unable to retrieve message ID: %v", err)
+               err = fmt.Errorf("unable to retrieve message ID: %v", err)
                log.WithFields(logger.Fields{
                        "msg_name": msgName,
                        "msg_crc":  msgCrc,
-               }).Errorf("unable to retrieve message ID: %v", err)
-               return id, error
+               }).Error(err)
+               return id, err
        }
 
        c.msgIDsLock.Lock()
-       c.msgIDs[msgName+msgCrc] = id
+       c.msgIDs[msgKey] = id
        c.msgIDsLock.Unlock()
 
        return id, nil
 }
+
+// LookupByID looks up message name and crc by ID.
+func (c *Connection) LookupByID(ID uint16) (string, error) {
+       if c == nil {
+               return "", errors.New("nil connection passed in")
+       }
+
+       c.msgIDsLock.Lock()
+       defer c.msgIDsLock.Unlock()
+
+       for key, id := range c.msgIDs {
+               if id == ID {
+                       return key, nil
+               }
+       }
+
+       return "", fmt.Errorf("unknown message ID: %d", ID)
+}