1 // Copyright (c) 2019 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
15 // Package statsclient is a pure Go implementation of the VPP stats API client.
27 "github.com/fsnotify/fsnotify"
28 "github.com/ftrvxmtrx/fd"
29 logger "github.com/sirupsen/logrus"
31 "git.fd.io/govpp.git/adapter"
35 // DefaultSocketName is the default VPP stats socket file path.
// It is taken from adapter.DefaultStatsSocket.
36 DefaultSocketName = adapter.DefaultStatsSocket
40 // Debug is a global variable that determines debug mode.
// It is true when the DEBUG_GOVPP_STATS environment variable is set to a
// non-empty value.
41 Debug = os.Getenv("DEBUG_GOVPP_STATS") != ""
43 // Log is the global logger.
47 // init initializes the global logger, which logs debug level messages to stdout.
// NOTE(review): the lines surrounding the two statements below are not
// visible in this excerpt; presumably they run only when Debug is set — confirm.
// Raise the logger verbosity to debug level.
51 Log.Level = logger.DebugLevel
52 Log.Debug("govpp/statsclient: enabled debug mode")
// debugf takes a format string f and its arguments a.
// NOTE(review): the body is not visible in this excerpt; judging by its call
// sites it emits a formatted debug message — confirm how it is gated.
56 func debugf(f string, a ...interface{}) {
62 // Compile-time check that *StatsClient implements adapter.StatsAPI.
63 var _ adapter.StatsAPI = (*StatsClient)(nil)
65 // StatsClient is the pure Go implementation for VPP stats API.
// The methods in this file read and write its socketPath, headerData,
// statSegment, isConnected and done members; their declarations are not
// visible in this excerpt.
66 type StatsClient struct {
72 // to quit socket monitor
78 // NewStatsClient returns a new StatsClient using the given socket path.
79 // If socket is an empty string, DefaultSocketName is used.
80 func NewStatsClient(socket string) *StatsClient {
// Fall back to the default stats socket path.
82 socket = DefaultSocketName
89 // Connect to validated VPP stats socket and start monitoring
90 // socket file changes.
//
// The steps visible here are: validate the socket with checkSocketValid,
// create the done channel, store the segment returned by connect in
// sc.statSegment and set isConnected. The "already connected" error is
// returned from the branch at the top of the function.
91 func (sc *StatsClient) Connect() (err error) {
93 return fmt.Errorf("already connected")
// Make sure the socket file can be stat'ed before trying to connect.
95 if err := sc.checkSocketValid(); err != nil {
// Fresh channel for this connection.
// NOTE(review): presumably the channel used to quit the socket monitor — confirm.
98 sc.done = make(chan struct{})
99 if sc.statSegment, err = sc.connect(); err != nil {
103 sc.isConnected = true
107 // Disconnect from the socket, unmap shared memory and terminate
// (the remainder of the sentence above is not visible in this excerpt).
//
// On the "not connected" path it returns nil without doing anything;
// otherwise it clears isConnected and returns the result of disconnect.
109 func (sc *StatsClient) Disconnect() error {
111 return nil // not connected
113 sc.isConnected = false
115 return sc.disconnect()
// ListStats returns the names of the stat entries whose indexes are
// returned by listIndexes for the given patterns.
//
// The read is bracketed by accessStart and accessEnd: it returns
// adapter.ErrStatsAccessFailed when accessStart yields a zero epoch and
// adapter.ErrStatsDataBusy when accessEnd reports false.
118 func (sc *StatsClient) ListStats(patterns ...string) ([]string, error) {
119 accessEpoch := sc.accessStart()
120 if accessEpoch == 0 {
121 return nil, adapter.ErrStatsAccessFailed
124 indexes, err := sc.listIndexes(patterns...)
129 dirVector := sc.GetDirectoryVector()
130 if dirVector == nil {
// NOTE(review): err is the value returned by listIndexes above. If it has
// already been checked for nil before this point (not visible in this
// excerpt), this message always ends in "<nil>" — confirm and consider a
// message that does not format err.
131 return nil, fmt.Errorf("failed to list stats: %v", err)
// Length of the directory vector, read through vectorLen.
133 vecLen := *(*uint32)(vectorLen(dirVector))
136 for _, index := range indexes {
// An index that does not fit into the directory vector is reported as an error.
138 return nil, fmt.Errorf("stat entry index %d out of dir vector len (%d)", index, vecLen)
// Only the entry name is needed here; it is converted to a string.
140 _, dirName, _ := sc.GetStatDirOnIndex(dirVector, index)
141 names = append(names, string(dirName))
144 if !sc.accessEnd(accessEpoch) {
145 return nil, adapter.ErrStatsDataBusy
// DumpStats returns a stat entry (name, type and data) for every index
// returned by listIndexes for the given patterns.
//
// The read is bracketed by accessStart and accessEnd: it returns
// adapter.ErrStatsAccessFailed when accessStart yields a zero epoch and
// adapter.ErrStatsDataBusy when accessEnd reports false.
151 func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntry, err error) {
152 accessEpoch := sc.accessStart()
153 if accessEpoch == 0 {
154 return nil, adapter.ErrStatsAccessFailed
157 indexes, err := sc.listIndexes(patterns...)
162 dirVector := sc.GetDirectoryVector()
163 if dirVector == nil {
// Length of the directory vector, read through vectorLen.
166 dirLen := *(*uint32)(vectorLen(dirVector))
168 debugf("dumping entries for %d indexes", len(indexes))
// Capacity is sized for one entry per matched index.
170 entries = make([]adapter.StatEntry, 0, len(indexes))
171 for _, index := range indexes {
// An index that does not fit into the directory vector is reported as an error.
173 return nil, fmt.Errorf("stat entry index %d out of dir vector length (%d)", index, dirLen)
175 dirPtr, dirName, dirType := sc.GetStatDirOnIndex(dirVector, index)
// NOTE(review): the body of this branch is not visible in this excerpt;
// presumably entries with an empty name are skipped — confirm.
176 if len(dirName) == 0 {
179 entry := adapter.StatEntry{
// The name is copied into a new slice so the entry does not alias dirName.
180 Name: append([]byte(nil), dirName...),
181 Type: adapter.StatType(dirType),
182 Data: sc.CopyEntryData(dirPtr),
184 entries = append(entries, entry)
187 if !sc.accessEnd(accessEpoch) {
188 return nil, adapter.ErrStatsDataBusy
// PrepareDir builds an adapter.StatDir for the given patterns: Indexes holds
// the indexes returned by listIndexes, Entries holds the entries read for
// them, and Epoch is set to the access epoch of this read.
//
// The read is bracketed by accessStart and accessEnd: it returns
// adapter.ErrStatsAccessFailed when accessStart yields a zero epoch and
// adapter.ErrStatsDataBusy when accessEnd reports false.
194 func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) {
195 dir := new(adapter.StatDir)
197 accessEpoch := sc.accessStart()
198 if accessEpoch == 0 {
199 return nil, adapter.ErrStatsAccessFailed
202 indexes, err := sc.listIndexes(patterns...)
206 dir.Indexes = indexes
208 dirVector := sc.GetDirectoryVector()
209 if dirVector == nil {
// Length of the directory vector, read through vectorLen.
212 dirLen := *(*uint32)(vectorLen(dirVector))
// NOTE(review): the message says "dumping" although this is PrepareDir.
214 debugf("dumping entries for %d indexes", len(indexes))
// Capacity is sized for one entry per matched index.
216 entries := make([]adapter.StatEntry, 0, len(indexes))
217 for _, index := range indexes {
// An index that does not fit into the directory vector is reported as an error.
219 return nil, fmt.Errorf("stat entry index %d out of dir vector length (%d)", index, dirLen)
221 dirPtr, dirName, dirType := sc.GetStatDirOnIndex(dirVector, index)
// NOTE(review): the body of this branch is not visible in this excerpt. If it
// skips the entry, dir.Entries can end up shorter than dir.Indexes — confirm.
222 if len(dirName) == 0 {
225 entry := adapter.StatEntry{
// The name is copied into a new slice so the entry does not alias dirName.
226 Name: append([]byte(nil), dirName...),
227 Type: adapter.StatType(dirType),
228 Data: sc.CopyEntryData(dirPtr),
230 entries = append(entries, entry)
232 dir.Entries = entries
234 if !sc.accessEnd(accessEpoch) {
235 return nil, adapter.ErrStatsDataBusy
// Record the epoch of this read in the directory.
237 dir.Epoch = accessEpoch
242 // UpdateDir refreshes directory data for all counters
// listed in dir.Indexes, updating the data of dir.Entries in place.
//
// It returns adapter.ErrStatsDirStale when dir.Epoch differs from the current
// epoch, adapter.ErrStatsAccessFailed when accessStart yields a zero epoch,
// adapter.ErrStatsDataBusy when accessEnd reports false, and a wrapped
// message when UpdateEntryData fails for an entry.
243 func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) {
// Only the epoch is used here; the in-progress flag is ignored.
244 epoch, _ := sc.GetEpoch()
245 if dir.Epoch != epoch {
246 return adapter.ErrStatsDirStale
249 accessEpoch := sc.accessStart()
250 if accessEpoch == 0 {
251 return adapter.ErrStatsAccessFailed
254 dirVector := sc.GetDirectoryVector()
255 if dirVector == nil {
258 for i, index := range dir.Indexes {
259 statSegDir, dirName, dirType := sc.GetStatDirOnIndex(dirVector, index)
260 if len(dirName) == 0 {
// NOTE(review): indexes dir.Entries by the position in dir.Indexes; this
// assumes both slices have the same length and ordering — confirm against
// the code that builds the directory.
263 entry := &dir.Entries[i]
// The stored entry is compared against the current name and type.
// NOTE(review): the bodies of the checks below are not visible in this excerpt.
264 if !bytes.Equal(dirName, entry.Name) {
267 if adapter.StatType(dirType) != entry.Type {
270 if entry.Data == nil {
273 if err := sc.UpdateEntryData(statSegDir, &entry.Data); err != nil {
274 return fmt.Errorf("updating stat data for entry %s failed: %v", dirName, err)
277 if !sc.accessEnd(accessEpoch) {
278 return adapter.ErrStatsDataBusy
// checkSocketValid checks sc.socketPath with os.Stat. It returns a
// "does not exist" error when the file is missing and a "stats socket error"
// wrapping message for any other Stat failure.
284 func (sc *StatsClient) checkSocketValid() error {
285 if _, err := os.Stat(sc.socketPath); os.IsNotExist(err) {
286 return fmt.Errorf("stats socket file %s does not exist", sc.socketPath)
287 } else if err != nil {
288 return fmt.Errorf("stats socket error: %v", err)
293 // connect to the socket and map it into the memory. According to the
294 // header version info, an appropriate segment handler is returned.
//
// Visible steps: dial the unix socket, receive one file over the connection
// with fd.Get, stat it, mmap it read-only and shared into sc.headerData, read
// the version with getVersion and build a V1 or V2 segment. An error is
// returned when no file is received, when mapping fails, or when the version
// is not supported.
295 func (sc *StatsClient) connect() (ss statSegment, err error) {
296 addr := net.UnixAddr{
300 Log.Debugf("connecting to: %v", addr)
302 conn, err := net.DialUnix(addr.Net, nil, &addr)
304 Log.Warnf("connecting to socket %s failed: %s", addr, err)
// NOTE(review): the statement enclosing this Close is not visible in this
// excerpt; presumably it is deferred — confirm.
308 if err := conn.Close(); err != nil {
309 Log.Warnf("closing socket failed: %v", err)
312 Log.Debugf("connected to socket")
// Ask for a single file to be passed over the socket.
314 files, err := fd.Get(conn, 1, nil)
316 return nil, fmt.Errorf("getting file descriptor over socket failed: %v", err)
319 return nil, fmt.Errorf("no files received over socket")
// NOTE(review): as with the connection, presumably a deferred close — confirm.
324 if err := file.Close(); err != nil {
325 Log.Warnf("closing file failed: %v", err)
329 info, err := file.Stat()
// Map size bytes of the received file read-only (PROT_READ) and shared (MAP_SHARED).
335 sc.headerData, err = syscall.Mmap(int(file.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED)
337 Log.Debugf("mapping shared memory failed: %v", err)
338 return nil, fmt.Errorf("mapping shared memory failed: %v", err)
340 Log.Debugf("successfully mmapped shared memory segment (size: %v) %v", size, len(sc.headerData))
// The version read from the mapped data selects the segment implementation.
342 version := getVersion(sc.headerData)
345 ss = newStatSegmentV1(sc.headerData, size)
347 ss = newStatSegmentV2(sc.headerData, size)
349 return nil, fmt.Errorf("stat segment version is not supported: %v (min: %v, max: %v)",
350 version, minVersion, maxVersion)
356 // reconnect disconnects from the socket, re-validates it and
// connects again, storing the new segment in sc.statSegment. Each step's
// failure is returned with a message naming the step that failed.
358 func (sc *StatsClient) reconnect() (err error) {
359 if err = sc.disconnect(); err != nil {
360 return fmt.Errorf("error disconnecting socket: %v", err)
362 if err = sc.checkSocketValid(); err != nil {
363 return fmt.Errorf("error validating socket: %v", err)
365 if sc.statSegment, err = sc.connect(); err != nil {
366 return fmt.Errorf("error connecting socket: %v", err)
371 // disconnect unmaps socket data from the memory and resets the header.
// It returns an error when syscall.Munmap fails.
372 func (sc *StatsClient) disconnect() error {
// NOTE(review): the body of this branch is not visible in this excerpt;
// presumably nothing is unmapped when there is no mapped data — confirm.
373 if sc.headerData == nil {
376 if err := syscall.Munmap(sc.headerData); err != nil {
377 Log.Debugf("unmapping shared memory failed: %v", err)
378 return fmt.Errorf("unmapping shared memory failed: %v", err)
382 Log.Debugf("successfully unmapped shared memory")
// monitorSocket watches sc.socketPath with an fsnotify watcher. On a Remove
// event it calls reconnect and adds the path to the watcher again; watcher
// errors are logged. The watcher is closed on the path that logs
// "socket monitor closed".
// NOTE(review): the loop/select structure and the condition that ends the
// monitor are not visible in this excerpt.
386 func (sc *StatsClient) monitorSocket() {
387 watcher, err := fsnotify.NewWatcher()
389 Log.Errorf("error starting socket monitor: %v", err)
396 case event := <-watcher.Events:
// NOTE(review): this is an exact comparison; fsnotify documents Op as a bit
// mask, so an event carrying Remove together with another bit would not
// match — confirm whether a mask test is intended.
397 if event.Op == fsnotify.Remove {
// NOTE(review): reconnect reassigns sc.statSegment; whether that is
// synchronized with concurrent readers is not visible in this excerpt.
398 if err := sc.reconnect(); err != nil {
399 Log.Errorf("error occurred during socket reconnect: %v", err)
401 // path must be re-added to the watcher
402 if err = watcher.Add(sc.socketPath); err != nil {
403 Log.Errorf("failed to add socket address to the watcher: %v", err)
406 case err := <-watcher.Errors:
407 Log.Errorf("socket monitor delivered error event: %v", err)
// The result of closing the watcher is only logged.
409 err := watcher.Close()
410 Log.Debugf("socket monitor closed (error: %v)", err)
// Register the socket path with the watcher.
416 if err := watcher.Add(sc.socketPath); err != nil {
417 Log.Errorf("failed to add socket address to the watcher: %v", err)
421 // accessStart starts monitoring the 'inProgress' field. Returns stats segment
422 // access epoch when completed, or zero value if not finished
423 // within MaxWaitInProgress.
424 func (sc *StatsClient) accessStart() (epoch int64) {
427 epoch, inProg := sc.GetEpoch()
// Give up once the wait has exceeded MaxWaitInProgress.
429 if time.Since(t) > MaxWaitInProgress {
// Pause for CheckDelayInProgress before reading the epoch and flag again.
432 time.Sleep(CheckDelayInProgress)
433 epoch, inProg = sc.GetEpoch()
438 // accessEnd returns true if stats data reading was finished, false
// (the remainder of the sentence above is not visible in this excerpt).
// It compares accessEpoch with the current epoch and checks the in-progress
// flag, both obtained from GetEpoch.
440 func (sc *StatsClient) accessEnd(accessEpoch int64) bool {
441 epoch, inProgress := sc.GetEpoch()
// The epoch changed since accessStart, or an update is in progress.
442 if accessEpoch != epoch || inProgress {
448 // listIndexes lists indexes for all stat entries that match any of the regex patterns.
// With no patterns it delegates to listIndexesFunc with a nil filter.
// It returns an error when a pattern fails to compile.
449 func (sc *StatsClient) listIndexes(patterns ...string) (indexes []uint32, err error) {
450 if len(patterns) == 0 {
451 return sc.listIndexesFunc(nil)
// Compile every pattern up front, one regexp per pattern.
453 var regexes = make([]*regexp.Regexp, len(patterns))
454 for i, pattern := range patterns {
455 r, err := regexp.Compile(pattern)
457 return nil, fmt.Errorf("compiling regexp failed: %v", err)
// nameMatches is the filter passed to listIndexesFunc; it iterates over the
// compiled regexes.
// NOTE(review): the loop body is not visible in this excerpt; presumably it
// reports true when any regex matches name — confirm.
461 nameMatches := func(name []byte) bool {
462 for _, r := range regexes {
469 return sc.listIndexesFunc(nameMatches)
472 // listIndexesFunc lists stats indexes. The optional function
473 // argument filters returned values or returns all if empty.
// It walks the directory vector from index 0 up to its length and appends
// the selected indexes.
// NOTE(review): the end of this function and some guarding lines are not
// visible in this excerpt.
474 func (sc *StatsClient) listIndexesFunc(f func(name []byte) bool) (indexes []uint32, err error) {
476 // there is around ~3157 stats, so to avoid too many allocations
477 // we set capacity to 3200 when listing all stats
478 indexes = make([]uint32, 0, 3200)
481 dirVector := sc.GetDirectoryVector()
482 if dirVector == nil {
// Length of the directory vector, read through vectorLen.
485 vecLen := *(*uint32)(vectorLen(dirVector))
487 for i := uint32(0); i < vecLen; i++ {
488 _, dirName, _ := sc.GetStatDirOnIndex(dirVector, i)
// NOTE(review): f is called here; any nil-check for f must be in a line not
// visible in this excerpt, since listIndexes passes nil — confirm.
490 if len(dirName) == 0 || !f(dirName) {
494 indexes = append(indexes, i)