X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=core%2Fconnection.go;h=ee5a06b82e04fe76c96ec2a7a132b2c45fb325a4;hb=57de49f7583b8174c7f3d8e21956d4eaac64ac28;hp=53a9acfced7d435116bfa31ca5019046f9a119f6;hpb=a155cd438c6558da266c1c5931361ea088b35653;p=govpp.git diff --git a/core/connection.go b/core/connection.go index 53a9acf..ee5a06b 100644 --- a/core/connection.go +++ b/core/connection.go @@ -17,6 +17,7 @@ package core import ( "errors" "fmt" + "path" "reflect" "sync" "sync/atomic" @@ -42,8 +43,8 @@ var ( var ( HealthCheckProbeInterval = time.Second // default health check probe interval - HealthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe - HealthCheckThreshold = 1 // number of failed health checks until the error is reported + HealthCheckReplyTimeout = time.Millisecond * 250 // timeout for reply to a health check probe + HealthCheckThreshold = 2 // number of failed health checks until the error is reported DefaultReplyTimeout = time.Second // default timeout for replies from VPP ) @@ -103,9 +104,9 @@ type Connection struct { connChan chan ConnectionEvent // connection status events are sent to this channel - codec MessageCodec // message codec - msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC - msgMap map[uint16]api.Message // map of messages indexed by message ID + codec MessageCodec // message codec + msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC + msgMapByPath map[string]map[uint16]api.Message // map of messages indexed by message ID which are indexed by path maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations) channelsLock sync.RWMutex // lock for the channels map @@ -122,6 +123,8 @@ type Connection struct { msgControlPing api.Message msgControlPingReply api.Message + + apiTrace *trace // API tracer (disabled by default) } func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection { @@ -139,11 +142,15 @@ func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) connChan: make(chan ConnectionEvent, NotificationChanBufSize), codec: codec.DefaultCodec, msgIDs: make(map[string]uint16), - msgMap: make(map[uint16]api.Message), + msgMapByPath: make(map[string]map[uint16]api.Message), channels: make(map[uint16]*Channel), subscriptions: make(map[uint16][]*subscriptionCtx), msgControlPing: msgControlPing, msgControlPingReply: msgControlPingReply, + apiTrace: &trace{ + list: make([]*api.Record, 0), + mux: &sync.Mutex{}, + }, } binapi.SetMsgCallback(c.msgCallback) return c @@ -400,69 +407,74 @@ func (c *Connection) GetMessageID(msg api.Message) (uint16, error) { if c == nil { return 0, errors.New("nil connection passed in") } - - if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok { - return msgID, nil - } - + pkgPath := c.GetMessagePath(msg) msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString()) if err != nil { return 0, err } - + if pathMsgs, pathOk := c.msgMapByPath[pkgPath]; !pathOk { + c.msgMapByPath[pkgPath] = make(map[uint16]api.Message) + c.msgMapByPath[pkgPath][msgID] = msg + } else if _, msgOk := pathMsgs[msgID]; !msgOk { + c.msgMapByPath[pkgPath][msgID] = msg + } + if _, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok { + return msgID, nil + } c.msgIDs[getMsgNameWithCrc(msg)] = msgID - c.msgMap[msgID] = msg - return msgID, nil } // LookupByID looks up message name and crc by ID. -func (c *Connection) LookupByID(msgID uint16) (api.Message, error) { +func (c *Connection) LookupByID(path string, msgID uint16) (api.Message, error) { if c == nil { return nil, errors.New("nil connection passed in") } - - if msg, ok := c.msgMap[msgID]; ok { + if msg, ok := c.msgMapByPath[path][msgID]; ok { return msg, nil } + return nil, fmt.Errorf("unknown message ID %d for path '%s'", msgID, path) +} - return nil, fmt.Errorf("unknown message ID: %d", msgID) +// GetMessagePath returns path for the given message +func (c *Connection) GetMessagePath(msg api.Message) string { + return path.Dir(reflect.TypeOf(msg).Elem().PkgPath()) } // retrieveMessageIDs retrieves IDs for all registered messages and stores them in map func (c *Connection) retrieveMessageIDs() (err error) { t := time.Now() - msgs := api.GetRegisteredMessages() + msgsByPath := api.GetRegisteredMessages() var n int - for name, msg := range msgs { - typ := reflect.TypeOf(msg).Elem() - path := fmt.Sprintf("%s.%s", typ.PkgPath(), typ.Name()) + for pkgPath, msgs := range msgsByPath { + for _, msg := range msgs { + msgID, err := c.GetMessageID(msg) + if err != nil { + if debugMsgIDs { + log.Debugf("retrieving message ID for %s.%s failed: %v", + pkgPath, msg.GetMessageName(), err) + } + continue + } + n++ + + if c.pingReqID == 0 && msg.GetMessageName() == c.msgControlPing.GetMessageName() { + c.pingReqID = msgID + c.msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + } else if c.pingReplyID == 0 && msg.GetMessageName() == c.msgControlPingReply.GetMessageName() { + c.pingReplyID = msgID + c.msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + } - msgID, err := c.GetMessageID(msg) - if err != nil { if debugMsgIDs { - log.Debugf("retrieving message ID for %s failed: %v", path, err) + log.Debugf("message %q (%s) has ID: %d", msg.GetMessageName(), getMsgNameWithCrc(msg), msgID) } - continue - } - n++ - - if c.pingReqID == 0 && msg.GetMessageName() == c.msgControlPing.GetMessageName() { - c.pingReqID = msgID - c.msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) - } else if c.pingReplyID == 0 && msg.GetMessageName() == c.msgControlPingReply.GetMessageName() { - c.pingReplyID = msgID - c.msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) - } - - if debugMsgIDs { - log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID) } + log.WithField("took", time.Since(t)). + Debugf("retrieved IDs for %d messages (registered %d) from path %s", n, len(msgs), pkgPath) } - log.WithField("took", time.Since(t)). - Debugf("retrieved IDs for %d messages (registered %d)", n, len(msgs)) return nil } @@ -474,3 +486,24 @@ func (c *Connection) sendConnEvent(event ConnectionEvent) { log.Warn("Connection state channel is full, discarding value.") } } + +// Trace gives access to the API trace interface +func (c *Connection) Trace() api.Trace { + return c.apiTrace +} + +// trace records api message +func (c *Connection) trace(msg api.Message, chId uint16, t time.Time, isReceived bool) { + if atomic.LoadInt32(&c.apiTrace.isEnabled) == 0 { + return + } + entry := &api.Record{ + Message: msg, + Timestamp: t, + IsReceived: isReceived, + ChannelID: chId, + } + c.apiTrace.mux.Lock() + c.apiTrace.list = append(c.apiTrace.list, entry) + c.apiTrace.mux.Unlock() +}