Introduce Stream - experimental API for low-level access to VPP API 73/27673/1
authorOndrej Fabry <ofabry@cisco.com>
Thu, 25 Jun 2020 09:55:58 +0000 (11:55 +0200)
committerOndrej Fabry <ofabry@cisco.com>
Thu, 25 Jun 2020 09:55:58 +0000 (11:55 +0200)
Change-Id: I2698e11b76ff55d9730b47d4fee990be93349516
Signed-off-by: Ondrej Fabry <ofabry@cisco.com>
14 files changed:
adapter/vppapiclient/doc.go
api/api.go [new file with mode: 0644]
api/binapi.go
api/errors.go [moved from api/vppapi_errors.go with 97% similarity]
api/errors_test.go [moved from api/vppapi_errors_test.go with 100% similarity]
core/channel.go
core/control_ping.go
core/request_handler.go
core/stream.go [new file with mode: 0644]
examples/binapi/gen.go
examples/binapi/mactime/mactime.ba.go [new file with mode: 0644]
examples/binapi/mactime/mactime_rpc.ba.go [new file with mode: 0644]
examples/perf-bench/perf-bench.go
examples/simple-client/simple_client.go

index 6505498..d9ad12b 100644 (file)
@@ -15,4 +15,7 @@
 // Package vppapiclient is the default VPP adapter being used for
 // the connection to VPP binary & stats API via shared memory.
 // It is essentially Go wrapper for the VPP vppapiclient library written in C.
+//
+// DEPRECATED: The vppapiclient implementation has been deprecated.
+// Use socketclient and statsclient.
 package vppapiclient
diff --git a/api/api.go b/api/api.go
new file mode 100644 (file)
index 0000000..977b02e
--- /dev/null
@@ -0,0 +1,125 @@
+//  Copyright (c) 2020 Cisco and/or its affiliates.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at:
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+
+package api
+
+import (
+       "context"
+       "time"
+)
+
+// Connection represents the client connection to VPP API.
+//
+// NOTE: This API is EXPERIMENTAL.
+type Connection interface {
+       // NewStream creates a new stream for sending and receiving messages.
+       // Context can be used to close the stream using cancel or timeout.
+       NewStream(ctx context.Context) (Stream, error)
+
+       // Invoke can be used for a simple request-reply RPC.
+       // It creates stream and calls SendMsg with req and RecvMsg with reply.
+       Invoke(ctx context.Context, req Message, reply Message) error
+}
+
+// Stream provides low-level access for sending and receiving messages.
+// Users should handle correct type and ordering of messages.
+//
+// NOTE: This API is EXPERIMENTAL.
+type Stream interface {
+       // SendMsg sends a message to the client.
+       // It blocks until message is sent to the transport.
+       //
+       // It is safe to have a goroutine calling SendMsg and another goroutine
+       // calling RecvMsg on the same stream at the same time, but it is not safe
+       // to call SendMsg on the same stream in different goroutines.
+       SendMsg(Message) error
+
+       // RecvMsg blocks until a message is received or error occurs.
+       //
+       // It is safe to have a goroutine calling SendMsg and another goroutine
+       // calling RecvMsg on the same stream at the same time, but it is not safe
+       // to call SendMsg on the same stream in different goroutines.
+       RecvMsg() (Message, error)
+
+       // Close closes the stream. Calling SendMsg and RecvMsg will return error
+       // after closing stream.
+       Close() error
+}
+
+// ChannelProvider provides the communication channel with govpp core.
+type ChannelProvider interface {
+       // NewAPIChannel returns a new channel for communication with VPP via govpp core.
+       // It uses default buffer sizes for the request and reply Go channels.
+       NewAPIChannel() (Channel, error)
+
+       // NewAPIChannelBuffered returns a new channel for communication with VPP via govpp core.
+       // It allows to specify custom buffer sizes for the request and reply Go channels.
+       NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (Channel, error)
+}
+
+// Channel provides methods for direct communication with VPP channel.
+type Channel interface {
+       // SendRequest asynchronously sends a request to VPP. Returns a request context, that can be used to call ReceiveReply.
+       // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
+       SendRequest(msg Message) RequestCtx
+
+       // SendMultiRequest asynchronously sends a multipart request (request to which multiple responses are expected) to VPP.
+       // Returns a multipart request context, that can be used to call ReceiveReply.
+       // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
+       SendMultiRequest(msg Message) MultiRequestCtx
+
+       // SubscribeNotification subscribes for receiving of the specified notification messages via provided Go channel.
+       // Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's
+       // buffer is full, the notifications will not be delivered into it.
+       SubscribeNotification(notifChan chan Message, event Message) (SubscriptionCtx, error)
+
+       // SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply
+       // from VPP before returning an error.
+       SetReplyTimeout(timeout time.Duration)
+
+       // CheckCompatibility checks the compatiblity for the given messages.
+       // It will return an error if any of the given messages are not compatible.
+       CheckCompatiblity(msgs ...Message) error
+
+       // Close closes the API channel and releases all API channel-related resources
+       // in the ChannelProvider.
+       Close()
+}
+
+// RequestCtx is helper interface which allows to receive reply on request.
+type RequestCtx interface {
+       // ReceiveReply receives a reply from VPP (blocks until a reply is delivered
+       // from VPP, or until an error occurs). The reply will be decoded into the msg
+       // argument. Error will be returned if the response cannot be received or decoded.
+       ReceiveReply(msg Message) error
+}
+
+// MultiRequestCtx is helper interface which allows to receive reply on multi-request.
+type MultiRequestCtx interface {
+       // ReceiveReply receives a reply from VPP (blocks until a reply is delivered
+       // from VPP, or until an error occurs).The reply will be decoded into the msg
+       // argument. If the last reply has been already consumed, lastReplyReceived is
+       // set to true. Do not use the message itself if lastReplyReceived is
+       // true - it won't be filled with actual data.Error will be returned if the
+       // response cannot be received or decoded.
+       ReceiveReply(msg Message) (lastReplyReceived bool, err error)
+}
+
+// SubscriptionCtx is helper interface which allows to control subscription for
+// notification events.
+type SubscriptionCtx interface {
+       // Unsubscribe unsubscribes from receiving the notifications tied to the
+       // subscription context.
+       Unsubscribe() error
+}
index 20ddd28..04fdc9e 100644 (file)
@@ -17,7 +17,6 @@ package api
 import (
        "fmt"
        "reflect"
-       "time"
 )
 
 // MessageType represents the type of a VPP message.
@@ -56,84 +55,6 @@ type DataType interface {
        GetTypeName() string
 }
 
