1 // Copyright (c) 2019 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
15 // Package statsclient is pure Go implementation of VPP stats API client.
29 "git.fd.io/govpp.git/adapter"
30 "github.com/fsnotify/fsnotify"
31 "github.com/ftrvxmtrx/fd"
32 logger "github.com/sirupsen/logrus"
36 // DefaultSocketName is default VPP stats socket file path.
37 DefaultSocketName = adapter.DefaultStatsSocket
39 // SocketRetryPeriod is the time period after the socket availability
41 SocketRetryPeriod = 50 * time.Millisecond
43 // SocketRetryTimeout is the maximum time for the stats socket
44 SocketRetryTimeout = 3 * time.Second
48 // Debug is global variable that determines debug mode
49 Debug = os.Getenv("DEBUG_GOVPP_STATS") != ""
51 // Log is global logger
55 // init initializes global logger, which logs debug level messages to stdout.
59 Log.Level = logger.DebugLevel
60 Log.Debug("govpp/statsclient: enabled debug mode")
64 func debugf(f string, a ...interface{}) {
70 // implements StatsAPI
71 var _ adapter.StatsAPI = (*StatsClient)(nil)
73 // StatsClient is the pure Go implementation for VPP stats API.
74 type StatsClient struct {
79 // defines the adapter connection state
82 // to quit socket monitor
88 // NewStatsClient returns a new StatsClient using socket.
89 // If socket is empty string DefaultSocketName is used.
90 func NewStatsClient(socket string) *StatsClient {
92 socket = DefaultSocketName
99 // Connect to validated VPP stats socket and start monitoring
100 // socket file changes
101 func (sc *StatsClient) Connect() (err error) {
102 if err := sc.waitForSocket(); err != nil {
105 sc.done = make(chan struct{})
106 if sc.statSegment, err = sc.connect(); err != nil {
113 // Disconnect from the socket, unmap shared memory and terminate
115 func (sc *StatsClient) Disconnect() error {
116 if sc.headerData == nil {
119 if err := syscall.Munmap(sc.headerData); err != nil {
120 Log.Debugf("unmapping shared memory failed: %v", err)
121 return fmt.Errorf("unmapping shared memory failed: %v", err)
125 Log.Debugf("successfully unmapped shared memory")
129 func (sc *StatsClient) ListStats(patterns ...string) ([]string, error) {
130 if !sc.isConnected() {
131 return nil, adapter.ErrStatsDisconnected
133 accessEpoch := sc.accessStart()
134 if accessEpoch == 0 {
135 return nil, adapter.ErrStatsAccessFailed
138 indexes, err := sc.listIndexes(patterns...)
143 dirVector := sc.GetDirectoryVector()
144 if dirVector == nil {
145 return nil, fmt.Errorf("failed to list stats: %v", err)
147 vecLen := *(*uint32)(vectorLen(dirVector))
150 for _, index := range indexes {
152 return nil, fmt.Errorf("stat entry index %d out of dir vector len (%d)", index, vecLen)
154 _, dirName, _ := sc.GetStatDirOnIndex(dirVector, index)
155 names = append(names, string(dirName))
158 if !sc.accessEnd(accessEpoch) {
159 return nil, adapter.ErrStatsDataBusy
165 func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntry, err error) {
166 if !sc.isConnected() {
167 return nil, adapter.ErrStatsDisconnected
169 accessEpoch := sc.accessStart()
170 if accessEpoch == 0 {
171 return nil, adapter.ErrStatsAccessFailed
174 indexes, err := sc.listIndexes(patterns...)
179 dirVector := sc.GetDirectoryVector()
180 if dirVector == nil {
183 dirLen := *(*uint32)(vectorLen(dirVector))
185 debugf("dumping entries for %d indexes", len(indexes))
187 entries = make([]adapter.StatEntry, 0, len(indexes))
188 for _, index := range indexes {
190 return nil, fmt.Errorf("stat entry index %d out of dir vector length (%d)", index, dirLen)
192 dirPtr, dirName, dirType := sc.GetStatDirOnIndex(dirVector, index)
193 if len(dirName) == 0 {
196 entry := adapter.StatEntry{
197 Name: append([]byte(nil), dirName...),
198 Type: adapter.StatType(dirType),
199 Data: sc.CopyEntryData(dirPtr),
201 entries = append(entries, entry)
204 if !sc.accessEnd(accessEpoch) {
205 return nil, adapter.ErrStatsDataBusy
211 func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) {
212 if !sc.isConnected() {
213 return nil, adapter.ErrStatsDisconnected
215 dir := new(adapter.StatDir)
217 accessEpoch := sc.accessStart()
218 if accessEpoch == 0 {
219 return nil, adapter.ErrStatsAccessFailed
222 indexes, err := sc.listIndexes(patterns...)
226 dir.Indexes = indexes
228 dirVector := sc.GetDirectoryVector()
229 if dirVector == nil {
232 dirLen := *(*uint32)(vectorLen(dirVector))
234 debugf("dumping entries for %d indexes", len(indexes))
236 entries := make([]adapter.StatEntry, 0, len(indexes))
237 for _, index := range indexes {
239 return nil, fmt.Errorf("stat entry index %d out of dir vector length (%d)", index, dirLen)
241 dirPtr, dirName, dirType := sc.GetStatDirOnIndex(dirVector, index)
242 if len(dirName) == 0 {
245 entry := adapter.StatEntry{
246 Name: append([]byte(nil), dirName...),
247 Type: adapter.StatType(dirType),
248 Data: sc.CopyEntryData(dirPtr),
250 entries = append(entries, entry)
252 dir.Entries = entries
254 if !sc.accessEnd(accessEpoch) {
255 return nil, adapter.ErrStatsDataBusy
257 dir.Epoch = accessEpoch
262 // UpdateDir refreshes directory data for all counters
263 func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) {
264 if !sc.isConnected() {
265 return adapter.ErrStatsDisconnected
267 epoch, _ := sc.GetEpoch()
268 if dir.Epoch != epoch {
269 return adapter.ErrStatsDirStale
272 accessEpoch := sc.accessStart()
273 if accessEpoch == 0 {
274 return adapter.ErrStatsAccessFailed
277 dirVector := sc.GetDirectoryVector()
278 if dirVector == nil {
281 for i, index := range dir.Indexes {
282 statSegDir, dirName, dirType := sc.GetStatDirOnIndex(dirVector, index)
283 if len(dirName) == 0 {
286 entry := &dir.Entries[i]
287 if !bytes.Equal(dirName, entry.Name) {
290 if adapter.StatType(dirType) != entry.Type {
293 if entry.Data == nil {
296 if err := sc.UpdateEntryData(statSegDir, &entry.Data); err != nil {
297 return fmt.Errorf("updating stat data for entry %s failed: %v", dirName, err)
300 if !sc.accessEnd(accessEpoch) {
301 return adapter.ErrStatsDataBusy
307 // checks the socket existence and waits for it for the designated
308 // time if it is not available immediately
309 func (sc *StatsClient) waitForSocket() error {
310 if _, err := os.Stat(sc.socket); err != nil {
311 if os.IsNotExist(err) {
312 ticker := time.NewTicker(SocketRetryPeriod)
316 if _, err := os.Stat(sc.socket); err == nil {
319 case <-time.After(SocketRetryTimeout):
320 return fmt.Errorf("stats socket file %s is not ready within timeout ", sc.socket)
324 return fmt.Errorf("stats socket error: %v", err)
330 // connect to the socket and map it into the memory. According to the
331 // header version info, an appropriate segment handler is returned
332 func (sc *StatsClient) connect() (ss statSegment, err error) {
333 addr := net.UnixAddr{
337 Log.Debugf("connecting to: %v", addr)
339 conn, err := net.DialUnix(addr.Net, nil, &addr)
341 Log.Warnf("connecting to socket %s failed: %s", addr, err)
345 if err := conn.Close(); err != nil {
346 Log.Warnf("closing socket failed: %v", err)
349 Log.Debugf("connected to socket")
351 files, err := fd.Get(conn, 1, nil)
353 return nil, fmt.Errorf("getting file descriptor over socket failed: %v", err)
356 return nil, fmt.Errorf("no files received over socket")
361 if err := file.Close(); err != nil {
362 Log.Warnf("closing file failed: %v", err)
366 info, err := file.Stat()
372 sc.headerData, err = syscall.Mmap(int(file.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED)
374 Log.Debugf("mapping shared memory failed: %v", err)
375 return nil, fmt.Errorf("mapping shared memory failed: %v", err)
377 Log.Debugf("successfully mmapped shared memory segment (size: %v) %v", size, len(sc.headerData))
379 version := getVersion(sc.headerData)
382 ss = newStatSegmentV1(sc.headerData, size)
384 ss = newStatSegmentV2(sc.headerData, size)
386 return nil, fmt.Errorf("stat segment version is not supported: %v (min: %v, max: %v)",
387 version, minVersion, maxVersion)
391 atomic.CompareAndSwapUint32(&sc.connected, 0, 1)
396 // reconnect disconnects from the socket, re-validates it and
398 func (sc *StatsClient) reconnect() (err error) {
399 if err = sc.disconnect(); err != nil {
400 return fmt.Errorf("error disconnecting socket: %v", err)
402 if err = sc.waitForSocket(); err != nil {
403 return fmt.Errorf("error while waiting on socket: %v", err)
405 if sc.statSegment, err = sc.connect(); err != nil {
406 return fmt.Errorf("error connecting socket: %v", err)
411 // disconnect unmaps socket data from the memory and resets the header
412 func (sc *StatsClient) disconnect() error {
413 if !atomic.CompareAndSwapUint32(&sc.connected, 1, 0) {
414 return fmt.Errorf("stats client is already disconnected")
416 if sc.headerData == nil {
419 if err := syscall.Munmap(sc.headerData); err != nil {
420 Log.Debugf("unmapping shared memory failed: %v", err)
421 return fmt.Errorf("unmapping shared memory failed: %v", err)
425 Log.Debugf("successfully unmapped shared memory")
429 func (sc *StatsClient) monitorSocket() {
430 watcher, err := fsnotify.NewWatcher()
432 Log.Errorf("error starting socket monitor: %v", err)
439 case event := <-watcher.Events:
440 if event.Op == fsnotify.Remove && event.Name == sc.socket {
441 if err := sc.reconnect(); err != nil {
442 Log.Errorf("error occurred during socket reconnect: %v", err)
445 case err := <-watcher.Errors:
446 Log.Errorf("socket monitor delivered error event: %v", err)
448 err := watcher.Close()
449 Log.Debugf("socket monitor closed (error: %v)", err)
455 if err := watcher.Add(filepath.Dir(sc.socket)); err != nil {
456 Log.Errorf("failed to add socket address to the watcher: %v", err)
460 // Starts monitoring 'inProgress' field. Returns stats segment
461 // access epoch when completed, or zero value if not finished
462 // within MaxWaitInProgress
463 func (sc *StatsClient) accessStart() (epoch int64) {
466 epoch, inProg := sc.GetEpoch()
468 if time.Since(t) > MaxWaitInProgress {
471 time.Sleep(CheckDelayInProgress)
472 epoch, inProg = sc.GetEpoch()
477 // AccessEnd returns true if stats data reading was finished, false
479 func (sc *StatsClient) accessEnd(accessEpoch int64) bool {
480 epoch, inProgress := sc.GetEpoch()
481 if accessEpoch != epoch || inProgress {
487 // listIndexes lists indexes for all stat entries that match any of the regex patterns.
488 func (sc *StatsClient) listIndexes(patterns ...string) (indexes []uint32, err error) {
489 if len(patterns) == 0 {
490 return sc.listIndexesFunc(nil)
492 var regexes = make([]*regexp.Regexp, len(patterns))
493 for i, pattern := range patterns {
494 r, err := regexp.Compile(pattern)
496 return nil, fmt.Errorf("compiling regexp failed: %v", err)
500 nameMatches := func(name []byte) bool {
501 for _, r := range regexes {
508 return sc.listIndexesFunc(nameMatches)
511 // listIndexesFunc lists stats indexes. The optional function
512 // argument filters returned values or returns all if empty
513 func (sc *StatsClient) listIndexesFunc(f func(name []byte) bool) (indexes []uint32, err error) {
515 // there is around ~3157 stats, so to avoid too many allocations
516 // we set capacity to 3200 when listing all stats
517 indexes = make([]uint32, 0, 3200)
520 dirVector := sc.GetDirectoryVector()
521 if dirVector == nil {
524 vecLen := *(*uint32)(vectorLen(dirVector))
526 for i := uint32(0); i < vecLen; i++ {
527 _, dirName, _ := sc.GetStatDirOnIndex(dirVector, i)
529 if len(dirName) == 0 || !f(dirName) {
533 indexes = append(indexes, i)
539 func (sc *StatsClient) isConnected() bool {
540 return atomic.LoadUint32(&sc.connected) == 1