1 // Copyright (c) 2018 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
23 "git.fd.io/govpp.git/api"
24 "github.com/sirupsen/logrus"
// defaultReplyTimeout is the one-second default for how long to wait for a
// reply from VPP. SetReplyTimeout overrides the timeout stored on a channel.
// NOTE(review): presumably copied into channel.replyTimeout when a channel is
// created - confirm at the construction site.
27 const defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout
29 // requestCtxData is the context of an ongoing simple request (only one response is expected).
// SendRequest creates it with the channel the request was sent on (ch) and the
// sequence number the request was tagged with (seqNum); ReceiveReply passes
// both on to receiveReplyInternal to fetch the matching reply.
30 type requestCtxData struct {
35 // multiRequestCtxData is the context of an ongoing multipart request (multiple responses are expected).
// SendMultiRequest creates it with the channel the request was sent on (ch)
// and the sequence number the request was tagged with (seqNum); ReceiveReply
// passes both on to receiveReplyInternal for every reply it fetches.
36 type multiRequestCtxData struct {
// ReceiveReply waits for the reply to the simple request this context was
// created for, delegating to channel.receiveReplyInternal with the request's
// sequence number so the reply is received into msg.
//
// An invalid-request-context error is returned when req or req.ch is nil.
// A reply flagged as the last part of a multipart response is treated as an
// error, because a simple request expects exactly one ordinary reply.
41 func (req *requestCtxData) ReceiveReply(msg api.Message) error {
// Guard against a nil context or a context that is not bound to a channel.
42 if req == nil || req.ch == nil {
43 return errors.New("invalid request context")
46 lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum)
// lastReplyReceived is only meaningful for multipart requests; when it is set
// here, the error below replaces whatever receiveReplyInternal returned.
// NOTE(review): the message misspells received; it is a runtime string, so
// correcting it may affect callers or tests that match on the text.
48 if lastReplyReceived {
49 err = errors.New("multipart reply recieved while a simple reply expected")
// ReceiveReply waits for the next reply to the multipart request this context
// was created for, delegating to channel.receiveReplyInternal with the
// request's sequence number so the reply is received into msg.
//
// Both results are passed through unchanged from receiveReplyInternal;
// lastReplyReceived originates from the LastReplyReceived flag of the reply.
// An invalid-request-context error is returned when req or req.ch is nil.
54 func (req *multiRequestCtxData) ReceiveReply(msg api.Message) (lastReplyReceived bool, err error) {
// Guard against a nil context or a context that is not bound to a channel.
55 if req == nil || req.ch == nil {
56 return false, errors.New("invalid request context")
59 return req.ch.receiveReplyInternal(msg, req.seqNum)
62 // channel is the main communication interface with govpp core. It contains four Go channels: one for sending requests
63 // to VPP, one for receiving the replies from it, and the same pair for notification subscribe requests. The user can access the Go channels
64 // via methods provided by Channel interface in this package. Do not use the same channel from multiple goroutines
65 // concurrently, otherwise the responses could mix! Use multiple channels instead.
67 id uint16 // channel ID
69 reqChan chan *api.VppRequest // channel for sending the requests to VPP, closing this channel releases all resources in the ChannelProvider
70 replyChan chan *api.VppReply // channel where VPP replies are delivered to
72 notifSubsChan chan *api.NotifSubscribeRequest // channel for sending notification subscribe requests
73 notifSubsReplyChan chan error // channel where replies to notification subscribe requests are delivered to
75 msgDecoder api.MessageDecoder // used to decode binary data to generated API messages
76 msgIdentifier api.MessageIdentifier // used to retrieve message ID of a message
78 lastSeqNum uint16 // sequence number of the last sent request; copied into each request and its request context
80 delayedReply *api.VppReply // reply already taken from replyChan and buffered by processReply; tried first by the next receiveReplyInternal call
81 replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout
// SendRequest sends an api.VppRequest tagged with the channel's current
// lastSeqNum on reqChan and returns a request context carrying the same
// sequence number, which ReceiveReply later uses to match the reply.
// The call blocks until reqChan accepts the request.
// NOTE(review): lastSeqNum is presumably advanced before the send so that each
// request gets a fresh number - confirm.
84 func (ch *channel) SendRequest(msg api.Message) api.RequestCtx {
86 ch.reqChan <- &api.VppRequest{
88 SeqNum: ch.lastSeqNum,
// The context remembers which channel and sequence number the reply belongs to.
90 return &requestCtxData{ch: ch, seqNum: ch.lastSeqNum}
// SendMultiRequest sends an api.VppRequest tagged with the channel's current
// lastSeqNum on reqChan and returns a multipart request context carrying the
// same sequence number; its ReceiveReply is called repeatedly to collect the
// replies. The call blocks until reqChan accepts the request.
// NOTE(review): lastSeqNum is presumably advanced before the send and the
// request presumably marked as multipart - confirm.
93 func (ch *channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx {
95 ch.reqChan <- &api.VppRequest{
98 SeqNum: ch.lastSeqNum,
// The context remembers which channel and sequence number the replies belong to.
100 return &multiRequestCtxData{ch: ch, seqNum: ch.lastSeqNum}
// SubscribeNotification builds an api.NotifSubscription from notifChan and
// msgFactory, sends it inside an api.NotifSubscribeRequest on notifSubsChan,
// and then blocks until an answer arrives on notifSubsReplyChan.
//
// The subscription pointer is returned together with the received error, so it
// is non-nil even when the error is non-nil.
103 func (ch *channel) SubscribeNotification(notifChan chan api.Message, msgFactory func() api.Message) (*api.NotifSubscription, error) {
104 subscription := &api.NotifSubscription{
105 NotifChan: notifChan,
106 MsgFactory: msgFactory,
108 ch.notifSubsChan <- &api.NotifSubscribeRequest{
109 Subscription: subscription,
// Wait for the outcome of the subscribe request before returning.
112 return subscription, <-ch.notifSubsReplyChan
// UnsubscribeNotification sends an api.NotifSubscribeRequest carrying the
// given subscription on notifSubsChan and returns the error value received
// from notifSubsReplyChan, blocking until that answer arrives.
// NOTE(review): the request presumably carries a flag that marks it as an
// unsubscribe rather than a subscribe - confirm.
115 func (ch *channel) UnsubscribeNotification(subscription *api.NotifSubscription) error {
116 ch.notifSubsChan <- &api.NotifSubscribeRequest{
117 Subscription: subscription,
120 return <-ch.notifSubsReplyChan
// CheckMessageCompatibility asks msgIdentifier for the message ID of each of
// the given messages in turn. The error it can return names the offending
// message and its CRC string.
// NOTE(review): the error is presumably returned for the first message whose
// ID lookup fails, and nil when all lookups succeed - confirm.
123 func (ch *channel) CheckMessageCompatibility(messages ...api.Message) error {
124 for _, msg := range messages {
// Only the lookup error matters here; the ID itself is discarded.
125 _, err := ch.msgIdentifier.GetMessageID(msg)
127 return fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
128 msg.GetMessageName(), msg.GetCrcString())
// SetReplyTimeout stores timeout as the channel's replyTimeout, the duration
// receiveReplyInternal uses for the timer that bounds its wait for a reply.
134 func (ch *channel) SetReplyTimeout(timeout time.Duration) {
135 ch.replyTimeout = timeout
// GetRequestChannel returns a send-only channel of *api.VppRequest.
// NOTE(review): presumably this is ch.reqChan - confirm.
138 func (ch *channel) GetRequestChannel() chan<- *api.VppRequest {
// GetReplyChannel returns a receive-only channel of *api.VppReply.
// NOTE(review): presumably this is ch.replyChan - confirm.
142 func (ch *channel) GetReplyChannel() <-chan *api.VppReply {
// GetNotificationChannel returns notifSubsChan, the channel for sending
// notification subscribe requests, restricted to send-only use.
146 func (ch *channel) GetNotificationChannel() chan<- *api.NotifSubscribeRequest {
147 return ch.notifSubsChan
// GetNotificationReplyChannel returns notifSubsReplyChan, the channel on which
// replies to notification subscribe requests are delivered, restricted to
// receive-only use.
150 func (ch *channel) GetNotificationReplyChannel() <-chan error {
151 return ch.notifSubsReplyChan
// GetMessageDecoder returns an api.MessageDecoder.
// NOTE(review): presumably this is ch.msgDecoder - confirm.
154 func (ch *channel) GetMessageDecoder() api.MessageDecoder {
// GetID returns a uint16 identifier.
// NOTE(review): presumably this is the channel ID stored in ch.id - confirm.
158 func (ch *channel) GetID() uint16 {
// Close acts only when reqChan is non-nil.
// NOTE(review): presumably it closes reqChan, which according to the comment
// on that field releases the channel's resources in the ChannelProvider -
// confirm.
162 func (ch *channel) Close() {
163 if ch.reqChan != nil {
168 // receiveReplyInternal receives a reply from the reply channel into the provided msg structure.
// expSeqNum is the sequence number of the request whose reply is awaited.
//
// A reply buffered in ch.delayedReply is handed to processReply first, and the
// buffer is cleared. Replies are then read from ch.replyChan; a timer started
// with ch.replyTimeout bounds the wait and produces a no-reply error when it
// expires. lastReplyReceived and err are the values reported by processReply.
169 func (ch *channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (lastReplyReceived bool, err error) {
// NOTE(review): presumably guarded by a nil check on msg - confirm.
172 return false, errors.New("nil message passed in")
175 if ch.delayedReply != nil {
176 // try the delayed reply
177 vppReply := ch.delayedReply
// Clear the buffer before processing so the reply is not tried twice.
178 ch.delayedReply = nil
179 ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
181 return lastReplyReceived, err
// The timer is created once, before any read from replyChan.
// NOTE(review): no timer.Stop() is visible on the reply path - confirm the
// timer is released when a reply arrives before the timeout.
185 timer := time.NewTimer(ch.replyTimeout)
188 // blocks until a reply comes to replyChan or until the timeout expires
189 case vppReply := <-ch.replyChan:
190 ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
194 return lastReplyReceived, err
197 err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout)
// processReply checks reply against the expected sequence number and message
// type and, when the checks pass, decodes reply.Data into msg.
//
// The checks run in this order:
//   - sequence number: a reply whose SeqNum precedes expSeqNum is logged as an
//     answer to an already closed request; a reply stored in ch.delayedReply
//     is paired with a missing-reply error for expSeqNum
//   - reply.Error, then reply.LastReplyReceived (which sets lastReplyReceived)
//   - message ID: the ID registered for msg must equal reply.MessageID
//
// NOTE(review): ignore presumably tells the caller to skip this reply and keep
// waiting, and the delayed-reply branch presumably applies when the reply's
// SeqNum is ahead of expSeqNum - confirm.
204 func (ch *channel) processReply(reply *api.VppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) {
205 // check the sequence number
206 cmpSeqNums := compareSeqNumbers(reply.SeqNum, expSeqNum)
207 if cmpSeqNums == -1 {
208 // reply received too late, ignore the message
209 logrus.WithField("sequence-number", reply.SeqNum).Warn(
210 "Received reply to an already closed binary API request")
// Keep the reply for a later receiveReplyInternal call and report that the
// reply for expSeqNum is missing.
215 ch.delayedReply = reply
216 err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum)
220 if reply.Error != nil {
224 if reply.LastReplyReceived {
225 lastReplyReceived = true
// Resolve the message ID that msg is expected to have on the connected VPP.
231 expMsgID, err = ch.msgIdentifier.GetMessageID(msg)
233 err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
234 msg.GetMessageName(), msg.GetCrcString())
238 if reply.MessageID != expMsgID {
239 var msgNameCrc string
// err declared here with := is local to this if statement and shadows the
// named result; on lookup failure its text stands in for the message name.
240 if nameCrc, err := ch.msgIdentifier.LookupByID(reply.MessageID); err != nil {
241 msgNameCrc = err.Error()
246 err = fmt.Errorf("received invalid message ID (seq-num=%d), expected %d (%s), but got %d (%s) "+
247 "(check if multiple goroutines are not sharing single GoVPP channel)",
248 reply.SeqNum, expMsgID, msg.GetMessageName(), reply.MessageID, msgNameCrc)
252 // decode the message
253 err = ch.msgDecoder.DecodeMsg(reply.Data, msg)
257 // compareSeqNumbers returns -1, 0 or 1 if sequence number <seqNum1> precedes, equals,
258 // or follows sequence number <seqNum2>, respectively.
259 // Since sequence numbers cycle in the finite set of size 2^16, the function
260 // must assume that the distance between compared sequence numbers is less than
261 // (2^16)/2 to determine the order.
262 func compareSeqNumbers(seqNum1, seqNum2 uint16) int {
263 // calculate distance from seqNum1 to seqNum2
265 if seqNum1 <= seqNum2 {
266 dist = seqNum2 - seqNum1
268 dist = 0xffff - (seqNum1 - seqNum2 - 1)
272 } else if dist <= 0x8000 {