-// ChannelProvider provides the communication channel with govpp core.
-type ChannelProvider interface {
-       // NewAPIChannel returns a new channel for communication with VPP via govpp core.
-       // It uses default buffer sizes for the request and reply Go channels.
-       NewAPIChannel() (Channel, error)
-
-       // NewAPIChannelBuffered returns a new channel for communication with VPP via govpp core.
-       // It allows to specify custom buffer sizes for the request and reply Go channels.
-       NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (Channel, error)
-}
-
-// Channel provides methods for direct communication with VPP channel.
-type Channel interface {
-       // SendRequest asynchronously sends a request to VPP. Returns a request context, that can be used to call ReceiveReply.
-       // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
-       SendRequest(msg Message) RequestCtx
-
-       // SendMultiRequest asynchronously sends a multipart request (request to which multiple responses are expected) to VPP.
-       // Returns a multipart request context, that can be used to call ReceiveReply.
-       // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
-       SendMultiRequest(msg Message) MultiRequestCtx
-
-       // SubscribeNotification subscribes for receiving of the specified notification messages via provided Go channel.
-       // Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's
-       // buffer is full, the notifications will not be delivered into it.
-       SubscribeNotification(notifChan chan Message, event Message) (SubscriptionCtx, error)
-
-       // SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply
-       // from VPP before returning an error.
-       SetReplyTimeout(timeout time.Duration)
-
-       // CheckCompatibility checks the compatiblity for the given messages.
-       // It will return an error if any of the given messages are not compatible.
-       CheckCompatiblity(msgs ...Message) error
-
-       // Close closes the API channel and releases all API channel-related resources
-       // in the ChannelProvider.
-       Close()
-}
-
-// RequestCtx is helper interface which allows to receive reply on request.
-type RequestCtx interface {
-       // ReceiveReply receives a reply from VPP (blocks until a reply is delivered
-       // from VPP, or until an error occurs). The reply will be decoded into the msg
-       // argument. Error will be returned if the response cannot be received or decoded.
-       ReceiveReply(msg Message) error
-}
-
-// MultiRequestCtx is helper interface which allows to receive reply on multi-request.
-type MultiRequestCtx interface {
-       // ReceiveReply receives a reply from VPP (blocks until a reply is delivered
-       // from VPP, or until an error occurs).The reply will be decoded into the msg
-       // argument. If the last reply has been already consumed, lastReplyReceived is
-       // set to true. Do not use the message itself if lastReplyReceived is
-       // true - it won't be filled with actual data.Error will be returned if the
-       // response cannot be received or decoded.
-       ReceiveReply(msg Message) (lastReplyReceived bool, err error)
-}
-
-// SubscriptionCtx is helper interface which allows to control subscription for
-// notification events.
-type SubscriptionCtx interface {
-       // Unsubscribe unsubscribes from receiving the notifications tied to the
-       // subscription context.
-       Unsubscribe() error
-}
-
-// CompatibilityError is the error type usually returned by CheckCompatibility
-// method of Channel. It contains list of all the compatible/incompatible messages.
-type CompatibilityError struct {
-       CompatibleMessages   []string
-       IncompatibleMessages []string
-}
-
-func (c *CompatibilityError) Error() string {
-       return fmt.Sprintf("%d/%d messages incompatible", len(c.IncompatibleMessages), len(c.CompatibleMessages)+len(c.IncompatibleMessages))
-}
-
 var (
        registeredMessageTypes = make(map[reflect.Type]string)
        registeredMessages     = make(map[string]Message)
similarity index 97%
rename from api/vppapi_errors.go
rename to api/errors.go
index c9ce3a3..32cc677 100644 (file)
@@ -5,6 +5,17 @@ import (
        "strconv"
 )
 
+// CompatibilityError is the error type usually returned by CheckCompatibility
+// method of Channel. It contains list of all the compatible/incompatible messages.
+type CompatibilityError struct {
+       CompatibleMessages   []string
+       IncompatibleMessages []string
+}
+
+func (c *CompatibilityError) Error() string {
+       return fmt.Sprintf("%d/%d messages incompatible", len(c.IncompatibleMessages), len(c.CompatibleMessages)+len(c.IncompatibleMessages))
+}
+
 // RetvalToVPPApiError returns error for retval value.
 // Retval 0 returns nil error.
 func RetvalToVPPApiError(retval int32) error {
similarity index 100%
rename from api/vppapi_errors_test.go
rename to api/errors_test.go
index 8479d6a..1b5e77e 100644 (file)
@@ -102,19 +102,21 @@ type Channel struct {
 
        lastSeqNum uint16 // sequence number of the last sent request
 
-       delayedReply *vppReply     // reply already taken from ReplyChan, buffered for later delivery
-       replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout
+       delayedReply        *vppReply     // reply already taken from ReplyChan, buffered for later delivery
+       replyTimeout        time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout
+       receiveReplyTimeout time.Duration // maximum time that we wait for receiver to consume reply
 }
 
 func newChannel(id uint16, conn *Connection, codec MessageCodec, identifier MessageIdentifier, reqSize, replySize int) *Channel {
        return &Channel{
-               id:            id,
-               conn:          conn,
-               msgCodec:      codec,
-               msgIdentifier: identifier,
-               reqChan:       make(chan *vppRequest, reqSize),
-               replyChan:     make(chan *vppReply, replySize),
-               replyTimeout:  DefaultReplyTimeout,
+               id:                  id,
+               conn:                conn,
+               msgCodec:            codec,
+               msgIdentifier:       identifier,
+               reqChan:             make(chan *vppRequest, reqSize),
+               replyChan:           make(chan *vppReply, replySize),
+               replyTimeout:        DefaultReplyTimeout,
+               receiveReplyTimeout: ReplyChannelTimeout,
        }
 }
 
@@ -122,28 +124,29 @@ func (ch *Channel) GetID() uint16 {
        return ch.id
 }
 
+func (ch *Channel) SendRequest(msg api.Message) api.RequestCtx {
+       req := ch.newRequest(msg, false)
+       ch.reqChan <- req
+       return &requestCtx{ch: ch, seqNum: req.seqNum}
+}
+
+func (ch *Channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx {
+       req := ch.newRequest(msg, true)
+       ch.reqChan <- req
+       return &multiRequestCtx{ch: ch, seqNum: req.seqNum}
+}
+
 func (ch *Channel) nextSeqNum() uint16 {
        ch.lastSeqNum++
        return ch.lastSeqNum
 }
 
-func (ch *Channel) SendRequest(msg api.Message) api.RequestCtx {
-       seqNum := ch.nextSeqNum()
-       ch.reqChan <- &vppRequest{
-               msg:    msg,
-               seqNum: seqNum,
-       }
-       return &requestCtx{ch: ch, seqNum: seqNum}
-}
-
-func (ch *Channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx {
-       seqNum := ch.nextSeqNum()
-       ch.reqChan <- &vppRequest{
+func (ch *Channel) newRequest(msg api.Message, multi bool) *vppRequest {
+       return &vppRequest{
                msg:    msg,
-               seqNum: seqNum,
-               multi:  true,
+               seqNum: ch.nextSeqNum(),
+               multi:  multi,
        }
-       return &multiRequestCtx{ch: ch, seqNum: seqNum}
 }
 
 func (ch *Channel) CheckCompatiblity(msgs ...api.Message) error {
index b39fd3f..ed8d274 100644 (file)
@@ -21,15 +21,9 @@ func SetControlPingReply(m api.Message) {
 
 type ControlPing struct{}
 
-func (*ControlPing) GetMessageName() string {
-       return "control_ping"
-}
-func (*ControlPing) GetCrcString() string {
-       return "51077d14"
-}
-func (*ControlPing) GetMessageType() api.MessageType {
-       return api.RequestMessage
-}
+func (*ControlPing) GetMessageName() string          { return "control_ping" }
+func (*ControlPing) GetCrcString() string            { return "51077d14" }
+func (*ControlPing) GetMessageType() api.MessageType { return api.RequestMessage }
 
 type ControlPingReply struct {
        Retval      int32
@@ -37,15 +31,9 @@ type ControlPingReply struct {
        VpePID      uint32
 }
 
-func (*ControlPingReply) GetMessageName() string {
-       return "control_ping_reply"
-}
-func (*ControlPingReply) GetCrcString() string {
-       return "f6b0b8ca"
-}
-func (*ControlPingReply) GetMessageType() api.MessageType {
-       return api.ReplyMessage
-}
+func (*ControlPingReply) GetMessageName() string          { return "control_ping_reply" }
+func (*ControlPingReply) GetCrcString() string            { return "f6b0b8ca" }
+func (*ControlPingReply) GetMessageType() api.MessageType { return api.ReplyMessage }
 
 func init() {
        api.RegisterMessage((*ControlPing)(nil), "ControlPing")
index e272c6f..fc704cb 100644 (file)
@@ -45,18 +45,72 @@ func (c *Connection) watchRequests(ch *Channel) {
                                return
                        }
                        if err := c.processRequest(ch, req); err != nil {
-                               sendReplyError(ch, req, err)
+                               sendReply(ch, &vppReply{
+                                       seqNum: req.seqNum,
+                                       err:    fmt.Errorf("unable to process request: %w", err),
+                               })
                        }
                }
        }
 }
 
+// processRequest processes a single request received on the request channel.
+func (c *Connection) sendMessage(context uint32, msg api.Message) error {
+       // check whether we are connected to VPP
+       if atomic.LoadUint32(&c.vppConnected) == 0 {
+               return ErrNotConnected
+       }
+
+       /*log := log.WithFields(logger.Fields{
+               "context":  context,
+               "msg_name": msg.GetMessageName(),
+               "msg_crc":  msg.GetCrcString(),
+       })*/
+
+       // retrieve message ID
+       msgID, err := c.GetMessageID(msg)
+       if err != nil {
+               //log.WithError(err).Debugf("unable to retrieve message ID: %#v", msg)
+               return err
+       }
+
+       //log = log.WithField("msg_id", msgID)
+
+       // encode the message
+       data, err := c.codec.EncodeMsg(msg, msgID)
+       if err != nil {
+               log.WithError(err).Debugf("unable to encode message: %#v", msg)
+               return err
+       }
+
+       //log = log.WithField("msg_length", len(data))
+
+       if log.Level >= logger.DebugLevel {
+               log.Debugf("--> SEND: MSG %T %+v", msg, msg)
+       }
+
+       // send message to VPP
+       err = c.vppClient.SendMsg(context, data)
+       if err != nil {
+               log.WithError(err).Debugf("unable to send message: %#v", msg)
+               return err
+       }
+
+       return nil
+}
+
 // processRequest processes a single request received on the request channel.
 func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
        // check whether we are connected to VPP
        if atomic.LoadUint32(&c.vppConnected) == 0 {
                err := ErrNotConnected
-               log.Errorf("processing request failed: %v", err)
+               log.WithFields(logger.Fields{
+                       "channel":  ch.id,
+                       "seq_num":  req.seqNum,
+                       "msg_name": req.msg.GetMessageName(),
+                       "msg_crc":  req.msg.GetCrcString(),
+                       "error":    err,
+               }).Warnf("Unable to process request")
                return err
        }
 
@@ -64,12 +118,13 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
        msgID, err := c.GetMessageID(req.msg)
        if err != nil {
                log.WithFields(logger.Fields{
+                       "channel":  ch.id,
                        "msg_name": req.msg.GetMessageName(),
                        "msg_crc":  req.msg.GetCrcString(),
                        "seq_num":  req.seqNum,
                        "error":    err,
-               }).Errorf("failed to retrieve message ID")
-               return fmt.Errorf("unable to retrieve message ID: %v", err)
+               }).Warnf("Unable to retrieve message ID")
+               return err
        }
 
        // encode the message into binary
@@ -79,35 +134,42 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
                        "channel":  ch.id,
                        "msg_id":   msgID,
                        "msg_name": req.msg.GetMessageName(),
+                       "msg_crc":  req.msg.GetCrcString(),
                        "seq_num":  req.seqNum,
                        "error":    err,
-               }).Errorf("failed to encode message: %#v", req.msg)
-               return fmt.Errorf("unable to encode the message: %v", err)
+               }).Warnf("Unable to encode message: %T %+v", req.msg, req.msg)
+               return err
        }
 
        context := packRequestContext(ch.id, req.multi, req.seqNum)
 
