1 // Copyright (c) 2017 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
23 logger "github.com/sirupsen/logrus"
// ReplyChannelTimeout is the maximum time sendReply waits for the receiving
// end of a reply channel to become ready before the reply is dropped.
var ReplyChannelTimeout = time.Millisecond * 100

var (
	// ErrNotConnected is returned by processRequest when a request arrives
	// while the connection to VPP is not established.
	ErrNotConnected = errors.New("not connected to VPP, ignoring the request")
	// ErrProbeTimeout reports that a probe reply was not received within the
	// timeout period.
	ErrProbeTimeout = errors.New("probe reply not received within timeout period")
)
33 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
34 func (c *Connection) watchRequests(ch *Channel) {
37 case req, ok := <-ch.reqChan:
38 // new request on the request channel
40 // after closing the request channel, release API channel and return
41 c.releaseAPIChannel(ch)
44 if err := c.processRequest(ch, req); err != nil {
45 sendReplyError(ch, req, err)
51 // processRequest processes a single request received on the request channel.
52 func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
53 // check whether we are connected to VPP
54 if atomic.LoadUint32(&c.vppConnected) == 0 {
55 err := ErrNotConnected
56 log.Errorf("processing request failed: %v", err)
60 // retrieve message ID
61 msgID, err := c.GetMessageID(req.msg)
63 log.WithFields(logger.Fields{
64 "msg_name": req.msg.GetMessageName(),
65 "msg_crc": req.msg.GetCrcString(),
66 "seq_num": req.seqNum,
68 }).Errorf("failed to retrieve message ID")
69 return fmt.Errorf("unable to retrieve message ID: %v", err)
72 // encode the message into binary
73 data, err := c.codec.EncodeMsg(req.msg, msgID)
75 log.WithFields(logger.Fields{
78 "msg_name": req.msg.GetMessageName(),
79 "seq_num": req.seqNum,
81 }).Errorf("failed to encode message: %#v", req.msg)
82 return fmt.Errorf("unable to encode the message: %v", err)
85 context := packRequestContext(ch.id, req.multi, req.seqNum)
87 if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
88 log.WithFields(logger.Fields{
91 "is_multi": req.multi,
93 "msg_size": len(data),
94 "seq_num": req.seqNum,
95 "msg_crc": req.msg.GetCrcString(),
96 }).Debugf("--> govpp send: %s: %+v", req.msg.GetMessageName(), req.msg)
99 // send the request to VPP
100 err = c.vppClient.SendMsg(context, data)
102 err = fmt.Errorf("unable to send the message: %v", err)
103 log.WithFields(logger.Fields{
106 "seq_num": req.seqNum,
112 // send a control ping to determine end of the multipart response
113 pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID)
115 log.WithFields(logger.Fields{
118 "msg_id": c.pingReqID,
119 "msg_size": len(pingData),
120 "seq_num": req.seqNum,
121 }).Debug(" -> sending control ping")
123 if err := c.vppClient.SendMsg(context, pingData); err != nil {
124 log.WithFields(logger.Fields{
127 "seq_num": req.seqNum,
128 }).Warnf("unable to send control ping: %v", err)
135 // msgCallback is called whenever any binary API message comes from VPP.
136 func (c *Connection) msgCallback(msgID uint16, data []byte) {
138 log.Warn("Already disconnected, ignoring the message.")
142 msg, ok := c.msgMap[msgID]
144 log.Warnf("Unknown message received, ID: %d", msgID)
148 // decode message context to fix for special cases of messages,
150 // - replies that don't have context as first field (comes as zero)
151 // - events that don't have context at all (comes as non zero)
153 context, err := c.codec.DecodeMsgContext(data, msg)
155 log.Errorf("decoding context failed: %v", err)
158 chanID, isMulti, seqNum := unpackRequestContext(context)
159 if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
160 log.WithFields(logger.Fields{
163 "msg_size": len(data),
167 "msg_crc": msg.GetCrcString(),
168 }).Debugf("<-- govpp recv: %s", msg.GetMessageName())
171 if context == 0 || c.isNotificationMessage(msgID) {
172 // process the message as a notification
173 c.sendNotifications(msgID, data)
177 // match ch according to the context
178 c.channelsLock.RLock()
179 ch, ok := c.channels[chanID]
180 c.channelsLock.RUnlock()
182 log.WithFields(logger.Fields{
185 }).Error("Channel ID not known, ignoring the message.")
189 // if this is a control ping reply to a multipart request,
190 // treat this as a last part of the reply
191 lastReplyReceived := isMulti && msgID == c.pingReplyID
193 // send the data to the channel, it needs to be copied,
194 // because it will be freed after this function returns
195 sendReply(ch, &vppReply{
198 data: append([]byte(nil), data...),
199 lastReceived: lastReplyReceived,
202 // store actual time of this reply
203 c.lastReplyLock.Lock()
204 c.lastReply = time.Now()
205 c.lastReplyLock.Unlock()
208 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
209 // it logs the error and do not send the message.
210 func sendReply(ch *Channel, reply *vppReply) {
212 case ch.replyChan <- reply:
213 // reply sent successfully
214 case <-time.After(ReplyChannelTimeout):
215 // receiver still not ready
216 log.WithFields(logger.Fields{
218 "msg_id": reply.msgID,
219 "seq_num": reply.seqNum,
220 }).Warn("Unable to send the reply, reciever end not ready.")
224 func sendReplyError(ch *Channel, req *vppRequest, err error) {
225 sendReply(ch, &vppReply{seqNum: req.seqNum, err: err})
228 // isNotificationMessage returns true if someone has subscribed to provided message ID.
229 func (c *Connection) isNotificationMessage(msgID uint16) bool {
230 c.subscriptionsLock.RLock()
231 defer c.subscriptionsLock.RUnlock()
233 _, exists := c.subscriptions[msgID]
237 // sendNotifications send a notification message to all subscribers subscribed for that message.
238 func (c *Connection) sendNotifications(msgID uint16, data []byte) {
239 c.subscriptionsLock.RLock()
240 defer c.subscriptionsLock.RUnlock()
244 // send to notification to each subscriber
245 for _, sub := range c.subscriptions[msgID] {
246 log.WithFields(logger.Fields{
247 "msg_name": sub.event.GetMessageName(),
249 "msg_size": len(data),
250 }).Debug("Sending a notification to the subscription channel.")
252 event := sub.msgFactory()
253 if err := c.codec.DecodeMsg(data, event); err != nil {
254 log.WithFields(logger.Fields{
255 "msg_name": sub.event.GetMessageName(),
257 "msg_size": len(data),
258 }).Errorf("Unable to decode the notification message: %v", err)
262 // send the message into the go channel of the subscription
264 case sub.notifChan <- event:
265 // message sent successfully
267 // unable to write into the channel without blocking
268 log.WithFields(logger.Fields{
269 "msg_name": sub.event.GetMessageName(),
271 "msg_size": len(data),
272 }).Warn("Unable to deliver the notification, reciever end not ready.")
279 log.WithFields(logger.Fields{
281 "msg_size": len(data),
282 }).Info("No subscription found for the notification message.")
// packRequestContext packs the channel ID, the multipart flag and the sequence
// number into a single 32-bit message context with the following layout:
//
//	+------------------+-------------------+-----------------------+
//	| 15b = channel ID | 1b = is multipart | 16b = sequence number |
//	+------------------+-------------------+-----------------------+
//
// Only the lower 15 bits of chanID fit into the context; its highest bit is
// shifted out.
func packRequestContext(chanID uint16, isMultipart bool, seqNum uint16) uint32 {
	context := uint32(chanID) << 17
	if isMultipart {
		context |= 1 << 16
	}
	context |= uint32(seqNum)
	return context
}
// unpackRequestContext splits a 32-bit message context into its parts: the
// channel ID (upper 15 bits), the multipart flag (bit 16) and the sequence
// number (lower 16 bits).
func unpackRequestContext(context uint32) (chanID uint16, isMulipart bool, seqNum uint16) {
	chanID = uint16(context >> 17)
	isMulipart = (context>>16)&0x1 != 0
	seqNum = uint16(context & 0xffff)
	return chanID, isMulipart, seqNum
}
307 // compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to,
308 // or succeeds seq. number <seqNum2>.
309 // Since sequence numbers cycle in the finite set of size 2^16, the function
310 // must assume that the distance between compared sequence numbers is less than
311 // (2^16)/2 to determine the order.
312 func compareSeqNumbers(seqNum1, seqNum2 uint16) int {
313 // calculate distance from seqNum1 to seqNum2
315 if seqNum1 <= seqNum2 {
316 dist = seqNum2 - seqNum1
318 dist = 0xffff - (seqNum1 - seqNum2 - 1)
322 } else if dist <= 0x8000 {