Reload stats socket when VPP restarts
[govpp.git] / adapter / statsclient / statsclient.go
1 // Copyright (c) 2019 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 // Package statsclient is pure Go implementation of VPP stats API client.
16 package statsclient
17
18 import (
19         "bytes"
20         "fmt"
21         "net"
22         "os"
23         "regexp"
24         "syscall"
25         "time"
26
27         "git.fd.io/govpp.git/adapter"
28         "github.com/fsnotify/fsnotify"
29         "github.com/ftrvxmtrx/fd"
30         logger "github.com/sirupsen/logrus"
31 )
32
33 const (
34         // DefaultSocketName is default VPP stats socket file path.
35         DefaultSocketName = adapter.DefaultStatsSocket
36 )
37
38 const socketMissing = `
39 ------------------------------------------------------------
40  VPP stats socket file %s is missing!
41
42   - is VPP running with stats segment enabled?
43   - is the correct socket name configured?
44
45  To enable it add following section to your VPP config:
46    statseg {
47      socket-name /run/vpp/stats.sock
48    }
49 ------------------------------------------------------------
50 `
51
52 var (
53         // Debug is global variable that determines debug mode
54         Debug = os.Getenv("DEBUG_GOVPP_STATS") != ""
55
56         // Log is global logger
57         Log = logger.New()
58 )
59
60 // init initializes global logger, which logs debug level messages to stdout.
61 func init() {
62         Log.Out = os.Stdout
63         if Debug {
64                 Log.Level = logger.DebugLevel
65                 Log.Debug("govpp/statsclient: enabled debug mode")
66         }
67 }
68
69 func debugf(f string, a ...interface{}) {
70         if Debug {
71                 Log.Debugf(f, a...)
72         }
73 }
74
75 // implements StatsAPI
76 var _ adapter.StatsAPI = (*StatsClient)(nil)
77
78 // StatsClient is the pure Go implementation for VPP stats API.
79 type StatsClient struct {
80         sockAddr    string
81         headerData  []byte
82         isConnected bool
83
84         // to quit socket monitor
85         done chan struct{}
86
87         statSegment
88 }
89
90 // NewStatsClient returns new VPP stats API client.
91 func NewStatsClient(sockAddr string) *StatsClient {
92         if sockAddr == "" {
93                 sockAddr = DefaultSocketName
94         }
95         return &StatsClient{
96                 sockAddr: sockAddr,
97         }
98 }
99
100 // Connect to validated VPP stats socket and start monitoring
101 // socket file changes
102 func (sc *StatsClient) Connect() (err error) {
103         if sc.isConnected {
104                 return fmt.Errorf("already connected")
105         }
106         if err := sc.validate(); err != nil {
107                 return err
108         }
109         sc.done = make(chan struct{})
110         sc.monitorSocket()
111         if sc.statSegment, err = sc.connect(); err != nil {
112                 return err
113         }
114         return nil
115 }
116
117 // Disconnect from the socket, unmap shared memory and terminate
118 // socket monitor
119 func (sc *StatsClient) Disconnect() error {
120         close(sc.done)
121         return sc.disconnect()
122 }
123
124 func (sc *StatsClient) ListStats(patterns ...string) ([]string, error) {
125         accessEpoch := sc.accessStart()
126         if accessEpoch == 0 {
127                 return nil, adapter.ErrStatsAccessFailed
128         }
129
130         indexes, err := sc.listIndexes(patterns...)
131         if err != nil {
132                 return nil, err
133         }
134
135         dirVector := sc.GetDirectoryVector()
136         if dirVector == nil {
137                 return nil, fmt.Errorf("failed to list stats: %v", err)
138         }
139         vecLen := *(*uint32)(vectorLen(dirVector))
140
141         var names []string
142         for _, index := range indexes {
143                 if index >= vecLen {
144                         return nil, fmt.Errorf("stat entry index %d out of dir vector len (%d)", index, vecLen)
145                 }
146                 _, dirName, _ := sc.GetStatDirOnIndex(dirVector, index)
147                 names = append(names, string(dirName))
148         }
149
150         if !sc.accessEnd(accessEpoch) {
151                 return nil, adapter.ErrStatsDataBusy
152         }
153
154         return names, nil
155 }
156
157 func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntry, err error) {
158         accessEpoch := sc.accessStart()
159         if accessEpoch == 0 {
160                 return nil, adapter.ErrStatsAccessFailed
161         }
162
163         indexes, err := sc.listIndexes(patterns...)
164         if err != nil {
165                 return nil, err
166         }
167
168         dirVector := sc.GetDirectoryVector()
169         if dirVector == nil {
170                 return nil, err
171         }
172         dirLen := *(*uint32)(vectorLen(dirVector))
173
174         debugf("dumping entries for %d indexes", len(indexes))
175
176         entries = make([]adapter.StatEntry, 0, len(indexes))
177         for _, index := range indexes {
178                 if index >= dirLen {
179                         return nil, fmt.Errorf("stat entry index %d out of dir vector length (%d)", index, dirLen)
180                 }
181                 dirPtr, dirName, dirType := sc.GetStatDirOnIndex(dirVector, index)
182                 if len(dirName) == 0 {
183                         continue
184                 }
185                 entry := adapter.StatEntry{
186                         Name: append([]byte(nil), dirName...),
187                         Type: adapter.StatType(dirType),
188                         Data: sc.CopyEntryData(dirPtr),
189                 }
190                 entries = append(entries, entry)
191         }
192
193         if !sc.accessEnd(accessEpoch) {
194                 return nil, adapter.ErrStatsDataBusy
195         }
196
197         return entries, nil
198 }
199
200 func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) {
201         dir := new(adapter.StatDir)
202
203         accessEpoch := sc.accessStart()
204         if accessEpoch == 0 {
205                 return nil, adapter.ErrStatsAccessFailed
206         }
207
208         indexes, err := sc.listIndexes(patterns...)
209         if err != nil {
210                 return nil, err
211         }
212         dir.Indexes = indexes
213
214         dirVector := sc.GetDirectoryVector()
215         if dirVector == nil {
216                 return nil, err
217         }
218         dirLen := *(*uint32)(vectorLen(dirVector))
219
220         debugf("dumping entries for %d indexes", len(indexes))
221
222         entries := make([]adapter.StatEntry, 0, len(indexes))
223         for _, index := range indexes {
224                 if index >= dirLen {
225                         return nil, fmt.Errorf("stat entry index %d out of dir vector length (%d)", index, dirLen)
226                 }
227                 dirPtr, dirName, dirType := sc.GetStatDirOnIndex(dirVector, index)
228                 if len(dirName) == 0 {
229                         continue
230                 }
231                 entry := adapter.StatEntry{
232                         Name: append([]byte(nil), dirName...),
233                         Type: adapter.StatType(dirType),
234                         Data: sc.CopyEntryData(dirPtr),
235                 }
236                 entries = append(entries, entry)
237         }
238         dir.Entries = entries
239
240         if !sc.accessEnd(accessEpoch) {
241                 return nil, adapter.ErrStatsDataBusy
242         }
243         dir.Epoch = accessEpoch
244
245         return dir, nil
246 }
247
248 // UpdateDir refreshes directory data for all counters
249 func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) {
250         epoch, _ := sc.GetEpoch()
251         if dir.Epoch != epoch {
252                 return adapter.ErrStatsDirStale
253         }
254
255         accessEpoch := sc.accessStart()
256         if accessEpoch == 0 {
257                 return adapter.ErrStatsAccessFailed
258         }
259
260         dirVector := sc.GetDirectoryVector()
261         if dirVector == nil {
262                 return err
263         }
264         for i, index := range dir.Indexes {
265                 statSegDir, dirName, dirType := sc.GetStatDirOnIndex(dirVector, index)
266                 if len(dirName) == 0 {
267                         continue
268                 }
269                 entry := &dir.Entries[i]
270                 if !bytes.Equal(dirName, entry.Name) {
271                         continue
272                 }
273                 if adapter.StatType(dirType) != entry.Type {
274                         continue
275                 }
276                 if entry.Data == nil {
277                         continue
278                 }
279                 if err := sc.UpdateEntryData(statSegDir, &entry.Data); err != nil {
280                         return fmt.Errorf("updating stat data for entry %s failed: %v", dirName, err)
281                 }
282         }
283         if !sc.accessEnd(accessEpoch) {
284                 return adapter.ErrStatsDataBusy
285         }
286
287         return nil
288 }
289
290 // validate file presence by retrieving its file info
291 func (sc *StatsClient) validate() error {
292         if _, err := os.Stat(sc.sockAddr); os.IsNotExist(err) {
293                 fmt.Fprintf(os.Stderr, socketMissing, sc.sockAddr)
294                 return fmt.Errorf("stats socket file %s does not exist", sc.sockAddr)
295         } else if err != nil {
296                 return fmt.Errorf("stats socket error: %v", err)
297         }
298         return nil
299 }
300
301 // connect to the socket and map it into the memory. According to the
302 // header version info, an appropriate segment handler is returned
303 func (sc *StatsClient) connect() (ss statSegment, err error) {
304         addr := net.UnixAddr{
305                 Net:  "unixpacket",
306                 Name: sc.sockAddr,
307         }
308         Log.Debugf("connecting to: %v", addr)
309
310         conn, err := net.DialUnix(addr.Net, nil, &addr)
311         if err != nil {
312                 Log.Warnf("connecting to socket %s failed: %s", addr, err)
313                 return nil, err
314         }
315         defer func() {
316                 if err := conn.Close(); err != nil {
317                         Log.Warnf("closing socket failed: %v", err)
318                 }
319         }()
320         Log.Debugf("connected to socket")
321
322         files, err := fd.Get(conn, 1, nil)
323         if err != nil {
324                 return nil, fmt.Errorf("getting file descriptor over socket failed: %v", err)
325         }
326         if len(files) == 0 {
327                 return nil, fmt.Errorf("no files received over socket")
328         }
329
330         file := files[0]
331         defer func() {
332                 if err := file.Close(); err != nil {
333                         Log.Warnf("closing file failed: %v", err)
334                 }
335         }()
336
337         info, err := file.Stat()
338         if err != nil {
339                 return nil, err
340         }
341         size := info.Size()
342
343         sc.headerData, err = syscall.Mmap(int(file.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED)
344         if err != nil {
345                 Log.Debugf("mapping shared memory failed: %v", err)
346                 return nil, fmt.Errorf("mapping shared memory failed: %v", err)
347         }
348         Log.Debugf("successfully mmapped shared memory segment (size: %v) %v", size, len(sc.headerData))
349
350         version := getVersion(sc.headerData)
351         switch version {
352         case 1:
353                 ss = newStatSegmentV1(sc.headerData, size)
354         case 2:
355                 ss = newStatSegmentV2(sc.headerData, size)
356         default:
357                 return nil, fmt.Errorf("stat segment version is not supported: %v (min: %v, max: %v)",
358                         version, minVersion, maxVersion)
359         }
360         sc.isConnected = true
361         return ss, nil
362 }
363
364 // reconnect disconnects from the socket, re-validates it and
365 // connects again
366 func (sc *StatsClient) reconnect() (err error) {
367         if err = sc.disconnect(); err != nil {
368                 return fmt.Errorf("error disconnecting socket: %v", err)
369         }
370         if err = sc.validate(); err != nil {
371                 return fmt.Errorf("error validating socket: %v", err)
372         }
373         if sc.statSegment, err = sc.connect(); err != nil {
374                 return fmt.Errorf("error connecting socket: %v", err)
375         }
376         return nil
377 }
378
379 // disconnect unmaps socket data from the memory and resets the header
380 func (sc *StatsClient) disconnect() error {
381         sc.isConnected = false
382         if sc.headerData == nil {
383                 return nil
384         }
385         if err := syscall.Munmap(sc.headerData); err != nil {
386                 Log.Debugf("unmapping shared memory failed: %v", err)
387                 return fmt.Errorf("unmapping shared memory failed: %v", err)
388         }
389         sc.headerData = nil
390
391         Log.Debugf("successfully unmapped shared memory")
392         return nil
393 }
394
395 func (sc *StatsClient) monitorSocket() {
396         watcher, err := fsnotify.NewWatcher()
397         if err != nil {
398                 Log.Errorf("error starting socket monitor: %v", err)
399                 return
400         }
401
402         go func() {
403                 for {
404                         select {
405                         case event := <-watcher.Events:
406                                 if event.Op == fsnotify.Remove {
407                                         if err := sc.reconnect(); err != nil {
408                                                 Log.Errorf("error occurred during socket reconnect: %v", err)
409                                         }
410                                         // path must be re-added to the watcher
411                                         if err = watcher.Add(sc.sockAddr); err != nil {
412                                                 Log.Errorf("failed to add socket address to the watcher: %v", err)
413                                         }
414                                 }
415                         case err := <-watcher.Errors:
416                                 Log.Errorf("socket monitor delivered error event: %v", err)
417                         case <-sc.done:
418                                 err := watcher.Close()
419                                 Log.Debugf("socket monitor closed (error: %v)", err)
420                                 return
421                         }
422                 }
423         }()
424
425         if err := watcher.Add(sc.sockAddr); err != nil {
426                 Log.Errorf("failed to add socket address to the watcher: %v", err)
427         }
428 }
429
430 // Starts monitoring 'inProgress' field. Returns stats segment
431 // access epoch when completed, or zero value if not finished
432 // within MaxWaitInProgress
433 func (sc *StatsClient) accessStart() (epoch int64) {
434         t := time.Now()
435
436         epoch, inProg := sc.GetEpoch()
437         for inProg {
438                 if time.Since(t) > MaxWaitInProgress {
439                         return int64(0)
440                 }
441                 time.Sleep(CheckDelayInProgress)
442                 epoch, inProg = sc.GetEpoch()
443         }
444         return epoch
445 }
446
447 // AccessEnd returns true if stats data reading was finished, false
448 // otherwise
449 func (sc *StatsClient) accessEnd(accessEpoch int64) bool {
450         epoch, inProgress := sc.GetEpoch()
451         if accessEpoch != epoch || inProgress {
452                 return false
453         }
454         return true
455 }
456
457 // listIndexes lists indexes for all stat entries that match any of the regex patterns.
458 func (sc *StatsClient) listIndexes(patterns ...string) (indexes []uint32, err error) {
459         if len(patterns) == 0 {
460                 return sc.listIndexesFunc(nil)
461         }
462         var regexes = make([]*regexp.Regexp, len(patterns))
463         for i, pattern := range patterns {
464                 r, err := regexp.Compile(pattern)
465                 if err != nil {
466                         return nil, fmt.Errorf("compiling regexp failed: %v", err)
467                 }
468                 regexes[i] = r
469         }
470         nameMatches := func(name []byte) bool {
471                 for _, r := range regexes {
472                         if r.Match(name) {
473                                 return true
474                         }
475                 }
476                 return false
477         }
478         return sc.listIndexesFunc(nameMatches)
479 }
480
481 // listIndexesFunc lists stats indexes. The optional function
482 // argument filters returned values or returns all if empty
483 func (sc *StatsClient) listIndexesFunc(f func(name []byte) bool) (indexes []uint32, err error) {
484         if f == nil {
485                 // there is around ~3157 stats, so to avoid too many allocations
486                 // we set capacity to 3200 when listing all stats
487                 indexes = make([]uint32, 0, 3200)
488         }
489
490         dirVector := sc.GetDirectoryVector()
491         if dirVector == nil {
492                 return nil, err
493         }
494         vecLen := *(*uint32)(vectorLen(dirVector))
495
496         for i := uint32(0); i < vecLen; i++ {
497                 _, dirName, _ := sc.GetStatDirOnIndex(dirVector, i)
498                 if f != nil {
499                         if len(dirName) == 0 || !f(dirName) {
500                                 continue
501                         }
502                 }
503                 indexes = append(indexes, i)
504         }
505
506         return indexes, nil
507 }