make api.Channel as interface
[govpp.git] / core / notification_handler.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "fmt"
19         "reflect"
20
21         "git.fd.io/govpp.git/api"
22         logger "github.com/sirupsen/logrus"
23 )
24
25 // processNotifSubscribeRequest processes a notification subscribe request.
26 func (c *Connection) processNotifSubscribeRequest(ch *channel, req *api.NotifSubscribeRequest) error {
27         var err error
28
29         // subscribe / unsubscribe
30         if req.Subscribe {
31                 err = c.addNotifSubscription(req.Subscription)
32         } else {
33                 err = c.removeNotifSubscription(req.Subscription)
34         }
35
36         // send the reply into the go channel
37         select {
38         case ch.notifSubsReplyChan <- err:
39                 // reply sent successfully
40         default:
41                 // unable to write into the channel without blocking
42                 log.WithFields(logger.Fields{
43                         "channel": ch,
44                 }).Warn("Unable to deliver the subscribe reply, reciever end not ready.")
45         }
46
47         return err
48 }
49
50 // addNotifSubscription adds the notification subscription into the subscriptions map of the connection.
51 func (c *Connection) addNotifSubscription(subs *api.NotifSubscription) error {
52         // get message ID of the notification message
53         msgID, err := c.getSubscriptionMessageID(subs)
54         if err != nil {
55                 return err
56         }
57
58         log.WithFields(logger.Fields{
59                 "msg_id":       msgID,
60                 "subscription": subs,
61         }).Debug("Adding new notification subscription.")
62
63         // add the subscription into map
64         c.notifSubscriptionsLock.Lock()
65         defer c.notifSubscriptionsLock.Unlock()
66
67         c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID], subs)
68
69         return nil
70 }
71
72 // removeNotifSubscription removes the notification subscription from the subscriptions map of the connection.
73 func (c *Connection) removeNotifSubscription(subs *api.NotifSubscription) error {
74         // get message ID of the notification message
75         msgID, err := c.getSubscriptionMessageID(subs)
76         if err != nil {
77                 return err
78         }
79
80         log.WithFields(logger.Fields{
81                 "msg_id":       msgID,
82                 "subscription": subs,
83         }).Debug("Removing notification subscription.")
84
85         // remove the subscription from the map
86         c.notifSubscriptionsLock.Lock()
87         defer c.notifSubscriptionsLock.Unlock()
88
89         for i, item := range c.notifSubscriptions[msgID] {
90                 if item == subs {
91                         // remove i-th item in the slice
92                         c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID][:i], c.notifSubscriptions[msgID][i+1:]...)
93                         break
94                 }
95         }
96
97         return nil
98 }
99
100 // isNotificationMessage returns true if someone has subscribed to provided message ID.
101 func (c *Connection) isNotificationMessage(msgID uint16) bool {
102         c.notifSubscriptionsLock.RLock()
103         defer c.notifSubscriptionsLock.RUnlock()
104
105         _, exists := c.notifSubscriptions[msgID]
106         return exists
107 }
108
109 // sendNotifications send a notification message to all subscribers subscribed for that message.
110 func (c *Connection) sendNotifications(msgID uint16, data []byte) {
111         c.notifSubscriptionsLock.RLock()
112         defer c.notifSubscriptionsLock.RUnlock()
113
114         matched := false
115
116         // send to notification to each subscriber
117         for _, subs := range c.notifSubscriptions[msgID] {
118                 log.WithFields(logger.Fields{
119                         "msg_id":       msgID,
120                         "msg_size":     len(data),
121                         "subscription": subs,
122                 }).Debug("Sending a notification to the subscription channel.")
123
124                 msg := subs.MsgFactory()
125                 err := c.codec.DecodeMsg(data, msg)
126                 if err != nil {
127                         log.WithFields(logger.Fields{
128                                 "msg_id":       msgID,
129                                 "msg_size":     len(data),
130                                 "subscription": subs,
131                         }).Error("Unable to decode the notification message.")
132                         continue
133                 }
134
135                 // special case for the strange interface counters message
136                 if msg.GetMessageName() == "vnet_interface_counters" {
137                         v := reflect.ValueOf(msg).Elem().FieldByName("Data")
138                         if v.IsValid() {
139                                 v.SetBytes(data[8:]) // include the Count and Data fields in the data
140                         }
141                 }
142
143                 // send the message into the go channel of the subscription
144                 select {
145                 case subs.NotifChan <- msg:
146                         // message sent successfully
147                 default:
148                         // unable to write into the channel without blocking
149                         log.WithFields(logger.Fields{
150                                 "msg_id":       msgID,
151                                 "msg_size":     len(data),
152                                 "subscription": subs,
153                         }).Warn("Unable to deliver the notification, reciever end not ready.")
154                 }
155
156                 matched = true
157         }
158
159         if !matched {
160                 log.WithFields(logger.Fields{
161                         "msg_id":   msgID,
162                         "msg_size": len(data),
163                 }).Debug("No subscription found for the notification message.")
164         }
165 }
166
167 // getSubscriptionMessageID returns ID of the message the subscription is tied to.
168 func (c *Connection) getSubscriptionMessageID(subs *api.NotifSubscription) (uint16, error) {
169         msg := subs.MsgFactory()
170         msgID, err := c.GetMessageID(msg)
171
172         if err != nil {
173                 log.WithFields(logger.Fields{
174                         "msg_name": msg.GetMessageName(),
175                         "msg_crc":  msg.GetCrcString(),
176                 }).Errorf("unable to retrieve message ID: %v", err)
177                 return 0, fmt.Errorf("unable to retrieve message ID: %v", err)
178         }
179
180         return msgID, nil
181 }