Refactor GoVPP
[govpp.git] / core / request_handler.go
index 8681963..fd6d100 100644 (file)
@@ -21,8 +21,6 @@ import (
        "time"
 
        logger "github.com/sirupsen/logrus"
-
-       "git.fd.io/govpp.git/api"
 )
 
 var (
@@ -45,151 +43,182 @@ func (c *Connection) watchRequests(ch *channel) {
 
                case req := <-ch.notifSubsChan:
                        // new request on the notification subscribe channel
-                       c.processNotifSubscribeRequest(ch, req)
+                       c.processSubscriptionRequest(ch, req)
                }
        }
 }
 
 // processRequest processes a single request received on the request channel.
-func (c *Connection) processRequest(ch *channel, req *api.VppRequest) error {
+func (c *Connection) processRequest(ch *channel, req *vppRequest) error {
        // check whether we are connected to VPP
        if atomic.LoadUint32(&c.connected) == 0 {
                err := ErrNotConnected
-               log.Error(err)
-               sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
+               log.Errorf("processing request failed: %v", err)
+               sendReplyError(ch, req, err)
                return err
        }
 
        // retrieve message ID
-       msgID, err := c.GetMessageID(req.Message)
+       msgID, err := c.GetMessageID(req.msg)
        if err != nil {
                err = fmt.Errorf("unable to retrieve message ID: %v", err)
                log.WithFields(logger.Fields{
-                       "msg_name": req.Message.GetMessageName(),
-                       "msg_crc":  req.Message.GetCrcString(),
-                       "seq_num":  req.SeqNum,
+                       "msg_name": req.msg.GetMessageName(),
+                       "msg_crc":  req.msg.GetCrcString(),
+                       "seq_num":  req.seqNum,
                }).Error(err)
-               sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
+               sendReplyError(ch, req, err)
                return err
        }
 
        // encode the message into binary
-       data, err := c.codec.EncodeMsg(req.Message, msgID)
+       data, err := c.codec.EncodeMsg(req.msg, msgID)
        if err != nil {
                err = fmt.Errorf("unable to encode the messge: %v", err)
                log.WithFields(logger.Fields{
-                       "channel": ch.id,
-                       "msg_id":  msgID,
-                       "seq_num": req.SeqNum,
+                       "channel":  ch.id,
+                       "msg_id":   msgID,
+                       "msg_name": req.msg.GetMessageName(),
+                       "seq_num":  req.seqNum,
                }).Error(err)
-               sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
+               sendReplyError(ch, req, err)
                return err
        }
 
+       // get context
+       context := packRequestContext(ch.id, req.multi, req.seqNum)
        if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
                log.WithFields(logger.Fields{
                        "channel":  ch.id,
+                       "context":  context,
+                       "is_multi": req.multi,
                        "msg_id":   msgID,
+                       "msg_name": req.msg.GetMessageName(),
                        "msg_size": len(data),
-                       "msg_name": req.Message.GetMessageName(),
-                       "seq_num":  req.SeqNum,
-               }).Debug("Sending a message to VPP.")
+                       "seq_num":  req.seqNum,
+               }).Debug(" -> Sending a message to VPP.")
        }
 
        // send the request to VPP
-       context := packRequestContext(ch.id, req.Multipart, req.SeqNum)
        err = c.vpp.SendMsg(context, data)
        if err != nil {
                err = fmt.Errorf("unable to send the message: %v", err)
                log.WithFields(logger.Fields{
                        "context": context,
                        "msg_id":  msgID,
-                       "seq_num": req.SeqNum,
+                       "seq_num": req.seqNum,
                }).Error(err)
-               sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
+               sendReplyError(ch, req, err)
                return err
        }
 
