1 // Copyright (c) 2017 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
23 logger "github.com/sirupsen/logrus"
25 "git.fd.io/govpp.git/api"
28 var ReplyChannelTimeout = time.Millisecond * 100
31 ErrNotConnected = errors.New("not connected to VPP, ignoring the request")
32 ErrProbeTimeout = errors.New("probe reply not received within timeout period")
35 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
36 func (c *Connection) watchRequests(ch *Channel) {
39 case req, ok := <-ch.reqChan:
40 // new request on the request channel
42 // after closing the request channel, release API channel and return
43 c.releaseAPIChannel(ch)
46 if err := c.processRequest(ch, req); err != nil {
47 sendReply(ch, &vppReply{
49 err: fmt.Errorf("unable to process request: %w", err),
56 // processRequest processes a single request received on the request channel.
57 func (c *Connection) sendMessage(context uint32, msg api.Message) error {
58 // check whether we are connected to VPP
59 if atomic.LoadUint32(&c.vppConnected) == 0 {
60 return ErrNotConnected
63 /*log := log.WithFields(logger.Fields{
65 "msg_name": msg.GetMessageName(),
66 "msg_crc": msg.GetCrcString(),
69 // retrieve message ID
70 msgID, err := c.GetMessageID(msg)
72 //log.WithError(err).Debugf("unable to retrieve message ID: %#v", msg)
76 //log = log.WithField("msg_id", msgID)
79 data, err := c.codec.EncodeMsg(msg, msgID)
81 log.WithError(err).Debugf("unable to encode message: %#v", msg)
85 //log = log.WithField("msg_length", len(data))
87 if log.Level >= logger.DebugLevel {
88 log.Debugf("--> SEND: MSG %T %+v", msg, msg)
91 // send message to VPP
92 err = c.vppClient.SendMsg(context, data)
94 log.WithError(err).Debugf("unable to send message: %#v", msg)
101 // processRequest processes a single request received on the request channel.
102 func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
103 // check whether we are connected to VPP
104 if atomic.LoadUint32(&c.vppConnected) == 0 {
105 err := ErrNotConnected
106 log.WithFields(logger.Fields{
108 "seq_num": req.seqNum,
109 "msg_name": req.msg.GetMessageName(),
110 "msg_crc": req.msg.GetCrcString(),
112 }).Warnf("Unable to process request")
116 // retrieve message ID
117 msgID, err := c.GetMessageID(req.msg)
119 log.WithFields(logger.Fields{
121 "msg_name": req.msg.GetMessageName(),
122 "msg_crc": req.msg.GetCrcString(),
123 "seq_num": req.seqNum,
125 }).Warnf("Unable to retrieve message ID")
129 // encode the message into binary
130 data, err := c.codec.EncodeMsg(req.msg, msgID)
132 log.WithFields(logger.Fields{
135 "msg_name": req.msg.GetMessageName(),
136 "msg_crc": req.msg.GetCrcString(),
137 "seq_num": req.seqNum,
139 }).Warnf("Unable to encode message: %T %+v", req.msg, req.msg)
143 context := packRequestContext(ch.id, req.multi, req.seqNum)
145 if log.Level >= logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
146 log.WithFields(logger.Fields{
149 "msg_name": req.msg.GetMessageName(),
150 "msg_crc": req.msg.GetCrcString(),
151 "seq_num": req.seqNum,
152 "is_multi": req.multi,
154 "data_len": len(data),
155 }).Debugf("--> SEND MSG: %T %+v", req.msg, req.msg)
158 // send the request to VPP
159 err = c.vppClient.SendMsg(context, data)
161 log.WithFields(logger.Fields{
164 "msg_name": req.msg.GetMessageName(),
165 "msg_crc": req.msg.GetCrcString(),
166 "seq_num": req.seqNum,
167 "is_multi": req.multi,
169 "data_len": len(data),
171 }).Warnf("Unable to send message")
176 // send a control ping to determine end of the multipart response
177 pingData, _ := c.codec.EncodeMsg(c.msgControlPing, c.pingReqID)
179 if log.Level >= logger.DebugLevel {
180 log.WithFields(logger.Fields{
182 "msg_id": c.pingReqID,
183 "msg_name": c.msgControlPing.GetMessageName(),
184 "msg_crc": c.msgControlPing.GetCrcString(),
185 "seq_num": req.seqNum,
187 "data_len": len(pingData),
188 }).Debugf(" -> SEND MSG: %T", c.msgControlPing)
191 if err := c.vppClient.SendMsg(context, pingData); err != nil {
192 log.WithFields(logger.Fields{
194 "seq_num": req.seqNum,
196 }).Warnf("unable to send control ping")
203 // msgCallback is called whenever any binary API message comes from VPP.
204 func (c *Connection) msgCallback(msgID uint16, data []byte) {
208 ).Warn("Connection already disconnected, ignoring the message.")
212 msgType, name, crc, err := c.getMessageDataByID(msgID)
218 // decode message context to fix for special cases of messages,
220 // - replies that don't have context as first field (comes as zero)
221 // - events that don't have context at all (comes as non zero)
223 context, err := c.codec.DecodeMsgContext(data, msgType)
225 log.WithField("msg_id", msgID).Warnf("Unable to decode message context: %v", err)
229 chanID, isMulti, seqNum := unpackRequestContext(context)
231 if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
232 log.WithFields(logger.Fields{
235 "msg_size": len(data),
240 }).Debugf("<-- govpp RECEIVE: %s %+v", name)
243 if context == 0 || c.isNotificationMessage(msgID) {
244 // process the message as a notification
245 c.sendNotifications(msgID, data)
249 // match ch according to the context
250 c.channelsLock.RLock()
251 ch, ok := c.channels[chanID]
252 c.channelsLock.RUnlock()
254 log.WithFields(logger.Fields{
257 }).Error("Channel ID not known, ignoring the message.")
261 // if this is a control ping reply to a multipart request,
262 // treat this as a last part of the reply
263 lastReplyReceived := isMulti && msgID == c.pingReplyID
265 // send the data to the channel, it needs to be copied,
266 // because it will be freed after this function returns
267 sendReply(ch, &vppReply{
270 data: append([]byte(nil), data...),
271 lastReceived: lastReplyReceived,
274 // store actual time of this reply
275 c.lastReplyLock.Lock()
276 c.lastReply = time.Now()
277 c.lastReplyLock.Unlock()
280 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
281 // it logs the error and do not send the message.
282 func sendReply(ch *Channel, reply *vppReply) {
283 // first try to avoid creating timer
285 case ch.replyChan <- reply:
286 return // reply sent ok
288 // reply channel full
290 if ch.receiveReplyTimeout == 0 {
291 log.WithFields(logger.Fields{
293 "msg_id": reply.msgID,
294 "seq_num": reply.seqNum,
296 }).Warn("Reply channel full, dropping reply.")
300 case ch.replyChan <- reply:
301 return // reply sent ok
302 case <-time.After(ch.receiveReplyTimeout):
303 // receiver still not ready
304 log.WithFields(logger.Fields{
306 "msg_id": reply.msgID,
307 "seq_num": reply.seqNum,
309 }).Warnf("Unable to send reply (reciever end not ready in %v).", ch.receiveReplyTimeout)
313 // isNotificationMessage returns true if someone has subscribed to provided message ID.
314 func (c *Connection) isNotificationMessage(msgID uint16) bool {
315 c.subscriptionsLock.RLock()
316 defer c.subscriptionsLock.RUnlock()
318 _, exists := c.subscriptions[msgID]
322 // sendNotifications send a notification message to all subscribers subscribed for that message.
323 func (c *Connection) sendNotifications(msgID uint16, data []byte) {
324 c.subscriptionsLock.RLock()
325 defer c.subscriptionsLock.RUnlock()
329 // send to notification to each subscriber
330 for _, sub := range c.subscriptions[msgID] {
331 log.WithFields(logger.Fields{
332 "msg_name": sub.event.GetMessageName(),
334 "msg_size": len(data),
335 }).Debug("Sending a notification to the subscription channel.")
337 event := sub.msgFactory()
338 if err := c.codec.DecodeMsg(data, event); err != nil {
339 log.WithFields(logger.Fields{
340 "msg_name": sub.event.GetMessageName(),
342 "msg_size": len(data),
344 }).Warnf("Unable to decode the notification message")
348 // send the message into the go channel of the subscription
350 case sub.notifChan <- event:
351 // message sent successfully
353 // unable to write into the channel without blocking
354 log.WithFields(logger.Fields{
355 "msg_name": sub.event.GetMessageName(),
357 "msg_size": len(data),
358 }).Warn("Unable to deliver the notification, reciever end not ready.")
365 log.WithFields(logger.Fields{
367 "msg_size": len(data),
368 }).Info("No subscription found for the notification message.")
372 // +------------------+-------------------+-----------------------+
373 // | 15b = channel ID | 1b = is multipart | 16b = sequence number |
374 // +------------------+-------------------+-----------------------+
375 func packRequestContext(chanID uint16, isMultipart bool, seqNum uint16) uint32 {
376 context := uint32(chanID) << 17
380 context |= uint32(seqNum)
384 func unpackRequestContext(context uint32) (chanID uint16, isMulipart bool, seqNum uint16) {
385 chanID = uint16(context >> 17)
386 if ((context >> 16) & 0x1) != 0 {
389 seqNum = uint16(context & 0xffff)
393 // compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to,
394 // or succeeds seq. number <seqNum2>.
395 // Since sequence numbers cycle in the finite set of size 2^16, the function
396 // must assume that the distance between compared sequence numbers is less than
397 // (2^16)/2 to determine the order.
398 func compareSeqNumbers(seqNum1, seqNum2 uint16) int {
399 // calculate distance from seqNum1 to seqNum2
401 if seqNum1 <= seqNum2 {
402 dist = seqNum2 - seqNum1
404 dist = 0xffff - (seqNum1 - seqNum2 - 1)
408 } else if dist <= 0x8000 {
414 // Returns message data based on the message ID not depending on the message path
415 func (c *Connection) getMessageDataByID(msgID uint16) (typ api.MessageType, name, crc string, err error) {
416 for _, msgs := range c.msgMapByPath {
417 if msg, ok := msgs[msgID]; ok {
418 return msg.GetMessageType(), msg.GetMessageName(), msg.GetCrcString(), nil
421 return typ, name, crc, fmt.Errorf("unknown message received, ID: %d", msgID)