-       if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
+       if log.Level >= logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
                log.WithFields(logger.Fields{
                        "channel":  ch.id,
-                       "context":  context,
-                       "is_multi": req.multi,
                        "msg_id":   msgID,
-                       "msg_size": len(data),
-                       "seq_num":  req.seqNum,
+                       "msg_name": req.msg.GetMessageName(),
                        "msg_crc":  req.msg.GetCrcString(),
-               }).Debugf("--> govpp SEND: %s %+v", req.msg.GetMessageName(), req.msg)
+                       "seq_num":  req.seqNum,
+                       "is_multi": req.multi,
+                       "context":  context,
+                       "data_len": len(data),
+               }).Debugf("--> SEND MSG: %T %+v", req.msg, req.msg)
        }
 
        // send the request to VPP
        err = c.vppClient.SendMsg(context, data)
        if err != nil {
-               err = fmt.Errorf("unable to send the message: %v", err)
                log.WithFields(logger.Fields{
-                       "context": context,
-                       "msg_id":  msgID,
-                       "seq_num": req.seqNum,
-               }).Error(err)
+                       "channel":  ch.id,
+                       "msg_id":   msgID,
+                       "msg_name": req.msg.GetMessageName(),
+                       "msg_crc":  req.msg.GetCrcString(),
+                       "seq_num":  req.seqNum,
+                       "is_multi": req.multi,
+                       "context":  context,
+                       "data_len": len(data),
+                       "error":    err,
+               }).Warnf("Unable to send message")
                return err
        }
 
@@ -115,20 +177,24 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
                // send a control ping to determine end of the multipart response
                pingData, _ := c.codec.EncodeMsg(c.msgControlPing, c.pingReqID)
 
-               log.WithFields(logger.Fields{
-                       "channel":  ch.id,
-                       "context":  context,
-                       "msg_id":   c.pingReqID,
-                       "msg_size": len(pingData),
-                       "seq_num":  req.seqNum,
-               }).Debug(" -> sending control ping")
+               if log.Level >= logger.DebugLevel {
+                       log.WithFields(logger.Fields{
+                               "channel":  ch.id,
+                               "msg_id":   c.pingReqID,
+                               "msg_name": c.msgControlPing.GetMessageName(),
+                               "msg_crc":  c.msgControlPing.GetCrcString(),
+                               "seq_num":  req.seqNum,
+                               "context":  context,
+                               "data_len": len(pingData),
+                       }).Debugf(" -> SEND MSG: %T", c.msgControlPing)
+               }
 
                if err := c.vppClient.SendMsg(context, pingData); err != nil {
                        log.WithFields(logger.Fields{
                                "context": context,
-                               "msg_id":  msgID,
                                "seq_num": req.seqNum,
-                       }).Warnf("unable to send control ping: %v", err)
+                               "error":   err,
+                       }).Warnf("unable to send control ping")
                }
        }
 
@@ -138,7 +204,9 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
 // msgCallback is called whenever any binary API message comes from VPP.
 func (c *Connection) msgCallback(msgID uint16, data []byte) {
        if c == nil {
-               log.Warn("Already disconnected, ignoring the message.")
+               log.WithField(
+                       "msg_id", msgID,
+               ).Warn("Connection already disconnected, ignoring the message.")
                return
        }
 
@@ -155,7 +223,8 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) {
        //
        context, err := c.codec.DecodeMsgContext(data, msg)
        if err != nil {
-               log.Errorf("decoding context failed: %v", err)
+               log.WithField("msg_id", msgID).Warnf("Unable to decode message context: %v", err)
+               return
        }
 
        chanID, isMulti, seqNum := unpackRequestContext(context)
@@ -220,23 +289,36 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) {
 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
 // it logs the error and do not send the message.
 func sendReply(ch *Channel, reply *vppReply) {
+       // first try to avoid creating timer
+       select {
+       case ch.replyChan <- reply:
+               return // reply sent ok
+       default:
+               // reply channel full
+       }
+       if ch.receiveReplyTimeout == 0 {
+               log.WithFields(logger.Fields{
+                       "channel": ch.id,
+                       "msg_id":  reply.msgID,
+                       "seq_num": reply.seqNum,
+                       "err":     reply.err,
+               }).Warn("Reply channel full, dropping reply.")
+               return
+       }
        select {
        case ch.replyChan <- reply:
-       // reply sent successfully
-       case <-time.After(ReplyChannelTimeout):
+               return // reply sent ok
+       case <-time.After(ch.receiveReplyTimeout):
                // receiver still not ready
                log.WithFields(logger.Fields{
-                       "channel": ch,
+                       "channel": ch.id,
                        "msg_id":  reply.msgID,
                        "seq_num": reply.seqNum,
-               }).Warn("Unable to send the reply, reciever end not ready.")
+                       "err":     reply.err,
+               }).Warnf("Unable to send reply (reciever end not ready in %v).", ch.receiveReplyTimeout)
        }
 }
 