-       if req.Multipart {
+       if req.multi {
                // send a control ping to determine end of the multipart response
                pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID)
 
                log.WithFields(logger.Fields{
+                       "channel":  ch.id,
                        "context":  context,
                        "msg_id":   c.pingReqID,
                        "msg_size": len(pingData),
-                       "seq_num":  req.SeqNum,
-               }).Debug("Sending a control ping to VPP.")
-
-               c.vpp.SendMsg(context, pingData)
+                       "seq_num":  req.seqNum,
+               }).Debug(" -> Sending a control ping to VPP.")
+
+               if err := c.vpp.SendMsg(context, pingData); err != nil {
+                       log.WithFields(logger.Fields{
+                               "context": context,
+                               "msg_id":  msgID,
+                               "seq_num": req.seqNum,
+                       }).Warnf("unable to send control ping: %v", err)
+               }
        }
 
        return nil
 }
 
 // msgCallback is called whenever any binary API message comes from VPP.
-func msgCallback(context uint32, msgID uint16, data []byte) {
+func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) {
        connLock.RLock()
        defer connLock.RUnlock()
 
-       if conn == nil {
+       if c == nil {
                log.Warn("Already disconnected, ignoring the message.")
                return
        }
 
-       chanID, isMultipart, seqNum := unpackRequestContext(context)
+       msg, ok := c.msgMap[msgID]
+       if !ok {
+               log.Warnf("Unknown message received, ID: %d", msgID)
+               return
+       }
+
+       // decode message context to fix for special cases of messages,
+       // for example:
+       // - replies that don't have context as first field (comes as zero)
+       // - events that don't have context at all (comes as non zero)
+       //
+       msgContext, err := c.codec.DecodeMsgContext(data, msg)
+       if err == nil {
+               if context != msgContext {
+                       log.Debugf("different context was decoded from message (%d -> %d)", context, msgContext)
+                       context = msgContext
+               }
+       } else {
+               log.Errorf("decoding context failed: %v", err)
+       }
+
+       chanID, isMulti, seqNum := unpackRequestContext(context)
        if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
                log.WithFields(logger.Fields{
-                       "msg_id":       msgID,
-                       "msg_size":     len(data),
-                       "channel_id":   chanID,
-                       "is_multipart": isMultipart,
-                       "seq_num":      seqNum,
-               }).Debug("Received a message from VPP.")
+                       "context":  context,
+                       "msg_id":   msgID,
+                       "msg_name": msg.GetMessageName(),
+                       "msg_size": len(data),
+                       "channel":  chanID,
+                       "is_multi": isMulti,
+                       "seq_num":  seqNum,
+               }).Debug(" <- Received a message from VPP.")
        }
 
