Refactor GoVPP
[govpp.git] / core / request_handler.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "sync/atomic"
21         "time"
22
23         logger "github.com/sirupsen/logrus"
24 )
25
26 var (
27         ErrNotConnected = errors.New("not connected to VPP, ignoring the request")
28         ErrProbeTimeout = errors.New("probe reply not received within timeout period")
29 )
30
31 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
32 func (c *Connection) watchRequests(ch *channel) {
33         for {
34                 select {
35                 case req, ok := <-ch.reqChan:
36                         // new request on the request channel
37                         if !ok {
38                                 // after closing the request channel, release API channel and return
39                                 c.releaseAPIChannel(ch)
40                                 return
41                         }
42                         c.processRequest(ch, req)
43
44                 case req := <-ch.notifSubsChan:
45                         // new request on the notification subscribe channel
46                         c.processSubscriptionRequest(ch, req)
47                 }
48         }
49 }
50
51 // processRequest processes a single request received on the request channel.
52 func (c *Connection) processRequest(ch *channel, req *vppRequest) error {
53         // check whether we are connected to VPP
54         if atomic.LoadUint32(&c.connected) == 0 {
55                 err := ErrNotConnected
56                 log.Errorf("processing request failed: %v", err)
57                 sendReplyError(ch, req, err)
58                 return err
59         }
60
61         // retrieve message ID
62         msgID, err := c.GetMessageID(req.msg)
63         if err != nil {
64                 err = fmt.Errorf("unable to retrieve message ID: %v", err)
65                 log.WithFields(logger.Fields{
66                         "msg_name": req.msg.GetMessageName(),
67                         "msg_crc":  req.msg.GetCrcString(),
68                         "seq_num":  req.seqNum,
69                 }).Error(err)
70                 sendReplyError(ch, req, err)
71                 return err
72         }
73
74         // encode the message into binary
75         data, err := c.codec.EncodeMsg(req.msg, msgID)
76         if err != nil {
77                 err = fmt.Errorf("unable to encode the messge: %v", err)
78                 log.WithFields(logger.Fields{
79                         "channel":  ch.id,
80                         "msg_id":   msgID,
81                         "msg_name": req.msg.GetMessageName(),
82                         "seq_num":  req.seqNum,
83                 }).Error(err)
84                 sendReplyError(ch, req, err)
85                 return err
86         }
87
88         // get context
89         context := packRequestContext(ch.id, req.multi, req.seqNum)
90         if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
91                 log.WithFields(logger.Fields{
92                         "channel":  ch.id,
93                         "context":  context,
94                         "is_multi": req.multi,
95                         "msg_id":   msgID,
96                         "msg_name": req.msg.GetMessageName(),
97                         "msg_size": len(data),
98                         "seq_num":  req.seqNum,
99                 }).Debug(" -> Sending a message to VPP.")
100         }
101
102         // send the request to VPP
103         err = c.vpp.SendMsg(context, data)
104         if err != nil {
105                 err = fmt.Errorf("unable to send the message: %v", err)
106                 log.WithFields(logger.Fields{
107                         "context": context,
108                         "msg_id":  msgID,
109                         "seq_num": req.seqNum,
110                 }).Error(err)
111                 sendReplyError(ch, req, err)
112                 return err
113         }
114
115         if req.multi {
116                 // send a control ping to determine end of the multipart response
117                 pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID)
118
119                 log.WithFields(logger.Fields{
120                         "channel":  ch.id,
121                         "context":  context,
122                         "msg_id":   c.pingReqID,
123                         "msg_size": len(pingData),
124                         "seq_num":  req.seqNum,
125                 }).Debug(" -> Sending a control ping to VPP.")
126
127                 if err := c.vpp.SendMsg(context, pingData); err != nil {
128                         log.WithFields(logger.Fields{
129                                 "context": context,
130                                 "msg_id":  msgID,
131                                 "seq_num": req.seqNum,
132                         }).Warnf("unable to send control ping: %v", err)
133                 }
134         }
135
136         return nil
137 }
138
139 // msgCallback is called whenever any binary API message comes from VPP.
140 func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) {
141         connLock.RLock()
142         defer connLock.RUnlock()
143
144         if c == nil {
145                 log.Warn("Already disconnected, ignoring the message.")
146                 return
147         }
148
149         msg, ok := c.msgMap[msgID]
150         if !ok {
151                 log.Warnf("Unknown message received, ID: %d", msgID)
152                 return
153         }
154
155         // decode message context to fix for special cases of messages,
156         // for example:
157         // - replies that don't have context as first field (comes as zero)
158         // - events that don't have context at all (comes as non zero)
159         //
160         msgContext, err := c.codec.DecodeMsgContext(data, msg)
161         if err == nil {
162                 if context != msgContext {
163                         log.Debugf("different context was decoded from message (%d -> %d)", context, msgContext)
164                         context = msgContext
165                 }
166         } else {
167                 log.Errorf("decoding context failed: %v", err)
168         }
169
170         chanID, isMulti, seqNum := unpackRequestContext(context)
171         if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
172                 log.WithFields(logger.Fields{
173                         "context":  context,
174                         "msg_id":   msgID,
175                         "msg_name": msg.GetMessageName(),
176                         "msg_size": len(data),
177                         "channel":  chanID,
178                         "is_multi": isMulti,
179                         "seq_num":  seqNum,
180                 }).Debug(" <- Received a message from VPP.")
181         }
182
183         if context == 0 || c.isNotificationMessage(msgID) {
184                 // process the message as a notification
185                 c.sendNotifications(msgID, data)
186                 return
187         }
188
189         // match ch according to the context
190         c.channelsLock.RLock()
191         ch, ok := c.channels[chanID]
192         c.channelsLock.RUnlock()
193         if !ok {
194                 log.WithFields(logger.Fields{
195                         "channel": chanID,
196                         "msg_id":  msgID,
197                 }).Error("Channel ID not known, ignoring the message.")
198                 return
199         }
200
201         // if this is a control ping reply to a multipart request,
202         // treat this as a last part of the reply
203         lastReplyReceived := isMulti && msgID == c.pingReplyID
204
205         // send the data to the channel
206         sendReply(ch, &vppReply{
207                 msgID:        msgID,
208                 seqNum:       seqNum,
209                 data:         data,
210                 lastReceived: lastReplyReceived,
211         })
212
213         // store actual time of this reply
214         c.lastReplyLock.Lock()
215         c.lastReply = time.Now()
216         c.lastReplyLock.Unlock()
217 }
218
219 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
220 // it logs the error and do not send the message.
221 func sendReply(ch *channel, reply *vppReply) {
222         select {
223         case ch.replyChan <- reply:
224                 // reply sent successfully
225         case <-time.After(time.Millisecond * 100):
226                 // receiver still not ready
227                 log.WithFields(logger.Fields{
228                         "channel": ch,
229                         "msg_id":  reply.msgID,
230                         "seq_num": reply.seqNum,
231                 }).Warn("Unable to send the reply, reciever end not ready.")
232         }
233 }
234
235 func sendReplyError(ch *channel, req *vppRequest, err error) {
236         sendReply(ch, &vppReply{seqNum: req.seqNum, err: err})
237 }
238
239 // +------------------+-------------------+-----------------------+
240 // | 15b = channel ID | 1b = is multipart | 16b = sequence number |
241 // +------------------+-------------------+-----------------------+
242 func packRequestContext(chanID uint16, isMultipart bool, seqNum uint16) uint32 {
243         context := uint32(chanID) << 17
244         if isMultipart {
245                 context |= 1 << 16
246         }
247         context |= uint32(seqNum)
248         return context
249 }
250
251 func unpackRequestContext(context uint32) (chanID uint16, isMulipart bool, seqNum uint16) {
252         chanID = uint16(context >> 17)
253         if ((context >> 16) & 0x1) != 0 {
254                 isMulipart = true
255         }
256         seqNum = uint16(context & 0xffff)
257         return
258 }
259
260 // compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to,
261 // or succeeds seq. number <seqNum2>.
262 // Since sequence numbers cycle in the finite set of size 2^16, the function
263 // must assume that the distance between compared sequence numbers is less than
264 // (2^16)/2 to determine the order.
265 func compareSeqNumbers(seqNum1, seqNum2 uint16) int {
266         // calculate distance from seqNum1 to seqNum2
267         var dist uint16
268         if seqNum1 <= seqNum2 {
269                 dist = seqNum2 - seqNum1
270         } else {
271                 dist = 0xffff - (seqNum1 - seqNum2 - 1)
272         }
273         if dist == 0 {
274                 return 0
275         } else if dist <= 0x8000 {
276                 return -1
277         }
278         return 1
279 }