Simplify subscribing to events and fix events 50/14450/6
authorOndrej Fabry <ofabry@cisco.com>
Thu, 23 Aug 2018 20:51:56 +0000 (22:51 +0200)
committerOndrej Fabry <ofabry@cisco.com>
Fri, 24 Aug 2018 10:43:05 +0000 (12:43 +0200)
- there is no need for sending subscription requests through channels,
  since all the messages are registered and no communication with VPP
  is needed

Change-Id: Ibc29957be02a32e26309f66c369a071559b822a9
Signed-off-by: Ondrej Fabry <ofabry@cisco.com>
24 files changed:
Makefile
adapter/adapter.go
adapter/mock/mock_adapter.go
adapter/vppapiclient/vppapiclient_adapter.go
api/api.go
cmd/binapi-generator/generate.go
codec/msg_codec.go
core/channel.go
core/channel_test.go
core/connection.go
core/connection_test.go
core/notification_handler.go [deleted file]
core/request_handler.go
examples/bin_api/acl/acl.ba.go
examples/bin_api/af_packet/af_packet.ba.go
examples/bin_api/interfaces/interfaces.ba.go
examples/bin_api/ip/ip.ba.go
examples/bin_api/memif/memif.ba.go
examples/bin_api/stats/stats.ba.go
examples/bin_api/tap/tap.ba.go
examples/bin_api/vpe/vpe.ba.go
examples/cmd/perf-bench/perf-bench.go
examples/cmd/simple-client/simple_client.go
examples/cmd/stats-client/stats_client.go

index ee06818..ea1daf6 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -26,7 +26,6 @@ clean:
        @rm -f extras/libmemif/examples/icmp-responder/icmp-responder
 
 generate:
-       @cd core && go generate ./...
        @cd examples && go generate ./...
 
 lint:
