Fix statsclient for VPP 20.05-rc0 (master)
[govpp.git] / adapter / statsclient / stat_segment.go
index 507a9ea..0d988ba 100644 (file)
@@ -17,9 +17,7 @@ package statsclient
 import (
        "fmt"
        "net"
-       "sync/atomic"
        "syscall"
-       "time"
        "unsafe"
 
        "github.com/ftrvxmtrx/fd"
@@ -28,39 +26,61 @@ import (
 )
 
 var (
-       maxWaitInProgress = time.Second * 1
+       ErrStatDataLenIncorrect = fmt.Errorf("stat data length incorrect")
 )
 
-type statSegDirectoryEntry struct {
-       directoryType statDirectoryType
-       // unionData can represent: offset, index or value
-       unionData    uint64
-       offsetVector uint64
-       name         [128]byte
-}
-
-type statDirectoryType int32
+const (
+       minVersion = 0
+       maxVersion = 1
+)
 
-func (t statDirectoryType) String() string {
-       return adapter.StatType(t).String()
+func checkVersion(ver uint64) error {
+       if ver < minVersion {
+               return fmt.Errorf("stat segment version is too old: %v (minimal version: %v)", ver, minVersion)
+       } else if ver > maxVersion {
+               return fmt.Errorf("stat segment version is not supported: %v (minimal version: %v)", ver, maxVersion)
+       }
+       return nil
 }
 
 type statSegment struct {
        sharedHeader []byte
        memorySize   int64
 
-       oldHeader bool
+       // legacyVersion represents stat segment version 0
+       // and is used as fallback for VPP 19.04
+       legacyVersion bool
+}
+
+func (c *statSegment) getHeader() (header sharedHeader) {
+       if c.legacyVersion {
+               return loadSharedHeaderLegacy(c.sharedHeader)
+       }
+       return loadSharedHeader(c.sharedHeader)
+}
+
+func (c *statSegment) getEpoch() (int64, bool) {
+       h := c.getHeader()
+       return h.epoch, h.inProgress != 0
+}
+
+func (c *statSegment) getOffsets() (dir, err, stat int64) {
+       h := c.getHeader()
+       return h.directoryOffset, h.errorOffset, h.statsOffset
 }
 
 func (c *statSegment) connect(sockName string) error {
-       addr := &net.UnixAddr{
+       if c.sharedHeader != nil {
+               return fmt.Errorf("already connected")
+       }
+
+       addr := net.UnixAddr{
                Net:  "unixpacket",
                Name: sockName,
        }
-
        Log.Debugf("connecting to: %v", addr)
 
-       conn, err := net.DialUnix(addr.Net, nil, addr)
+       conn, err := net.DialUnix(addr.Net, nil, &addr)
        if err != nil {
                Log.Warnf("connecting to socket %s failed: %s", addr, err)
                return err
@@ -80,167 +100,354 @@ func (c *statSegment) connect(sockName string) error {
        if len(files) == 0 {
                return fmt.Errorf("no files received over socket")
        }
+
+       file := files[0]
        defer func() {
-               for _, f := range files {
-                       if err := f.Close(); err != nil {
-                               Log.Warnf("closing file %s failed: %v", f.Name(), err)
-                       }
+               if err := file.Close(); err != nil {
+                       Log.Warnf("closing file failed: %v", err)
                }
        }()
 
-       Log.Debugf("received %d files over socket", len(files))
-
-       f := files[0]
-
-       info, err := f.Stat()
+       info, err := file.Stat()
        if err != nil {
                return err
        }
-
        size := info.Size()
 
-       Log.Debugf("fd: name=%v size=%v", info.Name(), size)
-
-       data, err := syscall.Mmap(int(f.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED)
+       data, err := syscall.Mmap(int(file.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED)
        if err != nil {
-               Log.Warnf("mapping shared memory failed: %v", err)
+               Log.Debugf("mapping shared memory failed: %v", err)
                return fmt.Errorf("mapping shared memory failed: %v", err)
        }
 
-       Log.Debugf("successfuly mapped shared memory")
+       Log.Debugf("successfuly mmapped shared memory segment (size: %v) %v", size, len(data))
 
        c.sharedHeader = data
        c.memorySize = size
 
-       header := c.readHeader()
-       Log.Debugf("stat segment header: %+v", header)
+       hdr := loadSharedHeader(c.sharedHeader)
+       Log.Debugf("stat segment header: %+v", hdr)
 
-       // older VPP (19.04) did not have version in stat segment header
-       // we try to provide fallback support by skipping it in header
-       if header.version > MaxVersion && header.inProgress > 1 && header.epoch == 0 {
-               h := c.readHeaderOld()
-               Log.Infof("statsclient: falling back to old stat segment version (VPP 19.04): %+v", h)
-               c.oldHeader = true
+       if hdr.legacyVersion() {
+               c.legacyVersion = true
+               hdr = loadSharedHeaderLegacy(c.sharedHeader)
+               Log.Debugf("falling back to legacy version (VPP <=19.04) of stat segment (header: %+v)", hdr)
+       }
+
+       if err := checkVersion(hdr.version); err != nil {
+               return err
        }
 
        return nil
 }
 
 func (c *statSegment) disconnect() error {
+       if c.sharedHeader == nil {
+               return nil
+       }
+
        if err := syscall.Munmap(c.sharedHeader); err != nil {
-               Log.Warnf("unmapping shared memory failed: %v", err)
+               Log.Debugf("unmapping shared memory failed: %v", err)
                return fmt.Errorf("unmapping shared memory failed: %v", err)
        }
+       c.sharedHeader = nil
 
        Log.Debugf("successfuly unmapped shared memory")
-
        return nil
 }
 
-type sharedHeaderBase struct {
-       epoch           int64
-       inProgress      int64
-       directoryOffset int64
-       errorOffset     int64
-       statsOffset     int64
-}
+type statDirectoryType int32
 
-type statSegSharedHeader struct {
-       version uint64
-       sharedHeaderBase
-}
+const (
+       statDirIllegal               = 0
+       statDirScalarIndex           = 1
+       statDirCounterVectorSimple   = 2
+       statDirCounterVectorCombined = 3
+       statDirErrorIndex            = 4
+       statDirNameVector            = 5
+       statDirEmpty                 = 6
+)
 
-func (c *statSegment) readHeaderOld() (header statSegSharedHeader) {
-       h := (*sharedHeaderBase)(unsafe.Pointer(&c.sharedHeader[0]))
-       header.version = 0
-       header.epoch = atomic.LoadInt64(&h.epoch)
-       header.inProgress = atomic.LoadInt64(&h.inProgress)
-       header.directoryOffset = atomic.LoadInt64(&h.directoryOffset)
-       header.errorOffset = atomic.LoadInt64(&h.errorOffset)
-       header.statsOffset = atomic.LoadInt64(&h.statsOffset)
-       return
+func (t statDirectoryType) String() string {
+       return adapter.StatType(t).String()
 }
 
-func (c *statSegment) readHeader() (header statSegSharedHeader) {
-       h := (*statSegSharedHeader)(unsafe.Pointer(&c.sharedHeader[0]))
-       header.version = atomic.LoadUint64(&h.version)
-       header.epoch = atomic.LoadInt64(&h.epoch)
-       header.inProgress = atomic.LoadInt64(&h.inProgress)
-       header.directoryOffset = atomic.LoadInt64(&h.directoryOffset)
-       header.errorOffset = atomic.LoadInt64(&h.errorOffset)
-       header.statsOffset = atomic.LoadInt64(&h.statsOffset)
-       return
+type statSegDirectoryEntry struct {
+       directoryType statDirectoryType
+       // unionData can represent:
+       // - offset
+       // - index
+       // - value
+       unionData    uint64
+       offsetVector uint64
+       name         [128]byte
 }
 
-func (c *statSegment) readVersion() uint64 {
-       if c.oldHeader {
-               return 0
-       }
-       header := (*statSegSharedHeader)(unsafe.Pointer(&c.sharedHeader[0]))
-       version := atomic.LoadUint64(&header.version)
-       return version
+func (c *statSegment) getStatDirVector() unsafe.Pointer {
+       dirOffset, _, _ := c.getOffsets()
+       return unsafe.Pointer(&c.sharedHeader[dirOffset])
 }
 
-func (c *statSegment) readEpoch() (int64, bool) {
-       if c.oldHeader {
-               h := c.readHeaderOld()
-               return h.epoch, h.inProgress != 0
-       }
-       header := (*statSegSharedHeader)(unsafe.Pointer(&c.sharedHeader[0]))
-       epoch := atomic.LoadInt64(&header.epoch)
-       inprog := atomic.LoadInt64(&header.inProgress)
-       return epoch, inprog != 0
+func (c *statSegment) getStatDirIndex(p unsafe.Pointer, index uint32) *statSegDirectoryEntry {
+       return (*statSegDirectoryEntry)(unsafe.Pointer(uintptr(p) + uintptr(index)*unsafe.Sizeof(statSegDirectoryEntry{})))
 }
 
-func (c *statSegment) readOffsets() (dir, err, stat int64) {
-       if c.oldHeader {
-               h := c.readHeaderOld()
-               return h.directoryOffset, h.errorOffset, h.statsOffset
-       }
-       header := (*statSegSharedHeader)(unsafe.Pointer(&c.sharedHeader[0]))
-       dirOffset := atomic.LoadInt64(&header.directoryOffset)
-       errOffset := atomic.LoadInt64(&header.errorOffset)
-       statOffset := atomic.LoadInt64(&header.statsOffset)
-       return dirOffset, errOffset, statOffset
-}
+func (c *statSegment) copyEntryData(dirEntry *statSegDirectoryEntry) adapter.Stat {
+       dirType := adapter.StatType(dirEntry.directoryType)
+
+       switch dirType {
+       case statDirScalarIndex:
+               return adapter.ScalarStat(dirEntry.unionData)
+
+       case statDirErrorIndex:
+               _, errOffset, _ := c.getOffsets()
+               offsetVector := unsafe.Pointer(&c.sharedHeader[errOffset])
+
+               var errData adapter.Counter
+               if c.legacyVersion {
+                       // error were not vector (per-worker) in VPP 19.04
+                       offset := uintptr(dirEntry.unionData) * unsafe.Sizeof(uint64(0))
+                       val := *(*adapter.Counter)(statSegPointer(offsetVector, offset))
+                       errData = val
+               } else {
+                       vecLen := uint32(vectorLen(offsetVector))
+
+                       for i := uint32(0); i < vecLen; i++ {
+                               cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
+                               offset := uintptr(cb) + uintptr(dirEntry.unionData)*unsafe.Sizeof(adapter.Counter(0))
+                               debugf("error index, cb: %d, offset: %d", cb, offset)
+                               val := *(*adapter.Counter)(statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), offset))
+                               errData += val
+                       }
+               }
+               return adapter.ErrorStat(errData)
+
+       case statDirCounterVectorSimple:
+               if dirEntry.unionData == 0 {
+                       debugf("offset invalid for %s", dirEntry.name)
+                       break
+               } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) {
+                       debugf("offset out of range for %s", dirEntry.name)
+                       break
+               }
 
-type statSegAccess struct {
-       epoch int64
-}
+               vecLen := uint32(vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData])))
+               offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector))
+
+               data := make([][]adapter.Counter, vecLen)
+               for i := uint32(0); i < vecLen; i++ {
+                       cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
+                       counterVec := unsafe.Pointer(&c.sharedHeader[uintptr(cb)])
+                       vecLen2 := uint32(vectorLen(counterVec))
+                       data[i] = make([]adapter.Counter, vecLen2)
+                       for j := uint32(0); j < vecLen2; j++ {
+                               offset := uintptr(j) * unsafe.Sizeof(adapter.Counter(0))
+                               val := *(*adapter.Counter)(statSegPointer(counterVec, offset))
+                               data[i][j] = val
+                       }
+               }
+               return adapter.SimpleCounterStat(data)
+
+       case statDirCounterVectorCombined:
+               if dirEntry.unionData == 0 {
+                       debugf("offset invalid for %s", dirEntry.name)
+                       break
+               } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) {
+                       debugf("offset out of range for %s", dirEntry.name)
+                       break
+               }
 
-func (c *statSegment) accessStart() *statSegAccess {
-       epoch, inprog := c.readEpoch()
-       t := time.Now()
-       for inprog {
-               if time.Since(t) > maxWaitInProgress {
-                       return nil
+               vecLen := uint32(vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData])))
+               offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector))
+
+               data := make([][]adapter.CombinedCounter, vecLen)
+               for i := uint32(0); i < vecLen; i++ {
+                       cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
+                       counterVec := unsafe.Pointer(&c.sharedHeader[uintptr(cb)])
+                       vecLen2 := uint32(vectorLen(counterVec))
+                       data[i] = make([]adapter.CombinedCounter, vecLen2)
+                       for j := uint32(0); j < vecLen2; j++ {
+                               offset := uintptr(j) * unsafe.Sizeof(adapter.CombinedCounter{})
+                               val := *(*adapter.CombinedCounter)(statSegPointer(counterVec, offset))
+                               data[i][j] = val
+                       }
                }
-               epoch, inprog = c.readEpoch()
-       }
-       return &statSegAccess{
-               epoch: epoch,
-       }
-}
+               return adapter.CombinedCounterStat(data)
+
+       case statDirNameVector:
+               if dirEntry.unionData == 0 {
+                       debugf("offset invalid for %s", dirEntry.name)
+                       break
+               } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) {
+                       debugf("offset out of range for %s", dirEntry.name)
+                       break
+               }
+
+               vecLen := uint32(vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData])))
+               offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector))
 
