Provide error counters per worker for statsclient
[govpp.git] / core / stats.go
index 2a9e964..3218f1e 100644 (file)
@@ -3,7 +3,6 @@ package core
 import (
        "path"
        "strings"
-       "sync/atomic"
        "time"
 
        "git.fd.io/govpp.git/adapter"
@@ -11,8 +10,9 @@ import (
 )
 
 var (
-       RetryUpdateCount = 10
-       RetryUpdateDelay = time.Millisecond * 10
+       RetryUpdateCount    = 10
+       RetryUpdateDelay    = time.Millisecond * 10
+       HealthCheckInterval = time.Second // default health check probe interval
 )
 
 const (
@@ -39,6 +39,10 @@ const (
 
        CounterStatsPrefix = "/err/"
 
+       MemoryStatPrefix  = "/mem/statseg"
+       MemoryStats_Total = "total"
+       MemoryStats_Used  = "used"
+
        InterfaceStatsPrefix         = "/if/"
        InterfaceStats_Names         = InterfaceStatsPrefix + "names"
        InterfaceStats_Drops         = InterfaceStatsPrefix + "drops"
@@ -72,48 +76,85 @@ const (
 type StatsConnection struct {
        statsClient adapter.StatsAPI
 
-       // connected is true if the adapter is connected to VPP
-       connected uint32
+       maxAttempts int           // interval for reconnect attempts
+       recInterval time.Duration // maximum number of reconnect attempts
+
+       connChan chan ConnectionEvent // connection event channel
+       done     chan struct{}        // to terminate stats connection watcher
 
        errorStatsData *adapter.StatDir
        nodeStatsData  *adapter.StatDir
        ifaceStatsData *adapter.StatDir
        sysStatsData   *adapter.StatDir
        bufStatsData   *adapter.StatDir
+       memStatsData   *adapter.StatDir
 }
 
-func newStatsConnection(stats adapter.StatsAPI) *StatsConnection {
+func newStatsConnection(stats adapter.StatsAPI, attempts int, interval time.Duration) *StatsConnection {
+       if attempts == 0 {
+               attempts = DefaultMaxReconnectAttempts
+       }
+       if interval == 0 {
+               interval = DefaultReconnectInterval
+       }
+
        return &StatsConnection{
                statsClient: stats,
+               maxAttempts: attempts,
+               recInterval: interval,
+               connChan:    make(chan ConnectionEvent, NotificationChanBufSize),
+               done:        make(chan struct{}),
        }
 }
 
-// Connect connects to Stats API using specified adapter and returns a connection handle.
+// ConnectStats connects to Stats API using specified adapter and returns a connection handle.
 // This call blocks until it is either connected, or an error occurs.
 // Only one connection attempt will be performed.
 func ConnectStats(stats adapter.StatsAPI) (*StatsConnection, error) {
-       c := newStatsConnection(stats)
+       log.Debug("Connecting to stats..")
+       c := newStatsConnection(stats, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
 
-       if err := c.connectClient(); err != nil {
+       if err := c.statsClient.Connect(); err != nil {
                return nil, err
        }
+       log.Debugf("Connected to stats.")
 
        return c, nil
 }
 
-func (c *StatsConnection) connectClient() error {
-       log.Debug("Connecting to stats..")
+// AsyncConnectStats  connects to the VPP stats socket asynchronously and returns the connection
+// handle with state channel. The call is non-blocking and the caller is expected to watch ConnectionEvent
+// values from the channel and wait for connect/disconnect events. Connection loop tries to reconnect the
+// socket in case the session was disconnected.
+func AsyncConnectStats(stats adapter.StatsAPI, attempts int, interval time.Duration) (*StatsConnection, chan ConnectionEvent, error) {
+       log.Debug("Connecting to stats asynchronously..")
+       c := newStatsConnection(stats, attempts, interval)
 
-       if err := c.statsClient.Connect(); err != nil {
-               return err
-       }
+       go c.connectLoop()
 
-       log.Debugf("Connected to stats.")
+       return c, c.connChan, nil
+}
 
-       // store connected state
-       atomic.StoreUint32(&c.connected, 1)
+func (c *StatsConnection) connectLoop() {
+       log.Debug("Asynchronously connecting to stats..")
+       var reconnectAttempts int
 
-       return nil
+       // loop until connected
+       for {
+               if err := c.statsClient.Connect(); err == nil {
+                       c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected})
+                       break
+               } else if reconnectAttempts < c.maxAttempts {
+                       reconnectAttempts++
+                       log.Warnf("connecting stats failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
+                       time.Sleep(c.recInterval)
+               } else {
+                       c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err})
+                       return
+               }
+       }
+       // start monitoring stats connection state
+       go c.monitorSocket()
 }
 
 // Disconnect disconnects from Stats API and releases all connection-related resources.
@@ -122,14 +163,41 @@ func (c *StatsConnection) Disconnect() {
                return
        }
        if c.statsClient != nil {
-               c.disconnectClient()
+               if err := c.statsClient.Disconnect(); err != nil {
+                       log.Debugf("disconnecting stats client failed: %v", err)
+               }
        }
+       close(c.connChan)
+       close(c.done)
 }
 
-func (c *StatsConnection) disconnectClient() {
-       if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
-               if err := c.statsClient.Disconnect(); err != nil {
-                       log.Debugf("disconnecting stats client failed: %v", err)
+func (c *StatsConnection) monitorSocket() {
+       var state, lastState ConnectionState
+       ticker := time.NewTicker(HealthCheckInterval)
+
+       for {
+               select {
+               case <-ticker.C:
+                       _, err := c.statsClient.ListStats(SystemStats_Heartbeat)
+                       state = Connected
+                       if err == adapter.ErrStatsDataBusy {
+                               state = NotResponding
+                       }
+                       if err == adapter.ErrStatsDisconnected {
+                               state = Disconnected
+                       }
+                       if err == adapter.ErrStatsAccessFailed {
+                               state = Failed
+                       }
+                       if state == lastState {
+                               continue
+                       }
+                       lastState = state
+                       c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: state, Error: err})
+               case <-c.done:
+                       log.Debugf("health check watcher closed")
+                       c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: nil})
+                       break
                }
        }
 }
@@ -177,7 +245,7 @@ func (c *StatsConnection) updateStats(statDir **adapter.StatDir, patterns ...str
        return err
 }
 
-// UpdateSystemStats retrieves VPP system stats.
+// GetSystemStats retrieves VPP system stats.
 func (c *StatsConnection) GetSystemStats(sysStats *api.SystemStats) (err error) {
        if err := c.updateStats(&c.sysStatsData, SystemStatsPrefix); err != nil {
                return err
@@ -198,6 +266,9 @@ func (c *StatsConnection) GetSystemStats(sysStats *api.SystemStats) (err error)
                        if ss, ok := stat.Data.(adapter.SimpleCounterStat); ok {
                                vals = make([]uint64, len(ss))
                                for w := range ss {
+                                       if ss[w] == nil {
+                                               continue
+                                       }
                                        vals[w] = uint64(ss[w][0])
                                }
                        }
@@ -234,7 +305,11 @@ func (c *StatsConnection) GetErrorStats(errorStats *api.ErrorStats) (err error)
                        continue
                }
                if errStat, ok := stat.Data.(adapter.ErrorStat); ok {
-                       errorStats.Errors[i].Value = uint64(errStat)
+                       values := make([]uint64, len(errStat))
+                       for j, errStatW := range errStat {
+                               values[j] = uint64(errStatW)
+                       }
+                       errorStats.Errors[i].Values = values
                }
        }
 
@@ -468,3 +543,33 @@ func (c *StatsConnection) GetBufferStats(bufStats *api.BufferStats) (err error)
 
        return nil
 }
+
+func (c *StatsConnection) GetMemoryStats(memStats *api.MemoryStats) (err error) {
+       if err := c.updateStats(&c.memStatsData, MemoryStatPrefix); err != nil {
+               return err
+       }
+
+       for _, stat := range c.memStatsData.Entries {
+               _, f := path.Split(string(stat.Name))
+               var val float64
+               m, ok := stat.Data.(adapter.ScalarStat)
+               if ok {
+                       val = float64(m)
+               }
+               switch f {
+               case MemoryStats_Total:
+                       memStats.Total = val
+               case MemoryStats_Used:
+                       memStats.Used = val
+               }
+       }
+       return nil
+}
+
+func (c *StatsConnection) sendStatsConnEvent(event ConnectionEvent) {
+       select {
+       case c.connChan <- event:
+       default:
+               log.Warn("Stats connection state channel is full, discarding value.")
+       }
+}