feat: api-trace
[govpp.git] / core / request_handler.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "reflect"
21         "sync/atomic"
22         "time"
23
24         logger "github.com/sirupsen/logrus"
25
26         "git.fd.io/govpp.git/api"
27 )
28
var (
	// ReplyChannelTimeout is a package-level duration of 100 milliseconds.
	// NOTE(review): it is not referenced in this file; presumably it bounds
	// the wait on a reply channel — confirm against its users.
	ReplyChannelTimeout = 100 * time.Millisecond

	// ErrNotConnected is returned by processRequest when the connection is
	// not marked as connected to VPP.
	ErrNotConnected = errors.New("not connected to VPP, ignoring the request")

	// ErrProbeTimeout reports that a probe reply was not received within the
	// timeout period.
	// NOTE(review): not referenced in this file — confirm where it is returned.
	ErrProbeTimeout = errors.New("probe reply not received within timeout period")
)
35
36 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
37 func (c *Connection) watchRequests(ch *Channel) {
38         for {
39                 select {
40                 case req, ok := <-ch.reqChan:
41                         // new request on the request channel
42                         if !ok {
43                                 // after closing the request channel, release API channel and return
44                                 c.releaseAPIChannel(ch)
45                                 return
46                         }
47                         if err := c.processRequest(ch, req); err != nil {
48                                 sendReply(ch, &vppReply{
49                                         seqNum: req.seqNum,
50                                         err:    fmt.Errorf("unable to process request: %w", err),
51                                 })
52                         }
53                 }
54         }
55 }
56
57 // processRequest processes a single request received on the request channel.
58 func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
59         // check whether we are connected to VPP
60         if atomic.LoadUint32(&c.vppConnected) == 0 {
61                 err := ErrNotConnected
62                 log.WithFields(logger.Fields{
63                         "channel":  ch.id,
64                         "seq_num":  req.seqNum,
65                         "msg_name": req.msg.GetMessageName(),
66                         "msg_crc":  req.msg.GetCrcString(),
67                         "error":    err,
68                 }).Warnf("Unable to process request")
69                 return err
70         }
71
72         // retrieve message ID
73         msgID, err := c.GetMessageID(req.msg)
74         if err != nil {
75                 log.WithFields(logger.Fields{
76                         "channel":  ch.id,
77                         "msg_name": req.msg.GetMessageName(),
78                         "msg_crc":  req.msg.GetCrcString(),
79                         "seq_num":  req.seqNum,
80                         "error":    err,
81                 }).Warnf("Unable to retrieve message ID")
82                 return err
83         }
84
85         // encode the message into binary
86         data, err := c.codec.EncodeMsg(req.msg, msgID)
87         if err != nil {
88                 log.WithFields(logger.Fields{
89                         "channel":  ch.id,
90                         "msg_id":   msgID,
91                         "msg_name": req.msg.GetMessageName(),
92                         "msg_crc":  req.msg.GetCrcString(),
93                         "seq_num":  req.seqNum,
94                         "error":    err,
95                 }).Warnf("Unable to encode message: %T %+v", req.msg, req.msg)
96                 return err
97         }
98
99         context := packRequestContext(ch.id, req.multi, req.seqNum)
100
101         if log.Level >= logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
102                 log.WithFields(logger.Fields{
103                         "channel":  ch.id,
104                         "msg_id":   msgID,
105                         "msg_name": req.msg.GetMessageName(),
106                         "msg_crc":  req.msg.GetCrcString(),
107                         "seq_num":  req.seqNum,
108                         "is_multi": req.multi,
109                         "context":  context,
110                         "data_len": len(data),
111                 }).Debugf("--> SEND MSG: %T %+v", req.msg, req.msg)
112         }
113
114         // send the request to VPP
115         t := time.Now()
116         err = c.vppClient.SendMsg(context, data)
117         if err != nil {
118                 log.WithFields(logger.Fields{
119                         "channel":  ch.id,
120                         "msg_id":   msgID,
121                         "msg_name": req.msg.GetMessageName(),
122                         "msg_crc":  req.msg.GetCrcString(),
123                         "seq_num":  req.seqNum,
124                         "is_multi": req.multi,
125                         "context":  context,
126                         "data_len": len(data),
127                         "error":    err,
128                 }).Warnf("Unable to send message")
129                 return err
130         }
131         c.trace(req.msg, ch.id, t, false)
132
133         if req.multi {
134                 // send a control ping to determine end of the multipart response
135                 pingData, _ := c.codec.EncodeMsg(c.msgControlPing, c.pingReqID)
136
137                 if log.Level >= logger.DebugLevel {
138                         log.WithFields(logger.Fields{
139                                 "channel":  ch.id,
140                                 "msg_id":   c.pingReqID,
141                                 "msg_name": c.msgControlPing.GetMessageName(),
142                                 "msg_crc":  c.msgControlPing.GetCrcString(),
143                                 "seq_num":  req.seqNum,
144                                 "context":  context,
145                                 "data_len": len(pingData),
146                         }).Debugf(" -> SEND MSG: %T", c.msgControlPing)
147                 }
148
149                 t = time.Now()
150                 if err := c.vppClient.SendMsg(context, pingData); err != nil {
151                         log.WithFields(logger.Fields{
152                                 "context": context,
153                                 "seq_num": req.seqNum,
154                                 "error":   err,
155                         }).Warnf("unable to send control ping")
156                 }
157                 c.trace(c.msgControlPing, ch.id, t, false)
158         }
159
160         return nil
161 }
162
163 // msgCallback is called whenever any binary API message comes from VPP.
164 func (c *Connection) msgCallback(msgID uint16, data []byte) {
165         if c == nil {
166                 log.WithField(
167                         "msg_id", msgID,
168                 ).Warn("Connection already disconnected, ignoring the message.")
169                 return
170         }
171
172         msg, err := c.getMessageByID(msgID)
173         if err != nil {
174                 log.Warnln(err)
175                 return
176         }
177
178         // decode message context to fix for special cases of messages,
179         // for example:
180         // - replies that don't have context as first field (comes as zero)
181         // - events that don't have context at all (comes as non zero)
182         //
183         context, err := c.codec.DecodeMsgContext(data, msg.GetMessageType())
184         if err != nil {
185                 log.WithField("msg_id", msgID).Warnf("Unable to decode message context: %v", err)
186                 return
187         }
188
189         chanID, isMulti, seqNum := unpackRequestContext(context)
190
191         // decode and trace the message
192         msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
193         if err = c.codec.DecodeMsg(data, msg); err != nil {
194                 log.WithField("msg", msg).Warnf("Unable to decode message: %v", err)
195                 return
196         }
197         c.trace(msg, chanID, time.Now(), true)
198
199         if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
200                 log.WithFields(logger.Fields{
201                         "context":  context,
202                         "msg_id":   msgID,
203                         "msg_size": len(data),
204                         "channel":  chanID,
205                         "is_multi": isMulti,
206                         "seq_num":  seqNum,
207                         "msg_crc":  msg.GetCrcString(),
208                 }).Debugf("<-- govpp RECEIVE: %s", msg.GetMessageName())
209         }
210
211         if context == 0 || c.isNotificationMessage(msgID) {
212                 // process the message as a notification
213                 c.sendNotifications(msgID, data)
214                 return
215         }
216
217         // match ch according to the context
218         c.channelsLock.RLock()
219         ch, ok := c.channels[chanID]
220         c.channelsLock.RUnlock()
221         if !ok {
222                 log.WithFields(logger.Fields{
223                         "channel": chanID,
224                         "msg_id":  msgID,
225                 }).Error("Channel ID not known, ignoring the message.")
226                 return
227         }
228
229         // if this is a control ping reply to a multipart request,
230         // treat this as a last part of the reply
231         lastReplyReceived := isMulti && msgID == c.pingReplyID
232
233         // send the data to the channel, it needs to be copied,
234         // because it will be freed after this function returns
235         sendReply(ch, &vppReply{
236                 msgID:        msgID,
237                 seqNum:       seqNum,
238                 data:         append([]byte(nil), data...),
239                 lastReceived: lastReplyReceived,
240         })
241
242         // store actual time of this reply
243         c.lastReplyLock.Lock()
244         c.lastReply = time.Now()
245         c.lastReplyLock.Unlock()
246 }
247
248 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
249 // it logs the error and do not send the message.
250 func sendReply(ch *Channel, reply *vppReply) {
251         // first try to avoid creating timer
252         select {
253         case ch.replyChan <- reply:
254                 return // reply sent ok
255         default:
256                 // reply channel full
257         }
258         if ch.receiveReplyTimeout == 0 {
259                 log.WithFields(logger.Fields{
260                         "channel": ch.id,
261                         "msg_id":  reply.msgID,
262                         "seq_num": reply.seqNum,
263                         "err":     reply.err,
264                 }).Warn("Reply channel full, dropping reply.")
265                 return
266         }
267         select {
268         case ch.replyChan <- reply:
269                 return // reply sent ok
270         case <-time.After(ch.receiveReplyTimeout):
271                 // receiver still not ready
272                 log.WithFields(logger.Fields{
273                         "channel": ch.id,
274                         "msg_id":  reply.msgID,
275                         "seq_num": reply.seqNum,
276                         "err":     reply.err,
277                 }).Warnf("Unable to send reply (reciever end not ready in %v).", ch.receiveReplyTimeout)
278         }
279 }
280
281 // isNotificationMessage returns true if someone has subscribed to provided message ID.
282 func (c *Connection) isNotificationMessage(msgID uint16) bool {
283         c.subscriptionsLock.RLock()
284         defer c.subscriptionsLock.RUnlock()
285
286         _, exists := c.subscriptions[msgID]
287         return exists
288 }
289
290 // sendNotifications send a notification message to all subscribers subscribed for that message.
291 func (c *Connection) sendNotifications(msgID uint16, data []byte) {
292         c.subscriptionsLock.RLock()
293         defer c.subscriptionsLock.RUnlock()
294
295         matched := false
296
297         // send to notification to each subscriber
298         for _, sub := range c.subscriptions[msgID] {
299                 log.WithFields(logger.Fields{
300                         "msg_name": sub.event.GetMessageName(),
301                         "msg_id":   msgID,
302                         "msg_size": len(data),
303                 }).Debug("Sending a notification to the subscription channel.")
304
305                 event := sub.msgFactory()
306                 if err := c.codec.DecodeMsg(data, event); err != nil {
307                         log.WithFields(logger.Fields{
308                                 "msg_name": sub.event.GetMessageName(),
309                                 "msg_id":   msgID,
310                                 "msg_size": len(data),
311                                 "error":    err,
312                         }).Warnf("Unable to decode the notification message")
313                         continue
314                 }
315
316                 // send the message into the go channel of the subscription
317                 select {
318                 case sub.notifChan <- event:
319                         // message sent successfully
320                 default:
321                         // unable to write into the channel without blocking
322                         log.WithFields(logger.Fields{
323                                 "msg_name": sub.event.GetMessageName(),
324                                 "msg_id":   msgID,
325                                 "msg_size": len(data),
326                         }).Warn("Unable to deliver the notification, reciever end not ready.")
327                 }
328
329                 matched = true
330         }
331
332         if !matched {
333                 log.WithFields(logger.Fields{
334                         "msg_id":   msgID,
335                         "msg_size": len(data),
336                 }).Info("No subscription found for the notification message.")
337         }
338 }
339
// packRequestContext combines the channel ID, the multipart flag and the
// sequence number into a single 32-bit request context laid out as follows:
//
// +------------------+-------------------+-----------------------+
// | 15b = channel ID | 1b = is multipart | 16b = sequence number |
// +------------------+-------------------+-----------------------+
//
// Only the lower 15 bits of chanID fit into the context; the highest bit is
// shifted out.
func packRequestContext(chanID uint16, isMultipart bool, seqNum uint16) uint32 {
	var multipartBit uint32
	if isMultipart {
		multipartBit = 1
	}
	return uint32(chanID)<<17 | multipartBit<<16 | uint32(seqNum)
}
351
// unpackRequestContext splits a 32-bit request context into its parts:
// the upper 15 bits hold the channel ID, bit 16 is the multipart flag and
// the lower 16 bits hold the sequence number.
func unpackRequestContext(context uint32) (chanID uint16, isMulipart bool, seqNum uint16) {
	const multipartMask = 1 << 16

	// the conversion to uint16 keeps just the lower 16 bits of the context
	return uint16(context >> 17), context&multipartMask != 0, uint16(context)
}
360
// compareSeqNumbers returns -1, 0 or 1 if sequence number <seqNum1> precedes,
// equals, or follows sequence number <seqNum2>, respectively.
// Sequence numbers cycle in a finite set of size 2^16, therefore the order is
// derived from the forward distance from <seqNum1> to <seqNum2>: a distance
// of up to 0x8000 (inclusive) means <seqNum1> precedes <seqNum2>, any larger
// distance means it follows.
func compareSeqNumbers(seqNum1, seqNum2 uint16) int {
	// unsigned subtraction wraps around modulo 2^16, which gives the forward
	// distance from seqNum1 to seqNum2 directly
	forward := seqNum2 - seqNum1

	switch {
	case forward == 0:
		return 0
	case forward <= 0x8000:
		return -1
	default:
		return 1
	}
}
381
382 // Returns message based on the message ID not depending on message path
383 func (c *Connection) getMessageByID(msgID uint16) (msg api.Message, err error) {
384         var ok bool
385         for _, messages := range c.msgMapByPath {
386                 if msg, ok = messages[msgID]; ok {
387                         return msg, nil
388                 }
389         }
390         return nil, fmt.Errorf("unknown message received, ID: %d", msgID)
391 }