-func (c *statSegment) accessEnd(acc *statSegAccess) bool {
-       epoch, inprog := c.readEpoch()
-       if acc.epoch != epoch || inprog {
-               return false
+               data := make([]adapter.Name, vecLen)
+               for i := uint32(0); i < vecLen; i++ {
+                       cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
+                       if cb == 0 {
+                               debugf("name vector out of range for %s (%v)", dirEntry.name, i)
+                               continue
+                       }
+                       nameVec := unsafe.Pointer(&c.sharedHeader[cb])
+                       vecLen2 := uint32(vectorLen(nameVec))
+
+                       nameStr := make([]byte, 0, vecLen2)
+                       for j := uint32(0); j < vecLen2; j++ {
+                               offset := uintptr(j) * unsafe.Sizeof(byte(0))
+                               val := *(*byte)(statSegPointer(nameVec, offset))
+                               if val > 0 {
+                                       nameStr = append(nameStr, val)
+                               }
+                       }
+                       data[i] = adapter.Name(nameStr)
+               }
+               return adapter.NameStat(data)
+
+       case statDirEmpty:
+               // no-op
+
+       default:
+               // TODO: monitor occurrences with metrics
+               debugf("Unknown type %d for stat entry: %q", dirEntry.directoryType, dirEntry.name)
        }
-       return true
+       return nil
 }
 
