Recover possible panic in EncodeMsg and improve debug logs
[govpp.git] / core / request_handler.go
index fd6d100..e52e262 100644 (file)
@@ -29,7 +29,7 @@ var (
 )
 
 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
-func (c *Connection) watchRequests(ch *channel) {
+func (c *Connection) watchRequests(ch *Channel) {
        for {
                select {
                case req, ok := <-ch.reqChan:
@@ -39,54 +39,49 @@ func (c *Connection) watchRequests(ch *channel) {
                                c.releaseAPIChannel(ch)
                                return
                        }
-                       c.processRequest(ch, req)
-
-               case req := <-ch.notifSubsChan:
-                       // new request on the notification subscribe channel
-                       c.processSubscriptionRequest(ch, req)
+                       if err := c.processRequest(ch, req); err != nil {
+                               sendReplyError(ch, req, err)
+                       }
                }
        }
 }
 
 // processRequest processes a single request received on the request channel.
-func (c *Connection) processRequest(ch *channel, req *vppRequest) error {
+func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
        // check whether we are connected to VPP
        if atomic.LoadUint32(&c.connected) == 0 {
                err := ErrNotConnected
                log.Errorf("processing request failed: %v", err)
-               sendReplyError(ch, req, err)
                return err
        }
 
        // retrieve message ID
        msgID, err := c.GetMessageID(req.msg)
        if err != nil {
-               err = fmt.Errorf("unable to retrieve message ID: %v", err)
                log.WithFields(logger.Fields{
                        "msg_name": req.msg.GetMessageName(),
                        "msg_crc":  req.msg.GetCrcString(),
                        "seq_num":  req.seqNum,
-               }).Error(err)
-               sendReplyError(ch, req, err)
-               return err
+                       "error":    err,
+               }).Errorf("failed to retrieve message ID")
+               return fmt.Errorf("unable to retrieve message ID: %v", err)
        }
 
        // encode the message into binary
        data, err := c.codec.EncodeMsg(req.msg, msgID)
        if err != nil {
-               err = fmt.Errorf("unable to encode the messge: %v", err)
                log.WithFields(logger.Fields{
                        "channel":  ch.id,
                        "msg_id":   msgID,
                        "msg_name": req.msg.GetMessageName(),
                        "seq_num":  req.seqNum,
-               }).Error(err)
-               sendReplyError(ch, req, err)
-               return err
+                       "error":    err,
+               }).Errorf("failed to encode message: %#v", req.msg)
+               return fmt.Errorf("unable to encode the message: %v", err)
        }
 
-       // get context
        context := packRequestContext(ch.id, req.multi, req.seqNum)
+
        if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
                log.WithFields(logger.Fields{
                        "channel":  ch.id,
@@ -108,7 +103,6 @@ func (c *Connection) processRequest(ch *channel, req *vppRequest) error {
                        "msg_id":  msgID,
                        "seq_num": req.seqNum,
                }).Error(err)
-               sendReplyError(ch, req, err)
                return err
        }
 
@@ -137,7 +131,7 @@ func (c *Connection) processRequest(ch *channel, req *vppRequest) error {
 }
 
 // msgCallback is called whenever any binary API message comes from VPP.
-func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) {
+func (c *Connection) msgCallback(msgID uint16, data []byte) {
        connLock.RLock()
        defer connLock.RUnlock()
 
@@ -157,13 +151,8 @@ func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) {
        // - replies that don't have context as first field (comes as zero)
        // - events that don't have context at all (comes as non zero)
        //
-       msgContext, err := c.codec.DecodeMsgContext(data, msg)
-       if err == nil {
-               if context != msgContext {
-                       log.Debugf("different context was decoded from message (%d -> %d)", context, msgContext)
-                       context = msgContext
-               }
-       } else {
+       context, err := c.codec.DecodeMsgContext(data, msg)
+       if err != nil {
                log.Errorf("decoding context failed: %v", err)
        }
 
@@ -202,11 +191,12 @@ func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) {
        // treat this as a last part of the reply
        lastReplyReceived := isMulti && msgID == c.pingReplyID
 
-       // send the data to the channel
+       // send the data to the channel, it needs to be copied,
+       // because it will be freed after this function returns
        sendReply(ch, &vppReply{
                msgID:        msgID,
                seqNum:       seqNum,
-               data:         data,
+               data:         append([]byte(nil), data...),
                lastReceived: lastReplyReceived,
        })
 
@@ -218,7 +208,7 @@ func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) {
 
 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
 // it logs the error and do not send the message.
-func sendReply(ch *channel, reply *vppReply) {
+func sendReply(ch *Channel, reply *vppReply) {
        select {
        case ch.replyChan <- reply:
                // reply sent successfully
@@ -232,10 +222,68 @@ func sendReply(ch *channel, reply *vppReply) {
        }
 }
 
-func sendReplyError(ch *channel, req *vppRequest, err error) {
+func sendReplyError(ch *Channel, req *vppRequest, err error) {
        sendReply(ch, &vppReply{seqNum: req.seqNum, err: err})
 }
 
+// isNotificationMessage returns true if someone has subscribed to provided message ID.
+func (c *Connection) isNotificationMessage(msgID uint16) bool {
+       c.subscriptionsLock.RLock()
+       defer c.subscriptionsLock.RUnlock()
+
+       _, exists := c.subscriptions[msgID]
+       return exists
+}
+
+// sendNotifications send a notification message to all subscribers subscribed for that message.
+func (c *Connection) sendNotifications(msgID uint16, data []byte) {
+       c.subscriptionsLock.RLock()
+       defer c.subscriptionsLock.RUnlock()
+
+       matched := false
+
+       // send to notification to each subscriber
+       for _, sub := range c.subscriptions[msgID] {
+               log.WithFields(logger.Fields{
+                       "msg_name": sub.event.GetMessageName(),
+                       "msg_id":   msgID,
+                       "msg_size": len(data),
+               }).Debug("Sending a notification to the subscription channel.")
+
+               event := sub.msgFactory()
+               if err := c.codec.DecodeMsg(data, event); err != nil {
+                       log.WithFields(logger.Fields{
+                               "msg_name": sub.event.GetMessageName(),
+                               "msg_id":   msgID,
+                               "msg_size": len(data),
+                       }).Errorf("Unable to decode the notification message: %v", err)
+                       continue
+               }
+
+               // send the message into the go channel of the subscription
+               select {
+               case sub.notifChan <- event:
+                       // message sent successfully
+               default:
+                       // unable to write into the channel without blocking
+                       log.WithFields(logger.Fields{
+                               "msg_name": sub.event.GetMessageName(),
+                               "msg_id":   msgID,
+                               "msg_size": len(data),
+                       }).Warn("Unable to deliver the notification, reciever end not ready.")
+               }
+
+               matched = true
+       }
+
+       if !matched {
+               log.WithFields(logger.Fields{
+                       "msg_id":   msgID,
+                       "msg_size": len(data),
+               }).Info("No subscription found for the notification message.")
+       }
+}
+
 // +------------------+-------------------+-----------------------+
 // | 15b = channel ID | 1b = is multipart | 16b = sequence number |
 // +------------------+-------------------+-----------------------+