Added asynchronous connection for stats socket
[govpp.git] / adapter / statsclient / statsclient.go
1 // Copyright (c) 2019 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 // Package statsclient is pure Go implementation of VPP stats API client.
16 package statsclient
17
18 import (
19         "bytes"
20         "fmt"
21         "net"
22         "os"
23         "path/filepath"
24         "regexp"
25         "sync/atomic"
26         "syscall"
27         "time"
28
29         "git.fd.io/govpp.git/adapter"
30         "github.com/fsnotify/fsnotify"
31         "github.com/ftrvxmtrx/fd"
32         logger "github.com/sirupsen/logrus"
33 )
34
35 const (
36         // DefaultSocketName is default VPP stats socket file path.
37         DefaultSocketName = adapter.DefaultStatsSocket
38
39         // SocketRetryPeriod is the time period after the socket availability
40         // will be re-checked
41         SocketRetryPeriod = 50 * time.Millisecond
42
43         // SocketRetryTimeout is the maximum time for the stats socket
44         SocketRetryTimeout = 3 * time.Second
45 )
46
47 var (
48         // Debug is global variable that determines debug mode
49         Debug = os.Getenv("DEBUG_GOVPP_STATS") != ""
50
51         // Log is global logger
52         Log = logger.New()
53 )
54
55 // init initializes global logger, which logs debug level messages to stdout.
56 func init() {
57         Log.Out = os.Stdout
58         if Debug {
59                 Log.Level = logger.DebugLevel
60                 Log.Debug("govpp/statsclient: enabled debug mode")
61         }
62 }
63
64 func debugf(f string, a ...interface{}) {
65         if Debug {
66                 Log.Debugf(f, a...)
67         }
68 }
69
70 // implements StatsAPI
71 var _ adapter.StatsAPI = (*StatsClient)(nil)
72
73 // StatsClient is the pure Go implementation for VPP stats API.
74 type StatsClient struct {
75         socket string
76
77         headerData []byte
78
79         // defines the adapter connection state
80         connected uint32
81
82         // to quit socket monitor
83         done chan struct{}
84
85         statSegment
86 }
87
88 // NewStatsClient returns a new StatsClient using socket.
89 // If socket is empty string DefaultSocketName is used.
90 func NewStatsClient(socket string) *StatsClient {
91         if socket == "" {
92                 socket = DefaultSocketName
93         }
94         return &StatsClient{
95                 socket: socket,
96         }
97 }
98
99 // Connect to validated VPP stats socket and start monitoring
100 // socket file changes
101 func (sc *StatsClient) Connect() (err error) {
102         if err := sc.waitForSocket(); err != nil {
103                 return err
104         }
105         sc.done = make(chan struct{})
106         if sc.statSegment, err = sc.connect(); err != nil {
107                 return err
108         }
109         sc.monitorSocket()
110         return nil
111 }
112
113 // Disconnect from the socket, unmap shared memory and terminate
114 // socket monitor
115 func (sc *StatsClient) Disconnect() error {
116         if sc.headerData == nil {
117                 return nil
118         }
119         if err := syscall.Munmap(sc.headerData); err != nil {
120                 Log.Debugf("unmapping shared memory failed: %v", err)
121                 return fmt.Errorf("unmapping shared memory failed: %v", err)
122         }
123         sc.headerData = nil
124
125         Log.Debugf("successfully unmapped shared memory")
126         return nil
127 }
128
129 func (sc *StatsClient) ListStats(patterns ...string) ([]string, error) {
130         if !sc.isConnected() {
131                 return nil, adapter.ErrStatsDisconnected
132         }
133         accessEpoch := sc.accessStart()
134         if accessEpoch == 0 {
135                 return nil, adapter.ErrStatsAccessFailed
136         }
137
138         indexes, err := sc.listIndexes(patterns...)
139         if err != nil {
140                 return nil, err
141         }
142
143         dirVector := sc.GetDirectoryVector()
144         if dirVector == nil {
145                 return nil, fmt.Errorf("failed to list stats: %v", err)
146         }
147         vecLen := *(*uint32)(vectorLen(dirVector))
148
149         var names []string
150         for _, index := range indexes {
151                 if index >= vecLen {
152                         return nil, fmt.Errorf("stat entry index %d out of dir vector len (%d)", index, vecLen)
153                 }
154                 _, dirName, _ := sc.GetStatDirOnIndex(dirVector, index)
155                 names = append(names, string(dirName))
156         }
157
158         if !sc.accessEnd(accessEpoch) {
159                 return nil, adapter.ErrStatsDataBusy
160         }
161
162         return names, nil
163 }
164
165 func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntry, err error) {
166         if !sc.isConnected() {
167                 return nil, adapter.ErrStatsDisconnected
168         }
169         accessEpoch := sc.accessStart()
170         if accessEpoch == 0 {
171                 return nil, adapter.ErrStatsAccessFailed
172         }
173
174         indexes, err := sc.listIndexes(patterns...)
175         if err != nil {
176                 return nil, err
177         }
178
179         dirVector := sc.GetDirectoryVector()
180         if dirVector == nil {
181                 return nil, err
182         }
183         dirLen := *(*uint32)(vectorLen(dirVector))
184
185         debugf("dumping entries for %d indexes", len(indexes))
186
187         entries = make([]adapter.StatEntry, 0, len(indexes))
188         for _, index := range indexes {
189                 if index >= dirLen {
190                         return nil, fmt.Errorf("stat entry index %d out of dir vector length (%d)", index, dirLen)
191                 }
192                 dirPtr, dirName, dirType := sc.GetStatDirOnIndex(dirVector, index)
193                 if len(dirName) == 0 {
194                         continue
195                 }
196                 entry := adapter.StatEntry{
197                         Name: append([]byte(nil), dirName...),
198                         Type: adapter.StatType(dirType),
199                         Data: sc.CopyEntryData(dirPtr),
200                 }
201                 entries = append(entries, entry)
202         }
203
204         if !sc.accessEnd(accessEpoch) {
205                 return nil, adapter.ErrStatsDataBusy
206         }
207
208         return entries, nil
209 }
210
211 func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) {
212         if !sc.isConnected() {
213                 return nil, adapter.ErrStatsDisconnected
214         }
215         dir := new(adapter.StatDir)
216
217         accessEpoch := sc.accessStart()
218         if accessEpoch == 0 {
219                 return nil, adapter.ErrStatsAccessFailed
220         }
221
222         indexes, err := sc.listIndexes(patterns...)
223         if err != nil {
224                 return nil, err
225         }
226         dir.Indexes = indexes
227
228         dirVector := sc.GetDirectoryVector()
229         if dirVector == nil {
230                 return nil, err
231         }
232         dirLen := *(*uint32)(vectorLen(dirVector))
233
234         debugf("dumping entries for %d indexes", len(indexes))
235
236         entries := make([]adapter.StatEntry, 0, len(indexes))
237         for _, index := range indexes {
238                 if index >= dirLen {
239                         return nil, fmt.Errorf("stat entry index %d out of dir vector length (%d)", index, dirLen)
240                 }
241                 dirPtr, dirName, dirType := sc.GetStatDirOnIndex(dirVector, index)
242                 if len(dirName) == 0 {
243                         continue
244                 }
245                 entry := adapter.StatEntry{
246                         Name: append([]byte(nil), dirName...),
247                         Type: adapter.StatType(dirType),
248                         Data: sc.CopyEntryData(dirPtr),
249                 }
250                 entries = append(entries, entry)
251         }
252         dir.Entries = entries
253
254         if !sc.accessEnd(accessEpoch) {
255                 return nil, adapter.ErrStatsDataBusy
256         }
257         dir.Epoch = accessEpoch
258
259         return dir, nil
260 }
261
262 // UpdateDir refreshes directory data for all counters
263 func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) {
264         if !sc.isConnected() {
265                 return adapter.ErrStatsDisconnected
266         }
267         epoch, _ := sc.GetEpoch()
268         if dir.Epoch != epoch {
269                 return adapter.ErrStatsDirStale
270         }
271
272         accessEpoch := sc.accessStart()
273         if accessEpoch == 0 {
274                 return adapter.ErrStatsAccessFailed
275         }
276
277         dirVector := sc.GetDirectoryVector()
278         if dirVector == nil {
279                 return err
280         }
281         for i, index := range dir.Indexes {
282                 statSegDir, dirName, dirType := sc.GetStatDirOnIndex(dirVector, index)
283                 if len(dirName) == 0 {
284                         continue
285                 }
286                 entry := &dir.Entries[i]
287                 if !bytes.Equal(dirName, entry.Name) {
288                         continue
289                 }
290                 if adapter.StatType(dirType) != entry.Type {
291                         continue
292                 }
293                 if entry.Data == nil {
294                         continue
295                 }
296                 if err := sc.UpdateEntryData(statSegDir, &entry.Data); err != nil {
297                         return fmt.Errorf("updating stat data for entry %s failed: %v", dirName, err)
298                 }
299         }
300         if !sc.accessEnd(accessEpoch) {
301                 return adapter.ErrStatsDataBusy
302         }
303
304         return nil
305 }
306
307 // checks the socket existence and waits for it for the designated
308 // time if it is not available immediately
309 func (sc *StatsClient) waitForSocket() error {
310         if _, err := os.Stat(sc.socket); err != nil {
311                 if os.IsNotExist(err) {
312                         ticker := time.NewTicker(SocketRetryPeriod)
313                         for {
314                                 select {
315                                 case <-ticker.C:
316                                         if _, err := os.Stat(sc.socket); err == nil {
317                                                 return nil
318                                         }
319                                 case <-time.After(SocketRetryTimeout):
320                                         return fmt.Errorf("stats socket file %s is not ready within timeout ", sc.socket)
321                                 }
322                         }
323                 } else {
324                         return fmt.Errorf("stats socket error: %v", err)
325                 }
326         }
327         return nil
328 }
329
330 // connect to the socket and map it into the memory. According to the
331 // header version info, an appropriate segment handler is returned
332 func (sc *StatsClient) connect() (ss statSegment, err error) {
333         addr := net.UnixAddr{
334                 Net:  "unixpacket",
335                 Name: sc.socket,
336         }
337         Log.Debugf("connecting to: %v", addr)
338
339         conn, err := net.DialUnix(addr.Net, nil, &addr)
340         if err != nil {
341                 Log.Warnf("connecting to socket %s failed: %s", addr, err)
342                 return nil, err
343         }
344         defer func() {
345                 if err := conn.Close(); err != nil {
346                         Log.Warnf("closing socket failed: %v", err)
347                 }
348         }()
349         Log.Debugf("connected to socket")
350
351         files, err := fd.Get(conn, 1, nil)
352         if err != nil {
353                 return nil, fmt.Errorf("getting file descriptor over socket failed: %v", err)
354         }
355         if len(files) == 0 {
356                 return nil, fmt.Errorf("no files received over socket")
357         }
358
359         file := files[0]
360         defer func() {
361                 if err := file.Close(); err != nil {
362                         Log.Warnf("closing file failed: %v", err)
363                 }
364         }()
365
366         info, err := file.Stat()
367         if err != nil {
368                 return nil, err
369         }
370         size := info.Size()
371
372         sc.headerData, err = syscall.Mmap(int(file.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED)
373         if err != nil {
374                 Log.Debugf("mapping shared memory failed: %v", err)
375                 return nil, fmt.Errorf("mapping shared memory failed: %v", err)
376         }
377         Log.Debugf("successfully mmapped shared memory segment (size: %v) %v", size, len(sc.headerData))
378
379         version := getVersion(sc.headerData)
380         switch version {
381         case 1:
382                 ss = newStatSegmentV1(sc.headerData, size)
383         case 2:
384                 ss = newStatSegmentV2(sc.headerData, size)
385         default:
386                 return nil, fmt.Errorf("stat segment version is not supported: %v (min: %v, max: %v)",
387                         version, minVersion, maxVersion)
388         }
389
390         // set connected
391         atomic.CompareAndSwapUint32(&sc.connected, 0, 1)
392
393         return ss, nil
394 }
395
396 // reconnect disconnects from the socket, re-validates it and
397 // connects again
398 func (sc *StatsClient) reconnect() (err error) {
399         if err = sc.disconnect(); err != nil {
400                 return fmt.Errorf("error disconnecting socket: %v", err)
401         }
402         if err = sc.waitForSocket(); err != nil {
403                 return fmt.Errorf("error while waiting on socket: %v", err)
404         }
405         if sc.statSegment, err = sc.connect(); err != nil {
406                 return fmt.Errorf("error connecting socket: %v", err)
407         }
408         return nil
409 }
410
411 // disconnect unmaps socket data from the memory and resets the header
412 func (sc *StatsClient) disconnect() error {
413         if !atomic.CompareAndSwapUint32(&sc.connected, 1, 0) {
414                 return fmt.Errorf("stats client is already disconnected")
415         }
416         if sc.headerData == nil {
417                 return nil
418         }
419         if err := syscall.Munmap(sc.headerData); err != nil {
420                 Log.Debugf("unmapping shared memory failed: %v", err)
421                 return fmt.Errorf("unmapping shared memory failed: %v", err)
422         }
423         sc.headerData = nil
424
425         Log.Debugf("successfully unmapped shared memory")
426         return nil
427 }
428
429 func (sc *StatsClient) monitorSocket() {
430         watcher, err := fsnotify.NewWatcher()
431         if err != nil {
432                 Log.Errorf("error starting socket monitor: %v", err)
433                 return
434         }
435
436         go func() {
437                 for {
438                         select {
439                         case event := <-watcher.Events:
440                                 if event.Op == fsnotify.Remove && event.Name == sc.socket {
441                                         if err := sc.reconnect(); err != nil {
442                                                 Log.Errorf("error occurred during socket reconnect: %v", err)
443                                         }
444                                 }
445                         case err := <-watcher.Errors:
446                                 Log.Errorf("socket monitor delivered error event: %v", err)
447                         case <-sc.done:
448                                 err := watcher.Close()
449                                 Log.Debugf("socket monitor closed (error: %v)", err)
450                                 return
451                         }
452                 }
453         }()
454
455         if err := watcher.Add(filepath.Dir(sc.socket)); err != nil {
456                 Log.Errorf("failed to add socket address to the watcher: %v", err)
457         }
458 }
459
460 // Starts monitoring 'inProgress' field. Returns stats segment
461 // access epoch when completed, or zero value if not finished
462 // within MaxWaitInProgress
463 func (sc *StatsClient) accessStart() (epoch int64) {
464         t := time.Now()
465
466         epoch, inProg := sc.GetEpoch()
467         for inProg {
468                 if time.Since(t) > MaxWaitInProgress {
469                         return int64(0)
470                 }
471                 time.Sleep(CheckDelayInProgress)
472                 epoch, inProg = sc.GetEpoch()
473         }
474         return epoch
475 }
476
477 // AccessEnd returns true if stats data reading was finished, false
478 // otherwise
479 func (sc *StatsClient) accessEnd(accessEpoch int64) bool {
480         epoch, inProgress := sc.GetEpoch()
481         if accessEpoch != epoch || inProgress {
482                 return false
483         }
484         return true
485 }
486
487 // listIndexes lists indexes for all stat entries that match any of the regex patterns.
488 func (sc *StatsClient) listIndexes(patterns ...string) (indexes []uint32, err error) {
489         if len(patterns) == 0 {
490                 return sc.listIndexesFunc(nil)
491         }
492         var regexes = make([]*regexp.Regexp, len(patterns))
493         for i, pattern := range patterns {
494                 r, err := regexp.Compile(pattern)
495                 if err != nil {
496                         return nil, fmt.Errorf("compiling regexp failed: %v", err)
497                 }
498                 regexes[i] = r
499         }
500         nameMatches := func(name []byte) bool {
501                 for _, r := range regexes {
502                         if r.Match(name) {
503                                 return true
504                         }
505                 }
506                 return false
507         }
508         return sc.listIndexesFunc(nameMatches)
509 }
510
511 // listIndexesFunc lists stats indexes. The optional function
512 // argument filters returned values or returns all if empty
513 func (sc *StatsClient) listIndexesFunc(f func(name []byte) bool) (indexes []uint32, err error) {
514         if f == nil {
515                 // there is around ~3157 stats, so to avoid too many allocations
516                 // we set capacity to 3200 when listing all stats
517                 indexes = make([]uint32, 0, 3200)
518         }
519
520         dirVector := sc.GetDirectoryVector()
521         if dirVector == nil {
522                 return nil, err
523         }
524         vecLen := *(*uint32)(vectorLen(dirVector))
525
526         for i := uint32(0); i < vecLen; i++ {
527                 _, dirName, _ := sc.GetStatDirOnIndex(dirVector, i)
528                 if f != nil {
529                         if len(dirName) == 0 || !f(dirName) {
530                                 continue
531                         }
532                 }
533                 indexes = append(indexes, i)
534         }
535
536         return indexes, nil
537 }
538
539 func (sc *StatsClient) isConnected() bool {
540         return atomic.LoadUint32(&sc.connected) == 1
541 }