-func sendReplyError(ch *Channel, req *vppRequest, err error) {
-       sendReply(ch, &vppReply{seqNum: req.seqNum, err: err})
-}
-
 // isNotificationMessage returns true if someone has subscribed to provided message ID.
 func (c *Connection) isNotificationMessage(msgID uint16) bool {
        c.subscriptionsLock.RLock()
@@ -267,7 +349,8 @@ func (c *Connection) sendNotifications(msgID uint16, data []byte) {
                                "msg_name": sub.event.GetMessageName(),
                                "msg_id":   msgID,
                                "msg_size": len(data),
-                       }).Errorf("Unable to decode the notification message: %v", err)
+                               "error":    err,
+                       }).Warnf("Unable to decode the notification message")
                        continue
                }
 
diff --git a/core/stream.go b/core/stream.go
new file mode 100644 (file)
index 0000000..edc3f2b
--- /dev/null
@@ -0,0 +1,124 @@
+//  Copyright (c) 2020 Cisco and/or its affiliates.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at:
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+
+package core
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "reflect"
+       "sync/atomic"
+
+       "git.fd.io/govpp.git/api"
+)
+
+type Stream struct {
+       id      uint32
+       conn    *Connection
+       ctx     context.Context
+       channel *Channel
+}
+
+func (c *Connection) NewStream(ctx context.Context) (api.Stream, error) {
+       if c == nil {
+               return nil, errors.New("nil connection passed in")
+       }
+       // TODO: add stream options as variadic parameters for customizing:
+       // - request/reply channel size
+       // - reply timeout
+       // - retries
+       // - ???
+
+       // create new channel
+       chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
+       channel := newChannel(chID, c, c.codec, c, 10, 10)
+
+       // store API channel within the client
+       c.channelsLock.Lock()
+       c.channels[chID] = channel
+       c.channelsLock.Unlock()
+
+       // Channel.watchRequests are not started here intentionally, because
+       // requests are sent directly by SendMsg.
+
+       return &Stream{
+               id:      uint32(chID),
+               conn:    c,
+               ctx:     ctx,
+               channel: channel,
+       }, nil
+}
+
+func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Message) error {
+       // TODO: implement invoke
+       panic("not implemented")
+}
+
+func (s *Stream) Context() context.Context {
+       return s.ctx
+}
+
+func (s *Stream) Close() error {
+       if s.conn == nil {
+               return errors.New("stream closed")
+       }
+       s.conn.releaseAPIChannel(s.channel)
+       s.conn = nil
+       return nil
+}
+
+func (s *Stream) SendMsg(msg api.Message) error {
+       if s.conn == nil {
+               return errors.New("stream closed")
+       }
+       req := s.channel.newRequest(msg, false)
+       if err := s.conn.processRequest(s.channel, req); err != nil {
+               return err
+       }
+       return nil
+}
+
+func (s *Stream) RecvMsg() (api.Message, error) {
+       if s.conn == nil {
+               return nil, errors.New("stream closed")
+       }
+       select {
+       case reply, ok := <-s.channel.replyChan:
+               if !ok {
+                       return nil, fmt.Errorf("reply channel closed")
+               }
+               if reply.err != nil {
+                       // this case should actually never happen for stream
+                       // since reply.err is only filled in watchRequests
+                       // and stream does not use it
+                       return nil, reply.err
+               }
+               // resolve message type
+               msg, err := s.channel.msgIdentifier.LookupByID(reply.msgID)
+               if err != nil {
+                       return nil, err
+               }
+               // allocate message instance
+               msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
+               // decode message data
+               if err := s.channel.msgCodec.DecodeMsg(reply.data, msg); err != nil {
+                       return nil, err
+               }
+               return msg, nil
+
+       case <-s.ctx.Done():
+               return nil, s.ctx.Err()
+       }
+}
index 7849d8d..46399f4 100644 (file)
@@ -2,4 +2,4 @@ package binapi
 
 // Generate Go code from the VPP APIs located in the /usr/share/vpp/api directory.
 
