added async connect API, new structure of examples 76/6876/2
authorRastislav Szabo <raszabo@cisco.com>
Thu, 25 May 2017 11:47:43 +0000 (13:47 +0200)
committerRastislav Szabo <raszabo@cisco.com>
Thu, 25 May 2017 11:54:13 +0000 (13:54 +0200)
Change-Id: Iab9bce174596c30998981e02b7030c248c423384
Signed-off-by: Rastislav Szabo <raszabo@cisco.com>
13 files changed:
.gitignore
Makefile
README.md
adapter/mock/binapi/binapi_reflect.go [moved from adapter/mock/util/binapi_reflect.go with 96% similarity]
adapter/mock/mock_adapter.go
api/api.go
core/core.go
core/core_test.go
core/notification_handler.go [moved from core/notifications.go with 100% similarity]
core/request_handler.go [new file with mode: 0644]
examples/cmd/simple-client/simple_client.go [moved from examples/example_client.go with 75% similarity]
examples/cmd/stats-client/stats_client.go [new file with mode: 0644]
govpp.go

index f0ab2aa..bfa4a36 100644 (file)
@@ -1,3 +1,4 @@
 .idea
 cmd/binapi-generator/binapi-generator
-examples/examples
+examples/cmd/simple-client/simple-client
+examples/cmd/stats-client/stats-client
index 70f1f45..44a203b 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -1,6 +1,7 @@
 build:
        @cd cmd/binapi-generator && go build -v
-       @cd examples && go build -v
+       @cd examples/cmd/simple-client && go build -v
+       @cd examples/cmd/stats-client && go build -v
 
 test:
        @cd cmd/binapi-generator && go test -cover .
@@ -11,8 +12,9 @@ install:
        @cd cmd/binapi-generator && go install -v
 
 clean:
-       @rm cmd/binapi-generator/binapi-generator
-       @rm examples/examples
+       @rm -f cmd/binapi-generator/binapi-generator
+       @rm -f examples/cmd/simple-client/simple-client
+       @rm -f examples/cmd/stats-client/stats-client
 
 generate:
        @cd core && go generate ./...
