Simplify subscribing to events and fix events
[govpp.git] / core / request_handler.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "sync/atomic"
21         "time"
22
23         logger "github.com/sirupsen/logrus"
24 )
25
26 var (
27         ErrNotConnected = errors.New("not connected to VPP, ignoring the request")
28         ErrProbeTimeout = errors.New("probe reply not received within timeout period")
29 )
30
31 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
32 func (c *Connection) watchRequests(ch *Channel) {
33         for {
34                 select {
35                 case req, ok := <-ch.reqChan:
36                         // new request on the request channel
37                         if !ok {
38                                 // after closing the request channel, release API channel and return
39                                 c.releaseAPIChannel(ch)
40                                 return
41                         }
42                         c.processRequest(ch, req)
43                 }
44         }
45 }
46
47 // processRequest processes a single request received on the request channel.
48 func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
49         // check whether we are connected to VPP
50         if atomic.LoadUint32(&c.connected) == 0 {
51                 err := ErrNotConnected
52                 log.Errorf("processing request failed: %v", err)
53                 sendReplyError(ch, req, err)
54                 return err
55         }
56
57         // retrieve message ID
58         msgID, err := c.GetMessageID(req.msg)
59         if err != nil {
60                 err = fmt.Errorf("unable to retrieve message ID: %v", err)
61                 log.WithFields(logger.Fields{
62                         "msg_name": req.msg.GetMessageName(),
63                         "msg_crc":  req.msg.GetCrcString(),
64                         "seq_num":  req.seqNum,
65                 }).Error(err)
66                 sendReplyError(ch, req, err)
67                 return err
68         }
69
70         // encode the message into binary
71         data, err := c.codec.EncodeMsg(req.msg, msgID)
72         if err != nil {
73                 err = fmt.Errorf("unable to encode the messge: %v", err)
74                 log.WithFields(logger.Fields{
75                         "channel":  ch.id,
76                         "msg_id":   msgID,
77                         "msg_name": req.msg.GetMessageName(),
78                         "seq_num":  req.seqNum,
79                 }).Error(err)
80                 sendReplyError(ch, req, err)
81                 return err
82         }
83
84         // get context
85         context := packRequestContext(ch.id, req.multi, req.seqNum)
86         if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
87                 log.WithFields(logger.Fields{
88                         "channel":  ch.id,
89                         "context":  context,
90                         "is_multi": req.multi,
91                         "msg_id":   msgID,
92                         "msg_name": req.msg.GetMessageName(),
93                         "msg_size": len(data),
94                         "seq_num":  req.seqNum,
95                 }).Debug(" -> Sending a message to VPP.")
96         }
97
98         // send the request to VPP
99         err = c.vpp.SendMsg(context, data)
100         if err != nil {
101                 err = fmt.Errorf("unable to send the message: %v", err)
102                 log.WithFields(logger.Fields{
103                         "context": context,
104                         "msg_id":  msgID,
105                         "seq_num": req.seqNum,
106                 }).Error(err)
107                 sendReplyError(ch, req, err)
108                 return err
109         }
110
111         if req.multi {
112                 // send a control ping to determine end of the multipart response
113                 pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID)
114
115                 log.WithFields(logger.Fields{
116                         "channel":  ch.id,
117                         "context":  context,
118                         "msg_id":   c.pingReqID,
119                         "msg_size": len(pingData),
120                         "seq_num":  req.seqNum,
121                 }).Debug(" -> Sending a control ping to VPP.")
122
123                 if err := c.vpp.SendMsg(context, pingData); err != nil {
124                         log.WithFields(logger.Fields{
125                                 "context": context,
126                                 "msg_id":  msgID,
127                                 "seq_num": req.seqNum,
128                         }).Warnf("unable to send control ping: %v", err)
129                 }
130         }
131
132         return nil
133 }
134
135 // msgCallback is called whenever any binary API message comes from VPP.
136 func (c *Connection) msgCallback(msgID uint16, data []byte) {
137         connLock.RLock()
138         defer connLock.RUnlock()
139
140         if c == nil {
141                 log.Warn("Already disconnected, ignoring the message.")
142                 return
143         }
144
145         msg, ok := c.msgMap[msgID]
146         if !ok {
147                 log.Warnf("Unknown message received, ID: %d", msgID)
148                 return
149         }
150
151         // decode message context to fix for special cases of messages,
152         // for example:
153         // - replies that don't have context as first field (comes as zero)
154         // - events that don't have context at all (comes as non zero)
155         //
156         context, err := c.codec.DecodeMsgContext(data, msg)
157         if err != nil {
158                 log.Errorf("decoding context failed: %v", err)
159         }
160
161         chanID, isMulti, seqNum := unpackRequestContext(context)
162         if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
163                 log.WithFields(logger.Fields{
164                         "context":  context,
165                         "msg_id":   msgID,
166                         "msg_name": msg.GetMessageName(),
167                         "msg_size": len(data),
168                         "channel":  chanID,
169                         "is_multi": isMulti,
170                         "seq_num":  seqNum,
171                 }).Debug(" <- Received a message from VPP.")
172         }
173
174         if context == 0 || c.isNotificationMessage(msgID) {
175                 // process the message as a notification
176                 c.sendNotifications(msgID, data)
177                 return
178         }
179
180         // match ch according to the context
181         c.channelsLock.RLock()
182         ch, ok := c.channels[chanID]
183         c.channelsLock.RUnlock()
184         if !ok {
185                 log.WithFields(logger.Fields{
186                         "channel": chanID,
187                         "msg_id":  msgID,
188                 }).Error("Channel ID not known, ignoring the message.")
189                 return
190         }
191
192         // if this is a control ping reply to a multipart request,
193         // treat this as a last part of the reply
194         lastReplyReceived := isMulti && msgID == c.pingReplyID
195
196         // send the data to the channel
197         sendReply(ch, &vppReply{
198                 msgID:        msgID,
199                 seqNum:       seqNum,
200                 data:         data,
201                 lastReceived: lastReplyReceived,
202         })
203
204         // store actual time of this reply
205         c.lastReplyLock.Lock()
206         c.lastReply = time.Now()
207         c.lastReplyLock.Unlock()
208 }
209
210 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
211 // it logs the error and do not send the message.
212 func sendReply(ch *Channel, reply *vppReply) {
213         select {
214         case ch.replyChan <- reply:
215                 // reply sent successfully
216         case <-time.After(time.Millisecond * 100):
217                 // receiver still not ready
218                 log.WithFields(logger.Fields{
219                         "channel": ch,
220                         "msg_id":  reply.msgID,
221                         "seq_num": reply.seqNum,
222                 }).Warn("Unable to send the reply, reciever end not ready.")
223         }
224 }
225
226 func sendReplyError(ch *Channel, req *vppRequest, err error) {
227         sendReply(ch, &vppReply{seqNum: req.seqNum, err: err})
228 }
229
230 // isNotificationMessage returns true if someone has subscribed to provided message ID.
231 func (c *Connection) isNotificationMessage(msgID uint16) bool {
232         c.subscriptionsLock.RLock()
233         defer c.subscriptionsLock.RUnlock()
234
235         _, exists := c.subscriptions[msgID]
236         return exists
237 }
238
239 // sendNotifications send a notification message to all subscribers subscribed for that message.
240 func (c *Connection) sendNotifications(msgID uint16, data []byte) {
241         c.subscriptionsLock.RLock()
242         defer c.subscriptionsLock.RUnlock()
243
244         matched := false
245
246         // send to notification to each subscriber
247         for _, sub := range c.subscriptions[msgID] {
248                 log.WithFields(logger.Fields{
249                         "msg_name": sub.event.GetMessageName(),
250                         "msg_id":   msgID,
251                         "msg_size": len(data),
252                 }).Debug("Sending a notification to the subscription channel.")
253
254                 event := sub.msgFactory()
255                 if err := c.codec.DecodeMsg(data, event); err != nil {
256                         log.WithFields(logger.Fields{
257                                 "msg_name": sub.event.GetMessageName(),
258                                 "msg_id":   msgID,
259                                 "msg_size": len(data),
260                         }).Errorf("Unable to decode the notification message: %v", err)
261                         continue
262                 }
263
264                 // send the message into the go channel of the subscription
265                 select {
266                 case sub.notifChan <- event:
267                         // message sent successfully
268                 default:
269                         // unable to write into the channel without blocking
270                         log.WithFields(logger.Fields{
271                                 "msg_name": sub.event.GetMessageName(),
272                                 "msg_id":   msgID,
273                                 "msg_size": len(data),
274                         }).Warn("Unable to deliver the notification, reciever end not ready.")
275                 }
276
277                 matched = true
278         }
279
280         if !matched {
281                 log.WithFields(logger.Fields{
282                         "msg_id":   msgID,
283                         "msg_size": len(data),
284                 }).Info("No subscription found for the notification message.")
285         }
286 }
287
288 // +------------------+-------------------+-----------------------+
289 // | 15b = channel ID | 1b = is multipart | 16b = sequence number |
290 // +------------------+-------------------+-----------------------+
291 func packRequestContext(chanID uint16, isMultipart bool, seqNum uint16) uint32 {
292         context := uint32(chanID) << 17
293         if isMultipart {
294                 context |= 1 << 16
295         }
296         context |= uint32(seqNum)
297         return context
298 }
299
300 func unpackRequestContext(context uint32) (chanID uint16, isMulipart bool, seqNum uint16) {
301         chanID = uint16(context >> 17)
302         if ((context >> 16) & 0x1) != 0 {
303                 isMulipart = true
304         }
305         seqNum = uint16(context & 0xffff)
306         return
307 }
308
309 // compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to,
310 // or succeeds seq. number <seqNum2>.
311 // Since sequence numbers cycle in the finite set of size 2^16, the function
312 // must assume that the distance between compared sequence numbers is less than
313 // (2^16)/2 to determine the order.
314 func compareSeqNumbers(seqNum1, seqNum2 uint16) int {
315         // calculate distance from seqNum1 to seqNum2
316         var dist uint16
317         if seqNum1 <= seqNum2 {
318                 dist = seqNum2 - seqNum1
319         } else {
320                 dist = 0xffff - (seqNum1 - seqNum2 - 1)
321         }
322         if dist == 0 {
323                 return 0
324         } else if dist <= 0x8000 {
325                 return -1
326         }
327         return 1
328 }