Add statsclient options and fix wait for socket
[govpp.git] / adapter / statsclient / statsclient.go
1 // Copyright (c) 2019 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 // Package statsclient is pure Go implementation of VPP stats API client.
16 package statsclient
17
18 import (
19         "bytes"
20         "fmt"
21         "net"
22         "os"
23         "path/filepath"
24         "regexp"
25         "sync/atomic"
26         "syscall"
27         "time"
28
29         "git.fd.io/govpp.git/adapter"
30         "github.com/fsnotify/fsnotify"
31         "github.com/ftrvxmtrx/fd"
32         logger "github.com/sirupsen/logrus"
33 )
34
35 const (
36         // DefaultSocketName is default VPP stats socket file path.
37         DefaultSocketName = adapter.DefaultStatsSocket
38
39         // DefaultSocketRetryPeriod is the time period after the socket availability
40         // will be re-checked
41         DefaultSocketRetryPeriod = 50 * time.Millisecond
42
43         // DefaultSocketRetryTimeout is the maximum time for the stats socket
44         DefaultSocketRetryTimeout = 3 * time.Second
45 )
46
47 var (
48         // Debug is global variable that determines debug mode
49         Debug = os.Getenv("DEBUG_GOVPP_STATS") != ""
50
51         // Log is global logger
52         Log = logger.New()
53 )
54
55 // init initializes global logger, which logs debug level messages to stdout.
56 func init() {
57         Log.Out = os.Stdout
58         if Debug {
59                 Log.Level = logger.DebugLevel
60                 Log.Debug("govpp/statsclient: enabled debug mode")
61         }
62 }
63
64 func debugf(f string, a ...interface{}) {
65         if Debug {
66                 Log.Debugf(f, a...)
67         }
68 }
69
70 // implements StatsAPI
71 var _ adapter.StatsAPI = (*StatsClient)(nil)
72
73 // StatsClient is the pure Go implementation for VPP stats API.
74 type StatsClient struct {
75         socket       string
76         retryPeriod  time.Duration
77         retryTimeout time.Duration
78
79         headerData []byte
80
81         // defines the adapter connection state
82         connected uint32
83
84         // to quit socket monitor
85         done chan struct{}
86
87         statSegment
88 }
89
90 // Option is a StatsClient option
91 type Option func(*StatsClient)
92
93 // SetSocketRetryPeriod is and optional parameter to define a custom
94 // retry period while waiting for the VPP socket
95 func SetSocketRetryPeriod(t time.Duration) Option {
96         return func(c *StatsClient) {
97                 c.retryPeriod = t
98         }
99 }
100
101 // SetSocketRetryTimeout is and optional parameter to define a custom
102 // timeout while waiting for the VPP socket
103 func SetSocketRetryTimeout(t time.Duration) Option {
104         return func(c *StatsClient) {
105                 c.retryTimeout = t
106         }
107 }
108
109 // NewStatsClient returns a new StatsClient using socket.
110 // If socket is empty string DefaultSocketName is used.
111 func NewStatsClient(socket string, options ...Option) *StatsClient {
112         if socket == "" {
113                 socket = DefaultSocketName
114         }
115         s := &StatsClient{
116                 socket: socket,
117         }
118         for _, option := range options {
119                 option(s)
120         }
121         if s.retryPeriod == 0 {
122                 s.retryPeriod = DefaultSocketRetryPeriod
123         }
124         if s.retryTimeout == 0 {
125                 s.retryTimeout = DefaultSocketRetryTimeout
126         }
127         return s
128 }
129
130 // Connect to validated VPP stats socket and start monitoring
131 // socket file changes
132 func (sc *StatsClient) Connect() (err error) {
133         if err := sc.waitForSocket(); err != nil {
134                 return err
135         }
136         sc.done = make(chan struct{})
137         if sc.statSegment, err = sc.connect(); err != nil {
138                 return err
139         }
140         sc.monitorSocket()
141         return nil
142 }
143
144 // Disconnect from the socket, unmap shared memory and terminate
145 // socket monitor
146 func (sc *StatsClient) Disconnect() error {
147         if sc.headerData == nil {
148                 return nil
149         }
150         if err := syscall.Munmap(sc.headerData); err != nil {
151                 Log.Debugf("unmapping shared memory failed: %v", err)
152                 return fmt.Errorf("unmapping shared memory failed: %v", err)
153         }
154         sc.headerData = nil
155
156         Log.Debugf("successfully unmapped shared memory")
157         return nil
158 }
159
160 func (sc *StatsClient) ListStats(patterns ...string) ([]string, error) {
161         if !sc.isConnected() {
162                 return nil, adapter.ErrStatsDisconnected
163         }
164         accessEpoch := sc.accessStart()
165         if accessEpoch == 0 {
166                 return nil, adapter.ErrStatsAccessFailed
167         }
168
169         indexes, err := sc.listIndexes(patterns...)
170         if err != nil {
171                 return nil, err
172         }
173
174         dirVector := sc.GetDirectoryVector()
175         if dirVector == nil {
176                 return nil, fmt.Errorf("failed to list stats: %v", err)
177         }
178         vecLen := *(*uint32)(vectorLen(dirVector))
179
180         var names []string
181         for _, index := range indexes {
182                 if index >= vecLen {
183                         return nil, fmt.Errorf("stat entry index %d out of dir vector len (%d)", index, vecLen)
184                 }
185                 _, dirName, _ := sc.GetStatDirOnIndex(dirVector, index)
186                 names = append(names, string(dirName))
187         }
188
189         if !sc.accessEnd(accessEpoch) {
190                 return nil, adapter.ErrStatsDataBusy
191         }
192
193         return names, nil
194 }
195
196 func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntry, err error) {
197         if !sc.isConnected() {
198                 return nil, adapter.ErrStatsDisconnected
199         }
200         accessEpoch := sc.accessStart()
201         if accessEpoch == 0 {
202                 return nil, adapter.ErrStatsAccessFailed
203         }
204
205         indexes, err := sc.listIndexes(patterns...)
206         if err != nil {
207                 return nil, err
208         }
209
210         dirVector := sc.GetDirectoryVector()
211         if dirVector == nil {
212                 return nil, err
213         }
214         dirLen := *(*uint32)(vectorLen(dirVector))
215
216         debugf("dumping entries for %d indexes", len(indexes))
217
218         entries = make([]adapter.StatEntry, 0, len(indexes))
219         for _, index := range indexes {
220                 if index >= dirLen {
221                         return nil, fmt.Errorf("stat entry index %d out of dir vector length (%d)", index, dirLen)
222                 }
223                 dirPtr, dirName, dirType := sc.GetStatDirOnIndex(dirVector, index)
224                 if len(dirName) == 0 {
225                         continue
226                 }
227                 entry := adapter.StatEntry{
228                         Name: append([]byte(nil), dirName...),
229                         Type: adapter.StatType(dirType),
230                         Data: sc.CopyEntryData(dirPtr),
231                 }
232                 entries = append(entries, entry)
233         }
234
235         if !sc.accessEnd(accessEpoch) {
236                 return nil, adapter.ErrStatsDataBusy
237         }
238
239         return entries, nil
240 }
241
242 func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) {
243         if !sc.isConnected() {
244                 return nil, adapter.ErrStatsDisconnected
245         }
246         dir := new(adapter.StatDir)
247
248         accessEpoch := sc.accessStart()
249         if accessEpoch == 0 {
250                 return nil, adapter.ErrStatsAccessFailed
251         }
252
253         indexes, err := sc.listIndexes(patterns...)
254         if err != nil {
255                 return nil, err
256         }
257         dir.Indexes = indexes
258
259         dirVector := sc.GetDirectoryVector()
260         if dirVector == nil {
261                 return nil, err
262         }
263         dirLen := *(*uint32)(vectorLen(dirVector))
264
265         debugf("dumping entries for %d indexes", len(indexes))
266
267         entries := make([]adapter.StatEntry, 0, len(indexes))
268         for _, index := range indexes {
269                 if index >= dirLen {
270                         return nil, fmt.Errorf("stat entry index %d out of dir vector length (%d)", index, dirLen)
271                 }
272                 dirPtr, dirName, dirType := sc.GetStatDirOnIndex(dirVector, index)
273                 if len(dirName) == 0 {
274                         continue
275                 }
276                 entry := adapter.StatEntry{
277                         Name: append([]byte(nil), dirName...),
278                         Type: adapter.StatType(dirType),
279                         Data: sc.CopyEntryData(dirPtr),
280                 }
281                 entries = append(entries, entry)
282         }
283         dir.Entries = entries
284
285         if !sc.accessEnd(accessEpoch) {
286                 return nil, adapter.ErrStatsDataBusy
287         }
288         dir.Epoch = accessEpoch
289
290         return dir, nil
291 }
292
293 // UpdateDir refreshes directory data for all counters
294 func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) {
295         if !sc.isConnected() {
296                 return adapter.ErrStatsDisconnected
297         }
298         epoch, _ := sc.GetEpoch()
299         if dir.Epoch != epoch {
300                 return adapter.ErrStatsDirStale
301         }
302
303         accessEpoch := sc.accessStart()
304         if accessEpoch == 0 {
305                 return adapter.ErrStatsAccessFailed
306         }
307
308         dirVector := sc.GetDirectoryVector()
309         if dirVector == nil {
310                 return err
311         }
312         for i, index := range dir.Indexes {
313                 statSegDir, dirName, dirType := sc.GetStatDirOnIndex(dirVector, index)
314                 if len(dirName) == 0 {
315                         continue
316                 }
317                 entry := &dir.Entries[i]
318                 if !bytes.Equal(dirName, entry.Name) {
319                         continue
320                 }
321                 if adapter.StatType(dirType) != entry.Type {
322                         continue
323                 }
324                 if entry.Data == nil {
325                         continue
326                 }
327                 if err := sc.UpdateEntryData(statSegDir, &entry.Data); err != nil {
328                         return fmt.Errorf("updating stat data for entry %s failed: %v", dirName, err)
329                 }
330         }
331         if !sc.accessEnd(accessEpoch) {
332                 return adapter.ErrStatsDataBusy
333         }
334
335         return nil
336 }
337
338 // checks the socket existence and waits for it for the designated
339 // time if it is not available immediately
340 func (sc *StatsClient) waitForSocket() error {
341         if _, err := os.Stat(sc.socket); err != nil {
342                 if os.IsNotExist(err) {
343                         n := time.Now()
344                         ticker := time.NewTicker(sc.retryPeriod)
345                         timeout := time.After(sc.retryTimeout)
346                         for {
347                                 select {
348                                 case <-ticker.C:
349                                         if _, err := os.Stat(sc.socket); err == nil {
350                                                 return nil
351                                         }
352                                 case <-timeout:
353                                         return fmt.Errorf("stats socket file %s is not ready within timeout (after %.2f s) ",
354                                                 sc.socket, time.Since(n).Seconds())
355                                 }
356                         }
357                 } else {
358                         return fmt.Errorf("stats socket error: %v", err)
359                 }
360         }
361         return nil
362 }
363
364 // connect to the socket and map it into the memory. According to the
365 // header version info, an appropriate segment handler is returned
366 func (sc *StatsClient) connect() (ss statSegment, err error) {
367         addr := net.UnixAddr{
368                 Net:  "unixpacket",
369                 Name: sc.socket,
370         }
371         Log.Debugf("connecting to: %v", addr)
372
373         conn, err := net.DialUnix(addr.Net, nil, &addr)
374         if err != nil {
375                 Log.Warnf("connecting to socket %s failed: %s", addr, err)
376                 return nil, err
377         }
378         defer func() {
379                 if err := conn.Close(); err != nil {
380                         Log.Warnf("closing socket failed: %v", err)
381                 }
382         }()
383         Log.Debugf("connected to socket")
384
385         files, err := fd.Get(conn, 1, nil)
386         if err != nil {
387                 return nil, fmt.Errorf("getting file descriptor over socket failed: %v", err)
388         }
389         if len(files) == 0 {
390                 return nil, fmt.Errorf("no files received over socket")
391         }
392
393         file := files[0]
394         defer func() {
395                 if err := file.Close(); err != nil {
396                         Log.Warnf("closing file failed: %v", err)
397                 }
398         }()
399
400         info, err := file.Stat()
401         if err != nil {
402                 return nil, err
403         }
404         size := info.Size()
405
406         sc.headerData, err = syscall.Mmap(int(file.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED)
407         if err != nil {
408                 Log.Debugf("mapping shared memory failed: %v", err)
409                 return nil, fmt.Errorf("mapping shared memory failed: %v", err)
410         }
411         Log.Debugf("successfully mmapped shared memory segment (size: %v) %v", size, len(sc.headerData))
412
413         version := getVersion(sc.headerData)
414         switch version {
415         case 1:
416                 ss = newStatSegmentV1(sc.headerData, size)
417         case 2:
418                 ss = newStatSegmentV2(sc.headerData, size)
419         default:
420                 return nil, fmt.Errorf("stat segment version is not supported: %v (min: %v, max: %v)",
421                         version, minVersion, maxVersion)
422         }
423
424         // set connected
425         atomic.CompareAndSwapUint32(&sc.connected, 0, 1)
426
427         return ss, nil
428 }
429
430 // reconnect disconnects from the socket, re-validates it and
431 // connects again
432 func (sc *StatsClient) reconnect() (err error) {
433         if err = sc.disconnect(); err != nil {
434                 return fmt.Errorf("error disconnecting socket: %v", err)
435         }
436         if err = sc.waitForSocket(); err != nil {
437                 return fmt.Errorf("error while waiting on socket: %v", err)
438         }
439         if sc.statSegment, err = sc.connect(); err != nil {
440                 return fmt.Errorf("error connecting socket: %v", err)
441         }
442         return nil
443 }
444
445 // disconnect unmaps socket data from the memory and resets the header
446 func (sc *StatsClient) disconnect() error {
447         if !atomic.CompareAndSwapUint32(&sc.connected, 1, 0) {
448                 return fmt.Errorf("stats client is already disconnected")
449         }
450         if sc.headerData == nil {
451                 return nil
452         }
453         if err := syscall.Munmap(sc.headerData); err != nil {
454                 Log.Debugf("unmapping shared memory failed: %v", err)
455                 return fmt.Errorf("unmapping shared memory failed: %v", err)
456         }
457         sc.headerData = nil
458
459         Log.Debugf("successfully unmapped shared memory")
460         return nil
461 }
462
463 func (sc *StatsClient) monitorSocket() {
464         watcher, err := fsnotify.NewWatcher()
465         if err != nil {
466                 Log.Errorf("error starting socket monitor: %v", err)
467                 return
468         }
469
470         go func() {
471                 for {
472                         select {
473                         case event := <-watcher.Events:
474                                 if event.Op == fsnotify.Remove && event.Name == sc.socket {
475                                         if err := sc.reconnect(); err != nil {
476                                                 Log.Errorf("error occurred during socket reconnect: %v", err)
477                                         }
478                                 }
479                         case err := <-watcher.Errors:
480                                 Log.Errorf("socket monitor delivered error event: %v", err)
481                         case <-sc.done:
482                                 err := watcher.Close()
483                                 Log.Debugf("socket monitor closed (error: %v)", err)
484                                 return
485                         }
486                 }
487         }()
488
489         if err := watcher.Add(filepath.Dir(sc.socket)); err != nil {
490                 Log.Errorf("failed to add socket address to the watcher: %v", err)
491         }
492 }
493
494 // Starts monitoring 'inProgress' field. Returns stats segment
495 // access epoch when completed, or zero value if not finished
496 // within MaxWaitInProgress
497 func (sc *StatsClient) accessStart() (epoch int64) {
498         t := time.Now()
499
500         epoch, inProg := sc.GetEpoch()
501         for inProg {
502                 if time.Since(t) > MaxWaitInProgress {
503                         return int64(0)
504                 }
505                 time.Sleep(CheckDelayInProgress)
506                 epoch, inProg = sc.GetEpoch()
507         }
508         return epoch
509 }
510
511 // AccessEnd returns true if stats data reading was finished, false
512 // otherwise
513 func (sc *StatsClient) accessEnd(accessEpoch int64) bool {
514         epoch, inProgress := sc.GetEpoch()
515         if accessEpoch != epoch || inProgress {
516                 return false
517         }
518         return true
519 }
520
521 // listIndexes lists indexes for all stat entries that match any of the regex patterns.
522 func (sc *StatsClient) listIndexes(patterns ...string) (indexes []uint32, err error) {
523         if len(patterns) == 0 {
524                 return sc.listIndexesFunc(nil)
525         }
526         var regexes = make([]*regexp.Regexp, len(patterns))
527         for i, pattern := range patterns {
528                 r, err := regexp.Compile(pattern)
529                 if err != nil {
530                         return nil, fmt.Errorf("compiling regexp failed: %v", err)
531                 }
532                 regexes[i] = r
533         }
534         nameMatches := func(name []byte) bool {
535                 for _, r := range regexes {
536                         if r.Match(name) {
537                                 return true
538                         }
539                 }
540                 return false
541         }
542         return sc.listIndexesFunc(nameMatches)
543 }
544
545 // listIndexesFunc lists stats indexes. The optional function
546 // argument filters returned values or returns all if empty
547 func (sc *StatsClient) listIndexesFunc(f func(name []byte) bool) (indexes []uint32, err error) {
548         if f == nil {
549                 // there is around ~3157 stats, so to avoid too many allocations
550                 // we set capacity to 3200 when listing all stats
551                 indexes = make([]uint32, 0, 3200)
552         }
553
554         dirVector := sc.GetDirectoryVector()
555         if dirVector == nil {
556                 return nil, err
557         }
558         vecLen := *(*uint32)(vectorLen(dirVector))
559
560         for i := uint32(0); i < vecLen; i++ {
561                 _, dirName, _ := sc.GetStatDirOnIndex(dirVector, i)
562                 if f != nil {
563                         if len(dirName) == 0 || !f(dirName) {
564                                 continue
565                         }
566                 }
567                 indexes = append(indexes, i)
568         }
569
570         return indexes, nil
571 }
572
573 func (sc *StatsClient) isConnected() bool {
574         return atomic.LoadUint32(&sc.connected) == 1
575 }