8 "git.fd.io/govpp.git/adapter"
9 "git.fd.io/govpp.git/api"
var (
	// RetryUpdateDelay is the pause between two attempts to update a stats directory.
	RetryUpdateDelay = 10 * time.Millisecond
	// HealthCheckInterval is the default health check probe interval.
	HealthCheckInterval = time.Second
)
// Names of the VPP system-wide stats.
const (
	SystemStatsPrefix               = "/sys/"
	SystemStats_VectorRate          = SystemStatsPrefix + "vector_rate"
	SystemStats_NumWorkerThreads    = SystemStatsPrefix + "num_worker_threads"
	SystemStats_VectorRatePerWorker = SystemStatsPrefix + "vector_rate_per_worker"
	SystemStats_InputRate           = SystemStatsPrefix + "input_rate"
	SystemStats_LastUpdate          = SystemStatsPrefix + "last_update"
	SystemStats_LastStatsClear      = SystemStatsPrefix + "last_stats_clear"
	SystemStats_Heartbeat           = SystemStatsPrefix + "heartbeat"
)

// Names of the per-node stats; the node directory sits below SystemStatsPrefix.
const (
	NodeStatsPrefix    = SystemStatsPrefix + "node/"
	NodeStats_Names    = NodeStatsPrefix + "names"
	NodeStats_Clocks   = NodeStatsPrefix + "clocks"
	NodeStats_Vectors  = NodeStatsPrefix + "vectors"
	NodeStats_Calls    = NodeStatsPrefix + "calls"
	NodeStats_Suspends = NodeStatsPrefix + "suspends"
)

// Buffer pool stats. The BufferStats_* values are bare field names, compared
// against the last path element of each stat name.
const (
	BufferStatsPrefix     = "/buffer-pools/"
	BufferStats_Cached    = "cached"
	BufferStats_Used      = "used"
	BufferStats_Available = "available"
)

// CounterStatsPrefix is the prefix of the VPP error counters.
const CounterStatsPrefix = "/err/"

// Stats segment memory usage. The MemoryStats_* values are bare field names,
// compared against the last path element of each stat name.
const (
	MemoryStatPrefix  = "/mem/statseg"
	MemoryStats_Total = "total"
	MemoryStats_Used  = "used"
)

// Names of the per-interface stats.
const (
	InterfaceStatsPrefix         = "/if/"
	InterfaceStats_Names         = InterfaceStatsPrefix + "names"
	InterfaceStats_Drops         = InterfaceStatsPrefix + "drops"
	InterfaceStats_Punt          = InterfaceStatsPrefix + "punt"
	InterfaceStats_IP4           = InterfaceStatsPrefix + "ip4"
	InterfaceStats_IP6           = InterfaceStatsPrefix + "ip6"
	InterfaceStats_RxNoBuf       = InterfaceStatsPrefix + "rx-no-buf"
	InterfaceStats_RxMiss        = InterfaceStatsPrefix + "rx-miss"
	InterfaceStats_RxError       = InterfaceStatsPrefix + "rx-error"
	InterfaceStats_TxError       = InterfaceStatsPrefix + "tx-error"
	InterfaceStats_Mpls          = InterfaceStatsPrefix + "mpls"
	InterfaceStats_Rx            = InterfaceStatsPrefix + "rx"
	InterfaceStats_RxUnicast     = InterfaceStatsPrefix + "rx-unicast"
	InterfaceStats_RxMulticast   = InterfaceStatsPrefix + "rx-multicast"
	InterfaceStats_RxBroadcast   = InterfaceStatsPrefix + "rx-broadcast"
	InterfaceStats_Tx            = InterfaceStatsPrefix + "tx"
	InterfaceStats_TxUnicast     = InterfaceStatsPrefix + "tx-unicast"
	InterfaceStats_TxUnicastMiss = InterfaceStatsPrefix + "tx-unicast-miss"
	InterfaceStats_TxMulticast   = InterfaceStatsPrefix + "tx-multicast"
	InterfaceStats_TxBroadcast   = InterfaceStatsPrefix + "tx-broadcast"
)