index 7d3d1e4..aa34329 100644 (file)
@@ -22,7 +22,7 @@ import (
 var ErrNotImplemented = errors.New("not implemented for this OS")
 
 // MsgCallback defines func signature for message callback.
-type MsgCallback func(msgID uint16, context uint32, data []byte)
+type MsgCallback func(msgID uint16, data []byte)
 
 // VppAdapter provides connection to VPP. It is responsible for sending and receiving of binary-encoded messages to/from VPP.
 type VppAdapter interface {
index 5ca190f..cdf2081 100644 (file)
@@ -92,9 +92,9 @@ const (
 // NewVppAdapter returns a new mock adapter.
 func NewVppAdapter() *VppAdapter {
        a := &VppAdapter{
+               msgIDSeq:     1000,
                msgIDsToName: make(map[uint16]string),
                msgNameToIds: make(map[string]uint16),
-               msgIDSeq:     1000,
                binAPITypes:  make(map[string]reflect.Type),
        }
        a.registerBinAPITypes()
@@ -186,8 +186,7 @@ func (a *VppAdapter) ReplyBytes(request MessageDTO, reply api.Message) ([]byte,
        if err != nil {
                return nil, err
        }
-       err = struc.Pack(buf, reply)
-       if err != nil {
+       if err = struc.Pack(buf, reply); err != nil {
                return nil, err
        }
 
@@ -245,7 +244,7 @@ func (a *VppAdapter) SendMsg(clientID uint32, data []byte) error {
                                Data:     data,
                        })
                        if finished {
-                               a.callback(msgID, clientID, reply)
+                               a.callback(msgID, reply)
                                return nil
                        }
                }
@@ -276,7 +275,7 @@ func (a *VppAdapter) SendMsg(clientID uint32, data []byte) error {
                                        struc.Pack(buf, &codec.VppOtherHeader{VlMsgID: msgID})
                                }
                                struc.Pack(buf, msg.Msg)
-                               a.callback(msgID, context, buf.Bytes())
+                               a.callback(msgID, buf.Bytes())
                        }
 
                        a.replies = a.replies[1:]
@@ -295,7 +294,7 @@ func (a *VppAdapter) SendMsg(clientID uint32, data []byte) error {
                msgID := uint16(defaultReplyMsgID)
                struc.Pack(buf, &codec.VppReplyHeader{VlMsgID: msgID, Context: clientID})
                struc.Pack(buf, &defaultReply{})
-               a.callback(msgID, clientID, buf.Bytes())
+               a.callback(msgID, buf.Bytes())
        }
        return nil
 }
index 7aafa55..e62bccd 100644 (file)
@@ -28,7 +28,7 @@ package vppapiclient
 #include <arpa/inet.h>
 #include <vpp-api/client/vppapiclient.h>
 
-extern void go_msg_callback(uint16_t msg_id, uint32_t context, void* data, size_t size);
+extern void go_msg_callback(uint16_t msg_id, void* data, size_t size);
 
 typedef struct __attribute__((__packed__)) _req_header {
     uint16_t msg_id;
@@ -38,14 +38,13 @@ typedef struct __attribute__((__packed__)) _req_header {
 
 typedef struct __attribute__((__packed__)) _reply_header {
     uint16_t msg_id;
-    uint32_t context; // currently not all reply messages contain context field
 } reply_header_t;
 
 static void
 govpp_msg_callback (unsigned char *data, int size)
 {
     reply_header_t *header = ((reply_header_t *)data);
-    go_msg_callback(ntohs(header->msg_id), ntohl(header->context), data, size);
+    go_msg_callback(ntohs(header->msg_id), data, size);
 }
 
 static int
@@ -204,10 +203,10 @@ func fileExists(name string) bool {
 }
 
 //export go_msg_callback
-func go_msg_callback(msgID C.uint16_t, context C.uint32_t, data unsafe.Pointer, size C.size_t) {
+func go_msg_callback(msgID C.uint16_t, data unsafe.Pointer, size C.size_t) {
        // convert unsafe.Pointer to byte slice
        slice := &reflect.SliceHeader{Data: uintptr(data), Len: int(size), Cap: int(size)}
        byteArr := *(*[]byte)(unsafe.Pointer(slice))
 
-       vppClient.callback(uint16(msgID), uint32(context), byteArr)
+       vppClient.callback(uint16(msgID), byteArr)
 }
index 39fe60f..9b7f0ff 100644 (file)
@@ -58,21 +58,6 @@ type DataType interface {
        GetCrcString() string
 }
 
-// MessageDecoder provides functionality for decoding binary data to generated API messages.
-type MessageDecoder interface {
-       // DecodeMsg decodes binary-encoded data of a message into provided Message structure.
-       DecodeMsg(data []byte, msg Message) error
-}
-
-// MessageIdentifier provides identification of generated API messages.
-type MessageIdentifier interface {
-       // GetMessageID returns message identifier of given API message.
-       GetMessageID(msg Message) (uint16, error)
-
-       // LookupByID looks up message name and crc by ID
-       LookupByID(msgID uint16) (Message, error)
-}
-
 // ChannelProvider provides the communication channel with govpp core.
 type ChannelProvider interface {
        // NewAPIChannel returns a new channel for communication with VPP via govpp core.
@@ -86,9 +71,6 @@ type ChannelProvider interface {
 
 // Channel provides methods for direct communication with VPP channel.
 type Channel interface {
-       // GetID returns channel's ID
-       GetID() uint16
-
        // SendRequest asynchronously sends a request to VPP. Returns a request context, that can be used to call ReceiveReply.
        // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
        SendRequest(msg Message) RequestCtx
@@ -101,10 +83,7 @@ type Channel interface {
        // SubscribeNotification subscribes for receiving of the specified notification messages via provided Go channel.
        // Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's
        // buffer is full, the notifications will not be delivered into it.
-       SubscribeNotification(notifChan chan Message, msgFactory func() Message) (*NotifSubscription, error)
-
-       // UnsubscribeNotification unsubscribes from receiving the notifications tied to the provided notification subscription.
-       UnsubscribeNotification(subscription *NotifSubscription) error
+       SubscribeNotification(notifChan chan Message, event Message) (SubscriptionCtx, error)
 
        // SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply
        // from VPP before returning an error.
@@ -114,14 +93,14 @@ type Channel interface {
        Close()
 }
 
-// RequestCtx is helper interface which allows to receive reply on request context data
+// RequestCtx is helper interface which allows to receive reply on request.
 type RequestCtx interface {
        // ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs).
        // The reply will be decoded into the msg argument. Error will be returned if the response cannot be received or decoded.
        ReceiveReply(msg Message) error
 }
 
-// MultiRequestCtx is helper interface which allows to receive reply on multi-request context data
+// MultiRequestCtx is helper interface which allows to receive reply on multi-request.
 type MultiRequestCtx interface {
        // ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs).
        // The reply will be decoded into the msg argument. If the last reply has been already consumed, lastReplyReceived is
@@ -130,13 +109,13 @@ type MultiRequestCtx interface {
        ReceiveReply(msg Message) (lastReplyReceived bool, err error)
 }
 
-// NotifSubscription represents a subscription for delivery of specific notification messages.
-type NotifSubscription struct {
-       NotifChan  chan Message   // channel where notification messages will be delivered to
-       MsgFactory func() Message // function that returns a new instance of the specific message that is expected as a notification
-       // TODO: use Message directly here, not a factory, eliminating need to allocation
+// SubscriptionCtx is helper interface which allows to control subscription for notification events.
+type SubscriptionCtx interface {
+       // Unsubscribe unsubscribes from receiving the notifications tied to the subscription context.
+       Unsubscribe() error
 }
 
+// map of registered messages
 var registeredMessages = make(map[string]Message)
 
 // RegisterMessage is called from generated code to register message.
index 33ab614..73bcd2a 100644 (file)
@@ -140,9 +140,9 @@ func generatePackage(ctx *context, w *bufio.Writer) error {
        if len(ctx.packageData.Services) > 0 {
                fmt.Fprintf(w, "/* Services */\n\n")
 
-               fmt.Fprintf(w, "type %s interface {\n", "Services")
                ctx.inputBuff = bytes.NewBuffer(ctx.inputData)
                ctx.inputLine = 0
+               fmt.Fprintf(w, "type %s interface {\n", "Services")
                for _, svc := range ctx.packageData.Services {
                        generateService(ctx, w, &svc)
                }
@@ -209,6 +209,7 @@ func generateImports(ctx *context, w io.Writer) {
        fmt.Fprintln(w)
 
        fmt.Fprintf(w, "// Reference imports to suppress errors if they are not otherwise used.\n")
+       fmt.Fprintf(w, "var _ = api.RegisterMessage\n")
        fmt.Fprintf(w, "var _ = struc.Pack\n")
        fmt.Fprintf(w, "var _ = bytes.NewBuffer\n")
        fmt.Fprintln(w)
index 572e672..9d3f614 100644 (file)
@@ -58,15 +58,19 @@ func (*MsgCodec) EncodeMsg(msg api.Message, msgID uint16) ([]byte, error) {
                return nil, errors.New("nil message passed in")
        }
 
-       // encode message header
        var header interface{}
+
+       // encode message header
        switch msg.GetMessageType() {
        case api.RequestMessage:
                header = &VppRequestHeader{VlMsgID: msgID}
+
        case api.ReplyMessage:
                header = &VppReplyHeader{VlMsgID: msgID}
+
        case api.EventMessage:
                header = &VppEventHeader{VlMsgID: msgID}
+
        default:
                header = &VppOtherHeader{VlMsgID: msgID}
        }
@@ -94,15 +98,19 @@ func (*MsgCodec) DecodeMsg(data []byte, msg api.Message) error {
                return errors.New("nil message passed in")
        }
 
-       // check which header is expected
        var header interface{}
+
+       // check which header is expected
        switch msg.GetMessageType() {
        case api.RequestMessage:
                header = new(VppRequestHeader)
+
        case api.ReplyMessage:
                header = new(VppReplyHeader)
+
        case api.EventMessage:
                header = new(VppEventHeader)
+
        default:
                header = new(VppOtherHeader)
        }
@@ -127,17 +135,19 @@ func (*MsgCodec) DecodeMsgContext(data []byte, msg api.Message) (uint32, error)
                return 0, errors.New("nil message passed in")
        }
 
+       var header interface{}
        var getContext func() uint32
 
        // check which header is expected
-       var header interface{}
        switch msg.GetMessageType() {
        case api.RequestMessage:
                header = new(VppRequestHeader)
                getContext = func() uint32 { return header.(*VppRequestHeader).Context }
+
        case api.ReplyMessage:
                header = new(VppReplyHeader)
                getContext = func() uint32 { return header.(*VppReplyHeader).Context }
+
        default:
                return 0, nil
        }
index 718f89c..a7d95fe 100644 (file)
@@ -29,40 +29,20 @@ var (
        ErrInvalidRequestCtx = errors.New("invalid request context")
 )
 
-// requestCtx is a context for request with single reply
-type requestCtx struct {
-       ch     *channel
-       seqNum uint16
-}
-
-// multiRequestCtx is a context for request with multiple responses
-type multiRequestCtx struct {
-       ch     *channel
-       seqNum uint16
-}
-
-func (req *requestCtx) ReceiveReply(msg api.Message) error {
-       if req == nil || req.ch == nil {
-               return ErrInvalidRequestCtx
-       }
-
-       lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum)
-       if err != nil {
-               return err
-       }
-       if lastReplyReceived {
-               return errors.New("multipart reply recieved while a single reply expected")
-       }
-
-       return nil
+// MessageCodec provides functionality for decoding binary data to generated API messages.
+type MessageCodec interface {
+       //EncodeMsg encodes message into binary data.
+       EncodeMsg(msg api.Message, msgID uint16) ([]byte, error)
+       // DecodeMsg decodes binary-encoded data of a message into provided Message structure.
+       DecodeMsg(data []byte, msg api.Message) error
 }
 
-func (req *multiRequestCtx) ReceiveReply(msg api.Message) (lastReplyReceived bool, err error) {
-       if req == nil || req.ch == nil {
-               return false, ErrInvalidRequestCtx
-       }
-
-       return req.ch.receiveReplyInternal(msg, req.seqNum)
+// MessageIdentifier provides identification of generated API messages.
+type MessageIdentifier interface {
+       // GetMessageID returns message identifier of given API message.
+       GetMessageID(msg api.Message) (uint16, error)
+       // LookupByID looks up message name and crc by ID
+       LookupByID(msgID uint16) (api.Message, error)
 }
 
 // vppRequest is a request that will be sent to VPP.
@@ -81,27 +61,40 @@ type vppReply struct {
        err          error  // in case of error, data is nil and this member contains error
 }
 
-// NotifSubscribeRequest is a request to subscribe for delivery of specific notification messages.
-type subscriptionRequest struct {
-       sub       *api.NotifSubscription // subscription details
-       subscribe bool                   // true if this is a request to subscribe
+// requestCtx is a context for request with single reply
+type requestCtx struct {
+       ch     *Channel
+       seqNum uint16
+}
+
+// multiRequestCtx is a context for request with multiple responses
+type multiRequestCtx struct {
+       ch     *Channel
+       seqNum uint16
+}
+
+// subscriptionCtx is a context of subscription for delivery of specific notification messages.
+type subscriptionCtx struct {
+       ch         *Channel
+       notifChan  chan api.Message   // channel where notification messages will be delivered to
+       msgID      uint16             // message ID for the subscribed event message
+       event      api.Message        // event message that this subscription is for
+       msgFactory func() api.Message // function that returns a new instance of the specific message that is expected as a notification
 }
 
 // channel is the main communication interface with govpp core. It contains four Go channels, one for sending the requests
 // to VPP, one for receiving the replies from it and the same set for notifications. The user can access the Go channels
 // via methods provided by Channel interface in this package. Do not use the same channel from multiple goroutines
 // concurrently, otherwise the responses could mix! Use multiple channels instead.
-type channel struct {
-       id uint16 // channel ID
+type Channel struct {
+       id   uint16
+       conn *Connection
 
        reqChan   chan *vppRequest // channel for sending the requests to VPP
        replyChan chan *vppReply   // channel where VPP replies are delivered to
 
-       notifSubsChan      chan *subscriptionRequest // channel for sending notification subscribe requests
-       notifSubsReplyChan chan error                // channel where replies to notification subscribe requests are delivered to
-
-       msgDecoder    api.MessageDecoder    // used to decode binary data to generated API messages
-       msgIdentifier api.MessageIdentifier // used to retrieve message ID of a message
+       msgCodec      MessageCodec      // used to decode binary data to generated API messages
+       msgIdentifier MessageIdentifier // used to retrieve message ID of a message
 
        lastSeqNum uint16 // sequence number of the last sent request
 
@@ -109,73 +102,142 @@ type channel struct {
        replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout
 }
 
-func (ch *channel) GetID() uint16 {
+func newChannel(id uint16, conn *Connection, codec MessageCodec, identifier MessageIdentifier, reqSize, replySize int) *Channel {
+       return &Channel{
+               id:            id,
+               conn:          conn,
+               msgCodec:      codec,
+               msgIdentifier: identifier,
+               reqChan:       make(chan *vppRequest, reqSize),
+               replyChan:     make(chan *vppReply, replySize),
+               replyTimeout:  DefaultReplyTimeout,
+       }
+}
+
+func (ch *Channel) GetID() uint16 {
        return ch.id
 }
 
-func (ch *channel) nextSeqNum() uint16 {
+func (ch *Channel) nextSeqNum() uint16 {
        ch.lastSeqNum++
        return ch.lastSeqNum
 }
 
-func (ch *channel) SendRequest(msg api.Message) api.RequestCtx {
-       req := &vppRequest{
+func (ch *Channel) SendRequest(msg api.Message) api.RequestCtx {
+       seqNum := ch.nextSeqNum()
+       ch.reqChan <- &vppRequest{
                msg:    msg,
-               seqNum: ch.nextSeqNum(),
+               seqNum: seqNum,
        }
-       ch.reqChan <- req
-       return &requestCtx{ch: ch, seqNum: req.seqNum}
+       return &requestCtx{ch: ch, seqNum: seqNum}
 }
 
-func (ch *channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx {
-       req := &vppRequest{
+func (ch *Channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx {
+       seqNum := ch.nextSeqNum()
+       ch.reqChan <- &vppRequest{
                msg:    msg,
-               seqNum: ch.nextSeqNum(),
+               seqNum: seqNum,
                multi:  true,
        }
-       ch.reqChan <- req
-       return &multiRequestCtx{ch: ch, seqNum: req.seqNum}
+       return &multiRequestCtx{ch: ch, seqNum: seqNum}
 }
 
-func (ch *channel) SubscribeNotification(notifChan chan api.Message, msgFactory func() api.Message) (*api.NotifSubscription, error) {
-       sub := &api.NotifSubscription{
-               NotifChan:  notifChan,
-               MsgFactory: msgFactory,
+func getMsgFactory(msg api.Message) func() api.Message {
+       return func() api.Message {
+               return reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
        }
-       // TODO: get rid of notifSubsChan and notfSubsReplyChan,
-       // it's no longer need because we know all message IDs and can store subscription right here
-       ch.notifSubsChan <- &subscriptionRequest{
-               sub:       sub,
-               subscribe: true,
-       }
-       return sub, <-ch.notifSubsReplyChan
 }
 
-func (ch *channel) UnsubscribeNotification(subscription *api.NotifSubscription) error {
-       ch.notifSubsChan <- &subscriptionRequest{
-               sub:       subscription,
-               subscribe: false,
+func (ch *Channel) SubscribeNotification(notifChan chan api.Message, event api.Message) (api.SubscriptionCtx, error) {
+       msgID, err := ch.msgIdentifier.GetMessageID(event)
+       if err != nil {
+               log.WithFields(logrus.Fields{
+                       "msg_name": event.GetMessageName(),
+                       "msg_crc":  event.GetCrcString(),
+               }).Errorf("unable to retrieve message ID: %v", err)
+               return nil, fmt.Errorf("unable to retrieve event message ID: %v", err)
+       }
+
+       sub := &subscriptionCtx{
+               ch:         ch,
+               notifChan:  notifChan,
+               msgID:      msgID,
+               event:      event,
+               msgFactory: getMsgFactory(event),
        }
-       return <-ch.notifSubsReplyChan
+
+       // add the subscription into map
+       ch.conn.subscriptionsLock.Lock()
+       defer ch.conn.subscriptionsLock.Unlock()
+
+       ch.conn.subscriptions[msgID] = append(ch.conn.subscriptions[msgID], sub)
+
+       return sub, nil
 }
 
-func (ch *channel) SetReplyTimeout(timeout time.Duration) {
+func (ch *Channel) SetReplyTimeout(timeout time.Duration) {
        ch.replyTimeout = timeout
 }
 
-func (ch *channel) Close() {
+func (ch *Channel) Close() {
        if ch.reqChan != nil {
                close(ch.reqChan)
+               ch.reqChan = nil
+       }
+}
+
+func (req *requestCtx) ReceiveReply(msg api.Message) error {
+       if req == nil || req.ch == nil {
+               return ErrInvalidRequestCtx
+       }
+
+       lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum)
+       if err != nil {
+               return err
+       } else if lastReplyReceived {
+               return errors.New("multipart reply recieved while a single reply expected")
+       }
+
+       return nil
+}
+
+func (req *multiRequestCtx) ReceiveReply(msg api.Message) (lastReplyReceived bool, err error) {
+       if req == nil || req.ch == nil {
+               return false, ErrInvalidRequestCtx
        }
+
+       return req.ch.receiveReplyInternal(msg, req.seqNum)
+}
+
+func (sub *subscriptionCtx) Unsubscribe() error {
+       log.WithFields(logrus.Fields{
+               "msg_name": sub.event.GetMessageName(),
+               "msg_id":   sub.msgID,
+       }).Debug("Removing notification subscription.")
+
+       // remove the subscription from the map
+       sub.ch.conn.subscriptionsLock.Lock()
+       defer sub.ch.conn.subscriptionsLock.Unlock()
+
+       for i, item := range sub.ch.conn.subscriptions[sub.msgID] {
+               if item == sub {
+                       // remove i-th item in the slice
+                       sub.ch.conn.subscriptions[sub.msgID] = append(sub.ch.conn.subscriptions[sub.msgID][:i], sub.ch.conn.subscriptions[sub.msgID][i+1:]...)
+                       return nil
+               }
+       }
+
+       return fmt.Errorf("subscription for %q not found", sub.event.GetMessageName())
 }
 
 // receiveReplyInternal receives a reply from the reply channel into the provided msg structure.
-func (ch *channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (lastReplyReceived bool, err error) {
-       var ignore bool
+func (ch *Channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (lastReplyReceived bool, err error) {
        if msg == nil {
                return false, errors.New("nil message passed in")
        }
 
+       var ignore bool
+
        if vppReply := ch.delayedReply; vppReply != nil {
                // try the delayed reply
                ch.delayedReply = nil
@@ -204,12 +266,12 @@ func (ch *channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (last
        return
 }
 
-func (ch *channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) {
+func (ch *Channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) {
        // check the sequence number
        cmpSeqNums := compareSeqNumbers(reply.seqNum, expSeqNum)
        if cmpSeqNums == -1 {
                // reply received too late, ignore the message
-               logrus.WithField("sequence-number", reply.seqNum).Warn(
+               logrus.WithField("seqNum", reply.seqNum).Warn(
                        "Received reply to an already closed binary API request")
                ignore = true
                return
@@ -253,7 +315,7 @@ func (ch *channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Messa
        }
 
        // decode the message
-       if err = ch.msgDecoder.DecodeMsg(reply.data, msg); err != nil {
+       if err = ch.msgCodec.DecodeMsg(reply.data, msg); err != nil {
                return
        }
 
index 4a9ab2b..0eafa32 100644 (file)
@@ -22,6 +22,7 @@ import (
        "git.fd.io/govpp.git/examples/bin_api/interfaces"
        "git.fd.io/govpp.git/examples/bin_api/memif"
        "git.fd.io/govpp.git/examples/bin_api/tap"
+       "git.fd.io/govpp.git/examples/binapi/vpe"
 
        "git.fd.io/govpp.git/api"
        . "github.com/onsi/gomega"
@@ -59,10 +60,11 @@ func TestRequestReplyTapConnect(t *testing.T) {
        ctx := setupTest(t)
        defer ctx.teardownTest()
 
+       // mock reply
        ctx.mockVpp.MockReply(&tap.TapConnectReply{
-               Retval:    0,
                SwIfIndex: 1,
        })
+
        request := &tap.TapConnect{
                TapName:      []byte("test-tap-name"),
                UseRandomMac: 1,
@@ -71,17 +73,21 @@ func TestRequestReplyTapConnect(t *testing.T) {
 
        err := ctx.ch.SendRequest(request).ReceiveReply(reply)
        Expect(err).ShouldNot(HaveOccurred())
-       Expect(reply.Retval).To(BeEquivalentTo(0), "Incorrect retval value for TapConnectReply")
-       Expect(reply.SwIfIndex).To(BeEquivalentTo(1), "Incorrect SwIfIndex value for TapConnectReply")
+       Expect(reply.Retval).To(BeEquivalentTo(0),
+               "Incorrect Retval value for TapConnectReply")
+       Expect(reply.SwIfIndex).To(BeEquivalentTo(1),
+               "Incorrect SwIfIndex value for TapConnectReply")
 }
 
 func TestRequestReplyTapModify(t *testing.T) {
        ctx := setupTest(t)
        defer ctx.teardownTest()
 
+       // mock reply
        ctx.mockVpp.MockReply(&tap.TapModifyReply{
                SwIfIndex: 2,
        })
+
        request := &tap.TapModify{
                TapName:           []byte("test-tap-modify"),
                UseRandomMac:      1,
@@ -91,15 +97,19 @@ func TestRequestReplyTapModify(t *testing.T) {
 
        err := ctx.ch.SendRequest(request).ReceiveReply(reply)
        Expect(err).ShouldNot(HaveOccurred())
-       Expect(reply.Retval).To(BeEquivalentTo(0), "Incorrect retval value for TapModifyReply")
-       Expect(reply.SwIfIndex).To(BeEquivalentTo(2), "Incorrect SwIfIndex value for TapModifyReply")
+       Expect(reply.Retval).To(BeEquivalentTo(0),
+               "Incorrect Retval value for TapModifyReply")
+       Expect(reply.SwIfIndex).To(BeEquivalentTo(2),
+               "Incorrect SwIfIndex value for TapModifyReply")
 }
 
 func TestRequestReplyTapDelete(t *testing.T) {
        ctx := setupTest(t)
        defer ctx.teardownTest()
 
+       // mock reply
        ctx.mockVpp.MockReply(&tap.TapDeleteReply{})
+
        request := &tap.TapDelete{
                SwIfIndex: 3,
        }
@@ -107,34 +117,41 @@ func TestRequestReplyTapDelete(t *testing.T) {
 
        err := ctx.ch.SendRequest(request).ReceiveReply(reply)
        Expect(err).ShouldNot(HaveOccurred())
-       Expect(reply.Retval).To(BeEquivalentTo(0), "Incorrect retval value for TapDeleteReply")
+       Expect(reply.Retval).To(BeEquivalentTo(0),
+               "Incorrect Retval value for TapDeleteReply")
 }
 
 func TestRequestReplySwInterfaceTapDump(t *testing.T) {
        ctx := setupTest(t)
        defer ctx.teardownTest()
 
+       // mock reply
        byteName := []byte("dev-name-test")
        ctx.mockVpp.MockReply(&tap.SwInterfaceTapDetails{
                SwIfIndex: 25,
                DevName:   byteName,
        })
+
        request := &tap.SwInterfaceTapDump{}
        reply := &tap.SwInterfaceTapDetails{}
 
        err := ctx.ch.SendRequest(request).ReceiveReply(reply)
        Expect(err).ShouldNot(HaveOccurred())
-       Expect(reply.SwIfIndex).To(BeEquivalentTo(25), "Incorrect SwIfIndex value for SwInterfaceTapDetails")
-       Expect(reply.DevName).ToNot(BeNil(), "Incorrect DevName value for SwInterfaceTapDetails")
+       Expect(reply.SwIfIndex).To(BeEquivalentTo(25),
+               "Incorrect SwIfIndex value for SwInterfaceTapDetails")
+       Expect(reply.DevName).ToNot(BeEquivalentTo(byteName),
+               "Incorrect DevName value for SwInterfaceTapDetails")
 }
 
 func TestRequestReplyMemifCreate(t *testing.T) {
        ctx := setupTest(t)
        defer ctx.teardownTest()
 
+       // mock reply
        ctx.mockVpp.MockReply(&memif.MemifCreateReply{
                SwIfIndex: 4,
        })
+
        request := &memif.MemifCreate{
                Role:       10,
                ID:         12,
@@ -145,15 +162,19 @@ func TestRequestReplyMemifCreate(t *testing.T) {
 
        err := ctx.ch.SendRequest(request).ReceiveReply(reply)
        Expect(err).ShouldNot(HaveOccurred())
-       Expect(reply.Retval).To(BeEquivalentTo(0), "Incorrect Retval value for MemifCreate")
-       Expect(reply.SwIfIndex).To(BeEquivalentTo(4), "Incorrect SwIfIndex value for MemifCreate")
+       Expect(reply.Retval).To(BeEquivalentTo(0),
+               "Incorrect Retval value for MemifCreate")
+       Expect(reply.SwIfIndex).To(BeEquivalentTo(4),
+               "Incorrect SwIfIndex value for MemifCreate")
 }
 
 func TestRequestReplyMemifDelete(t *testing.T) {
        ctx := setupTest(t)
        defer ctx.teardownTest()
 
+       // mock reply
        ctx.mockVpp.MockReply(&memif.MemifDeleteReply{})
+
        request := &memif.MemifDelete{
                SwIfIndex: 15,
        }
@@ -161,26 +182,30 @@ func TestRequestReplyMemifDelete(t *testing.T) {
 
        err := ctx.ch.SendRequest(request).ReceiveReply(reply)
        Expect(err).ShouldNot(HaveOccurred())
-       Expect(reply.Retval).To(BeEquivalentTo(0), "Incorrect Retval value for MemifDelete")
 }
 
 func TestRequestReplyMemifDetails(t *testing.T) {
        ctx := setupTest(t)
        defer ctx.teardownTest()
 
+       // mock reply
        ctx.mockVpp.MockReply(&memif.MemifDetails{
                SwIfIndex: 25,
                IfName:    []byte("memif-name"),
                Role:      0,
        })
+
        request := &memif.MemifDump{}
        reply := &memif.MemifDetails{}
 
        err := ctx.ch.SendRequest(request).ReceiveReply(reply)
        Expect(err).ShouldNot(HaveOccurred())
-       Expect(reply.SwIfIndex).To(BeEquivalentTo(25), "Incorrect SwIfIndex value for MemifDetails")
-       Expect(reply.IfName).ToNot(BeEmpty(), "MemifDetails IfName is empty byte array")
-       Expect(reply.Role).To(BeEquivalentTo(0), "Incorrect Role value for MemifDetails")
+       Expect(reply.SwIfIndex).To(BeEquivalentTo(25),
+               "Incorrect SwIfIndex value for MemifDetails")
+       Expect(reply.IfName).ToNot(BeEmpty(),
+               "MemifDetails IfName is empty byte array")
+       Expect(reply.Role).To(BeEquivalentTo(0),
+               "Incorrect Role value for MemifDetails")
 }
 
 func TestMultiRequestReplySwInterfaceTapDump(t *testing.T) {
@@ -204,7 +229,7 @@ func TestMultiRequestReplySwInterfaceTapDump(t *testing.T) {
                msg := &tap.SwInterfaceTapDetails{}
                stop, err := reqCtx.ReceiveReply(msg)
                if stop {
-                       break // break out of the loop
+                       break
                }
                Expect(err).ShouldNot(HaveOccurred())
                cnt++
@@ -232,7 +257,7 @@ func TestMultiRequestReplySwInterfaceMemifDump(t *testing.T) {
                msg := &memif.MemifDetails{}
                stop, err := reqCtx.ReceiveReply(msg)
                if stop {
-                       break // break out of the loop
+                       break
                }
                Expect(err).ShouldNot(HaveOccurred())
                cnt++
@@ -240,51 +265,16 @@ func TestMultiRequestReplySwInterfaceMemifDump(t *testing.T) {
        Expect(cnt).To(BeEquivalentTo(10))
 }
 
-func TestNotifications(t *testing.T) {
-       ctx := setupTest(t)
-       defer ctx.teardownTest()
-
-       // subscribe for notification
-       notifChan := make(chan api.Message, 1)
-       subs, err := ctx.ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceSetFlags)
-       Expect(err).ShouldNot(HaveOccurred())
-
-       // mock the notification and force its delivery
-       ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{
-               SwIfIndex:   3,
-               AdminUpDown: 1,
-       })
-       ctx.mockVpp.SendMsg(0, []byte(""))
-
-       // receive the notification
-       var notif *interfaces.SwInterfaceSetFlags
-       Eventually(func() *interfaces.SwInterfaceSetFlags {
-               select {
-               case n := <-notifChan:
-                       notif = n.(*interfaces.SwInterfaceSetFlags)
-                       return notif
-               default:
-                       return nil
-               }
-       }).ShouldNot(BeNil())
-
-       // verify the received notifications
-       Expect(notif.SwIfIndex).To(BeEquivalentTo(3), "Incorrect SwIfIndex value for SwInterfaceSetFlags")
-       Expect(notif.AdminUpDown).To(BeEquivalentTo(1), "Incorrect AdminUpDown value for SwInterfaceSetFlags")
-
-       ctx.ch.UnsubscribeNotification(subs)
-}
-
 func TestNotificationEvent(t *testing.T) {
        ctx := setupTest(t)
        defer ctx.teardownTest()
 
        // subscribe for notification
        notifChan := make(chan api.Message, 1)
-       subs, err := ctx.ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceEvent)
+       sub, err := ctx.ch.SubscribeNotification(notifChan, &interfaces.SwInterfaceEvent{})
        Expect(err).ShouldNot(HaveOccurred())
 
-       // mock the notification and force its delivery
+       // mock event and force its delivery
        ctx.mockVpp.MockReply(&interfaces.SwInterfaceEvent{
                SwIfIndex:  2,
                LinkUpDown: 1,
@@ -307,16 +297,9 @@ func TestNotificationEvent(t *testing.T) {
        Expect(notif.SwIfIndex).To(BeEquivalentTo(2), "Incorrect SwIfIndex value for SwInterfaceSetFlags")
        Expect(notif.LinkUpDown).To(BeEquivalentTo(1), "Incorrect LinkUpDown value for SwInterfaceSetFlags")
 
-       ctx.ch.UnsubscribeNotification(subs)
-}
-
-/*func TestCheckMessageCompatibility(t *testing.T) {
-       ctx := setupTest(t)
-       defer ctx.teardownTest()
-
-       err := ctx.ch.CheckMessageCompatibility(&interfaces.SwInterfaceSetFlags{})
+       err = sub.Unsubscribe()
        Expect(err).ShouldNot(HaveOccurred())
-}*/
+}
 
 func TestSetReplyTimeout(t *testing.T) {
        ctx := setupTest(t)
@@ -324,8 +307,10 @@ func TestSetReplyTimeout(t *testing.T) {
 
        ctx.ch.SetReplyTimeout(time.Millisecond)
 
-       // first one request should work
+       // mock reply
        ctx.mockVpp.MockReply(&ControlPingReply{})
+
+       // first one request should work
        err := ctx.ch.SendRequest(&ControlPing{}).ReceiveReply(&ControlPingReply{})
        Expect(err).ShouldNot(HaveOccurred())
 
@@ -339,16 +324,23 @@ func TestSetReplyTimeoutMultiRequest(t *testing.T) {
        ctx := setupTest(t)
        defer ctx.teardownTest()
 
-       ctx.ch.SetReplyTimeout(time.Millisecond)
+       ctx.ch.SetReplyTimeout(time.Millisecond * 100)
 
-       var msgs []api.Message
-       for i := 1; i <= 3; i++ {
-               msgs = append(msgs, &interfaces.SwInterfaceDetails{
-                       SwIfIndex:     uint32(i),
+       // mock reply
+       ctx.mockVpp.MockReply(
+               &interfaces.SwInterfaceDetails{
+                       SwIfIndex:     1,
                        InterfaceName: []byte("if-name-test"),
-               })
-       }
-       ctx.mockVpp.MockReply(msgs...)
+               },
+               &interfaces.SwInterfaceDetails{
+                       SwIfIndex:     2,
+                       InterfaceName: []byte("if-name-test"),
+               },
+               &interfaces.SwInterfaceDetails{
+                       SwIfIndex:     3,
+                       InterfaceName: []byte("if-name-test"),
+               },
+       )
        ctx.mockVpp.MockReply(&ControlPingReply{})
 
        cnt := 0
@@ -357,12 +349,12 @@ func TestSetReplyTimeoutMultiRequest(t *testing.T) {
                for {
                        msg := &interfaces.SwInterfaceDetails{}
                        stop, err := reqCtx.ReceiveReply(msg)
-                       if stop {
-                               break // break out of the loop
-                       }
                        if err != nil {
                                return err
                        }
+                       if stop {
+                               break
+                       }
                        cnt++
                }
                return nil
@@ -443,7 +435,7 @@ func TestMultiRequestDouble(t *testing.T) {
                        msg := &interfaces.SwInterfaceDetails{}
                        stop, err := reqCtx.ReceiveReply(msg)
                        if stop {
-                               break // break out of the loop
+                               break
                        }
                        if err != nil {
                                return err
@@ -468,8 +460,10 @@ func TestReceiveReplyAfterTimeout(t *testing.T) {
 
        ctx.ch.SetReplyTimeout(time.Millisecond)
 
-       // first one request should work
+       // mock reply
        ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{Msg: &ControlPingReply{}, SeqNum: 1})
+       // first one request should work
+
        err := ctx.ch.SendRequest(&ControlPing{}).ReceiveReply(&ControlPingReply{})
        Expect(err).ShouldNot(HaveOccurred())
 
@@ -479,9 +473,16 @@ func TestReceiveReplyAfterTimeout(t *testing.T) {
 
        ctx.mockVpp.MockReplyWithContext(
                // simulating late reply
-               mock.MsgWithContext{Msg: &ControlPingReply{}, SeqNum: 2},
+               mock.MsgWithContext{
+                       Msg:    &ControlPingReply{},
+                       SeqNum: 2,
+               },
                // normal reply for next request
-               mock.MsgWithContext{Msg: &tap.TapConnectReply{}, SeqNum: 3})
+               mock.MsgWithContext{
+                       Msg:    &tap.TapConnectReply{},
+                       SeqNum: 3,
+               },
+       )
 
        req := &tap.TapConnect{
                TapName:      []byte("test-tap-name"),
@@ -508,8 +509,10 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) {
 
        ctx.ch.SetReplyTimeout(time.Millisecond * 100)
 
-       // first one request should work
+       // mock reply
        ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{Msg: &ControlPingReply{}, SeqNum: 1})
+
+       // first one request should work
        err := ctx.ch.SendRequest(&ControlPing{}).ReceiveReply(&ControlPingReply{})
        Expect(err).ShouldNot(HaveOccurred())
 
@@ -520,7 +523,7 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) {
                        msg := &interfaces.SwInterfaceDetails{}
                        stop, err := reqCtx.ReceiveReply(msg)
                        if stop {
-                               break // break out of the loop
+                               break
                        }
                        if err != nil {
                                return err
@@ -564,18 +567,20 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) {
        Expect(err).ShouldNot(HaveOccurred())
 }
 
-/*func TestInvalidMessageID(t *testing.T) {
+func TestInvalidMessageID(t *testing.T) {
        ctx := setupTest(t)
        defer ctx.teardownTest()
 
-       // first one request should work
+       // mock reply
+       ctx.mockVpp.MockReply(&vpe.ShowVersionReply{})
        ctx.mockVpp.MockReply(&vpe.ShowVersionReply{})
+
+       // first one request should work
        err := ctx.ch.SendRequest(&vpe.ShowVersion{}).ReceiveReply(&vpe.ShowVersionReply{})
        Expect(err).ShouldNot(HaveOccurred())
 
        // second should fail with error invalid message ID
-       ctx.mockVpp.MockReply(&vpe.ShowVersionReply{})
        err = ctx.ch.SendRequest(&ControlPing{}).ReceiveReply(&ControlPingReply{})
        Expect(err).Should(HaveOccurred())
        Expect(err.Error()).To(ContainSubstring("invalid message ID"))
-}*/
+}
index c77358f..7d014ce 100644 (file)
@@ -29,42 +29,19 @@ import (
        "git.fd.io/govpp.git/codec"
 )
 
-const (
-       requestChannelBufSize      = 100 // default size of the request channel buffer
-       replyChannelBufSize        = 100 // default size of the reply channel buffer
-       notificationChannelBufSize = 100 // default size of the notification channel buffer
-
-       defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout
+var (
+       RequestChanBufSize      = 100 // default size of the request channel buffer
+       ReplyChanBufSize        = 100 // default size of the reply channel buffer
+       NotificationChanBufSize = 100 // default size of the notification channel buffer
 )
 
 var (
-       healthCheckInterval     = time.Second * 1        // default health check interval
-       healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check
-       healthCheckThreshold    = 1                      // number of failed health checks until the error is reported
+       HealthCheckProbeInterval = time.Second * 1        // default health check probe interval
+       HealthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
+       HealthCheckThreshold     = 1                      // number of failed health checks until the error is reported
+       DefaultReplyTimeout      = time.Second * 1        // default timeout for replies from VPP
 )
 
-// SetHealthCheckProbeInterval sets health check probe interval.
-// Beware: Function is not thread-safe. It is recommended to setup this parameter
-// before connecting to vpp.
-func SetHealthCheckProbeInterval(interval time.Duration) {
-       healthCheckInterval = interval
-}
-
-// SetHealthCheckReplyTimeout sets timeout for reply to a health check probe.
-// If reply arrives after the timeout, check is considered as failed.
-// Beware: Function is not thread-safe. It is recommended to setup this parameter
-// before connecting to vpp.
-func SetHealthCheckReplyTimeout(timeout time.Duration) {
-       healthCheckReplyTimeout = timeout
-}
-
-// SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported.
-// Beware: Function is not thread-safe. It is recommended to setup this parameter
-// before connecting to vpp.
-func SetHealthCheckThreshold(threshold int) {
-       healthCheckThreshold = threshold
-}
-
 // ConnectionState represents the current state of the connection to VPP.
 type ConnectionState int
 
@@ -104,10 +81,10 @@ type Connection struct {
 
        maxChannelID uint32              // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
        channelsLock sync.RWMutex        // lock for the channels map
-       channels     map[uint16]*channel // map of all API channels indexed by the channel ID
+       channels     map[uint16]*Channel // map of all API channels indexed by the channel ID
 
-       notifSubscriptionsLock sync.RWMutex                        // lock for the subscriptions map
-       notifSubscriptions     map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
+       subscriptionsLock sync.RWMutex                  // lock for the subscriptions map
+       subscriptions     map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID
 
        pingReqID   uint16 // ID if the ControlPing message
        pingReplyID uint16 // ID of the ControlPingReply message
@@ -116,18 +93,30 @@ type Connection struct {
        lastReply     time.Time  // time of the last received reply from VPP
 }
 
+func newConnection(vpp adapter.VppAdapter) *Connection {
+       c := &Connection{
+               vpp:           vpp,
+               codec:         &codec.MsgCodec{},
+               msgIDs:        make(map[string]uint16),
+               msgMap:        make(map[uint16]api.Message),
+               channels:      make(map[uint16]*Channel),
+               subscriptions: make(map[uint16][]*subscriptionCtx),
+       }
+       vpp.SetMsgCallback(c.msgCallback)
+       return c
+}
+
 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
 // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
        // create new connection handle
-       c, err := newConnection(vppAdapter)
+       c, err := createConnection(vppAdapter)
        if err != nil {
                return nil, err
        }
 
        // blocking attempt to connect to VPP
-       err = c.connectVPP()
-       if err != nil {
+       if err := c.connectVPP(); err != nil {
                return nil, err
        }
 
@@ -140,13 +129,13 @@ func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
 func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
        // create new connection handle
-       c, err := newConnection(vppAdapter)
+       c, err := createConnection(vppAdapter)
        if err != nil {
                return nil, nil, err
        }
 
        // asynchronously attempt to connect to VPP
-       connChan := make(chan ConnectionEvent, notificationChannelBufSize)
+       connChan := make(chan ConnectionEvent, NotificationChanBufSize)
        go c.connectLoop(connChan)
 
        return c, connChan, nil
@@ -168,7 +157,7 @@ func (c *Connection) Disconnect() {
 }
 
 // newConnection returns new connection handle.
-func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
+func createConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
        connLock.Lock()
        defer connLock.Unlock()
 
@@ -176,15 +165,7 @@ func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
                return nil, errors.New("only one connection per process is supported")
        }
 
-       conn = &Connection{
-               vpp:                vppAdapter,
-               codec:              &codec.MsgCodec{},
-               channels:           make(map[uint16]*channel),
-               msgIDs:             make(map[string]uint16),
-               msgMap:             make(map[uint16]api.Message),
-               notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
-       }
-       conn.vpp.SetMsgCallback(conn.msgCallback)
+       conn = newConnection(vppAdapter)
 
        return conn, nil
 }
@@ -211,8 +192,72 @@ func (c *Connection) connectVPP() error {
        return nil
 }
 
-func getMsgNameWithCrc(x api.Message) string {
-       return x.GetMessageName() + "_" + x.GetCrcString()
+func (c *Connection) NewAPIChannel() (api.Channel, error) {
+       return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize)
+}
+
+func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
+       return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
+}
+
+// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
+// It allows to specify custom buffer sizes for the request and reply Go channels.
+func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) {
+       if c == nil {
+               return nil, errors.New("nil connection passed in")
+       }
+
+       // create new channel
+       chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
+       channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize)
+
+       // store API channel within the client
+       c.channelsLock.Lock()
+       c.channels[chID] = channel
+       c.channelsLock.Unlock()
+
+       // start watching on the request channel
+       go c.watchRequests(channel)
+
+       return channel, nil
+}
+
+// releaseAPIChannel releases API channel that needs to be closed.
+func (c *Connection) releaseAPIChannel(ch *Channel) {
+       log.WithFields(logger.Fields{
+               "channel": ch.id,
+       }).Debug("API channel released")
+
+       // delete the channel from channels map
+       c.channelsLock.Lock()
+       delete(c.channels, ch.id)
+       c.channelsLock.Unlock()
+}
+
+// GetMessageID returns message identifier of given API message.
+func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
+       if c == nil {
+               return 0, errors.New("nil connection passed in")
+       }
+
+       if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
+               return msgID, nil
+       }
+
+       return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString())
+}
+
+// LookupByID looks up message name and crc by ID.
+func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
+       if c == nil {
+               return nil, errors.New("nil connection passed in")
+       }
+
+       if msg, ok := c.msgMap[msgID]; ok {
+               return msg, nil
+       }
+
+       return nil, fmt.Errorf("unknown message ID: %d", msgID)
 }
 
 // retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
@@ -268,32 +313,6 @@ func (c *Connection) retrieveMessageIDs() (err error) {
        return nil
 }
 
-// GetMessageID returns message identifier of given API message.
-func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
-       if c == nil {
-               return 0, errors.New("nil connection passed in")
-       }
-
-       if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
-               return msgID, nil
-       }
-
-       return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString())
-}
-
-// LookupByID looks up message name and crc by ID.
-func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
-       if c == nil {
-               return nil, errors.New("nil connection passed in")
-       }
-
-       if msg, ok := c.msgMap[msgID]; ok {
-               return msg, nil
-       }
-
-       return nil, fmt.Errorf("unknown message ID: %d", msgID)
-}
-
 // disconnectVPP disconnects from VPP in case it is connected.
 func (c *Connection) disconnectVPP() {
        if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
@@ -341,7 +360,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
        // send health check probes until an error or timeout occurs
        for {
                // sleep until next health check probe period
-               time.Sleep(healthCheckInterval)
+               time.Sleep(HealthCheckProbeInterval)
 
                if atomic.LoadUint32(&c.connected) == 0 {
                        // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
@@ -365,7 +384,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
                        case vppReply := <-ch.replyChan:
                                err = vppReply.err
 
-                       case <-time.After(healthCheckReplyTimeout):
+                       case <-time.After(HealthCheckReplyTimeout):
                                err = ErrProbeTimeout
 
                                // check if time since last reply from any other
@@ -374,7 +393,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
                                sinceLastReply = time.Since(c.lastReply)
                                c.lastReplyLock.Unlock()
 
-                               if sinceLastReply < healthCheckReplyTimeout {
+                               if sinceLastReply < HealthCheckReplyTimeout {
                                        log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
                                        continue
                                }
@@ -384,10 +403,10 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
 
                if err == ErrProbeTimeout {
                        failedChecks++
-                       log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks)
-                       if failedChecks > healthCheckThreshold {
+                       log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
+                       if failedChecks > HealthCheckThreshold {
                                // in case of exceeded failed check treshold, assume VPP disconnected
-                               log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold)
+                               log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", HealthCheckThreshold)
                                connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
                                break
                        }
@@ -411,52 +430,6 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
        c.connectLoop(connChan)
 }
 
-func (c *Connection) NewAPIChannel() (api.Channel, error) {
-       return c.newAPIChannel(requestChannelBufSize, replyChannelBufSize)
-}
-
-func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
-       return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
-}
-
-// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
-// It allows to specify custom buffer sizes for the request and reply Go channels.
-func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*channel, error) {
-       if c == nil {
-               return nil, errors.New("nil connection passed in")
-       }
-
-       chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
-       ch := &channel{
-               id:                 chID,
-               replyTimeout:       defaultReplyTimeout,
-               msgDecoder:         c.codec,
-               msgIdentifier:      c,
-               reqChan:            make(chan *vppRequest, reqChanBufSize),
-               replyChan:          make(chan *vppReply, replyChanBufSize),
-               notifSubsChan:      make(chan *subscriptionRequest, reqChanBufSize),
-               notifSubsReplyChan: make(chan error, replyChanBufSize),
-       }
-
-       // store API channel within the client
-       c.channelsLock.Lock()
-       c.channels[chID] = ch
-       c.channelsLock.Unlock()
-
-       // start watching on the request channel
-       go c.watchRequests(ch)
-
-       return ch, nil
-}
-
-// releaseAPIChannel releases API channel that needs to be closed.
-func (c *Connection) releaseAPIChannel(ch *channel) {
-       log.WithFields(logger.Fields{
-               "channel": ch.id,
-       }).Debug("API channel released")
-
-       // delete the channel from channels map
-       c.channelsLock.Lock()
-       delete(c.channels, ch.id)
-       c.channelsLock.Unlock()
+func getMsgNameWithCrc(x api.Message) string {
+       return x.GetMessageName() + "_" + x.GetCrcString()
 }
index 5c8c309..cea7d2d 100644 (file)
 
 package core_test
 
-/*
 import (
        "testing"
 
        "git.fd.io/govpp.git/adapter/mock"
        "git.fd.io/govpp.git/api"
+       "git.fd.io/govpp.git/codec"
        "git.fd.io/govpp.git/core"
-       "git.fd.io/govpp.git/core/bin_api/vpe"
        "git.fd.io/govpp.git/examples/bin_api/interfaces"
-       "git.fd.io/govpp.git/examples/bin_api/stats"
-
-       "git.fd.io/govpp.git/codec"
+       "git.fd.io/govpp.git/examples/binapi/stats"
+       "git.fd.io/govpp.git/examples/binapi/vpe"
        . "github.com/onsi/gomega"
 )
 
@@ -38,8 +36,9 @@ type testCtx struct {
 func setupTest(t *testing.T, bufferedChan bool) *testCtx {
        RegisterTestingT(t)
 
-       ctx := &testCtx{}
-       ctx.mockVpp = &mock.VppAdapter{}
+       ctx := &testCtx{
+               mockVpp: mock.NewVppAdapter(),
+       }
 
        var err error
        ctx.conn, err = core.Connect(ctx.mockVpp)
@@ -60,100 +59,6 @@ func (ctx *testCtx) teardownTest() {
        ctx.conn.Disconnect()
 }
 
-func TestSimpleRequest(t *testing.T) {
-       ctx := setupTest(t, false)
-       defer ctx.teardownTest()
-
-       ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: -5})
-
-       req := &vpe.ControlPing{}
-       reply := &vpe.ControlPingReply{}
-
-       // send the request and receive a reply
-       ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: req}
-       vppReply := <-ctx.ch.GetReplyChannel()
-
-       Expect(vppReply).ShouldNot(BeNil())
-       Expect(vppReply.Error).ShouldNot(HaveOccurred())
-
-       // decode the message
-       err := ctx.ch.GetMessageDecoder().DecodeMsg(vppReply.Data, reply)
-       Expect(err).ShouldNot(HaveOccurred())
-
-       Expect(reply.Retval).To(BeEquivalentTo(-5))
-}
-
-func TestMultiRequest(t *testing.T) {
-       ctx := setupTest(t, false)
-       defer ctx.teardownTest()
-
-       msgs := []api.Message{}
-       for m := 0; m < 10; m++ {
-               msgs = append(msgs, &interfaces.SwInterfaceDetails{})
-       }
-       ctx.mockVpp.MockReply(msgs...)
-       ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
-
-       // send multipart request
-       ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: &interfaces.SwInterfaceDump{}, Multipart: true}
-
-       cnt := 0
-       for {
-               // receive a reply
-               vppReply := <-ctx.ch.GetReplyChannel()
-               if vppReply.LastReplyReceived {
-                       break // break out of the loop
-               }
-               Expect(vppReply.Error).ShouldNot(HaveOccurred())
-
-               // decode the message
-               reply := &interfaces.SwInterfaceDetails{}
-               err := ctx.ch.GetMessageDecoder().DecodeMsg(vppReply.Data, reply)
-               Expect(err).ShouldNot(HaveOccurred())
-               cnt++
-       }
-
-       Expect(cnt).To(BeEquivalentTo(10))
-}
-
-func TestNotifications(t *testing.T) {
-       ctx := setupTest(t, false)
-       defer ctx.teardownTest()
-
-       // subscribe for notification
-       notifChan := make(chan api.Message, 1)
-       subscription := &api.NotifSubscription{
-               NotifChan:  notifChan,
-               MsgFactory: interfaces.NewSwInterfaceSetFlags,
-       }
-       ctx.ch.GetNotificationChannel() <- &api.NotifSubscribeRequest{
-               Subscription: subscription,
-               Subscribe:    true,
-       }
-       err := <-ctx.ch.GetNotificationReplyChannel()
-       Expect(err).ShouldNot(HaveOccurred())
-
-       // mock the notification and force its delivery
-       ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{
-               SwIfIndex:   3,
-               AdminUpDown: 1,
-       })
-       ctx.mockVpp.SendMsg(0, []byte{0})
-
-       // receive the notification
-       notif := (<-notifChan).(*interfaces.SwInterfaceSetFlags)
-
-       Expect(notif.SwIfIndex).To(BeEquivalentTo(3))
-
-       // unsubscribe notification
-       ctx.ch.GetNotificationChannel() <- &api.NotifSubscribeRequest{
-               Subscription: subscription,
-               Subscribe:    false,
-       }
-       err = <-ctx.ch.GetNotificationReplyChannel()
-       Expect(err).ShouldNot(HaveOccurred())
-}
-
 func TestNilConnection(t *testing.T) {
        RegisterTestingT(t)
        var conn *core.Connection
@@ -184,47 +89,16 @@ func TestAsyncConnection(t *testing.T) {
        defer ctx.teardownTest()
 
        ctx.conn.Disconnect()
-       conn, ch, err := core.AsyncConnect(ctx.mockVpp)
+       conn, statusChan, err := core.AsyncConnect(ctx.mockVpp)
        ctx.conn = conn
 
        Expect(err).ShouldNot(HaveOccurred())
        Expect(conn).ShouldNot(BeNil())
 
-       ev := <-ch
+       ev := <-statusChan
        Expect(ev.State).Should(BeEquivalentTo(core.Connected))
 }
 
-func TestFullBuffer(t *testing.T) {
-       ctx := setupTest(t, false)
-       defer ctx.teardownTest()
-
-       // close the default API channel
-       ctx.ch.Close()
-
-       // create a new channel with limited buffer sizes
-       var err error
-       ctx.ch, err = ctx.conn.NewAPIChannelBuffered(10, 1)
-       Expect(err).ShouldNot(HaveOccurred())
-
-       // send multiple requests, only one reply should be read
-       for i := 0; i < 20; i++ {
-               ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
-               ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: &vpe.ControlPing{}}
-       }
-
-       vppReply := <-ctx.ch.GetReplyChannel()
-       Expect(vppReply).ShouldNot(BeNil())
-
-       var received bool
-       select {
-       case <-ctx.ch.GetReplyChannel():
-               received = true // this should not happen
-       default:
-               received = false // no reply to be received
-       }
-       Expect(received).Should(BeFalse(), "A reply has been recieved, should had been ignored.")
-}
-
 func TestCodec(t *testing.T) {
        RegisterTestingT(t)
 
@@ -289,7 +163,7 @@ func TestSimpleRequestsWithSequenceNumbers(t *testing.T) {
 
        var reqCtx []api.RequestCtx
        for i := 0; i < 10; i++ {
-               ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: int32(i)})
+               ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
                req := &vpe.ControlPing{}
                reqCtx = append(reqCtx, ctx.ch.SendRequest(req))
        }
@@ -298,7 +172,6 @@ func TestSimpleRequestsWithSequenceNumbers(t *testing.T) {
                reply := &vpe.ControlPingReply{}
                err := reqCtx[i].ReceiveReply(reply)
                Expect(err).ShouldNot(HaveOccurred())
-               Expect(reply.Retval).To(BeEquivalentTo(i))
        }
 }
 
@@ -306,7 +179,7 @@ func TestMultiRequestsWithSequenceNumbers(t *testing.T) {
        ctx := setupTest(t, false)
        defer ctx.teardownTest()
 
-       msgs := []api.Message{}
+       var msgs []api.Message
        for i := 0; i < 10; i++ {
                msgs = append(msgs, &interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)})
        }
@@ -325,7 +198,7 @@ func TestMultiRequestsWithSequenceNumbers(t *testing.T) {
                lastReplyReceived, err := reqCtx.ReceiveReply(reply)
 
                if lastReplyReceived {
-                       break // break out of the loop
+                       break
                }
 
                Expect(err).ShouldNot(HaveOccurred())
@@ -343,7 +216,7 @@ func TestSimpleRequestWithTimeout(t *testing.T) {
 
        // reply for a previous timeouted requests to be ignored
        ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{
-               Msg:    &vpe.ControlPingReply{Retval: 1},
+               Msg:    &vpe.ControlPingReply{},
                SeqNum: 0,
        })
 
@@ -359,12 +232,12 @@ func TestSimpleRequestWithTimeout(t *testing.T) {
        ctx.mockVpp.MockReplyWithContext(
                // reply for the previous request
                mock.MsgWithContext{
-                       Msg:    &vpe.ControlPingReply{Retval: 1},
+                       Msg:    &vpe.ControlPingReply{},
                        SeqNum: 1,
                },
                // reply for the next request
                mock.MsgWithContext{
-                       Msg:    &vpe.ControlPingReply{Retval: 2},
+                       Msg:    &vpe.ControlPingReply{},
                        SeqNum: 2,
                })
 
@@ -376,7 +249,6 @@ func TestSimpleRequestWithTimeout(t *testing.T) {
        reply = &vpe.ControlPingReply{}
        err = reqCtx2.ReceiveReply(reply)
        Expect(err).To(BeNil())
-       Expect(reply.Retval).To(BeEquivalentTo(2))
 }
 
 func TestSimpleRequestsWithMissingReply(t *testing.T) {
@@ -393,7 +265,7 @@ func TestSimpleRequestsWithMissingReply(t *testing.T) {
 
        // third request with reply
        ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{
-               Msg:    &vpe.ControlPingReply{Retval: 3},
+               Msg:    &vpe.ControlPingReply{},
                SeqNum: 3,
        })
        req3 := &vpe.ControlPing{}
@@ -414,7 +286,6 @@ func TestSimpleRequestsWithMissingReply(t *testing.T) {
        reply = &vpe.ControlPingReply{}
        err = reqCtx3.ReceiveReply(reply)
        Expect(err).To(BeNil())
-       Expect(reply.Retval).To(BeEquivalentTo(3))
 }
 
 func TestMultiRequestsWithErrors(t *testing.T) {
@@ -422,38 +293,25 @@ func TestMultiRequestsWithErrors(t *testing.T) {
        defer ctx.teardownTest()
 
        // replies for a previous timeouted requests to be ignored
-       msgs := []mock.MsgWithContext{}
-       msgs = append(msgs,
-               mock.MsgWithContext{
-                       Msg:    &vpe.ControlPingReply{Retval: 1},
-                       SeqNum: 0xffff - 1,
-               },
-               mock.MsgWithContext{
-                       Msg:    &vpe.ControlPingReply{Retval: 1},
-                       SeqNum: 0xffff,
-               },
-               mock.MsgWithContext{
-                       Msg:    &vpe.ControlPingReply{Retval: 1},
-                       SeqNum: 0,
-               })
-
+       msgs := []mock.MsgWithContext{
+               {Msg: &vpe.ControlPingReply{}, SeqNum: 0xffff - 1},
+               {Msg: &vpe.ControlPingReply{}, SeqNum: 0xffff},
+               {Msg: &vpe.ControlPingReply{}, SeqNum: 0},
+       }
        for i := 0; i < 10; i++ {
-               msgs = append(msgs,
-                       mock.MsgWithContext{
-                               Msg:       &interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)},
-                               SeqNum:    1,
-                               Multipart: true,
-                       })
+               msgs = append(msgs, mock.MsgWithContext{
+                       Msg:       &interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)},
+                       SeqNum:    1,
+                       Multipart: true,
+               })
        }
        // missing finalizing control ping
 
        // reply for a next request
-       msgs = append(msgs,
-               mock.MsgWithContext{
-                       Msg:       &vpe.ControlPingReply{Retval: 2},
-                       SeqNum:    2,
-                       Multipart: false,
-               })
+       msgs = append(msgs, mock.MsgWithContext{
+               Msg:    &vpe.ControlPingReply{},
+               SeqNum: 2,
+       })
 
        // queue replies
        ctx.mockVpp.MockReplyWithContext(msgs...)
@@ -487,7 +345,6 @@ func TestMultiRequestsWithErrors(t *testing.T) {
        reply2 := &vpe.ControlPingReply{}
        err = reqCtx2.ReceiveReply(reply2)
        Expect(err).To(BeNil())
-       Expect(reply2.Retval).To(BeEquivalentTo(2))
 }
 
 func TestRequestsOrdering(t *testing.T) {
@@ -498,12 +355,12 @@ func TestRequestsOrdering(t *testing.T) {
        // some replies will get thrown away
 
        // first request
-       ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 1})
+       ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
        req1 := &vpe.ControlPing{}
        reqCtx1 := ctx.ch.SendRequest(req1)
 
        // second request
-       ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 2})
+       ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
        req2 := &vpe.ControlPing{}
        reqCtx2 := ctx.ch.SendRequest(req2)
 
@@ -512,7 +369,6 @@ func TestRequestsOrdering(t *testing.T) {
        reply2 := &vpe.ControlPingReply{}
        err := reqCtx2.ReceiveReply(reply2)
        Expect(err).To(BeNil())
-       Expect(reply2.Retval).To(BeEquivalentTo(2))
 
        // first request has already been considered closed
        reply1 := &vpe.ControlPingReply{}
@@ -522,7 +378,7 @@ func TestRequestsOrdering(t *testing.T) {
 }
 
 func TestCycleOverSetOfSequenceNumbers(t *testing.T) {
-       ctx := setupTest(t, true)
+       ctx := setupTest(t, false)
        defer ctx.teardownTest()
 
        numIters := 0xffff + 100
@@ -530,7 +386,7 @@ func TestCycleOverSetOfSequenceNumbers(t *testing.T) {
 
        for i := 0; i < numIters+30; i++ {
                if i < numIters {
-                       ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: int32(i)})
+                       ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
                        req := &vpe.ControlPing{}
                        reqCtx[i] = ctx.ch.SendRequest(req)
                }
@@ -538,8 +394,6 @@ func TestCycleOverSetOfSequenceNumbers(t *testing.T) {
                        reply := &vpe.ControlPingReply{}
                        err := reqCtx[i-30].ReceiveReply(reply)
                        Expect(err).ShouldNot(HaveOccurred())
-                       Expect(reply.Retval).To(BeEquivalentTo(i - 30))
                }
        }
 }
-*/
diff --git a/core/notification_handler.go b/core/notification_handler.go
deleted file mode 100644 (file)
index 7b889e3..0000000
+++ /dev/null
@@ -1,170 +0,0 @@
-// Copyright (c) 2017 Cisco and/or its affiliates.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at:
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package core
-
-import (
-       "fmt"
-
-       "git.fd.io/govpp.git/api"
-       logger "github.com/sirupsen/logrus"
-)
-
-// processSubscriptionRequest processes a notification subscribe request.
-func (c *Connection) processSubscriptionRequest(ch *channel, req *subscriptionRequest) error {
-       var err error
-
-       // subscribe / unsubscribe
-       if req.subscribe {
-               err = c.addNotifSubscription(req.sub)
-       } else {
-               err = c.removeNotifSubscription(req.sub)
-       }
-
-       // send the reply into the go channel
-       select {
-       case ch.notifSubsReplyChan <- err:
-               // reply sent successfully
-       default:
-               // unable to write into the channel without blocking
-               log.WithFields(logger.Fields{
-                       "channel": ch.id,
-               }).Warn("Unable to deliver the subscribe reply, reciever end not ready.")
-       }
-
-       return err
-}
-
-// addNotifSubscription adds the notification subscription into the subscriptions map of the connection.
-func (c *Connection) addNotifSubscription(subs *api.NotifSubscription) error {
-       // get message ID of the notification message
-       msgID, msgName, err := c.getSubscriptionMessageID(subs)
-       if err != nil {
-               return err
-       }
-
-       log.WithFields(logger.Fields{
-               "msg_name": msgName,
-               "msg_id":   msgID,
-       }).Debug("Adding new notification subscription.")
-
-       // add the subscription into map
-       c.notifSubscriptionsLock.Lock()
-       defer c.notifSubscriptionsLock.Unlock()
-
-       c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID], subs)
-
-       return nil
-}
-
-// removeNotifSubscription removes the notification subscription from the subscriptions map of the connection.
-func (c *Connection) removeNotifSubscription(subs *api.NotifSubscription) error {
-       // get message ID of the notification message
-       msgID, msgName, err := c.getSubscriptionMessageID(subs)
-       if err != nil {
-               return err
-       }
-
-       log.WithFields(logger.Fields{
-               "msg_name": msgName,
-               "msg_id":   msgID,
-       }).Debug("Removing notification subscription.")
-
-       // remove the subscription from the map
-       c.notifSubscriptionsLock.Lock()
-       defer c.notifSubscriptionsLock.Unlock()
-
-       for i, item := range c.notifSubscriptions[msgID] {
-               if item == subs {
-                       // remove i-th item in the slice
-                       c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID][:i], c.notifSubscriptions[msgID][i+1:]...)
-                       break
-               }
-       }
-
-       return nil
-}
-
-// isNotificationMessage returns true if someone has subscribed to provided message ID.
-func (c *Connection) isNotificationMessage(msgID uint16) bool {
-       c.notifSubscriptionsLock.RLock()
-       defer c.notifSubscriptionsLock.RUnlock()
-
-       _, exists := c.notifSubscriptions[msgID]
-       return exists
-}
-
-// sendNotifications send a notification message to all subscribers subscribed for that message.
-func (c *Connection) sendNotifications(msgID uint16, data []byte) {
-       c.notifSubscriptionsLock.RLock()
-       defer c.notifSubscriptionsLock.RUnlock()
-
-       matched := false
-
-       // send to notification to each subscriber
-       for _, subs := range c.notifSubscriptions[msgID] {
-               msg := subs.MsgFactory()
-               log.WithFields(logger.Fields{
-                       "msg_name": msg.GetMessageName(),
-                       "msg_id":   msgID,
-                       "msg_size": len(data),
-               }).Debug("Sending a notification to the subscription channel.")
-
-               if err := c.codec.DecodeMsg(data, msg); err != nil {
-                       log.WithFields(logger.Fields{
-                               "msg_name": msg.GetMessageName(),
-                               "msg_id":   msgID,
-                               "msg_size": len(data),
-                       }).Errorf("Unable to decode the notification message: %v", err)
-                       continue
-               }
-
-               // send the message into the go channel of the subscription
-               select {
-               case subs.NotifChan <- msg:
-                       // message sent successfully
-               default:
-                       // unable to write into the channel without blocking
-                       log.WithFields(logger.Fields{
-                               "msg_name": msg.GetMessageName(),
-                               "msg_id":   msgID,
-                               "msg_size": len(data),
-                       }).Warn("Unable to deliver the notification, reciever end not ready.")
-               }
-
-               matched = true
-       }
-
-       if !matched {
-               log.WithFields(logger.Fields{
-                       "msg_id":   msgID,
-                       "msg_size": len(data),
-               }).Info("No subscription found for the notification message.")
-       }
-}
-
-// getSubscriptionMessageID returns ID of the message the subscription is tied to.
-func (c *Connection) getSubscriptionMessageID(subs *api.NotifSubscription) (uint16, string, error) {
-       msg := subs.MsgFactory()
-       msgID, err := c.GetMessageID(msg)
-       if err != nil {
-               log.WithFields(logger.Fields{
-                       "msg_name": msg.GetMessageName(),
-                       "msg_crc":  msg.GetCrcString(),
-               }).Errorf("unable to retrieve message ID: %v", err)
-               return 0, "", fmt.Errorf("unable to retrieve message ID: %v", err)
-       }
-
-       return msgID, msg.GetMessageName(), nil
-}
index fd6d100..14c095d 100644 (file)
@@ -29,7 +29,7 @@ var (
 )
 
 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
-func (c *Connection) watchRequests(ch *channel) {
+func (c *Connection) watchRequests(ch *Channel) {
        for {
                select {
                case req, ok := <-ch.reqChan:
@@ -40,16 +40,12 @@ func (c *Connection) watchRequests(ch *channel) {
                                return
                        }
                        c.processRequest(ch, req)
-
-               case req := <-ch.notifSubsChan:
-                       // new request on the notification subscribe channel
-                       c.processSubscriptionRequest(ch, req)
                }
        }
 }
 
 // processRequest processes a single request received on the request channel.
-func (c *Connection) processRequest(ch *channel, req *vppRequest) error {
+func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
        // check whether we are connected to VPP
        if atomic.LoadUint32(&c.connected) == 0 {
                err := ErrNotConnected
@@ -137,7 +133,7 @@ func (c *Connection) processRequest(ch *channel, req *vppRequest) error {
 }
 
 // msgCallback is called whenever any binary API message comes from VPP.
-func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) {
+func (c *Connection) msgCallback(msgID uint16, data []byte) {
        connLock.RLock()
        defer connLock.RUnlock()
 
@@ -157,13 +153,8 @@ func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) {
        // - replies that don't have context as first field (comes as zero)
        // - events that don't have context at all (comes as non zero)
        //
-       msgContext, err := c.codec.DecodeMsgContext(data, msg)
-       if err == nil {
-               if context != msgContext {
-                       log.Debugf("different context was decoded from message (%d -> %d)", context, msgContext)
-                       context = msgContext
-               }
-       } else {
+       context, err := c.codec.DecodeMsgContext(data, msg)
+       if err != nil {
                log.Errorf("decoding context failed: %v", err)
        }
 
@@ -218,7 +209,7 @@ func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) {
 
 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
 // it logs the error and do not send the message.
-func sendReply(ch *channel, reply *vppReply) {
+func sendReply(ch *Channel, reply *vppReply) {
        select {
        case ch.replyChan <- reply:
                // reply sent successfully
@@ -232,10 +223,68 @@ func sendReply(ch *channel, reply *vppReply) {
        }
 }
 
-func sendReplyError(ch *channel, req *vppRequest, err error) {
+func sendReplyError(ch *Channel, req *vppRequest, err error) {
        sendReply(ch, &vppReply{seqNum: req.seqNum, err: err})
 }
 
+// isNotificationMessage returns true if someone has subscribed to provided message ID.
+func (c *Connection) isNotificationMessage(msgID uint16) bool {
+       c.subscriptionsLock.RLock()
+       defer c.subscriptionsLock.RUnlock()
+
+       _, exists := c.subscriptions[msgID]
+       return exists
+}
+
+// sendNotifications send a notification message to all subscribers subscribed for that message.
+func (c *Connection) sendNotifications(msgID uint16, data []byte) {
+       c.subscriptionsLock.RLock()
+       defer c.subscriptionsLock.RUnlock()
+
+       matched := false
+
+       // send to notification to each subscriber
+       for _, sub := range c.subscriptions[msgID] {
+               log.WithFields(logger.Fields{
+                       "msg_name": sub.event.GetMessageName(),
+                       "msg_id":   msgID,
+                       "msg_size": len(data),
+               }).Debug("Sending a notification to the subscription channel.")
+
+               event := sub.msgFactory()
+               if err := c.codec.DecodeMsg(data, event); err != nil {
+                       log.WithFields(logger.Fields{
+                               "msg_name": sub.event.GetMessageName(),
+                               "msg_id":   msgID,
+                               "msg_size": len(data),
+                       }).Errorf("Unable to decode the notification message: %v", err)
+                       continue
+               }
+
+               // send the message into the go channel of the subscription
+               select {
+               case sub.notifChan <- event:
+                       // message sent successfully
+               default:
+                       // unable to write into the channel without blocking
+                       log.WithFields(logger.Fields{
+                               "msg_name": sub.event.GetMessageName(),
+                               "msg_id":   msgID,
+                               "msg_size": len(data),
+                       }).Warn("Unable to deliver the notification, reciever end not ready.")
+               }
+
+               matched = true
+       }
+
+       if !matched {
+               log.WithFields(logger.Fields{
+                       "msg_id":   msgID,
+                       "msg_size": len(data),
+               }).Info("No subscription found for the notification message.")
+       }
+}
+
 // +------------------+-------------------+-----------------------+
 // | 15b = channel ID | 1b = is multipart | 16b = sequence number |
 // +------------------+-------------------+-----------------------+
index ff80173..0dfd335 100644 (file)
@@ -19,6 +19,7 @@ import "github.com/lunixbochs/struc"
 import "bytes"
 
 // Reference imports to suppress errors if they are not otherwise used.
+var _ = api.RegisterMessage
 var _ = struc.Pack
 var _ = bytes.NewBuffer
 
index a6bdc93..144bf74 100644 (file)
@@ -18,6 +18,7 @@ import "github.com/lunixbochs/struc"
 import "bytes"
 
 // Reference imports to suppress errors if they are not otherwise used.
+var _ = api.RegisterMessage
 var _ = struc.Pack
 var _ = bytes.NewBuffer
 
index 5ef58ed..38bbb6b 100644 (file)
@@ -19,6 +19,7 @@ import "github.com/lunixbochs/struc"
 import "bytes"
 
 // Reference imports to suppress errors if they are not otherwise used.
+var _ = api.RegisterMessage
 var _ = struc.Pack
 var _ = bytes.NewBuffer
 
index c980b6a..9dc3d56 100644 (file)
@@ -21,6 +21,7 @@ import "github.com/lunixbochs/struc"
 import "bytes"
 
 // Reference imports to suppress errors if they are not otherwise used.
+var _ = api.RegisterMessage
 var _ = struc.Pack
 var _ = bytes.NewBuffer
 
index 3650355..ceb615f 100644 (file)
@@ -18,6 +18,7 @@ import "github.com/lunixbochs/struc"
 import "bytes"
 
 // Reference imports to suppress errors if they are not otherwise used.
+var _ = api.RegisterMessage
 var _ = struc.Pack
 var _ = bytes.NewBuffer
 
index eb2dd8f..b66a077 100644 (file)
@@ -19,6 +19,7 @@ import "github.com/lunixbochs/struc"
 import "bytes"
 
 // Reference imports to suppress errors if they are not otherwise used.
+var _ = api.RegisterMessage
 var _ = struc.Pack
 var _ = bytes.NewBuffer
 
index 36f5549..d2878ea 100644 (file)
@@ -18,6 +18,7 @@ import "github.com/lunixbochs/struc"
 import "bytes"
 
 // Reference imports to suppress errors if they are not otherwise used.
+var _ = api.RegisterMessage
 var _ = struc.Pack
 var _ = bytes.NewBuffer
 
index f91a164..50476c3 100644 (file)
@@ -18,6 +18,7 @@ import "github.com/lunixbochs/struc"
 import "bytes"
 
 // Reference imports to suppress errors if they are not otherwise used.
+var _ = api.RegisterMessage
 var _ = struc.Pack
 var _ = bytes.NewBuffer
 
index 5b4b17d..664f046 100644 (file)
@@ -20,7 +20,6 @@ import (
        "flag"
        "fmt"
        "log"
-       "os"
        "time"
 
        "github.com/pkg/profile"
@@ -65,16 +64,14 @@ func main() {
        // connect to VPP
        conn, err := govpp.Connect("")
        if err != nil {
-               log.Println("Error:", err)
-               os.Exit(1)
+               log.Fatalln("Error:", err)
        }
        defer conn.Disconnect()
 
        // create an API channel
        ch, err := conn.NewAPIChannelBuffered(cnt, cnt)
        if err != nil {
-               log.Println("Error:", err)
-               os.Exit(1)
+               log.Fatalln("Error:", err)
        }
        defer ch.Close()
 
@@ -101,10 +98,8 @@ func syncTest(ch api.Channel, cnt int) {
                req := &vpe.ControlPing{}
                reply := &vpe.ControlPingReply{}
 
-               err := ch.SendRequest(req).ReceiveReply(reply)
-               if err != nil {
-                       log.Println("Error in reply:", err)
-                       os.Exit(1)
+               if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
+                       log.Fatalln("Error in reply:", err)
                }
        }
 }
@@ -125,8 +120,7 @@ func asyncTest(ch api.Channel, cnt int) {
        for ctx := range ctxChan {
                reply := &vpe.ControlPingReply{}
                if err := ctx.ReceiveReply(reply); err != nil {
-                       log.Println("Error in reply:", err)
-                       os.Exit(1)
+                       log.Fatalln("Error in reply:", err)
                }
        }
 }
index b9e8052..08d4da6 100644 (file)
@@ -18,6 +18,7 @@ package main
 
 import (
        "fmt"
+       "log"
        "net"
        "os"
        "strings"
@@ -35,16 +36,14 @@ func main() {
        // connect to VPP
        conn, err := govpp.Connect("")
        if err != nil {
-               fmt.Println("Error:", err)
-               os.Exit(1)
+               log.Fatalln("ERROR:", err)
        }
        defer conn.Disconnect()
 
        // create an API channel that will be used in the examples
        ch, err := conn.NewAPIChannel()
        if err != nil {
-               fmt.Println("Error:", err)
-               os.Exit(1)
+               log.Fatalln("ERROR:", err)
        }
        defer ch.Close()
 
@@ -64,20 +63,22 @@ func main() {
 
 // aclVersion is the simplest API example - one empty request message and one reply message.
 func aclVersion(ch api.Channel) {
+       fmt.Println("ACL getting version")
+
        req := &acl.ACLPluginGetVersion{}
        reply := &acl.ACLPluginGetVersionReply{}
 
-       err := ch.SendRequest(req).ReceiveReply(reply)
-
-       if err != nil {
-               fmt.Println("Error:", err)
+       if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
+               fmt.Println("ERROR:", err)
        } else {
-               fmt.Printf("%+v\n", reply)
+               fmt.Printf("ACL version reply: %+v\n", reply)
        }
 }
 
 // aclConfig is another simple API example - in this case, the request contains structured data.
 func aclConfig(ch api.Channel) {
+       fmt.Println("ACL adding replace")
+
        req := &acl.ACLAddReplace{
                ACLIndex: ^uint32(0),
                Tag:      []byte("access list 1"),
@@ -102,10 +103,8 @@ func aclConfig(ch api.Channel) {
        }
        reply := &acl.ACLAddReplaceReply{}
 
-       err := ch.SendRequest(req).ReceiveReply(reply)
-
-       if err != nil {
-               fmt.Println("Error:", err)
+       if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
+               fmt.Println("ERROR:", err)
                return
        }
        if reply.Retval != 0 {
@@ -113,19 +112,23 @@ func aclConfig(ch api.Channel) {
                return
        }
 
-       fmt.Printf("%+v\n", reply)
+       fmt.Printf("ACL add replace reply: %+v\n", reply)
 
 }
 
 // aclDump shows an example where SendRequest and ReceiveReply are not chained together.
 func aclDump(ch api.Channel) {
+       fmt.Println("Dumping ACL")
+
        req := &acl.ACLDump{}
        reply := &acl.ACLDetails{}
 
-       if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
-               fmt.Println("Error:", err)
+       reqCtx := ch.SendRequest(req)
+
+       if err := reqCtx.ReceiveReply(reply); err != nil {
+               fmt.Println("ERROR:", err)
        } else {
-               fmt.Printf("%+v\n", reply)
+               fmt.Printf("ACL details: %+v\n", reply)
        }
 }
 
@@ -133,14 +136,13 @@ func aclDump(ch api.Channel) {
 func interfaceDump(ch api.Channel) {
        fmt.Println("Dumping interfaces")
 
-       req := &interfaces.SwInterfaceDump{}
-       reqCtx := ch.SendMultiRequest(req)
+       reqCtx := ch.SendMultiRequest(&interfaces.SwInterfaceDump{})
 
        for {
                msg := &interfaces.SwInterfaceDetails{}
                stop, err := reqCtx.ReceiveReply(msg)
                if stop {
-                       break // break out of the loop
+                       break
                }
                if err != nil {
                        fmt.Println("ERROR:", err)
@@ -148,7 +150,7 @@ func interfaceDump(ch api.Channel) {
                ifaceName := strings.TrimFunc(string(msg.InterfaceName), func(r rune) bool {
                        return r == 0x00
                })
-               fmt.Printf("Interface: %q %+v\n", ifaceName, msg)
+               fmt.Printf("Interface %q: %+v\n", ifaceName, msg)
        }
 }
 
@@ -164,12 +166,12 @@ func ipAddressDump(ch api.Channel) {
                msg := &ip.IPAddressDetails{}
                stop, err := reqCtx.ReceiveReply(msg)
                if stop {
-                       break // break out of the loop
+                       break
                }
                if err != nil {
                        fmt.Println("ERROR:", err)
                }
-               fmt.Printf("ip address: %d %+v\n", msg.SwIfIndex, msg)
+               fmt.Printf("ip address details: %d %+v\n", msg.SwIfIndex, msg)
        }
 }
 
@@ -183,7 +185,7 @@ func setIpUnnumbered(ch api.Channel) {
        reply := &interfaces.SwInterfaceSetUnnumberedReply{}
 
        if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
-               fmt.Println("Error:", err)
+               fmt.Println("ERROR:", err)
        } else {
                fmt.Printf("%+v\n", reply)
        }
@@ -192,21 +194,20 @@ func setIpUnnumbered(ch api.Channel) {
 func ipUnnumberedDump(ch api.Channel) {
        fmt.Println("Dumping IP unnumbered")
 
-       req := &ip.IPUnnumberedDump{
+       reqCtx := ch.SendMultiRequest(&ip.IPUnnumberedDump{
                SwIfIndex: ^uint32(0),
-       }
-       reqCtx := ch.SendMultiRequest(req)
+       })
 
        for {
                msg := &ip.IPUnnumberedDetails{}
                stop, err := reqCtx.ReceiveReply(msg)
                if stop {
-                       break // break out of the loop
+                       break
                }
                if err != nil {
                        fmt.Println("ERROR:", err)
                }
-               fmt.Printf("ip unnumbered: %+v\n", msg)
+               fmt.Printf("IP unnumbered details: %+v\n", msg)
        }
 }
 
@@ -214,9 +215,12 @@ func ipUnnumberedDump(ch api.Channel) {
 // you are supposed to create your own Go channel with your preferred buffer size. If the channel's
 // buffer is full, the notifications will not be delivered into it.
 func interfaceNotifications(ch api.Channel) {
-       // subscribe for specific notification message
+       fmt.Println("Subscribing to notificaiton events")
+
        notifChan := make(chan api.Message, 100)
-       subs, err := ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceEvent)
+
+       // subscribe for specific notification message
+       sub, err := ch.SubscribeNotification(notifChan, &interfaces.SwInterfaceEvent{})
        if err != nil {
                panic(err)
        }
@@ -248,7 +252,7 @@ func interfaceNotifications(ch api.Channel) {
 
        // receive one notification
        notif := (<-notifChan).(*interfaces.SwInterfaceEvent)
-       fmt.Printf("NOTIF: %+v\n", notif)
+       fmt.Printf("incoming event: %+v\n", notif)
 
        // disable interface events in VPP
        err = ch.SendRequest(&interfaces.WantInterfaceEvents{
@@ -260,7 +264,7 @@ func interfaceNotifications(ch api.Channel) {
        }
 
        // unsubscribe from delivery of the notifications
-       err = ch.UnsubscribeNotification(subs)
+       err = sub.Unsubscribe()
        if err != nil {
                panic(err)
        }
index 4ea4659..f61f975 100644 (file)
@@ -18,6 +18,7 @@ package main
 
 import (
        "fmt"
+       "log"
        "os"
        "os/signal"
 
@@ -28,45 +29,41 @@ import (
 )
 
 func main() {
-       fmt.Println("Starting stats VPP client...")
+       fmt.Println("Starting stats VPP client..")
 
        // async connect to VPP
        conn, statCh, err := govpp.AsyncConnect("")
        if err != nil {
-               fmt.Println("Error:", err)
-               os.Exit(1)
+               log.Fatalln("Error:", err)
        }
        defer conn.Disconnect()
 
        // create an API channel that will be used in the examples
        ch, err := conn.NewAPIChannel()
        if err != nil {
-               fmt.Println("Error:", err)
-               os.Exit(1)
+               log.Fatalln("Error:", err)
        }
-       defer fmt.Println("calling close")
        defer ch.Close()
 
        // create channel for Interrupt signal
        sigChan := make(chan os.Signal, 1)
        signal.Notify(sigChan, os.Interrupt)
 
-       var simpleCountersSubs *api.NotifSubscription
-       var combinedCountersSubs *api.NotifSubscription
        var notifChan chan api.Message
+       var simpleSub api.SubscriptionCtx
+       var combinedSub api.SubscriptionCtx
 
        // loop until Interrupt signal is received
 loop:
        for {
                select {
-
                case connEvent := <-statCh:
                        // VPP connection state change
                        switch connEvent.State {
                        case core.Connected:
                                fmt.Println("VPP connected.")
-                               if simpleCountersSubs == nil {
-                                       simpleCountersSubs, combinedCountersSubs, notifChan = subscribeNotifications(ch)
+                               if notifChan == nil {
+                                       simpleSub, combinedSub, notifChan = subscribeNotifications(ch)
                                }
                                requestStatistics(ch)
 
@@ -93,24 +90,24 @@ loop:
                }
        }
 
-       ch.UnsubscribeNotification(simpleCountersSubs)
-       ch.UnsubscribeNotification(combinedCountersSubs)
+       simpleSub.Unsubscribe()
+       combinedSub.Unsubscribe()
 }
 
 // subscribeNotifications subscribes for interface counters notifications.
-func subscribeNotifications(ch api.Channel) (*api.NotifSubscription, *api.NotifSubscription, chan api.Message) {
+func subscribeNotifications(ch api.Channel) (api.SubscriptionCtx, api.SubscriptionCtx, chan api.Message) {
        notifChan := make(chan api.Message, 100)
 
-       simpleCountersSubs, err := ch.SubscribeNotification(notifChan, stats.NewVnetInterfaceSimpleCounters)
+       simpleSub, err := ch.SubscribeNotification(notifChan, &stats.VnetInterfaceSimpleCounters{})
        if err != nil {
                panic(err)
        }
-       combinedCountersSubs, err := ch.SubscribeNotification(notifChan, stats.NewVnetInterfaceCombinedCounters)
+       combinedSub, err := ch.SubscribeNotification(notifChan, &stats.VnetInterfaceCombinedCounters{})
        if err != nil {
                panic(err)
        }
 
-       return simpleCountersSubs, combinedCountersSubs, notifChan
+       return simpleSub, combinedSub, notifChan
 }
 
 // requestStatistics requests interface counters notifications from VPP.