1 // Copyright (c) 2017 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
23 logger "github.com/sirupsen/logrus"
25 "git.fd.io/govpp.git/api"
29 ErrNotConnected = errors.New("not connected to VPP, ignoring the request")
30 ErrProbeTimeout = errors.New("probe reply not received within timeout period")
33 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
34 func (c *Connection) watchRequests(ch *channel) {
37 case req, ok := <-ch.reqChan:
38 // new request on the request channel
40 // after closing the request channel, release API channel and return
41 c.releaseAPIChannel(ch)
44 c.processRequest(ch, req)
46 case req := <-ch.notifSubsChan:
47 // new request on the notification subscribe channel
48 c.processNotifSubscribeRequest(ch, req)
53 // processRequest processes a single request received on the request channel.
54 func (c *Connection) processRequest(ch *channel, req *api.VppRequest) error {
55 // check whether we are connected to VPP
56 if atomic.LoadUint32(&c.connected) == 0 {
57 err := ErrNotConnected
59 sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
63 // retrieve message ID
64 msgID, err := c.GetMessageID(req.Message)
66 err = fmt.Errorf("unable to retrieve message ID: %v", err)
67 log.WithFields(logger.Fields{
68 "msg_name": req.Message.GetMessageName(),
69 "msg_crc": req.Message.GetCrcString(),
70 "seq_num": req.SeqNum,
72 sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
76 // encode the message into binary
77 data, err := c.codec.EncodeMsg(req.Message, msgID)
79 err = fmt.Errorf("unable to encode the messge: %v", err)
80 log.WithFields(logger.Fields{
83 "seq_num": req.SeqNum,
85 sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
89 if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
90 log.WithFields(logger.Fields{
93 "msg_size": len(data),
94 "msg_name": req.Message.GetMessageName(),
95 "seq_num": req.SeqNum,
96 }).Debug("Sending a message to VPP.")
99 // send the request to VPP
100 context := packRequestContext(ch.id, req.Multipart, req.SeqNum)
101 err = c.vpp.SendMsg(context, data)
103 err = fmt.Errorf("unable to send the message: %v", err)
104 log.WithFields(logger.Fields{
107 "seq_num": req.SeqNum,
109 sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
114 // send a control ping to determine end of the multipart response
115 pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID)
117 log.WithFields(logger.Fields{
119 "msg_id": c.pingReqID,
120 "msg_size": len(pingData),
121 "seq_num": req.SeqNum,
122 }).Debug("Sending a control ping to VPP.")
124 c.vpp.SendMsg(context, pingData)
130 // msgCallback is called whenever any binary API message comes from VPP.
131 func msgCallback(context uint32, msgID uint16, data []byte) {
133 defer connLock.RUnlock()
136 log.Warn("Already disconnected, ignoring the message.")
140 chanID, isMultipart, seqNum := unpackRequestContext(context)
141 if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
142 log.WithFields(logger.Fields{
144 "msg_size": len(data),
145 "channel_id": chanID,
146 "is_multipart": isMultipart,
148 }).Debug("Received a message from VPP.")
151 if context == 0 || conn.isNotificationMessage(msgID) {
152 // process the message as a notification
153 conn.sendNotifications(msgID, data)
157 // match ch according to the context
158 conn.channelsLock.RLock()
159 ch, ok := conn.channels[chanID]
160 conn.channelsLock.RUnlock()
163 log.WithFields(logger.Fields{
164 "channel_id": chanID,
166 }).Error("Channel ID not known, ignoring the message.")
170 lastReplyReceived := false
171 // if this is a control ping reply to a multipart request, treat this as a last part of the reply
172 if msgID == conn.pingReplyID && isMultipart {
173 lastReplyReceived = true
176 // send the data to the channel
177 sendReply(ch, &api.VppReply{
181 LastReplyReceived: lastReplyReceived,
184 // store actual time of this reply
185 conn.lastReplyLock.Lock()
186 conn.lastReply = time.Now()
187 conn.lastReplyLock.Unlock()
190 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
191 // it logs the error and do not send the message.
192 func sendReply(ch *channel, reply *api.VppReply) {
194 case ch.replyChan <- reply:
195 // reply sent successfully
196 case <-time.After(time.Millisecond * 100):
197 // receiver still not ready
198 log.WithFields(logger.Fields{
200 "msg_id": reply.MessageID,
201 "seq_num": reply.SeqNum,
202 }).Warn("Unable to send the reply, reciever end not ready.")
206 // GetMessageID returns message identifier of given API message.
207 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
209 return 0, errors.New("nil connection passed in")
211 return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString())
214 // messageNameToID returns message ID of a message identified by its name and CRC.
215 func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) {
216 msgKey := msgName + "_" + msgCrc
218 // try to get the ID from the map
220 id, ok := c.msgIDs[msgKey]
221 c.msgIDsLock.RUnlock()
226 // get the ID using VPP API
227 id, err := c.vpp.GetMsgID(msgName, msgCrc)
229 err = fmt.Errorf("unable to retrieve message ID: %v", err)
230 log.WithFields(logger.Fields{
238 c.msgIDs[msgKey] = id
239 c.msgIDsLock.Unlock()
244 // LookupByID looks up message name and crc by ID.
245 func (c *Connection) LookupByID(ID uint16) (string, error) {
247 return "", errors.New("nil connection passed in")
251 defer c.msgIDsLock.Unlock()
253 for key, id := range c.msgIDs {
259 return "", fmt.Errorf("unknown message ID: %d", ID)
262 // +------------------+-------------------+-----------------------+
263 // | 15b = channel ID | 1b = is multipart | 16b = sequence number |
264 // +------------------+-------------------+-----------------------+
265 func packRequestContext(chanID uint16, isMultipart bool, seqNum uint16) uint32 {
266 context := uint32(chanID) << 17
270 context |= uint32(seqNum)
274 func unpackRequestContext(context uint32) (chanID uint16, isMulipart bool, seqNum uint16) {
275 chanID = uint16(context >> 17)
276 if ((context >> 16) & 0x1) != 0 {
279 seqNum = uint16(context & 0xffff)