Pair requests with replies using sequence numbers
[govpp.git] / api / api.go
index 783f97a..3c2c7ec 100644 (file)
@@ -18,6 +18,8 @@ import (
        "errors"
        "fmt"
        "time"
+
+       "github.com/sirupsen/logrus"
 )
 
 // MessageType represents the type of a VPP message.
@@ -28,11 +30,13 @@ const (
        RequestMessage MessageType = iota
        // ReplyMessage represents a VPP reply message
        ReplyMessage
+       // EventMessage represents a VPP notification event message
+       EventMessage
        // OtherMessage represents other VPP message (e.g. counters)
        OtherMessage
 )
 
-// Message is an interface that is implemented by all VPP Binary API messages generated by the binapi_generator.
+// Message is an interface that is implemented by all VPP Binary API messages generated by the binapigenerator.
 type Message interface {
        // GetMessageName returns the original VPP name of the message, as defined in the VPP API.
        GetMessageName() string
@@ -59,7 +63,7 @@ type ChannelProvider interface {
        // It uses default buffer sizes for the request and reply Go channels.
        NewAPIChannel() (*Channel, error)
 
-       // NewAPIChannel returns a new channel for communication with VPP via govpp core.
+       // NewAPIChannelBuffered returns a new channel for communication with VPP via govpp core.
        // It allows to specify custom buffer sizes for the request and reply Go channels.
        NewAPIChannelBuffered() (*Channel, error)
 }
@@ -74,12 +78,17 @@ type MessageDecoder interface {
 type MessageIdentifier interface {
        // GetMessageID returns message identifier of given API message.
        GetMessageID(msg Message) (uint16, error)
+       // LookupByID looks up message name and crc by ID
+       LookupByID(ID uint16) (string, error)
 }
 
 // Channel is the main communication interface with govpp core. It contains two Go channels, one for sending the requests
 // to VPP and one for receiving the replies from it. The user can access the Go channels directly, or use the helper
-// methods  provided inside of this package.
+// methods  provided inside of this package. Do not use the same channel from multiple goroutines concurrently,
+// otherwise the responses could mix! Use multiple channels instead.
 type Channel struct {
+       ID        uint16 // channel ID
+
        ReqChan   chan *VppRequest // channel for sending the requests to VPP, closing this channel releases all resources in the ChannelProvider
        ReplyChan chan *VppReply   // channel where VPP replies are delivered to
 
@@ -89,12 +98,15 @@ type Channel struct {
        MsgDecoder    MessageDecoder    // used to decode binary data to generated API messages
        MsgIdentifier MessageIdentifier // used to retrieve message ID of a message
 
+       lastSeqNum uint16 // sequence number of the last sent request
+
+       delayedReply *VppReply     // reply already taken from ReplyChan, buffered for later delivery
        replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout
-       metadata     interface{}   // opaque metadata of the API channel
 }
 
 // VppRequest is a request that will be sent to VPP.
 type VppRequest struct {
+       SeqNum    uint16  // sequence number
        Message   Message // binary API message to be send to VPP
        Multipart bool    // true if multipart response is expected, false otherwise
 }
@@ -102,6 +114,7 @@ type VppRequest struct {
 // VppReply is a reply received from VPP.
 type VppReply struct {
        MessageID         uint16 // ID of the message
+       SeqNum            uint16 // sequence number
        Data              []byte // encoded data with the message - MessageDecoder can be used for decoding
        LastReplyReceived bool   // in case of multipart replies, true if the last reply has been already received and this one should be ignored
        Error             error  // in case of error, data is nil and this member contains error description
@@ -122,30 +135,27 @@ type NotifSubscription struct {
 // RequestCtx is a context of a ongoing request (simple one - only one response is expected).
 type RequestCtx struct {
        ch *Channel
+       seqNum uint16
 }
 
 // MultiRequestCtx is a context of a ongoing multipart request (multiple responses are expected).
 type MultiRequestCtx struct {
        ch *Channel
+       seqNum uint16
 }
 
 const defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout
 
-// NewChannelInternal returns a new channel structure with metadata field filled in with the provided argument.
+// NewChannelInternal returns a new channel structure.
 // Note that this is just a raw channel not yet connected to VPP, it is not intended to be used directly.
 // Use ChannelProvider to get an API channel ready for communication with VPP.
-func NewChannelInternal(metadata interface{}) *Channel {
+func NewChannelInternal(id uint16) *Channel {
        return &Channel{
+               ID: id,
                replyTimeout: defaultReplyTimeout,
-               metadata:     metadata,
        }
 }
 
-// Metadata returns the metadata stored within the channel structure by the NewChannelInternal call.
-func (ch *Channel) Metadata() interface{} {
-       return ch.metadata
-}
-
 // SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply
 // from VPP before returning an error.
 func (ch *Channel) SetReplyTimeout(timeout time.Duration) {
@@ -162,10 +172,12 @@ func (ch *Channel) Close() {
 // SendRequest asynchronously sends a request to VPP. Returns a request context, that can be used to call ReceiveReply.
 // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
 func (ch *Channel) SendRequest(msg Message) *RequestCtx {
+       ch.lastSeqNum++
        ch.ReqChan <- &VppRequest{
                Message: msg,
+               SeqNum: ch.lastSeqNum,
        }
-       return &RequestCtx{ch: ch}
+       return &RequestCtx{ch: ch, seqNum: ch.lastSeqNum}
 }
 
 // ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs).
@@ -175,7 +187,7 @@ func (req *RequestCtx) ReceiveReply(msg Message) error {
                return errors.New("invalid request context")
        }
 
-       lastReplyReceived, err := req.ch.receiveReplyInternal(msg)
+       lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum)
 
        if lastReplyReceived {
                err = errors.New("multipart reply recieved while a simple reply expected")
@@ -187,61 +199,137 @@ func (req *RequestCtx) ReceiveReply(msg Message) error {
 // Returns a multipart request context, that can be used to call ReceiveReply.
 // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
 func (ch *Channel) SendMultiRequest(msg Message) *MultiRequestCtx {
+       ch.lastSeqNum++
        ch.ReqChan <- &VppRequest{
                Message:   msg,
                Multipart: true,
+               SeqNum:    ch.lastSeqNum,
        }
-       return &MultiRequestCtx{ch: ch}
+       return &MultiRequestCtx{ch: ch, seqNum: ch.lastSeqNum}
 }
 
 // ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs).
-// The reply will be decoded into the msg argument. If the last reply has been already consumed, LastReplyReceived is
-// set to true. Do not use the message itself if LastReplyReceived is true - it won't be filled with actual data.
+// The reply will be decoded into the msg argument. If the last reply has been already consumed, lastReplyReceived is
+// set to true. Do not use the message itself if lastReplyReceived is true - it won't be filled with actual data.
 // Error will be returned if the response cannot be received or decoded.
-func (req *MultiRequestCtx) ReceiveReply(msg Message) (LastReplyReceived bool, err error) {
+func (req *MultiRequestCtx) ReceiveReply(msg Message) (lastReplyReceived bool, err error) {
        if req == nil || req.ch == nil {
                return false, errors.New("invalid request context")
        }
 
-       return req.ch.receiveReplyInternal(msg)
+       return req.ch.receiveReplyInternal(msg, req.seqNum)
 }
 
 // receiveReplyInternal receives a reply from the reply channel into the provided msg structure.
-func (ch *Channel) receiveReplyInternal(msg Message) (LastReplyReceived bool, err error) {
+func (ch *Channel) receiveReplyInternal(msg Message, expSeqNum uint16) (lastReplyReceived bool, err error) {
+       var ignore bool
        if msg == nil {
                return false, errors.New("nil message passed in")
        }
-       select {
-       // blocks until a reply comes to ReplyChan or until timeout expires
-       case vppReply := <-ch.ReplyChan:
-               if vppReply.Error != nil {
-                       err = vppReply.Error
-                       return
-               }
-               if vppReply.LastReplyReceived {
-                       LastReplyReceived = true
-                       return
+
+       if ch.delayedReply != nil {
+               // try the delayed reply
+               vppReply := ch.delayedReply
+               ch.delayedReply = nil
+               ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
+               if !ignore {
+                       return lastReplyReceived, err
                }
-               // message checks
-               expMsgID, err := ch.MsgIdentifier.GetMessageID(msg)
-               if err != nil {
-                       err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
-                               msg.GetMessageName(), msg.GetCrcString())
+       }
+
+       timer := time.NewTimer(ch.replyTimeout)
+       for {
+               select {
+               // blocks until a reply comes to ReplyChan or until timeout expires
+               case vppReply := <-ch.ReplyChan:
+                       ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
+                       if ignore {
+                               continue
+                       }
+                       return lastReplyReceived, err
+
+               case <-timer.C:
+                       err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout)
                        return false, err
                }
-               if vppReply.MessageID != expMsgID {
-                       err = fmt.Errorf("invalid message ID %d, expected %d", vppReply.MessageID, expMsgID)
-                       return false, err
+       }
+       return
+}
+
+func (ch *Channel) processReply(reply *VppReply, expSeqNum uint16, msg Message) (ignore bool, lastReplyReceived bool, err error) {
+       // check the sequence number
+       cmpSeqNums := compareSeqNumbers(reply.SeqNum, expSeqNum)
+       if cmpSeqNums == -1 {
+               // reply received too late, ignore the message
+               logrus.WithField("sequence-number", reply.SeqNum).Warn(
+                       "Received reply to an already closed binary API request")
+               ignore = true
+               return
+       }
+       if cmpSeqNums == 1  {
+               ch.delayedReply = reply
+               err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum)
+               return
+       }
+
+       if reply.Error != nil {
+               err = reply.Error
+               return
+       }
+       if reply.LastReplyReceived {
+               lastReplyReceived = true
+               return
+       }
+
+       // message checks
+       var expMsgID uint16
+       expMsgID, err = ch.MsgIdentifier.GetMessageID(msg)
+       if err != nil {
+               err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
+                       msg.GetMessageName(), msg.GetCrcString())
+               return
+       }
+
+       if reply.MessageID != expMsgID {
+               var msgNameCrc string
+               if nameCrc, err := ch.MsgIdentifier.LookupByID(reply.MessageID); err != nil {
+                       msgNameCrc = err.Error()
+               } else {
+                       msgNameCrc = nameCrc
                }
-               // decode the message
-               err = ch.MsgDecoder.DecodeMsg(vppReply.Data, msg)
 
-       case <-time.After(ch.replyTimeout):
-               err = fmt.Errorf("no reply received within the timeout period %ds", ch.replyTimeout/time.Second)
+               err = fmt.Errorf("received invalid message ID (seq-num=%d), expected %d (%s), but got %d (%s) " +
+               "(check if multiple goroutines are not sharing single GoVPP channel)",
+                       reply.SeqNum, expMsgID, msg.GetMessageName(), reply.MessageID, msgNameCrc)
+               return
        }
+
+       // decode the message
+       err = ch.MsgDecoder.DecodeMsg(reply.Data, msg)
        return
 }
 
+// compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to,
+// or succeeds seq. number <seqNum2>.
+// Since sequence numbers cycle in the finite set of size 2^16, the function
+// must assume that the distance between compared sequence numbers is less than
+// (2^16)/2 to determine the order.
+func compareSeqNumbers(seqNum1, seqNum2 uint16) int {
+       // calculate distance from seqNum1 to seqNum2
+       var dist uint16
+       if seqNum1 <= seqNum2 {
+               dist = seqNum2 - seqNum1
+       } else {
+               dist = 0xffff - (seqNum1 - seqNum2 - 1)
+       }
+       if dist == 0 {
+               return 0
+       } else if dist <= 0x8000 {
+               return -1
+       }
+       return 1
+}
+
 // SubscribeNotification subscribes for receiving of the specified notification messages via provided Go channel.
 // Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's
 // buffer is full, the notifications will not be delivered into it.