make api.Channel as interface 29/13329/8
authorVladimir Lavor <vlavor@cisco.com>
Tue, 3 Jul 2018 08:39:21 +0000 (10:39 +0200)
committerVladimir Lavor <vlavor@cisco.com>
Fri, 6 Jul 2018 11:18:01 +0000 (13:18 +0200)
Change-Id: I052d241ab09043b1195beebeee99df4d8536621f
Signed-off-by: Vladimir Lavor <vlavor@cisco.com>
16 files changed:
Makefile
adapter/mock/mock_adapter.go
api/api.go
api/doc.go
codec/doc.go [new file with mode: 0644]
codec/msg_codec.go [moved from core/msg_codec.go with 82% similarity]
core/channel.go [new file with mode: 0644]
core/channel_test.go [moved from api/api_test.go with 97% similarity]
core/connection.go [moved from core/core.go with 90% similarity]
core/connection_test.go [moved from core/core_test.go with 90% similarity]
core/doc.go
core/notification_handler.go
core/request_handler.go
examples/cmd/perf-bench/perf-bench.go
examples/cmd/simple-client/simple_client.go
examples/cmd/stats-client/stats_client.go

index 4eed58d..cfc99f7 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -6,7 +6,6 @@ build:
 
 test:
        @cd cmd/binapi-generator && go test -cover .
-       @cd api && go test -cover ./...
        @cd core && go test -cover .
 
 install:
index 959fd86..a5cb62d 100644 (file)
@@ -25,8 +25,8 @@ import (
        "git.fd.io/govpp.git/adapter"
        "git.fd.io/govpp.git/adapter/mock/binapi"
        "git.fd.io/govpp.git/api"
-       "git.fd.io/govpp.git/core"
 
+       "git.fd.io/govpp.git/codec"
        "github.com/lunixbochs/struc"
 )
 
@@ -48,10 +48,10 @@ type VppAdapter struct {
        binAPITypes  map[string]reflect.Type
        access       sync.RWMutex
 
-       replies       []reply            // FIFO queue of messages
-       replyHandlers []ReplyHandler     // callbacks that are able to calculate mock responses
-       repliesLock   sync.Mutex         // mutex for the queue
-       mode          replyMode          // mode in which the mock operates
+       replies       []reply        // FIFO queue of messages
+       replyHandlers []ReplyHandler // callbacks that are able to calculate mock responses
+       repliesLock   sync.Mutex     // mutex for the queue
+       mode          replyMode      // mode in which the mock operates
 }
 
 // defaultReply is a default reply message that mock adapter returns for a request.
@@ -79,7 +79,7 @@ type MsgWithContext struct {
        Multipart bool
 
        /* set by mock adapter */
-       hasCtx    bool
+       hasCtx bool
 }
 
 // ReplyHandler is a type that allows to extend the behaviour of VPP mock.
