X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=core%2Fnotification_handler.go;h=7b889e37ab26264dc9987fd64b6199fe298d80db;hb=a3bb834db727a3ac9a1ffcfeae9265e5dead851f;hp=c0e8687bd724c3b81a0cf02b0dbfb3b80fe63d02;hpb=da815585c3f75c4ac073b0766dd668abf83844d8;p=govpp.git diff --git a/core/notification_handler.go b/core/notification_handler.go index c0e8687..7b889e3 100644 --- a/core/notification_handler.go +++ b/core/notification_handler.go @@ -16,21 +16,20 @@ package core import ( "fmt" - "reflect" "git.fd.io/govpp.git/api" logger "github.com/sirupsen/logrus" ) -// processNotifSubscribeRequest processes a notification subscribe request. -func (c *Connection) processNotifSubscribeRequest(ch *channel, req *api.NotifSubscribeRequest) error { +// processSubscriptionRequest processes a notification subscribe request. +func (c *Connection) processSubscriptionRequest(ch *channel, req *subscriptionRequest) error { var err error // subscribe / unsubscribe - if req.Subscribe { - err = c.addNotifSubscription(req.Subscription) + if req.subscribe { + err = c.addNotifSubscription(req.sub) } else { - err = c.removeNotifSubscription(req.Subscription) + err = c.removeNotifSubscription(req.sub) } // send the reply into the go channel @@ -40,7 +39,7 @@ func (c *Connection) processNotifSubscribeRequest(ch *channel, req *api.NotifSub default: // unable to write into the channel without blocking log.WithFields(logger.Fields{ - "channel": ch, + "channel": ch.id, }).Warn("Unable to deliver the subscribe reply, reciever end not ready.") } @@ -50,14 +49,14 @@ func (c *Connection) processNotifSubscribeRequest(ch *channel, req *api.NotifSub // addNotifSubscription adds the notification subscription into the subscriptions map of the connection. func (c *Connection) addNotifSubscription(subs *api.NotifSubscription) error { // get message ID of the notification message - msgID, err := c.getSubscriptionMessageID(subs) + msgID, msgName, err := c.getSubscriptionMessageID(subs) if err != nil { return err } log.WithFields(logger.Fields{ - "msg_id": msgID, - "subscription": subs, + "msg_name": msgName, + "msg_id": msgID, }).Debug("Adding new notification subscription.") // add the subscription into map @@ -72,14 +71,14 @@ func (c *Connection) addNotifSubscription(subs *api.NotifSubscription) error { // removeNotifSubscription removes the notification subscription from the subscriptions map of the connection. func (c *Connection) removeNotifSubscription(subs *api.NotifSubscription) error { // get message ID of the notification message - msgID, err := c.getSubscriptionMessageID(subs) + msgID, msgName, err := c.getSubscriptionMessageID(subs) if err != nil { return err } log.WithFields(logger.Fields{ - "msg_id": msgID, - "subscription": subs, + "msg_name": msgName, + "msg_id": msgID, }).Debug("Removing notification subscription.") // remove the subscription from the map @@ -115,31 +114,22 @@ func (c *Connection) sendNotifications(msgID uint16, data []byte) { // send to notification to each subscriber for _, subs := range c.notifSubscriptions[msgID] { + msg := subs.MsgFactory() log.WithFields(logger.Fields{ - "msg_id": msgID, - "msg_size": len(data), - "subscription": subs, + "msg_name": msg.GetMessageName(), + "msg_id": msgID, + "msg_size": len(data), }).Debug("Sending a notification to the subscription channel.") - msg := subs.MsgFactory() - err := c.codec.DecodeMsg(data, msg) - if err != nil { + if err := c.codec.DecodeMsg(data, msg); err != nil { log.WithFields(logger.Fields{ - "msg_id": msgID, - "msg_size": len(data), - "subscription": subs, - }).Error("Unable to decode the notification message.") + "msg_name": msg.GetMessageName(), + "msg_id": msgID, + "msg_size": len(data), + }).Errorf("Unable to decode the notification message: %v", err) continue } - // special case for the strange interface counters message - if msg.GetMessageName() == "vnet_interface_counters" { - v := reflect.ValueOf(msg).Elem().FieldByName("Data") - if v.IsValid() { - v.SetBytes(data[8:]) // include the Count and Data fields in the data - } - } - // send the message into the go channel of the subscription select { case subs.NotifChan <- msg: @@ -147,9 +137,9 @@ func (c *Connection) sendNotifications(msgID uint16, data []byte) { default: // unable to write into the channel without blocking log.WithFields(logger.Fields{ - "msg_id": msgID, - "msg_size": len(data), - "subscription": subs, + "msg_name": msg.GetMessageName(), + "msg_id": msgID, + "msg_size": len(data), }).Warn("Unable to deliver the notification, reciever end not ready.") } @@ -160,22 +150,21 @@ func (c *Connection) sendNotifications(msgID uint16, data []byte) { log.WithFields(logger.Fields{ "msg_id": msgID, "msg_size": len(data), - }).Debug("No subscription found for the notification message.") + }).Info("No subscription found for the notification message.") } } // getSubscriptionMessageID returns ID of the message the subscription is tied to. -func (c *Connection) getSubscriptionMessageID(subs *api.NotifSubscription) (uint16, error) { +func (c *Connection) getSubscriptionMessageID(subs *api.NotifSubscription) (uint16, string, error) { msg := subs.MsgFactory() msgID, err := c.GetMessageID(msg) - if err != nil { log.WithFields(logger.Fields{ "msg_name": msg.GetMessageName(), "msg_crc": msg.GetCrcString(), }).Errorf("unable to retrieve message ID: %v", err) - return 0, fmt.Errorf("unable to retrieve message ID: %v", err) + return 0, "", fmt.Errorf("unable to retrieve message ID: %v", err) } - return msgID, nil + return msgID, msg.GetMessageName(), nil }