index a7d80d9..a5088f6 100644 (file)
--- a/README.md
+++ b/README.md
@@ -84,7 +84,7 @@ func main() {
 }
 ```
 
-The example above uses simple wrapper API over underlying go channels, see [example_client](examples/example_client.go) 
+The example above uses simple wrapper API over underlying go channels, see [example client](examples/cmd/simple-client/simple_client.go) 
 for more examples, including the example on how to use the Go channels directly.
 
 
@@ -127,7 +127,8 @@ binapi-generator --input-dir=examples/bin_api --output-dir=examples/bin_api
 
 In Go, [go generate](https://blog.golang.org/generate) tool can be leveraged to ease the code generation
 process. It allows to specify generator instructions in any one of the regular (non-generated) `.go` files
-that are dependent on generated code using special comments, e.g. the one from [example_client](examples/example_client.go):
+that are dependent on generated code using special comments, e.g. the one from 
+[example client](examples/cmd/simple-client/simple_client.go):
 ```go
 // go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api
 ```
similarity index 96%
rename from adapter/mock/util/binapi_reflect.go
rename to adapter/mock/binapi/binapi_reflect.go
index 0ab065d..ee89909 100644 (file)
@@ -12,9 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Package util is a helper package for generic handling of VPP binary
+// Package binapi is a helper package for generic handling of VPP binary
 // API messages in the mock adapter and integration tests.
-package util
+package binapi
 
 import (
        "reflect"
index 1076ec2..8c88030 100644 (file)
@@ -25,7 +25,7 @@ import (
        "github.com/lunixbochs/struc"
 
        "git.fd.io/govpp.git/adapter"
-       "git.fd.io/govpp.git/adapter/mock/util"
+       "git.fd.io/govpp.git/adapter/mock/binapi"
        "git.fd.io/govpp.git/api"
 )
 
@@ -137,7 +137,7 @@ func (a *VppAdapter) RegisterBinAPITypes(binAPITypes map[string]reflect.Type) {
 
 // ReplyTypeFor returns reply message type for given request message name.
 func (a *VppAdapter) ReplyTypeFor(requestMsgName string) (reflect.Type, uint16, bool) {
-       replyName, foundName := util.ReplyNameFor(requestMsgName)
+       replyName, foundName := binapi.ReplyNameFor(requestMsgName)
        if foundName {
                if reply, found := a.binAPITypes[replyName]; found {
                        msgID, err := a.GetMsgID(replyName, "")
index fe6a34a..afaa8ad 100644 (file)
@@ -32,7 +32,7 @@ const (
        OtherMessage
 )
 
-// Message is an interface that is implemented by all VPP Binary API messages generated by the binapi-generator.
+// Message is an interface that is implemented by all VPP Binary API messages generated by the binapigenerator.
 type Message interface {
        // GetMessageName returns the original VPP name of the message, as defined in the VPP API.
        GetMessageName() string
@@ -44,7 +44,7 @@ type Message interface {
        GetCrcString() string
 }
 
-// DataType is an interface that is implemented by all VPP Binary API data types by the binapi-generator.
+// DataType is an interface that is implemented by all VPP Binary API data types by the binapi_generator.
 type DataType interface {
        // GetTypeName returns the original VPP name of the data type, as defined in the VPP API.
        GetTypeName() string
index 5b14a42..2484c81 100644 (file)
 
 package core
 
-//go:generate binapi-generator --input-file=/usr/share/vpp/api/vpe.api.json --output-dir=./bin_api
-
 import (
        "errors"
-       "fmt"
        "os"
        "sync"
        "sync/atomic"
+       "time"
 
        logger "github.com/Sirupsen/logrus"
 
@@ -31,14 +29,41 @@ import (
 )
 
 const (
-       requestChannelBufSize = 100 // default size of the request channel buffers
-       replyChannelBufSize   = 100 // default size of the reply channel buffers
+       requestChannelBufSize      = 100 // default size of the request channel buffers
+       replyChannelBufSize        = 100 // default size of the reply channel buffers
+       notificationChannelBufSize = 100 // default size of the notification channel buffers
+)
+
+const (
+       healthCheckProbeInterval = time.Second * 1        // default health check probe interval
+       healthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
+)
+
+// ConnectionState holds the current state of the connection to VPP.
+type ConnectionState int
+
+const (
+       // Connected connection state means that the connection to VPP has been successfully established.
+       Connected ConnectionState = iota
+
+       // Disconnected connection state means that the connection to VPP has been lost.
+       Disconnected = iota
 )
 
+// ConnectionEvent is a notification about change in the VPP connection state.
+type ConnectionEvent struct {
+       // Timestamp holds the time when the event has been generated.
+       Timestamp time.Time
+
+       // State holds the new state of the connection to VPP at the time when the event has been generated.
+       State ConnectionState
+}
+
 // Connection represents a shared memory connection to VPP via vppAdapter.
 type Connection struct {
-       vpp   adapter.VppAdapter // VPP adapter
-       codec *MsgCodec          // message codec
+       vpp       adapter.VppAdapter // VPP adapter
+       connected uint32             // non-zero if the adapter is connected to VPP
+       codec     *MsgCodec          // message codec
 
        msgIDs     map[string]uint16 // map of message IDs indexed by message name + CRC
        msgIDsLock sync.RWMutex      // lock for the message IDs map
@@ -79,7 +104,57 @@ func SetLogger(l *logger.Logger) {
 }
 
 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
+// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
+       // create new connection handle
+       c, err := newConnection(vppAdapter)
+       if err != nil {
+               return nil, err
+       }
+
+       // blocking attempt to connect to VPP
+       err = c.connectVPP()
+       if err != nil {
+               return nil, err
+       }
+
+       return conn, nil
+}
+
+// AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
+// and ConnectionState channel. This call does not block until connection is established, it
+// returns immediately. The caller is supposed to watch the returned ConnectionState channel for
+// Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
+func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
+       // create new connection handle
+       c, err := newConnection(vppAdapter)
+       if err != nil {
+               return nil, nil, err
+       }
+
+       // asynchronously attempt to connect to VPP
+       connChan := make(chan ConnectionEvent, notificationChannelBufSize)
+       go c.connectLoop(connChan)
+
+       return conn, connChan, nil
+}
+
+// Disconnect disconnects from VPP and releases all connection-related resources.
+func (c *Connection) Disconnect() {
+       if c == nil {
+               return
+       }
+       connLock.Lock()
+       defer connLock.Unlock()
+
+       if c != nil && c.vpp != nil {
+               c.disconnectVPP()
+       }
+       conn = nil
+}
+
+// newConnection returns new connection handle.
+func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
        connLock.Lock()
        defer connLock.Unlock()
 
@@ -93,35 +168,102 @@ func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
        conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription)
 
        conn.vpp.SetMsgCallback(msgCallback)
+       return conn, nil
+}
 
-       logger.Debug("Connecting to VPP...")
+// connectVPP performs one blocking attempt to connect to VPP.
+func (c *Connection) connectVPP() error {
+       log.Debug("Connecting to VPP...")
 
-       err := conn.vpp.Connect()
+       // blocking connect
+       err := c.vpp.Connect()
        if err != nil {
-               return nil, err
+               log.Warn(err)
+               return err
        }
 
+       // store connected state
+       atomic.StoreUint32(&c.connected, 1)
+
        // store control ping IDs
-       conn.pingReqID, _ = conn.GetMessageID(&vpe.ControlPing{})
-       conn.pingReplyID, _ = conn.GetMessageID(&vpe.ControlPingReply{})
+       c.pingReqID, _ = c.GetMessageID(&vpe.ControlPing{})
+       c.pingReplyID, _ = c.GetMessageID(&vpe.ControlPingReply{})
 
-       logger.Debug("VPP connected.")
+       log.Info("Connected to VPP.")
+       return nil
+}
 
-       return conn, nil
+// disconnectVPP disconnects from VPP in case it is connected.
+func (c *Connection) disconnectVPP() {
+       if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
+               c.vpp.Disconnect()
+       }
 }
 
-// Disconnect disconnects from VPP.
-func (c *Connection) Disconnect() {
-       if c == nil {
+// connectLoop attempts to connect to VPP until it succeeds.
+// Then it continues with healthCheckLoop.
+func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
+       // loop until connected
+       for {
+               err := c.connectVPP()
+               if err == nil {
+                       // signal connected event
+                       connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
+                       break
+               }
+       }
+
+       // we are now connected, continue with health check loop
+       c.healthCheckLoop(connChan)
+}
+
+// healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
+// it continues with connectLoop and tries to reconnect.
+func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
+       // create a separate API channel for health check probes
+       ch, err := conn.NewAPIChannel()
+       if err != nil {
+               log.Error("Error by creating health check API channel, health check will be disabled:", err)
                return
        }
-       connLock.Lock()
-       defer connLock.Unlock()
 
-       if c != nil && c.vpp != nil {
-               c.vpp.Disconnect()
+       // send health check probes until an error occurs
+       for {
+               // wait for healthCheckProbeInterval
+               <-time.After(healthCheckProbeInterval)
+
+               if atomic.LoadUint32(&c.connected) == 0 {
+                       // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
+                       log.Debug("Disconnected on request, exiting health check loop.")
+                       return
+               }
+
+               // send the control ping
+               ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}}
+
+               // expect response within timeout period
+               select {
+               case vppReply := <-ch.ReplyChan:
+                       err = vppReply.Error
+               case <-time.After(healthCheckReplyTimeout):
+                       err = errors.New("probe reply not received within the timeout period")
+               }
+
+               // in case of error, break & disconnect
+               if err != nil {
+                       log.Errorf("VPP health check failed: %v", err)
+                       // signal disconnected event via channel
+                       connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
+                       break
+               }
        }
-       conn = nil
+
+       // cleanup
+       ch.Close()
+       c.disconnectVPP()
+
+       // we are now disconnected, start connect loop
+       c.connectLoop(connChan)
 }
 
 // NewAPIChannel returns a new API channel for communication with VPP via govpp core.
@@ -163,84 +305,6 @@ func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int)
        return ch, nil
 }
 
-// watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
-func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
-       for {
-               select {
-               case req, ok := <-ch.ReqChan:
-                       // new request on the request channel
-                       if !ok {
-                               // after closing the request channel, release API channel and return
-                               c.releaseAPIChannel(ch, chMeta)
-                               return
-                       }
-                       c.processRequest(ch, chMeta, req)
-
-               case req := <-ch.NotifSubsChan:
-                       // new request on the notification subscribe channel
-                       c.processNotifSubscribeRequest(ch, req)
-               }
-       }
-}
-
-// processRequest processes a single request received on the request channel.
-func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
-       // retrieve message ID
-       msgID, err := c.GetMessageID(req.Message)
-       if err != nil {
-               error := fmt.Errorf("unable to retrieve message ID: %v", err)
-               log.WithFields(logger.Fields{
-                       "msg_name": req.Message.GetMessageName(),
-                       "msg_crc":  req.Message.GetCrcString(),
-               }).Errorf("unable to retrieve message ID: %v", err)
-               sendReply(ch, &api.VppReply{Error: error})
-               return error
-       }
-
-       // encode the message into binary
-       data, err := c.codec.EncodeMsg(req.Message, msgID)
-       if err != nil {
-               error := fmt.Errorf("unable to encode the messge: %v", err)
-               log.WithFields(logger.Fields{
-                       "context": chMeta.id,
-                       "msg_id":  msgID,
-               }).Errorf("%v", error)
-               sendReply(ch, &api.VppReply{Error: error})
-               return error
-       }
-
-       // send the message
-       log.WithFields(logger.Fields{
-               "context":  chMeta.id,
-               "msg_id":   msgID,
-               "msg_size": len(data),
-       }).Debug("Sending a message to VPP.")
-
-       if req.Multipart {
-               // expect multipart response
-               atomic.StoreUint32(&chMeta.multipart, 1)
-       }
-
-       // send the request to VPP
-       c.vpp.SendMsg(chMeta.id, data)
-
-       if req.Multipart {
-               // send a control ping to determine end of the multipart response
-               ping := &vpe.ControlPing{}
-               pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID)
-
-               log.WithFields(logger.Fields{
-                       "context":  chMeta.id,
-                       "msg_id":   c.pingReqID,
-                       "msg_size": len(pingData),
-               }).Debug("Sending a control ping to VPP.")
-
-               c.vpp.SendMsg(chMeta.id, pingData)
-       }
-
-       return nil
-}
-
 // releaseAPIChannel releases API channel that needs to be closed.
 func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) {
        log.WithFields(logger.Fields{
@@ -252,104 +316,3 @@ func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata)
        delete(c.channels, chMeta.id)
        c.channelsLock.Unlock()
 }
-
-// msgCallback is called whenever any binary API message comes from VPP.
-func msgCallback(context uint32, msgID uint16, data []byte) {
-       connLock.RLock()
-       defer connLock.RUnlock()
-
-       if conn == nil {
-               log.Warn("Already disconnected, ignoring the message.")
-               return
-       }
-
-       log.WithFields(logger.Fields{
-               "context":  context,
-               "msg_id":   msgID,
-               "msg_size": len(data),
-       }).Debug("Received a message from VPP.")
-
-       if context == 0 || conn.isNotificationMessage(msgID) {
-               // process the message as a notification
-               conn.sendNotifications(msgID, data)
-               return
-       }
-
-       // match ch according to the context
-       conn.channelsLock.RLock()
-       ch, ok := conn.channels[context]
-       conn.channelsLock.RUnlock()
-
-       if !ok {
-               log.WithFields(logger.Fields{
-                       "context": context,
-                       "msg_id":  msgID,
-               }).Error("Context ID not known, ignoring the message.")
-               return
-       }
-
-       chMeta := ch.Metadata().(*channelMetadata)
-       lastReplyReceived := false
-       // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
-       if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
-               lastReplyReceived = true
-       }
-
-       // send the data to the channel
-       sendReply(ch, &api.VppReply{
-               MessageID:         msgID,
-               Data:              data,
-               LastReplyReceived: lastReplyReceived,
-       })
-}
-
-// sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
-// it logs the error and do not send the message.
-func sendReply(ch *api.Channel, reply *api.VppReply) {
-       select {
-       case ch.ReplyChan <- reply:
-               // reply sent successfully
-       default:
-               // unable to write into the channel without blocking
-               log.WithFields(logger.Fields{
-                       "channel": ch,
-                       "msg_id":  reply.MessageID,
-               }).Warn("Unable to send the reply, reciever end not ready.")
-       }
-}
-
-// GetMessageID returns message identifier of given API message.
-func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
-       if c == nil {
-               return 0, errors.New("nil connection passed in")
-       }
-       return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString())
-}
-
-// messageNameToID returns message ID of a message identified by its name and CRC.
-func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) {
-       // try to get the ID from the map
-       c.msgIDsLock.RLock()
-       id, ok := c.msgIDs[msgName+msgCrc]
-       c.msgIDsLock.RUnlock()
-       if ok {
-               return id, nil
-       }
-
-       // get the ID using VPP API
-       id, err := c.vpp.GetMsgID(msgName, msgCrc)
-       if err != nil {
-               error := fmt.Errorf("unable to retrieve message ID: %v", err)
-               log.WithFields(logger.Fields{
-                       "msg_name": msgName,
-                       "msg_crc":  msgCrc,
-               }).Errorf("unable to retrieve message ID: %v", err)
-               return id, error
-       }
-
-       c.msgIDsLock.Lock()
-       c.msgIDs[msgName+msgCrc] = id
-       c.msgIDsLock.Unlock()
-
-       return id, nil
-}
index d3c2e2c..3184ef5 100644 (file)
@@ -145,6 +145,7 @@ func TestNotifications(t *testing.T) {
 }
 
 func TestNilConnection(t *testing.T) {
+       RegisterTestingT(t)
        var conn *Connection
 
        ch, err := conn.NewAPIChannel()
@@ -168,6 +169,21 @@ func TestDoubleConnection(t *testing.T) {
        Expect(conn).Should(BeNil())
 }
 
+func TestAsyncConnection(t *testing.T) {
+       ctx := setupTest(t)
+       defer ctx.teardownTest()
+
+       ctx.conn.Disconnect()
+       conn, ch, err := AsyncConnect(ctx.mockVpp)
+       ctx.conn = conn
+
+       Expect(err).ShouldNot(HaveOccurred())
+       Expect(conn).ShouldNot(BeNil())
+
+       ev := <-ch
+       Expect(ev.State).Should(BeEquivalentTo(Connected))
+}
+
 func TestFullBuffer(t *testing.T) {
        ctx := setupTest(t)
        defer ctx.teardownTest()
diff --git a/core/request_handler.go b/core/request_handler.go
new file mode 100644 (file)
index 0000000..f4f5e92
--- /dev/null
@@ -0,0 +1,213 @@
+// Copyright (c) 2017 Cisco and/or its affiliates.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package core
+
+import (
+       "errors"
+       "fmt"
+       "sync/atomic"
+
+       logger "github.com/Sirupsen/logrus"
+
+       "git.fd.io/govpp.git/api"
+       "git.fd.io/govpp.git/core/bin_api/vpe"
+)
+
+// watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
+func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
+       for {
+               select {
+               case req, ok := <-ch.ReqChan:
+                       // new request on the request channel
+                       if !ok {
+                               // after closing the request channel, release API channel and return
+                               c.releaseAPIChannel(ch, chMeta)
+                               return
+                       }
+                       c.processRequest(ch, chMeta, req)
+
+               case req := <-ch.NotifSubsChan:
+                       // new request on the notification subscribe channel
+                       c.processNotifSubscribeRequest(ch, req)
+               }
+       }
+}
+
+// processRequest processes a single request received on the request channel.
+func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
+       // check whether we are connected to VPP
+       if atomic.LoadUint32(&c.connected) == 0 {
+               error := errors.New("not connected to VPP, ignoring the request")
+               log.Error(error)
+               sendReply(ch, &api.VppReply{Error: error})
+               return error
+       }
+
+       // retrieve message ID
+       msgID, err := c.GetMessageID(req.Message)
+       if err != nil {
+               error := fmt.Errorf("unable to retrieve message ID: %v", err)
+               log.WithFields(logger.Fields{
+                       "msg_name": req.Message.GetMessageName(),
+                       "msg_crc":  req.Message.GetCrcString(),
+               }).Error(err)
+               sendReply(ch, &api.VppReply{Error: error})
+               return error
+       }
+
+       // encode the message into binary
+       data, err := c.codec.EncodeMsg(req.Message, msgID)
+       if err != nil {
+               error := fmt.Errorf("unable to encode the messge: %v", err)
+               log.WithFields(logger.Fields{
+                       "context": chMeta.id,
+                       "msg_id":  msgID,
+               }).Error(error)
+               sendReply(ch, &api.VppReply{Error: error})
+               return error
+       }
+
+       // send the message
+       log.WithFields(logger.Fields{
+               "context":  chMeta.id,
+               "msg_id":   msgID,
+               "msg_size": len(data),
+       }).Debug("Sending a message to VPP.")
+
+       if req.Multipart {
+               // expect multipart response
+               atomic.StoreUint32(&chMeta.multipart, 1)
+       }
+
+       // send the request to VPP
+       c.vpp.SendMsg(chMeta.id, data)
+
+       if req.Multipart {
+               // send a control ping to determine end of the multipart response
+               ping := &vpe.ControlPing{}
+               pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID)
+
+               log.WithFields(logger.Fields{
+                       "context":  chMeta.id,
+                       "msg_id":   c.pingReqID,
+                       "msg_size": len(pingData),
+               }).Debug("Sending a control ping to VPP.")
+
+               c.vpp.SendMsg(chMeta.id, pingData)
+       }
+
+       return nil
+}
+
+// msgCallback is called whenever any binary API message comes from VPP.
+func msgCallback(context uint32, msgID uint16, data []byte) {
+       connLock.RLock()
+       defer connLock.RUnlock()
+
+       if conn == nil {
+               log.Warn("Already disconnected, ignoring the message.")
+               return
+       }
+
+       log.WithFields(logger.Fields{
+               "context":  context,
+               "msg_id":   msgID,
+               "msg_size": len(data),
+       }).Debug("Received a message from VPP.")
+
+       if context == 0 || conn.isNotificationMessage(msgID) {
+               // process the message as a notification
+               conn.sendNotifications(msgID, data)
+               return
+       }
+
+       // match ch according to the context
+       conn.channelsLock.RLock()
+       ch, ok := conn.channels[context]
+       conn.channelsLock.RUnlock()
+
+       if !ok {
+               log.WithFields(logger.Fields{
+                       "context": context,
+                       "msg_id":  msgID,
+               }).Error("Context ID not known, ignoring the message.")
+               return
+       }
+
+       chMeta := ch.Metadata().(*channelMetadata)
+       lastReplyReceived := false
+       // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
+       if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
+               lastReplyReceived = true
+       }
+
+       // send the data to the channel
+       sendReply(ch, &api.VppReply{
+               MessageID:         msgID,
+               Data:              data,
+               LastReplyReceived: lastReplyReceived,
+       })
+}
+
+// sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
+// it logs the error and do not send the message.
+func sendReply(ch *api.Channel, reply *api.VppReply) {
+       select {
+       case ch.ReplyChan <- reply:
+               // reply sent successfully
+       default:
+               // unable to write into the channel without blocking
+               log.WithFields(logger.Fields{
+                       "channel": ch,
+                       "msg_id":  reply.MessageID,
+               }).Warn("Unable to send the reply, reciever end not ready.")
+       }
+}
+
+// GetMessageID returns message identifier of given API message.
+func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
+       if c == nil {
+               return 0, errors.New("nil connection passed in")
+       }
+       return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString())
+}
+
+// messageNameToID returns message ID of a message identified by its name and CRC.
+func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) {
+       // try to get the ID from the map
+       c.msgIDsLock.RLock()
+       id, ok := c.msgIDs[msgName+msgCrc]
+       c.msgIDsLock.RUnlock()
+       if ok {
+               return id, nil
+       }
+
+       // get the ID using VPP API
+       id, err := c.vpp.GetMsgID(msgName, msgCrc)
+       if err != nil {
+               error := fmt.Errorf("unable to retrieve message ID: %v", err)
+               log.WithFields(logger.Fields{
+                       "msg_name": msgName,
+                       "msg_crc":  msgCrc,
+               }).Errorf("unable to retrieve message ID: %v", err)
+               return id, error
+       }
+
+       c.msgIDsLock.Lock()
+       c.msgIDs[msgName+msgCrc] = id
+       c.msgIDsLock.Unlock()
+
+       return id, nil
+}
similarity index 75%
rename from examples/example_client.go
rename to examples/cmd/simple-client/simple_client.go
index f2e5804..6e46d6b 100644 (file)
@@ -12,7 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Example VPP management application that exercises the govpp API on real-world use-cases.
+// Binary simple-client is an example VPP management application that exercises the
+// govpp API on real-world use-cases.
 package main
 
 // Generates Go bindings for all VPP APIs located in the json directory.
@@ -22,19 +23,16 @@ import (
        "fmt"
        "net"
        "os"
-       "os/signal"
 
        "git.fd.io/govpp.git"
        "git.fd.io/govpp.git/api"
-       "git.fd.io/govpp.git/api/ifcounters"
-       "git.fd.io/govpp.git/core/bin_api/vpe"
        "git.fd.io/govpp.git/examples/bin_api/acl"
        "git.fd.io/govpp.git/examples/bin_api/interfaces"
        "git.fd.io/govpp.git/examples/bin_api/tap"
 )
 
 func main() {
-       fmt.Println("Starting example VPP client...")
+       fmt.Println("Starting simple VPP client...")
 
        // connect to VPP
        conn, err := govpp.Connect()
@@ -64,8 +62,6 @@ func main() {
 
        interfaceDump(ch)
        interfaceNotifications(ch)
-
-       //interfaceCounters(ch)
 }
 
 // compatibilityCheck shows how an management application can check whether generated API messages are
@@ -223,56 +219,3 @@ func interfaceNotifications(ch *api.Channel) {
        // unsubscribe from delivery of the notifications
        ch.UnsubscribeNotification(subs)
 }
-
-// interfaceCounters is an example of using notification API to periodically retrieve interface statistics.
-// The ifcounters package contains the API that can be used to decode the strange VnetInterfaceCounters message.
-func interfaceCounters(ch *api.Channel) {
-       // subscribe for interface counters notifications
-       notifChan := make(chan api.Message, 100)
-       subs, _ := ch.SubscribeNotification(notifChan, interfaces.NewVnetInterfaceCounters)
-
-       // enable interface counters notifications from VPP
-       ch.SendRequest(&vpe.WantStats{
-               Pid:           uint32(os.Getpid()),
-               EnableDisable: 1,
-       }).ReceiveReply(&vpe.WantStatsReply{})
-
-       // create channel for Interrupt signal
-       sigChan := make(chan os.Signal, 1)
-       signal.Notify(sigChan, os.Interrupt)
-
-       // loop until Interrupt signal is received
-loop:
-       for {
-               select {
-               case <-sigChan:
-                       // interrupt received
-                       break loop
-               case notifMsg := <-notifChan:
-                       notif := notifMsg.(*interfaces.VnetInterfaceCounters)
-                       // notification received
-                       fmt.Printf("%+v\n", notif)
-
-                       if notif.IsCombined == 0 {
-                               // simple counter
-                               counters, err := ifcounters.DecodeCounters(ifcounters.VnetInterfaceCounters(*notif))
-                               if err != nil {
-                                       fmt.Println("Error:", err)
-                               } else {
-                                       fmt.Printf("%+v\n", counters)
-                               }
-                       } else {
-                               // combined counter
-                               counters, err := ifcounters.DecodeCombinedCounters(ifcounters.VnetInterfaceCounters(*notif))
-                               if err != nil {
-                                       fmt.Println("Error:", err)
-                               } else {
-                                       fmt.Printf("%+v\n", counters)
-                               }
-                       }
-               }
-       }
-
-       // unsubscribe from delivery of the notifications
-       ch.UnsubscribeNotification(subs)
-}
diff --git a/examples/cmd/stats-client/stats_client.go b/examples/cmd/stats-client/stats_client.go
new file mode 100644 (file)
index 0000000..fc40b24
--- /dev/null
@@ -0,0 +1,132 @@
+// Copyright (c) 2017 Cisco and/or its affiliates.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Binary stats-client is an example VPP management application that exercises the
+// govpp API for interface counters together with asynchronous connection to VPP.
+package main
+
+// Generates Go bindings for all VPP APIs located in the json directory.
+//go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api
+
+import (
+       "fmt"
+       "os"
+       "os/signal"
+
+       "git.fd.io/govpp.git"
+       "git.fd.io/govpp.git/api"
+       "git.fd.io/govpp.git/api/ifcounters"
+       "git.fd.io/govpp.git/core"
+       "git.fd.io/govpp.git/core/bin_api/vpe"
+       "git.fd.io/govpp.git/examples/bin_api/interfaces"
+)
+
+func main() {
+       fmt.Println("Starting stats VPP client...")
+
+       // async connect to VPP
+       conn, statCh, err := govpp.AsyncConnect()
+       if err != nil {
+               fmt.Println("Error:", err)
+               os.Exit(1)
+       }
+       defer conn.Disconnect()
+
+       // create an API channel that will be used in the examples
+       ch, err := conn.NewAPIChannel()
+       if err != nil {
+               fmt.Println("Error:", err)
+               os.Exit(1)
+       }
+       defer ch.Close()
+
+       // create channel for Interrupt signal
+       sigChan := make(chan os.Signal, 1)
+       signal.Notify(sigChan, os.Interrupt)
+
+       var subs *api.NotifSubscription
+       var notifChan chan api.Message
+
+       // loop until Interrupt signal is received
+loop:
+       for {
+               select {
+
+               case connEvent := <-statCh:
+                       // VPP connection state change
+                       switch connEvent.State {
+                       case core.Connected:
+                               fmt.Println("VPP connected.")
+                               if subs == nil {
+                                       subs, notifChan = subscribeNotification(ch)
+                               }
+                               requestStatistics(ch)
+
+                       case core.Disconnected:
+                               fmt.Println("VPP disconnected.")
+                       }
+
+               case notifMsg := <-notifChan:
+                       // counter notification received
+                       processCounters(notifMsg.(*interfaces.VnetInterfaceCounters))
+
+               case <-sigChan:
+                       // interrupt received
+                       fmt.Println("Interrupt received, exiting.")
+                       break loop
+               }
+       }
+
+       ch.UnsubscribeNotification(subs)
+}
+
+// subscribeNotification subscribes for interface counters notifications.
+func subscribeNotification(ch *api.Channel) (*api.NotifSubscription, chan api.Message) {
+
+       notifChan := make(chan api.Message, 100)
+       subs, _ := ch.SubscribeNotification(notifChan, interfaces.NewVnetInterfaceCounters)
+
+       return subs, notifChan
+}
+
+// requestStatistics requests interface counters notifications from VPP.
+func requestStatistics(ch *api.Channel) {
+       ch.SendRequest(&vpe.WantStats{
+               Pid:           uint32(os.Getpid()),
+               EnableDisable: 1,
+       }).ReceiveReply(&vpe.WantStatsReply{})
+}
+
+// processCounters processes a counter message received from VPP.
+func processCounters(msg *interfaces.VnetInterfaceCounters) {
+       fmt.Printf("%+v\n", msg)
+
+       if msg.IsCombined == 0 {
+               // simple counter
+               counters, err := ifcounters.DecodeCounters(ifcounters.VnetInterfaceCounters(*msg))
+               if err != nil {
+                       fmt.Println("Error:", err)
+               } else {
+                       fmt.Printf("%+v\n", counters)
+               }
+       } else {
+               // combined counter
+               counters, err := ifcounters.DecodeCombinedCounters(ifcounters.VnetInterfaceCounters(*msg))
+               if err != nil {
+                       fmt.Println("Error:", err)
+               } else {
+                       fmt.Printf("%+v\n", counters)
+               }
+       }
+}
index 5d3ed7f..6f0cc2e 100644 (file)
--- a/govpp.go
+++ b/govpp.go
@@ -24,6 +24,7 @@ var vppAdapter adapter.VppAdapter // VPP Adapter that will be used in the subseq
 
 // Connect connects the govpp core to VPP either using the default VPP Adapter, or using the adapter previously
 // set by SetAdapter (useful mostly just for unit/integration tests with mocked VPP adapter).
+// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
 func Connect() (*core.Connection, error) {
        if vppAdapter == nil {
                vppAdapter = vppapiclient.NewVppAdapter()
@@ -31,6 +32,18 @@ func Connect() (*core.Connection, error) {
        return core.Connect(vppAdapter)
 }
 
+// AsyncConnect asynchronously connects the govpp core to VPP either using the default VPP Adapter,
+// or using the adapter previously set by SetAdapter.
+// This call does not block until connection is established, it returns immediately. The caller is
+// supposed to watch the returned ConnectionState channel for Connected/Disconnected events.
+// In case of disconnect, the library will asynchronously try to reconnect.
+func AsyncConnect() (*core.Connection, chan core.ConnectionEvent, error) {
+       if vppAdapter == nil {
+               vppAdapter = vppapiclient.NewVppAdapter()
+       }
+       return core.AsyncConnect(vppAdapter)
+}
+
 // SetAdapter sets the adapter that will be used for connections to VPP in the subsequent `Connect` calls.
 func SetAdapter(ad adapter.VppAdapter) {
        vppAdapter = ad