X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=core%2Frequest_handler.go;h=fc704cbe64492b9dfdfe7cda58d90ace8e931868;hb=df67791c6ffc96331f75aec7d3addfe2efca7739;hp=fd6d10027e96df3efacfeb6d2ec97526455cb401;hpb=a3bb834db727a3ac9a1ffcfeae9265e5dead851f;p=govpp.git diff --git a/core/request_handler.go b/core/request_handler.go index fd6d100..fc704cb 100644 --- a/core/request_handler.go +++ b/core/request_handler.go @@ -17,19 +17,24 @@ package core import ( "errors" "fmt" + "reflect" "sync/atomic" "time" logger "github.com/sirupsen/logrus" + + "git.fd.io/govpp.git/api" ) +var ReplyChannelTimeout = time.Millisecond * 100 + var ( ErrNotConnected = errors.New("not connected to VPP, ignoring the request") ErrProbeTimeout = errors.New("probe reply not received within timeout period") ) // watchRequests watches for requests on the request API channel and forwards them as messages to VPP. -func (c *Connection) watchRequests(ch *channel) { +func (c *Connection) watchRequests(ch *Channel) { for { select { case req, ok := <-ch.reqChan: @@ -39,97 +44,157 @@ func (c *Connection) watchRequests(ch *channel) { c.releaseAPIChannel(ch) return } - c.processRequest(ch, req) - - case req := <-ch.notifSubsChan: - // new request on the notification subscribe channel - c.processSubscriptionRequest(ch, req) + if err := c.processRequest(ch, req); err != nil { + sendReply(ch, &vppReply{ + seqNum: req.seqNum, + err: fmt.Errorf("unable to process request: %w", err), + }) + } } } } // processRequest processes a single request received on the request channel. -func (c *Connection) processRequest(ch *channel, req *vppRequest) error { +func (c *Connection) sendMessage(context uint32, msg api.Message) error { // check whether we are connected to VPP - if atomic.LoadUint32(&c.connected) == 0 { + if atomic.LoadUint32(&c.vppConnected) == 0 { + return ErrNotConnected + } + + /*log := log.WithFields(logger.Fields{ + "context": context, + "msg_name": msg.GetMessageName(), + "msg_crc": msg.GetCrcString(), + })*/ + + // retrieve message ID + msgID, err := c.GetMessageID(msg) + if err != nil { + //log.WithError(err).Debugf("unable to retrieve message ID: %#v", msg) + return err + } + + //log = log.WithField("msg_id", msgID) + + // encode the message + data, err := c.codec.EncodeMsg(msg, msgID) + if err != nil { + log.WithError(err).Debugf("unable to encode message: %#v", msg) + return err + } + + //log = log.WithField("msg_length", len(data)) + + if log.Level >= logger.DebugLevel { + log.Debugf("--> SEND: MSG %T %+v", msg, msg) + } + + // send message to VPP + err = c.vppClient.SendMsg(context, data) + if err != nil { + log.WithError(err).Debugf("unable to send message: %#v", msg) + return err + } + + return nil +} + +// processRequest processes a single request received on the request channel. +func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { + // check whether we are connected to VPP + if atomic.LoadUint32(&c.vppConnected) == 0 { err := ErrNotConnected - log.Errorf("processing request failed: %v", err) - sendReplyError(ch, req, err) + log.WithFields(logger.Fields{ + "channel": ch.id, + "seq_num": req.seqNum, + "msg_name": req.msg.GetMessageName(), + "msg_crc": req.msg.GetCrcString(), + "error": err, + }).Warnf("Unable to process request") return err } // retrieve message ID msgID, err := c.GetMessageID(req.msg) if err != nil { - err = fmt.Errorf("unable to retrieve message ID: %v", err) log.WithFields(logger.Fields{ + "channel": ch.id, "msg_name": req.msg.GetMessageName(), "msg_crc": req.msg.GetCrcString(), "seq_num": req.seqNum, - }).Error(err) - sendReplyError(ch, req, err) + "error": err, + }).Warnf("Unable to retrieve message ID") return err } // encode the message into binary data, err := c.codec.EncodeMsg(req.msg, msgID) if err != nil { - err = fmt.Errorf("unable to encode the messge: %v", err) log.WithFields(logger.Fields{ "channel": ch.id, "msg_id": msgID, "msg_name": req.msg.GetMessageName(), + "msg_crc": req.msg.GetCrcString(), "seq_num": req.seqNum, - }).Error(err) - sendReplyError(ch, req, err) + "error": err, + }).Warnf("Unable to encode message: %T %+v", req.msg, req.msg) return err } - // get context context := packRequestContext(ch.id, req.multi, req.seqNum) - if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled + + if log.Level >= logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled log.WithFields(logger.Fields{ "channel": ch.id, - "context": context, - "is_multi": req.multi, "msg_id": msgID, "msg_name": req.msg.GetMessageName(), - "msg_size": len(data), + "msg_crc": req.msg.GetCrcString(), "seq_num": req.seqNum, - }).Debug(" -> Sending a message to VPP.") + "is_multi": req.multi, + "context": context, + "data_len": len(data), + }).Debugf("--> SEND MSG: %T %+v", req.msg, req.msg) } // send the request to VPP - err = c.vpp.SendMsg(context, data) + err = c.vppClient.SendMsg(context, data) if err != nil { - err = fmt.Errorf("unable to send the message: %v", err) log.WithFields(logger.Fields{ - "context": context, - "msg_id": msgID, - "seq_num": req.seqNum, - }).Error(err) - sendReplyError(ch, req, err) + "channel": ch.id, + "msg_id": msgID, + "msg_name": req.msg.GetMessageName(), + "msg_crc": req.msg.GetCrcString(), + "seq_num": req.seqNum, + "is_multi": req.multi, + "context": context, + "data_len": len(data), + "error": err, + }).Warnf("Unable to send message") return err } if req.multi { // send a control ping to determine end of the multipart response - pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID) + pingData, _ := c.codec.EncodeMsg(c.msgControlPing, c.pingReqID) - log.WithFields(logger.Fields{ - "channel": ch.id, - "context": context, - "msg_id": c.pingReqID, - "msg_size": len(pingData), - "seq_num": req.seqNum, - }).Debug(" -> Sending a control ping to VPP.") + if log.Level >= logger.DebugLevel { + log.WithFields(logger.Fields{ + "channel": ch.id, + "msg_id": c.pingReqID, + "msg_name": c.msgControlPing.GetMessageName(), + "msg_crc": c.msgControlPing.GetCrcString(), + "seq_num": req.seqNum, + "context": context, + "data_len": len(pingData), + }).Debugf(" -> SEND MSG: %T", c.msgControlPing) + } - if err := c.vpp.SendMsg(context, pingData); err != nil { + if err := c.vppClient.SendMsg(context, pingData); err != nil { log.WithFields(logger.Fields{ "context": context, - "msg_id": msgID, "seq_num": req.seqNum, - }).Warnf("unable to send control ping: %v", err) + "error": err, + }).Warnf("unable to send control ping") } } @@ -137,12 +202,11 @@ func (c *Connection) processRequest(ch *channel, req *vppRequest) error { } // msgCallback is called whenever any binary API message comes from VPP. -func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) { - connLock.RLock() - defer connLock.RUnlock() - +func (c *Connection) msgCallback(msgID uint16, data []byte) { if c == nil { - log.Warn("Already disconnected, ignoring the message.") + log.WithField( + "msg_id", msgID, + ).Warn("Connection already disconnected, ignoring the message.") return } @@ -157,27 +221,32 @@ func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) { // - replies that don't have context as first field (comes as zero) // - events that don't have context at all (comes as non zero) // - msgContext, err := c.codec.DecodeMsgContext(data, msg) - if err == nil { - if context != msgContext { - log.Debugf("different context was decoded from message (%d -> %d)", context, msgContext) - context = msgContext - } - } else { - log.Errorf("decoding context failed: %v", err) + context, err := c.codec.DecodeMsgContext(data, msg) + if err != nil { + log.WithField("msg_id", msgID).Warnf("Unable to decode message context: %v", err) + return } chanID, isMulti, seqNum := unpackRequestContext(context) + if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled + msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + + // decode the message + if err = c.codec.DecodeMsg(data, msg); err != nil { + err = fmt.Errorf("decoding message failed: %w", err) + return + } + log.WithFields(logger.Fields{ "context": context, "msg_id": msgID, - "msg_name": msg.GetMessageName(), "msg_size": len(data), "channel": chanID, "is_multi": isMulti, "seq_num": seqNum, - }).Debug(" <- Received a message from VPP.") + "msg_crc": msg.GetCrcString(), + }).Debugf("<-- govpp RECEIVE: %s %+v", msg.GetMessageName(), msg) } if context == 0 || c.isNotificationMessage(msgID) { @@ -202,11 +271,12 @@ func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) { // treat this as a last part of the reply lastReplyReceived := isMulti && msgID == c.pingReplyID - // send the data to the channel + // send the data to the channel, it needs to be copied, + // because it will be freed after this function returns sendReply(ch, &vppReply{ msgID: msgID, seqNum: seqNum, - data: data, + data: append([]byte(nil), data...), lastReceived: lastReplyReceived, }) @@ -218,22 +288,94 @@ func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) { // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise // it logs the error and do not send the message. -func sendReply(ch *channel, reply *vppReply) { +func sendReply(ch *Channel, reply *vppReply) { + // first try to avoid creating timer + select { + case ch.replyChan <- reply: + return // reply sent ok + default: + // reply channel full + } + if ch.receiveReplyTimeout == 0 { + log.WithFields(logger.Fields{ + "channel": ch.id, + "msg_id": reply.msgID, + "seq_num": reply.seqNum, + "err": reply.err, + }).Warn("Reply channel full, dropping reply.") + return + } select { case ch.replyChan <- reply: - // reply sent successfully - case <-time.After(time.Millisecond * 100): + return // reply sent ok + case <-time.After(ch.receiveReplyTimeout): // receiver still not ready log.WithFields(logger.Fields{ - "channel": ch, + "channel": ch.id, "msg_id": reply.msgID, "seq_num": reply.seqNum, - }).Warn("Unable to send the reply, reciever end not ready.") + "err": reply.err, + }).Warnf("Unable to send reply (reciever end not ready in %v).", ch.receiveReplyTimeout) } } -func sendReplyError(ch *channel, req *vppRequest, err error) { - sendReply(ch, &vppReply{seqNum: req.seqNum, err: err}) +// isNotificationMessage returns true if someone has subscribed to provided message ID. +func (c *Connection) isNotificationMessage(msgID uint16) bool { + c.subscriptionsLock.RLock() + defer c.subscriptionsLock.RUnlock() + + _, exists := c.subscriptions[msgID] + return exists +} + +// sendNotifications send a notification message to all subscribers subscribed for that message. +func (c *Connection) sendNotifications(msgID uint16, data []byte) { + c.subscriptionsLock.RLock() + defer c.subscriptionsLock.RUnlock() + + matched := false + + // send to notification to each subscriber + for _, sub := range c.subscriptions[msgID] { + log.WithFields(logger.Fields{ + "msg_name": sub.event.GetMessageName(), + "msg_id": msgID, + "msg_size": len(data), + }).Debug("Sending a notification to the subscription channel.") + + event := sub.msgFactory() + if err := c.codec.DecodeMsg(data, event); err != nil { + log.WithFields(logger.Fields{ + "msg_name": sub.event.GetMessageName(), + "msg_id": msgID, + "msg_size": len(data), + "error": err, + }).Warnf("Unable to decode the notification message") + continue + } + + // send the message into the go channel of the subscription + select { + case sub.notifChan <- event: + // message sent successfully + default: + // unable to write into the channel without blocking + log.WithFields(logger.Fields{ + "msg_name": sub.event.GetMessageName(), + "msg_id": msgID, + "msg_size": len(data), + }).Warn("Unable to deliver the notification, reciever end not ready.") + } + + matched = true + } + + if !matched { + log.WithFields(logger.Fields{ + "msg_id": msgID, + "msg_size": len(data), + }).Info("No subscription found for the notification message.") + } } // +------------------+-------------------+-----------------------+