89c16a4bad59d31a13ef7655399db02e1222280d
[govpp.git] / core / notification_handler.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "fmt"
19         "reflect"
20
21         logger "github.com/sirupsen/logrus"
22
23         "git.fd.io/govpp.git/api"
24 )
25
26 // processNotifSubscribeRequest processes a notification subscribe request.
27 func (c *Connection) processNotifSubscribeRequest(ch *api.Channel, req *api.NotifSubscribeRequest) error {
28         var err error
29
30         // subscribe / unsubscribe
31         if req.Subscribe {
32                 err = c.addNotifSubscription(req.Subscription)
33         } else {
34                 err = c.removeNotifSubscription(req.Subscription)
35         }
36
37         // send the reply into the go channel
38         select {
39         case ch.NotifSubsReplyChan <- err:
40                 // reply sent successfully
41         default:
42                 // unable to write into the channel without blocking
43                 log.WithFields(logger.Fields{
44                         "channel": ch,
45                 }).Warn("Unable to deliver the subscribe reply, reciever end not ready.")
46         }
47
48         return err
49 }
50
51 // addNotifSubscription adds the notification subscription into the subscriptions map of the connection.
52 func (c *Connection) addNotifSubscription(subs *api.NotifSubscription) error {
53         // get message ID of the notification message
54         msgID, err := c.getSubscriptionMessageID(subs)
55         if err != nil {
56                 return err
57         }
58
59         log.WithFields(logger.Fields{
60                 "msg_id":       msgID,
61                 "subscription": subs,
62         }).Debug("Adding new notification subscription.")
63
64         // add the subscription into map
65         c.notifSubscriptionsLock.Lock()
66         defer c.notifSubscriptionsLock.Unlock()
67
68         c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID], subs)
69
70         return nil
71 }
72
73 // removeNotifSubscription removes the notification subscription from the subscriptions map of the connection.
74 func (c *Connection) removeNotifSubscription(subs *api.NotifSubscription) error {
75         // get message ID of the notification message
76         msgID, err := c.getSubscriptionMessageID(subs)
77         if err != nil {
78                 return err
79         }
80
81         log.WithFields(logger.Fields{
82                 "msg_id":       msgID,
83                 "subscription": subs,
84         }).Debug("Removing notification subscription.")
85
86         // remove the subscription from the map
87         c.notifSubscriptionsLock.Lock()
88         defer c.notifSubscriptionsLock.Unlock()
89
90         for i, item := range c.notifSubscriptions[msgID] {
91                 if item == subs {
92                         // remove i-th item in the slice
93                         c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID][:i], c.notifSubscriptions[msgID][i+1:]...)
94                         break
95                 }
96         }
97
98         return nil
99 }
100
101 // isNotificationMessage returns true if someone has subscribed to provided message ID.
102 func (c *Connection) isNotificationMessage(msgID uint16) bool {
103         c.notifSubscriptionsLock.RLock()
104         defer c.notifSubscriptionsLock.RUnlock()
105
106         _, exists := c.notifSubscriptions[msgID]
107         return exists
108 }
109
110 // sendNotifications send a notification message to all subscribers subscribed for that message.
111 func (c *Connection) sendNotifications(msgID uint16, data []byte) {
112         c.notifSubscriptionsLock.RLock()
113         defer c.notifSubscriptionsLock.RUnlock()
114
115         matched := false
116
117         // send to notification to each subscriber
118         for _, subs := range c.notifSubscriptions[msgID] {
119                 log.WithFields(logger.Fields{
120                         "msg_id":       msgID,
121                         "msg_size":     len(data),
122                         "subscription": subs,
123                 }).Debug("Sending a notification to the subscription channel.")
124
125                 msg := subs.MsgFactory()
126                 err := c.codec.DecodeMsg(data, msg)
127                 if err != nil {
128                         log.WithFields(logger.Fields{
129                                 "msg_id":       msgID,
130                                 "msg_size":     len(data),
131                                 "subscription": subs,
132                         }).Error("Unable to decode the notification message.")
133                         continue
134                 }
135
136                 // special case for the strange interface counters message
137                 if msg.GetMessageName() == "vnet_interface_counters" {
138                         v := reflect.ValueOf(msg).Elem().FieldByName("Data")
139                         if v.IsValid() {
140                                 v.SetBytes(data[8:]) // include the Count and Data fields in the data
141                         }
142                 }
143
144                 // send the message into the go channel of the subscription
145                 select {
146                 case subs.NotifChan <- msg:
147                         // message sent successfully
148                 default:
149                         // unable to write into the channel without blocking
150                         log.WithFields(logger.Fields{
151                                 "msg_id":       msgID,
152                                 "msg_size":     len(data),
153                                 "subscription": subs,
154                         }).Warn("Unable to deliver the notification, reciever end not ready.")
155                 }
156
157                 matched = true
158         }
159
160         if !matched {
161                 log.WithFields(logger.Fields{
162                         "msg_id":   msgID,
163                         "msg_size": len(data),
164                 }).Debug("No subscription found for the notification message.")
165         }
166 }
167
168 // getSubscriptionMessageID returns ID of the message the subscription is tied to.
169 func (c *Connection) getSubscriptionMessageID(subs *api.NotifSubscription) (uint16, error) {
170         msg := subs.MsgFactory()
171         msgID, err := c.GetMessageID(msg)
172
173         if err != nil {
174                 log.WithFields(logger.Fields{
175                         "msg_name": msg.GetMessageName(),
176                         "msg_crc":  msg.GetCrcString(),
177                 }).Errorf("unable to retrieve message ID: %v", err)
178                 return 0, fmt.Errorf("unable to retrieve message ID: %v", err)
179         }
180
181         return msgID, nil
182 }