if err := stream.SendMsg(req); err != nil {
return err
}
- msg, err := stream.RecvMsg()
+ s := stream.(*Stream)
+ rep, err := s.recvReply()
if err != nil {
return err
}
- if msg.GetMessageName() != reply.GetMessageName() ||
- msg.GetCrcString() != reply.GetCrcString() {
- return fmt.Errorf("unexpected reply: %T %+v", msg, msg)
+ if err := s.channel.msgCodec.DecodeMsg(rep.data, reply); err != nil {
+ return err
}
- reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(msg).Elem())
return nil
}
}
func (s *Stream) RecvMsg() (api.Message, error) {
+ reply, err := s.recvReply()
+ if err != nil {
+ return nil, err
+ }
+ // resolve message type
+ msg, err := s.channel.msgIdentifier.LookupByID(reply.msgID)
+ if err != nil {
+ return nil, err
+ }
+ // allocate message instance
+ msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
+ // decode message data
+ if err := s.channel.msgCodec.DecodeMsg(reply.data, msg); err != nil {
+ return nil, err
+ }
+ return msg, nil
+}
+
+func (s *Stream) recvReply() (*vppReply, error) {
if s.conn == nil {
return nil, errors.New("stream closed")
}
// and stream does not use it
return nil, reply.err
}
- // resolve message type
- msg, err := s.channel.msgIdentifier.LookupByID(reply.msgID)
- if err != nil {
- return nil, err
- }
- // allocate message instance
- msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
- // decode message data
- if err := s.channel.msgCodec.DecodeMsg(reply.data, msg); err != nil {
- return nil, err
- }
- return msg, nil
+ return reply, nil
case <-s.ctx.Done():
return nil, s.ctx.Err()