-type vecHeader struct {
-       length     uint64
-       vectorData [0]uint8
-}
+func (c *statSegment) updateEntryData(dirEntry *statSegDirectoryEntry, stat *adapter.Stat) error {
+       switch (*stat).(type) {
+       case adapter.ScalarStat:
+               *stat = adapter.ScalarStat(dirEntry.unionData)
+
+       case adapter.ErrorStat:
+               _, errOffset, _ := c.getOffsets()
+               offsetVector := unsafe.Pointer(&c.sharedHeader[errOffset])
+
+               var errData adapter.Counter
+               if c.legacyVersion {
+                       // error were not vector (per-worker) in VPP 19.04
+                       offset := uintptr(dirEntry.unionData) * unsafe.Sizeof(uint64(0))
+                       val := *(*adapter.Counter)(statSegPointer(offsetVector, offset))
+                       errData = val
+               } else {
+                       vecLen := vectorLen(offsetVector)
+                       for i := uint64(0); i < vecLen; i++ {
+                               cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
+                               offset := uintptr(cb) + uintptr(dirEntry.unionData)*unsafe.Sizeof(adapter.Counter(0))
+                               val := *(*adapter.Counter)(statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), offset))
+                               errData += val
+                       }
+               }
+               *stat = adapter.ErrorStat(errData)
+
+       case adapter.SimpleCounterStat:
+               if dirEntry.unionData == 0 {
+                       debugf("offset invalid for %s", dirEntry.name)
+                       break
+               } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) {
+                       debugf("offset out of range for %s", dirEntry.name)
+                       break
+               }
 
