1 // Copyright (c) 2017 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
21 logger "github.com/sirupsen/logrus"
23 "git.fd.io/govpp.git/api"
26 // processNotifSubscribeRequest processes a notification subscribe request.
27 func (c *Connection) processNotifSubscribeRequest(ch *api.Channel, req *api.NotifSubscribeRequest) error {
30 // subscribe / unsubscribe
32 err = c.addNotifSubscription(req.Subscription)
34 err = c.removeNotifSubscription(req.Subscription)
37 // send the reply into the go channel
39 case ch.NotifSubsReplyChan <- err:
40 // reply sent successfully
42 // unable to write into the channel without blocking
43 log.WithFields(logger.Fields{
45 }).Warn("Unable to deliver the subscribe reply, reciever end not ready.")
51 // addNotifSubscription adds the notification subscription into the subscriptions map of the connection.
52 func (c *Connection) addNotifSubscription(subs *api.NotifSubscription) error {
53 // get message ID of the notification message
54 msgID, err := c.getSubscriptionMessageID(subs)
59 log.WithFields(logger.Fields{
62 }).Debug("Adding new notification subscription.")
64 // add the subscription into map
65 c.notifSubscriptionsLock.Lock()
66 defer c.notifSubscriptionsLock.Unlock()
68 c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID], subs)
73 // removeNotifSubscription removes the notification subscription from the subscriptions map of the connection.
74 func (c *Connection) removeNotifSubscription(subs *api.NotifSubscription) error {
75 // get message ID of the notification message
76 msgID, err := c.getSubscriptionMessageID(subs)
81 log.WithFields(logger.Fields{
84 }).Debug("Removing notification subscription.")
86 // remove the subscription from the map
87 c.notifSubscriptionsLock.Lock()
88 defer c.notifSubscriptionsLock.Unlock()
90 for i, item := range c.notifSubscriptions[msgID] {
92 // remove i-th item in the slice
93 c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID][:i], c.notifSubscriptions[msgID][i+1:]...)
101 // isNotificationMessage returns true if someone has subscribed to provided message ID.
102 func (c *Connection) isNotificationMessage(msgID uint16) bool {
103 c.notifSubscriptionsLock.RLock()
104 defer c.notifSubscriptionsLock.RUnlock()
106 _, exists := c.notifSubscriptions[msgID]
110 // sendNotifications send a notification message to all subscribers subscribed for that message.
111 func (c *Connection) sendNotifications(msgID uint16, data []byte) {
112 c.notifSubscriptionsLock.RLock()
113 defer c.notifSubscriptionsLock.RUnlock()
117 // send to notification to each subscriber
118 for _, subs := range c.notifSubscriptions[msgID] {
119 log.WithFields(logger.Fields{
121 "msg_size": len(data),
122 "subscription": subs,
123 }).Debug("Sending a notification to the subscription channel.")
125 msg := subs.MsgFactory()
126 err := c.codec.DecodeMsg(data, msg)
128 log.WithFields(logger.Fields{
130 "msg_size": len(data),
131 "subscription": subs,
132 }).Error("Unable to decode the notification message.")
136 // special case for the strange interface counters message
137 if msg.GetMessageName() == "vnet_interface_counters" {
138 v := reflect.ValueOf(msg).Elem().FieldByName("Data")
140 v.SetBytes(data[8:]) // include the Count and Data fields in the data
144 // send the message into the go channel of the subscription
146 case subs.NotifChan <- msg:
147 // message sent successfully
149 // unable to write into the channel without blocking
150 log.WithFields(logger.Fields{
152 "msg_size": len(data),
153 "subscription": subs,
154 }).Warn("Unable to deliver the notification, reciever end not ready.")
161 log.WithFields(logger.Fields{
163 "msg_size": len(data),
164 }).Debug("No subscription found for the notification message.")
168 // getSubscriptionMessageID returns ID of the message the subscription is tied to.
169 func (c *Connection) getSubscriptionMessageID(subs *api.NotifSubscription) (uint16, error) {
170 msg := subs.MsgFactory()
171 msgID, err := c.GetMessageID(msg)
174 log.WithFields(logger.Fields{
175 "msg_name": msg.GetMessageName(),
176 "msg_crc": msg.GetCrcString(),
177 }).Errorf("unable to retrieve message ID: %v", err)
178 return 0, fmt.Errorf("unable to retrieve message ID: %v", err)