Provide error counters per worker for statsclient
[govpp.git] / core / stats.go
1 package core
2
3 import (
4         "path"
5         "strings"
6         "time"
7
8         "git.fd.io/govpp.git/adapter"
9         "git.fd.io/govpp.git/api"
10 )
11
12 var (
13         RetryUpdateCount    = 10
14         RetryUpdateDelay    = time.Millisecond * 10
15         HealthCheckInterval = time.Second // default health check probe interval
16 )
17
18 const (
19         SystemStatsPrefix               = "/sys/"
20         SystemStats_VectorRate          = SystemStatsPrefix + "vector_rate"
21         SystemStats_NumWorkerThreads    = SystemStatsPrefix + "num_worker_threads"
22         SystemStats_VectorRatePerWorker = SystemStatsPrefix + "vector_rate_per_worker"
23         SystemStats_InputRate           = SystemStatsPrefix + "input_rate"
24         SystemStats_LastUpdate          = SystemStatsPrefix + "last_update"
25         SystemStats_LastStatsClear      = SystemStatsPrefix + "last_stats_clear"
26         SystemStats_Heartbeat           = SystemStatsPrefix + "heartbeat"
27
28         NodeStatsPrefix    = "/sys/node/"
29         NodeStats_Names    = NodeStatsPrefix + "names"
30         NodeStats_Clocks   = NodeStatsPrefix + "clocks"
31         NodeStats_Vectors  = NodeStatsPrefix + "vectors"
32         NodeStats_Calls    = NodeStatsPrefix + "calls"
33         NodeStats_Suspends = NodeStatsPrefix + "suspends"
34
35         BufferStatsPrefix     = "/buffer-pools/"
36         BufferStats_Cached    = "cached"
37         BufferStats_Used      = "used"
38         BufferStats_Available = "available"
39
40         CounterStatsPrefix = "/err/"
41
42         MemoryStatPrefix  = "/mem/statseg"
43         MemoryStats_Total = "total"
44         MemoryStats_Used  = "used"
45
46         InterfaceStatsPrefix         = "/if/"
47         InterfaceStats_Names         = InterfaceStatsPrefix + "names"
48         InterfaceStats_Drops         = InterfaceStatsPrefix + "drops"
49         InterfaceStats_Punt          = InterfaceStatsPrefix + "punt"
50         InterfaceStats_IP4           = InterfaceStatsPrefix + "ip4"
51         InterfaceStats_IP6           = InterfaceStatsPrefix + "ip6"
52         InterfaceStats_RxNoBuf       = InterfaceStatsPrefix + "rx-no-buf"
53         InterfaceStats_RxMiss        = InterfaceStatsPrefix + "rx-miss"
54         InterfaceStats_RxError       = InterfaceStatsPrefix + "rx-error"
55         InterfaceStats_TxError       = InterfaceStatsPrefix + "tx-error"
56         InterfaceStats_Mpls          = InterfaceStatsPrefix + "mpls"
57         InterfaceStats_Rx            = InterfaceStatsPrefix + "rx"
58         InterfaceStats_RxUnicast     = InterfaceStatsPrefix + "rx-unicast"
59         InterfaceStats_RxMulticast   = InterfaceStatsPrefix + "rx-multicast"
60         InterfaceStats_RxBroadcast   = InterfaceStatsPrefix + "rx-broadcast"
61         InterfaceStats_Tx            = InterfaceStatsPrefix + "tx"
62         InterfaceStats_TxUnicast     = InterfaceStatsPrefix + "tx-unicast"
63         InterfaceStats_TxUnicastMiss = InterfaceStatsPrefix + "tx-unicast-miss"
64         InterfaceStats_TxMulticast   = InterfaceStatsPrefix + "tx-multicast"
65         InterfaceStats_TxBroadcast   = InterfaceStatsPrefix + "tx-broadcast"
66
67         // TODO: network stats
68         NetworkStatsPrefix     = "/net/"
69         NetworkStats_RouteTo   = NetworkStatsPrefix + "route/to"
70         NetworkStats_RouteVia  = NetworkStatsPrefix + "route/via"
71         NetworkStats_MRoute    = NetworkStatsPrefix + "mroute"
72         NetworkStats_Adjacency = NetworkStatsPrefix + "adjacency"
73         NetworkStats_Punt      = NetworkStatsPrefix + "punt"
74 )
75
76 type StatsConnection struct {
77         statsClient adapter.StatsAPI
78
79         maxAttempts int           // interval for reconnect attempts
80         recInterval time.Duration // maximum number of reconnect attempts
81
82         connChan chan ConnectionEvent // connection event channel
83         done     chan struct{}        // to terminate stats connection watcher
84
85         errorStatsData *adapter.StatDir
86         nodeStatsData  *adapter.StatDir
87         ifaceStatsData *adapter.StatDir
88         sysStatsData   *adapter.StatDir
89         bufStatsData   *adapter.StatDir
90         memStatsData   *adapter.StatDir
91 }
92
93 func newStatsConnection(stats adapter.StatsAPI, attempts int, interval time.Duration) *StatsConnection {
94         if attempts == 0 {
95                 attempts = DefaultMaxReconnectAttempts
96         }
97         if interval == 0 {
98                 interval = DefaultReconnectInterval
99         }
100
101         return &StatsConnection{
102                 statsClient: stats,
103                 maxAttempts: attempts,
104                 recInterval: interval,
105                 connChan:    make(chan ConnectionEvent, NotificationChanBufSize),
106                 done:        make(chan struct{}),
107         }
108 }
109
110 // ConnectStats connects to Stats API using specified adapter and returns a connection handle.
111 // This call blocks until it is either connected, or an error occurs.
112 // Only one connection attempt will be performed.
113 func ConnectStats(stats adapter.StatsAPI) (*StatsConnection, error) {
114         log.Debug("Connecting to stats..")
115         c := newStatsConnection(stats, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
116
117         if err := c.statsClient.Connect(); err != nil {
118                 return nil, err
119         }
120         log.Debugf("Connected to stats.")
121
122         return c, nil
123 }
124
125 // AsyncConnectStats  connects to the VPP stats socket asynchronously and returns the connection
126 // handle with state channel. The call is non-blocking and the caller is expected to watch ConnectionEvent
127 // values from the channel and wait for connect/disconnect events. Connection loop tries to reconnect the
128 // socket in case the session was disconnected.
129 func AsyncConnectStats(stats adapter.StatsAPI, attempts int, interval time.Duration) (*StatsConnection, chan ConnectionEvent, error) {
130         log.Debug("Connecting to stats asynchronously..")
131         c := newStatsConnection(stats, attempts, interval)
132
133         go c.connectLoop()
134
135         return c, c.connChan, nil
136 }
137
138 func (c *StatsConnection) connectLoop() {
139         log.Debug("Asynchronously connecting to stats..")
140         var reconnectAttempts int
141
142         // loop until connected
143         for {
144                 if err := c.statsClient.Connect(); err == nil {
145                         c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected})
146                         break
147                 } else if reconnectAttempts < c.maxAttempts {
148                         reconnectAttempts++
149                         log.Warnf("connecting stats failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
150                         time.Sleep(c.recInterval)
151                 } else {
152                         c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err})
153                         return
154                 }
155         }
156         // start monitoring stats connection state
157         go c.monitorSocket()
158 }
159
160 // Disconnect disconnects from Stats API and releases all connection-related resources.
161 func (c *StatsConnection) Disconnect() {
162         if c == nil {
163                 return
164         }
165         if c.statsClient != nil {
166                 if err := c.statsClient.Disconnect(); err != nil {
167                         log.Debugf("disconnecting stats client failed: %v", err)
168                 }
169         }
170         close(c.connChan)
171         close(c.done)
172 }
173
174 func (c *StatsConnection) monitorSocket() {
175         var state, lastState ConnectionState
176         ticker := time.NewTicker(HealthCheckInterval)
177
178         for {
179                 select {
180                 case <-ticker.C:
181                         _, err := c.statsClient.ListStats(SystemStats_Heartbeat)
182                         state = Connected
183                         if err == adapter.ErrStatsDataBusy {
184                                 state = NotResponding
185                         }
186                         if err == adapter.ErrStatsDisconnected {
187                                 state = Disconnected
188                         }
189                         if err == adapter.ErrStatsAccessFailed {
190                                 state = Failed
191                         }
192                         if state == lastState {
193                                 continue
194                         }
195                         lastState = state
196                         c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: state, Error: err})
197                 case <-c.done:
198                         log.Debugf("health check watcher closed")
199                         c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: nil})
200                         break
201                 }
202         }
203 }
204
205 func (c *StatsConnection) updateStats(statDir **adapter.StatDir, patterns ...string) error {
206         if statDir == nil {
207                 panic("statDir must not nil")
208         }
209         try := func() error {
210                 if (*statDir) == nil {
211                         dir, err := c.statsClient.PrepareDir(patterns...)
212                         if err != nil {
213                                 log.Debugln("preparing dir failed:", err)
214                                 return err
215                         }
216                         *statDir = dir
217                 } else {
218                         if err := c.statsClient.UpdateDir(*statDir); err != nil {
219                                 log.Debugln("updating dir failed:", err)
220                                 *statDir = nil
221                                 return err
222                         }
223                 }
224
225                 return nil
226         }
227         var err error
228         for r := 0; r < RetryUpdateCount; r++ {
229                 if err = try(); err == nil {
230                         if r > 0 {
231                                 log.Debugf("retry successfull (r=%d)", r)
232                         }
233                         return nil
234                 } else if err == adapter.ErrStatsDirStale || err == adapter.ErrStatsDataBusy {
235                         // retrying
236                         if r > 1 {
237                                 log.Debugf("sleeping for %v before next try", RetryUpdateDelay)
238                                 time.Sleep(RetryUpdateDelay)
239                         }
240                 } else {
241                         // error is not retryable
242                         break
243                 }
244         }
245         return err
246 }
247
248 // GetSystemStats retrieves VPP system stats.
249 func (c *StatsConnection) GetSystemStats(sysStats *api.SystemStats) (err error) {
250         if err := c.updateStats(&c.sysStatsData, SystemStatsPrefix); err != nil {
251                 return err
252         }
253
254         for _, stat := range c.sysStatsData.Entries {
255                 var val uint64
256                 if s, ok := stat.Data.(adapter.ScalarStat); ok {
257                         val = uint64(s)
258                 }
259                 switch string(stat.Name) {
260                 case SystemStats_VectorRate:
261                         sysStats.VectorRate = val
262                 case SystemStats_NumWorkerThreads:
263                         sysStats.NumWorkerThreads = val
264                 case SystemStats_VectorRatePerWorker:
265                         var vals []uint64
266                         if ss, ok := stat.Data.(adapter.SimpleCounterStat); ok {
267                                 vals = make([]uint64, len(ss))
268                                 for w := range ss {
269                                         if ss[w] == nil {
270                                                 continue
271                                         }
272                                         vals[w] = uint64(ss[w][0])
273                                 }
274                         }
275                         sysStats.VectorRatePerWorker = vals
276                 case SystemStats_InputRate:
277                         sysStats.InputRate = val
278                 case SystemStats_LastUpdate:
279                         sysStats.LastUpdate = val
280                 case SystemStats_LastStatsClear:
281                         sysStats.LastStatsClear = val
282                 case SystemStats_Heartbeat:
283                         sysStats.Heartbeat = val
284                 }
285         }
286
287         return nil
288 }
289
290 // GetErrorStats retrieves VPP error stats.
291 func (c *StatsConnection) GetErrorStats(errorStats *api.ErrorStats) (err error) {
292         if err := c.updateStats(&c.errorStatsData, CounterStatsPrefix); err != nil {
293                 return err
294         }
295
296         if errorStats.Errors == nil || len(errorStats.Errors) != len(c.errorStatsData.Entries) {
297                 errorStats.Errors = make([]api.ErrorCounter, len(c.errorStatsData.Entries))
298                 for i := 0; i < len(c.errorStatsData.Entries); i++ {
299                         errorStats.Errors[i].CounterName = string(c.errorStatsData.Entries[i].Name)
300                 }
301         }
302
303         for i, stat := range c.errorStatsData.Entries {
304                 if stat.Type != adapter.ErrorIndex {
305                         continue
306                 }
307                 if errStat, ok := stat.Data.(adapter.ErrorStat); ok {
308                         values := make([]uint64, len(errStat))
309                         for j, errStatW := range errStat {
310                                 values[j] = uint64(errStatW)
311                         }
312                         errorStats.Errors[i].Values = values
313                 }
314         }
315
316         return nil
317 }
318
319 func (c *StatsConnection) GetNodeStats(nodeStats *api.NodeStats) (err error) {
320         if err := c.updateStats(&c.nodeStatsData, NodeStatsPrefix); err != nil {
321                 return err
322         }
323
324         prepNodes := func(l int) {
325                 if nodeStats.Nodes == nil || len(nodeStats.Nodes) != l {
326                         nodeStats.Nodes = make([]api.NodeCounters, l)
327                         for i := 0; i < l; i++ {
328                                 nodeStats.Nodes[i].NodeIndex = uint32(i)
329                         }
330                 }
331         }
332         perNode := func(stat adapter.StatEntry, fn func(*api.NodeCounters, uint64)) {
333                 if s, ok := stat.Data.(adapter.SimpleCounterStat); ok {
334                         prepNodes(len(s[0]))
335                         for i := range nodeStats.Nodes {
336                                 val := adapter.ReduceSimpleCounterStatIndex(s, i)
337                                 fn(&nodeStats.Nodes[i], val)
338                         }
339                 }
340         }
341
342         for _, stat := range c.nodeStatsData.Entries {
343                 switch string(stat.Name) {
344                 case NodeStats_Names:
345                         if stat, ok := stat.Data.(adapter.NameStat); ok {
346                                 prepNodes(len(stat))
347                                 for i, nc := range nodeStats.Nodes {
348                                         if nc.NodeName != string(stat[i]) {
349                                                 nc.NodeName = string(stat[i])
350                                                 nodeStats.Nodes[i] = nc
351                                         }
352                                 }
353                         }
354                 case NodeStats_Clocks:
355                         perNode(stat, func(node *api.NodeCounters, val uint64) {
356                                 node.Clocks = val
357                         })
358                 case NodeStats_Vectors:
359                         perNode(stat, func(node *api.NodeCounters, val uint64) {
360                                 node.Vectors = val
361                         })
362                 case NodeStats_Calls:
363                         perNode(stat, func(node *api.NodeCounters, val uint64) {
364                                 node.Calls = val
365                         })
366                 case NodeStats_Suspends:
367                         perNode(stat, func(node *api.NodeCounters, val uint64) {
368                                 node.Suspends = val
369                         })
370                 }
371         }
372
373         return nil
374 }
375
376 // GetInterfaceStats retrieves VPP per interface stats.
377 func (c *StatsConnection) GetInterfaceStats(ifaceStats *api.InterfaceStats) (err error) {
378         if err := c.updateStats(&c.ifaceStatsData, InterfaceStatsPrefix); err != nil {
379                 return err
380         }
381
382         prep := func(l int) {
383                 if ifaceStats.Interfaces == nil || len(ifaceStats.Interfaces) != l {
384                         ifaceStats.Interfaces = make([]api.InterfaceCounters, l)
385                         for i := 0; i < l; i++ {
386                                 ifaceStats.Interfaces[i].InterfaceIndex = uint32(i)
387                         }
388                 }
389         }
390         perNode := func(stat adapter.StatEntry, fn func(*api.InterfaceCounters, uint64)) {
391                 if s, ok := stat.Data.(adapter.SimpleCounterStat); ok {
392                         prep(len(s[0]))
393                         for i := range ifaceStats.Interfaces {
394                                 val := adapter.ReduceSimpleCounterStatIndex(s, i)
395                                 fn(&ifaceStats.Interfaces[i], val)
396                         }
397                 }
398         }
399         perNodeComb := func(stat adapter.StatEntry, fn func(*api.InterfaceCounters, [2]uint64)) {
400                 if s, ok := stat.Data.(adapter.CombinedCounterStat); ok {
401                         prep(len(s[0]))
402                         for i := range ifaceStats.Interfaces {
403                                 val := adapter.ReduceCombinedCounterStatIndex(s, i)
404                                 fn(&ifaceStats.Interfaces[i], val)
405                         }
406                 }
407         }
408
409         for _, stat := range c.ifaceStatsData.Entries {
410                 switch string(stat.Name) {
411                 case InterfaceStats_Names:
412                         if stat, ok := stat.Data.(adapter.NameStat); ok {
413                                 prep(len(stat))
414                                 for i, nc := range ifaceStats.Interfaces {
415                                         if nc.InterfaceName != string(stat[i]) {
416                                                 nc.InterfaceName = string(stat[i])
417                                                 ifaceStats.Interfaces[i] = nc
418                                         }
419                                 }
420                         }
421                 case InterfaceStats_Drops:
422                         perNode(stat, func(iface *api.InterfaceCounters, val uint64) {
423                                 iface.Drops = val
424                         })
425                 case InterfaceStats_Punt:
426                         perNode(stat, func(iface *api.InterfaceCounters, val uint64) {
427                                 iface.Punts = val
428                         })
429                 case InterfaceStats_IP4:
430                         perNode(stat, func(iface *api.InterfaceCounters, val uint64) {
431                                 iface.IP4 = val
432                         })
433                 case InterfaceStats_IP6:
434                         perNode(stat, func(iface *api.InterfaceCounters, val uint64) {
435                                 iface.IP6 = val
436                         })
437                 case InterfaceStats_RxNoBuf:
438                         perNode(stat, func(iface *api.InterfaceCounters, val uint64) {
439                                 iface.RxNoBuf = val
440                         })
441                 case InterfaceStats_RxMiss:
442                         perNode(stat, func(iface *api.InterfaceCounters, val uint64) {
443                                 iface.RxMiss = val
444                         })
445                 case InterfaceStats_RxError:
446                         perNode(stat, func(iface *api.InterfaceCounters, val uint64) {
447                                 iface.RxErrors = val
448                         })
449                 case InterfaceStats_TxError:
450                         perNode(stat, func(iface *api.InterfaceCounters, val uint64) {
451                                 iface.TxErrors = val
452                         })
453                 case InterfaceStats_Mpls:
454                         perNode(stat, func(iface *api.InterfaceCounters, val uint64) {
455                                 iface.Mpls = val
456                         })
457                 case InterfaceStats_Rx:
458                         perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) {
459                                 iface.Rx.Packets = val[0]
460                                 iface.Rx.Bytes = val[1]
461                         })
462                 case InterfaceStats_RxUnicast:
463                         perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) {
464                                 iface.RxUnicast.Packets = val[0]
465                                 iface.RxUnicast.Bytes = val[1]
466                         })
467                 case InterfaceStats_RxMulticast:
468                         perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) {
469                                 iface.RxMulticast.Packets = val[0]
470                                 iface.RxMulticast.Bytes = val[1]
471                         })
472                 case InterfaceStats_RxBroadcast:
473                         perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) {
474                                 iface.RxBroadcast.Packets = val[0]
475                                 iface.RxBroadcast.Bytes = val[1]
476                         })
477                 case InterfaceStats_Tx:
478                         perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) {
479                                 iface.Tx.Packets = val[0]
480                                 iface.Tx.Bytes = val[1]
481                         })
482                 case InterfaceStats_TxUnicastMiss:
483                         // tx-unicast-miss was a spelling mistake in older versions
484                         //
485                         fallthrough
486                 case InterfaceStats_TxUnicast:
487                         perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) {
488                                 iface.TxUnicast.Packets = val[0]
489                                 iface.TxUnicast.Bytes = val[1]
490                         })
491                 case InterfaceStats_TxMulticast:
492                         perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) {
493                                 iface.TxMulticast.Packets = val[0]
494                                 iface.TxMulticast.Bytes = val[1]
495                         })
496                 case InterfaceStats_TxBroadcast:
497                         perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) {
498                                 iface.TxBroadcast.Packets = val[0]
499                                 iface.TxBroadcast.Bytes = val[1]
500                         })
501                 }
502         }
503
504         return nil
505 }
506
507 // GetBufferStats retrieves VPP buffer pools stats.
508 func (c *StatsConnection) GetBufferStats(bufStats *api.BufferStats) (err error) {
509         if err := c.updateStats(&c.bufStatsData, BufferStatsPrefix); err != nil {
510                 return err
511         }
512
513         if bufStats.Buffer == nil {
514                 bufStats.Buffer = make(map[string]api.BufferPool)
515         }
516
517         for _, stat := range c.bufStatsData.Entries {
518                 d, f := path.Split(string(stat.Name))
519                 d = strings.TrimSuffix(d, "/")
520
521                 name := strings.TrimPrefix(d, BufferStatsPrefix)
522                 b, ok := bufStats.Buffer[name]
523                 if !ok {
524                         b.PoolName = name
525                 }
526
527                 var val float64
528                 s, ok := stat.Data.(adapter.ScalarStat)
529                 if ok {
530                         val = float64(s)
531                 }
532                 switch f {
533                 case BufferStats_Cached:
534                         b.Cached = val
535                 case BufferStats_Used:
536                         b.Used = val
537                 case BufferStats_Available:
538                         b.Available = val
539                 }
540
541                 bufStats.Buffer[name] = b
542         }
543
544         return nil
545 }
546
547 func (c *StatsConnection) GetMemoryStats(memStats *api.MemoryStats) (err error) {
548         if err := c.updateStats(&c.memStatsData, MemoryStatPrefix); err != nil {
549                 return err
550         }
551
552         for _, stat := range c.memStatsData.Entries {
553                 _, f := path.Split(string(stat.Name))
554                 var val float64
555                 m, ok := stat.Data.(adapter.ScalarStat)
556                 if ok {
557                         val = float64(m)
558                 }
559                 switch f {
560                 case MemoryStats_Total:
561                         memStats.Total = val
562                 case MemoryStats_Used:
563                         memStats.Used = val
564                 }
565         }
566         return nil
567 }
568
569 func (c *StatsConnection) sendStatsConnEvent(event ConnectionEvent) {
570         select {
571         case c.connChan <- event:
572         default:
573                 log.Warn("Stats connection state channel is full, discarding value.")
574         }
575 }