1 // Copyright (c) 2017 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
24 logger "github.com/sirupsen/logrus"
26 "git.fd.io/govpp.git/api"
// ReplyChannelTimeout is a package-level duration initialized to 100 milliseconds.
// NOTE(review): it is not referenced in the visible part of this file; presumably
// it is a default for delivering replies to channels - confirm against its users.
29 var ReplyChannelTimeout = time.Millisecond * 100
// ErrNotConnected is the error processRequest uses when the connection's
// vppConnected flag is zero.
32 ErrNotConnected = errors.New("not connected to VPP, ignoring the request")
// ErrProbeTimeout reports that a probe reply was not received within the timeout period.
// NOTE(review): not referenced in the visible part of this file.
33 ErrProbeTimeout = errors.New("probe reply not received within timeout period")
36 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
// Every request received from ch.reqChan is handed to processRequest; if that
// fails, the error is wrapped and passed to sendReply for the same channel.
37 func (c *Connection) watchRequests(ch *Channel) {
40 case req, ok := <-ch.reqChan:
41 // new request on the request channel
43 // after closing the request channel, release API channel and return
44 c.releaseAPIChannel(ch)
47 if err := c.processRequest(ch, req); err != nil {
// hand the failure to the channel's reply path, keeping the cause via %w
48 sendReply(ch, &vppReply{
50 err: fmt.Errorf("unable to process request: %w", err),
57 // processRequest processes a single request received on the request channel.
// The visible steps are: check the vppConnected flag, look up the message ID,
// encode the message, pack the channel ID / multipart flag / sequence number
// into a request context, send the data through c.vppClient and trace the
// message. A control ping is additionally encoded, sent with the same context
// and traced (per the comment below, to mark the end of a multipart response).
// Each failing step logs a warning with the request details.
58 func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
59 // check whether we are connected to VPP
// the flag is read atomically, so no lock is taken here
60 if atomic.LoadUint32(&c.vppConnected) == 0 {
61 err := ErrNotConnected
62 log.WithFields(logger.Fields{
64 "seq_num": req.seqNum,
65 "msg_name": req.msg.GetMessageName(),
66 "msg_crc": req.msg.GetCrcString(),
68 }).Warnf("Unable to process request")
72 // retrieve message ID
73 msgID, err := c.GetMessageID(req.msg)
75 log.WithFields(logger.Fields{
77 "msg_name": req.msg.GetMessageName(),
78 "msg_crc": req.msg.GetCrcString(),
79 "seq_num": req.seqNum,
81 }).Warnf("Unable to retrieve message ID")
85 // encode the message into binary
86 data, err := c.codec.EncodeMsg(req.msg, msgID)
88 log.WithFields(logger.Fields{
91 "msg_name": req.msg.GetMessageName(),
92 "msg_crc": req.msg.GetCrcString(),
93 "seq_num": req.seqNum,
95 }).Warnf("Unable to encode message: %T %+v", req.msg, req.msg)
// the context carries channel ID, multipart flag and sequence number so that
// msgCallback can route the reply back (see unpackRequestContext there)
99 context := packRequestContext(ch.id, req.multi, req.seqNum)
101 if log.Level >= logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
102 log.WithFields(logger.Fields{
105 "msg_name": req.msg.GetMessageName(),
106 "msg_crc": req.msg.GetCrcString(),
107 "seq_num": req.seqNum,
108 "is_multi": req.multi,
110 "data_len": len(data),
111 }).Debugf("--> SEND MSG: %T %+v", req.msg, req.msg)
114 // send the request to VPP
116 err = c.vppClient.SendMsg(context, data)
118 log.WithFields(logger.Fields{
121 "msg_name": req.msg.GetMessageName(),
122 "msg_crc": req.msg.GetCrcString(),
123 "seq_num": req.seqNum,
124 "is_multi": req.multi,
126 "data_len": len(data),
128 }).Warnf("Unable to send message")
131 c.trace(req.msg, ch.id, t, false)
134 // send a control ping to determine end of the multipart response
// NOTE(review): the encode error is discarded here; a failed encode would
// presumably send empty ping data - consider checking it.
135 pingData, _ := c.codec.EncodeMsg(c.msgControlPing, c.pingReqID)
137 if log.Level >= logger.DebugLevel {
138 log.WithFields(logger.Fields{
140 "msg_id": c.pingReqID,
141 "msg_name": c.msgControlPing.GetMessageName(),
142 "msg_crc": c.msgControlPing.GetCrcString(),
143 "seq_num": req.seqNum,
145 "data_len": len(pingData),
146 }).Debugf(" -> SEND MSG: %T", c.msgControlPing)
// the ping reuses the request's context so its reply maps to the same channel and sequence number
150 if err := c.vppClient.SendMsg(context, pingData); err != nil {
151 log.WithFields(logger.Fields{
153 "seq_num": req.seqNum,
155 }).Warnf("unable to send control ping")
157 c.trace(c.msgControlPing, ch.id, t, false)
163 // msgCallback is called whenever any binary API message comes from VPP.
// It resolves the message type from msgID, decodes the request context into
// channel ID, multipart flag and sequence number, decodes and traces the
// message, and logs it. Messages with a zero context or with a subscription
// registered for msgID are passed to sendNotifications; replies are matched to
// a channel by the channel ID and delivered with sendReply. The time of the
// reply is stored in c.lastReply.
164 func (c *Connection) msgCallback(msgID uint16, data []byte) {
168 ).Warn("Connection already disconnected, ignoring the message.")
172 msg, err := c.getMessageByID(msgID)
178 // decode message context to fix for special cases of messages,
180 // - replies that don't have context as first field (comes as zero)
181 // - events that don't have context at all (comes as non zero)
183 context, err := c.codec.DecodeMsgContext(data, msg.GetMessageType())
185 log.WithField("msg_id", msgID).Warnf("Unable to decode message context: %v", err)
189 chanID, isMulti, seqNum := unpackRequestContext(context)
191 // decode and trace the message
// allocate a fresh value of the same concrete type as the looked-up message and decode into it
192 msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
193 if err = c.codec.DecodeMsg(data, msg); err != nil {
194 log.WithField("msg", msg).Warnf("Unable to decode message: %v", err)
197 c.trace(msg, chanID, time.Now(), true)
// NOTE(review): this guard uses == while the equivalent guards in processRequest
// use >=; with a level more verbose than Debug this log would be skipped -
// confirm whether >= was intended.
199 if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
200 log.WithFields(logger.Fields{
203 "msg_size": len(data),
207 "msg_crc": msg.GetCrcString(),
208 }).Debugf("<-- govpp RECEIVE: %s", msg.GetMessageName())
211 if context == 0 || c.isNotificationMessage(msgID) {
212 // process the message as a notification
213 c.sendNotifications(msgID, data)
217 // match ch according to the context
// the read lock is held only for the map lookup
218 c.channelsLock.RLock()
219 ch, ok := c.channels[chanID]
220 c.channelsLock.RUnlock()
222 log.WithFields(logger.Fields{
225 }).Error("Channel ID not known, ignoring the message.")
229 // if this is a control ping reply to a multipart request,
230 // treat this as a last part of the reply
231 lastReplyReceived := isMulti && msgID == c.pingReplyID
233 // send the data to the channel, it needs to be copied,
234 // because it will be freed after this function returns
235 sendReply(ch, &vppReply{
238 data: append([]byte(nil), data...),
239 lastReceived: lastReplyReceived,
242 // store actual time of this reply
243 c.lastReplyLock.Lock()
244 c.lastReply = time.Now()
245 c.lastReplyLock.Unlock()
248 // sendReply sends the reply into the reply go channel of ch. If the send cannot be completed
249 // right away, it waits up to ch.receiveReplyTimeout; on failure it logs a warning and does not send the reply.
250 func sendReply(ch *Channel, reply *vppReply) {
251 // first try to avoid creating timer
253 case ch.replyChan <- reply:
254 return // reply sent ok
256 // reply channel full
// a zero timeout means there is nothing to wait for: warn and drop the reply
258 if ch.receiveReplyTimeout == 0 {
259 log.WithFields(logger.Fields{
261 "msg_id": reply.msgID,
262 "seq_num": reply.seqNum,
264 }).Warn("Reply channel full, dropping reply.")
// second attempt: block until the receiver takes the reply or the timeout fires
268 case ch.replyChan <- reply:
269 return // reply sent ok
270 case <-time.After(ch.receiveReplyTimeout):
271 // receiver still not ready
272 log.WithFields(logger.Fields{
274 "msg_id": reply.msgID,
275 "seq_num": reply.seqNum,
277 }).Warnf("Unable to send reply (reciever end not ready in %v).", ch.receiveReplyTimeout)
281 // isNotificationMessage returns true if someone has subscribed to provided message ID.
282 func (c *Connection) isNotificationMessage(msgID uint16) bool {
283 c.subscriptionsLock.RLock()
284 defer c.subscriptionsLock.RUnlock()
286 _, exists := c.subscriptions[msgID]
290 // sendNotifications sends a notification message to all subscribers subscribed for that message.
// For every subscription registered under msgID a new event is created with the
// subscription's msgFactory, decoded from data and offered to the subscription's
// notifChan; decode and delivery failures are logged as warnings.
// The subscriptions read lock is held for the whole call.
291 func (c *Connection) sendNotifications(msgID uint16, data []byte) {
292 c.subscriptionsLock.RLock()
293 defer c.subscriptionsLock.RUnlock()
297 // send the notification to each subscriber
298 for _, sub := range c.subscriptions[msgID] {
299 log.WithFields(logger.Fields{
300 "msg_name": sub.event.GetMessageName(),
302 "msg_size": len(data),
303 }).Debug("Sending a notification to the subscription channel.")
// each subscriber gets its own decoded instance of the event
305 event := sub.msgFactory()
306 if err := c.codec.DecodeMsg(data, event); err != nil {
307 log.WithFields(logger.Fields{
308 "msg_name": sub.event.GetMessageName(),
310 "msg_size": len(data),
312 }).Warnf("Unable to decode the notification message")
316 // send the message into the go channel of the subscription
318 case sub.notifChan <- event:
319 // message sent successfully
321 // unable to write into the channel without blocking
322 log.WithFields(logger.Fields{
323 "msg_name": sub.event.GetMessageName(),
325 "msg_size": len(data),
326 }).Warn("Unable to deliver the notification, reciever end not ready.")
// NOTE(review): the condition guarding this log is not visible here; presumably
// it runs only when no subscription matched msgID - confirm.
333 log.WithFields(logger.Fields{
335 "msg_size": len(data),
336 }).Info("No subscription found for the notification message.")
// packRequestContext encodes the channel ID, the multipart flag and the
// sequence number into a single 32-bit request context laid out as:
//
//	+------------------+-------------------+-----------------------+
//	| 15b = channel ID | 1b = is multipart | 16b = sequence number |
//	+------------------+-------------------+-----------------------+
//
// Only the low 15 bits of chanID fit into the context; its most significant
// bit is shifted out of the 32-bit value.
func packRequestContext(chanID uint16, isMultipart bool, seqNum uint16) uint32 {
	context := uint32(chanID) << 17
	if isMultipart {
		context |= 1 << 16
	}
	context |= uint32(seqNum)
	return context
}
// unpackRequestContext splits a 32-bit request context into its parts:
// the channel ID (upper 15 bits), the multipart flag (bit 16) and the
// sequence number (lower 16 bits).
func unpackRequestContext(context uint32) (chanID uint16, isMulipart bool, seqNum uint16) {
	chanID = uint16(context >> 17)
	isMulipart = (context>>16)&0x1 != 0
	seqNum = uint16(context & 0xffff)
	return chanID, isMulipart, seqNum
}
// compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to,
// or succeeds seq. number <seqNum2>.
// Since sequence numbers cycle in the finite set of size 2^16, the function
// must assume that the distance between compared sequence numbers is less than
// (2^16)/2 to determine the order. At a distance of exactly 2^15 the result is
// -1 regardless of the argument order.
func compareSeqNumbers(seqNum1, seqNum2 uint16) int {
	// Distance travelled going forward from seqNum1 to seqNum2. Unsigned
	// subtraction wraps modulo 2^16, which covers the roll-over case
	// (seqNum1 > seqNum2) without a separate branch.
	dist := seqNum2 - seqNum1

	switch {
	case dist == 0:
		return 0
	case dist <= 0x8000:
		return -1
	default:
		return 1
	}
}
382 // Returns message based on the message ID not depending on message path
383 func (c *Connection) getMessageByID(msgID uint16) (msg api.Message, err error) {
385 for _, messages := range c.msgMapByPath {
386 if msg, ok = messages[msgID]; ok {
390 return nil, fmt.Errorf("unknown message received, ID: %d", msgID)