-//go:generate binapi-generator --import-types=false af_packet interface interface_types ip memclnt vpe sr acl memif ip_types fib_types
+//go:generate binapi-generator --import-types=false mactime af_packet interface interface_types ip memclnt vpe sr acl memif ip_types fib_types
diff --git a/examples/binapi/mactime/mactime.ba.go b/examples/binapi/mactime/mactime.ba.go
new file mode 100644 (file)
index 0000000..38a206e
--- /dev/null
@@ -0,0 +1,684 @@
+// Code generated by GoVPP's binapi-generator. DO NOT EDIT.
+// versions:
+//  binapi-generator: v0.4.0-dev
+//  VPP:              20.05-release
+// source: /usr/share/vpp/api/plugins/mactime.api.json
+
+/*
+Package mactime contains generated code for VPP API file mactime.api (2.0.0).
+
+It consists of:
+         2 aliases
+         6 enums
+         7 messages
+         2 types
+*/
+package mactime
+
+import (
+       "bytes"
+       "context"
+       "encoding/binary"
+       "io"
+       "math"
+       "strconv"
+
+       api "git.fd.io/govpp.git/api"
+       codec "git.fd.io/govpp.git/codec"
+       struc "github.com/lunixbochs/struc"
+
+       interface_types "git.fd.io/govpp.git/examples/binapi/interface_types"
+)
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the GoVPP api package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// GoVPP api package needs to be updated.
+const _ = api.GoVppAPIPackageIsVersion2 // please upgrade the GoVPP api package
+
+const (
+       // ModuleName is the name of this module.
+       ModuleName = "mactime"
+       // APIVersion is the API version of this module.
+       APIVersion = "2.0.0"
+       // VersionCrc is the CRC of this module.
+       VersionCrc = 0x9283d3e
+)
+
+type IfStatusFlags = interface_types.IfStatusFlags
+
+type IfType = interface_types.IfType
+
+type LinkDuplex = interface_types.LinkDuplex
+
+type MtuProto = interface_types.MtuProto
+
+type RxMode = interface_types.RxMode
+
+type SubIfFlags = interface_types.SubIfFlags
+
+type InterfaceIndex = interface_types.InterfaceIndex
+
+// MacAddress represents VPP binary API alias 'mac_address'.
+type MacAddress [6]uint8
+
+// MactimeTimeRange represents VPP binary API type 'mactime_time_range'.
+type MactimeTimeRange struct {
+       Start float64 `binapi:"f64,name=start" json:"start,omitempty"`
+       End   float64 `binapi:"f64,name=end" json:"end,omitempty"`
+}
+
+func (*MactimeTimeRange) GetTypeName() string { return "mactime_time_range" }
+
+// TimeRange represents VPP binary API type 'time_range'.
+type TimeRange struct {
+       Start float64 `binapi:"f64,name=start" json:"start,omitempty"`
+       End   float64 `binapi:"f64,name=end" json:"end,omitempty"`
+}
+
+func (*TimeRange) GetTypeName() string { return "time_range" }
+
+// MactimeAddDelRange represents VPP binary API message 'mactime_add_del_range'.
+type MactimeAddDelRange struct {
+       IsAdd      bool        `binapi:"bool,name=is_add" json:"is_add,omitempty"`
+       Drop       bool        `binapi:"bool,name=drop" json:"drop,omitempty"`
+       Allow      bool        `binapi:"bool,name=allow" json:"allow,omitempty"`
+       AllowQuota uint8       `binapi:"u8,name=allow_quota" json:"allow_quota,omitempty"`
+       NoUDP10001 bool        `binapi:"bool,name=no_udp_10001" json:"no_udp_10001,omitempty"`
+       DataQuota  uint64      `binapi:"u64,name=data_quota" json:"data_quota,omitempty"`
+       MacAddress MacAddress  `binapi:"mac_address,name=mac_address" json:"mac_address,omitempty"`
+       DeviceName string      `binapi:"string[64],name=device_name" json:"device_name,omitempty" struc:"[64]byte"`
+       Count      uint32      `binapi:"u32,name=count" json:"count,omitempty" struc:"sizeof=Ranges"`
+       Ranges     []TimeRange `binapi:"time_range[count],name=ranges" json:"ranges,omitempty"`
+}
+
+func (m *MactimeAddDelRange) Reset()                        { *m = MactimeAddDelRange{} }
+func (*MactimeAddDelRange) GetMessageName() string          { return "mactime_add_del_range" }
+func (*MactimeAddDelRange) GetCrcString() string            { return "101858ef" }
+func (*MactimeAddDelRange) GetMessageType() api.MessageType { return api.RequestMessage }
+
+func (m *MactimeAddDelRange) Size() int {
+       if m == nil {
+               return 0
+       }
+       var size int
+       // field[1] m.IsAdd
+       size += 1
+       // field[1] m.Drop
+       size += 1
+       // field[1] m.Allow
+       size += 1
+       // field[1] m.AllowQuota
+       size += 1
+       // field[1] m.NoUDP10001
+       size += 1
+       // field[1] m.DataQuota
+       size += 8
+       // field[1] m.MacAddress
+       size += 6
+       // field[1] m.DeviceName
+       size += 64
+       // field[1] m.Count
+       size += 4
+       // field[1] m.Ranges
+       for j1 := 0; j1 < len(m.Ranges); j1++ {
+               var s1 TimeRange
+               _ = s1
+               if j1 < len(m.Ranges) {
+                       s1 = m.Ranges[j1]
+               }
+               // field[2] s1.Start
+               size += 8
+               // field[2] s1.End
+               size += 8
+       }
+       return size
+}
+func (m *MactimeAddDelRange) Marshal(b []byte) ([]byte, error) {
+       o := binary.BigEndian
+       _ = o
+       pos := 0
+       _ = pos
+       var buf []byte
+       if b == nil {
+               buf = make([]byte, m.Size())
+       } else {
+               buf = b
+       }
+       // field[1] m.IsAdd
+       if m.IsAdd {
+               buf[pos] = 1
+       }
+       pos += 1
+       // field[1] m.Drop
+       if m.Drop {
+               buf[pos] = 1
+       }
+       pos += 1
+       // field[1] m.Allow
+       if m.Allow {
+               buf[pos] = 1
+       }
+       pos += 1
+       // field[1] m.AllowQuota
+       buf[pos] = uint8(m.AllowQuota)
+       pos += 1
+       // field[1] m.NoUDP10001
+       if m.NoUDP10001 {
+               buf[pos] = 1
+       }
+       pos += 1
+       // field[1] m.DataQuota
+       o.PutUint64(buf[pos:pos+8], uint64(m.DataQuota))
+       pos += 8
+       // field[1] m.MacAddress
+       for i := 0; i < 6; i++ {
+               var x uint8
+               if i < len(m.MacAddress) {
+                       x = uint8(m.MacAddress[i])
+               }
+               buf[pos] = uint8(x)
+               pos += 1
+       }
+       // field[1] m.DeviceName
+       copy(buf[pos:pos+64], m.DeviceName)
+       pos += 64
+       // field[1] m.Count
+       o.PutUint32(buf[pos:pos+4], uint32(len(m.Ranges)))
+       pos += 4
+       // field[1] m.Ranges
+       for j1 := 0; j1 < len(m.Ranges); j1++ {
+               var v1 TimeRange
+               if j1 < len(m.Ranges) {
+                       v1 = m.Ranges[j1]
+               }
+               // field[2] v1.Start
+               o.PutUint64(buf[pos:pos+8], math.Float64bits(float64(v1.Start)))
+               pos += 8
+               // field[2] v1.End
+               o.PutUint64(buf[pos:pos+8], math.Float64bits(float64(v1.End)))
+               pos += 8
+       }
+       return buf, nil
+}
+func (m *MactimeAddDelRange) Unmarshal(tmp []byte) error {
+       o := binary.BigEndian
+       _ = o
+       pos := 0
+       _ = pos
+       // field[1] m.IsAdd
+       m.IsAdd = tmp[pos] != 0
+       pos += 1
+       // field[1] m.Drop
+       m.Drop = tmp[pos] != 0
+       pos += 1
+       // field[1] m.Allow
+       m.Allow = tmp[pos] != 0
+       pos += 1
+       // field[1] m.AllowQuota
+       m.AllowQuota = uint8(tmp[pos])
+       pos += 1
+       // field[1] m.NoUDP10001
+       m.NoUDP10001 = tmp[pos] != 0
+       pos += 1
+       // field[1] m.DataQuota
+       m.DataQuota = uint64(o.Uint64(tmp[pos : pos+8]))
+       pos += 8
+       // field[1] m.MacAddress
+       for i := 0; i < len(m.MacAddress); i++ {
+               m.MacAddress[i] = uint8(tmp[pos])
+               pos += 1
+       }
+       // field[1] m.DeviceName
+       {
+               nul := bytes.Index(tmp[pos:pos+64], []byte{0x00})
+               m.DeviceName = codec.DecodeString(tmp[pos : pos+nul])
+               pos += 64
+       }
+       // field[1] m.Count
+       m.Count = uint32(o.Uint32(tmp[pos : pos+4]))
+       pos += 4
+       // field[1] m.Ranges
+       m.Ranges = make([]TimeRange, int(m.Count))
+       for j1 := 0; j1 < int(m.Count); j1++ {
+               // field[2] m.Ranges[j1].Start
+               m.Ranges[j1].Start = float64(math.Float64frombits(o.Uint64(tmp[pos : pos+8])))
+               pos += 8
+               // field[2] m.Ranges[j1].End
+               m.Ranges[j1].End = float64(math.Float64frombits(o.Uint64(tmp[pos : pos+8])))
+               pos += 8
+       }
+       return nil
+}
+
+// MactimeAddDelRangeReply represents VPP binary API message 'mactime_add_del_range_reply'.
+type MactimeAddDelRangeReply struct {
+       Retval int32 `binapi:"i32,name=retval" json:"retval,omitempty"`
+}
+
+func (m *MactimeAddDelRangeReply) Reset()                        { *m = MactimeAddDelRangeReply{} }
+func (*MactimeAddDelRangeReply) GetMessageName() string          { return "mactime_add_del_range_reply" }
+func (*MactimeAddDelRangeReply) GetCrcString() string            { return "e8d4e804" }
+func (*MactimeAddDelRangeReply) GetMessageType() api.MessageType { return api.ReplyMessage }
+
+func (m *MactimeAddDelRangeReply) Size() int {
+       if m == nil {
+               return 0
+       }
+       var size int
+       // field[1] m.Retval
+       size += 4
+       return size
+}
+func (m *MactimeAddDelRangeReply) Marshal(b []byte) ([]byte, error) {
+       o := binary.BigEndian
+       _ = o
+       pos := 0
+       _ = pos
+       var buf []byte
+       if b == nil {
+               buf = make([]byte, m.Size())
+       } else {
+               buf = b
+       }
+       // field[1] m.Retval
+       o.PutUint32(buf[pos:pos+4], uint32(m.Retval))
+       pos += 4
+       return buf, nil
+}
+func (m *MactimeAddDelRangeReply) Unmarshal(tmp []byte) error {
+       o := binary.BigEndian
+       _ = o
+       pos := 0
+       _ = pos
+       // field[1] m.Retval
+       m.Retval = int32(o.Uint32(tmp[pos : pos+4]))
+       pos += 4
+       return nil
+}
+
+// MactimeDetails represents VPP binary API message 'mactime_details'.
+type MactimeDetails struct {
+       PoolIndex       uint32             `binapi:"u32,name=pool_index" json:"pool_index,omitempty"`
+       MacAddress      MacAddress         `binapi:"mac_address,name=mac_address" json:"mac_address,omitempty"`
+       DataQuota       uint64             `binapi:"u64,name=data_quota" json:"data_quota,omitempty"`
+       DataUsedInRange uint64             `binapi:"u64,name=data_used_in_range" json:"data_used_in_range,omitempty"`
+       Flags           uint32             `binapi:"u32,name=flags" json:"flags,omitempty"`
+       DeviceName      string             `binapi:"string[64],name=device_name" json:"device_name,omitempty" struc:"[64]byte"`
+       Nranges         uint32             `binapi:"u32,name=nranges" json:"nranges,omitempty" struc:"sizeof=Ranges"`
+       Ranges          []MactimeTimeRange `binapi:"mactime_time_range[nranges],name=ranges" json:"ranges,omitempty"`
+}
+
+func (m *MactimeDetails) Reset()                        { *m = MactimeDetails{} }
+func (*MactimeDetails) GetMessageName() string          { return "mactime_details" }
+func (*MactimeDetails) GetCrcString() string            { return "44921c06" }
+func (*MactimeDetails) GetMessageType() api.MessageType { return api.ReplyMessage }
+
+func (m *MactimeDetails) Size() int {
+       if m == nil {
+               return 0
+       }
+       var size int
+       // field[1] m.PoolIndex
+       size += 4
+       // field[1] m.MacAddress
+       size += 6
+       // field[1] m.DataQuota
+       size += 8
+       // field[1] m.DataUsedInRange
+       size += 8
+       // field[1] m.Flags
+       size += 4
+       // field[1] m.DeviceName
+       size += 64
+       // field[1] m.Nranges
+       size += 4
+       // field[1] m.Ranges
+       for j1 := 0; j1 < len(m.Ranges); j1++ {
+               var s1 MactimeTimeRange
+               _ = s1
+               if j1 < len(m.Ranges) {
+                       s1 = m.Ranges[j1]
+               }
+               // field[2] s1.Start
+               size += 8
+               // field[2] s1.End
+               size += 8
+       }
+       return size
+}
+func (m *MactimeDetails) Marshal(b []byte) ([]byte, error) {
+       o := binary.BigEndian
+       _ = o
+       pos := 0
+       _ = pos
+       var buf []byte
+       if b == nil {
+               buf = make([]byte, m.Size())
+       } else {
+               buf = b
+       }
+       // field[1] m.PoolIndex
+       o.PutUint32(buf[pos:pos+4], uint32(m.PoolIndex))
+       pos += 4
+       // field[1] m.MacAddress
+       for i := 0; i < 6; i++ {
+               var x uint8
+               if i < len(m.MacAddress) {
+                       x = uint8(m.MacAddress[i])
+               }
+               buf[pos] = uint8(x)
+               pos += 1
+       }
+       // field[1] m.DataQuota
+       o.PutUint64(buf[pos:pos+8], uint64(m.DataQuota))
+       pos += 8
+       // field[1] m.DataUsedInRange
+       o.PutUint64(buf[pos:pos+8], uint64(m.DataUsedInRange))
+       pos += 8
+       // field[1] m.Flags
+       o.PutUint32(buf[pos:pos+4], uint32(m.Flags))
+       pos += 4
+       // field[1] m.DeviceName
+       copy(buf[pos:pos+64], m.DeviceName)
+       pos += 64
+       // field[1] m.Nranges
+       o.PutUint32(buf[pos:pos+4], uint32(len(m.Ranges)))
+       pos += 4
+       // field[1] m.Ranges
+       for j1 := 0; j1 < len(m.Ranges); j1++ {
+               var v1 MactimeTimeRange
+               if j1 < len(m.Ranges) {
+                       v1 = m.Ranges[j1]
+               }
+               // field[2] v1.Start
+               o.PutUint64(buf[pos:pos+8], math.Float64bits(float64(v1.Start)))
+               pos += 8
+               // field[2] v1.End
+               o.PutUint64(buf[pos:pos+8], math.Float64bits(float64(v1.End)))
+               pos += 8
+       }
+       return buf, nil
+}
+func (m *MactimeDetails) Unmarshal(tmp []byte) error {
+       o := binary.BigEndian
+       _ = o
+       pos := 0
+       _ = pos
+       // field[1] m.PoolIndex
+       m.PoolIndex = uint32(o.Uint32(tmp[pos : pos+4]))
+       pos += 4
+       // field[1] m.MacAddress
+       for i := 0; i < len(m.MacAddress); i++ {
+               m.MacAddress[i] = uint8(tmp[pos])
+               pos += 1
+       }
+       // field[1] m.DataQuota
+       m.DataQuota = uint64(o.Uint64(tmp[pos : pos+8]))
+       pos += 8
+       // field[1] m.DataUsedInRange
+       m.DataUsedInRange = uint64(o.Uint64(tmp[pos : pos+8]))
+       pos += 8
+       // field[1] m.Flags
+       m.Flags = uint32(o.Uint32(tmp[pos : pos+4]))
+       pos += 4
+       // field[1] m.DeviceName
+       {
+               nul := bytes.Index(tmp[pos:pos+64], []byte{0x00})
+               m.DeviceName = codec.DecodeString(tmp[pos : pos+nul])
+               pos += 64
+       }
+       // field[1] m.Nranges
+       m.Nranges = uint32(o.Uint32(tmp[pos : pos+4]))
+       pos += 4
+       // field[1] m.Ranges
+       m.Ranges = make([]MactimeTimeRange, int(m.Nranges))
+       for j1 := 0; j1 < int(m.Nranges); j1++ {
+               // field[2] m.Ranges[j1].Start
+               m.Ranges[j1].Start = float64(math.Float64frombits(o.Uint64(tmp[pos : pos+8])))
+               pos += 8
+               // field[2] m.Ranges[j1].End
+               m.Ranges[j1].End = float64(math.Float64frombits(o.Uint64(tmp[pos : pos+8])))
+               pos += 8
+       }
+       return nil
+}
+
+// MactimeDump represents VPP binary API message 'mactime_dump'.
+type MactimeDump struct {
+       MyTableEpoch uint32 `binapi:"u32,name=my_table_epoch" json:"my_table_epoch,omitempty"`
+}
+
+func (m *MactimeDump) Reset()                        { *m = MactimeDump{} }
+func (*MactimeDump) GetMessageName() string          { return "mactime_dump" }
+func (*MactimeDump) GetCrcString() string            { return "8f454e23" }
+func (*MactimeDump) GetMessageType() api.MessageType { return api.RequestMessage }
+
+func (m *MactimeDump) Size() int {
+       if m == nil {
+               return 0
+       }
+       var size int
+       // field[1] m.MyTableEpoch
+       size += 4
+       return size
+}
+func (m *MactimeDump) Marshal(b []byte) ([]byte, error) {
+       o := binary.BigEndian
+       _ = o
+       pos := 0
+       _ = pos
+       var buf []byte
+       if b == nil {
+               buf = make([]byte, m.Size())
+       } else {
+               buf = b
+       }
+       // field[1] m.MyTableEpoch
+       o.PutUint32(buf[pos:pos+4], uint32(m.MyTableEpoch))
+       pos += 4
+       return buf, nil
+}
+func (m *MactimeDump) Unmarshal(tmp []byte) error {
+       o := binary.BigEndian
+       _ = o
+       pos := 0
+       _ = pos
+       // field[1] m.MyTableEpoch
+       m.MyTableEpoch = uint32(o.Uint32(tmp[pos : pos+4]))
+       pos += 4
+       return nil
+}
+
+// MactimeDumpReply represents VPP binary API message 'mactime_dump_reply'.
+type MactimeDumpReply struct {
+       Retval     int32  `binapi:"i32,name=retval" json:"retval,omitempty"`
+       TableEpoch uint32 `binapi:"u32,name=table_epoch" json:"table_epoch,omitempty"`
+}
+
+func (m *MactimeDumpReply) Reset()                        { *m = MactimeDumpReply{} }
+func (*MactimeDumpReply) GetMessageName() string          { return "mactime_dump_reply" }
+func (*MactimeDumpReply) GetCrcString() string            { return "49bcc753" }
+func (*MactimeDumpReply) GetMessageType() api.MessageType { return api.ReplyMessage }
+
+func (m *MactimeDumpReply) Size() int {
+       if m == nil {
+               return 0
+       }
+       var size int
+       // field[1] m.Retval
+       size += 4
+       // field[1] m.TableEpoch
+       size += 4
+       return size
+}
+func (m *MactimeDumpReply) Marshal(b []byte) ([]byte, error) {
+       o := binary.BigEndian
+       _ = o
+       pos := 0
+       _ = pos
+       var buf []byte
+       if b == nil {
+               buf = make([]byte, m.Size())
+       } else {
+               buf = b
+       }
+       // field[1] m.Retval
+       o.PutUint32(buf[pos:pos+4], uint32(m.Retval))
+       pos += 4
+       // field[1] m.TableEpoch
+       o.PutUint32(buf[pos:pos+4], uint32(m.TableEpoch))
+       pos += 4
+       return buf, nil
+}
+func (m *MactimeDumpReply) Unmarshal(tmp []byte) error {
+       o := binary.BigEndian
+       _ = o
+       pos := 0
+       _ = pos
+       // field[1] m.Retval
+       m.Retval = int32(o.Uint32(tmp[pos : pos+4]))
+       pos += 4
+       // field[1] m.TableEpoch
+       m.TableEpoch = uint32(o.Uint32(tmp[pos : pos+4]))
+       pos += 4
+       return nil
+}
+
+// MactimeEnableDisable represents VPP binary API message 'mactime_enable_disable'.
+type MactimeEnableDisable struct {
+       EnableDisable bool           `binapi:"bool,name=enable_disable" json:"enable_disable,omitempty"`
+       SwIfIndex     InterfaceIndex `binapi:"interface_index,name=sw_if_index" json:"sw_if_index,omitempty"`
+}
+
+func (m *MactimeEnableDisable) Reset()                        { *m = MactimeEnableDisable{} }
+func (*MactimeEnableDisable) GetMessageName() string          { return "mactime_enable_disable" }
+func (*MactimeEnableDisable) GetCrcString() string            { return "3865946c" }
+func (*MactimeEnableDisable) GetMessageType() api.MessageType { return api.RequestMessage }
+
+func (m *MactimeEnableDisable) Size() int {
+       if m == nil {
+               return 0
+       }
+       var size int
+       // field[1] m.EnableDisable
+       size += 1
+       // field[1] m.SwIfIndex
+       size += 4
+       return size
+}
+func (m *MactimeEnableDisable) Marshal(b []byte) ([]byte, error) {
+       o := binary.BigEndian
+       _ = o
+       pos := 0
+       _ = pos
+       var buf []byte
+       if b == nil {
+               buf = make([]byte, m.Size())
+       } else {
+               buf = b
+       }
+       // field[1] m.EnableDisable
+       if m.EnableDisable {
+               buf[pos] = 1
+       }
+       pos += 1
+       // field[1] m.SwIfIndex
+       o.PutUint32(buf[pos:pos+4], uint32(m.SwIfIndex))
+       pos += 4
+       return buf, nil
+}
+func (m *MactimeEnableDisable) Unmarshal(tmp []byte) error {
+       o := binary.BigEndian
+       _ = o
+       pos := 0
+       _ = pos
+       // field[1] m.EnableDisable
+       m.EnableDisable = tmp[pos] != 0
+       pos += 1
+       // field[1] m.SwIfIndex
+       m.SwIfIndex = InterfaceIndex(o.Uint32(tmp[pos : pos+4]))
+       pos += 4
+       return nil
+}
+
+// MactimeEnableDisableReply represents VPP binary API message 'mactime_enable_disable_reply'.
+type MactimeEnableDisableReply struct {
+       Retval int32 `binapi:"i32,name=retval" json:"retval,omitempty"`
+}
+
+func (m *MactimeEnableDisableReply) Reset()                        { *m = MactimeEnableDisableReply{} }
+func (*MactimeEnableDisableReply) GetMessageName() string          { return "mactime_enable_disable_reply" }
+func (*MactimeEnableDisableReply) GetCrcString() string            { return "e8d4e804" }
+func (*MactimeEnableDisableReply) GetMessageType() api.MessageType { return api.ReplyMessage }
+
+func (m *MactimeEnableDisableReply) Size() int {
+       if m == nil {
+               return 0
+       }
+       var size int
+       // field[1] m.Retval
+       size += 4
+       return size
+}
+func (m *MactimeEnableDisableReply) Marshal(b []byte) ([]byte, error) {
+       o := binary.BigEndian
+       _ = o
+       pos := 0
+       _ = pos
+       var buf []byte
+       if b == nil {
+               buf = make([]byte, m.Size())
+       } else {
+               buf = b
+       }
+       // field[1] m.Retval
+       o.PutUint32(buf[pos:pos+4], uint32(m.Retval))
+       pos += 4
+       return buf, nil
+}
+func (m *MactimeEnableDisableReply) Unmarshal(tmp []byte) error {
+       o := binary.BigEndian
+       _ = o
+       pos := 0
+       _ = pos
+       // field[1] m.Retval
+       m.Retval = int32(o.Uint32(tmp[pos : pos+4]))
+       pos += 4
+       return nil
+}
+
+func init() { file_mactime_binapi_init() }
+func file_mactime_binapi_init() {
+       api.RegisterMessage((*MactimeAddDelRange)(nil), "mactime.MactimeAddDelRange")
+       api.RegisterMessage((*MactimeAddDelRangeReply)(nil), "mactime.MactimeAddDelRangeReply")
+       api.RegisterMessage((*MactimeDetails)(nil), "mactime.MactimeDetails")
+       api.RegisterMessage((*MactimeDump)(nil), "mactime.MactimeDump")
+       api.RegisterMessage((*MactimeDumpReply)(nil), "mactime.MactimeDumpReply")
+       api.RegisterMessage((*MactimeEnableDisable)(nil), "mactime.MactimeEnableDisable")
+       api.RegisterMessage((*MactimeEnableDisableReply)(nil), "mactime.MactimeEnableDisableReply")
+}
+
+// Messages returns list of all messages in this module.
+func AllMessages() []api.Message {
+       return []api.Message{
+               (*MactimeAddDelRange)(nil),
+               (*MactimeAddDelRangeReply)(nil),
+               (*MactimeDetails)(nil),
+               (*MactimeDump)(nil),
+               (*MactimeDumpReply)(nil),
+               (*MactimeEnableDisable)(nil),
+               (*MactimeEnableDisableReply)(nil),
+       }
+}
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = api.RegisterMessage
+var _ = codec.DecodeString
+var _ = bytes.NewBuffer
+var _ = context.Background
+var _ = io.Copy
+var _ = strconv.Itoa
+var _ = struc.Pack
+var _ = binary.BigEndian
+var _ = math.Float32bits
diff --git a/examples/binapi/mactime/mactime_rpc.ba.go b/examples/binapi/mactime/mactime_rpc.ba.go
new file mode 100644 (file)
index 0000000..c35b708
--- /dev/null
@@ -0,0 +1,74 @@
+// Code generated by GoVPP's binapi-generator. DO NOT EDIT.
+
+package mactime
+
+import (
+       "context"
+       "io"
+
+       api "git.fd.io/govpp.git/api"
+)
+
+// RPCService represents RPC service API for mactime module.
+type RPCService interface {
+       DumpMactime(ctx context.Context, in *MactimeDump) (RPCService_DumpMactimeClient, error)
+       MactimeAddDelRange(ctx context.Context, in *MactimeAddDelRange) (*MactimeAddDelRangeReply, error)
+       MactimeEnableDisable(ctx context.Context, in *MactimeEnableDisable) (*MactimeEnableDisableReply, error)
+}
+
+type serviceClient struct {
+       ch api.Channel
+}
+
+func NewServiceClient(ch api.Channel) RPCService {
+       return &serviceClient{ch}
+}
+
+func (c *serviceClient) DumpMactime(ctx context.Context, in *MactimeDump) (RPCService_DumpMactimeClient, error) {
+       stream := c.ch.SendMultiRequest(in)
+       x := &serviceClient_DumpMactimeClient{stream}
+       return x, nil
+}
+
+type RPCService_DumpMactimeClient interface {
+       Recv() (*MactimeDetails, error)
+}
+
+type serviceClient_DumpMactimeClient struct {
+       api.MultiRequestCtx
+}
+
+func (c *serviceClient_DumpMactimeClient) Recv() (*MactimeDetails, error) {
+       m := new(MactimeDetails)
+       stop, err := c.MultiRequestCtx.ReceiveReply(m)
+       if err != nil {
+               return nil, err
+       }
+       if stop {
+               return nil, io.EOF
+       }
+       return m, nil
+}
+
+func (c *serviceClient) MactimeAddDelRange(ctx context.Context, in *MactimeAddDelRange) (*MactimeAddDelRangeReply, error) {
+       out := new(MactimeAddDelRangeReply)
+       err := c.ch.SendRequest(in).ReceiveReply(out)
+       if err != nil {
+               return nil, err
+       }
+       return out, nil
+}
+
+func (c *serviceClient) MactimeEnableDisable(ctx context.Context, in *MactimeEnableDisable) (*MactimeEnableDisableReply, error) {
+       out := new(MactimeEnableDisableReply)
+       err := c.ch.SendRequest(in).ReceiveReply(out)
+       if err != nil {
+               return nil, err
+       }
+       return out, nil
+}
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = api.RegisterMessage
+var _ = context.Background
+var _ = io.Copy
index 81d183c..a7ec146 100644 (file)
@@ -17,6 +17,7 @@
 package main
 
 import (
+       "context"
        "flag"
        "fmt"
        "log"
@@ -98,9 +99,11 @@ func main() {
        if sync {
                // run synchronous test
                syncTest(ch, cnt)
+               //syncTest2(conn, cnt)
        } else {
                // run asynchronous test
                asyncTest(ch, cnt)
+               //asyncTest2(conn, cnt)
        }
 
        elapsed := time.Since(start)
@@ -123,6 +126,27 @@ func syncTest(ch api.Channel, cnt int) {
        }
 }
 
+func syncTest2(conn api.Connection, cnt int) {
+       fmt.Printf("Running synchronous perf test with %d requests...\n", cnt)
+
+       stream, err := conn.NewStream(context.Background())
+       if err != nil {
+               log.Fatalln("Error NewStream:", err)
+       }
+       for i := 0; i < cnt; i++ {
+               if err := stream.SendMsg(&vpe.ControlPing{}); err != nil {
+                       log.Fatalln("Error SendMsg:", err)
+               }
+               if msg, err := stream.RecvMsg(); err != nil {
+                       log.Fatalln("Error RecvMsg:", err)
+               } else if _, ok := msg.(*vpe.ControlPingReply); ok {
+                       // ok
+               } else {
+                       log.Fatalf("unexpected reply: %v", msg.GetMessageName())
+               }
+       }
+}
+
 func asyncTest(ch api.Channel, cnt int) {
        fmt.Printf("Running asynchronous perf test with %d requests...\n", cnt)
 
@@ -143,3 +167,35 @@ func asyncTest(ch api.Channel, cnt int) {
                }
        }
 }
