1 // Copyright (c) 2018 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
24 "github.com/sirupsen/logrus"
26 "git.fd.io/govpp.git/api"
// ErrInvalidRequestCtx is returned by ReceiveReply when the request context it
// is called on is nil or is not bound to a channel.
var ErrInvalidRequestCtx = errors.New("invalid request context")
33 // MessageCodec provides functionality for decoding binary data to generated API messages.
34 type MessageCodec interface {
35 // EncodeMsg encodes message into binary data.
36 EncodeMsg(msg api.Message, msgID uint16) ([]byte, error)
37 // DecodeMsg decodes binary-encoded data of a message into provided Message structure.
38 DecodeMsg(data []byte, msg api.Message) error
41 // MessageIdentifier provides identification of generated API messages.
42 type MessageIdentifier interface {
43 // GetMessageID returns message identifier of given API message.
44 GetMessageID(msg api.Message) (uint16, error)
45 // LookupByID looks up message name and crc by ID
46 LookupByID(msgID uint16) (api.Message, error)
49 // vppRequest is a request that will be sent to VPP.
50 type vppRequest struct {
51 seqNum uint16 // sequence number
52 msg api.Message // binary API message to be send to VPP
53 multi bool // true if multipart response is expected
// vppReply describes a single reply received from VPP.
type vppReply struct {
	// seqNum is the sequence number of the reply.
	seqNum uint16
	// msgID is the ID of the message.
	msgID uint16
	// data holds the encoded message.
	data []byte
	// lastReceived is, for a multi request, true if the last reply has
	// already been received.
	lastReceived bool
	// err is set in case of an error; data is nil then.
	err error
}
65 // requestCtx is a context for request with single reply
66 type requestCtx struct {
71 // multiRequestCtx is a context for request with multiple responses
72 type multiRequestCtx struct {
77 // subscriptionCtx is a context of subscription for delivery of specific notification messages.
78 type subscriptionCtx struct {
80 notifChan chan api.Message // channel where notification messages will be delivered to
81 msgID uint16 // message ID for the subscribed event message
82 event api.Message // event message that this subscription is for
83 msgFactory func() api.Message // function that returns a new instance of the specific message that is expected as a notification
86 // channel is the main communication interface with govpp core. It contains four Go channels, one for sending the requests
87 // to VPP, one for receiving the replies from it and the same set for notifications. The user can access the Go channels
88 // via methods provided by Channel interface in this package. Do not use the same channel from multiple goroutines
89 // concurrently, otherwise the responses could mix! Use multiple channels instead.
94 reqChan chan *vppRequest // channel for sending the requests to VPP
95 replyChan chan *vppReply // channel where VPP replies are delivered to
97 msgCodec MessageCodec // used to decode binary data to generated API messages
98 msgIdentifier MessageIdentifier // used to retrieve message ID of a message
100 lastSeqNum uint16 // sequence number of the last sent request
102 delayedReply *vppReply // reply already taken from ReplyChan, buffered for later delivery
103 replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout
106 func newChannel(id uint16, conn *Connection, codec MessageCodec, identifier MessageIdentifier, reqSize, replySize int) *Channel {
111 msgIdentifier: identifier,
112 reqChan: make(chan *vppRequest, reqSize),
113 replyChan: make(chan *vppReply, replySize),
114 replyTimeout: DefaultReplyTimeout,
118 func (ch *Channel) GetID() uint16 {
122 func (ch *Channel) nextSeqNum() uint16 {
127 func (ch *Channel) SendRequest(msg api.Message) api.RequestCtx {
128 seqNum := ch.nextSeqNum()
129 ch.reqChan <- &vppRequest{
133 return &requestCtx{ch: ch, seqNum: seqNum}
136 func (ch *Channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx {
137 seqNum := ch.nextSeqNum()
138 ch.reqChan <- &vppRequest{
143 return &multiRequestCtx{ch: ch, seqNum: seqNum}
146 func (ch *Channel) CheckCompatiblity(msgs ...api.Message) error {
147 for _, msg := range msgs {
148 // TODO: collect all incompatible messages and return summarized error
149 _, err := ch.msgIdentifier.GetMessageID(msg)
157 func (ch *Channel) SubscribeNotification(notifChan chan api.Message, event api.Message) (api.SubscriptionCtx, error) {
158 msgID, err := ch.msgIdentifier.GetMessageID(event)
160 log.WithFields(logrus.Fields{
161 "msg_name": event.GetMessageName(),
162 "msg_crc": event.GetCrcString(),
163 }).Errorf("unable to retrieve message ID: %v", err)
164 return nil, fmt.Errorf("unable to retrieve event message ID: %v", err)
167 sub := &subscriptionCtx{
169 notifChan: notifChan,
172 msgFactory: getMsgFactory(event),
175 // add the subscription into map
176 ch.conn.subscriptionsLock.Lock()
177 defer ch.conn.subscriptionsLock.Unlock()
179 ch.conn.subscriptions[msgID] = append(ch.conn.subscriptions[msgID], sub)
184 func (ch *Channel) SetReplyTimeout(timeout time.Duration) {
185 ch.replyTimeout = timeout
188 func (ch *Channel) Close() {
192 func (req *requestCtx) ReceiveReply(msg api.Message) error {
193 if req == nil || req.ch == nil {
194 return ErrInvalidRequestCtx
197 lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum)
200 } else if lastReplyReceived {
201 return errors.New("multipart reply recieved while a single reply expected")
207 func (req *multiRequestCtx) ReceiveReply(msg api.Message) (lastReplyReceived bool, err error) {
208 if req == nil || req.ch == nil {
209 return false, ErrInvalidRequestCtx
212 return req.ch.receiveReplyInternal(msg, req.seqNum)
215 func (sub *subscriptionCtx) Unsubscribe() error {
216 log.WithFields(logrus.Fields{
217 "msg_name": sub.event.GetMessageName(),
219 }).Debug("Removing notification subscription.")
221 // remove the subscription from the map
222 sub.ch.conn.subscriptionsLock.Lock()
223 defer sub.ch.conn.subscriptionsLock.Unlock()
225 for i, item := range sub.ch.conn.subscriptions[sub.msgID] {
227 // remove i-th item in the slice
228 sub.ch.conn.subscriptions[sub.msgID] = append(sub.ch.conn.subscriptions[sub.msgID][:i], sub.ch.conn.subscriptions[sub.msgID][i+1:]...)
233 return fmt.Errorf("subscription for %q not found", sub.event.GetMessageName())
236 // receiveReplyInternal receives a reply from the reply channel into the provided msg structure.
237 func (ch *Channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (lastReplyReceived bool, err error) {
239 return false, errors.New("nil message passed in")
244 if vppReply := ch.delayedReply; vppReply != nil {
245 // try the delayed reply
246 ch.delayedReply = nil
247 ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
249 return lastReplyReceived, err
253 timer := time.NewTimer(ch.replyTimeout)
256 // blocks until a reply comes to ReplyChan or until timeout expires
257 case vppReply := <-ch.replyChan:
258 ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
260 logrus.WithFields(logrus.Fields{
261 "expSeqNum": expSeqNum,
263 }).Warnf("ignoring received reply: %+v (expecting: %s)", vppReply, msg.GetMessageName())
266 return lastReplyReceived, err
269 logrus.WithFields(logrus.Fields{
270 "expSeqNum": expSeqNum,
272 }).Debugf("timeout (%v) waiting for reply: %s", ch.replyTimeout, msg.GetMessageName())
273 err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout)
279 func (ch *Channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) {
280 // check the sequence number
281 cmpSeqNums := compareSeqNumbers(reply.seqNum, expSeqNum)
282 if cmpSeqNums == -1 {
283 // reply received too late, ignore the message
284 logrus.WithField("seqNum", reply.seqNum).
285 Warn("Received reply to an already closed binary API request")
290 ch.delayedReply = reply
291 err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum)
295 if reply.err != nil {
299 if reply.lastReceived {
300 lastReplyReceived = true
306 expMsgID, err = ch.msgIdentifier.GetMessageID(msg)
308 err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
309 msg.GetMessageName(), msg.GetCrcString())
313 if reply.msgID != expMsgID {
314 var msgNameCrc string
315 if replyMsg, err := ch.msgIdentifier.LookupByID(reply.msgID); err != nil {
316 msgNameCrc = err.Error()
318 msgNameCrc = getMsgNameWithCrc(replyMsg)
321 err = fmt.Errorf("received invalid message ID (seqNum=%d), expected %d (%s), but got %d (%s) "+
322 "(check if multiple goroutines are not sharing single GoVPP channel)",
323 reply.seqNum, expMsgID, msg.GetMessageName(), reply.msgID, msgNameCrc)
327 // decode the message
328 if err = ch.msgCodec.DecodeMsg(reply.data, msg); err != nil {
332 // check Retval and convert it into VnetAPIError error
333 if strings.HasSuffix(msg.GetMessageName(), "_reply") {
334 // TODO: use categories for messages to avoid checking message name
335 if f := reflect.Indirect(reflect.ValueOf(msg)).FieldByName("Retval"); f.IsValid() {
336 retval := int32(f.Int())
337 err = api.RetvalToVPPApiError(retval)