From 2d07847237e754d9050f06f565baa430c70ed937 Mon Sep 17 00:00:00 2001 From: Rastislav Szabo Date: Thu, 25 May 2017 13:47:43 +0200 Subject: [PATCH] added async connect API, new structure of examples Change-Id: Iab9bce174596c30998981e02b7030c248c423384 Signed-off-by: Rastislav Szabo --- .gitignore | 3 +- Makefile | 8 +- README.md | 5 +- adapter/mock/{util => binapi}/binapi_reflect.go | 4 +- adapter/mock/mock_adapter.go | 4 +- api/api.go | 4 +- core/core.go | 365 +++++++++------------ core/core_test.go | 16 + core/{notifications.go => notification_handler.go} | 0 core/request_handler.go | 213 ++++++++++++ .../simple-client/simple_client.go} | 63 +--- examples/cmd/stats-client/stats_client.go | 132 ++++++++ govpp.go | 13 + 13 files changed, 557 insertions(+), 273 deletions(-) rename adapter/mock/{util => binapi}/binapi_reflect.go (96%) rename core/{notifications.go => notification_handler.go} (100%) create mode 100644 core/request_handler.go rename examples/{example_client.go => cmd/simple-client/simple_client.go} (75%) create mode 100644 examples/cmd/stats-client/stats_client.go diff --git a/.gitignore b/.gitignore index f0ab2aa..bfa4a36 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .idea cmd/binapi-generator/binapi-generator -examples/examples +examples/cmd/simple-client/simple-client +examples/cmd/stats-client/stats-client diff --git a/Makefile b/Makefile index 70f1f45..44a203b 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,7 @@ build: @cd cmd/binapi-generator && go build -v - @cd examples && go build -v + @cd examples/cmd/simple-client && go build -v + @cd examples/cmd/stats-client && go build -v test: @cd cmd/binapi-generator && go test -cover . @@ -11,8 +12,9 @@ install: @cd cmd/binapi-generator && go install -v clean: - @rm cmd/binapi-generator/binapi-generator - @rm examples/examples + @rm -f cmd/binapi-generator/binapi-generator + @rm -f examples/cmd/simple-client/simple-client + @rm -f examples/cmd/stats-client/stats-client generate: @cd core && go generate ./... diff --git a/README.md b/README.md index a7d80d9..a5088f6 100644 --- a/README.md +++ b/README.md @@ -84,7 +84,7 @@ func main() { } ``` -The example above uses simple wrapper API over underlying go channels, see [example_client](examples/example_client.go) +The example above uses simple wrapper API over underlying go channels, see [example client](examples/cmd/simple-client/simple_client.go) for more examples, including the example on how to use the Go channels directly. @@ -127,7 +127,8 @@ binapi-generator --input-dir=examples/bin_api --output-dir=examples/bin_api In Go, [go generate](https://blog.golang.org/generate) tool can be leveraged to ease the code generation process. It allows to specify generator instructions in any one of the regular (non-generated) `.go` files -that are dependent on generated code using special comments, e.g. the one from [example_client](examples/example_client.go): +that are dependent on generated code using special comments, e.g. the one from +[example client](examples/cmd/simple-client/simple_client.go): ```go // go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api ``` diff --git a/adapter/mock/util/binapi_reflect.go b/adapter/mock/binapi/binapi_reflect.go similarity index 96% rename from adapter/mock/util/binapi_reflect.go rename to adapter/mock/binapi/binapi_reflect.go index 0ab065d..ee89909 100644 --- a/adapter/mock/util/binapi_reflect.go +++ b/adapter/mock/binapi/binapi_reflect.go @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package util is a helper package for generic handling of VPP binary +// Package binapi is a helper package for generic handling of VPP binary // API messages in the mock adapter and integration tests. -package util +package binapi import ( "reflect" diff --git a/adapter/mock/mock_adapter.go b/adapter/mock/mock_adapter.go index 1076ec2..8c88030 100644 --- a/adapter/mock/mock_adapter.go +++ b/adapter/mock/mock_adapter.go @@ -25,7 +25,7 @@ import ( "github.com/lunixbochs/struc" "git.fd.io/govpp.git/adapter" - "git.fd.io/govpp.git/adapter/mock/util" + "git.fd.io/govpp.git/adapter/mock/binapi" "git.fd.io/govpp.git/api" ) @@ -137,7 +137,7 @@ func (a *VppAdapter) RegisterBinAPITypes(binAPITypes map[string]reflect.Type) { // ReplyTypeFor returns reply message type for given request message name. func (a *VppAdapter) ReplyTypeFor(requestMsgName string) (reflect.Type, uint16, bool) { - replyName, foundName := util.ReplyNameFor(requestMsgName) + replyName, foundName := binapi.ReplyNameFor(requestMsgName) if foundName { if reply, found := a.binAPITypes[replyName]; found { msgID, err := a.GetMsgID(replyName, "") diff --git a/api/api.go b/api/api.go index fe6a34a..afaa8ad 100644 --- a/api/api.go +++ b/api/api.go @@ -32,7 +32,7 @@ const ( OtherMessage ) -// Message is an interface that is implemented by all VPP Binary API messages generated by the binapi-generator. +// Message is an interface that is implemented by all VPP Binary API messages generated by the binapigenerator. type Message interface { // GetMessageName returns the original VPP name of the message, as defined in the VPP API. GetMessageName() string @@ -44,7 +44,7 @@ type Message interface { GetCrcString() string } -// DataType is an interface that is implemented by all VPP Binary API data types by the binapi-generator. +// DataType is an interface that is implemented by all VPP Binary API data types by the binapi_generator. type DataType interface { // GetTypeName returns the original VPP name of the data type, as defined in the VPP API. GetTypeName() string diff --git a/core/core.go b/core/core.go index 5b14a42..2484c81 100644 --- a/core/core.go +++ b/core/core.go @@ -14,14 +14,12 @@ package core -//go:generate binapi-generator --input-file=/usr/share/vpp/api/vpe.api.json --output-dir=./bin_api - import ( "errors" - "fmt" "os" "sync" "sync/atomic" + "time" logger "github.com/Sirupsen/logrus" @@ -31,14 +29,41 @@ import ( ) const ( - requestChannelBufSize = 100 // default size of the request channel buffers - replyChannelBufSize = 100 // default size of the reply channel buffers + requestChannelBufSize = 100 // default size of the request channel buffers + replyChannelBufSize = 100 // default size of the reply channel buffers + notificationChannelBufSize = 100 // default size of the notification channel buffers +) + +const ( + healthCheckProbeInterval = time.Second * 1 // default health check probe interval + healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe +) + +// ConnectionState holds the current state of the connection to VPP. +type ConnectionState int + +const ( + // Connected connection state means that the connection to VPP has been successfully established. + Connected ConnectionState = iota + + // Disconnected connection state means that the connection to VPP has been lost. + Disconnected = iota ) +// ConnectionEvent is a notification about change in the VPP connection state. +type ConnectionEvent struct { + // Timestamp holds the time when the event has been generated. + Timestamp time.Time + + // State holds the new state of the connection to VPP at the time when the event has been generated. + State ConnectionState +} + // Connection represents a shared memory connection to VPP via vppAdapter. type Connection struct { - vpp adapter.VppAdapter // VPP adapter - codec *MsgCodec // message codec + vpp adapter.VppAdapter // VPP adapter + connected uint32 // non-zero if the adapter is connected to VPP + codec *MsgCodec // message codec msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC msgIDsLock sync.RWMutex // lock for the message IDs map @@ -79,7 +104,57 @@ func SetLogger(l *logger.Logger) { } // Connect connects to VPP using specified VPP adapter and returns the connection handle. +// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed. func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) { + // create new connection handle + c, err := newConnection(vppAdapter) + if err != nil { + return nil, err + } + + // blocking attempt to connect to VPP + err = c.connectVPP() + if err != nil { + return nil, err + } + + return conn, nil +} + +// AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle +// and ConnectionState channel. This call does not block until connection is established, it +// returns immediately. The caller is supposed to watch the returned ConnectionState channel for +// Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect. +func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) { + // create new connection handle + c, err := newConnection(vppAdapter) + if err != nil { + return nil, nil, err + } + + // asynchronously attempt to connect to VPP + connChan := make(chan ConnectionEvent, notificationChannelBufSize) + go c.connectLoop(connChan) + + return conn, connChan, nil +} + +// Disconnect disconnects from VPP and releases all connection-related resources. +func (c *Connection) Disconnect() { + if c == nil { + return + } + connLock.Lock() + defer connLock.Unlock() + + if c != nil && c.vpp != nil { + c.disconnectVPP() + } + conn = nil +} + +// newConnection returns new connection handle. +func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) { connLock.Lock() defer connLock.Unlock() @@ -93,35 +168,102 @@ func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) { conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription) conn.vpp.SetMsgCallback(msgCallback) + return conn, nil +} - logger.Debug("Connecting to VPP...") +// connectVPP performs one blocking attempt to connect to VPP. +func (c *Connection) connectVPP() error { + log.Debug("Connecting to VPP...") - err := conn.vpp.Connect() + // blocking connect + err := c.vpp.Connect() if err != nil { - return nil, err + log.Warn(err) + return err } + // store connected state + atomic.StoreUint32(&c.connected, 1) + // store control ping IDs - conn.pingReqID, _ = conn.GetMessageID(&vpe.ControlPing{}) - conn.pingReplyID, _ = conn.GetMessageID(&vpe.ControlPingReply{}) + c.pingReqID, _ = c.GetMessageID(&vpe.ControlPing{}) + c.pingReplyID, _ = c.GetMessageID(&vpe.ControlPingReply{}) - logger.Debug("VPP connected.") + log.Info("Connected to VPP.") + return nil +} - return conn, nil +// disconnectVPP disconnects from VPP in case it is connected. +func (c *Connection) disconnectVPP() { + if atomic.CompareAndSwapUint32(&c.connected, 1, 0) { + c.vpp.Disconnect() + } } -// Disconnect disconnects from VPP. -func (c *Connection) Disconnect() { - if c == nil { +// connectLoop attempts to connect to VPP until it succeeds. +// Then it continues with healthCheckLoop. +func (c *Connection) connectLoop(connChan chan ConnectionEvent) { + // loop until connected + for { + err := c.connectVPP() + if err == nil { + // signal connected event + connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected} + break + } + } + + // we are now connected, continue with health check loop + c.healthCheckLoop(connChan) +} + +// healthCheckLoop checks whether connection to VPP is alive. In case of disconnect, +// it continues with connectLoop and tries to reconnect. +func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { + // create a separate API channel for health check probes + ch, err := conn.NewAPIChannel() + if err != nil { + log.Error("Error by creating health check API channel, health check will be disabled:", err) return } - connLock.Lock() - defer connLock.Unlock() - if c != nil && c.vpp != nil { - c.vpp.Disconnect() + // send health check probes until an error occurs + for { + // wait for healthCheckProbeInterval + <-time.After(healthCheckProbeInterval) + + if atomic.LoadUint32(&c.connected) == 0 { + // Disconnect has been called in the meantime, return the healthcheck - reconnect loop + log.Debug("Disconnected on request, exiting health check loop.") + return + } + + // send the control ping + ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}} + + // expect response within timeout period + select { + case vppReply := <-ch.ReplyChan: + err = vppReply.Error + case <-time.After(healthCheckReplyTimeout): + err = errors.New("probe reply not received within the timeout period") + } + + // in case of error, break & disconnect + if err != nil { + log.Errorf("VPP health check failed: %v", err) + // signal disconnected event via channel + connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected} + break + } } - conn = nil + + // cleanup + ch.Close() + c.disconnectVPP() + + // we are now disconnected, start connect loop + c.connectLoop(connChan) } // NewAPIChannel returns a new API channel for communication with VPP via govpp core. @@ -163,84 +305,6 @@ func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) return ch, nil } -// watchRequests watches for requests on the request API channel and forwards them as messages to VPP. -func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) { - for { - select { - case req, ok := <-ch.ReqChan: - // new request on the request channel - if !ok { - // after closing the request channel, release API channel and return - c.releaseAPIChannel(ch, chMeta) - return - } - c.processRequest(ch, chMeta, req) - - case req := <-ch.NotifSubsChan: - // new request on the notification subscribe channel - c.processNotifSubscribeRequest(ch, req) - } - } -} - -// processRequest processes a single request received on the request channel. -func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error { - // retrieve message ID - msgID, err := c.GetMessageID(req.Message) - if err != nil { - error := fmt.Errorf("unable to retrieve message ID: %v", err) - log.WithFields(logger.Fields{ - "msg_name": req.Message.GetMessageName(), - "msg_crc": req.Message.GetCrcString(), - }).Errorf("unable to retrieve message ID: %v", err) - sendReply(ch, &api.VppReply{Error: error}) - return error - } - - // encode the message into binary - data, err := c.codec.EncodeMsg(req.Message, msgID) - if err != nil { - error := fmt.Errorf("unable to encode the messge: %v", err) - log.WithFields(logger.Fields{ - "context": chMeta.id, - "msg_id": msgID, - }).Errorf("%v", error) - sendReply(ch, &api.VppReply{Error: error}) - return error - } - - // send the message - log.WithFields(logger.Fields{ - "context": chMeta.id, - "msg_id": msgID, - "msg_size": len(data), - }).Debug("Sending a message to VPP.") - - if req.Multipart { - // expect multipart response - atomic.StoreUint32(&chMeta.multipart, 1) - } - - // send the request to VPP - c.vpp.SendMsg(chMeta.id, data) - - if req.Multipart { - // send a control ping to determine end of the multipart response - ping := &vpe.ControlPing{} - pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID) - - log.WithFields(logger.Fields{ - "context": chMeta.id, - "msg_id": c.pingReqID, - "msg_size": len(pingData), - }).Debug("Sending a control ping to VPP.") - - c.vpp.SendMsg(chMeta.id, pingData) - } - - return nil -} - // releaseAPIChannel releases API channel that needs to be closed. func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) { log.WithFields(logger.Fields{ @@ -252,104 +316,3 @@ func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) delete(c.channels, chMeta.id) c.channelsLock.Unlock() } - -// msgCallback is called whenever any binary API message comes from VPP. -func msgCallback(context uint32, msgID uint16, data []byte) { - connLock.RLock() - defer connLock.RUnlock() - - if conn == nil { - log.Warn("Already disconnected, ignoring the message.") - return - } - - log.WithFields(logger.Fields{ - "context": context, - "msg_id": msgID, - "msg_size": len(data), - }).Debug("Received a message from VPP.") - - if context == 0 || conn.isNotificationMessage(msgID) { - // process the message as a notification - conn.sendNotifications(msgID, data) - return - } - - // match ch according to the context - conn.channelsLock.RLock() - ch, ok := conn.channels[context] - conn.channelsLock.RUnlock() - - if !ok { - log.WithFields(logger.Fields{ - "context": context, - "msg_id": msgID, - }).Error("Context ID not known, ignoring the message.") - return - } - - chMeta := ch.Metadata().(*channelMetadata) - lastReplyReceived := false - // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply - if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) { - lastReplyReceived = true - } - - // send the data to the channel - sendReply(ch, &api.VppReply{ - MessageID: msgID, - Data: data, - LastReplyReceived: lastReplyReceived, - }) -} - -// sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise -// it logs the error and do not send the message. -func sendReply(ch *api.Channel, reply *api.VppReply) { - select { - case ch.ReplyChan <- reply: - // reply sent successfully - default: - // unable to write into the channel without blocking - log.WithFields(logger.Fields{ - "channel": ch, - "msg_id": reply.MessageID, - }).Warn("Unable to send the reply, reciever end not ready.") - } -} - -// GetMessageID returns message identifier of given API message. -func (c *Connection) GetMessageID(msg api.Message) (uint16, error) { - if c == nil { - return 0, errors.New("nil connection passed in") - } - return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString()) -} - -// messageNameToID returns message ID of a message identified by its name and CRC. -func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) { - // try to get the ID from the map - c.msgIDsLock.RLock() - id, ok := c.msgIDs[msgName+msgCrc] - c.msgIDsLock.RUnlock() - if ok { - return id, nil - } - - // get the ID using VPP API - id, err := c.vpp.GetMsgID(msgName, msgCrc) - if err != nil { - error := fmt.Errorf("unable to retrieve message ID: %v", err) - log.WithFields(logger.Fields{ - "msg_name": msgName, - "msg_crc": msgCrc, - }).Errorf("unable to retrieve message ID: %v", err) - return id, error - } - - c.msgIDsLock.Lock() - c.msgIDs[msgName+msgCrc] = id - c.msgIDsLock.Unlock() - - return id, nil -} diff --git a/core/core_test.go b/core/core_test.go index d3c2e2c..3184ef5 100644 --- a/core/core_test.go +++ b/core/core_test.go @@ -145,6 +145,7 @@ func TestNotifications(t *testing.T) { } func TestNilConnection(t *testing.T) { + RegisterTestingT(t) var conn *Connection ch, err := conn.NewAPIChannel() @@ -168,6 +169,21 @@ func TestDoubleConnection(t *testing.T) { Expect(conn).Should(BeNil()) } +func TestAsyncConnection(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.conn.Disconnect() + conn, ch, err := AsyncConnect(ctx.mockVpp) + ctx.conn = conn + + Expect(err).ShouldNot(HaveOccurred()) + Expect(conn).ShouldNot(BeNil()) + + ev := <-ch + Expect(ev.State).Should(BeEquivalentTo(Connected)) +} + func TestFullBuffer(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() diff --git a/core/notifications.go b/core/notification_handler.go similarity index 100% rename from core/notifications.go rename to core/notification_handler.go diff --git a/core/request_handler.go b/core/request_handler.go new file mode 100644 index 0000000..f4f5e92 --- /dev/null +++ b/core/request_handler.go @@ -0,0 +1,213 @@ +// Copyright (c) 2017 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "errors" + "fmt" + "sync/atomic" + + logger "github.com/Sirupsen/logrus" + + "git.fd.io/govpp.git/api" + "git.fd.io/govpp.git/core/bin_api/vpe" +) + +// watchRequests watches for requests on the request API channel and forwards them as messages to VPP. +func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) { + for { + select { + case req, ok := <-ch.ReqChan: + // new request on the request channel + if !ok { + // after closing the request channel, release API channel and return + c.releaseAPIChannel(ch, chMeta) + return + } + c.processRequest(ch, chMeta, req) + + case req := <-ch.NotifSubsChan: + // new request on the notification subscribe channel + c.processNotifSubscribeRequest(ch, req) + } + } +} + +// processRequest processes a single request received on the request channel. +func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error { + // check whether we are connected to VPP + if atomic.LoadUint32(&c.connected) == 0 { + error := errors.New("not connected to VPP, ignoring the request") + log.Error(error) + sendReply(ch, &api.VppReply{Error: error}) + return error + } + + // retrieve message ID + msgID, err := c.GetMessageID(req.Message) + if err != nil { + error := fmt.Errorf("unable to retrieve message ID: %v", err) + log.WithFields(logger.Fields{ + "msg_name": req.Message.GetMessageName(), + "msg_crc": req.Message.GetCrcString(), + }).Error(err) + sendReply(ch, &api.VppReply{Error: error}) + return error + } + + // encode the message into binary + data, err := c.codec.EncodeMsg(req.Message, msgID) + if err != nil { + error := fmt.Errorf("unable to encode the messge: %v", err) + log.WithFields(logger.Fields{ + "context": chMeta.id, + "msg_id": msgID, + }).Error(error) + sendReply(ch, &api.VppReply{Error: error}) + return error + } + + // send the message + log.WithFields(logger.Fields{ + "context": chMeta.id, + "msg_id": msgID, + "msg_size": len(data), + }).Debug("Sending a message to VPP.") + + if req.Multipart { + // expect multipart response + atomic.StoreUint32(&chMeta.multipart, 1) + } + + // send the request to VPP + c.vpp.SendMsg(chMeta.id, data) + + if req.Multipart { + // send a control ping to determine end of the multipart response + ping := &vpe.ControlPing{} + pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID) + + log.WithFields(logger.Fields{ + "context": chMeta.id, + "msg_id": c.pingReqID, + "msg_size": len(pingData), + }).Debug("Sending a control ping to VPP.") + + c.vpp.SendMsg(chMeta.id, pingData) + } + + return nil +} + +// msgCallback is called whenever any binary API message comes from VPP. +func msgCallback(context uint32, msgID uint16, data []byte) { + connLock.RLock() + defer connLock.RUnlock() + + if conn == nil { + log.Warn("Already disconnected, ignoring the message.") + return + } + + log.WithFields(logger.Fields{ + "context": context, + "msg_id": msgID, + "msg_size": len(data), + }).Debug("Received a message from VPP.") + + if context == 0 || conn.isNotificationMessage(msgID) { + // process the message as a notification + conn.sendNotifications(msgID, data) + return + } + + // match ch according to the context + conn.channelsLock.RLock() + ch, ok := conn.channels[context] + conn.channelsLock.RUnlock() + + if !ok { + log.WithFields(logger.Fields{ + "context": context, + "msg_id": msgID, + }).Error("Context ID not known, ignoring the message.") + return + } + + chMeta := ch.Metadata().(*channelMetadata) + lastReplyReceived := false + // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply + if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) { + lastReplyReceived = true + } + + // send the data to the channel + sendReply(ch, &api.VppReply{ + MessageID: msgID, + Data: data, + LastReplyReceived: lastReplyReceived, + }) +} + +// sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise +// it logs the error and do not send the message. +func sendReply(ch *api.Channel, reply *api.VppReply) { + select { + case ch.ReplyChan <- reply: + // reply sent successfully + default: + // unable to write into the channel without blocking + log.WithFields(logger.Fields{ + "channel": ch, + "msg_id": reply.MessageID, + }).Warn("Unable to send the reply, reciever end not ready.") + } +} + +// GetMessageID returns message identifier of given API message. +func (c *Connection) GetMessageID(msg api.Message) (uint16, error) { + if c == nil { + return 0, errors.New("nil connection passed in") + } + return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString()) +} + +// messageNameToID returns message ID of a message identified by its name and CRC. +func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) { + // try to get the ID from the map + c.msgIDsLock.RLock() + id, ok := c.msgIDs[msgName+msgCrc] + c.msgIDsLock.RUnlock() + if ok { + return id, nil + } + + // get the ID using VPP API + id, err := c.vpp.GetMsgID(msgName, msgCrc) + if err != nil { + error := fmt.Errorf("unable to retrieve message ID: %v", err) + log.WithFields(logger.Fields{ + "msg_name": msgName, + "msg_crc": msgCrc, + }).Errorf("unable to retrieve message ID: %v", err) + return id, error + } + + c.msgIDsLock.Lock() + c.msgIDs[msgName+msgCrc] = id + c.msgIDsLock.Unlock() + + return id, nil +} diff --git a/examples/example_client.go b/examples/cmd/simple-client/simple_client.go similarity index 75% rename from examples/example_client.go rename to examples/cmd/simple-client/simple_client.go index f2e5804..6e46d6b 100644 --- a/examples/example_client.go +++ b/examples/cmd/simple-client/simple_client.go @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Example VPP management application that exercises the govpp API on real-world use-cases. +// Binary simple-client is an example VPP management application that exercises the +// govpp API on real-world use-cases. package main // Generates Go bindings for all VPP APIs located in the json directory. @@ -22,19 +23,16 @@ import ( "fmt" "net" "os" - "os/signal" "git.fd.io/govpp.git" "git.fd.io/govpp.git/api" - "git.fd.io/govpp.git/api/ifcounters" - "git.fd.io/govpp.git/core/bin_api/vpe" "git.fd.io/govpp.git/examples/bin_api/acl" "git.fd.io/govpp.git/examples/bin_api/interfaces" "git.fd.io/govpp.git/examples/bin_api/tap" ) func main() { - fmt.Println("Starting example VPP client...") + fmt.Println("Starting simple VPP client...") // connect to VPP conn, err := govpp.Connect() @@ -64,8 +62,6 @@ func main() { interfaceDump(ch) interfaceNotifications(ch) - - //interfaceCounters(ch) } // compatibilityCheck shows how an management application can check whether generated API messages are @@ -223,56 +219,3 @@ func interfaceNotifications(ch *api.Channel) { // unsubscribe from delivery of the notifications ch.UnsubscribeNotification(subs) } - -// interfaceCounters is an example of using notification API to periodically retrieve interface statistics. -// The ifcounters package contains the API that can be used to decode the strange VnetInterfaceCounters message. -func interfaceCounters(ch *api.Channel) { - // subscribe for interface counters notifications - notifChan := make(chan api.Message, 100) - subs, _ := ch.SubscribeNotification(notifChan, interfaces.NewVnetInterfaceCounters) - - // enable interface counters notifications from VPP - ch.SendRequest(&vpe.WantStats{ - Pid: uint32(os.Getpid()), - EnableDisable: 1, - }).ReceiveReply(&vpe.WantStatsReply{}) - - // create channel for Interrupt signal - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, os.Interrupt) - - // loop until Interrupt signal is received -loop: - for { - select { - case <-sigChan: - // interrupt received - break loop - case notifMsg := <-notifChan: - notif := notifMsg.(*interfaces.VnetInterfaceCounters) - // notification received - fmt.Printf("%+v\n", notif) - - if notif.IsCombined == 0 { - // simple counter - counters, err := ifcounters.DecodeCounters(ifcounters.VnetInterfaceCounters(*notif)) - if err != nil { - fmt.Println("Error:", err) - } else { - fmt.Printf("%+v\n", counters) - } - } else { - // combined counter - counters, err := ifcounters.DecodeCombinedCounters(ifcounters.VnetInterfaceCounters(*notif)) - if err != nil { - fmt.Println("Error:", err) - } else { - fmt.Printf("%+v\n", counters) - } - } - } - } - - // unsubscribe from delivery of the notifications - ch.UnsubscribeNotification(subs) -} diff --git a/examples/cmd/stats-client/stats_client.go b/examples/cmd/stats-client/stats_client.go new file mode 100644 index 0000000..fc40b24 --- /dev/null +++ b/examples/cmd/stats-client/stats_client.go @@ -0,0 +1,132 @@ +// Copyright (c) 2017 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Binary stats-client is an example VPP management application that exercises the +// govpp API for interface counters together with asynchronous connection to VPP. +package main + +// Generates Go bindings for all VPP APIs located in the json directory. +//go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api + +import ( + "fmt" + "os" + "os/signal" + + "git.fd.io/govpp.git" + "git.fd.io/govpp.git/api" + "git.fd.io/govpp.git/api/ifcounters" + "git.fd.io/govpp.git/core" + "git.fd.io/govpp.git/core/bin_api/vpe" + "git.fd.io/govpp.git/examples/bin_api/interfaces" +) + +func main() { + fmt.Println("Starting stats VPP client...") + + // async connect to VPP + conn, statCh, err := govpp.AsyncConnect() + if err != nil { + fmt.Println("Error:", err) + os.Exit(1) + } + defer conn.Disconnect() + + // create an API channel that will be used in the examples + ch, err := conn.NewAPIChannel() + if err != nil { + fmt.Println("Error:", err) + os.Exit(1) + } + defer ch.Close() + + // create channel for Interrupt signal + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt) + + var subs *api.NotifSubscription + var notifChan chan api.Message + + // loop until Interrupt signal is received +loop: + for { + select { + + case connEvent := <-statCh: + // VPP connection state change + switch connEvent.State { + case core.Connected: + fmt.Println("VPP connected.") + if subs == nil { + subs, notifChan = subscribeNotification(ch) + } + requestStatistics(ch) + + case core.Disconnected: + fmt.Println("VPP disconnected.") + } + + case notifMsg := <-notifChan: + // counter notification received + processCounters(notifMsg.(*interfaces.VnetInterfaceCounters)) + + case <-sigChan: + // interrupt received + fmt.Println("Interrupt received, exiting.") + break loop + } + } + + ch.UnsubscribeNotification(subs) +} + +// subscribeNotification subscribes for interface counters notifications. +func subscribeNotification(ch *api.Channel) (*api.NotifSubscription, chan api.Message) { + + notifChan := make(chan api.Message, 100) + subs, _ := ch.SubscribeNotification(notifChan, interfaces.NewVnetInterfaceCounters) + + return subs, notifChan +} + +// requestStatistics requests interface counters notifications from VPP. +func requestStatistics(ch *api.Channel) { + ch.SendRequest(&vpe.WantStats{ + Pid: uint32(os.Getpid()), + EnableDisable: 1, + }).ReceiveReply(&vpe.WantStatsReply{}) +} + +// processCounters processes a counter message received from VPP. +func processCounters(msg *interfaces.VnetInterfaceCounters) { + fmt.Printf("%+v\n", msg) + + if msg.IsCombined == 0 { + // simple counter + counters, err := ifcounters.DecodeCounters(ifcounters.VnetInterfaceCounters(*msg)) + if err != nil { + fmt.Println("Error:", err) + } else { + fmt.Printf("%+v\n", counters) + } + } else { + // combined counter + counters, err := ifcounters.DecodeCombinedCounters(ifcounters.VnetInterfaceCounters(*msg)) + if err != nil { + fmt.Println("Error:", err) + } else { + fmt.Printf("%+v\n", counters) + } + } +} diff --git a/govpp.go b/govpp.go index 5d3ed7f..6f0cc2e 100644 --- a/govpp.go +++ b/govpp.go @@ -24,6 +24,7 @@ var vppAdapter adapter.VppAdapter // VPP Adapter that will be used in the subseq // Connect connects the govpp core to VPP either using the default VPP Adapter, or using the adapter previously // set by SetAdapter (useful mostly just for unit/integration tests with mocked VPP adapter). +// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed. func Connect() (*core.Connection, error) { if vppAdapter == nil { vppAdapter = vppapiclient.NewVppAdapter() @@ -31,6 +32,18 @@ func Connect() (*core.Connection, error) { return core.Connect(vppAdapter) } +// AsyncConnect asynchronously connects the govpp core to VPP either using the default VPP Adapter, +// or using the adapter previously set by SetAdapter. +// This call does not block until connection is established, it returns immediately. The caller is +// supposed to watch the returned ConnectionState channel for Connected/Disconnected events. +// In case of disconnect, the library will asynchronously try to reconnect. +func AsyncConnect() (*core.Connection, chan core.ConnectionEvent, error) { + if vppAdapter == nil { + vppAdapter = vppapiclient.NewVppAdapter() + } + return core.AsyncConnect(vppAdapter) +} + // SetAdapter sets the adapter that will be used for connections to VPP in the subsequent `Connect` calls. func SetAdapter(ad adapter.VppAdapter) { vppAdapter = ad -- 2.16.6