-func vectorLen(v unsafe.Pointer) uint64 {
-       vec := *(*vecHeader)(unsafe.Pointer(uintptr(v) - unsafe.Sizeof(uintptr(0))))
-       return vec.length
-}
+               vecLen := vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData]))
+               offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector))
+
+               data := (*stat).(adapter.SimpleCounterStat)
+               if uint64(len(data)) != vecLen {
+                       return ErrStatDataLenIncorrect
+               }
+               for i := uint64(0); i < vecLen; i++ {
+                       cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
+                       counterVec := unsafe.Pointer(&c.sharedHeader[uintptr(cb)])
+                       vecLen2 := vectorLen(counterVec)
+                       simpData := data[i]
+                       if uint64(len(simpData)) != vecLen2 {
+                               return ErrStatDataLenIncorrect
+                       }
+                       for j := uint64(0); j < vecLen2; j++ {
+                               offset := uintptr(j) * unsafe.Sizeof(adapter.Counter(0))
+                               val := *(*adapter.Counter)(statSegPointer(counterVec, offset))
+                               simpData[j] = val
+                       }
+               }
+
+       case adapter.CombinedCounterStat:
+               if dirEntry.unionData == 0 {
+                       debugf("offset invalid for %s", dirEntry.name)
+                       break
+               } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) {
+                       debugf("offset out of range for %s", dirEntry.name)
+                       break
+               }
+
+               vecLen := vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData]))
+               offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector))
 
