From 6b350c65fe0ec845cecf58bfb41ffc63dc9c04f7 Mon Sep 17 00:00:00 2001 From: Ondrej Fabry Date: Thu, 23 Aug 2018 22:51:56 +0200 Subject: [PATCH] Simplify subscribing to events and fix events - there is no need for sending subscription requests through channels, since all the messages are registered and no communication with VPP is needed Change-Id: Ibc29957be02a32e26309f66c369a071559b822a9 Signed-off-by: Ondrej Fabry --- Makefile | 1 - adapter/adapter.go | 2 +- adapter/mock/mock_adapter.go | 11 +- adapter/vppapiclient/vppapiclient_adapter.go | 9 +- api/api.go | 37 +---- cmd/binapi-generator/generate.go | 3 +- codec/msg_codec.go | 16 +- core/channel.go | 218 ++++++++++++++++--------- core/channel_test.go | 173 ++++++++++---------- core/connection.go | 235 ++++++++++++--------------- core/connection_test.go | 212 ++++-------------------- core/notification_handler.go | 170 ------------------- core/request_handler.go | 81 +++++++-- examples/bin_api/acl/acl.ba.go | 1 + examples/bin_api/af_packet/af_packet.ba.go | 1 + examples/bin_api/interfaces/interfaces.ba.go | 1 + examples/bin_api/ip/ip.ba.go | 1 + examples/bin_api/memif/memif.ba.go | 1 + examples/bin_api/stats/stats.ba.go | 1 + examples/bin_api/tap/tap.ba.go | 1 + examples/bin_api/vpe/vpe.ba.go | 1 + examples/cmd/perf-bench/perf-bench.go | 16 +- examples/cmd/simple-client/simple_client.go | 70 ++++---- examples/cmd/stats-client/stats_client.go | 31 ++-- 24 files changed, 528 insertions(+), 765 deletions(-) delete mode 100644 core/notification_handler.go diff --git a/Makefile b/Makefile index ee06818..ea1daf6 100644 --- a/Makefile +++ b/Makefile @@ -26,7 +26,6 @@ clean: @rm -f extras/libmemif/examples/icmp-responder/icmp-responder generate: - @cd core && go generate ./... @cd examples && go generate ./... lint: diff --git a/adapter/adapter.go b/adapter/adapter.go index 7d3d1e4..aa34329 100644 --- a/adapter/adapter.go +++ b/adapter/adapter.go @@ -22,7 +22,7 @@ import ( var ErrNotImplemented = errors.New("not implemented for this OS") // MsgCallback defines func signature for message callback. -type MsgCallback func(msgID uint16, context uint32, data []byte) +type MsgCallback func(msgID uint16, data []byte) // VppAdapter provides connection to VPP. It is responsible for sending and receiving of binary-encoded messages to/from VPP. type VppAdapter interface { diff --git a/adapter/mock/mock_adapter.go b/adapter/mock/mock_adapter.go index 5ca190f..cdf2081 100644 --- a/adapter/mock/mock_adapter.go +++ b/adapter/mock/mock_adapter.go @@ -92,9 +92,9 @@ const ( // NewVppAdapter returns a new mock adapter. func NewVppAdapter() *VppAdapter { a := &VppAdapter{ + msgIDSeq: 1000, msgIDsToName: make(map[uint16]string), msgNameToIds: make(map[string]uint16), - msgIDSeq: 1000, binAPITypes: make(map[string]reflect.Type), } a.registerBinAPITypes() @@ -186,8 +186,7 @@ func (a *VppAdapter) ReplyBytes(request MessageDTO, reply api.Message) ([]byte, if err != nil { return nil, err } - err = struc.Pack(buf, reply) - if err != nil { + if err = struc.Pack(buf, reply); err != nil { return nil, err } @@ -245,7 +244,7 @@ func (a *VppAdapter) SendMsg(clientID uint32, data []byte) error { Data: data, }) if finished { - a.callback(msgID, clientID, reply) + a.callback(msgID, reply) return nil } } @@ -276,7 +275,7 @@ func (a *VppAdapter) SendMsg(clientID uint32, data []byte) error { struc.Pack(buf, &codec.VppOtherHeader{VlMsgID: msgID}) } struc.Pack(buf, msg.Msg) - a.callback(msgID, context, buf.Bytes()) + a.callback(msgID, buf.Bytes()) } a.replies = a.replies[1:] @@ -295,7 +294,7 @@ func (a *VppAdapter) SendMsg(clientID uint32, data []byte) error { msgID := uint16(defaultReplyMsgID) struc.Pack(buf, &codec.VppReplyHeader{VlMsgID: msgID, Context: clientID}) struc.Pack(buf, &defaultReply{}) - a.callback(msgID, clientID, buf.Bytes()) + a.callback(msgID, buf.Bytes()) } return nil } diff --git a/adapter/vppapiclient/vppapiclient_adapter.go b/adapter/vppapiclient/vppapiclient_adapter.go index 7aafa55..e62bccd 100644 --- a/adapter/vppapiclient/vppapiclient_adapter.go +++ b/adapter/vppapiclient/vppapiclient_adapter.go @@ -28,7 +28,7 @@ package vppapiclient #include #include -extern void go_msg_callback(uint16_t msg_id, uint32_t context, void* data, size_t size); +extern void go_msg_callback(uint16_t msg_id, void* data, size_t size); typedef struct __attribute__((__packed__)) _req_header { uint16_t msg_id; @@ -38,14 +38,13 @@ typedef struct __attribute__((__packed__)) _req_header { typedef struct __attribute__((__packed__)) _reply_header { uint16_t msg_id; - uint32_t context; // currently not all reply messages contain context field } reply_header_t; static void govpp_msg_callback (unsigned char *data, int size) { reply_header_t *header = ((reply_header_t *)data); - go_msg_callback(ntohs(header->msg_id), ntohl(header->context), data, size); + go_msg_callback(ntohs(header->msg_id), data, size); } static int @@ -204,10 +203,10 @@ func fileExists(name string) bool { } //export go_msg_callback -func go_msg_callback(msgID C.uint16_t, context C.uint32_t, data unsafe.Pointer, size C.size_t) { +func go_msg_callback(msgID C.uint16_t, data unsafe.Pointer, size C.size_t) { // convert unsafe.Pointer to byte slice slice := &reflect.SliceHeader{Data: uintptr(data), Len: int(size), Cap: int(size)} byteArr := *(*[]byte)(unsafe.Pointer(slice)) - vppClient.callback(uint16(msgID), uint32(context), byteArr) + vppClient.callback(uint16(msgID), byteArr) } diff --git a/api/api.go b/api/api.go index 39fe60f..9b7f0ff 100644 --- a/api/api.go +++ b/api/api.go @@ -58,21 +58,6 @@ type DataType interface { GetCrcString() string } -// MessageDecoder provides functionality for decoding binary data to generated API messages. -type MessageDecoder interface { - // DecodeMsg decodes binary-encoded data of a message into provided Message structure. - DecodeMsg(data []byte, msg Message) error -} - -// MessageIdentifier provides identification of generated API messages. -type MessageIdentifier interface { - // GetMessageID returns message identifier of given API message. - GetMessageID(msg Message) (uint16, error) - - // LookupByID looks up message name and crc by ID - LookupByID(msgID uint16) (Message, error) -} - // ChannelProvider provides the communication channel with govpp core. type ChannelProvider interface { // NewAPIChannel returns a new channel for communication with VPP via govpp core. @@ -86,9 +71,6 @@ type ChannelProvider interface { // Channel provides methods for direct communication with VPP channel. type Channel interface { - // GetID returns channel's ID - GetID() uint16 - // SendRequest asynchronously sends a request to VPP. Returns a request context, that can be used to call ReceiveReply. // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply). SendRequest(msg Message) RequestCtx @@ -101,10 +83,7 @@ type Channel interface { // SubscribeNotification subscribes for receiving of the specified notification messages via provided Go channel. // Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's // buffer is full, the notifications will not be delivered into it. - SubscribeNotification(notifChan chan Message, msgFactory func() Message) (*NotifSubscription, error) - - // UnsubscribeNotification unsubscribes from receiving the notifications tied to the provided notification subscription. - UnsubscribeNotification(subscription *NotifSubscription) error + SubscribeNotification(notifChan chan Message, event Message) (SubscriptionCtx, error) // SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply // from VPP before returning an error. @@ -114,14 +93,14 @@ type Channel interface { Close() } -// RequestCtx is helper interface which allows to receive reply on request context data +// RequestCtx is helper interface which allows to receive reply on request. type RequestCtx interface { // ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs). // The reply will be decoded into the msg argument. Error will be returned if the response cannot be received or decoded. ReceiveReply(msg Message) error } -// MultiRequestCtx is helper interface which allows to receive reply on multi-request context data +// MultiRequestCtx is helper interface which allows to receive reply on multi-request. type MultiRequestCtx interface { // ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs). // The reply will be decoded into the msg argument. If the last reply has been already consumed, lastReplyReceived is @@ -130,13 +109,13 @@ type MultiRequestCtx interface { ReceiveReply(msg Message) (lastReplyReceived bool, err error) } -// NotifSubscription represents a subscription for delivery of specific notification messages. -type NotifSubscription struct { - NotifChan chan Message // channel where notification messages will be delivered to - MsgFactory func() Message // function that returns a new instance of the specific message that is expected as a notification - // TODO: use Message directly here, not a factory, eliminating need to allocation +// SubscriptionCtx is helper interface which allows to control subscription for notification events. +type SubscriptionCtx interface { + // Unsubscribe unsubscribes from receiving the notifications tied to the subscription context. + Unsubscribe() error } +// map of registered messages var registeredMessages = make(map[string]Message) // RegisterMessage is called from generated code to register message. diff --git a/cmd/binapi-generator/generate.go b/cmd/binapi-generator/generate.go index 33ab614..73bcd2a 100644 --- a/cmd/binapi-generator/generate.go +++ b/cmd/binapi-generator/generate.go @@ -140,9 +140,9 @@ func generatePackage(ctx *context, w *bufio.Writer) error { if len(ctx.packageData.Services) > 0 { fmt.Fprintf(w, "/* Services */\n\n") - fmt.Fprintf(w, "type %s interface {\n", "Services") ctx.inputBuff = bytes.NewBuffer(ctx.inputData) ctx.inputLine = 0 + fmt.Fprintf(w, "type %s interface {\n", "Services") for _, svc := range ctx.packageData.Services { generateService(ctx, w, &svc) } @@ -209,6 +209,7 @@ func generateImports(ctx *context, w io.Writer) { fmt.Fprintln(w) fmt.Fprintf(w, "// Reference imports to suppress errors if they are not otherwise used.\n") + fmt.Fprintf(w, "var _ = api.RegisterMessage\n") fmt.Fprintf(w, "var _ = struc.Pack\n") fmt.Fprintf(w, "var _ = bytes.NewBuffer\n") fmt.Fprintln(w) diff --git a/codec/msg_codec.go b/codec/msg_codec.go index 572e672..9d3f614 100644 --- a/codec/msg_codec.go +++ b/codec/msg_codec.go @@ -58,15 +58,19 @@ func (*MsgCodec) EncodeMsg(msg api.Message, msgID uint16) ([]byte, error) { return nil, errors.New("nil message passed in") } - // encode message header var header interface{} + + // encode message header switch msg.GetMessageType() { case api.RequestMessage: header = &VppRequestHeader{VlMsgID: msgID} + case api.ReplyMessage: header = &VppReplyHeader{VlMsgID: msgID} + case api.EventMessage: header = &VppEventHeader{VlMsgID: msgID} + default: header = &VppOtherHeader{VlMsgID: msgID} } @@ -94,15 +98,19 @@ func (*MsgCodec) DecodeMsg(data []byte, msg api.Message) error { return errors.New("nil message passed in") } - // check which header is expected var header interface{} + + // check which header is expected switch msg.GetMessageType() { case api.RequestMessage: header = new(VppRequestHeader) + case api.ReplyMessage: header = new(VppReplyHeader) + case api.EventMessage: header = new(VppEventHeader) + default: header = new(VppOtherHeader) } @@ -127,17 +135,19 @@ func (*MsgCodec) DecodeMsgContext(data []byte, msg api.Message) (uint32, error) return 0, errors.New("nil message passed in") } + var header interface{} var getContext func() uint32 // check which header is expected - var header interface{} switch msg.GetMessageType() { case api.RequestMessage: header = new(VppRequestHeader) getContext = func() uint32 { return header.(*VppRequestHeader).Context } + case api.ReplyMessage: header = new(VppReplyHeader) getContext = func() uint32 { return header.(*VppReplyHeader).Context } + default: return 0, nil } diff --git a/core/channel.go b/core/channel.go index 718f89c..a7d95fe 100644 --- a/core/channel.go +++ b/core/channel.go @@ -29,40 +29,20 @@ var ( ErrInvalidRequestCtx = errors.New("invalid request context") ) -// requestCtx is a context for request with single reply -type requestCtx struct { - ch *channel - seqNum uint16 -} - -// multiRequestCtx is a context for request with multiple responses -type multiRequestCtx struct { - ch *channel - seqNum uint16 -} - -func (req *requestCtx) ReceiveReply(msg api.Message) error { - if req == nil || req.ch == nil { - return ErrInvalidRequestCtx - } - - lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum) - if err != nil { - return err - } - if lastReplyReceived { - return errors.New("multipart reply recieved while a single reply expected") - } - - return nil +// MessageCodec provides functionality for decoding binary data to generated API messages. +type MessageCodec interface { + //EncodeMsg encodes message into binary data. + EncodeMsg(msg api.Message, msgID uint16) ([]byte, error) + // DecodeMsg decodes binary-encoded data of a message into provided Message structure. + DecodeMsg(data []byte, msg api.Message) error } -func (req *multiRequestCtx) ReceiveReply(msg api.Message) (lastReplyReceived bool, err error) { - if req == nil || req.ch == nil { - return false, ErrInvalidRequestCtx - } - - return req.ch.receiveReplyInternal(msg, req.seqNum) +// MessageIdentifier provides identification of generated API messages. +type MessageIdentifier interface { + // GetMessageID returns message identifier of given API message. + GetMessageID(msg api.Message) (uint16, error) + // LookupByID looks up message name and crc by ID + LookupByID(msgID uint16) (api.Message, error) } // vppRequest is a request that will be sent to VPP. @@ -81,27 +61,40 @@ type vppReply struct { err error // in case of error, data is nil and this member contains error } -// NotifSubscribeRequest is a request to subscribe for delivery of specific notification messages. -type subscriptionRequest struct { - sub *api.NotifSubscription // subscription details - subscribe bool // true if this is a request to subscribe +// requestCtx is a context for request with single reply +type requestCtx struct { + ch *Channel + seqNum uint16 +} + +// multiRequestCtx is a context for request with multiple responses +type multiRequestCtx struct { + ch *Channel + seqNum uint16 +} + +// subscriptionCtx is a context of subscription for delivery of specific notification messages. +type subscriptionCtx struct { + ch *Channel + notifChan chan api.Message // channel where notification messages will be delivered to + msgID uint16 // message ID for the subscribed event message + event api.Message // event message that this subscription is for + msgFactory func() api.Message // function that returns a new instance of the specific message that is expected as a notification } // channel is the main communication interface with govpp core. It contains four Go channels, one for sending the requests // to VPP, one for receiving the replies from it and the same set for notifications. The user can access the Go channels // via methods provided by Channel interface in this package. Do not use the same channel from multiple goroutines // concurrently, otherwise the responses could mix! Use multiple channels instead. -type channel struct { - id uint16 // channel ID +type Channel struct { + id uint16 + conn *Connection reqChan chan *vppRequest // channel for sending the requests to VPP replyChan chan *vppReply // channel where VPP replies are delivered to - notifSubsChan chan *subscriptionRequest // channel for sending notification subscribe requests - notifSubsReplyChan chan error // channel where replies to notification subscribe requests are delivered to - - msgDecoder api.MessageDecoder // used to decode binary data to generated API messages - msgIdentifier api.MessageIdentifier // used to retrieve message ID of a message + msgCodec MessageCodec // used to decode binary data to generated API messages + msgIdentifier MessageIdentifier // used to retrieve message ID of a message lastSeqNum uint16 // sequence number of the last sent request @@ -109,73 +102,142 @@ type channel struct { replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout } -func (ch *channel) GetID() uint16 { +func newChannel(id uint16, conn *Connection, codec MessageCodec, identifier MessageIdentifier, reqSize, replySize int) *Channel { + return &Channel{ + id: id, + conn: conn, + msgCodec: codec, + msgIdentifier: identifier, + reqChan: make(chan *vppRequest, reqSize), + replyChan: make(chan *vppReply, replySize), + replyTimeout: DefaultReplyTimeout, + } +} + +func (ch *Channel) GetID() uint16 { return ch.id } -func (ch *channel) nextSeqNum() uint16 { +func (ch *Channel) nextSeqNum() uint16 { ch.lastSeqNum++ return ch.lastSeqNum } -func (ch *channel) SendRequest(msg api.Message) api.RequestCtx { - req := &vppRequest{ +func (ch *Channel) SendRequest(msg api.Message) api.RequestCtx { + seqNum := ch.nextSeqNum() + ch.reqChan <- &vppRequest{ msg: msg, - seqNum: ch.nextSeqNum(), + seqNum: seqNum, } - ch.reqChan <- req - return &requestCtx{ch: ch, seqNum: req.seqNum} + return &requestCtx{ch: ch, seqNum: seqNum} } -func (ch *channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx { - req := &vppRequest{ +func (ch *Channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx { + seqNum := ch.nextSeqNum() + ch.reqChan <- &vppRequest{ msg: msg, - seqNum: ch.nextSeqNum(), + seqNum: seqNum, multi: true, } - ch.reqChan <- req - return &multiRequestCtx{ch: ch, seqNum: req.seqNum} + return &multiRequestCtx{ch: ch, seqNum: seqNum} } -func (ch *channel) SubscribeNotification(notifChan chan api.Message, msgFactory func() api.Message) (*api.NotifSubscription, error) { - sub := &api.NotifSubscription{ - NotifChan: notifChan, - MsgFactory: msgFactory, +func getMsgFactory(msg api.Message) func() api.Message { + return func() api.Message { + return reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) } - // TODO: get rid of notifSubsChan and notfSubsReplyChan, - // it's no longer need because we know all message IDs and can store subscription right here - ch.notifSubsChan <- &subscriptionRequest{ - sub: sub, - subscribe: true, - } - return sub, <-ch.notifSubsReplyChan } -func (ch *channel) UnsubscribeNotification(subscription *api.NotifSubscription) error { - ch.notifSubsChan <- &subscriptionRequest{ - sub: subscription, - subscribe: false, +func (ch *Channel) SubscribeNotification(notifChan chan api.Message, event api.Message) (api.SubscriptionCtx, error) { + msgID, err := ch.msgIdentifier.GetMessageID(event) + if err != nil { + log.WithFields(logrus.Fields{ + "msg_name": event.GetMessageName(), + "msg_crc": event.GetCrcString(), + }).Errorf("unable to retrieve message ID: %v", err) + return nil, fmt.Errorf("unable to retrieve event message ID: %v", err) + } + + sub := &subscriptionCtx{ + ch: ch, + notifChan: notifChan, + msgID: msgID, + event: event, + msgFactory: getMsgFactory(event), } - return <-ch.notifSubsReplyChan + + // add the subscription into map + ch.conn.subscriptionsLock.Lock() + defer ch.conn.subscriptionsLock.Unlock() + + ch.conn.subscriptions[msgID] = append(ch.conn.subscriptions[msgID], sub) + + return sub, nil } -func (ch *channel) SetReplyTimeout(timeout time.Duration) { +func (ch *Channel) SetReplyTimeout(timeout time.Duration) { ch.replyTimeout = timeout } -func (ch *channel) Close() { +func (ch *Channel) Close() { if ch.reqChan != nil { close(ch.reqChan) + ch.reqChan = nil + } +} + +func (req *requestCtx) ReceiveReply(msg api.Message) error { + if req == nil || req.ch == nil { + return ErrInvalidRequestCtx + } + + lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum) + if err != nil { + return err + } else if lastReplyReceived { + return errors.New("multipart reply recieved while a single reply expected") + } + + return nil +} + +func (req *multiRequestCtx) ReceiveReply(msg api.Message) (lastReplyReceived bool, err error) { + if req == nil || req.ch == nil { + return false, ErrInvalidRequestCtx } + + return req.ch.receiveReplyInternal(msg, req.seqNum) +} + +func (sub *subscriptionCtx) Unsubscribe() error { + log.WithFields(logrus.Fields{ + "msg_name": sub.event.GetMessageName(), + "msg_id": sub.msgID, + }).Debug("Removing notification subscription.") + + // remove the subscription from the map + sub.ch.conn.subscriptionsLock.Lock() + defer sub.ch.conn.subscriptionsLock.Unlock() + + for i, item := range sub.ch.conn.subscriptions[sub.msgID] { + if item == sub { + // remove i-th item in the slice + sub.ch.conn.subscriptions[sub.msgID] = append(sub.ch.conn.subscriptions[sub.msgID][:i], sub.ch.conn.subscriptions[sub.msgID][i+1:]...) + return nil + } + } + + return fmt.Errorf("subscription for %q not found", sub.event.GetMessageName()) } // receiveReplyInternal receives a reply from the reply channel into the provided msg structure. -func (ch *channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (lastReplyReceived bool, err error) { - var ignore bool +func (ch *Channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (lastReplyReceived bool, err error) { if msg == nil { return false, errors.New("nil message passed in") } + var ignore bool + if vppReply := ch.delayedReply; vppReply != nil { // try the delayed reply ch.delayedReply = nil @@ -204,12 +266,12 @@ func (ch *channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (last return } -func (ch *channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) { +func (ch *Channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) { // check the sequence number cmpSeqNums := compareSeqNumbers(reply.seqNum, expSeqNum) if cmpSeqNums == -1 { // reply received too late, ignore the message - logrus.WithField("sequence-number", reply.seqNum).Warn( + logrus.WithField("seqNum", reply.seqNum).Warn( "Received reply to an already closed binary API request") ignore = true return @@ -253,7 +315,7 @@ func (ch *channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Messa } // decode the message - if err = ch.msgDecoder.DecodeMsg(reply.data, msg); err != nil { + if err = ch.msgCodec.DecodeMsg(reply.data, msg); err != nil { return } diff --git a/core/channel_test.go b/core/channel_test.go index 4a9ab2b..0eafa32 100644 --- a/core/channel_test.go +++ b/core/channel_test.go @@ -22,6 +22,7 @@ import ( "git.fd.io/govpp.git/examples/bin_api/interfaces" "git.fd.io/govpp.git/examples/bin_api/memif" "git.fd.io/govpp.git/examples/bin_api/tap" + "git.fd.io/govpp.git/examples/binapi/vpe" "git.fd.io/govpp.git/api" . "github.com/onsi/gomega" @@ -59,10 +60,11 @@ func TestRequestReplyTapConnect(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() + // mock reply ctx.mockVpp.MockReply(&tap.TapConnectReply{ - Retval: 0, SwIfIndex: 1, }) + request := &tap.TapConnect{ TapName: []byte("test-tap-name"), UseRandomMac: 1, @@ -71,17 +73,21 @@ func TestRequestReplyTapConnect(t *testing.T) { err := ctx.ch.SendRequest(request).ReceiveReply(reply) Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.Retval).To(BeEquivalentTo(0), "Incorrect retval value for TapConnectReply") - Expect(reply.SwIfIndex).To(BeEquivalentTo(1), "Incorrect SwIfIndex value for TapConnectReply") + Expect(reply.Retval).To(BeEquivalentTo(0), + "Incorrect Retval value for TapConnectReply") + Expect(reply.SwIfIndex).To(BeEquivalentTo(1), + "Incorrect SwIfIndex value for TapConnectReply") } func TestRequestReplyTapModify(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() + // mock reply ctx.mockVpp.MockReply(&tap.TapModifyReply{ SwIfIndex: 2, }) + request := &tap.TapModify{ TapName: []byte("test-tap-modify"), UseRandomMac: 1, @@ -91,15 +97,19 @@ func TestRequestReplyTapModify(t *testing.T) { err := ctx.ch.SendRequest(request).ReceiveReply(reply) Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.Retval).To(BeEquivalentTo(0), "Incorrect retval value for TapModifyReply") - Expect(reply.SwIfIndex).To(BeEquivalentTo(2), "Incorrect SwIfIndex value for TapModifyReply") + Expect(reply.Retval).To(BeEquivalentTo(0), + "Incorrect Retval value for TapModifyReply") + Expect(reply.SwIfIndex).To(BeEquivalentTo(2), + "Incorrect SwIfIndex value for TapModifyReply") } func TestRequestReplyTapDelete(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() + // mock reply ctx.mockVpp.MockReply(&tap.TapDeleteReply{}) + request := &tap.TapDelete{ SwIfIndex: 3, } @@ -107,34 +117,41 @@ func TestRequestReplyTapDelete(t *testing.T) { err := ctx.ch.SendRequest(request).ReceiveReply(reply) Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.Retval).To(BeEquivalentTo(0), "Incorrect retval value for TapDeleteReply") + Expect(reply.Retval).To(BeEquivalentTo(0), + "Incorrect Retval value for TapDeleteReply") } func TestRequestReplySwInterfaceTapDump(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() + // mock reply byteName := []byte("dev-name-test") ctx.mockVpp.MockReply(&tap.SwInterfaceTapDetails{ SwIfIndex: 25, DevName: byteName, }) + request := &tap.SwInterfaceTapDump{} reply := &tap.SwInterfaceTapDetails{} err := ctx.ch.SendRequest(request).ReceiveReply(reply) Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.SwIfIndex).To(BeEquivalentTo(25), "Incorrect SwIfIndex value for SwInterfaceTapDetails") - Expect(reply.DevName).ToNot(BeNil(), "Incorrect DevName value for SwInterfaceTapDetails") + Expect(reply.SwIfIndex).To(BeEquivalentTo(25), + "Incorrect SwIfIndex value for SwInterfaceTapDetails") + Expect(reply.DevName).ToNot(BeEquivalentTo(byteName), + "Incorrect DevName value for SwInterfaceTapDetails") } func TestRequestReplyMemifCreate(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() + // mock reply ctx.mockVpp.MockReply(&memif.MemifCreateReply{ SwIfIndex: 4, }) + request := &memif.MemifCreate{ Role: 10, ID: 12, @@ -145,15 +162,19 @@ func TestRequestReplyMemifCreate(t *testing.T) { err := ctx.ch.SendRequest(request).ReceiveReply(reply) Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.Retval).To(BeEquivalentTo(0), "Incorrect Retval value for MemifCreate") - Expect(reply.SwIfIndex).To(BeEquivalentTo(4), "Incorrect SwIfIndex value for MemifCreate") + Expect(reply.Retval).To(BeEquivalentTo(0), + "Incorrect Retval value for MemifCreate") + Expect(reply.SwIfIndex).To(BeEquivalentTo(4), + "Incorrect SwIfIndex value for MemifCreate") } func TestRequestReplyMemifDelete(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() + // mock reply ctx.mockVpp.MockReply(&memif.MemifDeleteReply{}) + request := &memif.MemifDelete{ SwIfIndex: 15, } @@ -161,26 +182,30 @@ func TestRequestReplyMemifDelete(t *testing.T) { err := ctx.ch.SendRequest(request).ReceiveReply(reply) Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.Retval).To(BeEquivalentTo(0), "Incorrect Retval value for MemifDelete") } func TestRequestReplyMemifDetails(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() + // mock reply ctx.mockVpp.MockReply(&memif.MemifDetails{ SwIfIndex: 25, IfName: []byte("memif-name"), Role: 0, }) + request := &memif.MemifDump{} reply := &memif.MemifDetails{} err := ctx.ch.SendRequest(request).ReceiveReply(reply) Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.SwIfIndex).To(BeEquivalentTo(25), "Incorrect SwIfIndex value for MemifDetails") - Expect(reply.IfName).ToNot(BeEmpty(), "MemifDetails IfName is empty byte array") - Expect(reply.Role).To(BeEquivalentTo(0), "Incorrect Role value for MemifDetails") + Expect(reply.SwIfIndex).To(BeEquivalentTo(25), + "Incorrect SwIfIndex value for MemifDetails") + Expect(reply.IfName).ToNot(BeEmpty(), + "MemifDetails IfName is empty byte array") + Expect(reply.Role).To(BeEquivalentTo(0), + "Incorrect Role value for MemifDetails") } func TestMultiRequestReplySwInterfaceTapDump(t *testing.T) { @@ -204,7 +229,7 @@ func TestMultiRequestReplySwInterfaceTapDump(t *testing.T) { msg := &tap.SwInterfaceTapDetails{} stop, err := reqCtx.ReceiveReply(msg) if stop { - break // break out of the loop + break } Expect(err).ShouldNot(HaveOccurred()) cnt++ @@ -232,7 +257,7 @@ func TestMultiRequestReplySwInterfaceMemifDump(t *testing.T) { msg := &memif.MemifDetails{} stop, err := reqCtx.ReceiveReply(msg) if stop { - break // break out of the loop + break } Expect(err).ShouldNot(HaveOccurred()) cnt++ @@ -240,51 +265,16 @@ func TestMultiRequestReplySwInterfaceMemifDump(t *testing.T) { Expect(cnt).To(BeEquivalentTo(10)) } -func TestNotifications(t *testing.T) { - ctx := setupTest(t) - defer ctx.teardownTest() - - // subscribe for notification - notifChan := make(chan api.Message, 1) - subs, err := ctx.ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceSetFlags) - Expect(err).ShouldNot(HaveOccurred()) - - // mock the notification and force its delivery - ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{ - SwIfIndex: 3, - AdminUpDown: 1, - }) - ctx.mockVpp.SendMsg(0, []byte("")) - - // receive the notification - var notif *interfaces.SwInterfaceSetFlags - Eventually(func() *interfaces.SwInterfaceSetFlags { - select { - case n := <-notifChan: - notif = n.(*interfaces.SwInterfaceSetFlags) - return notif - default: - return nil - } - }).ShouldNot(BeNil()) - - // verify the received notifications - Expect(notif.SwIfIndex).To(BeEquivalentTo(3), "Incorrect SwIfIndex value for SwInterfaceSetFlags") - Expect(notif.AdminUpDown).To(BeEquivalentTo(1), "Incorrect AdminUpDown value for SwInterfaceSetFlags") - - ctx.ch.UnsubscribeNotification(subs) -} - func TestNotificationEvent(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() // subscribe for notification notifChan := make(chan api.Message, 1) - subs, err := ctx.ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceEvent) + sub, err := ctx.ch.SubscribeNotification(notifChan, &interfaces.SwInterfaceEvent{}) Expect(err).ShouldNot(HaveOccurred()) - // mock the notification and force its delivery + // mock event and force its delivery ctx.mockVpp.MockReply(&interfaces.SwInterfaceEvent{ SwIfIndex: 2, LinkUpDown: 1, @@ -307,16 +297,9 @@ func TestNotificationEvent(t *testing.T) { Expect(notif.SwIfIndex).To(BeEquivalentTo(2), "Incorrect SwIfIndex value for SwInterfaceSetFlags") Expect(notif.LinkUpDown).To(BeEquivalentTo(1), "Incorrect LinkUpDown value for SwInterfaceSetFlags") - ctx.ch.UnsubscribeNotification(subs) -} - -/*func TestCheckMessageCompatibility(t *testing.T) { - ctx := setupTest(t) - defer ctx.teardownTest() - - err := ctx.ch.CheckMessageCompatibility(&interfaces.SwInterfaceSetFlags{}) + err = sub.Unsubscribe() Expect(err).ShouldNot(HaveOccurred()) -}*/ +} func TestSetReplyTimeout(t *testing.T) { ctx := setupTest(t) @@ -324,8 +307,10 @@ func TestSetReplyTimeout(t *testing.T) { ctx.ch.SetReplyTimeout(time.Millisecond) - // first one request should work + // mock reply ctx.mockVpp.MockReply(&ControlPingReply{}) + + // first one request should work err := ctx.ch.SendRequest(&ControlPing{}).ReceiveReply(&ControlPingReply{}) Expect(err).ShouldNot(HaveOccurred()) @@ -339,16 +324,23 @@ func TestSetReplyTimeoutMultiRequest(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() - ctx.ch.SetReplyTimeout(time.Millisecond) + ctx.ch.SetReplyTimeout(time.Millisecond * 100) - var msgs []api.Message - for i := 1; i <= 3; i++ { - msgs = append(msgs, &interfaces.SwInterfaceDetails{ - SwIfIndex: uint32(i), + // mock reply + ctx.mockVpp.MockReply( + &interfaces.SwInterfaceDetails{ + SwIfIndex: 1, InterfaceName: []byte("if-name-test"), - }) - } - ctx.mockVpp.MockReply(msgs...) + }, + &interfaces.SwInterfaceDetails{ + SwIfIndex: 2, + InterfaceName: []byte("if-name-test"), + }, + &interfaces.SwInterfaceDetails{ + SwIfIndex: 3, + InterfaceName: []byte("if-name-test"), + }, + ) ctx.mockVpp.MockReply(&ControlPingReply{}) cnt := 0 @@ -357,12 +349,12 @@ func TestSetReplyTimeoutMultiRequest(t *testing.T) { for { msg := &interfaces.SwInterfaceDetails{} stop, err := reqCtx.ReceiveReply(msg) - if stop { - break // break out of the loop - } if err != nil { return err } + if stop { + break + } cnt++ } return nil @@ -443,7 +435,7 @@ func TestMultiRequestDouble(t *testing.T) { msg := &interfaces.SwInterfaceDetails{} stop, err := reqCtx.ReceiveReply(msg) if stop { - break // break out of the loop + break } if err != nil { return err @@ -468,8 +460,10 @@ func TestReceiveReplyAfterTimeout(t *testing.T) { ctx.ch.SetReplyTimeout(time.Millisecond) - // first one request should work + // mock reply ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{Msg: &ControlPingReply{}, SeqNum: 1}) + // first one request should work + err := ctx.ch.SendRequest(&ControlPing{}).ReceiveReply(&ControlPingReply{}) Expect(err).ShouldNot(HaveOccurred()) @@ -479,9 +473,16 @@ func TestReceiveReplyAfterTimeout(t *testing.T) { ctx.mockVpp.MockReplyWithContext( // simulating late reply - mock.MsgWithContext{Msg: &ControlPingReply{}, SeqNum: 2}, + mock.MsgWithContext{ + Msg: &ControlPingReply{}, + SeqNum: 2, + }, // normal reply for next request - mock.MsgWithContext{Msg: &tap.TapConnectReply{}, SeqNum: 3}) + mock.MsgWithContext{ + Msg: &tap.TapConnectReply{}, + SeqNum: 3, + }, + ) req := &tap.TapConnect{ TapName: []byte("test-tap-name"), @@ -508,8 +509,10 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) { ctx.ch.SetReplyTimeout(time.Millisecond * 100) - // first one request should work + // mock reply ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{Msg: &ControlPingReply{}, SeqNum: 1}) + + // first one request should work err := ctx.ch.SendRequest(&ControlPing{}).ReceiveReply(&ControlPingReply{}) Expect(err).ShouldNot(HaveOccurred()) @@ -520,7 +523,7 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) { msg := &interfaces.SwInterfaceDetails{} stop, err := reqCtx.ReceiveReply(msg) if stop { - break // break out of the loop + break } if err != nil { return err @@ -564,18 +567,20 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) { Expect(err).ShouldNot(HaveOccurred()) } -/*func TestInvalidMessageID(t *testing.T) { +func TestInvalidMessageID(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() - // first one request should work + // mock reply + ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}) ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}) + + // first one request should work err := ctx.ch.SendRequest(&vpe.ShowVersion{}).ReceiveReply(&vpe.ShowVersionReply{}) Expect(err).ShouldNot(HaveOccurred()) // second should fail with error invalid message ID - ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}) err = ctx.ch.SendRequest(&ControlPing{}).ReceiveReply(&ControlPingReply{}) Expect(err).Should(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("invalid message ID")) -}*/ +} diff --git a/core/connection.go b/core/connection.go index c77358f..7d014ce 100644 --- a/core/connection.go +++ b/core/connection.go @@ -29,42 +29,19 @@ import ( "git.fd.io/govpp.git/codec" ) -const ( - requestChannelBufSize = 100 // default size of the request channel buffer - replyChannelBufSize = 100 // default size of the reply channel buffer - notificationChannelBufSize = 100 // default size of the notification channel buffer - - defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout +var ( + RequestChanBufSize = 100 // default size of the request channel buffer + ReplyChanBufSize = 100 // default size of the reply channel buffer + NotificationChanBufSize = 100 // default size of the notification channel buffer ) var ( - healthCheckInterval = time.Second * 1 // default health check interval - healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check - healthCheckThreshold = 1 // number of failed health checks until the error is reported + HealthCheckProbeInterval = time.Second * 1 // default health check probe interval + HealthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe + HealthCheckThreshold = 1 // number of failed health checks until the error is reported + DefaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP ) -// SetHealthCheckProbeInterval sets health check probe interval. -// Beware: Function is not thread-safe. It is recommended to setup this parameter -// before connecting to vpp. -func SetHealthCheckProbeInterval(interval time.Duration) { - healthCheckInterval = interval -} - -// SetHealthCheckReplyTimeout sets timeout for reply to a health check probe. -// If reply arrives after the timeout, check is considered as failed. -// Beware: Function is not thread-safe. It is recommended to setup this parameter -// before connecting to vpp. -func SetHealthCheckReplyTimeout(timeout time.Duration) { - healthCheckReplyTimeout = timeout -} - -// SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported. -// Beware: Function is not thread-safe. It is recommended to setup this parameter -// before connecting to vpp. -func SetHealthCheckThreshold(threshold int) { - healthCheckThreshold = threshold -} - // ConnectionState represents the current state of the connection to VPP. type ConnectionState int @@ -104,10 +81,10 @@ type Connection struct { maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations) channelsLock sync.RWMutex // lock for the channels map - channels map[uint16]*channel // map of all API channels indexed by the channel ID + channels map[uint16]*Channel // map of all API channels indexed by the channel ID - notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map - notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID + subscriptionsLock sync.RWMutex // lock for the subscriptions map + subscriptions map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID pingReqID uint16 // ID if the ControlPing message pingReplyID uint16 // ID of the ControlPingReply message @@ -116,18 +93,30 @@ type Connection struct { lastReply time.Time // time of the last received reply from VPP } +func newConnection(vpp adapter.VppAdapter) *Connection { + c := &Connection{ + vpp: vpp, + codec: &codec.MsgCodec{}, + msgIDs: make(map[string]uint16), + msgMap: make(map[uint16]api.Message), + channels: make(map[uint16]*Channel), + subscriptions: make(map[uint16][]*subscriptionCtx), + } + vpp.SetMsgCallback(c.msgCallback) + return c +} + // Connect connects to VPP using specified VPP adapter and returns the connection handle. // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed. func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) { // create new connection handle - c, err := newConnection(vppAdapter) + c, err := createConnection(vppAdapter) if err != nil { return nil, err } // blocking attempt to connect to VPP - err = c.connectVPP() - if err != nil { + if err := c.connectVPP(); err != nil { return nil, err } @@ -140,13 +129,13 @@ func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) { // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect. func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) { // create new connection handle - c, err := newConnection(vppAdapter) + c, err := createConnection(vppAdapter) if err != nil { return nil, nil, err } // asynchronously attempt to connect to VPP - connChan := make(chan ConnectionEvent, notificationChannelBufSize) + connChan := make(chan ConnectionEvent, NotificationChanBufSize) go c.connectLoop(connChan) return c, connChan, nil @@ -168,7 +157,7 @@ func (c *Connection) Disconnect() { } // newConnection returns new connection handle. -func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) { +func createConnection(vppAdapter adapter.VppAdapter) (*Connection, error) { connLock.Lock() defer connLock.Unlock() @@ -176,15 +165,7 @@ func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) { return nil, errors.New("only one connection per process is supported") } - conn = &Connection{ - vpp: vppAdapter, - codec: &codec.MsgCodec{}, - channels: make(map[uint16]*channel), - msgIDs: make(map[string]uint16), - msgMap: make(map[uint16]api.Message), - notifSubscriptions: make(map[uint16][]*api.NotifSubscription), - } - conn.vpp.SetMsgCallback(conn.msgCallback) + conn = newConnection(vppAdapter) return conn, nil } @@ -211,8 +192,72 @@ func (c *Connection) connectVPP() error { return nil } -func getMsgNameWithCrc(x api.Message) string { - return x.GetMessageName() + "_" + x.GetCrcString() +func (c *Connection) NewAPIChannel() (api.Channel, error) { + return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize) +} + +func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) { + return c.newAPIChannel(reqChanBufSize, replyChanBufSize) +} + +// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core. +// It allows to specify custom buffer sizes for the request and reply Go channels. +func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) { + if c == nil { + return nil, errors.New("nil connection passed in") + } + + // create new channel + chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff) + channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize) + + // store API channel within the client + c.channelsLock.Lock() + c.channels[chID] = channel + c.channelsLock.Unlock() + + // start watching on the request channel + go c.watchRequests(channel) + + return channel, nil +} + +// releaseAPIChannel releases API channel that needs to be closed. +func (c *Connection) releaseAPIChannel(ch *Channel) { + log.WithFields(logger.Fields{ + "channel": ch.id, + }).Debug("API channel released") + + // delete the channel from channels map + c.channelsLock.Lock() + delete(c.channels, ch.id) + c.channelsLock.Unlock() +} + +// GetMessageID returns message identifier of given API message. +func (c *Connection) GetMessageID(msg api.Message) (uint16, error) { + if c == nil { + return 0, errors.New("nil connection passed in") + } + + if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok { + return msgID, nil + } + + return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString()) +} + +// LookupByID looks up message name and crc by ID. +func (c *Connection) LookupByID(msgID uint16) (api.Message, error) { + if c == nil { + return nil, errors.New("nil connection passed in") + } + + if msg, ok := c.msgMap[msgID]; ok { + return msg, nil + } + + return nil, fmt.Errorf("unknown message ID: %d", msgID) } // retrieveMessageIDs retrieves IDs for all registered messages and stores them in map @@ -268,32 +313,6 @@ func (c *Connection) retrieveMessageIDs() (err error) { return nil } -// GetMessageID returns message identifier of given API message. -func (c *Connection) GetMessageID(msg api.Message) (uint16, error) { - if c == nil { - return 0, errors.New("nil connection passed in") - } - - if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok { - return msgID, nil - } - - return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString()) -} - -// LookupByID looks up message name and crc by ID. -func (c *Connection) LookupByID(msgID uint16) (api.Message, error) { - if c == nil { - return nil, errors.New("nil connection passed in") - } - - if msg, ok := c.msgMap[msgID]; ok { - return msg, nil - } - - return nil, fmt.Errorf("unknown message ID: %d", msgID) -} - // disconnectVPP disconnects from VPP in case it is connected. func (c *Connection) disconnectVPP() { if atomic.CompareAndSwapUint32(&c.connected, 1, 0) { @@ -341,7 +360,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { // send health check probes until an error or timeout occurs for { // sleep until next health check probe period - time.Sleep(healthCheckInterval) + time.Sleep(HealthCheckProbeInterval) if atomic.LoadUint32(&c.connected) == 0 { // Disconnect has been called in the meantime, return the healthcheck - reconnect loop @@ -365,7 +384,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { case vppReply := <-ch.replyChan: err = vppReply.err - case <-time.After(healthCheckReplyTimeout): + case <-time.After(HealthCheckReplyTimeout): err = ErrProbeTimeout // check if time since last reply from any other @@ -374,7 +393,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { sinceLastReply = time.Since(c.lastReply) c.lastReplyLock.Unlock() - if sinceLastReply < healthCheckReplyTimeout { + if sinceLastReply < HealthCheckReplyTimeout { log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply) continue } @@ -384,10 +403,10 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { if err == ErrProbeTimeout { failedChecks++ - log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks) - if failedChecks > healthCheckThreshold { + log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks) + if failedChecks > HealthCheckThreshold { // in case of exceeded failed check treshold, assume VPP disconnected - log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold) + log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", HealthCheckThreshold) connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected} break } @@ -411,52 +430,6 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { c.connectLoop(connChan) } -func (c *Connection) NewAPIChannel() (api.Channel, error) { - return c.newAPIChannel(requestChannelBufSize, replyChannelBufSize) -} - -func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) { - return c.newAPIChannel(reqChanBufSize, replyChanBufSize) -} - -// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core. -// It allows to specify custom buffer sizes for the request and reply Go channels. -func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*channel, error) { - if c == nil { - return nil, errors.New("nil connection passed in") - } - - chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff) - ch := &channel{ - id: chID, - replyTimeout: defaultReplyTimeout, - msgDecoder: c.codec, - msgIdentifier: c, - reqChan: make(chan *vppRequest, reqChanBufSize), - replyChan: make(chan *vppReply, replyChanBufSize), - notifSubsChan: make(chan *subscriptionRequest, reqChanBufSize), - notifSubsReplyChan: make(chan error, replyChanBufSize), - } - - // store API channel within the client - c.channelsLock.Lock() - c.channels[chID] = ch - c.channelsLock.Unlock() - - // start watching on the request channel - go c.watchRequests(ch) - - return ch, nil -} - -// releaseAPIChannel releases API channel that needs to be closed. -func (c *Connection) releaseAPIChannel(ch *channel) { - log.WithFields(logger.Fields{ - "channel": ch.id, - }).Debug("API channel released") - - // delete the channel from channels map - c.channelsLock.Lock() - delete(c.channels, ch.id) - c.channelsLock.Unlock() +func getMsgNameWithCrc(x api.Message) string { + return x.GetMessageName() + "_" + x.GetCrcString() } diff --git a/core/connection_test.go b/core/connection_test.go index 5c8c309..cea7d2d 100644 --- a/core/connection_test.go +++ b/core/connection_test.go @@ -14,18 +14,16 @@ package core_test -/* import ( "testing" "git.fd.io/govpp.git/adapter/mock" "git.fd.io/govpp.git/api" + "git.fd.io/govpp.git/codec" "git.fd.io/govpp.git/core" - "git.fd.io/govpp.git/core/bin_api/vpe" "git.fd.io/govpp.git/examples/bin_api/interfaces" - "git.fd.io/govpp.git/examples/bin_api/stats" - - "git.fd.io/govpp.git/codec" + "git.fd.io/govpp.git/examples/binapi/stats" + "git.fd.io/govpp.git/examples/binapi/vpe" . "github.com/onsi/gomega" ) @@ -38,8 +36,9 @@ type testCtx struct { func setupTest(t *testing.T, bufferedChan bool) *testCtx { RegisterTestingT(t) - ctx := &testCtx{} - ctx.mockVpp = &mock.VppAdapter{} + ctx := &testCtx{ + mockVpp: mock.NewVppAdapter(), + } var err error ctx.conn, err = core.Connect(ctx.mockVpp) @@ -60,100 +59,6 @@ func (ctx *testCtx) teardownTest() { ctx.conn.Disconnect() } -func TestSimpleRequest(t *testing.T) { - ctx := setupTest(t, false) - defer ctx.teardownTest() - - ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: -5}) - - req := &vpe.ControlPing{} - reply := &vpe.ControlPingReply{} - - // send the request and receive a reply - ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: req} - vppReply := <-ctx.ch.GetReplyChannel() - - Expect(vppReply).ShouldNot(BeNil()) - Expect(vppReply.Error).ShouldNot(HaveOccurred()) - - // decode the message - err := ctx.ch.GetMessageDecoder().DecodeMsg(vppReply.Data, reply) - Expect(err).ShouldNot(HaveOccurred()) - - Expect(reply.Retval).To(BeEquivalentTo(-5)) -} - -func TestMultiRequest(t *testing.T) { - ctx := setupTest(t, false) - defer ctx.teardownTest() - - msgs := []api.Message{} - for m := 0; m < 10; m++ { - msgs = append(msgs, &interfaces.SwInterfaceDetails{}) - } - ctx.mockVpp.MockReply(msgs...) - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) - - // send multipart request - ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: &interfaces.SwInterfaceDump{}, Multipart: true} - - cnt := 0 - for { - // receive a reply - vppReply := <-ctx.ch.GetReplyChannel() - if vppReply.LastReplyReceived { - break // break out of the loop - } - Expect(vppReply.Error).ShouldNot(HaveOccurred()) - - // decode the message - reply := &interfaces.SwInterfaceDetails{} - err := ctx.ch.GetMessageDecoder().DecodeMsg(vppReply.Data, reply) - Expect(err).ShouldNot(HaveOccurred()) - cnt++ - } - - Expect(cnt).To(BeEquivalentTo(10)) -} - -func TestNotifications(t *testing.T) { - ctx := setupTest(t, false) - defer ctx.teardownTest() - - // subscribe for notification - notifChan := make(chan api.Message, 1) - subscription := &api.NotifSubscription{ - NotifChan: notifChan, - MsgFactory: interfaces.NewSwInterfaceSetFlags, - } - ctx.ch.GetNotificationChannel() <- &api.NotifSubscribeRequest{ - Subscription: subscription, - Subscribe: true, - } - err := <-ctx.ch.GetNotificationReplyChannel() - Expect(err).ShouldNot(HaveOccurred()) - - // mock the notification and force its delivery - ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{ - SwIfIndex: 3, - AdminUpDown: 1, - }) - ctx.mockVpp.SendMsg(0, []byte{0}) - - // receive the notification - notif := (<-notifChan).(*interfaces.SwInterfaceSetFlags) - - Expect(notif.SwIfIndex).To(BeEquivalentTo(3)) - - // unsubscribe notification - ctx.ch.GetNotificationChannel() <- &api.NotifSubscribeRequest{ - Subscription: subscription, - Subscribe: false, - } - err = <-ctx.ch.GetNotificationReplyChannel() - Expect(err).ShouldNot(HaveOccurred()) -} - func TestNilConnection(t *testing.T) { RegisterTestingT(t) var conn *core.Connection @@ -184,47 +89,16 @@ func TestAsyncConnection(t *testing.T) { defer ctx.teardownTest() ctx.conn.Disconnect() - conn, ch, err := core.AsyncConnect(ctx.mockVpp) + conn, statusChan, err := core.AsyncConnect(ctx.mockVpp) ctx.conn = conn Expect(err).ShouldNot(HaveOccurred()) Expect(conn).ShouldNot(BeNil()) - ev := <-ch + ev := <-statusChan Expect(ev.State).Should(BeEquivalentTo(core.Connected)) } -func TestFullBuffer(t *testing.T) { - ctx := setupTest(t, false) - defer ctx.teardownTest() - - // close the default API channel - ctx.ch.Close() - - // create a new channel with limited buffer sizes - var err error - ctx.ch, err = ctx.conn.NewAPIChannelBuffered(10, 1) - Expect(err).ShouldNot(HaveOccurred()) - - // send multiple requests, only one reply should be read - for i := 0; i < 20; i++ { - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) - ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: &vpe.ControlPing{}} - } - - vppReply := <-ctx.ch.GetReplyChannel() - Expect(vppReply).ShouldNot(BeNil()) - - var received bool - select { - case <-ctx.ch.GetReplyChannel(): - received = true // this should not happen - default: - received = false // no reply to be received - } - Expect(received).Should(BeFalse(), "A reply has been recieved, should had been ignored.") -} - func TestCodec(t *testing.T) { RegisterTestingT(t) @@ -289,7 +163,7 @@ func TestSimpleRequestsWithSequenceNumbers(t *testing.T) { var reqCtx []api.RequestCtx for i := 0; i < 10; i++ { - ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: int32(i)}) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) req := &vpe.ControlPing{} reqCtx = append(reqCtx, ctx.ch.SendRequest(req)) } @@ -298,7 +172,6 @@ func TestSimpleRequestsWithSequenceNumbers(t *testing.T) { reply := &vpe.ControlPingReply{} err := reqCtx[i].ReceiveReply(reply) Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.Retval).To(BeEquivalentTo(i)) } } @@ -306,7 +179,7 @@ func TestMultiRequestsWithSequenceNumbers(t *testing.T) { ctx := setupTest(t, false) defer ctx.teardownTest() - msgs := []api.Message{} + var msgs []api.Message for i := 0; i < 10; i++ { msgs = append(msgs, &interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)}) } @@ -325,7 +198,7 @@ func TestMultiRequestsWithSequenceNumbers(t *testing.T) { lastReplyReceived, err := reqCtx.ReceiveReply(reply) if lastReplyReceived { - break // break out of the loop + break } Expect(err).ShouldNot(HaveOccurred()) @@ -343,7 +216,7 @@ func TestSimpleRequestWithTimeout(t *testing.T) { // reply for a previous timeouted requests to be ignored ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 1}, + Msg: &vpe.ControlPingReply{}, SeqNum: 0, }) @@ -359,12 +232,12 @@ func TestSimpleRequestWithTimeout(t *testing.T) { ctx.mockVpp.MockReplyWithContext( // reply for the previous request mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 1}, + Msg: &vpe.ControlPingReply{}, SeqNum: 1, }, // reply for the next request mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 2}, + Msg: &vpe.ControlPingReply{}, SeqNum: 2, }) @@ -376,7 +249,6 @@ func TestSimpleRequestWithTimeout(t *testing.T) { reply = &vpe.ControlPingReply{} err = reqCtx2.ReceiveReply(reply) Expect(err).To(BeNil()) - Expect(reply.Retval).To(BeEquivalentTo(2)) } func TestSimpleRequestsWithMissingReply(t *testing.T) { @@ -393,7 +265,7 @@ func TestSimpleRequestsWithMissingReply(t *testing.T) { // third request with reply ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 3}, + Msg: &vpe.ControlPingReply{}, SeqNum: 3, }) req3 := &vpe.ControlPing{} @@ -414,7 +286,6 @@ func TestSimpleRequestsWithMissingReply(t *testing.T) { reply = &vpe.ControlPingReply{} err = reqCtx3.ReceiveReply(reply) Expect(err).To(BeNil()) - Expect(reply.Retval).To(BeEquivalentTo(3)) } func TestMultiRequestsWithErrors(t *testing.T) { @@ -422,38 +293,25 @@ func TestMultiRequestsWithErrors(t *testing.T) { defer ctx.teardownTest() // replies for a previous timeouted requests to be ignored - msgs := []mock.MsgWithContext{} - msgs = append(msgs, - mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 1}, - SeqNum: 0xffff - 1, - }, - mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 1}, - SeqNum: 0xffff, - }, - mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 1}, - SeqNum: 0, - }) - + msgs := []mock.MsgWithContext{ + {Msg: &vpe.ControlPingReply{}, SeqNum: 0xffff - 1}, + {Msg: &vpe.ControlPingReply{}, SeqNum: 0xffff}, + {Msg: &vpe.ControlPingReply{}, SeqNum: 0}, + } for i := 0; i < 10; i++ { - msgs = append(msgs, - mock.MsgWithContext{ - Msg: &interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)}, - SeqNum: 1, - Multipart: true, - }) + msgs = append(msgs, mock.MsgWithContext{ + Msg: &interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)}, + SeqNum: 1, + Multipart: true, + }) } // missing finalizing control ping // reply for a next request - msgs = append(msgs, - mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 2}, - SeqNum: 2, - Multipart: false, - }) + msgs = append(msgs, mock.MsgWithContext{ + Msg: &vpe.ControlPingReply{}, + SeqNum: 2, + }) // queue replies ctx.mockVpp.MockReplyWithContext(msgs...) @@ -487,7 +345,6 @@ func TestMultiRequestsWithErrors(t *testing.T) { reply2 := &vpe.ControlPingReply{} err = reqCtx2.ReceiveReply(reply2) Expect(err).To(BeNil()) - Expect(reply2.Retval).To(BeEquivalentTo(2)) } func TestRequestsOrdering(t *testing.T) { @@ -498,12 +355,12 @@ func TestRequestsOrdering(t *testing.T) { // some replies will get thrown away // first request - ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 1}) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) req1 := &vpe.ControlPing{} reqCtx1 := ctx.ch.SendRequest(req1) // second request - ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 2}) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) req2 := &vpe.ControlPing{} reqCtx2 := ctx.ch.SendRequest(req2) @@ -512,7 +369,6 @@ func TestRequestsOrdering(t *testing.T) { reply2 := &vpe.ControlPingReply{} err := reqCtx2.ReceiveReply(reply2) Expect(err).To(BeNil()) - Expect(reply2.Retval).To(BeEquivalentTo(2)) // first request has already been considered closed reply1 := &vpe.ControlPingReply{} @@ -522,7 +378,7 @@ func TestRequestsOrdering(t *testing.T) { } func TestCycleOverSetOfSequenceNumbers(t *testing.T) { - ctx := setupTest(t, true) + ctx := setupTest(t, false) defer ctx.teardownTest() numIters := 0xffff + 100 @@ -530,7 +386,7 @@ func TestCycleOverSetOfSequenceNumbers(t *testing.T) { for i := 0; i < numIters+30; i++ { if i < numIters { - ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: int32(i)}) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) req := &vpe.ControlPing{} reqCtx[i] = ctx.ch.SendRequest(req) } @@ -538,8 +394,6 @@ func TestCycleOverSetOfSequenceNumbers(t *testing.T) { reply := &vpe.ControlPingReply{} err := reqCtx[i-30].ReceiveReply(reply) Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.Retval).To(BeEquivalentTo(i - 30)) } } } -*/ diff --git a/core/notification_handler.go b/core/notification_handler.go deleted file mode 100644 index 7b889e3..0000000 --- a/core/notification_handler.go +++ /dev/null @@ -1,170 +0,0 @@ -// Copyright (c) 2017 Cisco and/or its affiliates. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package core - -import ( - "fmt" - - "git.fd.io/govpp.git/api" - logger "github.com/sirupsen/logrus" -) - -// processSubscriptionRequest processes a notification subscribe request. -func (c *Connection) processSubscriptionRequest(ch *channel, req *subscriptionRequest) error { - var err error - - // subscribe / unsubscribe - if req.subscribe { - err = c.addNotifSubscription(req.sub) - } else { - err = c.removeNotifSubscription(req.sub) - } - - // send the reply into the go channel - select { - case ch.notifSubsReplyChan <- err: - // reply sent successfully - default: - // unable to write into the channel without blocking - log.WithFields(logger.Fields{ - "channel": ch.id, - }).Warn("Unable to deliver the subscribe reply, reciever end not ready.") - } - - return err -} - -// addNotifSubscription adds the notification subscription into the subscriptions map of the connection. -func (c *Connection) addNotifSubscription(subs *api.NotifSubscription) error { - // get message ID of the notification message - msgID, msgName, err := c.getSubscriptionMessageID(subs) - if err != nil { - return err - } - - log.WithFields(logger.Fields{ - "msg_name": msgName, - "msg_id": msgID, - }).Debug("Adding new notification subscription.") - - // add the subscription into map - c.notifSubscriptionsLock.Lock() - defer c.notifSubscriptionsLock.Unlock() - - c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID], subs) - - return nil -} - -// removeNotifSubscription removes the notification subscription from the subscriptions map of the connection. -func (c *Connection) removeNotifSubscription(subs *api.NotifSubscription) error { - // get message ID of the notification message - msgID, msgName, err := c.getSubscriptionMessageID(subs) - if err != nil { - return err - } - - log.WithFields(logger.Fields{ - "msg_name": msgName, - "msg_id": msgID, - }).Debug("Removing notification subscription.") - - // remove the subscription from the map - c.notifSubscriptionsLock.Lock() - defer c.notifSubscriptionsLock.Unlock() - - for i, item := range c.notifSubscriptions[msgID] { - if item == subs { - // remove i-th item in the slice - c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID][:i], c.notifSubscriptions[msgID][i+1:]...) - break - } - } - - return nil -} - -// isNotificationMessage returns true if someone has subscribed to provided message ID. -func (c *Connection) isNotificationMessage(msgID uint16) bool { - c.notifSubscriptionsLock.RLock() - defer c.notifSubscriptionsLock.RUnlock() - - _, exists := c.notifSubscriptions[msgID] - return exists -} - -// sendNotifications send a notification message to all subscribers subscribed for that message. -func (c *Connection) sendNotifications(msgID uint16, data []byte) { - c.notifSubscriptionsLock.RLock() - defer c.notifSubscriptionsLock.RUnlock() - - matched := false - - // send to notification to each subscriber - for _, subs := range c.notifSubscriptions[msgID] { - msg := subs.MsgFactory() - log.WithFields(logger.Fields{ - "msg_name": msg.GetMessageName(), - "msg_id": msgID, - "msg_size": len(data), - }).Debug("Sending a notification to the subscription channel.") - - if err := c.codec.DecodeMsg(data, msg); err != nil { - log.WithFields(logger.Fields{ - "msg_name": msg.GetMessageName(), - "msg_id": msgID, - "msg_size": len(data), - }).Errorf("Unable to decode the notification message: %v", err) - continue - } - - // send the message into the go channel of the subscription - select { - case subs.NotifChan <- msg: - // message sent successfully - default: - // unable to write into the channel without blocking - log.WithFields(logger.Fields{ - "msg_name": msg.GetMessageName(), - "msg_id": msgID, - "msg_size": len(data), - }).Warn("Unable to deliver the notification, reciever end not ready.") - } - - matched = true - } - - if !matched { - log.WithFields(logger.Fields{ - "msg_id": msgID, - "msg_size": len(data), - }).Info("No subscription found for the notification message.") - } -} - -// getSubscriptionMessageID returns ID of the message the subscription is tied to. -func (c *Connection) getSubscriptionMessageID(subs *api.NotifSubscription) (uint16, string, error) { - msg := subs.MsgFactory() - msgID, err := c.GetMessageID(msg) - if err != nil { - log.WithFields(logger.Fields{ - "msg_name": msg.GetMessageName(), - "msg_crc": msg.GetCrcString(), - }).Errorf("unable to retrieve message ID: %v", err) - return 0, "", fmt.Errorf("unable to retrieve message ID: %v", err) - } - - return msgID, msg.GetMessageName(), nil -} diff --git a/core/request_handler.go b/core/request_handler.go index fd6d100..14c095d 100644 --- a/core/request_handler.go +++ b/core/request_handler.go @@ -29,7 +29,7 @@ var ( ) // watchRequests watches for requests on the request API channel and forwards them as messages to VPP. -func (c *Connection) watchRequests(ch *channel) { +func (c *Connection) watchRequests(ch *Channel) { for { select { case req, ok := <-ch.reqChan: @@ -40,16 +40,12 @@ func (c *Connection) watchRequests(ch *channel) { return } c.processRequest(ch, req) - - case req := <-ch.notifSubsChan: - // new request on the notification subscribe channel - c.processSubscriptionRequest(ch, req) } } } // processRequest processes a single request received on the request channel. -func (c *Connection) processRequest(ch *channel, req *vppRequest) error { +func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { // check whether we are connected to VPP if atomic.LoadUint32(&c.connected) == 0 { err := ErrNotConnected @@ -137,7 +133,7 @@ func (c *Connection) processRequest(ch *channel, req *vppRequest) error { } // msgCallback is called whenever any binary API message comes from VPP. -func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) { +func (c *Connection) msgCallback(msgID uint16, data []byte) { connLock.RLock() defer connLock.RUnlock() @@ -157,13 +153,8 @@ func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) { // - replies that don't have context as first field (comes as zero) // - events that don't have context at all (comes as non zero) // - msgContext, err := c.codec.DecodeMsgContext(data, msg) - if err == nil { - if context != msgContext { - log.Debugf("different context was decoded from message (%d -> %d)", context, msgContext) - context = msgContext - } - } else { + context, err := c.codec.DecodeMsgContext(data, msg) + if err != nil { log.Errorf("decoding context failed: %v", err) } @@ -218,7 +209,7 @@ func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) { // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise // it logs the error and do not send the message. -func sendReply(ch *channel, reply *vppReply) { +func sendReply(ch *Channel, reply *vppReply) { select { case ch.replyChan <- reply: // reply sent successfully @@ -232,10 +223,68 @@ func sendReply(ch *channel, reply *vppReply) { } } -func sendReplyError(ch *channel, req *vppRequest, err error) { +func sendReplyError(ch *Channel, req *vppRequest, err error) { sendReply(ch, &vppReply{seqNum: req.seqNum, err: err}) } +// isNotificationMessage returns true if someone has subscribed to provided message ID. +func (c *Connection) isNotificationMessage(msgID uint16) bool { + c.subscriptionsLock.RLock() + defer c.subscriptionsLock.RUnlock() + + _, exists := c.subscriptions[msgID] + return exists +} + +// sendNotifications send a notification message to all subscribers subscribed for that message. +func (c *Connection) sendNotifications(msgID uint16, data []byte) { + c.subscriptionsLock.RLock() + defer c.subscriptionsLock.RUnlock() + + matched := false + + // send to notification to each subscriber + for _, sub := range c.subscriptions[msgID] { + log.WithFields(logger.Fields{ + "msg_name": sub.event.GetMessageName(), + "msg_id": msgID, + "msg_size": len(data), + }).Debug("Sending a notification to the subscription channel.") + + event := sub.msgFactory() + if err := c.codec.DecodeMsg(data, event); err != nil { + log.WithFields(logger.Fields{ + "msg_name": sub.event.GetMessageName(), + "msg_id": msgID, + "msg_size": len(data), + }).Errorf("Unable to decode the notification message: %v", err) + continue + } + + // send the message into the go channel of the subscription + select { + case sub.notifChan <- event: + // message sent successfully + default: + // unable to write into the channel without blocking + log.WithFields(logger.Fields{ + "msg_name": sub.event.GetMessageName(), + "msg_id": msgID, + "msg_size": len(data), + }).Warn("Unable to deliver the notification, reciever end not ready.") + } + + matched = true + } + + if !matched { + log.WithFields(logger.Fields{ + "msg_id": msgID, + "msg_size": len(data), + }).Info("No subscription found for the notification message.") + } +} + // +------------------+-------------------+-----------------------+ // | 15b = channel ID | 1b = is multipart | 16b = sequence number | // +------------------+-------------------+-----------------------+ diff --git a/examples/bin_api/acl/acl.ba.go b/examples/bin_api/acl/acl.ba.go index ff80173..0dfd335 100644 --- a/examples/bin_api/acl/acl.ba.go +++ b/examples/bin_api/acl/acl.ba.go @@ -19,6 +19,7 @@ import "github.com/lunixbochs/struc" import "bytes" // Reference imports to suppress errors if they are not otherwise used. +var _ = api.RegisterMessage var _ = struc.Pack var _ = bytes.NewBuffer diff --git a/examples/bin_api/af_packet/af_packet.ba.go b/examples/bin_api/af_packet/af_packet.ba.go index a6bdc93..144bf74 100644 --- a/examples/bin_api/af_packet/af_packet.ba.go +++ b/examples/bin_api/af_packet/af_packet.ba.go @@ -18,6 +18,7 @@ import "github.com/lunixbochs/struc" import "bytes" // Reference imports to suppress errors if they are not otherwise used. +var _ = api.RegisterMessage var _ = struc.Pack var _ = bytes.NewBuffer diff --git a/examples/bin_api/interfaces/interfaces.ba.go b/examples/bin_api/interfaces/interfaces.ba.go index 5ef58ed..38bbb6b 100644 --- a/examples/bin_api/interfaces/interfaces.ba.go +++ b/examples/bin_api/interfaces/interfaces.ba.go @@ -19,6 +19,7 @@ import "github.com/lunixbochs/struc" import "bytes" // Reference imports to suppress errors if they are not otherwise used. +var _ = api.RegisterMessage var _ = struc.Pack var _ = bytes.NewBuffer diff --git a/examples/bin_api/ip/ip.ba.go b/examples/bin_api/ip/ip.ba.go index c980b6a..9dc3d56 100644 --- a/examples/bin_api/ip/ip.ba.go +++ b/examples/bin_api/ip/ip.ba.go @@ -21,6 +21,7 @@ import "github.com/lunixbochs/struc" import "bytes" // Reference imports to suppress errors if they are not otherwise used. +var _ = api.RegisterMessage var _ = struc.Pack var _ = bytes.NewBuffer diff --git a/examples/bin_api/memif/memif.ba.go b/examples/bin_api/memif/memif.ba.go index 3650355..ceb615f 100644 --- a/examples/bin_api/memif/memif.ba.go +++ b/examples/bin_api/memif/memif.ba.go @@ -18,6 +18,7 @@ import "github.com/lunixbochs/struc" import "bytes" // Reference imports to suppress errors if they are not otherwise used. +var _ = api.RegisterMessage var _ = struc.Pack var _ = bytes.NewBuffer diff --git a/examples/bin_api/stats/stats.ba.go b/examples/bin_api/stats/stats.ba.go index eb2dd8f..b66a077 100644 --- a/examples/bin_api/stats/stats.ba.go +++ b/examples/bin_api/stats/stats.ba.go @@ -19,6 +19,7 @@ import "github.com/lunixbochs/struc" import "bytes" // Reference imports to suppress errors if they are not otherwise used. +var _ = api.RegisterMessage var _ = struc.Pack var _ = bytes.NewBuffer diff --git a/examples/bin_api/tap/tap.ba.go b/examples/bin_api/tap/tap.ba.go index 36f5549..d2878ea 100644 --- a/examples/bin_api/tap/tap.ba.go +++ b/examples/bin_api/tap/tap.ba.go @@ -18,6 +18,7 @@ import "github.com/lunixbochs/struc" import "bytes" // Reference imports to suppress errors if they are not otherwise used. +var _ = api.RegisterMessage var _ = struc.Pack var _ = bytes.NewBuffer diff --git a/examples/bin_api/vpe/vpe.ba.go b/examples/bin_api/vpe/vpe.ba.go index f91a164..50476c3 100644 --- a/examples/bin_api/vpe/vpe.ba.go +++ b/examples/bin_api/vpe/vpe.ba.go @@ -18,6 +18,7 @@ import "github.com/lunixbochs/struc" import "bytes" // Reference imports to suppress errors if they are not otherwise used. +var _ = api.RegisterMessage var _ = struc.Pack var _ = bytes.NewBuffer diff --git a/examples/cmd/perf-bench/perf-bench.go b/examples/cmd/perf-bench/perf-bench.go index 5b4b17d..664f046 100644 --- a/examples/cmd/perf-bench/perf-bench.go +++ b/examples/cmd/perf-bench/perf-bench.go @@ -20,7 +20,6 @@ import ( "flag" "fmt" "log" - "os" "time" "github.com/pkg/profile" @@ -65,16 +64,14 @@ func main() { // connect to VPP conn, err := govpp.Connect("") if err != nil { - log.Println("Error:", err) - os.Exit(1) + log.Fatalln("Error:", err) } defer conn.Disconnect() // create an API channel ch, err := conn.NewAPIChannelBuffered(cnt, cnt) if err != nil { - log.Println("Error:", err) - os.Exit(1) + log.Fatalln("Error:", err) } defer ch.Close() @@ -101,10 +98,8 @@ func syncTest(ch api.Channel, cnt int) { req := &vpe.ControlPing{} reply := &vpe.ControlPingReply{} - err := ch.SendRequest(req).ReceiveReply(reply) - if err != nil { - log.Println("Error in reply:", err) - os.Exit(1) + if err := ch.SendRequest(req).ReceiveReply(reply); err != nil { + log.Fatalln("Error in reply:", err) } } } @@ -125,8 +120,7 @@ func asyncTest(ch api.Channel, cnt int) { for ctx := range ctxChan { reply := &vpe.ControlPingReply{} if err := ctx.ReceiveReply(reply); err != nil { - log.Println("Error in reply:", err) - os.Exit(1) + log.Fatalln("Error in reply:", err) } } } diff --git a/examples/cmd/simple-client/simple_client.go b/examples/cmd/simple-client/simple_client.go index b9e8052..08d4da6 100644 --- a/examples/cmd/simple-client/simple_client.go +++ b/examples/cmd/simple-client/simple_client.go @@ -18,6 +18,7 @@ package main import ( "fmt" + "log" "net" "os" "strings" @@ -35,16 +36,14 @@ func main() { // connect to VPP conn, err := govpp.Connect("") if err != nil { - fmt.Println("Error:", err) - os.Exit(1) + log.Fatalln("ERROR:", err) } defer conn.Disconnect() // create an API channel that will be used in the examples ch, err := conn.NewAPIChannel() if err != nil { - fmt.Println("Error:", err) - os.Exit(1) + log.Fatalln("ERROR:", err) } defer ch.Close() @@ -64,20 +63,22 @@ func main() { // aclVersion is the simplest API example - one empty request message and one reply message. func aclVersion(ch api.Channel) { + fmt.Println("ACL getting version") + req := &acl.ACLPluginGetVersion{} reply := &acl.ACLPluginGetVersionReply{} - err := ch.SendRequest(req).ReceiveReply(reply) - - if err != nil { - fmt.Println("Error:", err) + if err := ch.SendRequest(req).ReceiveReply(reply); err != nil { + fmt.Println("ERROR:", err) } else { - fmt.Printf("%+v\n", reply) + fmt.Printf("ACL version reply: %+v\n", reply) } } // aclConfig is another simple API example - in this case, the request contains structured data. func aclConfig(ch api.Channel) { + fmt.Println("ACL adding replace") + req := &acl.ACLAddReplace{ ACLIndex: ^uint32(0), Tag: []byte("access list 1"), @@ -102,10 +103,8 @@ func aclConfig(ch api.Channel) { } reply := &acl.ACLAddReplaceReply{} - err := ch.SendRequest(req).ReceiveReply(reply) - - if err != nil { - fmt.Println("Error:", err) + if err := ch.SendRequest(req).ReceiveReply(reply); err != nil { + fmt.Println("ERROR:", err) return } if reply.Retval != 0 { @@ -113,19 +112,23 @@ func aclConfig(ch api.Channel) { return } - fmt.Printf("%+v\n", reply) + fmt.Printf("ACL add replace reply: %+v\n", reply) } // aclDump shows an example where SendRequest and ReceiveReply are not chained together. func aclDump(ch api.Channel) { + fmt.Println("Dumping ACL") + req := &acl.ACLDump{} reply := &acl.ACLDetails{} - if err := ch.SendRequest(req).ReceiveReply(reply); err != nil { - fmt.Println("Error:", err) + reqCtx := ch.SendRequest(req) + + if err := reqCtx.ReceiveReply(reply); err != nil { + fmt.Println("ERROR:", err) } else { - fmt.Printf("%+v\n", reply) + fmt.Printf("ACL details: %+v\n", reply) } } @@ -133,14 +136,13 @@ func aclDump(ch api.Channel) { func interfaceDump(ch api.Channel) { fmt.Println("Dumping interfaces") - req := &interfaces.SwInterfaceDump{} - reqCtx := ch.SendMultiRequest(req) + reqCtx := ch.SendMultiRequest(&interfaces.SwInterfaceDump{}) for { msg := &interfaces.SwInterfaceDetails{} stop, err := reqCtx.ReceiveReply(msg) if stop { - break // break out of the loop + break } if err != nil { fmt.Println("ERROR:", err) @@ -148,7 +150,7 @@ func interfaceDump(ch api.Channel) { ifaceName := strings.TrimFunc(string(msg.InterfaceName), func(r rune) bool { return r == 0x00 }) - fmt.Printf("Interface: %q %+v\n", ifaceName, msg) + fmt.Printf("Interface %q: %+v\n", ifaceName, msg) } } @@ -164,12 +166,12 @@ func ipAddressDump(ch api.Channel) { msg := &ip.IPAddressDetails{} stop, err := reqCtx.ReceiveReply(msg) if stop { - break // break out of the loop + break } if err != nil { fmt.Println("ERROR:", err) } - fmt.Printf("ip address: %d %+v\n", msg.SwIfIndex, msg) + fmt.Printf("ip address details: %d %+v\n", msg.SwIfIndex, msg) } } @@ -183,7 +185,7 @@ func setIpUnnumbered(ch api.Channel) { reply := &interfaces.SwInterfaceSetUnnumberedReply{} if err := ch.SendRequest(req).ReceiveReply(reply); err != nil { - fmt.Println("Error:", err) + fmt.Println("ERROR:", err) } else { fmt.Printf("%+v\n", reply) } @@ -192,21 +194,20 @@ func setIpUnnumbered(ch api.Channel) { func ipUnnumberedDump(ch api.Channel) { fmt.Println("Dumping IP unnumbered") - req := &ip.IPUnnumberedDump{ + reqCtx := ch.SendMultiRequest(&ip.IPUnnumberedDump{ SwIfIndex: ^uint32(0), - } - reqCtx := ch.SendMultiRequest(req) + }) for { msg := &ip.IPUnnumberedDetails{} stop, err := reqCtx.ReceiveReply(msg) if stop { - break // break out of the loop + break } if err != nil { fmt.Println("ERROR:", err) } - fmt.Printf("ip unnumbered: %+v\n", msg) + fmt.Printf("IP unnumbered details: %+v\n", msg) } } @@ -214,9 +215,12 @@ func ipUnnumberedDump(ch api.Channel) { // you are supposed to create your own Go channel with your preferred buffer size. If the channel's // buffer is full, the notifications will not be delivered into it. func interfaceNotifications(ch api.Channel) { - // subscribe for specific notification message + fmt.Println("Subscribing to notificaiton events") + notifChan := make(chan api.Message, 100) - subs, err := ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceEvent) + + // subscribe for specific notification message + sub, err := ch.SubscribeNotification(notifChan, &interfaces.SwInterfaceEvent{}) if err != nil { panic(err) } @@ -248,7 +252,7 @@ func interfaceNotifications(ch api.Channel) { // receive one notification notif := (<-notifChan).(*interfaces.SwInterfaceEvent) - fmt.Printf("NOTIF: %+v\n", notif) + fmt.Printf("incoming event: %+v\n", notif) // disable interface events in VPP err = ch.SendRequest(&interfaces.WantInterfaceEvents{ @@ -260,7 +264,7 @@ func interfaceNotifications(ch api.Channel) { } // unsubscribe from delivery of the notifications - err = ch.UnsubscribeNotification(subs) + err = sub.Unsubscribe() if err != nil { panic(err) } diff --git a/examples/cmd/stats-client/stats_client.go b/examples/cmd/stats-client/stats_client.go index 4ea4659..f61f975 100644 --- a/examples/cmd/stats-client/stats_client.go +++ b/examples/cmd/stats-client/stats_client.go @@ -18,6 +18,7 @@ package main import ( "fmt" + "log" "os" "os/signal" @@ -28,45 +29,41 @@ import ( ) func main() { - fmt.Println("Starting stats VPP client...") + fmt.Println("Starting stats VPP client..") // async connect to VPP conn, statCh, err := govpp.AsyncConnect("") if err != nil { - fmt.Println("Error:", err) - os.Exit(1) + log.Fatalln("Error:", err) } defer conn.Disconnect() // create an API channel that will be used in the examples ch, err := conn.NewAPIChannel() if err != nil { - fmt.Println("Error:", err) - os.Exit(1) + log.Fatalln("Error:", err) } - defer fmt.Println("calling close") defer ch.Close() // create channel for Interrupt signal sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt) - var simpleCountersSubs *api.NotifSubscription - var combinedCountersSubs *api.NotifSubscription var notifChan chan api.Message + var simpleSub api.SubscriptionCtx + var combinedSub api.SubscriptionCtx // loop until Interrupt signal is received loop: for { select { - case connEvent := <-statCh: // VPP connection state change switch connEvent.State { case core.Connected: fmt.Println("VPP connected.") - if simpleCountersSubs == nil { - simpleCountersSubs, combinedCountersSubs, notifChan = subscribeNotifications(ch) + if notifChan == nil { + simpleSub, combinedSub, notifChan = subscribeNotifications(ch) } requestStatistics(ch) @@ -93,24 +90,24 @@ loop: } } - ch.UnsubscribeNotification(simpleCountersSubs) - ch.UnsubscribeNotification(combinedCountersSubs) + simpleSub.Unsubscribe() + combinedSub.Unsubscribe() } // subscribeNotifications subscribes for interface counters notifications. -func subscribeNotifications(ch api.Channel) (*api.NotifSubscription, *api.NotifSubscription, chan api.Message) { +func subscribeNotifications(ch api.Channel) (api.SubscriptionCtx, api.SubscriptionCtx, chan api.Message) { notifChan := make(chan api.Message, 100) - simpleCountersSubs, err := ch.SubscribeNotification(notifChan, stats.NewVnetInterfaceSimpleCounters) + simpleSub, err := ch.SubscribeNotification(notifChan, &stats.VnetInterfaceSimpleCounters{}) if err != nil { panic(err) } - combinedCountersSubs, err := ch.SubscribeNotification(notifChan, stats.NewVnetInterfaceCombinedCounters) + combinedSub, err := ch.SubscribeNotification(notifChan, &stats.VnetInterfaceCombinedCounters{}) if err != nil { panic(err) } - return simpleCountersSubs, combinedCountersSubs, notifChan + return simpleSub, combinedSub, notifChan } // requestStatistics requests interface counters notifications from VPP. -- 2.16.6