Added asynchronous connection for stats socket
[govpp.git] / core / stats.go
index f2da494..55c287e 100644 (file)
@@ -3,7 +3,6 @@ package core
 import (
        "path"
        "strings"
-       "sync/atomic"
        "time"
 
        "git.fd.io/govpp.git/adapter"
@@ -11,8 +10,9 @@ import (
 )
 
 var (
-       RetryUpdateCount = 10
-       RetryUpdateDelay = time.Millisecond * 10
+       RetryUpdateCount    = 10
+       RetryUpdateDelay    = time.Millisecond * 10
+       HealthCheckInterval = time.Second // default health check probe interval
 )
 
 const (
@@ -76,8 +76,11 @@ const (
 type StatsConnection struct {
        statsClient adapter.StatsAPI
 
-       // connected is true if the adapter is connected to VPP
-       connected uint32
+       maxAttempts int           // interval for reconnect attempts
+       recInterval time.Duration // maximum number of reconnect attempts
+
+       connChan chan ConnectionEvent // connection event channel
+       done     chan struct{}        // to terminate stats connection watcher
 
        errorStatsData *adapter.StatDir
        nodeStatsData  *adapter.StatDir
@@ -87,9 +90,20 @@ type StatsConnection struct {
        memStatsData   *adapter.StatDir
 }
 
-func newStatsConnection(stats adapter.StatsAPI) *StatsConnection {
+func newStatsConnection(stats adapter.StatsAPI, attempts int, interval time.Duration) *StatsConnection {
+       if attempts == 0 {
+               attempts = DefaultMaxReconnectAttempts
+       }
+       if interval == 0 {
+               interval = DefaultReconnectInterval
+       }
+
        return &StatsConnection{
                statsClient: stats,
+               maxAttempts: attempts,
+               recInterval: interval,
+               connChan:    make(chan ConnectionEvent, NotificationChanBufSize),
+               done:        make(chan struct{}),
        }
 }
 
@@ -97,28 +111,50 @@ func newStatsConnection(stats adapter.StatsAPI) *StatsConnection {
 // This call blocks until it is either connected, or an error occurs.
 // Only one connection attempt will be performed.
 func ConnectStats(stats adapter.StatsAPI) (*StatsConnection, error) {
-       c := newStatsConnection(stats)
+       log.Debug("Connecting to stats..")
+       c := newStatsConnection(stats, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
 
-       if err := c.connectClient(); err != nil {
+       if err := c.statsClient.Connect(); err != nil {
                return nil, err
        }
+       log.Debugf("Connected to stats.")
 
        return c, nil
 }
 
-func (c *StatsConnection) connectClient() error {
-       log.Debug("Connecting to stats..")
+// AsyncConnectStats  connects to the VPP stats socket asynchronously and returns the connection
+// handle with state channel. The call is non-blocking and the caller is expected to watch ConnectionEvent
+// values from the channel and wait for connect/disconnect events. Connection loop tries to reconnect the
+// socket in case the session was disconnected.
+func AsyncConnectStats(stats adapter.StatsAPI, attempts int, interval time.Duration) (*StatsConnection, chan ConnectionEvent, error) {
+       log.Debug("Connecting to stats asynchronously..")
+       c := newStatsConnection(stats, attempts, interval)
 
-       if err := c.statsClient.Connect(); err != nil {
-               return err
-       }
+       go c.connectLoop()
 
-       log.Debugf("Connected to stats.")
+       return c, c.connChan, nil
+}
 
-       // store connected state
-       atomic.StoreUint32(&c.connected, 1)
+func (c *StatsConnection) connectLoop() {
+       log.Debug("Asynchronously connecting to stats..")
+       var reconnectAttempts int
 
-       return nil
+       // loop until connected
+       for {
+               if err := c.statsClient.Connect(); err == nil {
+                       c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected})
+                       break
+               } else if reconnectAttempts < c.maxAttempts {
+                       reconnectAttempts++
+                       log.Warnf("connecting stats failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
+                       time.Sleep(c.recInterval)
+               } else {
+                       c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err})
+                       return
+               }
+       }
+       // start monitoring stats connection state
+       go c.monitorSocket()
 }
 
 // Disconnect disconnects from Stats API and releases all connection-related resources.
@@ -127,14 +163,41 @@ func (c *StatsConnection) Disconnect() {
                return
        }
        if c.statsClient != nil {
-               c.disconnectClient()
+               if err := c.statsClient.Disconnect(); err != nil {
+                       log.Debugf("disconnecting stats client failed: %v", err)
+               }
        }
+       close(c.connChan)
+       close(c.done)
 }
 
-func (c *StatsConnection) disconnectClient() {
-       if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
-               if err := c.statsClient.Disconnect(); err != nil {
-                       log.Debugf("disconnecting stats client failed: %v", err)
+func (c *StatsConnection) monitorSocket() {
+       var state, lastState ConnectionState
+       ticker := time.NewTicker(HealthCheckInterval)
+
+       for {
+               select {
+               case <-ticker.C:
+                       _, err := c.statsClient.ListStats(SystemStats_Heartbeat)
+                       state = Connected
+                       if err == adapter.ErrStatsDataBusy {
+                               state = NotResponding
+                       }
+                       if err == adapter.ErrStatsDisconnected {
+                               state = Disconnected
+                       }
+                       if err == adapter.ErrStatsAccessFailed {
+                               state = Failed
+                       }
+                       if state == lastState {
+                               continue
+                       }
+                       lastState = state
+                       c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: state, Error: err})
+               case <-c.done:
+                       log.Debugf("health check watcher closed")
+                       c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: nil})
+                       break
                }
        }
 }
@@ -498,3 +561,11 @@ func (c *StatsConnection) GetMemoryStats(memStats *api.MemoryStats) (err error)
        }
        return nil
 }
+
+func (c *StatsConnection) sendStatsConnEvent(event ConnectionEvent) {
+       select {
+       case c.connChan <- event:
+       default:
+               log.Warn("Stats connection state channel is full, discarding value.")
+       }
+}