947027592435cf8e66ae5a2ef488a2740353c062
[govpp.git] / adapter / statsclient / statsclient.go
1 // Copyright (c) 2019 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 // Package statsclient is pure Go implementation of VPP stats API client.
16 package statsclient
17
18 import (
19         "bytes"
20         "fmt"
21         "net"
22         "os"
23         "regexp"
24         "syscall"
25         "time"
26
27         "github.com/fsnotify/fsnotify"
28         "github.com/ftrvxmtrx/fd"
29         logger "github.com/sirupsen/logrus"
30
31         "git.fd.io/govpp.git/adapter"
32 )
33
34 const (
35         // DefaultSocketName is default VPP stats socket file path.
36         DefaultSocketName = adapter.DefaultStatsSocket
37 )
38
39 var (
40         // Debug is global variable that determines debug mode
41         Debug = os.Getenv("DEBUG_GOVPP_STATS") != ""
42
43         // Log is global logger
44         Log = logger.New()
45 )
46
47 // init initializes global logger, which logs debug level messages to stdout.
48 func init() {
49         Log.Out = os.Stdout
50         if Debug {
51                 Log.Level = logger.DebugLevel
52                 Log.Debug("govpp/statsclient: enabled debug mode")
53         }
54 }
55
56 func debugf(f string, a ...interface{}) {
57         if Debug {
58                 Log.Debugf(f, a...)
59         }
60 }
61
62 // implements StatsAPI
63 var _ adapter.StatsAPI = (*StatsClient)(nil)
64
65 // StatsClient is the pure Go implementation for VPP stats API.
66 type StatsClient struct {
67         socketPath string
68
69         headerData  []byte
70         isConnected bool
71
72         // to quit socket monitor
73         done chan struct{}
74
75         statSegment
76 }
77
78 // NewStatsClient returns a new StatsClient using socket.
79 // If socket is empty string DefaultSocketName is used.
80 func NewStatsClient(socket string) *StatsClient {
81         if socket == "" {
82                 socket = DefaultSocketName
83         }
84         return &StatsClient{
85                 socketPath: socket,
86         }
87 }
88
89 // Connect to validated VPP stats socket and start monitoring
90 // socket file changes
91 func (sc *StatsClient) Connect() (err error) {
92         if sc.isConnected {
93                 return fmt.Errorf("already connected")
94         }
95         if err := sc.checkSocketValid(); err != nil {
96                 return err
97         }
98         sc.done = make(chan struct{})
99         if sc.statSegment, err = sc.connect(); err != nil {
100                 return err
101         }
102         sc.monitorSocket()
103         sc.isConnected = true
104         return nil
105 }
106
107 // Disconnect from the socket, unmap shared memory and terminate
108 // socket monitor
109 func (sc *StatsClient) Disconnect() error {
110         if !sc.isConnected {
111                 return nil // not connected
112         }
113         sc.isConnected = false
114         close(sc.done)
115         return sc.disconnect()
116 }
117
118 func (sc *StatsClient) ListStats(patterns ...string) ([]string, error) {
119         accessEpoch := sc.accessStart()
120         if accessEpoch == 0 {
121                 return nil, adapter.ErrStatsAccessFailed
122         }
123
124         indexes, err := sc.listIndexes(patterns...)
125         if err != nil {
126                 return nil, err
127         }
128
129         dirVector := sc.GetDirectoryVector()
130         if dirVector == nil {
131                 return nil, fmt.Errorf("failed to list stats: %v", err)
132         }
133         vecLen := *(*uint32)(vectorLen(dirVector))
134
135         var names []string
136         for _, index := range indexes {
137                 if index >= vecLen {
138                         return nil, fmt.Errorf("stat entry index %d out of dir vector len (%d)", index, vecLen)
139                 }
140                 _, dirName, _ := sc.GetStatDirOnIndex(dirVector, index)
141                 names = append(names, string(dirName))
142         }
143
144         if !sc.accessEnd(accessEpoch) {
145                 return nil, adapter.ErrStatsDataBusy
146         }
147
148         return names, nil
149 }
150
151 func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntry, err error) {
152         accessEpoch := sc.accessStart()
153         if accessEpoch == 0 {
154                 return nil, adapter.ErrStatsAccessFailed
155         }
156
157         indexes, err := sc.listIndexes(patterns...)
158         if err != nil {
159                 return nil, err
160         }
161
162         dirVector := sc.GetDirectoryVector()
163         if dirVector == nil {
164                 return nil, err
165         }
166         dirLen := *(*uint32)(vectorLen(dirVector))
167
168         debugf("dumping entries for %d indexes", len(indexes))
169
170         entries = make([]adapter.StatEntry, 0, len(indexes))
171         for _, index := range indexes {
172                 if index >= dirLen {
173                         return nil, fmt.Errorf("stat entry index %d out of dir vector length (%d)", index, dirLen)
174                 }
175                 dirPtr, dirName, dirType := sc.GetStatDirOnIndex(dirVector, index)
176                 if len(dirName) == 0 {
177                         continue
178                 }
179                 entry := adapter.StatEntry{
180                         Name: append([]byte(nil), dirName...),
181                         Type: adapter.StatType(dirType),
182                         Data: sc.CopyEntryData(dirPtr),
183                 }
184                 entries = append(entries, entry)
185         }
186
187         if !sc.accessEnd(accessEpoch) {
188                 return nil, adapter.ErrStatsDataBusy
189         }
190
191         return entries, nil
192 }
193
194 func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) {
195         dir := new(adapter.StatDir)
196
197         accessEpoch := sc.accessStart()
198         if accessEpoch == 0 {
199                 return nil, adapter.ErrStatsAccessFailed
200         }
201
202         indexes, err := sc.listIndexes(patterns...)
203         if err != nil {
204                 return nil, err
205         }
206         dir.Indexes = indexes
207
208         dirVector := sc.GetDirectoryVector()
209         if dirVector == nil {
210                 return nil, err
211         }
212         dirLen := *(*uint32)(vectorLen(dirVector))
213
214         debugf("dumping entries for %d indexes", len(indexes))
215
216         entries := make([]adapter.StatEntry, 0, len(indexes))
217         for _, index := range indexes {
218                 if index >= dirLen {
219                         return nil, fmt.Errorf("stat entry index %d out of dir vector length (%d)", index, dirLen)
220                 }
221                 dirPtr, dirName, dirType := sc.GetStatDirOnIndex(dirVector, index)
222                 if len(dirName) == 0 {
223                         continue
224                 }
225                 entry := adapter.StatEntry{
226                         Name: append([]byte(nil), dirName...),
227                         Type: adapter.StatType(dirType),
228                         Data: sc.CopyEntryData(dirPtr),
229                 }
230                 entries = append(entries, entry)
231         }
232         dir.Entries = entries
233
234         if !sc.accessEnd(accessEpoch) {
235                 return nil, adapter.ErrStatsDataBusy
236         }
237         dir.Epoch = accessEpoch
238
239         return dir, nil
240 }
241
242 // UpdateDir refreshes directory data for all counters
243 func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) {
244         epoch, _ := sc.GetEpoch()
245         if dir.Epoch != epoch {
246                 return adapter.ErrStatsDirStale
247         }
248
249         accessEpoch := sc.accessStart()
250         if accessEpoch == 0 {
251                 return adapter.ErrStatsAccessFailed
252         }
253
254         dirVector := sc.GetDirectoryVector()
255         if dirVector == nil {
256                 return err
257         }
258         for i, index := range dir.Indexes {
259                 statSegDir, dirName, dirType := sc.GetStatDirOnIndex(dirVector, index)
260                 if len(dirName) == 0 {
261                         continue
262                 }
263                 entry := &dir.Entries[i]
264                 if !bytes.Equal(dirName, entry.Name) {
265                         continue
266                 }
267                 if adapter.StatType(dirType) != entry.Type {
268                         continue
269                 }
270                 if entry.Data == nil {
271                         continue
272                 }
273                 if err := sc.UpdateEntryData(statSegDir, &entry.Data); err != nil {
274                         return fmt.Errorf("updating stat data for entry %s failed: %v", dirName, err)
275                 }
276         }
277         if !sc.accessEnd(accessEpoch) {
278                 return adapter.ErrStatsDataBusy
279         }
280
281         return nil
282 }
283
284 func (sc *StatsClient) checkSocketValid() error {
285         if _, err := os.Stat(sc.socketPath); os.IsNotExist(err) {
286                 return fmt.Errorf("stats socket file %s does not exist", sc.socketPath)
287         } else if err != nil {
288                 return fmt.Errorf("stats socket error: %v", err)
289         }
290         return nil
291 }
292
293 // connect to the socket and map it into the memory. According to the
294 // header version info, an appropriate segment handler is returned
295 func (sc *StatsClient) connect() (ss statSegment, err error) {
296         addr := net.UnixAddr{
297                 Net:  "unixpacket",
298                 Name: sc.socketPath,
299         }
300         Log.Debugf("connecting to: %v", addr)
301
302         conn, err := net.DialUnix(addr.Net, nil, &addr)
303         if err != nil {
304                 Log.Warnf("connecting to socket %s failed: %s", addr, err)
305                 return nil, err
306         }
307         defer func() {
308                 if err := conn.Close(); err != nil {
309                         Log.Warnf("closing socket failed: %v", err)
310                 }
311         }()
312         Log.Debugf("connected to socket")
313
314         files, err := fd.Get(conn, 1, nil)
315         if err != nil {
316                 return nil, fmt.Errorf("getting file descriptor over socket failed: %v", err)
317         }
318         if len(files) == 0 {
319                 return nil, fmt.Errorf("no files received over socket")
320         }
321
322         file := files[0]
323         defer func() {
324                 if err := file.Close(); err != nil {
325                         Log.Warnf("closing file failed: %v", err)
326                 }
327         }()
328
329         info, err := file.Stat()
330         if err != nil {
331                 return nil, err
332         }
333         size := info.Size()
334
335         sc.headerData, err = syscall.Mmap(int(file.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED)
336         if err != nil {
337                 Log.Debugf("mapping shared memory failed: %v", err)
338                 return nil, fmt.Errorf("mapping shared memory failed: %v", err)
339         }
340         Log.Debugf("successfully mmapped shared memory segment (size: %v) %v", size, len(sc.headerData))
341
342         version := getVersion(sc.headerData)
343         switch version {
344         case 1:
345                 ss = newStatSegmentV1(sc.headerData, size)
346         case 2:
347                 ss = newStatSegmentV2(sc.headerData, size)
348         default:
349                 return nil, fmt.Errorf("stat segment version is not supported: %v (min: %v, max: %v)",
350                         version, minVersion, maxVersion)
351         }
352
353         return ss, nil
354 }
355
356 // reconnect disconnects from the socket, re-validates it and
357 // connects again
358 func (sc *StatsClient) reconnect() (err error) {
359         if err = sc.disconnect(); err != nil {
360                 return fmt.Errorf("error disconnecting socket: %v", err)
361         }
362         if err = sc.checkSocketValid(); err != nil {
363                 return fmt.Errorf("error validating socket: %v", err)
364         }
365         if sc.statSegment, err = sc.connect(); err != nil {
366                 return fmt.Errorf("error connecting socket: %v", err)
367         }
368         return nil
369 }
370
371 // disconnect unmaps socket data from the memory and resets the header
372 func (sc *StatsClient) disconnect() error {
373         if sc.headerData == nil {
374                 return nil
375         }
376         if err := syscall.Munmap(sc.headerData); err != nil {
377                 Log.Debugf("unmapping shared memory failed: %v", err)
378                 return fmt.Errorf("unmapping shared memory failed: %v", err)
379         }
380         sc.headerData = nil
381
382         Log.Debugf("successfully unmapped shared memory")
383         return nil
384 }
385
386 func (sc *StatsClient) monitorSocket() {
387         watcher, err := fsnotify.NewWatcher()
388         if err != nil {
389                 Log.Errorf("error starting socket monitor: %v", err)
390                 return
391         }
392
393         go func() {
394                 for {
395                         select {
396                         case event := <-watcher.Events:
397                                 if event.Op == fsnotify.Remove {
398                                         if err := sc.reconnect(); err != nil {
399                                                 Log.Errorf("error occurred during socket reconnect: %v", err)
400                                         }
401                                         // path must be re-added to the watcher
402                                         if err = watcher.Add(sc.socketPath); err != nil {
403                                                 Log.Errorf("failed to add socket address to the watcher: %v", err)
404                                         }
405                                 }
406                         case err := <-watcher.Errors:
407                                 Log.Errorf("socket monitor delivered error event: %v", err)
408                         case <-sc.done:
409                                 err := watcher.Close()
410                                 Log.Debugf("socket monitor closed (error: %v)", err)
411                                 return
412                         }
413                 }
414         }()
415
416         if err := watcher.Add(sc.socketPath); err != nil {
417                 Log.Errorf("failed to add socket address to the watcher: %v", err)
418         }
419 }
420
421 // Starts monitoring 'inProgress' field. Returns stats segment
422 // access epoch when completed, or zero value if not finished
423 // within MaxWaitInProgress
424 func (sc *StatsClient) accessStart() (epoch int64) {
425         t := time.Now()
426
427         epoch, inProg := sc.GetEpoch()
428         for inProg {
429                 if time.Since(t) > MaxWaitInProgress {
430                         return int64(0)
431                 }
432                 time.Sleep(CheckDelayInProgress)
433                 epoch, inProg = sc.GetEpoch()
434         }
435         return epoch
436 }
437
438 // AccessEnd returns true if stats data reading was finished, false
439 // otherwise
440 func (sc *StatsClient) accessEnd(accessEpoch int64) bool {
441         epoch, inProgress := sc.GetEpoch()
442         if accessEpoch != epoch || inProgress {
443                 return false
444         }
445         return true
446 }
447
448 // listIndexes lists indexes for all stat entries that match any of the regex patterns.
449 func (sc *StatsClient) listIndexes(patterns ...string) (indexes []uint32, err error) {
450         if len(patterns) == 0 {
451                 return sc.listIndexesFunc(nil)
452         }
453         var regexes = make([]*regexp.Regexp, len(patterns))
454         for i, pattern := range patterns {
455                 r, err := regexp.Compile(pattern)
456                 if err != nil {
457                         return nil, fmt.Errorf("compiling regexp failed: %v", err)
458                 }
459                 regexes[i] = r
460         }
461         nameMatches := func(name []byte) bool {
462                 for _, r := range regexes {
463                         if r.Match(name) {
464                                 return true
465                         }
466                 }
467                 return false
468         }
469         return sc.listIndexesFunc(nameMatches)
470 }
471
472 // listIndexesFunc lists stats indexes. The optional function
473 // argument filters returned values or returns all if empty
474 func (sc *StatsClient) listIndexesFunc(f func(name []byte) bool) (indexes []uint32, err error) {
475         if f == nil {
476                 // there is around ~3157 stats, so to avoid too many allocations
477                 // we set capacity to 3200 when listing all stats
478                 indexes = make([]uint32, 0, 3200)
479         }
480
481         dirVector := sc.GetDirectoryVector()
482         if dirVector == nil {
483                 return nil, err
484         }
485         vecLen := *(*uint32)(vectorLen(dirVector))
486
487         for i := uint32(0); i < vecLen; i++ {
488                 _, dirName, _ := sc.GetStatDirOnIndex(dirVector, i)
489                 if f != nil {
490                         if len(dirName) == 0 || !f(dirName) {
491                                 continue
492                         }
493                 }
494                 indexes = append(indexes, i)
495         }
496
497         return indexes, nil
498 }