X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=core%2Fstats.go;h=40339816157a202e25e295932b2ad4192954e63b;hb=4a4094e6cdc7f5d9e5a470ccf82df1c780c7e224;hp=f2da4942e42d88f644b5bc48d1258655cc54e778;hpb=b74e3b37479a7fa763bed1f2c76de612ee51dcbc;p=govpp.git diff --git a/core/stats.go b/core/stats.go index f2da494..4033981 100644 --- a/core/stats.go +++ b/core/stats.go @@ -3,7 +3,6 @@ package core import ( "path" "strings" - "sync/atomic" "time" "git.fd.io/govpp.git/adapter" @@ -11,8 +10,9 @@ import ( ) var ( - RetryUpdateCount = 10 - RetryUpdateDelay = time.Millisecond * 10 + RetryUpdateCount = 10 + RetryUpdateDelay = time.Millisecond * 10 + HealthCheckInterval = time.Second // default health check probe interval ) const ( @@ -39,9 +39,11 @@ const ( CounterStatsPrefix = "/err/" - MemoryStatPrefix = "/mem/statseg" - MemoryStats_Total = "total" - MemoryStats_Used = "used" + MemoryStatSegPrefix = "/mem/statseg" + MemoryStatPrefix = "/mem/stat" + MemoryMainPrefix = "/mem/main" + MemoryStats_Total = "total" + MemoryStats_Used = "used" InterfaceStatsPrefix = "/if/" InterfaceStats_Names = InterfaceStatsPrefix + "names" @@ -76,8 +78,11 @@ const ( type StatsConnection struct { statsClient adapter.StatsAPI - // connected is true if the adapter is connected to VPP - connected uint32 + maxAttempts int // interval for reconnect attempts + recInterval time.Duration // maximum number of reconnect attempts + + connChan chan ConnectionEvent // connection event channel + done chan struct{} // to terminate stats connection watcher errorStatsData *adapter.StatDir nodeStatsData *adapter.StatDir @@ -87,9 +92,20 @@ type StatsConnection struct { memStatsData *adapter.StatDir } -func newStatsConnection(stats adapter.StatsAPI) *StatsConnection { +func newStatsConnection(stats adapter.StatsAPI, attempts int, interval time.Duration) *StatsConnection { + if attempts == 0 { + attempts = DefaultMaxReconnectAttempts + } + if interval == 0 { + interval = DefaultReconnectInterval + } + return &StatsConnection{ statsClient: stats, + maxAttempts: attempts, + recInterval: interval, + connChan: make(chan ConnectionEvent, NotificationChanBufSize), + done: make(chan struct{}), } } @@ -97,28 +113,50 @@ func newStatsConnection(stats adapter.StatsAPI) *StatsConnection { // This call blocks until it is either connected, or an error occurs. // Only one connection attempt will be performed. func ConnectStats(stats adapter.StatsAPI) (*StatsConnection, error) { - c := newStatsConnection(stats) + log.Debug("Connecting to stats..") + c := newStatsConnection(stats, DefaultMaxReconnectAttempts, DefaultReconnectInterval) - if err := c.connectClient(); err != nil { + if err := c.statsClient.Connect(); err != nil { return nil, err } + log.Debugf("Connected to stats.") return c, nil } -func (c *StatsConnection) connectClient() error { - log.Debug("Connecting to stats..") +// AsyncConnectStats connects to the VPP stats socket asynchronously and returns the connection +// handle with state channel. The call is non-blocking and the caller is expected to watch ConnectionEvent +// values from the channel and wait for connect/disconnect events. Connection loop tries to reconnect the +// socket in case the session was disconnected. +func AsyncConnectStats(stats adapter.StatsAPI, attempts int, interval time.Duration) (*StatsConnection, chan ConnectionEvent, error) { + log.Debug("Connecting to stats asynchronously..") + c := newStatsConnection(stats, attempts, interval) - if err := c.statsClient.Connect(); err != nil { - return err - } + go c.connectLoop() - log.Debugf("Connected to stats.") + return c, c.connChan, nil +} - // store connected state - atomic.StoreUint32(&c.connected, 1) +func (c *StatsConnection) connectLoop() { + log.Debug("Asynchronously connecting to stats..") + var reconnectAttempts int - return nil + // loop until connected + for { + if err := c.statsClient.Connect(); err == nil { + c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected}) + break + } else if reconnectAttempts < c.maxAttempts { + reconnectAttempts++ + log.Warnf("connecting stats failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err) + time.Sleep(c.recInterval) + } else { + c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}) + return + } + } + // start monitoring stats connection state + go c.monitorSocket() } // Disconnect disconnects from Stats API and releases all connection-related resources. @@ -127,14 +165,41 @@ func (c *StatsConnection) Disconnect() { return } if c.statsClient != nil { - c.disconnectClient() + if err := c.statsClient.Disconnect(); err != nil { + log.Debugf("disconnecting stats client failed: %v", err) + } } + close(c.connChan) + close(c.done) } -func (c *StatsConnection) disconnectClient() { - if atomic.CompareAndSwapUint32(&c.connected, 1, 0) { - if err := c.statsClient.Disconnect(); err != nil { - log.Debugf("disconnecting stats client failed: %v", err) +func (c *StatsConnection) monitorSocket() { + var state, lastState ConnectionState + ticker := time.NewTicker(HealthCheckInterval) + + for { + select { + case <-ticker.C: + _, err := c.statsClient.ListStats(SystemStats_Heartbeat) + state = Connected + if err == adapter.ErrStatsDataBusy { + state = NotResponding + } + if err == adapter.ErrStatsDisconnected { + state = Disconnected + } + if err == adapter.ErrStatsAccessFailed { + state = Failed + } + if state == lastState { + continue + } + lastState = state + c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: state, Error: err}) + case <-c.done: + log.Debugf("health check watcher closed") + c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: nil}) + break } } } @@ -242,7 +307,11 @@ func (c *StatsConnection) GetErrorStats(errorStats *api.ErrorStats) (err error) continue } if errStat, ok := stat.Data.(adapter.ErrorStat); ok { - errorStats.Errors[i].Value = uint64(errStat) + values := make([]uint64, len(errStat)) + for j, errStatW := range errStat { + values[j] = uint64(errStatW) + } + errorStats.Errors[i].Values = values } } @@ -478,23 +547,59 @@ func (c *StatsConnection) GetBufferStats(bufStats *api.BufferStats) (err error) } func (c *StatsConnection) GetMemoryStats(memStats *api.MemoryStats) (err error) { - if err := c.updateStats(&c.memStatsData, MemoryStatPrefix); err != nil { + if err := c.updateStats(&c.memStatsData, MemoryStatSegPrefix, MemoryStatPrefix, MemoryMainPrefix); err != nil { return err } + convertStats := func(stats []adapter.Counter) api.MemoryCounters { + memUsg := make([]adapter.Counter, 7) + copy(memUsg, stats) + return api.MemoryCounters{ + Total: uint64(memUsg[0]), Used: uint64(memUsg[1]), Free: uint64(memUsg[2]), UsedMMap: uint64(memUsg[3]), + TotalAlloc: uint64(memUsg[4]), FreeChunks: uint64(memUsg[5]), Releasable: uint64(memUsg[6]), + } + } for _, stat := range c.memStatsData.Entries { - _, f := path.Split(string(stat.Name)) - var val float64 - m, ok := stat.Data.(adapter.ScalarStat) - if ok { - val = float64(m) - } - switch f { - case MemoryStats_Total: - memStats.Total = val - case MemoryStats_Used: - memStats.Used = val + if strings.Contains(string(stat.Name), MemoryStatSegPrefix) { + _, f := path.Split(string(stat.Name)) + var val float64 + m, ok := stat.Data.(adapter.ScalarStat) + if ok { + val = float64(m) + } + switch f { + case MemoryStats_Total: + memStats.Total = val + case MemoryStats_Used: + memStats.Used = val + } + } else if strings.Contains(string(stat.Name), MemoryStatPrefix) { + if perHeapStats, ok := stat.Data.(adapter.SimpleCounterStat); ok { + if memStats.Stat == nil { + memStats.Stat = make(map[int]api.MemoryCounters) + } + for heap, stats := range perHeapStats { + memStats.Stat[heap] = convertStats(stats) + } + } + } else if strings.Contains(string(stat.Name), MemoryMainPrefix) { + if perHeapStats, ok := stat.Data.(adapter.SimpleCounterStat); ok { + if memStats.Main == nil { + memStats.Main = make(map[int]api.MemoryCounters) + } + for heap, stats := range perHeapStats { + memStats.Main[heap] = convertStats(stats) + } + } } } return nil } + +func (c *StatsConnection) sendStatsConnEvent(event ConnectionEvent) { + select { + case c.connChan <- event: + default: + log.Warn("Stats connection state channel is full, discarding value.") + } +}