// Names of the network stats.
//
// TODO: network stats
const (
	NetworkStatsPrefix     = "/net/"
	NetworkStats_RouteTo   = NetworkStatsPrefix + "route/to"
	NetworkStats_RouteVia  = NetworkStatsPrefix + "route/via"
	NetworkStats_MRoute    = NetworkStatsPrefix + "mroute"
	NetworkStats_Adjacency = NetworkStatsPrefix + "adjacency"
	NetworkStats_Punt      = NetworkStatsPrefix + "punt"
)
76 type StatsConnection struct {
77 statsClient adapter.StatsAPI
79 maxAttempts int // interval for reconnect attempts
80 recInterval time.Duration // maximum number of reconnect attempts
82 connChan chan ConnectionEvent // connection event channel
83 done chan struct{} // to terminate stats connection watcher
85 errorStatsData *adapter.StatDir
86 nodeStatsData *adapter.StatDir
87 ifaceStatsData *adapter.StatDir
88 sysStatsData *adapter.StatDir
89 bufStatsData *adapter.StatDir
90 memStatsData *adapter.StatDir
93 func newStatsConnection(stats adapter.StatsAPI, attempts int, interval time.Duration) *StatsConnection {
95 attempts = DefaultMaxReconnectAttempts
98 interval = DefaultReconnectInterval
101 return &StatsConnection{
103 maxAttempts: attempts,
104 recInterval: interval,
105 connChan: make(chan ConnectionEvent, NotificationChanBufSize),
106 done: make(chan struct{}),
110 // ConnectStats connects to Stats API using specified adapter and returns a connection handle.
111 // This call blocks until it is either connected, or an error occurs.
112 // Only one connection attempt will be performed.
113 func ConnectStats(stats adapter.StatsAPI) (*StatsConnection, error) {
114 log.Debug("Connecting to stats..")
115 c := newStatsConnection(stats, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
117 if err := c.statsClient.Connect(); err != nil {
120 log.Debugf("Connected to stats.")
125 // AsyncConnectStats connects to the VPP stats socket asynchronously and returns the connection
126 // handle with state channel. The call is non-blocking and the caller is expected to watch ConnectionEvent
127 // values from the channel and wait for connect/disconnect events. Connection loop tries to reconnect the
128 // socket in case the session was disconnected.
129 func AsyncConnectStats(stats adapter.StatsAPI, attempts int, interval time.Duration) (*StatsConnection, chan ConnectionEvent, error) {
130 log.Debug("Connecting to stats asynchronously..")
131 c := newStatsConnection(stats, attempts, interval)
135 return c, c.connChan, nil
138 func (c *StatsConnection) connectLoop() {
139 log.Debug("Asynchronously connecting to stats..")
140 var reconnectAttempts int
142 // loop until connected
144 if err := c.statsClient.Connect(); err == nil {
145 c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected})
147 } else if reconnectAttempts < c.maxAttempts {
149 log.Warnf("connecting stats failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
150 time.Sleep(c.recInterval)
152 c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err})
156 // start monitoring stats connection state
160 // Disconnect disconnects from Stats API and releases all connection-related resources.
161 func (c *StatsConnection) Disconnect() {
165 if c.statsClient != nil {
166 if err := c.statsClient.Disconnect(); err != nil {
167 log.Debugf("disconnecting stats client failed: %v", err)
174 func (c *StatsConnection) monitorSocket() {
175 var state, lastState ConnectionState
176 ticker := time.NewTicker(HealthCheckInterval)
181 _, err := c.statsClient.ListStats(SystemStats_Heartbeat)
183 if err == adapter.ErrStatsDataBusy {
184 state = NotResponding
186 if err == adapter.ErrStatsDisconnected {
189 if err == adapter.ErrStatsAccessFailed {
192 if state == lastState {
196 c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: state, Error: err})
198 log.Debugf("health check watcher closed")
199 c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: nil})
205 func (c *StatsConnection) updateStats(statDir **adapter.StatDir, patterns ...string) error {
207 panic("statDir must not nil")
209 try := func() error {
210 if (*statDir) == nil {
211 dir, err := c.statsClient.PrepareDir(patterns...)
213 log.Debugln("preparing dir failed:", err)
218 if err := c.statsClient.UpdateDir(*statDir); err != nil {
219 log.Debugln("updating dir failed:", err)
228 for r := 0; r < RetryUpdateCount; r++ {
229 if err = try(); err == nil {
231 log.Debugf("retry successfull (r=%d)", r)
234 } else if err == adapter.ErrStatsDirStale || err == adapter.ErrStatsDataBusy {
237 log.Debugf("sleeping for %v before next try", RetryUpdateDelay)
238 time.Sleep(RetryUpdateDelay)
241 // error is not retryable
248 // GetSystemStats retrieves VPP system stats.
249 func (c *StatsConnection) GetSystemStats(sysStats *api.SystemStats) (err error) {
250 if err := c.updateStats(&c.sysStatsData, SystemStatsPrefix); err != nil {
254 for _, stat := range c.sysStatsData.Entries {
256 if s, ok := stat.Data.(adapter.ScalarStat); ok {
259 switch string(stat.Name) {
260 case SystemStats_VectorRate:
261 sysStats.VectorRate = val
262 case SystemStats_NumWorkerThreads:
263 sysStats.NumWorkerThreads = val
264 case SystemStats_VectorRatePerWorker:
266 if ss, ok := stat.Data.(adapter.SimpleCounterStat); ok {
267 vals = make([]uint64, len(ss))
272 vals[w] = uint64(ss[w][0])
275 sysStats.VectorRatePerWorker = vals
276 case SystemStats_InputRate:
277 sysStats.InputRate = val
278 case SystemStats_LastUpdate:
279 sysStats.LastUpdate = val
280 case SystemStats_LastStatsClear:
281 sysStats.LastStatsClear = val
282 case SystemStats_Heartbeat:
283 sysStats.Heartbeat = val
290 // GetErrorStats retrieves VPP error stats.
291 func (c *StatsConnection) GetErrorStats(errorStats *api.ErrorStats) (err error) {
292 if err := c.updateStats(&c.errorStatsData, CounterStatsPrefix); err != nil {
296 if errorStats.Errors == nil || len(errorStats.Errors) != len(c.errorStatsData.Entries) {
297 errorStats.Errors = make([]api.ErrorCounter, len(c.errorStatsData.Entries))
298 for i := 0; i < len(c.errorStatsData.Entries); i++ {
299 errorStats.Errors[i].CounterName = string(c.errorStatsData.Entries[i].Name)
303 for i, stat := range c.errorStatsData.Entries {
304 if stat.Type != adapter.ErrorIndex {
307 if errStat, ok := stat.Data.(adapter.ErrorStat); ok {
308 errorStats.Errors[i].Value = uint64(errStat)
315 func (c *StatsConnection) GetNodeStats(nodeStats *api.NodeStats) (err error) {
316 if err := c.updateStats(&c.nodeStatsData, NodeStatsPrefix); err != nil {
320 prepNodes := func(l int) {
321 if nodeStats.Nodes == nil || len(nodeStats.Nodes) != l {
322 nodeStats.Nodes = make([]api.NodeCounters, l)
323 for i := 0; i < l; i++ {
324 nodeStats.Nodes[i].NodeIndex = uint32(i)
328 perNode := func(stat adapter.StatEntry, fn func(*api.NodeCounters, uint64)) {
329 if s, ok := stat.Data.(adapter.SimpleCounterStat); ok {
331 for i := range nodeStats.Nodes {
332 val := adapter.ReduceSimpleCounterStatIndex(s, i)
333 fn(&nodeStats.Nodes[i], val)
338 for _, stat := range c.nodeStatsData.Entries {
339 switch string(stat.Name) {
340 case NodeStats_Names:
341 if stat, ok := stat.Data.(adapter.NameStat); ok {
343 for i, nc := range nodeStats.Nodes {
344 if nc.NodeName != string(stat[i]) {
345 nc.NodeName = string(stat[i])
346 nodeStats.Nodes[i] = nc
350 case NodeStats_Clocks:
351 perNode(stat, func(node *api.NodeCounters, val uint64) {
354 case NodeStats_Vectors:
355 perNode(stat, func(node *api.NodeCounters, val uint64) {
358 case NodeStats_Calls:
359 perNode(stat, func(node *api.NodeCounters, val uint64) {
362 case NodeStats_Suspends:
363 perNode(stat, func(node *api.NodeCounters, val uint64) {
372 // GetInterfaceStats retrieves VPP per interface stats.
373 func (c *StatsConnection) GetInterfaceStats(ifaceStats *api.InterfaceStats) (err error) {
374 if err := c.updateStats(&c.ifaceStatsData, InterfaceStatsPrefix); err != nil {
378 prep := func(l int) {
379 if ifaceStats.Interfaces == nil || len(ifaceStats.Interfaces) != l {
380 ifaceStats.Interfaces = make([]api.InterfaceCounters, l)
381 for i := 0; i < l; i++ {
382 ifaceStats.Interfaces[i].InterfaceIndex = uint32(i)
386 perNode := func(stat adapter.StatEntry, fn func(*api.InterfaceCounters, uint64)) {
387 if s, ok := stat.Data.(adapter.SimpleCounterStat); ok {
389 for i := range ifaceStats.Interfaces {
390 val := adapter.ReduceSimpleCounterStatIndex(s, i)
391 fn(&ifaceStats.Interfaces[i], val)
395 perNodeComb := func(stat adapter.StatEntry, fn func(*api.InterfaceCounters, [2]uint64)) {
396 if s, ok := stat.Data.(adapter.CombinedCounterStat); ok {
398 for i := range ifaceStats.Interfaces {
399 val := adapter.ReduceCombinedCounterStatIndex(s, i)
400 fn(&ifaceStats.Interfaces[i], val)
405 for _, stat := range c.ifaceStatsData.Entries {
406 switch string(stat.Name) {
407 case InterfaceStats_Names:
408 if stat, ok := stat.Data.(adapter.NameStat); ok {
410 for i, nc := range ifaceStats.Interfaces {
411 if nc.InterfaceName != string(stat[i]) {
412 nc.InterfaceName = string(stat[i])
413 ifaceStats.Interfaces[i] = nc
417 case InterfaceStats_Drops:
418 perNode(stat, func(iface *api.InterfaceCounters, val uint64) {
421 case InterfaceStats_Punt:
422 perNode(stat, func(iface *api.InterfaceCounters, val uint64) {
425 case InterfaceStats_IP4:
426 perNode(stat, func(iface *api.InterfaceCounters, val uint64) {
429 case InterfaceStats_IP6:
430 perNode(stat, func(iface *api.InterfaceCounters, val uint64) {
433 case InterfaceStats_RxNoBuf:
434 perNode(stat, func(iface *api.InterfaceCounters, val uint64) {
437 case InterfaceStats_RxMiss:
438 perNode(stat, func(iface *api.InterfaceCounters, val uint64) {
441 case InterfaceStats_RxError:
442 perNode(stat, func(iface *api.InterfaceCounters, val uint64) {
445 case InterfaceStats_TxError:
446 perNode(stat, func(iface *api.InterfaceCounters, val uint64) {
449 case InterfaceStats_Mpls:
450 perNode(stat, func(iface *api.InterfaceCounters, val uint64) {
453 case InterfaceStats_Rx:
454 perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) {
455 iface.Rx.Packets = val[0]
456 iface.Rx.Bytes = val[1]
458 case InterfaceStats_RxUnicast:
459 perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) {
460 iface.RxUnicast.Packets = val[0]
461 iface.RxUnicast.Bytes = val[1]
463 case InterfaceStats_RxMulticast:
464 perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) {
465 iface.RxMulticast.Packets = val[0]
466 iface.RxMulticast.Bytes = val[1]
468 case InterfaceStats_RxBroadcast:
469 perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) {
470 iface.RxBroadcast.Packets = val[0]
471 iface.RxBroadcast.Bytes = val[1]
473 case InterfaceStats_Tx:
474 perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) {
475 iface.Tx.Packets = val[0]
476 iface.Tx.Bytes = val[1]
478 case InterfaceStats_TxUnicastMiss:
479 // tx-unicast-miss was a spelling mistake in older versions
482 case InterfaceStats_TxUnicast:
483 perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) {
484 iface.TxUnicast.Packets = val[0]
485 iface.TxUnicast.Bytes = val[1]
487 case InterfaceStats_TxMulticast:
488 perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) {
489 iface.TxMulticast.Packets = val[0]
490 iface.TxMulticast.Bytes = val[1]
492 case InterfaceStats_TxBroadcast:
493 perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) {
494 iface.TxBroadcast.Packets = val[0]
495 iface.TxBroadcast.Bytes = val[1]
503 // GetBufferStats retrieves VPP buffer pools stats.
504 func (c *StatsConnection) GetBufferStats(bufStats *api.BufferStats) (err error) {
505 if err := c.updateStats(&c.bufStatsData, BufferStatsPrefix); err != nil {
509 if bufStats.Buffer == nil {
510 bufStats.Buffer = make(map[string]api.BufferPool)
513 for _, stat := range c.bufStatsData.Entries {
514 d, f := path.Split(string(stat.Name))
515 d = strings.TrimSuffix(d, "/")
517 name := strings.TrimPrefix(d, BufferStatsPrefix)
518 b, ok := bufStats.Buffer[name]
524 s, ok := stat.Data.(adapter.ScalarStat)
529 case BufferStats_Cached:
531 case BufferStats_Used:
533 case BufferStats_Available:
537 bufStats.Buffer[name] = b
543 func (c *StatsConnection) GetMemoryStats(memStats *api.MemoryStats) (err error) {
544 if err := c.updateStats(&c.memStatsData, MemoryStatPrefix); err != nil {
548 for _, stat := range c.memStatsData.Entries {
549 _, f := path.Split(string(stat.Name))
551 m, ok := stat.Data.(adapter.ScalarStat)
556 case MemoryStats_Total:
558 case MemoryStats_Used:
565 func (c *StatsConnection) sendStatsConnEvent(event ConnectionEvent) {
567 case c.connChan <- event:
569 log.Warn("Stats connection state channel is full, discarding value.")