X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=adapter%2Fstatsclient%2Fstatsclient.go;h=e99d7870994a94934b402e2606b586f0843556c5;hb=4459b648e9fb53c34abbf52a00e63ad384fb9ee2;hp=86934102abb98ca9a5ac8a00083eee78045c2bda;hpb=9efccff8b3f3c8ba3f9e2343a5e827bfaf07a7f8;p=govpp.git diff --git a/adapter/statsclient/statsclient.go b/adapter/statsclient/statsclient.go index 8693410..e99d787 100644 --- a/adapter/statsclient/statsclient.go +++ b/adapter/statsclient/statsclient.go @@ -20,7 +20,9 @@ import ( "fmt" "net" "os" + "path/filepath" "regexp" + "sync/atomic" "syscall" "time" @@ -33,21 +35,14 @@ import ( const ( // DefaultSocketName is default VPP stats socket file path. DefaultSocketName = adapter.DefaultStatsSocket -) - -const socketMissing = ` ------------------------------------------------------------- - VPP stats socket file %s is missing! - - is VPP running with stats segment enabled? - - is the correct socket name configured? + // SocketRetryPeriod is the time period after the socket availability + // will be re-checked + SocketRetryPeriod = 50 * time.Millisecond - To enable it add following section to your VPP config: - statseg { - socket-name /run/vpp/stats.sock - } ------------------------------------------------------------- -` + // SocketRetryTimeout is the maximum time for the stats socket + SocketRetryTimeout = 3 * time.Second +) var ( // Debug is global variable that determines debug mode @@ -77,9 +72,12 @@ var _ adapter.StatsAPI = (*StatsClient)(nil) // StatsClient is the pure Go implementation for VPP stats API. type StatsClient struct { - sockAddr string - headerData []byte - isConnected bool + socket string + + headerData []byte + + // defines the adapter connection state + connected uint32 // to quit socket monitor done chan struct{} @@ -87,41 +85,51 @@ type StatsClient struct { statSegment } -// NewStatsClient returns new VPP stats API client. -func NewStatsClient(sockAddr string) *StatsClient { - if sockAddr == "" { - sockAddr = DefaultSocketName +// NewStatsClient returns a new StatsClient using socket. +// If socket is empty string DefaultSocketName is used. +func NewStatsClient(socket string) *StatsClient { + if socket == "" { + socket = DefaultSocketName } return &StatsClient{ - sockAddr: sockAddr, + socket: socket, } } // Connect to validated VPP stats socket and start monitoring // socket file changes func (sc *StatsClient) Connect() (err error) { - if sc.isConnected { - return fmt.Errorf("already connected") - } - if err := sc.validate(); err != nil { + if err := sc.waitForSocket(); err != nil { return err } sc.done = make(chan struct{}) - sc.monitorSocket() if sc.statSegment, err = sc.connect(); err != nil { return err } + sc.monitorSocket() return nil } // Disconnect from the socket, unmap shared memory and terminate // socket monitor func (sc *StatsClient) Disconnect() error { - close(sc.done) - return sc.disconnect() + if sc.headerData == nil { + return nil + } + if err := syscall.Munmap(sc.headerData); err != nil { + Log.Debugf("unmapping shared memory failed: %v", err) + return fmt.Errorf("unmapping shared memory failed: %v", err) + } + sc.headerData = nil + + Log.Debugf("successfully unmapped shared memory") + return nil } func (sc *StatsClient) ListStats(patterns ...string) ([]string, error) { + if !sc.isConnected() { + return nil, adapter.ErrStatsDisconnected + } accessEpoch := sc.accessStart() if accessEpoch == 0 { return nil, adapter.ErrStatsAccessFailed @@ -155,6 +163,9 @@ func (sc *StatsClient) ListStats(patterns ...string) ([]string, error) { } func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntry, err error) { + if !sc.isConnected() { + return nil, adapter.ErrStatsDisconnected + } accessEpoch := sc.accessStart() if accessEpoch == 0 { return nil, adapter.ErrStatsAccessFailed @@ -198,6 +209,9 @@ func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntr } func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) { + if !sc.isConnected() { + return nil, adapter.ErrStatsDisconnected + } dir := new(adapter.StatDir) accessEpoch := sc.accessStart() @@ -247,6 +261,9 @@ func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) // UpdateDir refreshes directory data for all counters func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) { + if !sc.isConnected() { + return adapter.ErrStatsDisconnected + } epoch, _ := sc.GetEpoch() if dir.Epoch != epoch { return adapter.ErrStatsDirStale @@ -287,13 +304,25 @@ func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) { return nil } -// validate file presence by retrieving its file info -func (sc *StatsClient) validate() error { - if _, err := os.Stat(sc.sockAddr); os.IsNotExist(err) { - fmt.Fprintf(os.Stderr, socketMissing, sc.sockAddr) - return fmt.Errorf("stats socket file %s does not exist", sc.sockAddr) - } else if err != nil { - return fmt.Errorf("stats socket error: %v", err) +// checks the socket existence and waits for it for the designated +// time if it is not available immediately +func (sc *StatsClient) waitForSocket() error { + if _, err := os.Stat(sc.socket); err != nil { + if os.IsNotExist(err) { + ticker := time.NewTicker(SocketRetryPeriod) + for { + select { + case <-ticker.C: + if _, err := os.Stat(sc.socket); err == nil { + return nil + } + case <-time.After(SocketRetryTimeout): + return fmt.Errorf("stats socket file %s is not ready within timeout ", sc.socket) + } + } + } else { + return fmt.Errorf("stats socket error: %v", err) + } } return nil } @@ -303,7 +332,7 @@ func (sc *StatsClient) validate() error { func (sc *StatsClient) connect() (ss statSegment, err error) { addr := net.UnixAddr{ Net: "unixpacket", - Name: sc.sockAddr, + Name: sc.socket, } Log.Debugf("connecting to: %v", addr) @@ -357,7 +386,10 @@ func (sc *StatsClient) connect() (ss statSegment, err error) { return nil, fmt.Errorf("stat segment version is not supported: %v (min: %v, max: %v)", version, minVersion, maxVersion) } - sc.isConnected = true + + // set connected + atomic.CompareAndSwapUint32(&sc.connected, 0, 1) + return ss, nil } @@ -367,8 +399,8 @@ func (sc *StatsClient) reconnect() (err error) { if err = sc.disconnect(); err != nil { return fmt.Errorf("error disconnecting socket: %v", err) } - if err = sc.validate(); err != nil { - return fmt.Errorf("error validating socket: %v", err) + if err = sc.waitForSocket(); err != nil { + return fmt.Errorf("error while waiting on socket: %v", err) } if sc.statSegment, err = sc.connect(); err != nil { return fmt.Errorf("error connecting socket: %v", err) @@ -378,7 +410,9 @@ func (sc *StatsClient) reconnect() (err error) { // disconnect unmaps socket data from the memory and resets the header func (sc *StatsClient) disconnect() error { - sc.isConnected = false + if !atomic.CompareAndSwapUint32(&sc.connected, 1, 0) { + return fmt.Errorf("stats client is already disconnected") + } if sc.headerData == nil { return nil } @@ -403,14 +437,10 @@ func (sc *StatsClient) monitorSocket() { for { select { case event := <-watcher.Events: - if event.Op == fsnotify.Remove { + if event.Op == fsnotify.Remove && event.Name == sc.socket { if err := sc.reconnect(); err != nil { Log.Errorf("error occurred during socket reconnect: %v", err) } - // path must be re-added to the watcher - if err = watcher.Add(sc.sockAddr); err != nil { - Log.Errorf("failed to add socket address to the watcher: %v", err) - } } case err := <-watcher.Errors: Log.Errorf("socket monitor delivered error event: %v", err) @@ -422,7 +452,7 @@ func (sc *StatsClient) monitorSocket() { } }() - if err := watcher.Add(sc.sockAddr); err != nil { + if err := watcher.Add(filepath.Dir(sc.socket)); err != nil { Log.Errorf("failed to add socket address to the watcher: %v", err) } } @@ -505,3 +535,7 @@ func (sc *StatsClient) listIndexesFunc(f func(name []byte) bool) (indexes []uint return indexes, nil } + +func (sc *StatsClient) isConnected() bool { + return atomic.LoadUint32(&sc.connected) == 1 +}