X-Git-Url: https://gerrit.fd.io/r/gitweb?p=govpp.git;a=blobdiff_plain;f=adapter%2Fstatsclient%2Fstatsclient.go;h=e99d7870994a94934b402e2606b586f0843556c5;hp=947027592435cf8e66ae5a2ef488a2740353c062;hb=4459b648e9fb53c34abbf52a00e63ad384fb9ee2;hpb=4c1cccf48cd144414c7233f167087aff770ef67b diff --git a/adapter/statsclient/statsclient.go b/adapter/statsclient/statsclient.go index 9470275..e99d787 100644 --- a/adapter/statsclient/statsclient.go +++ b/adapter/statsclient/statsclient.go @@ -20,20 +20,28 @@ import ( "fmt" "net" "os" + "path/filepath" "regexp" + "sync/atomic" "syscall" "time" + "git.fd.io/govpp.git/adapter" "github.com/fsnotify/fsnotify" "github.com/ftrvxmtrx/fd" logger "github.com/sirupsen/logrus" - - "git.fd.io/govpp.git/adapter" ) const ( // DefaultSocketName is default VPP stats socket file path. DefaultSocketName = adapter.DefaultStatsSocket + + // SocketRetryPeriod is the time period after the socket availability + // will be re-checked + SocketRetryPeriod = 50 * time.Millisecond + + // SocketRetryTimeout is the maximum time for the stats socket + SocketRetryTimeout = 3 * time.Second ) var ( @@ -64,10 +72,12 @@ var _ adapter.StatsAPI = (*StatsClient)(nil) // StatsClient is the pure Go implementation for VPP stats API. type StatsClient struct { - socketPath string + socket string + + headerData []byte - headerData []byte - isConnected bool + // defines the adapter connection state + connected uint32 // to quit socket monitor done chan struct{} @@ -82,17 +92,14 @@ func NewStatsClient(socket string) *StatsClient { socket = DefaultSocketName } return &StatsClient{ - socketPath: socket, + socket: socket, } } // Connect to validated VPP stats socket and start monitoring // socket file changes func (sc *StatsClient) Connect() (err error) { - if sc.isConnected { - return fmt.Errorf("already connected") - } - if err := sc.checkSocketValid(); err != nil { + if err := sc.waitForSocket(); err != nil { return err } sc.done = make(chan struct{}) @@ -100,22 +107,29 @@ func (sc *StatsClient) Connect() (err error) { return err } sc.monitorSocket() - sc.isConnected = true return nil } // Disconnect from the socket, unmap shared memory and terminate // socket monitor func (sc *StatsClient) Disconnect() error { - if !sc.isConnected { - return nil // not connected + if sc.headerData == nil { + return nil } - sc.isConnected = false - close(sc.done) - return sc.disconnect() + if err := syscall.Munmap(sc.headerData); err != nil { + Log.Debugf("unmapping shared memory failed: %v", err) + return fmt.Errorf("unmapping shared memory failed: %v", err) + } + sc.headerData = nil + + Log.Debugf("successfully unmapped shared memory") + return nil } func (sc *StatsClient) ListStats(patterns ...string) ([]string, error) { + if !sc.isConnected() { + return nil, adapter.ErrStatsDisconnected + } accessEpoch := sc.accessStart() if accessEpoch == 0 { return nil, adapter.ErrStatsAccessFailed @@ -149,6 +163,9 @@ func (sc *StatsClient) ListStats(patterns ...string) ([]string, error) { } func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntry, err error) { + if !sc.isConnected() { + return nil, adapter.ErrStatsDisconnected + } accessEpoch := sc.accessStart() if accessEpoch == 0 { return nil, adapter.ErrStatsAccessFailed @@ -192,6 +209,9 @@ func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntr } func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) { + if !sc.isConnected() { + return nil, adapter.ErrStatsDisconnected + } dir := new(adapter.StatDir) accessEpoch := sc.accessStart() @@ -241,6 +261,9 @@ func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) // UpdateDir refreshes directory data for all counters func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) { + if !sc.isConnected() { + return adapter.ErrStatsDisconnected + } epoch, _ := sc.GetEpoch() if dir.Epoch != epoch { return adapter.ErrStatsDirStale @@ -281,11 +304,25 @@ func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) { return nil } -func (sc *StatsClient) checkSocketValid() error { - if _, err := os.Stat(sc.socketPath); os.IsNotExist(err) { - return fmt.Errorf("stats socket file %s does not exist", sc.socketPath) - } else if err != nil { - return fmt.Errorf("stats socket error: %v", err) +// checks the socket existence and waits for it for the designated +// time if it is not available immediately +func (sc *StatsClient) waitForSocket() error { + if _, err := os.Stat(sc.socket); err != nil { + if os.IsNotExist(err) { + ticker := time.NewTicker(SocketRetryPeriod) + for { + select { + case <-ticker.C: + if _, err := os.Stat(sc.socket); err == nil { + return nil + } + case <-time.After(SocketRetryTimeout): + return fmt.Errorf("stats socket file %s is not ready within timeout ", sc.socket) + } + } + } else { + return fmt.Errorf("stats socket error: %v", err) + } } return nil } @@ -295,7 +332,7 @@ func (sc *StatsClient) checkSocketValid() error { func (sc *StatsClient) connect() (ss statSegment, err error) { addr := net.UnixAddr{ Net: "unixpacket", - Name: sc.socketPath, + Name: sc.socket, } Log.Debugf("connecting to: %v", addr) @@ -350,6 +387,9 @@ func (sc *StatsClient) connect() (ss statSegment, err error) { version, minVersion, maxVersion) } + // set connected + atomic.CompareAndSwapUint32(&sc.connected, 0, 1) + return ss, nil } @@ -359,8 +399,8 @@ func (sc *StatsClient) reconnect() (err error) { if err = sc.disconnect(); err != nil { return fmt.Errorf("error disconnecting socket: %v", err) } - if err = sc.checkSocketValid(); err != nil { - return fmt.Errorf("error validating socket: %v", err) + if err = sc.waitForSocket(); err != nil { + return fmt.Errorf("error while waiting on socket: %v", err) } if sc.statSegment, err = sc.connect(); err != nil { return fmt.Errorf("error connecting socket: %v", err) @@ -370,6 +410,9 @@ func (sc *StatsClient) reconnect() (err error) { // disconnect unmaps socket data from the memory and resets the header func (sc *StatsClient) disconnect() error { + if !atomic.CompareAndSwapUint32(&sc.connected, 1, 0) { + return fmt.Errorf("stats client is already disconnected") + } if sc.headerData == nil { return nil } @@ -394,14 +437,10 @@ func (sc *StatsClient) monitorSocket() { for { select { case event := <-watcher.Events: - if event.Op == fsnotify.Remove { + if event.Op == fsnotify.Remove && event.Name == sc.socket { if err := sc.reconnect(); err != nil { Log.Errorf("error occurred during socket reconnect: %v", err) } - // path must be re-added to the watcher - if err = watcher.Add(sc.socketPath); err != nil { - Log.Errorf("failed to add socket address to the watcher: %v", err) - } } case err := <-watcher.Errors: Log.Errorf("socket monitor delivered error event: %v", err) @@ -413,7 +452,7 @@ func (sc *StatsClient) monitorSocket() { } }() - if err := watcher.Add(sc.socketPath); err != nil { + if err := watcher.Add(filepath.Dir(sc.socket)); err != nil { Log.Errorf("failed to add socket address to the watcher: %v", err) } } @@ -496,3 +535,7 @@ func (sc *StatsClient) listIndexesFunc(f func(name []byte) bool) (indexes []uint return indexes, nil } + +func (sc *StatsClient) isConnected() bool { + return atomic.LoadUint32(&sc.connected) == 1 +}