ddd53075495b1869e2b71ace1fee393ef5480c2e
[govpp.git] / core / request_handler.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "sync/atomic"
21         "time"
22
23         logger "github.com/sirupsen/logrus"
24 )
25
26 var ReplyChannelTimeout = time.Millisecond * 100
27
28 var (
29         ErrNotConnected = errors.New("not connected to VPP, ignoring the request")
30         ErrProbeTimeout = errors.New("probe reply not received within timeout period")
31 )
32
33 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
34 func (c *Connection) watchRequests(ch *Channel) {
35         for {
36                 select {
37                 case req, ok := <-ch.reqChan:
38                         // new request on the request channel
39                         if !ok {
40                                 // after closing the request channel, release API channel and return
41                                 c.releaseAPIChannel(ch)
42                                 return
43                         }
44                         if err := c.processRequest(ch, req); err != nil {
45                                 sendReplyError(ch, req, err)
46                         }
47                 }
48         }
49 }
50
51 // processRequest processes a single request received on the request channel.
52 func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
53         // check whether we are connected to VPP
54         if atomic.LoadUint32(&c.vppConnected) == 0 {
55                 err := ErrNotConnected
56                 log.Errorf("processing request failed: %v", err)
57                 return err
58         }
59
60         // retrieve message ID
61         msgID, err := c.GetMessageID(req.msg)
62         if err != nil {
63                 log.WithFields(logger.Fields{
64                         "msg_name": req.msg.GetMessageName(),
65                         "msg_crc":  req.msg.GetCrcString(),
66                         "seq_num":  req.seqNum,
67                         "error":    err,
68                 }).Errorf("failed to retrieve message ID")
69                 return fmt.Errorf("unable to retrieve message ID: %v", err)
70         }
71
72         // encode the message into binary
73         data, err := c.codec.EncodeMsg(req.msg, msgID)
74         if err != nil {
75                 log.WithFields(logger.Fields{
76                         "channel":  ch.id,
77                         "msg_id":   msgID,
78                         "msg_name": req.msg.GetMessageName(),
79                         "seq_num":  req.seqNum,
80                         "error":    err,
81                 }).Errorf("failed to encode message: %#v", req.msg)
82                 return fmt.Errorf("unable to encode the message: %v", err)
83         }
84
85         context := packRequestContext(ch.id, req.multi, req.seqNum)
86
87         if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
88                 log.WithFields(logger.Fields{
89                         "channel":  ch.id,
90                         "context":  context,
91                         "is_multi": req.multi,
92                         "msg_id":   msgID,
93                         "msg_size": len(data),
94                         "seq_num":  req.seqNum,
95                         "msg_crc":  req.msg.GetCrcString(),
96                 }).Debugf("==> govpp send: %s: %+v", req.msg.GetMessageName(), req.msg)
97         }
98
99         // send the request to VPP
100         err = c.vppClient.SendMsg(context, data)
101         if err != nil {
102                 err = fmt.Errorf("unable to send the message: %v", err)
103                 log.WithFields(logger.Fields{
104                         "context": context,
105                         "msg_id":  msgID,
106                         "seq_num": req.seqNum,
107                 }).Error(err)
108                 return err
109         }
110
111         if req.multi {
112                 // send a control ping to determine end of the multipart response
113                 pingData, _ := c.codec.EncodeMsg(c.msgControlPing, c.pingReqID)
114
115                 log.WithFields(logger.Fields{
116                         "channel":  ch.id,
117                         "context":  context,
118                         "msg_id":   c.pingReqID,
119                         "msg_size": len(pingData),
120                         "seq_num":  req.seqNum,
121                 }).Debug("--> sending control ping")
122
123                 if err := c.vppClient.SendMsg(context, pingData); err != nil {
124                         log.WithFields(logger.Fields{
125                                 "context": context,
126                                 "msg_id":  msgID,
127                                 "seq_num": req.seqNum,
128                         }).Warnf("unable to send control ping: %v", err)
129                 }
130         }
131
132         return nil
133 }
134
135 // msgCallback is called whenever any binary API message comes from VPP.
136 func (c *Connection) msgCallback(msgID uint16, data []byte) {
137         if c == nil {
138                 log.Warn("Already disconnected, ignoring the message.")
139                 return
140         }
141
142         msg, ok := c.msgMap[msgID]
143         if !ok {
144                 log.Warnf("Unknown message received, ID: %d", msgID)
145                 return
146         }
147
148         // decode message context to fix for special cases of messages,
149         // for example:
150         // - replies that don't have context as first field (comes as zero)
151         // - events that don't have context at all (comes as non zero)
152         //
153         context, err := c.codec.DecodeMsgContext(data, msg)
154         if err != nil {
155                 log.Errorf("decoding context failed: %v", err)
156         }
157
158         chanID, isMulti, seqNum := unpackRequestContext(context)
159         if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
160                 log.WithFields(logger.Fields{
161                         "context":  context,
162                         "msg_id":   msgID,
163                         "msg_size": len(data),
164                         "channel":  chanID,
165                         "is_multi": isMulti,
166                         "seq_num":  seqNum,
167                         "msg_crc":  msg.GetCrcString(),
168                 }).Debugf("<== govpp recv: %s", msg.GetMessageName())
169         }
170
171         if context == 0 || c.isNotificationMessage(msgID) {
172                 // process the message as a notification
173                 c.sendNotifications(msgID, data)
174                 return
175         }
176
177         // match ch according to the context
178         c.channelsLock.RLock()
179         ch, ok := c.channels[chanID]
180         c.channelsLock.RUnlock()
181         if !ok {
182                 log.WithFields(logger.Fields{
183                         "channel": chanID,
184                         "msg_id":  msgID,
185                 }).Error("Channel ID not known, ignoring the message.")
186                 return
187         }
188
189         // if this is a control ping reply to a multipart request,
190         // treat this as a last part of the reply
191         lastReplyReceived := isMulti && msgID == c.pingReplyID
192
193         // send the data to the channel, it needs to be copied,
194         // because it will be freed after this function returns
195         sendReply(ch, &vppReply{
196                 msgID:        msgID,
197                 seqNum:       seqNum,
198                 data:         append([]byte(nil), data...),
199                 lastReceived: lastReplyReceived,
200         })
201
202         // store actual time of this reply
203         c.lastReplyLock.Lock()
204         c.lastReply = time.Now()
205         c.lastReplyLock.Unlock()
206 }
207
208 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
209 // it logs the error and do not send the message.
210 func sendReply(ch *Channel, reply *vppReply) {
211         select {
212         case ch.replyChan <- reply:
213                 // reply sent successfully
214         case <-time.After(ReplyChannelTimeout):
215                 // receiver still not ready
216                 log.WithFields(logger.Fields{
217                         "channel": ch,
218                         "msg_id":  reply.msgID,
219                         "seq_num": reply.seqNum,
220                 }).Warn("Unable to send the reply, reciever end not ready.")
221         }
222 }
223
224 func sendReplyError(ch *Channel, req *vppRequest, err error) {
225         sendReply(ch, &vppReply{seqNum: req.seqNum, err: err})
226 }
227
228 // isNotificationMessage returns true if someone has subscribed to provided message ID.
229 func (c *Connection) isNotificationMessage(msgID uint16) bool {
230         c.subscriptionsLock.RLock()
231         defer c.subscriptionsLock.RUnlock()
232
233         _, exists := c.subscriptions[msgID]
234         return exists
235 }
236
237 // sendNotifications send a notification message to all subscribers subscribed for that message.
238 func (c *Connection) sendNotifications(msgID uint16, data []byte) {
239         c.subscriptionsLock.RLock()
240         defer c.subscriptionsLock.RUnlock()
241
242         matched := false
243
244         // send to notification to each subscriber
245         for _, sub := range c.subscriptions[msgID] {
246                 log.WithFields(logger.Fields{
247                         "msg_name": sub.event.GetMessageName(),
248                         "msg_id":   msgID,
249                         "msg_size": len(data),
250                 }).Debug("Sending a notification to the subscription channel.")
251
252                 event := sub.msgFactory()
253                 if err := c.codec.DecodeMsg(data, event); err != nil {
254                         log.WithFields(logger.Fields{
255                                 "msg_name": sub.event.GetMessageName(),
256                                 "msg_id":   msgID,
257                                 "msg_size": len(data),
258                         }).Errorf("Unable to decode the notification message: %v", err)
259                         continue
260                 }
261
262                 // send the message into the go channel of the subscription
263                 select {
264                 case sub.notifChan <- event:
265                         // message sent successfully
266                 default:
267                         // unable to write into the channel without blocking
268                         log.WithFields(logger.Fields{
269                                 "msg_name": sub.event.GetMessageName(),
270                                 "msg_id":   msgID,
271                                 "msg_size": len(data),
272                         }).Warn("Unable to deliver the notification, reciever end not ready.")
273                 }
274
275                 matched = true
276         }
277
278         if !matched {
279                 log.WithFields(logger.Fields{
280                         "msg_id":   msgID,
281                         "msg_size": len(data),
282                 }).Info("No subscription found for the notification message.")
283         }
284 }
285
286 // +------------------+-------------------+-----------------------+
287 // | 15b = channel ID | 1b = is multipart | 16b = sequence number |
288 // +------------------+-------------------+-----------------------+
289 func packRequestContext(chanID uint16, isMultipart bool, seqNum uint16) uint32 {
290         context := uint32(chanID) << 17
291         if isMultipart {
292                 context |= 1 << 16
293         }
294         context |= uint32(seqNum)
295         return context
296 }
297
298 func unpackRequestContext(context uint32) (chanID uint16, isMulipart bool, seqNum uint16) {
299         chanID = uint16(context >> 17)
300         if ((context >> 16) & 0x1) != 0 {
301                 isMulipart = true
302         }
303         seqNum = uint16(context & 0xffff)
304         return
305 }
306
307 // compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to,
308 // or succeeds seq. number <seqNum2>.
309 // Since sequence numbers cycle in the finite set of size 2^16, the function
310 // must assume that the distance between compared sequence numbers is less than
311 // (2^16)/2 to determine the order.
312 func compareSeqNumbers(seqNum1, seqNum2 uint16) int {
313         // calculate distance from seqNum1 to seqNum2
314         var dist uint16
315         if seqNum1 <= seqNum2 {
316                 dist = seqNum2 - seqNum1
317         } else {
318                 dist = 0xffff - (seqNum1 - seqNum2 - 1)
319         }
320         if dist == 0 {
321                 return 0
322         } else if dist <= 0x8000 {
323                 return -1
324         }
325         return 1
326 }