1 // Copyright (c) 2017 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
23 logger "github.com/sirupsen/logrus"
var (
	// ErrNotConnected is the error reported for a request that is processed
	// while the connection to VPP is not established.
	ErrNotConnected = errors.New("not connected to VPP, ignoring the request")

	// ErrProbeTimeout is the error reported when a probe reply is not
	// received within the timeout period.
	ErrProbeTimeout = errors.New("probe reply not received within timeout period")
)
31 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
32 func (c *Connection) watchRequests(ch *channel) {
35 case req, ok := <-ch.reqChan:
36 // new request on the request channel
38 // after closing the request channel, release API channel and return
39 c.releaseAPIChannel(ch)
42 c.processRequest(ch, req)
44 case req := <-ch.notifSubsChan:
45 // new request on the notification subscribe channel
46 c.processSubscriptionRequest(ch, req)
51 // processRequest processes a single request received on the request channel.
52 func (c *Connection) processRequest(ch *channel, req *vppRequest) error {
53 // check whether we are connected to VPP
54 if atomic.LoadUint32(&c.connected) == 0 {
55 err := ErrNotConnected
56 log.Errorf("processing request failed: %v", err)
57 sendReplyError(ch, req, err)
61 // retrieve message ID
62 msgID, err := c.GetMessageID(req.msg)
64 err = fmt.Errorf("unable to retrieve message ID: %v", err)
65 log.WithFields(logger.Fields{
66 "msg_name": req.msg.GetMessageName(),
67 "msg_crc": req.msg.GetCrcString(),
68 "seq_num": req.seqNum,
70 sendReplyError(ch, req, err)
74 // encode the message into binary
75 data, err := c.codec.EncodeMsg(req.msg, msgID)
77 err = fmt.Errorf("unable to encode the messge: %v", err)
78 log.WithFields(logger.Fields{
81 "msg_name": req.msg.GetMessageName(),
82 "seq_num": req.seqNum,
84 sendReplyError(ch, req, err)
89 context := packRequestContext(ch.id, req.multi, req.seqNum)
90 if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
91 log.WithFields(logger.Fields{
94 "is_multi": req.multi,
96 "msg_name": req.msg.GetMessageName(),
97 "msg_size": len(data),
98 "seq_num": req.seqNum,
99 }).Debug(" -> Sending a message to VPP.")
102 // send the request to VPP
103 err = c.vpp.SendMsg(context, data)
105 err = fmt.Errorf("unable to send the message: %v", err)
106 log.WithFields(logger.Fields{
109 "seq_num": req.seqNum,
111 sendReplyError(ch, req, err)
116 // send a control ping to determine end of the multipart response
117 pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID)
119 log.WithFields(logger.Fields{
122 "msg_id": c.pingReqID,
123 "msg_size": len(pingData),
124 "seq_num": req.seqNum,
125 }).Debug(" -> Sending a control ping to VPP.")
127 if err := c.vpp.SendMsg(context, pingData); err != nil {
128 log.WithFields(logger.Fields{
131 "seq_num": req.seqNum,
132 }).Warnf("unable to send control ping: %v", err)
139 // msgCallback is called whenever any binary API message comes from VPP.
140 func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) {
142 defer connLock.RUnlock()
145 log.Warn("Already disconnected, ignoring the message.")
149 msg, ok := c.msgMap[msgID]
151 log.Warnf("Unknown message received, ID: %d", msgID)
155 // decode message context to fix for special cases of messages,
157 // - replies that don't have context as first field (comes as zero)
158 // - events that don't have context at all (comes as non zero)
160 msgContext, err := c.codec.DecodeMsgContext(data, msg)
162 if context != msgContext {
163 log.Debugf("different context was decoded from message (%d -> %d)", context, msgContext)
167 log.Errorf("decoding context failed: %v", err)
170 chanID, isMulti, seqNum := unpackRequestContext(context)
171 if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
172 log.WithFields(logger.Fields{
175 "msg_name": msg.GetMessageName(),
176 "msg_size": len(data),
180 }).Debug(" <- Received a message from VPP.")
183 if context == 0 || c.isNotificationMessage(msgID) {
184 // process the message as a notification
185 c.sendNotifications(msgID, data)
189 // match ch according to the context
190 c.channelsLock.RLock()
191 ch, ok := c.channels[chanID]
192 c.channelsLock.RUnlock()
194 log.WithFields(logger.Fields{
197 }).Error("Channel ID not known, ignoring the message.")
201 // if this is a control ping reply to a multipart request,
202 // treat this as a last part of the reply
203 lastReplyReceived := isMulti && msgID == c.pingReplyID
205 // send the data to the channel
206 sendReply(ch, &vppReply{
210 lastReceived: lastReplyReceived,
213 // store actual time of this reply
214 c.lastReplyLock.Lock()
215 c.lastReply = time.Now()
216 c.lastReplyLock.Unlock()
219 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
220 // it logs the error and do not send the message.
221 func sendReply(ch *channel, reply *vppReply) {
223 case ch.replyChan <- reply:
224 // reply sent successfully
225 case <-time.After(time.Millisecond * 100):
226 // receiver still not ready
227 log.WithFields(logger.Fields{
229 "msg_id": reply.msgID,
230 "seq_num": reply.seqNum,
231 }).Warn("Unable to send the reply, reciever end not ready.")
235 func sendReplyError(ch *channel, req *vppRequest, err error) {
236 sendReply(ch, &vppReply{seqNum: req.seqNum, err: err})
// packRequestContext encodes the channel ID, the multipart flag and the
// sequence number of a request into a single 32-bit context value:
//
//	+------------------+-------------------+-----------------------+
//	| 15b = channel ID | 1b = is multipart | 16b = sequence number |
//	+------------------+-------------------+-----------------------+
//
// Only the lower 15 bits of chanID fit into the context; its most significant
// bit is shifted out and lost.
func packRequestContext(chanID uint16, isMultipart bool, seqNum uint16) uint32 {
	context := uint32(chanID) << 17
	if isMultipart {
		context |= 1 << 16
	}
	context |= uint32(seqNum)
	return context
}
// unpackRequestContext splits a 32-bit request context into its parts: the
// channel ID (upper 15 bits), the multipart flag (bit 16) and the sequence
// number (lower 16 bits).
func unpackRequestContext(context uint32) (chanID uint16, isMultipart bool, seqNum uint16) {
	chanID = uint16(context >> 17)
	isMultipart = (context>>16)&0x1 != 0
	seqNum = uint16(context & 0xffff)
	return chanID, isMultipart, seqNum
}
// compareSeqNumbers returns -1, 0, or 1 if sequence number <seqNum1> precedes,
// equals, or follows sequence number <seqNum2>, respectively.
262 // Since sequence numbers cycle in the finite set of size 2^16, the function
263 // must assume that the distance between compared sequence numbers is less than
264 // (2^16)/2 to determine the order.
265 func compareSeqNumbers(seqNum1, seqNum2 uint16) int {
266 // calculate distance from seqNum1 to seqNum2
268 if seqNum1 <= seqNum2 {
269 dist = seqNum2 - seqNum1
271 dist = 0xffff - (seqNum1 - seqNum2 - 1)
275 } else if dist <= 0x8000 {