-//go:nosplit
-func add(p unsafe.Pointer, x uintptr) unsafe.Pointer {
-       return unsafe.Pointer(uintptr(p) + x)
+               data := (*stat).(adapter.CombinedCounterStat)
+               if uint64(len(data)) != vecLen {
+                       return ErrStatDataLenIncorrect
+               }
+               for i := uint64(0); i < vecLen; i++ {
+                       cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
+                       counterVec := unsafe.Pointer(&c.sharedHeader[uintptr(cb)])
+                       vecLen2 := vectorLen(counterVec)
+                       combData := data[i]
+                       if uint64(len(combData)) != vecLen2 {
+                               return ErrStatDataLenIncorrect
+                       }
+                       for j := uint64(0); j < vecLen2; j++ {
+                               offset := uintptr(j) * unsafe.Sizeof(adapter.CombinedCounter{})
+                               val := *(*adapter.CombinedCounter)(statSegPointer(counterVec, offset))
+                               combData[j] = val
+                       }
+               }
+
+       case adapter.NameStat:
+               if dirEntry.unionData == 0 {
+                       debugf("offset invalid for %s", dirEntry.name)
+                       break
+               } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) {
+                       debugf("offset out of range for %s", dirEntry.name)
+                       break
+               }
+
+               vecLen := vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData]))
+               offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector))
+
+               data := (*stat).(adapter.NameStat)
+               if uint64(len(data)) != vecLen {
+                       return ErrStatDataLenIncorrect
+               }
+               for i := uint64(0); i < vecLen; i++ {
+                       cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
+                       if cb == 0 {
+                               continue
+                       }
+                       nameVec := unsafe.Pointer(&c.sharedHeader[cb])
+                       vecLen2 := vectorLen(nameVec)
+
+                       nameData := data[i]
+                       if uint64(len(nameData))+1 != vecLen2 {
+                               return ErrStatDataLenIncorrect
+                       }
+                       for j := uint64(0); j < vecLen2; j++ {
+                               offset := uintptr(j) * unsafe.Sizeof(byte(0))
+                               val := *(*byte)(statSegPointer(nameVec, offset))
+                               if val == 0 {
+                                       break
+                               }
+                               nameData[j] = val
+                       }
+               }
+
+       default:
+               if Debug {
+                       Log.Debugf("Unrecognized stat type %T for stat entry: %v", stat, dirEntry.name)
+               }
+       }
+       return nil
 }