1 // Copyright (c) 2019 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
15 // Package statsclient is pure Go implementation of VPP stats API client.
27 "git.fd.io/govpp.git/adapter"
28 "github.com/fsnotify/fsnotify"
29 "github.com/ftrvxmtrx/fd"
30 logger "github.com/sirupsen/logrus"
34 // DefaultSocketName is default VPP stats socket file path.
35 DefaultSocketName = adapter.DefaultStatsSocket
38 const socketMissing = `
39 ------------------------------------------------------------
40 VPP stats socket file %s is missing!
42 - is VPP running with stats segment enabled?
43 - is the correct socket name configured?
45 To enable it add following section to your VPP config:
47 socket-name /run/vpp/stats.sock
49 ------------------------------------------------------------
53 // Debug is global variable that determines debug mode
54 Debug = os.Getenv("DEBUG_GOVPP_STATS") != ""
56 // Log is global logger
60 // init initializes global logger, which logs debug level messages to stdout.
64 Log.Level = logger.DebugLevel
65 Log.Debug("govpp/statsclient: enabled debug mode")
69 func debugf(f string, a ...interface{}) {
75 // implements StatsAPI
76 var _ adapter.StatsAPI = (*StatsClient)(nil)
78 // StatsClient is the pure Go implementation for VPP stats API.
79 type StatsClient struct {
84 // to quit socket monitor
90 // NewStatsClient returns new VPP stats API client.
91 func NewStatsClient(sockAddr string) *StatsClient {
93 sockAddr = DefaultSocketName
100 // Connect to validated VPP stats socket and start monitoring
101 // socket file changes
102 func (sc *StatsClient) Connect() (err error) {
104 return fmt.Errorf("already connected")
106 if err := sc.validate(); err != nil {
109 sc.done = make(chan struct{})
111 if sc.statSegment, err = sc.connect(); err != nil {
117 // Disconnect from the socket, unmap shared memory and terminate
119 func (sc *StatsClient) Disconnect() error {
121 return sc.disconnect()
124 func (sc *StatsClient) ListStats(patterns ...string) ([]string, error) {
125 accessEpoch := sc.accessStart()
126 if accessEpoch == 0 {
127 return nil, adapter.ErrStatsAccessFailed
130 indexes, err := sc.listIndexes(patterns...)
135 dirVector := sc.GetDirectoryVector()
136 if dirVector == nil {
137 return nil, fmt.Errorf("failed to list stats: %v", err)
139 vecLen := *(*uint32)(vectorLen(dirVector))
142 for _, index := range indexes {
144 return nil, fmt.Errorf("stat entry index %d out of dir vector len (%d)", index, vecLen)
146 _, dirName, _ := sc.GetStatDirOnIndex(dirVector, index)
147 names = append(names, string(dirName))
150 if !sc.accessEnd(accessEpoch) {
151 return nil, adapter.ErrStatsDataBusy
157 func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntry, err error) {
158 accessEpoch := sc.accessStart()
159 if accessEpoch == 0 {
160 return nil, adapter.ErrStatsAccessFailed
163 indexes, err := sc.listIndexes(patterns...)
168 dirVector := sc.GetDirectoryVector()
169 if dirVector == nil {
172 dirLen := *(*uint32)(vectorLen(dirVector))
174 debugf("dumping entries for %d indexes", len(indexes))
176 entries = make([]adapter.StatEntry, 0, len(indexes))
177 for _, index := range indexes {
179 return nil, fmt.Errorf("stat entry index %d out of dir vector length (%d)", index, dirLen)
181 dirPtr, dirName, dirType := sc.GetStatDirOnIndex(dirVector, index)
182 if len(dirName) == 0 {
185 entry := adapter.StatEntry{
186 Name: append([]byte(nil), dirName...),
187 Type: adapter.StatType(dirType),
188 Data: sc.CopyEntryData(dirPtr),
190 entries = append(entries, entry)
193 if !sc.accessEnd(accessEpoch) {
194 return nil, adapter.ErrStatsDataBusy
200 func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) {
201 dir := new(adapter.StatDir)
203 accessEpoch := sc.accessStart()
204 if accessEpoch == 0 {
205 return nil, adapter.ErrStatsAccessFailed
208 indexes, err := sc.listIndexes(patterns...)
212 dir.Indexes = indexes
214 dirVector := sc.GetDirectoryVector()
215 if dirVector == nil {
218 dirLen := *(*uint32)(vectorLen(dirVector))
220 debugf("dumping entries for %d indexes", len(indexes))
222 entries := make([]adapter.StatEntry, 0, len(indexes))
223 for _, index := range indexes {
225 return nil, fmt.Errorf("stat entry index %d out of dir vector length (%d)", index, dirLen)
227 dirPtr, dirName, dirType := sc.GetStatDirOnIndex(dirVector, index)
228 if len(dirName) == 0 {
231 entry := adapter.StatEntry{
232 Name: append([]byte(nil), dirName...),
233 Type: adapter.StatType(dirType),
234 Data: sc.CopyEntryData(dirPtr),
236 entries = append(entries, entry)
238 dir.Entries = entries
240 if !sc.accessEnd(accessEpoch) {
241 return nil, adapter.ErrStatsDataBusy
243 dir.Epoch = accessEpoch
248 // UpdateDir refreshes directory data for all counters
249 func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) {
250 epoch, _ := sc.GetEpoch()
251 if dir.Epoch != epoch {
252 return adapter.ErrStatsDirStale
255 accessEpoch := sc.accessStart()
256 if accessEpoch == 0 {
257 return adapter.ErrStatsAccessFailed
260 dirVector := sc.GetDirectoryVector()
261 if dirVector == nil {
264 for i, index := range dir.Indexes {
265 statSegDir, dirName, dirType := sc.GetStatDirOnIndex(dirVector, index)
266 if len(dirName) == 0 {
269 entry := &dir.Entries[i]
270 if !bytes.Equal(dirName, entry.Name) {
273 if adapter.StatType(dirType) != entry.Type {
276 if entry.Data == nil {
279 if err := sc.UpdateEntryData(statSegDir, &entry.Data); err != nil {
280 return fmt.Errorf("updating stat data for entry %s failed: %v", dirName, err)
283 if !sc.accessEnd(accessEpoch) {
284 return adapter.ErrStatsDataBusy
290 // validate file presence by retrieving its file info
291 func (sc *StatsClient) validate() error {
292 if _, err := os.Stat(sc.sockAddr); os.IsNotExist(err) {
293 fmt.Fprintf(os.Stderr, socketMissing, sc.sockAddr)
294 return fmt.Errorf("stats socket file %s does not exist", sc.sockAddr)
295 } else if err != nil {
296 return fmt.Errorf("stats socket error: %v", err)
301 // connect to the socket and map it into the memory. According to the
302 // header version info, an appropriate segment handler is returned
303 func (sc *StatsClient) connect() (ss statSegment, err error) {
304 addr := net.UnixAddr{
308 Log.Debugf("connecting to: %v", addr)
310 conn, err := net.DialUnix(addr.Net, nil, &addr)
312 Log.Warnf("connecting to socket %s failed: %s", addr, err)
316 if err := conn.Close(); err != nil {
317 Log.Warnf("closing socket failed: %v", err)
320 Log.Debugf("connected to socket")
322 files, err := fd.Get(conn, 1, nil)
324 return nil, fmt.Errorf("getting file descriptor over socket failed: %v", err)
327 return nil, fmt.Errorf("no files received over socket")
332 if err := file.Close(); err != nil {
333 Log.Warnf("closing file failed: %v", err)
337 info, err := file.Stat()
343 sc.headerData, err = syscall.Mmap(int(file.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED)
345 Log.Debugf("mapping shared memory failed: %v", err)
346 return nil, fmt.Errorf("mapping shared memory failed: %v", err)
348 Log.Debugf("successfully mmapped shared memory segment (size: %v) %v", size, len(sc.headerData))
350 version := getVersion(sc.headerData)
353 ss = newStatSegmentV1(sc.headerData, size)
355 ss = newStatSegmentV2(sc.headerData, size)
357 return nil, fmt.Errorf("stat segment version is not supported: %v (min: %v, max: %v)",
358 version, minVersion, maxVersion)
360 sc.isConnected = true
364 // reconnect disconnects from the socket, re-validates it and
366 func (sc *StatsClient) reconnect() (err error) {
367 if err = sc.disconnect(); err != nil {
368 return fmt.Errorf("error disconnecting socket: %v", err)
370 if err = sc.validate(); err != nil {
371 return fmt.Errorf("error validating socket: %v", err)
373 if sc.statSegment, err = sc.connect(); err != nil {
374 return fmt.Errorf("error connecting socket: %v", err)
379 // disconnect unmaps socket data from the memory and resets the header
380 func (sc *StatsClient) disconnect() error {
381 sc.isConnected = false
382 if sc.headerData == nil {
385 if err := syscall.Munmap(sc.headerData); err != nil {
386 Log.Debugf("unmapping shared memory failed: %v", err)
387 return fmt.Errorf("unmapping shared memory failed: %v", err)
391 Log.Debugf("successfully unmapped shared memory")
395 func (sc *StatsClient) monitorSocket() {
396 watcher, err := fsnotify.NewWatcher()
398 Log.Errorf("error starting socket monitor: %v", err)
405 case event := <-watcher.Events:
406 if event.Op == fsnotify.Remove {
407 if err := sc.reconnect(); err != nil {
408 Log.Errorf("error occurred during socket reconnect: %v", err)
410 // path must be re-added to the watcher
411 if err = watcher.Add(sc.sockAddr); err != nil {
412 Log.Errorf("failed to add socket address to the watcher: %v", err)
415 case err := <-watcher.Errors:
416 Log.Errorf("socket monitor delivered error event: %v", err)
418 err := watcher.Close()
419 Log.Debugf("socket monitor closed (error: %v)", err)
425 if err := watcher.Add(sc.sockAddr); err != nil {
426 Log.Errorf("failed to add socket address to the watcher: %v", err)
430 // Starts monitoring 'inProgress' field. Returns stats segment
431 // access epoch when completed, or zero value if not finished
432 // within MaxWaitInProgress
433 func (sc *StatsClient) accessStart() (epoch int64) {
436 epoch, inProg := sc.GetEpoch()
438 if time.Since(t) > MaxWaitInProgress {
441 time.Sleep(CheckDelayInProgress)
442 epoch, inProg = sc.GetEpoch()
447 // AccessEnd returns true if stats data reading was finished, false
449 func (sc *StatsClient) accessEnd(accessEpoch int64) bool {
450 epoch, inProgress := sc.GetEpoch()
451 if accessEpoch != epoch || inProgress {
457 // listIndexes lists indexes for all stat entries that match any of the regex patterns.
458 func (sc *StatsClient) listIndexes(patterns ...string) (indexes []uint32, err error) {
459 if len(patterns) == 0 {
460 return sc.listIndexesFunc(nil)
462 var regexes = make([]*regexp.Regexp, len(patterns))
463 for i, pattern := range patterns {
464 r, err := regexp.Compile(pattern)
466 return nil, fmt.Errorf("compiling regexp failed: %v", err)
470 nameMatches := func(name []byte) bool {
471 for _, r := range regexes {
478 return sc.listIndexesFunc(nameMatches)
481 // listIndexesFunc lists stats indexes. The optional function
482 // argument filters returned values or returns all if empty
483 func (sc *StatsClient) listIndexesFunc(f func(name []byte) bool) (indexes []uint32, err error) {
485 // there is around ~3157 stats, so to avoid too many allocations
486 // we set capacity to 3200 when listing all stats
487 indexes = make([]uint32, 0, 3200)
490 dirVector := sc.GetDirectoryVector()
491 if dirVector == nil {
494 vecLen := *(*uint32)(vectorLen(dirVector))
496 for i := uint32(0); i < vecLen; i++ {
497 _, dirName, _ := sc.GetStatDirOnIndex(dirVector, i)
499 if len(dirName) == 0 || !f(dirName) {
503 indexes = append(indexes, i)