@@ -178,7 +178,7 @@ func (a *VppAdapter) ReplyBytes(request MessageDTO, reply api.Message) ([]byte,
        log.Println("ReplyBytes ", replyMsgID, " ", reply.GetMessageName(), " clientId: ", request.ClientID)
 
        buf := new(bytes.Buffer)
-       struc.Pack(buf, &core.VppReplyHeader{VlMsgID: replyMsgID, Context: request.ClientID})
+       struc.Pack(buf, &codec.VppReplyHeader{VlMsgID: replyMsgID, Context: request.ClientID})
        struc.Pack(buf, reply)
 
        return buf.Bytes(), nil
@@ -238,7 +238,7 @@ func (a *VppAdapter) SendMsg(clientID uint32, data []byte) error {
                        replyHandler := a.replyHandlers[i]
 
                        buf := bytes.NewReader(data)
-                       reqHeader := core.VppRequestHeader{}
+                       reqHeader := codec.VppRequestHeader{}
                        struc.Unpack(buf, &reqHeader)
 
                        a.access.Lock()
@@ -273,13 +273,13 @@ func (a *VppAdapter) SendMsg(clientID uint32, data []byte) error {
                                        context = setSeqNum(context, msg.SeqNum)
                                }
                                if msg.Msg.GetMessageType() == api.ReplyMessage {
-                                       struc.Pack(buf, &core.VppReplyHeader{VlMsgID: msgID, Context: context})
+                                       struc.Pack(buf, &codec.VppReplyHeader{VlMsgID: msgID, Context: context})
                                } else if msg.Msg.GetMessageType() == api.EventMessage {
-                                       struc.Pack(buf, &core.VppEventHeader{VlMsgID: msgID, Context: context})
+                                       struc.Pack(buf, &codec.VppEventHeader{VlMsgID: msgID, Context: context})
                                } else if msg.Msg.GetMessageType() == api.RequestMessage {
-                                       struc.Pack(buf, &core.VppRequestHeader{VlMsgID: msgID, Context: context})
+                                       struc.Pack(buf, &codec.VppRequestHeader{VlMsgID: msgID, Context: context})
                                } else {
-                                       struc.Pack(buf, &core.VppOtherHeader{VlMsgID: msgID})
+                                       struc.Pack(buf, &codec.VppOtherHeader{VlMsgID: msgID})
                                }
                                struc.Pack(buf, msg.Msg)
                                a.callback(context, msgID, buf.Bytes())
@@ -299,7 +299,7 @@ func (a *VppAdapter) SendMsg(clientID uint32, data []byte) error {
                // return default reply
                buf := new(bytes.Buffer)
                msgID := uint16(defaultReplyMsgID)
-               struc.Pack(buf, &core.VppReplyHeader{VlMsgID: msgID, Context: clientID})
+               struc.Pack(buf, &codec.VppReplyHeader{VlMsgID: msgID, Context: clientID})
                struc.Pack(buf, &defaultReply{})
                a.callback(clientID, msgID, buf.Bytes())
        }
@@ -392,4 +392,3 @@ func setMultipart(context uint32, isMultipart bool) (newContext uint32) {
        }
        return context
 }
-
index 34e17c1..9c68ab9 100644 (file)
 package api
 
 import (
-       "errors"
-       "fmt"
        "time"
-
-       "github.com/sirupsen/logrus"
 )
 
 // MessageType represents the type of a VPP message.
@@ -61,11 +57,11 @@ type DataType interface {
 type ChannelProvider interface {
        // NewAPIChannel returns a new channel for communication with VPP via govpp core.
        // It uses default buffer sizes for the request and reply Go channels.
-       NewAPIChannel() (*Channel, error)
+       NewAPIChannel() (Channel, error)
 
        // NewAPIChannelBuffered returns a new channel for communication with VPP via govpp core.
        // It allows to specify custom buffer sizes for the request and reply Go channels.
-       NewAPIChannelBuffered() (*Channel, error)
+       NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (Channel, error)
 }
 
 // MessageDecoder provides functionality for decoding binary data to generated API messages.
@@ -82,26 +78,57 @@ type MessageIdentifier interface {
        LookupByID(ID uint16) (string, error)
 }
 
-// Channel is the main communication interface with govpp core. It contains two Go channels, one for sending the requests
-// to VPP and one for receiving the replies from it. The user can access the Go channels directly, or use the helper
-// methods  provided inside of this package. Do not use the same channel from multiple goroutines concurrently,
-// otherwise the responses could mix! Use multiple channels instead.
-type Channel struct {
-       ID uint16 // channel ID
-
-       ReqChan   chan *VppRequest // channel for sending the requests to VPP, closing this channel releases all resources in the ChannelProvider
-       ReplyChan chan *VppReply   // channel where VPP replies are delivered to
-
-       NotifSubsChan      chan *NotifSubscribeRequest // channel for sending notification subscribe requests
-       NotifSubsReplyChan chan error                  // channel where replies to notification subscribe requests are delivered to
-
-       MsgDecoder    MessageDecoder    // used to decode binary data to generated API messages
-       MsgIdentifier MessageIdentifier // used to retrieve message ID of a message
-
-       lastSeqNum uint16 // sequence number of the last sent request
-
-       delayedReply *VppReply     // reply already taken from ReplyChan, buffered for later delivery
-       replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout
+// Channel provides methods for direct communication with VPP channel.
+type Channel interface {
+       // SendRequest asynchronously sends a request to VPP. Returns a request context, that can be used to call ReceiveReply.
+       // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
+       SendRequest(msg Message) RequestCtx
+       // SendMultiRequest asynchronously sends a multipart request (request to which multiple responses are expected) to VPP.
+       // Returns a multipart request context, that can be used to call ReceiveReply.
+       // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
+       SendMultiRequest(msg Message) MultiRequestCtx
+       // SubscribeNotification subscribes for receiving of the specified notification messages via provided Go channel.
+       // Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's
+       // buffer is full, the notifications will not be delivered into it.
+       SubscribeNotification(notifChan chan Message, msgFactory func() Message) (*NotifSubscription, error)
+       // UnsubscribeNotification unsubscribes from receiving the notifications tied to the provided notification subscription.
+       UnsubscribeNotification(subscription *NotifSubscription) error
+       // CheckMessageCompatibility checks whether provided messages are compatible with the version of VPP
+       // which the library is connected to.
+       CheckMessageCompatibility(messages ...Message) error
+       // SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply
+       // from VPP before returning an error.
+       SetReplyTimeout(timeout time.Duration)
+       // GetRequestChannel returns request go channel of the VPP channel
+       GetRequestChannel() chan<- *VppRequest
+       // GetReplyChannel returns reply go channel of the VPP channel
+       GetReplyChannel() <-chan *VppReply
+       // GetNotificationChannel returns notification go channel of the VPP channel
+       GetNotificationChannel() chan<- *NotifSubscribeRequest
+       // GetNotificationReplyChannel returns notification reply go channel of the VPP channel
+       GetNotificationReplyChannel() <-chan error
+       // GetMessageDecoder returns message decoder instance
+       GetMessageDecoder() MessageDecoder
+       // GetID returns channel's ID
+       GetID() uint16
+       // Close closes the API channel and releases all API channel-related resources in the ChannelProvider.
+       Close()
+}
+
+// RequestCtx is helper interface which allows to receive reply on request context data
+type RequestCtx interface {
+       // ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs).
+       // The reply will be decoded into the msg argument. Error will be returned if the response cannot be received or decoded.
+       ReceiveReply(msg Message) error
+}
+
+// MultiRequestCtx is helper interface which allows to receive reply on multi-request context data
+type MultiRequestCtx interface {
+       // ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs).
+       // The reply will be decoded into the msg argument. If the last reply has been already consumed, lastReplyReceived is
+       // set to true. Do not use the message itself if lastReplyReceived is true - it won't be filled with actual data.
+       // Error will be returned if the response cannot be received or decoded.
+       ReceiveReply(msg Message) (lastReplyReceived bool, err error)
 }
 
 // VppRequest is a request that will be sent to VPP.
@@ -131,238 +158,3 @@ type NotifSubscription struct {
        NotifChan  chan Message   // channel where notification messages will be delivered to
        MsgFactory func() Message // function that returns a new instance of the specific message that is expected as a notification
 }
-
-// RequestCtx is a context of a ongoing request (simple one - only one response is expected).
-type RequestCtx struct {
-       ch     *Channel
-       seqNum uint16
-}
-
-// MultiRequestCtx is a context of a ongoing multipart request (multiple responses are expected).
-type MultiRequestCtx struct {
-       ch     *Channel
-       seqNum uint16
-}
-
-const defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout
-
-// NewChannelInternal returns a new channel structure.
-// Note that this is just a raw channel not yet connected to VPP, it is not intended to be used directly.
-// Use ChannelProvider to get an API channel ready for communication with VPP.
-func NewChannelInternal(id uint16) *Channel {
-       return &Channel{
-               ID:           id,
-               replyTimeout: defaultReplyTimeout,
-       }
-}
-
-// SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply
-// from VPP before returning an error.
-func (ch *Channel) SetReplyTimeout(timeout time.Duration) {
-       ch.replyTimeout = timeout
-}
-
-// Close closes the API channel and releases all API channel-related resources in the ChannelProvider.
-func (ch *Channel) Close() {
-       if ch.ReqChan != nil {
-               close(ch.ReqChan)
-       }
-}
-
-// SendRequest asynchronously sends a request to VPP. Returns a request context, that can be used to call ReceiveReply.
-// In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
-func (ch *Channel) SendRequest(msg Message) *RequestCtx {
-       ch.lastSeqNum++
-       ch.ReqChan <- &VppRequest{
-               Message: msg,
-               SeqNum:  ch.lastSeqNum,
-       }
-       return &RequestCtx{ch: ch, seqNum: ch.lastSeqNum}
-}
-
-// ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs).
-// The reply will be decoded into the msg argument. Error will be returned if the response cannot be received or decoded.
-func (req *RequestCtx) ReceiveReply(msg Message) error {
-       if req == nil || req.ch == nil {
-               return errors.New("invalid request context")
-       }
-
-       lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum)
-
-       if lastReplyReceived {
-               err = errors.New("multipart reply recieved while a simple reply expected")
-       }
-       return err
-}
-
-// SendMultiRequest asynchronously sends a multipart request (request to which multiple responses are expected) to VPP.
-// Returns a multipart request context, that can be used to call ReceiveReply.
-// In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
-func (ch *Channel) SendMultiRequest(msg Message) *MultiRequestCtx {
-       ch.lastSeqNum++
-       ch.ReqChan <- &VppRequest{
-               Message:   msg,
-               Multipart: true,
-               SeqNum:    ch.lastSeqNum,
-       }
-       return &MultiRequestCtx{ch: ch, seqNum: ch.lastSeqNum}
-}
-
-// ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs).
-// The reply will be decoded into the msg argument. If the last reply has been already consumed, lastReplyReceived is
-// set to true. Do not use the message itself if lastReplyReceived is true - it won't be filled with actual data.
-// Error will be returned if the response cannot be received or decoded.
-func (req *MultiRequestCtx) ReceiveReply(msg Message) (lastReplyReceived bool, err error) {
-       if req == nil || req.ch == nil {
-               return false, errors.New("invalid request context")
-       }
-
-       return req.ch.receiveReplyInternal(msg, req.seqNum)
-}
-
-// receiveReplyInternal receives a reply from the reply channel into the provided msg structure.
-func (ch *Channel) receiveReplyInternal(msg Message, expSeqNum uint16) (lastReplyReceived bool, err error) {
-       var ignore bool
-       if msg == nil {
-               return false, errors.New("nil message passed in")
-       }
-
-       if ch.delayedReply != nil {
-               // try the delayed reply
-               vppReply := ch.delayedReply
-               ch.delayedReply = nil
-               ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
-               if !ignore {
-                       return lastReplyReceived, err
-               }
-       }
-
-       timer := time.NewTimer(ch.replyTimeout)
-       for {
-               select {
-               // blocks until a reply comes to ReplyChan or until timeout expires
-               case vppReply := <-ch.ReplyChan:
-                       ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
-                       if ignore {
-                               continue
-                       }
-                       return lastReplyReceived, err
-
-               case <-timer.C:
-                       err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout)
-                       return false, err
-               }
-       }
-       return
-}
-
-func (ch *Channel) processReply(reply *VppReply, expSeqNum uint16, msg Message) (ignore bool, lastReplyReceived bool, err error) {
-       // check the sequence number
-       cmpSeqNums := compareSeqNumbers(reply.SeqNum, expSeqNum)
-       if cmpSeqNums == -1 {
-               // reply received too late, ignore the message
-               logrus.WithField("sequence-number", reply.SeqNum).Warn(
-                       "Received reply to an already closed binary API request")
-               ignore = true
-               return
-       }
-       if cmpSeqNums == 1 {
-               ch.delayedReply = reply
-               err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum)
-               return
-       }
-
-       if reply.Error != nil {
-               err = reply.Error
-               return
-       }
-       if reply.LastReplyReceived {
-               lastReplyReceived = true
-               return
-       }
-
-       // message checks
-       var expMsgID uint16
-       expMsgID, err = ch.MsgIdentifier.GetMessageID(msg)
-       if err != nil {
-               err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
-                       msg.GetMessageName(), msg.GetCrcString())
-               return
-       }
-
-       if reply.MessageID != expMsgID {
-               var msgNameCrc string
-               if nameCrc, err := ch.MsgIdentifier.LookupByID(reply.MessageID); err != nil {
-                       msgNameCrc = err.Error()
-               } else {
-                       msgNameCrc = nameCrc
-               }
-
-               err = fmt.Errorf("received invalid message ID (seq-num=%d), expected %d (%s), but got %d (%s) "+
-                       "(check if multiple goroutines are not sharing single GoVPP channel)",
-                       reply.SeqNum, expMsgID, msg.GetMessageName(), reply.MessageID, msgNameCrc)
-               return
-       }
-
-       // decode the message
-       err = ch.MsgDecoder.DecodeMsg(reply.Data, msg)
-       return
-}
-
-// compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to,
-// or succeeds seq. number <seqNum2>.
-// Since sequence numbers cycle in the finite set of size 2^16, the function
-// must assume that the distance between compared sequence numbers is less than
-// (2^16)/2 to determine the order.
-func compareSeqNumbers(seqNum1, seqNum2 uint16) int {
-       // calculate distance from seqNum1 to seqNum2
-       var dist uint16
-       if seqNum1 <= seqNum2 {
-               dist = seqNum2 - seqNum1
-       } else {
-               dist = 0xffff - (seqNum1 - seqNum2 - 1)
-       }
-       if dist == 0 {
-               return 0
-       } else if dist <= 0x8000 {
-               return -1
-       }
-       return 1
-}
-
-// SubscribeNotification subscribes for receiving of the specified notification messages via provided Go channel.
-// Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's
-// buffer is full, the notifications will not be delivered into it.
-func (ch *Channel) SubscribeNotification(notifChan chan Message, msgFactory func() Message) (*NotifSubscription, error) {
-       subscription := &NotifSubscription{
-               NotifChan:  notifChan,
-               MsgFactory: msgFactory,
-       }
-       ch.NotifSubsChan <- &NotifSubscribeRequest{
-               Subscription: subscription,
-               Subscribe:    true,
-       }
-       return subscription, <-ch.NotifSubsReplyChan
-}
-
-// UnsubscribeNotification unsubscribes from receiving the notifications tied to the provided notification subscription.
-func (ch *Channel) UnsubscribeNotification(subscription *NotifSubscription) error {
-       ch.NotifSubsChan <- &NotifSubscribeRequest{
-               Subscription: subscription,
-               Subscribe:    false,
-       }
-       return <-ch.NotifSubsReplyChan
-}
-
-// CheckMessageCompatibility checks whether provided messages are compatible with the version of VPP
-// which the library is connected to.
-func (ch *Channel) CheckMessageCompatibility(messages ...Message) error {
-       for _, msg := range messages {
-               _, err := ch.MsgIdentifier.GetMessageID(msg)
-               if err != nil {
-                       return fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
-                               msg.GetMessageName(), msg.GetCrcString())
-               }
-       }
-       return nil
-}
index e1abffe..044a820 100644 (file)
@@ -1,94 +1,2 @@
-// Package api provides API for communication with govpp core using Go channels,
-// without the need of importing the govpp core package itself.
-//
-// The API offers two ways of communication with govpp core: using Go channels, or using convenient function
-// wrappers over the Go channels. The latter should be sufficient for most of the use cases.
-//
-// The entry point to the API is the Channel structure, that can be obtained from the existing connection using
-// the NewAPIChannel or NewAPIChannelBuffered functions:
-//
-//     conn, err := govpp.Connect()
-//     if err != nil {
-//             // handle error!
-//     }
-//     defer conn.Disconnect()
-//
-//     ch, err := conn.NewAPIChannel()
-//     if err != nil {
-//             // handle error!
-//     }
-//     defer ch.Close()
-//
-//
-// Simple Request-Reply API
-//
-// The simple version of the API is based on blocking SendRequest / ReceiveReply calls, where a single request
-// message is sent  to VPP and a single reply message is filled in when the reply comes from VPP:
-//
-//     req := &acl.ACLPluginGetVersion{}
-//     reply := &acl.ACLPluginGetVersionReply{}
-//
-//     err := ch.SendRequest(req).ReceiveReply(reply)
-//     // process the reply
-//
-// Note that if the reply message type that comes from VPP does not match with provided one, you'll get an error.
-//
-//
-// Multipart Reply API
-//
-// If multiple messages are expected as a reply to a request, SendMultiRequest API must be used:
-//
-//     req := &interfaces.SwInterfaceDump{}
-//     reqCtx := ch.SendMultiRequest(req)
-//
-//     for {
-//             reply := &interfaces.SwInterfaceDetails{}
-//             stop, err := reqCtx.ReceiveReply(reply)
-//             if stop {
-//                     break // break out of the loop
-//             }
-//             // process the reply
-//     }
-//
-// Note that if the last reply has been already consumed, stop boolean return value is set to true.
-// Do not use the message itself if stop is true - it won't be filled with actual data.
-//
-//
-// Go Channels API
-//
-// The blocking API introduced above may be not sufficient for some management applications that strongly
-// rely on usage of Go channels. In this case, the API allows to access the underlying Go channels directly, e.g.
-// the following replacement of the SendRequest / ReceiveReply API:
-//
-//     req := &acl.ACLPluginGetVersion{}
-//     // send the request to the request go channel
-//     ch.ReqChan <- &api.VppRequest{Message: req}
-//
-//     // receive a reply from the reply go channel
-//     vppReply := <-ch.ReplyChan
-//
-//     // decode the message
-//     reply := &acl.ACLPluginGetVersionReply{}
-//     err := ch.MsgDecoder.DecodeMsg(vppReply.Data, reply)
-//
-//     // process the reply
-//
-//
-// Notifications API
-//
-// to subscribe for receiving of the specified notification messages via provided Go channel, use the
-// SubscribeNotification API:
-//
-//     // subscribe for specific notification message
-//     notifChan := make(chan api.Message, 100)
-//     subs, _ := ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceSetFlags)
-//
-//     // receive one notification
-//     notif := (<-notifChan).(*interfaces.SwInterfaceSetFlags)
-//
-//     ch.UnsubscribeNotification(subs)
-//
-// Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's
-// buffer is full, the notifications will not be delivered into it.
-//
+// Package api defines interfaces required by every file generated with binapi-generator
 package api
