+ reply, err := s.recvReply()
+ if err != nil {
+ return nil, err
+ }
+ // resolve message type
+ s.Lock()
+ path := s.pkgPath
+ s.Unlock()
+ msg, err := s.channel.msgIdentifier.LookupByID(path, reply.msgID)
+ if err != nil {
+ return nil, err
+ }
+ // allocate message instance
+ msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
+ // decode message data
+ if err := s.channel.msgCodec.DecodeMsg(reply.data, msg); err != nil {
+ return nil, err
+ }
+ return msg, nil
+}
+
+func WithRequestSize(size int) api.StreamOption {
+ return func(stream api.Stream) {
+ stream.(*Stream).requestSize = size
+ }
+}
+
+func WithReplySize(size int) api.StreamOption {
+ return func(stream api.Stream) {
+ stream.(*Stream).replySize = size
+ }
+}
+
+func WithReplyTimeout(timeout time.Duration) api.StreamOption {
+ return func(stream api.Stream) {
+ stream.(*Stream).replyTimeout = timeout
+ }
+}
+
+func (s *Stream) recvReply() (*vppReply, error) {