+
+func asyncTest2(conn api.Connection, cnt int) {
+       fmt.Printf("Running asynchronous perf test with %d requests...\n", cnt)
+
+       ctxChan := make(chan api.Stream, cnt)
+
+       go func() {
+               for i := 0; i < cnt; i++ {
+                       stream, err := conn.NewStream(context.Background())
+                       if err != nil {
+                               log.Fatalln("Error NewStream:", err)
+                       }
+                       if err := stream.SendMsg(&vpe.ControlPing{}); err != nil {
+                               log.Fatalln("Error SendMsg:", err)
+                       }
+                       ctxChan <- stream
+               }
+               close(ctxChan)
+               fmt.Printf("Sending asynchronous requests finished\n")
+       }()
+
+       for ctx := range ctxChan {
+               if msg, err := ctx.RecvMsg(); err != nil {
+                       log.Fatalln("Error RecvMsg:", err)
+               } else if _, ok := msg.(*vpe.ControlPingReply); ok {
+                       // ok
+               } else {
+                       log.Fatalf("unexpected reply: %v", msg.GetMessageName())
+               }
+               ctx.Close()
+       }
+}
index 096c222..803b2e1 100644 (file)
 package main
 
 import (
+       "context"
        "flag"
        "fmt"
        "log"
        "os"
-       "strings"
 
        "git.fd.io/govpp.git"
        "git.fd.io/govpp.git/adapter/socketclient"
@@ -31,6 +31,7 @@ import (
        "git.fd.io/govpp.git/examples/binapi/interfaces"
        "git.fd.io/govpp.git/examples/binapi/ip"
        "git.fd.io/govpp.git/examples/binapi/ip_types"
+       "git.fd.io/govpp.git/examples/binapi/mactime"
        "git.fd.io/govpp.git/examples/binapi/vpe"
 )
 
@@ -82,6 +83,8 @@ func main() {
        ipAddressDump(ch, idx)
        interfaceNotifications(ch, idx)
 
+       mactimeDump(conn)
+
        if len(Errors) > 0 {
                fmt.Printf("finished with %d errors\n", len(Errors))
                os.Exit(1)
@@ -110,7 +113,7 @@ func vppVersion(ch api.Channel) {
        }
        fmt.Printf("reply: %+v\n", reply)
 
-       fmt.Printf("VPP version: %q\n", cleanString(reply.Version))
+       fmt.Printf("VPP version: %q\n", reply.Version)
        fmt.Println("OK")
        fmt.Println()
 }
@@ -282,6 +285,44 @@ func interfaceNotifications(ch api.Channel, index interfaces.InterfaceIndex) {
        fmt.Println()
 }
 
-func cleanString(str string) string {
-       return strings.Split(str, "\x00")[0]
+func mactimeDump(conn api.Connection) {
+       fmt.Println("Sending mactime dump")
+
+       ctx := context.Background()
+
+       stream, err := conn.NewStream(ctx)
+       if err != nil {
+               panic(err)
+       }
+       defer stream.Close()
+
+       if err := stream.SendMsg(&mactime.MactimeDump{}); err != nil {
+               logError(err, "sending mactime dump")
+               return
+       }
+
+Loop:
+       for {
+               msg, err := stream.RecvMsg()
+               if err != nil {
+                       logError(err, "dumping mactime")
+                       return
+               }
+
+               switch msg.(type) {
+               case *mactime.MactimeDetails:
+                       fmt.Printf(" - MactimeDetails: %+v\n", msg)
+
+               case *mactime.MactimeDumpReply:
+                       fmt.Printf(" - MactimeDumpReply: %+v\n", msg)
+                       break Loop
+
+               default:
+                       logError(err, "unexpected message")
+                       return
+               }
+       }
+
+       fmt.Println("OK")
+       fmt.Println()
 }