diff --git a/codec/doc.go b/codec/doc.go
new file mode 100644 (file)
index 0000000..eb18e15
--- /dev/null
@@ -0,0 +1,2 @@
+// Package codec provides methods allowing to encode and decode message structs to/from binary format accepted by VPP.
+package codec
similarity index 82%
rename from core/msg_codec.go
rename to codec/msg_codec.go
index e32916b..7ba8771 100644 (file)
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package core
+package codec
 
 import (
        "bytes"
@@ -20,10 +20,8 @@ import (
        "fmt"
        "reflect"
 
-       "github.com/lunixbochs/struc"
-       logger "github.com/sirupsen/logrus"
-
        "git.fd.io/govpp.git/api"
+       "github.com/lunixbochs/struc"
 )
 
 // MsgCodec provides encoding and decoding functionality of `api.Message` structs into/from
@@ -82,22 +80,14 @@ func (*MsgCodec) EncodeMsg(msg api.Message, msgID uint16) ([]byte, error) {
        }
        err := struc.Pack(buf, header)
        if err != nil {
-               log.WithFields(logger.Fields{
-                       "error":  err,
-                       "header": header,
-               }).Error("Unable to encode the message header: ", err)
-               return nil, fmt.Errorf("unable to encode the message header: %v", err)
+               return nil, fmt.Errorf("unable to encode message: header: %v, error %v", header, err)
        }
 
        // encode message content
        if reflect.Indirect(reflect.ValueOf(msg)).NumField() > 0 {
                err := struc.Pack(buf, msg)
                if err != nil {
-                       log.WithFields(logger.Fields{
-                               "error":   err,
-                               "message": msg,
-                       }).Error("Unable to encode the message: ", err)
-                       return nil, fmt.Errorf("unable to encode the message: %v", err)
+                       return nil, fmt.Errorf("unable to encode message: header %v, error %v", header, err)
                }
        }
 
@@ -127,11 +117,7 @@ func (*MsgCodec) DecodeMsg(data []byte, msg api.Message) error {
        // decode message header
        err := struc.Unpack(buf, header)
        if err != nil {
-               log.WithFields(logger.Fields{
-                       "error": err,
-                       "data":  data,
-               }).Error("Unable to decode header of the message.")
-               return fmt.Errorf("unable to decode the message header: %v", err)
+               return fmt.Errorf("unable to decode message: data %v, error %v", data, err)
        }
 
        // get rid of the message header
@@ -148,11 +134,7 @@ func (*MsgCodec) DecodeMsg(data []byte, msg api.Message) error {
        // decode message content
        err = struc.Unpack(buf, msg)
        if err != nil {
-               log.WithFields(logger.Fields{
-                       "error": err,
-                       "data":  buf,
-               }).Error("Unable to decode the message.")
-               return fmt.Errorf("unable to decode the message: %v", err)
+               return fmt.Errorf("unable to decode message: data %v, error %v", data, err)
        }
 
        return nil
diff --git a/core/channel.go b/core/channel.go
new file mode 100644 (file)
index 0000000..87b3e29
--- /dev/null
@@ -0,0 +1,276 @@
+// Copyright (c) 2018 Cisco and/or its affiliates.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package core
+
+import (
+       "fmt"
+       "time"
+
+       "errors"
+
+       "git.fd.io/govpp.git/api"
+       "github.com/sirupsen/logrus"
+)
+
+const defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout
+
+// requestCtxData is a context of a ongoing request (simple one - only one response is expected).
+type requestCtxData struct {
+       ch     *channel
+       seqNum uint16
+}
+
+// multiRequestCtxData is a context of a ongoing multipart request (multiple responses are expected).
+type multiRequestCtxData struct {
+       ch     *channel
+       seqNum uint16
+}
+
+func (req *requestCtxData) ReceiveReply(msg api.Message) error {
+       if req == nil || req.ch == nil {
+               return errors.New("invalid request context")
+       }
+
+       lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum)
+
+       if lastReplyReceived {
+               err = errors.New("multipart reply recieved while a simple reply expected")
+       }
+       return err
+}
+
+func (req *multiRequestCtxData) ReceiveReply(msg api.Message) (lastReplyReceived bool, err error) {
+       if req == nil || req.ch == nil {
+               return false, errors.New("invalid request context")
+       }
+
+       return req.ch.receiveReplyInternal(msg, req.seqNum)
+}
+
+// channel is the main communication interface with govpp core. It contains four Go channels, one for sending the requests
+// to VPP, one for receiving the replies from it and the same set for notifications. The user can access the Go channels
+// via methods provided by Channel interface in this package. Do not use the same channel from multiple goroutines
+// concurrently, otherwise the responses could mix! Use multiple channels instead.
+type channel struct {
+       id uint16 // channel ID
+
+       reqChan   chan *api.VppRequest // channel for sending the requests to VPP, closing this channel releases all resources in the ChannelProvider
+       replyChan chan *api.VppReply   // channel where VPP replies are delivered to
+
+       notifSubsChan      chan *api.NotifSubscribeRequest // channel for sending notification subscribe requests
+       notifSubsReplyChan chan error                      // channel where replies to notification subscribe requests are delivered to
+
+       msgDecoder    api.MessageDecoder    // used to decode binary data to generated API messages
+       msgIdentifier api.MessageIdentifier // used to retrieve message ID of a message
+
+       lastSeqNum uint16 // sequence number of the last sent request
+
+       delayedReply *api.VppReply // reply already taken from ReplyChan, buffered for later delivery
+       replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout
+}
+
+func (ch *channel) SendRequest(msg api.Message) api.RequestCtx {
+       ch.lastSeqNum++
+       ch.reqChan <- &api.VppRequest{
+               Message: msg,
+               SeqNum:  ch.lastSeqNum,
+       }
+       return &requestCtxData{ch: ch, seqNum: ch.lastSeqNum}
+}
+
+func (ch *channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx {
+       ch.lastSeqNum++
+       ch.reqChan <- &api.VppRequest{
+               Message:   msg,
+               Multipart: true,
+               SeqNum:    ch.lastSeqNum,
+       }
+       return &multiRequestCtxData{ch: ch, seqNum: ch.lastSeqNum}
+}
+
+func (ch *channel) SubscribeNotification(notifChan chan api.Message, msgFactory func() api.Message) (*api.NotifSubscription, error) {
+       subscription := &api.NotifSubscription{
+               NotifChan:  notifChan,
+               MsgFactory: msgFactory,
+       }
+       ch.notifSubsChan <- &api.NotifSubscribeRequest{
+               Subscription: subscription,
+               Subscribe:    true,
+       }
+       return subscription, <-ch.notifSubsReplyChan
+}
+
+func (ch *channel) UnsubscribeNotification(subscription *api.NotifSubscription) error {
+       ch.notifSubsChan <- &api.NotifSubscribeRequest{
+               Subscription: subscription,
+               Subscribe:    false,
+       }
+       return <-ch.notifSubsReplyChan
+}
+
+func (ch *channel) CheckMessageCompatibility(messages ...api.Message) error {
+       for _, msg := range messages {
+               _, err := ch.msgIdentifier.GetMessageID(msg)
+               if err != nil {
+                       return fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
+                               msg.GetMessageName(), msg.GetCrcString())
+               }
+       }
+       return nil
+}
+
+func (ch *channel) SetReplyTimeout(timeout time.Duration) {
+       ch.replyTimeout = timeout
+}
+
+func (ch *channel) GetRequestChannel() chan<- *api.VppRequest {
+       return ch.reqChan
+}
+
+func (ch *channel) GetReplyChannel() <-chan *api.VppReply {
+       return ch.replyChan
+}
+
+func (ch *channel) GetNotificationChannel() chan<- *api.NotifSubscribeRequest {
+       return ch.notifSubsChan
+}
+
+func (ch *channel) GetNotificationReplyChannel() <-chan error {
+       return ch.notifSubsReplyChan
+}
+
+func (ch *channel) GetMessageDecoder() api.MessageDecoder {
+       return ch.msgDecoder
+}
+
+func (ch *channel) GetID() uint16 {
+       return ch.id
+}
+
+func (ch *channel) Close() {
+       if ch.reqChan != nil {
+               close(ch.reqChan)
+       }
+}
+
+// receiveReplyInternal receives a reply from the reply channel into the provided msg structure.
+func (ch *channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (lastReplyReceived bool, err error) {
+       var ignore bool
+       if msg == nil {
+               return false, errors.New("nil message passed in")
+       }
+
+       if ch.delayedReply != nil {
+               // try the delayed reply
+               vppReply := ch.delayedReply
+               ch.delayedReply = nil
+               ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
+               if !ignore {
+                       return lastReplyReceived, err
+               }
+       }
+
+       timer := time.NewTimer(ch.replyTimeout)
+       for {
+               select {
+               // blocks until a reply comes to ReplyChan or until timeout expires
+               case vppReply := <-ch.replyChan:
+                       ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
+                       if ignore {
+                               continue
+                       }
+                       return lastReplyReceived, err
+
+               case <-timer.C:
+                       err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout)
+                       return false, err
+               }
+       }
+       return
+}
+
+func (ch *channel) processReply(reply *api.VppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) {
+       // check the sequence number
+       cmpSeqNums := compareSeqNumbers(reply.SeqNum, expSeqNum)
+       if cmpSeqNums == -1 {
+               // reply received too late, ignore the message
+               logrus.WithField("sequence-number", reply.SeqNum).Warn(
+                       "Received reply to an already closed binary API request")
+               ignore = true
+               return
+       }
+       if cmpSeqNums == 1 {
+               ch.delayedReply = reply
+               err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum)
+               return
+       }
+
+       if reply.Error != nil {
+               err = reply.Error
+               return
+       }
+       if reply.LastReplyReceived {
+               lastReplyReceived = true
+               return
+       }
+
+       // message checks
+       var expMsgID uint16
+       expMsgID, err = ch.msgIdentifier.GetMessageID(msg)
+       if err != nil {
+               err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
+                       msg.GetMessageName(), msg.GetCrcString())
+               return
+       }
+
+       if reply.MessageID != expMsgID {
+               var msgNameCrc string
+               if nameCrc, err := ch.msgIdentifier.LookupByID(reply.MessageID); err != nil {
+                       msgNameCrc = err.Error()
+               } else {
+                       msgNameCrc = nameCrc
+               }
+
+               err = fmt.Errorf("received invalid message ID (seq-num=%d), expected %d (%s), but got %d (%s) "+
+                       "(check if multiple goroutines are not sharing single GoVPP channel)",
+                       reply.SeqNum, expMsgID, msg.GetMessageName(), reply.MessageID, msgNameCrc)
+               return
+       }
+
+       // decode the message
+       err = ch.msgDecoder.DecodeMsg(reply.Data, msg)
+       return
+}
+
+// compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to,
+// or succeeds seq. number <seqNum2>.
+// Since sequence numbers cycle in the finite set of size 2^16, the function
+// must assume that the distance between compared sequence numbers is less than
+// (2^16)/2 to determine the order.
+func compareSeqNumbers(seqNum1, seqNum2 uint16) int {
+       // calculate distance from seqNum1 to seqNum2
+       var dist uint16
+       if seqNum1 <= seqNum2 {
+               dist = seqNum2 - seqNum1
+       } else {
+               dist = 0xffff - (seqNum1 - seqNum2 - 1)
+       }
+       if dist == 0 {
+               return 0
+       } else if dist <= 0x8000 {
+               return -1
+       }
+       return 1
+}
similarity index 97%
rename from api/api_test.go
rename to core/channel_test.go
index 7cbd9f0..d573f29 100644 (file)
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package api_test
+package core
 
 import (
        "testing"
        "time"
 
        "git.fd.io/govpp.git/adapter/mock"
-       "git.fd.io/govpp.git/api"
-       "git.fd.io/govpp.git/core"
        "git.fd.io/govpp.git/core/bin_api/vpe"
        "git.fd.io/govpp.git/examples/bin_api/interfaces"
        "git.fd.io/govpp.git/examples/bin_api/memif"
        "git.fd.io/govpp.git/examples/bin_api/tap"
 
+       "git.fd.io/govpp.git/api"
        . "github.com/onsi/gomega"
 )
 
 type testCtx struct {
        mockVpp *mock.VppAdapter
-       conn    *core.Connection
-       ch      *api.Channel
+       conn    *Connection
+       ch      api.Channel
 }
 
 func setupTest(t *testing.T) *testCtx {
@@ -43,7 +42,7 @@ func setupTest(t *testing.T) *testCtx {
        }
 
        var err error
-       ctx.conn, err = core.Connect(ctx.mockVpp)
+       ctx.conn, err = Connect(ctx.mockVpp)
        Expect(err).ShouldNot(HaveOccurred())
 
        ctx.ch, err = ctx.conn.NewAPIChannel()
@@ -196,7 +195,7 @@ func TestMultiRequestReplySwInterfaceTapDump(t *testing.T) {
        defer ctx.teardownTest()
 
        // mock reply
-       msgs := []api.Message{}
+       var msgs []api.Message
        for i := 1; i <= 10; i++ {
                msgs = append(msgs, &tap.SwInterfaceTapDetails{
                        SwIfIndex: uint32(i),
@@ -225,7 +224,7 @@ func TestMultiRequestReplySwInterfaceMemifDump(t *testing.T) {
        defer ctx.teardownTest()
 
        // mock reply
-       msgs := []api.Message{}
+       var msgs []api.Message
        for i := 1; i <= 10; i++ {
                msgs = append(msgs, &memif.MemifDetails{
                        SwIfIndex: uint32(i),
@@ -348,7 +347,7 @@ func TestSetReplyTimeoutMultiRequest(t *testing.T) {
 
        ctx.ch.SetReplyTimeout(time.Millisecond)
 
-       msgs := []api.Message{}
+       var msgs []api.Message
        for i := 1; i <= 3; i++ {
                msgs = append(msgs, &interfaces.SwInterfaceDetails{
                        SwIfIndex:     uint32(i),
@@ -392,19 +391,19 @@ func TestReceiveReplyNegative(t *testing.T) {
        defer ctx.teardownTest()
 
        // invalid context 1
-       reqCtx1 := &api.RequestCtx{}
+       reqCtx1 := &requestCtxData{}
        err := reqCtx1.ReceiveReply(&vpe.ControlPingReply{})
        Expect(err).Should(HaveOccurred())
        Expect(err.Error()).To(ContainSubstring("invalid request context"))
 
        // invalid context 2
-       reqCtx2 := &api.MultiRequestCtx{}
+       reqCtx2 := &multiRequestCtxData{}
        _, err = reqCtx2.ReceiveReply(&vpe.ControlPingReply{})
        Expect(err).Should(HaveOccurred())
        Expect(err.Error()).To(ContainSubstring("invalid request context"))
 
        // NU
-       reqCtx3 := &api.RequestCtx{}
+       reqCtx3 := &requestCtxData{}
        err = reqCtx3.ReceiveReply(nil)
        Expect(err).Should(HaveOccurred())
        Expect(err.Error()).To(ContainSubstring("invalid request context"))
@@ -415,7 +414,7 @@ func TestMultiRequestDouble(t *testing.T) {
        defer ctx.teardownTest()
 
        // mock reply
-       msgs := []mock.MsgWithContext{}
+       var msgs []mock.MsgWithContext
        for i := 1; i <= 3; i++ {
                msgs = append(msgs, mock.MsgWithContext{
                        Msg: &interfaces.SwInterfaceDetails{
@@ -543,7 +542,7 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) {
        Expect(cnt).To(BeEquivalentTo(0))
 
        // simulating late replies
-       msgs := []mock.MsgWithContext{}
+       var msgs []mock.MsgWithContext
        for i := 1; i <= 3; i++ {
                msgs = append(msgs, mock.MsgWithContext{
                        Msg: &interfaces.SwInterfaceDetails{
similarity index 90%
rename from core/core.go
rename to core/connection.go
index 052eb0b..a44d0c4 100644 (file)
@@ -27,6 +27,7 @@ import (
 
        "git.fd.io/govpp.git/adapter"
        "git.fd.io/govpp.git/api"
+       "git.fd.io/govpp.git/codec"
        "git.fd.io/govpp.git/core/bin_api/vpe"
 )
 
@@ -71,13 +72,13 @@ type ConnectionEvent struct {
 type Connection struct {
        vpp       adapter.VppAdapter // VPP adapter
        connected uint32             // non-zero if the adapter is connected to VPP
-       codec     *MsgCodec          // message codec
+       codec     *codec.MsgCodec    // message codec
 
        msgIDsLock sync.RWMutex      // lock for the message IDs map
        msgIDs     map[string]uint16 // map of message IDs indexed by message name + CRC
 
-       channelsLock sync.RWMutex            // lock for the channels map
-       channels     map[uint16]*api.Channel // map of all API channels indexed by the channel ID
+       channelsLock sync.RWMutex        // lock for the channels map
+       channels     map[uint16]*channel // map of all API channels indexed by the channel ID
 
        notifSubscriptionsLock sync.RWMutex                        // lock for the subscriptions map
        notifSubscriptions     map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
@@ -197,8 +198,8 @@ func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
 
        conn = &Connection{
                vpp:                vppAdapter,
-               codec:              &MsgCodec{},
-               channels:           make(map[uint16]*api.Channel),
+               codec:              &codec.MsgCodec{},
+               channels:           make(map[uint16]*channel),
                msgIDs:             make(map[string]uint16),
                notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
        }
@@ -268,7 +269,7 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
 // it continues with connectLoop and tries to reconnect.
 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
        // create a separate API channel for health check probes
-       ch, err := conn.NewAPIChannelBuffered(1, 1)
+       ch, err := conn.newAPIChannelBuffered(1, 1)
        if err != nil {
                log.Error("Failed to create health check API channel, health check will be disabled:", err)
                return
@@ -290,18 +291,18 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
 
                // try draining probe replies from previous request before sending next one
                select {
-               case <-ch.ReplyChan:
+               case <-ch.replyChan:
                        log.Debug("drained old probe reply from reply channel")
                default:
                }
 
                // send the control ping request
-               ch.ReqChan <- &api.VppRequest{Message: msgControlPing}
+               ch.reqChan <- &api.VppRequest{Message: msgControlPing}
 
                for {
                        // expect response within timeout period
                        select {
-                       case vppReply := <-ch.ReplyChan:
+                       case vppReply := <-ch.replyChan:
                                err = vppReply.Error
 
                        case <-time.After(healthCheckReplyTimeout):
@@ -349,32 +350,34 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
        c.connectLoop(connChan)
 }
 
-// NewAPIChannel returns a new API channel for communication with VPP via govpp core.
-// It uses default buffer sizes for the request and reply Go channels.
-func (c *Connection) NewAPIChannel() (*api.Channel, error) {
-       if c == nil {
-               return nil, errors.New("nil connection passed in")
-       }
-       return c.NewAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
+func (c *Connection) NewAPIChannel() (api.Channel, error) {
+       return c.newAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
+}
+
+func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
+       return c.newAPIChannelBuffered(reqChanBufSize, replyChanBufSize)
 }
 
 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
 // It allows to specify custom buffer sizes for the request and reply Go channels.
-func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*api.Channel, error) {
+func (c *Connection) newAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*channel, error) {
        if c == nil {
                return nil, errors.New("nil connection passed in")
        }
 
        chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
-       ch := api.NewChannelInternal(chID)
-       ch.MsgDecoder = c.codec
-       ch.MsgIdentifier = c
+       ch := &channel{
+               id:           chID,
+               replyTimeout: defaultReplyTimeout,
+       }
+       ch.msgDecoder = c.codec
+       ch.msgIdentifier = c
 
        // create the communication channels
-       ch.ReqChan = make(chan *api.VppRequest, reqChanBufSize)
-       ch.ReplyChan = make(chan *api.VppReply, replyChanBufSize)
-       ch.NotifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize)
-       ch.NotifSubsReplyChan = make(chan error, replyChanBufSize)
+       ch.reqChan = make(chan *api.VppRequest, reqChanBufSize)
+       ch.replyChan = make(chan *api.VppReply, replyChanBufSize)
+       ch.notifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize)
+       ch.notifSubsReplyChan = make(chan error, replyChanBufSize)
 
        // store API channel within the client
        c.channelsLock.Lock()
@@ -388,13 +391,13 @@ func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int)
 }
 
 // releaseAPIChannel releases API channel that needs to be closed.
-func (c *Connection) releaseAPIChannel(ch *api.Channel) {
+func (c *Connection) releaseAPIChannel(ch *channel) {
        log.WithFields(logger.Fields{
-               "ID": ch.ID,
+               "ID": ch.id,
        }).Debug("API channel closed.")
 
        // delete the channel from channels map
        c.channelsLock.Lock()
-       delete(c.channels, ch.ID)
+       delete(c.channels, ch.id)
        c.channelsLock.Unlock()
 }
similarity index 90%
rename from core/core_test.go
rename to core/connection_test.go
index e4fbf63..b7c3aa0 100644 (file)
@@ -24,13 +24,14 @@ import (
        "git.fd.io/govpp.git/examples/bin_api/interfaces"
        "git.fd.io/govpp.git/examples/bin_api/stats"
 
+       "git.fd.io/govpp.git/codec"
        . "github.com/onsi/gomega"
 )
 
 type testCtx struct {
        mockVpp *mock.VppAdapter
        conn    *core.Connection
-       ch      *api.Channel
+       ch      api.Channel
 }
 
 func setupTest(t *testing.T, bufferedChan bool) *testCtx {
@@ -68,14 +69,14 @@ func TestSimpleRequest(t *testing.T) {
        reply := &vpe.ControlPingReply{}
 
        // send the request and receive a reply
-       ctx.ch.ReqChan <- &api.VppRequest{Message: req}
-       vppReply := <-ctx.ch.ReplyChan
+       ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: req}
+       vppReply := <-ctx.ch.GetReplyChannel()
 
        Expect(vppReply).ShouldNot(BeNil())
        Expect(vppReply.Error).ShouldNot(HaveOccurred())
 
        // decode the message
-       err := ctx.ch.MsgDecoder.DecodeMsg(vppReply.Data, reply)
+       err := ctx.ch.GetMessageDecoder().DecodeMsg(vppReply.Data, reply)
        Expect(err).ShouldNot(HaveOccurred())
 
        Expect(reply.Retval).To(BeEquivalentTo(-5))
@@ -93,12 +94,12 @@ func TestMultiRequest(t *testing.T) {
        ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
 
        // send multipart request
-       ctx.ch.ReqChan <- &api.VppRequest{Message: &interfaces.SwInterfaceDump{}, Multipart: true}
+       ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: &interfaces.SwInterfaceDump{}, Multipart: true}
 
        cnt := 0
        for {
                // receive a reply
-               vppReply := <-ctx.ch.ReplyChan
+               vppReply := <-ctx.ch.GetReplyChannel()
                if vppReply.LastReplyReceived {
                        break // break out of the loop
                }
@@ -106,7 +107,7 @@ func TestMultiRequest(t *testing.T) {
 
                // decode the message
                reply := &interfaces.SwInterfaceDetails{}
-               err := ctx.ch.MsgDecoder.DecodeMsg(vppReply.Data, reply)
+               err := ctx.ch.GetMessageDecoder().DecodeMsg(vppReply.Data, reply)
                Expect(err).ShouldNot(HaveOccurred())
                cnt++
        }
@@ -124,11 +125,11 @@ func TestNotifications(t *testing.T) {
                NotifChan:  notifChan,
                MsgFactory: interfaces.NewSwInterfaceSetFlags,
        }
-       ctx.ch.NotifSubsChan <- &api.NotifSubscribeRequest{
+       ctx.ch.GetNotificationChannel() <- &api.NotifSubscribeRequest{
                Subscription: subscription,
                Subscribe:    true,
        }
-       err := <-ctx.ch.NotifSubsReplyChan
+       err := <-ctx.ch.GetNotificationReplyChannel()
        Expect(err).ShouldNot(HaveOccurred())
 
        // mock the notification and force its delivery
@@ -144,11 +145,11 @@ func TestNotifications(t *testing.T) {
        Expect(notif.SwIfIndex).To(BeEquivalentTo(3))
 
        // unsubscribe notification
-       ctx.ch.NotifSubsChan <- &api.NotifSubscribeRequest{
+       ctx.ch.GetNotificationChannel() <- &api.NotifSubscribeRequest{
                Subscription: subscription,
                Subscribe:    false,
        }
-       err = <-ctx.ch.NotifSubsReplyChan
+       err = <-ctx.ch.GetNotificationReplyChannel()
        Expect(err).ShouldNot(HaveOccurred())
 }
 
@@ -207,15 +208,15 @@ func TestFullBuffer(t *testing.T) {
        // send multiple requests, only one reply should be read
        for i := 0; i < 20; i++ {
                ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
-               ctx.ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}}
+               ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: &vpe.ControlPing{}}
        }
 
-       vppReply := <-ctx.ch.ReplyChan
+       vppReply := <-ctx.ch.GetReplyChannel()
        Expect(vppReply).ShouldNot(BeNil())
 
        var received bool
        select {
-       case <-ctx.ch.ReplyChan:
+       case <-ctx.ch.GetReplyChannel():
                received = true // this should not happen
        default:
                received = false // no reply to be received
@@ -226,35 +227,35 @@ func TestFullBuffer(t *testing.T) {
 func TestCodec(t *testing.T) {
        RegisterTestingT(t)
 
-       codec := &core.MsgCodec{}
+       msgCodec := &codec.MsgCodec{}
 
        // request
-       data, err := codec.EncodeMsg(&interfaces.CreateLoopback{MacAddress: []byte{1, 2, 3, 4, 5, 6}}, 11)
+       data, err := msgCodec.EncodeMsg(&interfaces.CreateLoopback{MacAddress: []byte{1, 2, 3, 4, 5, 6}}, 11)
        Expect(err).ShouldNot(HaveOccurred())
        Expect(data).ShouldNot(BeEmpty())
 
        msg1 := &interfaces.CreateLoopback{}
-       err = codec.DecodeMsg(data, msg1)
+       err = msgCodec.DecodeMsg(data, msg1)
        Expect(err).ShouldNot(HaveOccurred())
        Expect(msg1.MacAddress).To(BeEquivalentTo([]byte{1, 2, 3, 4, 5, 6}))
 
        // reply
-       data, err = codec.EncodeMsg(&vpe.ControlPingReply{Retval: 55}, 22)
+       data, err = msgCodec.EncodeMsg(&vpe.ControlPingReply{Retval: 55}, 22)
        Expect(err).ShouldNot(HaveOccurred())
        Expect(data).ShouldNot(BeEmpty())
 
        msg2 := &vpe.ControlPingReply{}
-       err = codec.DecodeMsg(data, msg2)
+       err = msgCodec.DecodeMsg(data, msg2)
        Expect(err).ShouldNot(HaveOccurred())
        Expect(msg2.Retval).To(BeEquivalentTo(55))
 
        // other
-       data, err = codec.EncodeMsg(&stats.VnetIP4FibCounters{VrfID: 77}, 33)
+       data, err = msgCodec.EncodeMsg(&stats.VnetIP4FibCounters{VrfID: 77}, 33)
        Expect(err).ShouldNot(HaveOccurred())
        Expect(data).ShouldNot(BeEmpty())
 
        msg3 := &stats.VnetIP4FibCounters{}
-       err = codec.DecodeMsg(data, msg3)
+       err = msgCodec.DecodeMsg(data, msg3)
        Expect(err).ShouldNot(HaveOccurred())
        Expect(msg3.VrfID).To(BeEquivalentTo(77))
 }
@@ -262,21 +263,21 @@ func TestCodec(t *testing.T) {
 func TestCodecNegative(t *testing.T) {
        RegisterTestingT(t)
 
-       codec := &core.MsgCodec{}
+       msgCodec := &codec.MsgCodec{}
 
        // nil message for encoding
-       data, err := codec.EncodeMsg(nil, 15)
+       data, err := msgCodec.EncodeMsg(nil, 15)
        Expect(err).Should(HaveOccurred())
        Expect(err.Error()).To(ContainSubstring("nil message"))
        Expect(data).Should(BeNil())
 
        // nil message for decoding
-       err = codec.DecodeMsg(data, nil)
+       err = msgCodec.DecodeMsg(data, nil)
        Expect(err).Should(HaveOccurred())
        Expect(err.Error()).To(ContainSubstring("nil message"))
 
        // nil data for decoding
-       err = codec.DecodeMsg(nil, &vpe.ControlPingReply{})
+       err = msgCodec.DecodeMsg(nil, &vpe.ControlPingReply{})
        Expect(err).Should(HaveOccurred())
        Expect(err.Error()).To(ContainSubstring("EOF"))
 }
@@ -285,7 +286,7 @@ func TestSimpleRequestsWithSequenceNumbers(t *testing.T) {
        ctx := setupTest(t, false)
        defer ctx.teardownTest()
 
-       var reqCtx []*api.RequestCtx
+       var reqCtx []api.RequestCtx
        for i := 0; i < 10; i++ {
                ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: int32(i)})
                req := &vpe.ControlPing{}
@@ -524,7 +525,7 @@ func TestCycleOverSetOfSequenceNumbers(t *testing.T) {
        defer ctx.teardownTest()
 
        numIters := 0xffff + 100
-       reqCtx := make(map[int]*api.RequestCtx)
+       reqCtx := make(map[int]api.RequestCtx)
 
        for i := 0; i < numIters+30; /* receiver is 30 reqs behind */ i++ {
                if i < numIters {
index a4ecd50..5b0b40e 100644 (file)
 //     defer ch.Close()
 //
 // Note that one application can open only one connection, that can serve multiple API channels.
+//
+// The API offers two ways of communication with govpp core: using Go channels, or using convenient function
+// wrappers over the Go channels. The latter should be sufficient for most of the use cases.
+//
+// The entry point to the API is the Channel structure, that can be obtained from the existing connection using
+// the NewAPIChannel or NewAPIChannelBuffered functions:
+//
+//     conn, err := govpp.Connect()
+//     if err != nil {
+//             // handle error!
+//     }
+//     defer conn.Disconnect()
+//
+//     ch, err := conn.NewAPIChannel()
+//     if err != nil {
+//             // handle error!
+//     }
+//     defer ch.Close()
+//
+//
+// Simple Request-Reply API
+//
+// The simple version of the API is based on blocking SendRequest / ReceiveReply calls, where a single request
+// message is sent  to VPP and a single reply message is filled in when the reply comes from VPP:
+//
+//     req := &acl.ACLPluginGetVersion{}
+//     reply := &acl.ACLPluginGetVersionReply{}
+//
+//     err := ch.SendRequest(req).ReceiveReply(reply)
+//     // process the reply
+//
+// Note that if the reply message type that comes from VPP does not match with provided one, you'll get an error.
+//
+//
+// Multipart Reply API
+//
+// If multiple messages are expected as a reply to a request, SendMultiRequest API must be used:
+//
+//     req := &interfaces.SwInterfaceDump{}
+//     reqCtx := ch.SendMultiRequest(req)
+//
+//     for {
+//             reply := &interfaces.SwInterfaceDetails{}
+//             stop, err := reqCtx.ReceiveReply(reply)
+//             if stop {
+//                     break // break out of the loop
+//             }
+//             // process the reply
+//     }
+//
+// Note that if the last reply has been already consumed, stop boolean return value is set to true.
+// Do not use the message itself if stop is true - it won't be filled with actual data.
+//
+//
+// Go Channels API
+//
+// The blocking API introduced above may be not sufficient for some management applications that strongly
+// rely on usage of Go channels. In this case, the API allows to access the underlying Go channels directly, e.g.
+// the following replacement of the SendRequest / ReceiveReply API:
+//
+//     req := &acl.ACLPluginGetVersion{}
+//     // send the request to the request go channel
+//     ch.GetRequestChannel <- &api.VppRequest{Message: req}
+//
+//     // receive a reply from the reply go channel
+//     vppReply := <-ch.GetReplyChannel
+//
+//     // decode the message
+//     reply := &acl.ACLPluginGetVersionReply{}
+//     err := ch.MsgDecoder.DecodeMsg(vppReply.Data, reply)
+//
+//     // process the reply
+//
+//
+// Notifications API
+//
+// to subscribe for receiving of the specified notification messages via provided Go channel, use the
+// SubscribeNotification API:
+//
+//     // subscribe for specific notification message
+//     notifChan := make(chan api.Message, 100)
+//     subs, _ := ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceSetFlags)
+//
+//     // receive one notification
+//     notif := (<-notifChan).(*interfaces.SwInterfaceSetFlags)
+//
+//     ch.UnsubscribeNotification(subs)
+//
+// Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's
+// buffer is full, the notifications will not be delivered into it.
+//
 package core
index 89c16a4..c0e8687 100644 (file)
@@ -18,13 +18,12 @@ import (
        "fmt"
        "reflect"
 
-       logger "github.com/sirupsen/logrus"
-
        "git.fd.io/govpp.git/api"
+       logger "github.com/sirupsen/logrus"
 )
 
 // processNotifSubscribeRequest processes a notification subscribe request.
-func (c *Connection) processNotifSubscribeRequest(ch *api.Channel, req *api.NotifSubscribeRequest) error {
+func (c *Connection) processNotifSubscribeRequest(ch *channel, req *api.NotifSubscribeRequest) error {
        var err error
 
        // subscribe / unsubscribe
@@ -36,7 +35,7 @@ func (c *Connection) processNotifSubscribeRequest(ch *api.Channel, req *api.Noti
 
        // send the reply into the go channel
        select {
-       case ch.NotifSubsReplyChan <- err:
+       case ch.notifSubsReplyChan <- err:
                // reply sent successfully
        default:
                // unable to write into the channel without blocking
index 3bec38d..8681963 100644 (file)
@@ -31,10 +31,10 @@ var (
 )
 
 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
-func (c *Connection) watchRequests(ch *api.Channel) {
+func (c *Connection) watchRequests(ch *channel) {
        for {
                select {
-               case req, ok := <-ch.ReqChan:
+               case req, ok := <-ch.reqChan:
                        // new request on the request channel
                        if !ok {
                                // after closing the request channel, release API channel and return
@@ -43,7 +43,7 @@ func (c *Connection) watchRequests(ch *api.Channel) {
                        }
                        c.processRequest(ch, req)
 
-               case req := <-ch.NotifSubsChan:
+               case req := <-ch.notifSubsChan:
                        // new request on the notification subscribe channel
                        c.processNotifSubscribeRequest(ch, req)
                }
@@ -51,7 +51,7 @@ func (c *Connection) watchRequests(ch *api.Channel) {
 }
 
 // processRequest processes a single request received on the request channel.
-func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error {
+func (c *Connection) processRequest(ch *channel, req *api.VppRequest) error {
        // check whether we are connected to VPP
        if atomic.LoadUint32(&c.connected) == 0 {
                err := ErrNotConnected
@@ -78,7 +78,7 @@ func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error
        if err != nil {
                err = fmt.Errorf("unable to encode the messge: %v", err)
                log.WithFields(logger.Fields{
-                       "channel": ch.ID,
+                       "channel": ch.id,
                        "msg_id":  msgID,
                        "seq_num": req.SeqNum,
                }).Error(err)
@@ -88,7 +88,7 @@ func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error
 
        if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
                log.WithFields(logger.Fields{
-                       "channel":  ch.ID,
+                       "channel":  ch.id,
                        "msg_id":   msgID,
                        "msg_size": len(data),
                        "msg_name": req.Message.GetMessageName(),
@@ -97,7 +97,7 @@ func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error
        }
 
        // send the request to VPP
-       context := packRequestContext(ch.ID, req.Multipart, req.SeqNum)
+       context := packRequestContext(ch.id, req.Multipart, req.SeqNum)
        err = c.vpp.SendMsg(context, data)
        if err != nil {
                err = fmt.Errorf("unable to send the message: %v", err)
@@ -189,9 +189,9 @@ func msgCallback(context uint32, msgID uint16, data []byte) {
 
 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
 // it logs the error and do not send the message.
-func sendReply(ch *api.Channel, reply *api.VppReply) {
+func sendReply(ch *channel, reply *api.VppReply) {
        select {
-       case ch.ReplyChan <- reply:
+       case ch.replyChan <- reply:
                // reply sent successfully
        case <-time.After(time.Millisecond * 100):
                // receiver still not ready
index f3ff752..b1f4dcf 100644 (file)
@@ -95,7 +95,7 @@ func main() {
        fmt.Printf("Requests per second: %.0f\n", float64(cnt)/elapsed.Seconds())
 }
 
-func syncTest(ch *api.Channel, cnt int) {
+func syncTest(ch api.Channel, cnt int) {
        fmt.Printf("Running synchronous perf test with %d requests...\n", cnt)
 
        for i := 0; i < cnt; i++ {
@@ -110,7 +110,7 @@ func syncTest(ch *api.Channel, cnt int) {
        }
 }
 
-func asyncTest(ch *api.Channel, cnt int) {
+func asyncTest(ch api.Channel, cnt int) {
        fmt.Printf("Running asynchronous perf test with %d requests...\n", cnt)
 
        // start a new go routine that reads the replies
@@ -125,20 +125,20 @@ func asyncTest(ch *api.Channel, cnt int) {
        wg.Wait()
 }
 
-func sendAsyncRequests(ch *api.Channel, cnt int) {
+func sendAsyncRequests(ch api.Channel, cnt int) {
        for i := 0; i < cnt; i++ {
-               ch.ReqChan <- &api.VppRequest{
+               ch.GetRequestChannel() <- &api.VppRequest{
                        Message: &vpe.ControlPing{},
                }
        }
 }
 
-func readAsyncReplies(ch *api.Channel, expectedCnt int, wg *sync.WaitGroup) {
+func readAsyncReplies(ch api.Channel, expectedCnt int, wg *sync.WaitGroup) {
        cnt := 0
 
        for {
                // receive a reply
-               reply := <-ch.ReplyChan
+               reply := <-ch.GetReplyChannel()
                if reply.Error != nil {
                        log.Println("Error in reply:", reply.Error)
                        os.Exit(1)
@@ -146,7 +146,7 @@ func readAsyncReplies(ch *api.Channel, expectedCnt int, wg *sync.WaitGroup) {
 
                // decode the message
                msg := &vpe.ControlPingReply{}
-               err := ch.MsgDecoder.DecodeMsg(reply.Data, msg)
+               err := ch.GetMessageDecoder().DecodeMsg(reply.Data, msg)
                if reply.Error != nil {
                        log.Println("Error by decoding:", err)
                        os.Exit(1)
@@ -159,4 +159,4 @@ func readAsyncReplies(ch *api.Channel, expectedCnt int, wg *sync.WaitGroup) {
                        return
                }
        }
-}
\ No newline at end of file
+}
index 67dc14b..7b7dbcd 100644 (file)
@@ -66,7 +66,7 @@ func main() {
 
 // compatibilityCheck shows how an management application can check whether generated API messages are
 // compatible with the version of VPP which the library is connected to.
-func compatibilityCheck(ch *api.Channel) {
+func compatibilityCheck(ch api.Channel) {
        err := ch.CheckMessageCompatibility(
                &interfaces.SwInterfaceDump{},
                &interfaces.SwInterfaceDetails{},
@@ -78,7 +78,7 @@ func compatibilityCheck(ch *api.Channel) {
 }
 
 // aclVersion is the simplest API example - one empty request message and one reply message.
-func aclVersion(ch *api.Channel) {
+func aclVersion(ch api.Channel) {
        req := &acl.ACLPluginGetVersion{}
        reply := &acl.ACLPluginGetVersionReply{}
 
@@ -92,7 +92,7 @@ func aclVersion(ch *api.Channel) {
 }
 
 // aclConfig is another simple API example - in this case, the request contains structured data.
-func aclConfig(ch *api.Channel) {
+func aclConfig(ch api.Channel) {
        req := &acl.ACLAddReplace{
                ACLIndex: ^uint32(0),
                Tag:      []byte("access list 1"),
@@ -127,7 +127,7 @@ func aclConfig(ch *api.Channel) {
 }
 
 // aclDump shows an example where SendRequest and ReceiveReply are not chained together.
-func aclDump(ch *api.Channel) {
+func aclDump(ch api.Channel) {
        req := &acl.ACLDump{}
        reply := &acl.ACLDetails{}
 
@@ -143,17 +143,17 @@ func aclDump(ch *api.Channel) {
 
 // tapConnect example shows how the Go channels in the API channel can be accessed directly instead
 // of using SendRequest and ReceiveReply wrappers.
-func tapConnect(ch *api.Channel) {
+func tapConnect(ch api.Channel) {
        req := &tap.TapConnect{
                TapName:      []byte("testtap"),
                UseRandomMac: 1,
        }
 
        // send the request to the request go channel
-       ch.ReqChan <- &api.VppRequest{Message: req}
+       ch.GetRequestChannel() <- &api.VppRequest{Message: req}
 
        // receive a reply from the reply go channel
-       vppReply := <-ch.ReplyChan
+       vppReply := <-ch.GetReplyChannel()
        if vppReply.Error != nil {
                fmt.Println("Error:", vppReply.Error)
                return
@@ -161,7 +161,7 @@ func tapConnect(ch *api.Channel) {
 
        // decode the message
        reply := &tap.TapConnectReply{}
-       err := ch.MsgDecoder.DecodeMsg(vppReply.Data, reply)
+       err := ch.GetMessageDecoder().DecodeMsg(vppReply.Data, reply)
 
        if err != nil {
                fmt.Println("Error:", err)
@@ -171,7 +171,7 @@ func tapConnect(ch *api.Channel) {
 }
 
 // interfaceDump shows an example of multipart request (multiple replies are expected).
-func interfaceDump(ch *api.Channel) {
+func interfaceDump(ch api.Channel) {
        req := &interfaces.SwInterfaceDump{}
        reqCtx := ch.SendMultiRequest(req)
 
@@ -191,7 +191,7 @@ func interfaceDump(ch *api.Channel) {
 // interfaceNotifications shows the usage of notification API. Note that for notifications,
 // you are supposed to create your own Go channel with your preferred buffer size. If the channel's
 // buffer is full, the notifications will not be delivered into it.
-func interfaceNotifications(ch *api.Channel) {
+func interfaceNotifications(ch api.Channel) {
        // subscribe for specific notification message
        notifChan := make(chan api.Message, 100)
        subs, _ := ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceSetFlags)
@@ -218,4 +218,4 @@ func interfaceNotifications(ch *api.Channel) {
 
        // unsubscribe from delivery of the notifications
        ch.UnsubscribeNotification(subs)
-}
\ No newline at end of file
+}
index 17c7956..5f9966f 100644 (file)
@@ -101,7 +101,7 @@ loop:
 }
 
 // subscribeNotifications subscribes for interface counters notifications.
-func subscribeNotifications(ch *api.Channel) (*api.NotifSubscription, *api.NotifSubscription, chan api.Message) {
+func subscribeNotifications(ch api.Channel) (*api.NotifSubscription, *api.NotifSubscription, chan api.Message) {
 
        notifChan := make(chan api.Message, 100)
        simpleCountersSubs, _ := ch.SubscribeNotification(notifChan, interfaces.NewVnetInterfaceSimpleCounters)
@@ -111,7 +111,7 @@ func subscribeNotifications(ch *api.Channel) (*api.NotifSubscription, *api.Notif
 }
 
 // requestStatistics requests interface counters notifications from VPP.
-func requestStatistics(ch *api.Channel) {
+func requestStatistics(ch api.Channel) {
        ch.SendRequest(&stats.WantStats{
                Pid:           uint32(os.Getpid()),
                EnableDisable: 1,
@@ -141,4 +141,4 @@ func processCombinedCounters(counters *interfaces.VnetInterfaceCombinedCounters)
                        counters.FirstSwIfIndex+i, counterNames[counters.VnetCounterType], counters.Data[i].Packets,
                        counterNames[counters.VnetCounterType], counters.Data[i].Bytes)
        }
-}
\ No newline at end of file
+}