Merge "Introduce StatsAPI and it's initial implementation"
[govpp.git] / core / request_handler.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "sync/atomic"
21         "time"
22
23         logger "github.com/sirupsen/logrus"
24 )
25
26 var (
27         ErrNotConnected = errors.New("not connected to VPP, ignoring the request")
28         ErrProbeTimeout = errors.New("probe reply not received within timeout period")
29 )
30
31 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
32 func (c *Connection) watchRequests(ch *Channel) {
33         for {
34                 select {
35                 case req, ok := <-ch.reqChan:
36                         // new request on the request channel
37                         if !ok {
38                                 // after closing the request channel, release API channel and return
39                                 c.releaseAPIChannel(ch)
40                                 return
41                         }
42                         if err := c.processRequest(ch, req); err != nil {
43                                 sendReplyError(ch, req, err)
44                         }
45                 }
46         }
47 }
48
49 // processRequest processes a single request received on the request channel.
50 func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
51         // check whether we are connected to VPP
52         if atomic.LoadUint32(&c.vppConnected) == 0 {
53                 err := ErrNotConnected
54                 log.Errorf("processing request failed: %v", err)
55                 return err
56         }
57
58         // retrieve message ID
59         msgID, err := c.GetMessageID(req.msg)
60         if err != nil {
61                 log.WithFields(logger.Fields{
62                         "msg_name": req.msg.GetMessageName(),
63                         "msg_crc":  req.msg.GetCrcString(),
64                         "seq_num":  req.seqNum,
65                         "error":    err,
66                 }).Errorf("failed to retrieve message ID")
67                 return fmt.Errorf("unable to retrieve message ID: %v", err)
68         }
69
70         // encode the message into binary
71         data, err := c.codec.EncodeMsg(req.msg, msgID)
72         if err != nil {
73                 log.WithFields(logger.Fields{
74                         "channel":  ch.id,
75                         "msg_id":   msgID,
76                         "msg_name": req.msg.GetMessageName(),
77                         "seq_num":  req.seqNum,
78                         "error":    err,
79                 }).Errorf("failed to encode message: %#v", req.msg)
80                 return fmt.Errorf("unable to encode the message: %v", err)
81         }
82
83         context := packRequestContext(ch.id, req.multi, req.seqNum)
84
85         if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
86                 log.WithFields(logger.Fields{
87                         "channel":  ch.id,
88                         "context":  context,
89                         "is_multi": req.multi,
90                         "msg_id":   msgID,
91                         "msg_name": req.msg.GetMessageName(),
92                         "msg_size": len(data),
93                         "seq_num":  req.seqNum,
94                 }).Debug(" -> Sending a message to VPP.")
95         }
96
97         // send the request to VPP
98         err = c.vppClient.SendMsg(context, data)
99         if err != nil {
100                 err = fmt.Errorf("unable to send the message: %v", err)
101                 log.WithFields(logger.Fields{
102                         "context": context,
103                         "msg_id":  msgID,
104                         "seq_num": req.seqNum,
105                 }).Error(err)
106                 return err
107         }
108
109         if req.multi {
110                 // send a control ping to determine end of the multipart response
111                 pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID)
112
113                 log.WithFields(logger.Fields{
114                         "channel":  ch.id,
115                         "context":  context,
116                         "msg_id":   c.pingReqID,
117                         "msg_size": len(pingData),
118                         "seq_num":  req.seqNum,
119                 }).Debug(" -> Sending a control ping to VPP.")
120
121                 if err := c.vppClient.SendMsg(context, pingData); err != nil {
122                         log.WithFields(logger.Fields{
123                                 "context": context,
124                                 "msg_id":  msgID,
125                                 "seq_num": req.seqNum,
126                         }).Warnf("unable to send control ping: %v", err)
127                 }
128         }
129
130         return nil
131 }
132
133 // msgCallback is called whenever any binary API message comes from VPP.
134 func (c *Connection) msgCallback(msgID uint16, data []byte) {
135         connLock.RLock()
136         defer connLock.RUnlock()
137
138         if c == nil {
139                 log.Warn("Already disconnected, ignoring the message.")
140                 return
141         }
142
143         msg, ok := c.msgMap[msgID]
144         if !ok {
145                 log.Warnf("Unknown message received, ID: %d", msgID)
146                 return
147         }
148
149         // decode message context to fix for special cases of messages,
150         // for example:
151         // - replies that don't have context as first field (comes as zero)
152         // - events that don't have context at all (comes as non zero)
153         //
154         context, err := c.codec.DecodeMsgContext(data, msg)
155         if err != nil {
156                 log.Errorf("decoding context failed: %v", err)
157         }
158
159         chanID, isMulti, seqNum := unpackRequestContext(context)
160         if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
161                 log.WithFields(logger.Fields{
162                         "context":  context,
163                         "msg_id":   msgID,
164                         "msg_name": msg.GetMessageName(),
165                         "msg_size": len(data),
166                         "channel":  chanID,
167                         "is_multi": isMulti,
168                         "seq_num":  seqNum,
169                 }).Debug(" <- Received a message from VPP.")
170         }
171
172         if context == 0 || c.isNotificationMessage(msgID) {
173                 // process the message as a notification
174                 c.sendNotifications(msgID, data)
175                 return
176         }
177
178         // match ch according to the context
179         c.channelsLock.RLock()
180         ch, ok := c.channels[chanID]
181         c.channelsLock.RUnlock()
182         if !ok {
183                 log.WithFields(logger.Fields{
184                         "channel": chanID,
185                         "msg_id":  msgID,
186                 }).Error("Channel ID not known, ignoring the message.")
187                 return
188         }
189
190         // if this is a control ping reply to a multipart request,
191         // treat this as a last part of the reply
192         lastReplyReceived := isMulti && msgID == c.pingReplyID
193
194         // send the data to the channel, it needs to be copied,
195         // because it will be freed after this function returns
196         sendReply(ch, &vppReply{
197                 msgID:        msgID,
198                 seqNum:       seqNum,
199                 data:         append([]byte(nil), data...),
200                 lastReceived: lastReplyReceived,
201         })
202
203         // store actual time of this reply
204         c.lastReplyLock.Lock()
205         c.lastReply = time.Now()
206         c.lastReplyLock.Unlock()
207 }
208
209 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
210 // it logs the error and do not send the message.
211 func sendReply(ch *Channel, reply *vppReply) {
212         select {
213         case ch.replyChan <- reply:
214                 // reply sent successfully
215         case <-time.After(time.Millisecond * 100):
216                 // receiver still not ready
217                 log.WithFields(logger.Fields{
218                         "channel": ch,
219                         "msg_id":  reply.msgID,
220                         "seq_num": reply.seqNum,
221                 }).Warn("Unable to send the reply, reciever end not ready.")
222         }
223 }
224
225 func sendReplyError(ch *Channel, req *vppRequest, err error) {
226         sendReply(ch, &vppReply{seqNum: req.seqNum, err: err})
227 }
228
229 // isNotificationMessage returns true if someone has subscribed to provided message ID.
230 func (c *Connection) isNotificationMessage(msgID uint16) bool {
231         c.subscriptionsLock.RLock()
232         defer c.subscriptionsLock.RUnlock()
233
234         _, exists := c.subscriptions[msgID]
235         return exists
236 }
237
238 // sendNotifications send a notification message to all subscribers subscribed for that message.
239 func (c *Connection) sendNotifications(msgID uint16, data []byte) {
240         c.subscriptionsLock.RLock()
241         defer c.subscriptionsLock.RUnlock()
242
243         matched := false
244
245         // send to notification to each subscriber
246         for _, sub := range c.subscriptions[msgID] {
247                 log.WithFields(logger.Fields{
248                         "msg_name": sub.event.GetMessageName(),
249                         "msg_id":   msgID,
250                         "msg_size": len(data),
251                 }).Debug("Sending a notification to the subscription channel.")
252
253                 event := sub.msgFactory()
254                 if err := c.codec.DecodeMsg(data, event); err != nil {
255                         log.WithFields(logger.Fields{
256                                 "msg_name": sub.event.GetMessageName(),
257                                 "msg_id":   msgID,
258                                 "msg_size": len(data),
259                         }).Errorf("Unable to decode the notification message: %v", err)
260                         continue
261                 }
262
263                 // send the message into the go channel of the subscription
264                 select {
265                 case sub.notifChan <- event:
266                         // message sent successfully
267                 default:
268                         // unable to write into the channel without blocking
269                         log.WithFields(logger.Fields{
270                                 "msg_name": sub.event.GetMessageName(),
271                                 "msg_id":   msgID,
272                                 "msg_size": len(data),
273                         }).Warn("Unable to deliver the notification, reciever end not ready.")
274                 }
275
276                 matched = true
277         }
278
279         if !matched {
280                 log.WithFields(logger.Fields{
281                         "msg_id":   msgID,
282                         "msg_size": len(data),
283                 }).Info("No subscription found for the notification message.")
284         }
285 }
286
287 // +------------------+-------------------+-----------------------+
288 // | 15b = channel ID | 1b = is multipart | 16b = sequence number |
289 // +------------------+-------------------+-----------------------+
290 func packRequestContext(chanID uint16, isMultipart bool, seqNum uint16) uint32 {
291         context := uint32(chanID) << 17
292         if isMultipart {
293                 context |= 1 << 16
294         }
295         context |= uint32(seqNum)
296         return context
297 }
298
299 func unpackRequestContext(context uint32) (chanID uint16, isMulipart bool, seqNum uint16) {
300         chanID = uint16(context >> 17)
301         if ((context >> 16) & 0x1) != 0 {
302                 isMulipart = true
303         }
304         seqNum = uint16(context & 0xffff)
305         return
306 }
307
308 // compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to,
309 // or succeeds seq. number <seqNum2>.
310 // Since sequence numbers cycle in the finite set of size 2^16, the function
311 // must assume that the distance between compared sequence numbers is less than
312 // (2^16)/2 to determine the order.
313 func compareSeqNumbers(seqNum1, seqNum2 uint16) int {
314         // calculate distance from seqNum1 to seqNum2
315         var dist uint16
316         if seqNum1 <= seqNum2 {
317                 dist = seqNum2 - seqNum1
318         } else {
319                 dist = 0xffff - (seqNum1 - seqNum2 - 1)
320         }
321         if dist == 0 {
322                 return 0
323         } else if dist <= 0x8000 {
324                 return -1
325         }
326         return 1
327 }