Error error
}
-var (
- connLock sync.RWMutex // lock for the global connection
- conn *Connection // global handle to the Connection (used in the message receive callback)
-)
-
// Connection represents a shared memory connection to VPP via vppAdapter.
type Connection struct {
- vpp adapter.VppAdapter // VPP adapter
- connected uint32 // non-zero if the adapter is connected to VPP
+ vppClient adapter.VppAPI // VPP binary API client adapter
+
+ vppConnected uint32 // non-zero if the adapter is connected to VPP
codec *codec.MsgCodec // message codec
msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
lastReply time.Time // time of the last received reply from VPP
}
-func newConnection(vpp adapter.VppAdapter) *Connection {
+func newConnection(binapi adapter.VppAPI) *Connection {
c := &Connection{
- vpp: vpp,
+ vppClient: binapi,
codec: &codec.MsgCodec{},
msgIDs: make(map[string]uint16),
msgMap: make(map[uint16]api.Message),
channels: make(map[uint16]*Channel),
subscriptions: make(map[uint16][]*subscriptionCtx),
}
- vpp.SetMsgCallback(c.msgCallback)
+ binapi.SetMsgCallback(c.msgCallback)
return c
}
// Connect connects to VPP using specified VPP adapter and returns the connection handle.
// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
-func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
+func Connect(binapi adapter.VppAPI) (*Connection, error) {
// create new connection handle
- c, err := createConnection(vppAdapter)
- if err != nil {
- return nil, err
- }
+ c := newConnection(binapi)
// blocking attempt to connect to VPP
if err := c.connectVPP(); err != nil {
// and ConnectionState channel. This call does not block until connection is established, it
// returns immediately. The caller is supposed to watch the returned ConnectionState channel for
// Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
-func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
+func AsyncConnect(binapi adapter.VppAPI) (*Connection, chan ConnectionEvent, error) {
// create new connection handle
- c, err := createConnection(vppAdapter)
- if err != nil {
- return nil, nil, err
- }
+ c := newConnection(binapi)
// asynchronously attempt to connect to VPP
connChan := make(chan ConnectionEvent, NotificationChanBufSize)
return c, connChan, nil
}
-// Disconnect disconnects from VPP and releases all connection-related resources.
-func (c *Connection) Disconnect() {
- if c == nil {
- return
- }
-
- connLock.Lock()
- defer connLock.Unlock()
-
- if c.vpp != nil {
- c.disconnectVPP()
- }
- conn = nil
-}
-
-// newConnection returns new connection handle.
-func createConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
- connLock.Lock()
- defer connLock.Unlock()
-
- if conn != nil {
- return nil, errors.New("only one connection per process is supported")
- }
-
- conn = newConnection(vppAdapter)
-
- return conn, nil
-}
-
// connectVPP performs blocking attempt to connect to VPP.
func (c *Connection) connectVPP() error {
log.Debug("Connecting to VPP..")
// blocking connect
- if err := c.vpp.Connect(); err != nil {
+ if err := c.vppClient.Connect(); err != nil {
return err
}
log.Debugf("Connected to VPP.")
if err := c.retrieveMessageIDs(); err != nil {
- c.vpp.Disconnect()
+ c.vppClient.Disconnect()
return fmt.Errorf("VPP is incompatible: %v", err)
}
// store connected state
- atomic.StoreUint32(&c.connected, 1)
+ atomic.StoreUint32(&c.vppConnected, 1)
return nil
}
+// Disconnect disconnects from VPP and releases all connection-related resources.
+func (c *Connection) Disconnect() {
+ if c == nil {
+ return
+ }
+
+ if c.vppClient != nil {
+ c.disconnectVPP()
+ }
+}
+
+// disconnectVPP disconnects from VPP in case it is connected.
+func (c *Connection) disconnectVPP() {
+ if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) {
+ c.vppClient.Disconnect()
+ }
+}
+
func (c *Connection) NewAPIChannel() (api.Channel, error) {
return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize)
}
c.msgMap[msgID] = msg
}
- msgs := api.GetAllMessages()
+ msgs := api.GetRegisteredMessages()
for name, msg := range msgs {
- msgID, err := c.vpp.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
+ msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
if err != nil {
return err
}
// fallback for control ping when vpe package is not imported
if c.pingReqID == 0 {
- c.pingReqID, err = c.vpp.GetMsgID(msgControlPing.GetMessageName(), msgControlPing.GetCrcString())
+ c.pingReqID, err = c.vppClient.GetMsgID(msgControlPing.GetMessageName(), msgControlPing.GetCrcString())
if err != nil {
return err
}
addMsg(c.pingReqID, msgControlPing)
}
if c.pingReplyID == 0 {
- c.pingReplyID, err = c.vpp.GetMsgID(msgControlPingReply.GetMessageName(), msgControlPingReply.GetCrcString())
+ c.pingReplyID, err = c.vppClient.GetMsgID(msgControlPingReply.GetMessageName(), msgControlPingReply.GetCrcString())
if err != nil {
return err
}
return nil
}
-// disconnectVPP disconnects from VPP in case it is connected.
-func (c *Connection) disconnectVPP() {
- if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
- c.vpp.Disconnect()
- }
-}
-
// connectLoop attempts to connect to VPP until it succeeds.
// Then it continues with healthCheckLoop.
func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
// loop until connected
for {
- if err := c.vpp.WaitReady(); err != nil {
+ if err := c.vppClient.WaitReady(); err != nil {
log.Warnf("wait ready failed: %v", err)
}
if err := c.connectVPP(); err == nil {
// sleep until next health check probe period
time.Sleep(HealthCheckProbeInterval)
- if atomic.LoadUint32(&c.connected) == 0 {
+ if atomic.LoadUint32(&c.vppConnected) == 0 {
// Disconnect has been called in the meantime, return the healthcheck - reconnect loop
log.Debug("Disconnected on request, exiting health check loop.")
return