X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=adapter%2Fstatsclient%2Fstatsclient.go;h=b2d91db79d86cd6c4dd18321e0ca80472f360ef6;hb=671f16c7b4ca24788fc503b4344fa22306548a3b;hp=947027592435cf8e66ae5a2ef488a2740353c062;hpb=4da29d1fb32a77dd299e84a9ed3a11ddcaa31a3b;p=govpp.git diff --git a/adapter/statsclient/statsclient.go b/adapter/statsclient/statsclient.go index 9470275..b2d91db 100644 --- a/adapter/statsclient/statsclient.go +++ b/adapter/statsclient/statsclient.go @@ -20,20 +20,28 @@ import ( "fmt" "net" "os" + "path/filepath" "regexp" + "sync/atomic" "syscall" "time" + "git.fd.io/govpp.git/adapter" "github.com/fsnotify/fsnotify" "github.com/ftrvxmtrx/fd" logger "github.com/sirupsen/logrus" - - "git.fd.io/govpp.git/adapter" ) const ( // DefaultSocketName is default VPP stats socket file path. DefaultSocketName = adapter.DefaultStatsSocket + + // DefaultSocketRetryPeriod is the time period after the socket availability + // will be re-checked + DefaultSocketRetryPeriod = 50 * time.Millisecond + + // DefaultSocketRetryTimeout is the maximum time for the stats socket + DefaultSocketRetryTimeout = 3 * time.Second ) var ( @@ -64,10 +72,14 @@ var _ adapter.StatsAPI = (*StatsClient)(nil) // StatsClient is the pure Go implementation for VPP stats API. type StatsClient struct { - socketPath string + socket string + retryPeriod time.Duration + retryTimeout time.Duration - headerData []byte - isConnected bool + headerData []byte + + // defines the adapter connection state + connected uint32 // to quit socket monitor done chan struct{} @@ -75,24 +87,50 @@ type StatsClient struct { statSegment } +// Option is a StatsClient option +type Option func(*StatsClient) + +// SetSocketRetryPeriod is and optional parameter to define a custom +// retry period while waiting for the VPP socket +func SetSocketRetryPeriod(t time.Duration) Option { + return func(c *StatsClient) { + c.retryPeriod = t + } +} + +// SetSocketRetryTimeout is and optional parameter to define a custom +// timeout while waiting for the VPP socket +func SetSocketRetryTimeout(t time.Duration) Option { + return func(c *StatsClient) { + c.retryTimeout = t + } +} + // NewStatsClient returns a new StatsClient using socket. // If socket is empty string DefaultSocketName is used. -func NewStatsClient(socket string) *StatsClient { +func NewStatsClient(socket string, options ...Option) *StatsClient { if socket == "" { socket = DefaultSocketName } - return &StatsClient{ - socketPath: socket, + s := &StatsClient{ + socket: socket, + } + for _, option := range options { + option(s) + } + if s.retryPeriod == 0 { + s.retryPeriod = DefaultSocketRetryPeriod } + if s.retryTimeout == 0 { + s.retryTimeout = DefaultSocketRetryTimeout + } + return s } // Connect to validated VPP stats socket and start monitoring // socket file changes func (sc *StatsClient) Connect() (err error) { - if sc.isConnected { - return fmt.Errorf("already connected") - } - if err := sc.checkSocketValid(); err != nil { + if err := sc.waitForSocket(); err != nil { return err } sc.done = make(chan struct{}) @@ -100,22 +138,29 @@ func (sc *StatsClient) Connect() (err error) { return err } sc.monitorSocket() - sc.isConnected = true return nil } // Disconnect from the socket, unmap shared memory and terminate // socket monitor func (sc *StatsClient) Disconnect() error { - if !sc.isConnected { - return nil // not connected + if sc.headerData == nil { + return nil + } + if err := syscall.Munmap(sc.headerData); err != nil { + Log.Debugf("unmapping shared memory failed: %v", err) + return fmt.Errorf("unmapping shared memory failed: %v", err) } - sc.isConnected = false - close(sc.done) - return sc.disconnect() + sc.headerData = nil + + Log.Debugf("successfully unmapped shared memory") + return nil } func (sc *StatsClient) ListStats(patterns ...string) ([]string, error) { + if !sc.isConnected() { + return nil, adapter.ErrStatsDisconnected + } accessEpoch := sc.accessStart() if accessEpoch == 0 { return nil, adapter.ErrStatsAccessFailed @@ -149,6 +194,9 @@ func (sc *StatsClient) ListStats(patterns ...string) ([]string, error) { } func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntry, err error) { + if !sc.isConnected() { + return nil, adapter.ErrStatsDisconnected + } accessEpoch := sc.accessStart() if accessEpoch == 0 { return nil, adapter.ErrStatsAccessFailed @@ -192,6 +240,9 @@ func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntr } func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) { + if !sc.isConnected() { + return nil, adapter.ErrStatsDisconnected + } dir := new(adapter.StatDir) accessEpoch := sc.accessStart() @@ -241,6 +292,9 @@ func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) // UpdateDir refreshes directory data for all counters func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) { + if !sc.isConnected() { + return adapter.ErrStatsDisconnected + } epoch, _ := sc.GetEpoch() if dir.Epoch != epoch { return adapter.ErrStatsDirStale @@ -281,11 +335,28 @@ func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) { return nil } -func (sc *StatsClient) checkSocketValid() error { - if _, err := os.Stat(sc.socketPath); os.IsNotExist(err) { - return fmt.Errorf("stats socket file %s does not exist", sc.socketPath) - } else if err != nil { - return fmt.Errorf("stats socket error: %v", err) +// checks the socket existence and waits for it for the designated +// time if it is not available immediately +func (sc *StatsClient) waitForSocket() error { + if _, err := os.Stat(sc.socket); err != nil { + if os.IsNotExist(err) { + n := time.Now() + ticker := time.NewTicker(sc.retryPeriod) + timeout := time.After(sc.retryTimeout) + for { + select { + case <-ticker.C: + if _, err := os.Stat(sc.socket); err == nil { + return nil + } + case <-timeout: + return fmt.Errorf("stats socket file %s is not ready within timeout (after %.2f s) ", + sc.socket, time.Since(n).Seconds()) + } + } + } else { + return fmt.Errorf("stats socket error: %v", err) + } } return nil } @@ -295,7 +366,7 @@ func (sc *StatsClient) checkSocketValid() error { func (sc *StatsClient) connect() (ss statSegment, err error) { addr := net.UnixAddr{ Net: "unixpacket", - Name: sc.socketPath, + Name: sc.socket, } Log.Debugf("connecting to: %v", addr) @@ -350,6 +421,9 @@ func (sc *StatsClient) connect() (ss statSegment, err error) { version, minVersion, maxVersion) } + // set connected + atomic.CompareAndSwapUint32(&sc.connected, 0, 1) + return ss, nil } @@ -359,8 +433,8 @@ func (sc *StatsClient) reconnect() (err error) { if err = sc.disconnect(); err != nil { return fmt.Errorf("error disconnecting socket: %v", err) } - if err = sc.checkSocketValid(); err != nil { - return fmt.Errorf("error validating socket: %v", err) + if err = sc.waitForSocket(); err != nil { + return fmt.Errorf("error while waiting on socket: %v", err) } if sc.statSegment, err = sc.connect(); err != nil { return fmt.Errorf("error connecting socket: %v", err) @@ -370,6 +444,9 @@ func (sc *StatsClient) reconnect() (err error) { // disconnect unmaps socket data from the memory and resets the header func (sc *StatsClient) disconnect() error { + if !atomic.CompareAndSwapUint32(&sc.connected, 1, 0) { + return fmt.Errorf("stats client is already disconnected") + } if sc.headerData == nil { return nil } @@ -394,14 +471,10 @@ func (sc *StatsClient) monitorSocket() { for { select { case event := <-watcher.Events: - if event.Op == fsnotify.Remove { + if event.Op == fsnotify.Remove && event.Name == sc.socket { if err := sc.reconnect(); err != nil { Log.Errorf("error occurred during socket reconnect: %v", err) } - // path must be re-added to the watcher - if err = watcher.Add(sc.socketPath); err != nil { - Log.Errorf("failed to add socket address to the watcher: %v", err) - } } case err := <-watcher.Errors: Log.Errorf("socket monitor delivered error event: %v", err) @@ -413,7 +486,7 @@ func (sc *StatsClient) monitorSocket() { } }() - if err := watcher.Add(sc.socketPath); err != nil { + if err := watcher.Add(filepath.Dir(sc.socket)); err != nil { Log.Errorf("failed to add socket address to the watcher: %v", err) } } @@ -496,3 +569,7 @@ func (sc *StatsClient) listIndexesFunc(f func(name []byte) bool) (indexes []uint return indexes, nil } + +func (sc *StatsClient) isConnected() bool { + return atomic.LoadUint32(&sc.connected) == 1 +}