1 // Copyright (c) 2017 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
21 "git.fd.io/govpp.git/api"
22 logger "github.com/sirupsen/logrus"
25 // processNotifSubscribeRequest processes a notification subscribe request.
26 func (c *Connection) processNotifSubscribeRequest(ch *channel, req *api.NotifSubscribeRequest) error {
29 // subscribe / unsubscribe
31 err = c.addNotifSubscription(req.Subscription)
33 err = c.removeNotifSubscription(req.Subscription)
36 // send the reply into the go channel
38 case ch.notifSubsReplyChan <- err:
39 // reply sent successfully
41 // unable to write into the channel without blocking
42 log.WithFields(logger.Fields{
44 }).Warn("Unable to deliver the subscribe reply, reciever end not ready.")
50 // addNotifSubscription adds the notification subscription into the subscriptions map of the connection.
51 func (c *Connection) addNotifSubscription(subs *api.NotifSubscription) error {
52 // get message ID of the notification message
53 msgID, err := c.getSubscriptionMessageID(subs)
58 log.WithFields(logger.Fields{
61 }).Debug("Adding new notification subscription.")
63 // add the subscription into map
64 c.notifSubscriptionsLock.Lock()
65 defer c.notifSubscriptionsLock.Unlock()
67 c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID], subs)
72 // removeNotifSubscription removes the notification subscription from the subscriptions map of the connection.
73 func (c *Connection) removeNotifSubscription(subs *api.NotifSubscription) error {
74 // get message ID of the notification message
75 msgID, err := c.getSubscriptionMessageID(subs)
80 log.WithFields(logger.Fields{
83 }).Debug("Removing notification subscription.")
85 // remove the subscription from the map
86 c.notifSubscriptionsLock.Lock()
87 defer c.notifSubscriptionsLock.Unlock()
89 for i, item := range c.notifSubscriptions[msgID] {
91 // remove i-th item in the slice
92 c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID][:i], c.notifSubscriptions[msgID][i+1:]...)
100 // isNotificationMessage returns true if someone has subscribed to provided message ID.
101 func (c *Connection) isNotificationMessage(msgID uint16) bool {
102 c.notifSubscriptionsLock.RLock()
103 defer c.notifSubscriptionsLock.RUnlock()
105 _, exists := c.notifSubscriptions[msgID]
109 // sendNotifications send a notification message to all subscribers subscribed for that message.
110 func (c *Connection) sendNotifications(msgID uint16, data []byte) {
111 c.notifSubscriptionsLock.RLock()
112 defer c.notifSubscriptionsLock.RUnlock()
116 // send to notification to each subscriber
117 for _, subs := range c.notifSubscriptions[msgID] {
118 log.WithFields(logger.Fields{
120 "msg_size": len(data),
121 "subscription": subs,
122 }).Debug("Sending a notification to the subscription channel.")
124 msg := subs.MsgFactory()
125 err := c.codec.DecodeMsg(data, msg)
127 log.WithFields(logger.Fields{
129 "msg_size": len(data),
130 "subscription": subs,
131 }).Error("Unable to decode the notification message.")
135 // special case for the strange interface counters message
136 if msg.GetMessageName() == "vnet_interface_counters" {
137 v := reflect.ValueOf(msg).Elem().FieldByName("Data")
139 v.SetBytes(data[8:]) // include the Count and Data fields in the data
143 // send the message into the go channel of the subscription
145 case subs.NotifChan <- msg:
146 // message sent successfully
148 // unable to write into the channel without blocking
149 log.WithFields(logger.Fields{
151 "msg_size": len(data),
152 "subscription": subs,
153 }).Warn("Unable to deliver the notification, reciever end not ready.")
160 log.WithFields(logger.Fields{
162 "msg_size": len(data),
163 }).Debug("No subscription found for the notification message.")
167 // getSubscriptionMessageID returns ID of the message the subscription is tied to.
168 func (c *Connection) getSubscriptionMessageID(subs *api.NotifSubscription) (uint16, error) {
169 msg := subs.MsgFactory()
170 msgID, err := c.GetMessageID(msg)
173 log.WithFields(logger.Fields{
174 "msg_name": msg.GetMessageName(),
175 "msg_crc": msg.GetCrcString(),
176 }).Errorf("unable to retrieve message ID: %v", err)
177 return 0, fmt.Errorf("unable to retrieve message ID: %v", err)