-       if context == 0 || conn.isNotificationMessage(msgID) {
+       if context == 0 || c.isNotificationMessage(msgID) {
                // process the message as a notification
-               conn.sendNotifications(msgID, data)
+               c.sendNotifications(msgID, data)
                return
        }
 
        // match ch according to the context
-       conn.channelsLock.RLock()
-       ch, ok := conn.channels[chanID]
-       conn.channelsLock.RUnlock()
-
+       c.channelsLock.RLock()
+       ch, ok := c.channels[chanID]
+       c.channelsLock.RUnlock()
        if !ok {
                log.WithFields(logger.Fields{
-                       "channel_id": chanID,
-                       "msg_id":     msgID,
+                       "channel": chanID,
+                       "msg_id":  msgID,
                }).Error("Channel ID not known, ignoring the message.")
                return
        }
 
-       lastReplyReceived := false
-       // if this is a control ping reply to a multipart request, treat this as a last part of the reply
-       if msgID == conn.pingReplyID && isMultipart {
-               lastReplyReceived = true
-       }
+       // if this is a control ping reply to a multipart request,
+       // treat this as a last part of the reply
+       lastReplyReceived := isMulti && msgID == c.pingReplyID
 
        // send the data to the channel
-       sendReply(ch, &api.VppReply{
-               MessageID:         msgID,
-               SeqNum:            seqNum,
-               Data:              data,
-               LastReplyReceived: lastReplyReceived,
+       sendReply(ch, &vppReply{
+               msgID:        msgID,
+               seqNum:       seqNum,
+               data:         data,
+               lastReceived: lastReplyReceived,
        })
 
        // store actual time of this reply
-       conn.lastReplyLock.Lock()
-       conn.lastReply = time.Now()
-       conn.lastReplyLock.Unlock()
+       c.lastReplyLock.Lock()
+       c.lastReply = time.Now()
+       c.lastReplyLock.Unlock()
 }
 
 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
 // it logs the error and do not send the message.
-func sendReply(ch *channel, reply *api.VppReply) {
+func sendReply(ch *channel, reply *vppReply) {
        select {
        case ch.replyChan <- reply:
                // reply sent successfully
@@ -197,66 +226,14 @@ func sendReply(ch *channel, reply *api.VppReply) {
                // receiver still not ready
                log.WithFields(logger.Fields{
                        "channel": ch,
-                       "msg_id":  reply.MessageID,
-                       "seq_num": reply.SeqNum,
+                       "msg_id":  reply.msgID,
+                       "seq_num": reply.seqNum,
                }).Warn("Unable to send the reply, reciever end not ready.")
        }
 }
 
-// GetMessageID returns message identifier of given API message.
-func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
-       if c == nil {
-               return 0, errors.New("nil connection passed in")
-       }
-       return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString())
-}
-
-// messageNameToID returns message ID of a message identified by its name and CRC.
-func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) {
-       msgKey := msgName + "_" + msgCrc
-
-       // try to get the ID from the map
-       c.msgIDsLock.RLock()
-       id, ok := c.msgIDs[msgKey]
-       c.msgIDsLock.RUnlock()
-       if ok {
-               return id, nil
-       }
-
-       // get the ID using VPP API
-       id, err := c.vpp.GetMsgID(msgName, msgCrc)
-       if err != nil {
-               err = fmt.Errorf("unable to retrieve message ID: %v", err)
-               log.WithFields(logger.Fields{
-                       "msg_name": msgName,
-                       "msg_crc":  msgCrc,
-               }).Error(err)
-               return id, err
-       }
-
-       c.msgIDsLock.Lock()
-       c.msgIDs[msgKey] = id
-       c.msgIDsLock.Unlock()
-
-       return id, nil
-}
-
-// LookupByID looks up message name and crc by ID.
-func (c *Connection) LookupByID(ID uint16) (string, error) {
-       if c == nil {
-               return "", errors.New("nil connection passed in")
-       }
-
-       c.msgIDsLock.Lock()
-       defer c.msgIDsLock.Unlock()
-
-       for key, id := range c.msgIDs {
-               if id == ID {
-                       return key, nil
-               }
-       }
-
-       return "", fmt.Errorf("unknown message ID: %d", ID)
+func sendReplyError(ch *channel, req *vppRequest, err error) {
+       sendReply(ch, &vppReply{seqNum: req.seqNum, err: err})
 }
 
 // +------------------+-------------------+-----------------------+
@@ -279,3 +256,24 @@ func unpackRequestContext(context uint32) (chanID uint16, isMulipart bool, seqNu
        seqNum = uint16(context & 0xffff)
        return
 }
+
+// compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to,
+// or succeeds seq. number <seqNum2>.
+// Since sequence numbers cycle in the finite set of size 2^16, the function
+// must assume that the distance between compared sequence numbers is less than
+// (2^16)/2 to determine the order.
+func compareSeqNumbers(seqNum1, seqNum2 uint16) int {
+       // calculate distance from seqNum1 to seqNum2
+       var dist uint16
+       if seqNum1 <= seqNum2 {
+               dist = seqNum2 - seqNum1
+       } else {
+               dist = 0xffff - (seqNum1 - seqNum2 - 1)
+       }
+       if dist == 0 {
+               return 0
+       } else if dist <= 0x8000 {
+               return -1
+       }
+       return 1
+}