e272c6f8ec17531c6c7b9312a5236663b4074577
[govpp.git] / core / request_handler.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "reflect"
21         "sync/atomic"
22         "time"
23
24         logger "github.com/sirupsen/logrus"
25
26         "git.fd.io/govpp.git/api"
27 )
28
29 var ReplyChannelTimeout = time.Millisecond * 100
30
31 var (
32         ErrNotConnected = errors.New("not connected to VPP, ignoring the request")
33         ErrProbeTimeout = errors.New("probe reply not received within timeout period")
34 )
35
36 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
37 func (c *Connection) watchRequests(ch *Channel) {
38         for {
39                 select {
40                 case req, ok := <-ch.reqChan:
41                         // new request on the request channel
42                         if !ok {
43                                 // after closing the request channel, release API channel and return
44                                 c.releaseAPIChannel(ch)
45                                 return
46                         }
47                         if err := c.processRequest(ch, req); err != nil {
48                                 sendReplyError(ch, req, err)
49                         }
50                 }
51         }
52 }
53
54 // processRequest processes a single request received on the request channel.
55 func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
56         // check whether we are connected to VPP
57         if atomic.LoadUint32(&c.vppConnected) == 0 {
58                 err := ErrNotConnected
59                 log.Errorf("processing request failed: %v", err)
60                 return err
61         }
62
63         // retrieve message ID
64         msgID, err := c.GetMessageID(req.msg)
65         if err != nil {
66                 log.WithFields(logger.Fields{
67                         "msg_name": req.msg.GetMessageName(),
68                         "msg_crc":  req.msg.GetCrcString(),
69                         "seq_num":  req.seqNum,
70                         "error":    err,
71                 }).Errorf("failed to retrieve message ID")
72                 return fmt.Errorf("unable to retrieve message ID: %v", err)
73         }
74
75         // encode the message into binary
76         data, err := c.codec.EncodeMsg(req.msg, msgID)
77         if err != nil {
78                 log.WithFields(logger.Fields{
79                         "channel":  ch.id,
80                         "msg_id":   msgID,
81                         "msg_name": req.msg.GetMessageName(),
82                         "seq_num":  req.seqNum,
83                         "error":    err,
84                 }).Errorf("failed to encode message: %#v", req.msg)
85                 return fmt.Errorf("unable to encode the message: %v", err)
86         }
87
88         context := packRequestContext(ch.id, req.multi, req.seqNum)
89
90         if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
91                 log.WithFields(logger.Fields{
92                         "channel":  ch.id,
93                         "context":  context,
94                         "is_multi": req.multi,
95                         "msg_id":   msgID,
96                         "msg_size": len(data),
97                         "seq_num":  req.seqNum,
98                         "msg_crc":  req.msg.GetCrcString(),
99                 }).Debugf("--> govpp SEND: %s %+v", req.msg.GetMessageName(), req.msg)
100         }
101
102         // send the request to VPP
103         err = c.vppClient.SendMsg(context, data)
104         if err != nil {
105                 err = fmt.Errorf("unable to send the message: %v", err)
106                 log.WithFields(logger.Fields{
107                         "context": context,
108                         "msg_id":  msgID,
109                         "seq_num": req.seqNum,
110                 }).Error(err)
111                 return err
112         }
113
114         if req.multi {
115                 // send a control ping to determine end of the multipart response
116                 pingData, _ := c.codec.EncodeMsg(c.msgControlPing, c.pingReqID)
117
118                 log.WithFields(logger.Fields{
119                         "channel":  ch.id,
120                         "context":  context,
121                         "msg_id":   c.pingReqID,
122                         "msg_size": len(pingData),
123                         "seq_num":  req.seqNum,
124                 }).Debug(" -> sending control ping")
125
126                 if err := c.vppClient.SendMsg(context, pingData); err != nil {
127                         log.WithFields(logger.Fields{
128                                 "context": context,
129                                 "msg_id":  msgID,
130                                 "seq_num": req.seqNum,
131                         }).Warnf("unable to send control ping: %v", err)
132                 }
133         }
134
135         return nil
136 }
137
138 // msgCallback is called whenever any binary API message comes from VPP.
139 func (c *Connection) msgCallback(msgID uint16, data []byte) {
140         if c == nil {
141                 log.Warn("Already disconnected, ignoring the message.")
142                 return
143         }
144
145         msg, ok := c.msgMap[msgID]
146         if !ok {
147                 log.Warnf("Unknown message received, ID: %d", msgID)
148                 return
149         }
150
151         // decode message context to fix for special cases of messages,
152         // for example:
153         // - replies that don't have context as first field (comes as zero)
154         // - events that don't have context at all (comes as non zero)
155         //
156         context, err := c.codec.DecodeMsgContext(data, msg)
157         if err != nil {
158                 log.Errorf("decoding context failed: %v", err)
159         }
160
161         chanID, isMulti, seqNum := unpackRequestContext(context)
162
163         if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
164                 msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
165
166                 // decode the message
167                 if err = c.codec.DecodeMsg(data, msg); err != nil {
168                         err = fmt.Errorf("decoding message failed: %w", err)
169                         return
170                 }
171
172                 log.WithFields(logger.Fields{
173                         "context":  context,
174                         "msg_id":   msgID,
175                         "msg_size": len(data),
176                         "channel":  chanID,
177                         "is_multi": isMulti,
178                         "seq_num":  seqNum,
179                         "msg_crc":  msg.GetCrcString(),
180                 }).Debugf("<-- govpp RECEIVE: %s %+v", msg.GetMessageName(), msg)
181         }
182
183         if context == 0 || c.isNotificationMessage(msgID) {
184                 // process the message as a notification
185                 c.sendNotifications(msgID, data)
186                 return
187         }
188
189         // match ch according to the context
190         c.channelsLock.RLock()
191         ch, ok := c.channels[chanID]
192         c.channelsLock.RUnlock()
193         if !ok {
194                 log.WithFields(logger.Fields{
195                         "channel": chanID,
196                         "msg_id":  msgID,
197                 }).Error("Channel ID not known, ignoring the message.")
198                 return
199         }
200
201         // if this is a control ping reply to a multipart request,
202         // treat this as a last part of the reply
203         lastReplyReceived := isMulti && msgID == c.pingReplyID
204
205         // send the data to the channel, it needs to be copied,
206         // because it will be freed after this function returns
207         sendReply(ch, &vppReply{
208                 msgID:        msgID,
209                 seqNum:       seqNum,
210                 data:         append([]byte(nil), data...),
211                 lastReceived: lastReplyReceived,
212         })
213
214         // store actual time of this reply
215         c.lastReplyLock.Lock()
216         c.lastReply = time.Now()
217         c.lastReplyLock.Unlock()
218 }
219
220 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
221 // it logs the error and do not send the message.
222 func sendReply(ch *Channel, reply *vppReply) {
223         select {
224         case ch.replyChan <- reply:
225         // reply sent successfully
226         case <-time.After(ReplyChannelTimeout):
227                 // receiver still not ready
228                 log.WithFields(logger.Fields{
229                         "channel": ch,
230                         "msg_id":  reply.msgID,
231                         "seq_num": reply.seqNum,
232                 }).Warn("Unable to send the reply, reciever end not ready.")
233         }
234 }
235
236 func sendReplyError(ch *Channel, req *vppRequest, err error) {
237         sendReply(ch, &vppReply{seqNum: req.seqNum, err: err})
238 }
239
240 // isNotificationMessage returns true if someone has subscribed to provided message ID.
241 func (c *Connection) isNotificationMessage(msgID uint16) bool {
242         c.subscriptionsLock.RLock()
243         defer c.subscriptionsLock.RUnlock()
244
245         _, exists := c.subscriptions[msgID]
246         return exists
247 }
248
249 // sendNotifications send a notification message to all subscribers subscribed for that message.
250 func (c *Connection) sendNotifications(msgID uint16, data []byte) {
251         c.subscriptionsLock.RLock()
252         defer c.subscriptionsLock.RUnlock()
253
254         matched := false
255
256         // send to notification to each subscriber
257         for _, sub := range c.subscriptions[msgID] {
258                 log.WithFields(logger.Fields{
259                         "msg_name": sub.event.GetMessageName(),
260                         "msg_id":   msgID,
261                         "msg_size": len(data),
262                 }).Debug("Sending a notification to the subscription channel.")
263
264                 event := sub.msgFactory()
265                 if err := c.codec.DecodeMsg(data, event); err != nil {
266                         log.WithFields(logger.Fields{
267                                 "msg_name": sub.event.GetMessageName(),
268                                 "msg_id":   msgID,
269                                 "msg_size": len(data),
270                         }).Errorf("Unable to decode the notification message: %v", err)
271                         continue
272                 }
273
274                 // send the message into the go channel of the subscription
275                 select {
276                 case sub.notifChan <- event:
277                         // message sent successfully
278                 default:
279                         // unable to write into the channel without blocking
280                         log.WithFields(logger.Fields{
281                                 "msg_name": sub.event.GetMessageName(),
282                                 "msg_id":   msgID,
283                                 "msg_size": len(data),
284                         }).Warn("Unable to deliver the notification, reciever end not ready.")
285                 }
286
287                 matched = true
288         }
289
290         if !matched {
291                 log.WithFields(logger.Fields{
292                         "msg_id":   msgID,
293                         "msg_size": len(data),
294                 }).Info("No subscription found for the notification message.")
295         }
296 }
297
298 // +------------------+-------------------+-----------------------+
299 // | 15b = channel ID | 1b = is multipart | 16b = sequence number |
300 // +------------------+-------------------+-----------------------+
301 func packRequestContext(chanID uint16, isMultipart bool, seqNum uint16) uint32 {
302         context := uint32(chanID) << 17
303         if isMultipart {
304                 context |= 1 << 16
305         }
306         context |= uint32(seqNum)
307         return context
308 }
309
310 func unpackRequestContext(context uint32) (chanID uint16, isMulipart bool, seqNum uint16) {
311         chanID = uint16(context >> 17)
312         if ((context >> 16) & 0x1) != 0 {
313                 isMulipart = true
314         }
315         seqNum = uint16(context & 0xffff)
316         return
317 }
318
319 // compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to,
320 // or succeeds seq. number <seqNum2>.
321 // Since sequence numbers cycle in the finite set of size 2^16, the function
322 // must assume that the distance between compared sequence numbers is less than
323 // (2^16)/2 to determine the order.
324 func compareSeqNumbers(seqNum1, seqNum2 uint16) int {
325         // calculate distance from seqNum1 to seqNum2
326         var dist uint16
327         if seqNum1 <= seqNum2 {
328                 dist = seqNum2 - seqNum1
329         } else {
330                 dist = 0xffff - (seqNum1 - seqNum2 - 1)
331         }
332         if dist == 0 {
333                 return 0
334         } else if dist <= 0x8000 {
335                 return -1
336         }
337         return 1
338 }