+func NewStatsRPC(stats adapter.StatsAPI) (*StatsRPC, error) {
+ rpc := new(StatsRPC)
+ if err := rpc.connect(stats); err != nil {
+ return nil, err
+ }
+ return rpc, nil
+}
+
+func (s *StatsRPC) watchConnection() {
+ heartbeatTicker := time.NewTicker(10 * time.Second).C
+ atomic.StoreUint32(&s.available, 1)
+ log.Debugln("enabling statsRPC service")
+
+ count := 0
+ prev := new(api.SystemStats)
+
+ s.mu.Lock()
+ if err := s.statsConn.GetSystemStats(prev); err != nil {
+ atomic.StoreUint32(&s.available, 0)
+ log.Warnf("disabling statsRPC service, reason: %v", err)
+ }
+ s.mu.Unlock()
+
+ for {
+ select {
+ case <-heartbeatTicker:
+ // If disconnect was called exit.
+ if atomic.LoadUint32(&s.isConnected) == 0 {
+ atomic.StoreUint32(&s.available, 0)
+ return
+ }
+
+ curr := new(api.SystemStats)
+
+ s.mu.Lock()
+ if err := s.statsConn.GetSystemStats(curr); err != nil {
+ atomic.StoreUint32(&s.available, 0)
+ log.Warnf("disabling statsRPC service, reason: %v", err)
+ }
+ s.mu.Unlock()
+
+ if curr.Heartbeat <= prev.Heartbeat {
+ count++
+ // vpp might have crashed/reset... try reconnecting
+ if count == 5 {
+ count = 0
+ atomic.StoreUint32(&s.available, 0)
+ log.Warnln("disabling statsRPC service, reason: vpp might have crashed/reset...")
+ s.statsConn.Disconnect()
+ for {
+ var err error
+ s.statsConn, err = core.ConnectStats(s.stats)
+ if err == nil {
+ atomic.StoreUint32(&s.available, 1)
+ log.Debugln("enabling statsRPC service")
+ break
+ }
+ time.Sleep(5 * time.Second)
+ }
+ }
+ } else {
+ count = 0
+ }
+
+ prev = curr
+ case <-s.done:
+ return
+ }
+ }
+}
+
+func (s *StatsRPC) connect(stats adapter.StatsAPI) error {
+ if atomic.LoadUint32(&s.isConnected) == 1 {
+ return errors.New("connection already exists")
+ }
+ s.stats = stats
+ var err error
+ s.statsConn, err = core.ConnectStats(s.stats)
+ if err != nil {
+ return err
+ }
+ s.done = make(chan struct{})
+ atomic.StoreUint32(&s.isConnected, 1)
+
+ go s.watchConnection()
+ return nil
+}
+
+func (s *StatsRPC) disconnect() {
+ if atomic.LoadUint32(&s.isConnected) == 1 {
+ atomic.StoreUint32(&s.isConnected, 0)
+ close(s.done)
+ s.statsConn.Disconnect()
+ s.statsConn = nil
+ }
+}
+
+func (s *StatsRPC) serviceAvailable() bool {
+ return atomic.LoadUint32(&s.available) == 1