package api
import (
- "errors"
"fmt"
"time"
-
- "github.com/sirupsen/logrus"
)
// MessageType represents the type of a VPP message.
+// Note: this is currently derived from the message header (fields),
+// and in many cases it does not represent the actual type of VPP message.
+// This means that some replies can be identified as requests, etc.
+// TODO: use services to identify type of message
type MessageType int
const (
RequestMessage MessageType = iota
// ReplyMessage represents a VPP reply message
ReplyMessage
- // EventMessage represents a VPP notification event message
+ // EventMessage represents a VPP event message
EventMessage
- // OtherMessage represents other VPP message (e.g. counters)
+ // OtherMessage represents other VPP message
OtherMessage
)
// GetMessageName returns the original VPP name of the message, as defined in the VPP API.
GetMessageName() string
- // GetMessageType returns the type of the VPP message.
- GetMessageType() MessageType
-
// GetCrcString returns the string with CRC checksum of the message definition (the string represents a hexadecimal number).
GetCrcString() string
+
+ // GetMessageType returns the type of the VPP message.
+ GetMessageType() MessageType
}
// DataType is an interface that is implemented by all VPP Binary API data types by the binapi_generator.
type ChannelProvider interface {
// NewAPIChannel returns a new channel for communication with VPP via govpp core.
// It uses default buffer sizes for the request and reply Go channels.
- NewAPIChannel() (*Channel, error)
+ NewAPIChannel() (Channel, error)
// NewAPIChannelBuffered returns a new channel for communication with VPP via govpp core.
// It allows to specify custom buffer sizes for the request and reply Go channels.
- NewAPIChannelBuffered() (*Channel, error)
-}
-
-// MessageDecoder provides functionality for decoding binary data to generated API messages.
-type MessageDecoder interface {
- // DecodeMsg decodes binary-encoded data of a message into provided Message structure.
- DecodeMsg(data []byte, msg Message) error
-}
-
-// MessageIdentifier provides identification of generated API messages.
-type MessageIdentifier interface {
- // GetMessageID returns message identifier of given API message.
- GetMessageID(msg Message) (uint16, error)
- // LookupByID looks up message name and crc by ID
- LookupByID(ID uint16) (string, error)
-}
-
-// Channel is the main communication interface with govpp core. It contains two Go channels, one for sending the requests
-// to VPP and one for receiving the replies from it. The user can access the Go channels directly, or use the helper
-// methods provided inside of this package. Do not use the same channel from multiple goroutines concurrently,
-// otherwise the responses could mix! Use multiple channels instead.
-type Channel struct {
- ID uint16 // channel ID
-
- ReqChan chan *VppRequest // channel for sending the requests to VPP, closing this channel releases all resources in the ChannelProvider
- ReplyChan chan *VppReply // channel where VPP replies are delivered to
-
- NotifSubsChan chan *NotifSubscribeRequest // channel for sending notification subscribe requests
- NotifSubsReplyChan chan error // channel where replies to notification subscribe requests are delivered to
-
- MsgDecoder MessageDecoder // used to decode binary data to generated API messages
- MsgIdentifier MessageIdentifier // used to retrieve message ID of a message
-
- lastSeqNum uint16 // sequence number of the last sent request
-
- delayedReply *VppReply // reply already taken from ReplyChan, buffered for later delivery
- replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout
-}
-
-// VppRequest is a request that will be sent to VPP.
-type VppRequest struct {
- SeqNum uint16 // sequence number
- Message Message // binary API message to be send to VPP
- Multipart bool // true if multipart response is expected, false otherwise
-}
-
-// VppReply is a reply received from VPP.
-type VppReply struct {
- MessageID uint16 // ID of the message
- SeqNum uint16 // sequence number
- Data []byte // encoded data with the message - MessageDecoder can be used for decoding
- LastReplyReceived bool // in case of multipart replies, true if the last reply has been already received and this one should be ignored
- Error error // in case of error, data is nil and this member contains error description
-}
-
-// NotifSubscribeRequest is a request to subscribe for delivery of specific notification messages.
-type NotifSubscribeRequest struct {
- Subscription *NotifSubscription // subscription details
- Subscribe bool // true if this is a request to subscribe, false if unsubscribe
-}
-
-// NotifSubscription represents a subscription for delivery of specific notification messages.
-type NotifSubscription struct {
- NotifChan chan Message // channel where notification messages will be delivered to
- MsgFactory func() Message // function that returns a new instance of the specific message that is expected as a notification
-}
-
-// RequestCtx is a context of a ongoing request (simple one - only one response is expected).
-type RequestCtx struct {
- ch *Channel
- seqNum uint16
-}
-
-// MultiRequestCtx is a context of a ongoing multipart request (multiple responses are expected).
-type MultiRequestCtx struct {
- ch *Channel
- seqNum uint16
-}
-
-const defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout
-
-// NewChannelInternal returns a new channel structure.
-// Note that this is just a raw channel not yet connected to VPP, it is not intended to be used directly.
-// Use ChannelProvider to get an API channel ready for communication with VPP.
-func NewChannelInternal(id uint16) *Channel {
- return &Channel{
- ID: id,
- replyTimeout: defaultReplyTimeout,
- }
-}
-
-// SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply
-// from VPP before returning an error.
-func (ch *Channel) SetReplyTimeout(timeout time.Duration) {
- ch.replyTimeout = timeout
+ NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (Channel, error)
}
-// Close closes the API channel and releases all API channel-related resources in the ChannelProvider.
-func (ch *Channel) Close() {
- if ch.ReqChan != nil {
- close(ch.ReqChan)
- }
-}
-
-// SendRequest asynchronously sends a request to VPP. Returns a request context, that can be used to call ReceiveReply.
-// In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
-func (ch *Channel) SendRequest(msg Message) *RequestCtx {
- ch.lastSeqNum++
- ch.ReqChan <- &VppRequest{
- Message: msg,
- SeqNum: ch.lastSeqNum,
- }
- return &RequestCtx{ch: ch, seqNum: ch.lastSeqNum}
-}
+// Channel provides methods for direct communication with VPP channel.
+type Channel interface {
+ // SendRequest asynchronously sends a request to VPP. Returns a request context, that can be used to call ReceiveReply.
+ // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
+ SendRequest(msg Message) RequestCtx
-// ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs).
-// The reply will be decoded into the msg argument. Error will be returned if the response cannot be received or decoded.
-func (req *RequestCtx) ReceiveReply(msg Message) error {
- if req == nil || req.ch == nil {
- return errors.New("invalid request context")
- }
-
- lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum)
-
- if lastReplyReceived {
- err = errors.New("multipart reply recieved while a simple reply expected")
- }
- return err
-}
+ // SendMultiRequest asynchronously sends a multipart request (request to which multiple responses are expected) to VPP.
+ // Returns a multipart request context, that can be used to call ReceiveReply.
+ // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
+ SendMultiRequest(msg Message) MultiRequestCtx
-// SendMultiRequest asynchronously sends a multipart request (request to which multiple responses are expected) to VPP.
-// Returns a multipart request context, that can be used to call ReceiveReply.
-// In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
-func (ch *Channel) SendMultiRequest(msg Message) *MultiRequestCtx {
- ch.lastSeqNum++
- ch.ReqChan <- &VppRequest{
- Message: msg,
- Multipart: true,
- SeqNum: ch.lastSeqNum,
- }
- return &MultiRequestCtx{ch: ch, seqNum: ch.lastSeqNum}
-}
+ // SubscribeNotification subscribes for receiving of the specified notification messages via provided Go channel.
+ // Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's
+ // buffer is full, the notifications will not be delivered into it.
+ SubscribeNotification(notifChan chan Message, event Message) (SubscriptionCtx, error)
-// ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs).
-// The reply will be decoded into the msg argument. If the last reply has been already consumed, lastReplyReceived is
-// set to true. Do not use the message itself if lastReplyReceived is true - it won't be filled with actual data.
-// Error will be returned if the response cannot be received or decoded.
-func (req *MultiRequestCtx) ReceiveReply(msg Message) (lastReplyReceived bool, err error) {
- if req == nil || req.ch == nil {
- return false, errors.New("invalid request context")
- }
+ // SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply
+ // from VPP before returning an error.
+ SetReplyTimeout(timeout time.Duration)
- return req.ch.receiveReplyInternal(msg, req.seqNum)
+ // Close closes the API channel and releases all API channel-related resources in the ChannelProvider.
+ Close()
}
-// receiveReplyInternal receives a reply from the reply channel into the provided msg structure.
-func (ch *Channel) receiveReplyInternal(msg Message, expSeqNum uint16) (lastReplyReceived bool, err error) {
- var ignore bool
- if msg == nil {
- return false, errors.New("nil message passed in")
- }
-
- if ch.delayedReply != nil {
- // try the delayed reply
- vppReply := ch.delayedReply
- ch.delayedReply = nil
- ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
- if !ignore {
- return lastReplyReceived, err
- }
- }
-
- timer := time.NewTimer(ch.replyTimeout)
- for {
- select {
- // blocks until a reply comes to ReplyChan or until timeout expires
- case vppReply := <-ch.ReplyChan:
- ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
- if ignore {
- continue
- }
- return lastReplyReceived, err
-
- case <-timer.C:
- err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout)
- return false, err
- }
- }
- return
+// RequestCtx is helper interface which allows to receive reply on request.
+type RequestCtx interface {
+ // ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs).
+ // The reply will be decoded into the msg argument. Error will be returned if the response cannot be received or decoded.
+ ReceiveReply(msg Message) error
}
-func (ch *Channel) processReply(reply *VppReply, expSeqNum uint16, msg Message) (ignore bool, lastReplyReceived bool, err error) {
- // check the sequence number
- cmpSeqNums := compareSeqNumbers(reply.SeqNum, expSeqNum)
- if cmpSeqNums == -1 {
- // reply received too late, ignore the message
- logrus.WithField("sequence-number", reply.SeqNum).Warn(
- "Received reply to an already closed binary API request")
- ignore = true
- return
- }
- if cmpSeqNums == 1 {
- ch.delayedReply = reply
- err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum)
- return
- }
-
- if reply.Error != nil {
- err = reply.Error
- return
- }
- if reply.LastReplyReceived {
- lastReplyReceived = true
- return
- }
-
- // message checks
- var expMsgID uint16
- expMsgID, err = ch.MsgIdentifier.GetMessageID(msg)
- if err != nil {
- err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
- msg.GetMessageName(), msg.GetCrcString())
- return
- }
-
- if reply.MessageID != expMsgID {
- var msgNameCrc string
- if nameCrc, err := ch.MsgIdentifier.LookupByID(reply.MessageID); err != nil {
- msgNameCrc = err.Error()
- } else {
- msgNameCrc = nameCrc
- }
-
- err = fmt.Errorf("received invalid message ID (seq-num=%d), expected %d (%s), but got %d (%s) "+
- "(check if multiple goroutines are not sharing single GoVPP channel)",
- reply.SeqNum, expMsgID, msg.GetMessageName(), reply.MessageID, msgNameCrc)
- return
- }
-
- // decode the message
- err = ch.MsgDecoder.DecodeMsg(reply.Data, msg)
- return
+// MultiRequestCtx is helper interface which allows to receive reply on multi-request.
+type MultiRequestCtx interface {
+ // ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs).
+ // The reply will be decoded into the msg argument. If the last reply has been already consumed, lastReplyReceived is
+ // set to true. Do not use the message itself if lastReplyReceived is true - it won't be filled with actual data.
+ // Error will be returned if the response cannot be received or decoded.
+ ReceiveReply(msg Message) (lastReplyReceived bool, err error)
}
-// compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to,
-// or succeeds seq. number <seqNum2>.
-// Since sequence numbers cycle in the finite set of size 2^16, the function
-// must assume that the distance between compared sequence numbers is less than
-// (2^16)/2 to determine the order.
-func compareSeqNumbers(seqNum1, seqNum2 uint16) int {
- // calculate distance from seqNum1 to seqNum2
- var dist uint16
- if seqNum1 <= seqNum2 {
- dist = seqNum2 - seqNum1
- } else {
- dist = 0xffff - (seqNum1 - seqNum2 - 1)
- }
- if dist == 0 {
- return 0
- } else if dist <= 0x8000 {
- return -1
- }
- return 1
+// SubscriptionCtx is helper interface which allows to control subscription for notification events.
+type SubscriptionCtx interface {
+ // Unsubscribe unsubscribes from receiving the notifications tied to the subscription context.
+ Unsubscribe() error
}
-// SubscribeNotification subscribes for receiving of the specified notification messages via provided Go channel.
-// Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's
-// buffer is full, the notifications will not be delivered into it.
-func (ch *Channel) SubscribeNotification(notifChan chan Message, msgFactory func() Message) (*NotifSubscription, error) {
- subscription := &NotifSubscription{
- NotifChan: notifChan,
- MsgFactory: msgFactory,
- }
- ch.NotifSubsChan <- &NotifSubscribeRequest{
- Subscription: subscription,
- Subscribe: true,
- }
- return subscription, <-ch.NotifSubsReplyChan
-}
+// map of registered messages
+var registeredMessages = make(map[string]Message)
-// UnsubscribeNotification unsubscribes from receiving the notifications tied to the provided notification subscription.
-func (ch *Channel) UnsubscribeNotification(subscription *NotifSubscription) error {
- ch.NotifSubsChan <- &NotifSubscribeRequest{
- Subscription: subscription,
- Subscribe: false,
+// RegisterMessage is called from generated code to register message.
+func RegisterMessage(x Message, name string) {
+ if _, ok := registeredMessages[name]; ok {
+ panic(fmt.Errorf("govpp: duplicate message registered: %s (%s)", name, x.GetCrcString()))
}
- return <-ch.NotifSubsReplyChan
+ registeredMessages[name] = x
}
-// CheckMessageCompatibility checks whether provided messages are compatible with the version of VPP
-// which the library is connected to.
-func (ch *Channel) CheckMessageCompatibility(messages ...Message) error {
- for _, msg := range messages {
- _, err := ch.MsgIdentifier.GetMessageID(msg)
- if err != nil {
- return fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
- msg.GetMessageName(), msg.GetCrcString())
- }
- }
- return nil
+// GetRegisteredMessages returns list of all registered messages.
+func GetRegisteredMessages() map[string]Message {
+ return registeredMessages
}