Stream API options 94/29394/4
authorVladimir Lavor <vlavor@cisco.com>
Mon, 12 Oct 2020 12:21:05 +0000 (14:21 +0200)
committerOndrej Fabry <ofabry@cisco.com>
Thu, 15 Oct 2020 08:30:21 +0000 (08:30 +0000)
* Stream API uses the same default values as the Channel API
* request size, reply size and reply timeout settable using
  functional options
* Added stream client example to show the stream API usage

Change-Id: Id599134a7f520fc19f7d770ed5e3de74a7936829
Signed-off-by: Vladimir Lavor <vlavor@cisco.com>
.gitignore
CHANGELOG.md
api/api.go
core/stream.go
examples/simple-client/simple_client.go
examples/stream-client/stream_client.go [new file with mode: 0644]

index 8a782d5..8e61e14 100644 (file)
@@ -17,4 +17,5 @@ examples/perf-bench/perf-bench
 examples/rpc-service/rpc-service
 examples/simple-client/simple-client
 examples/stats-client/stats-client
+examples/stream-client/stream-client
 examples/union-example/union-example
index e35ad0f..3bd9357 100644 (file)
@@ -63,6 +63,7 @@ This file lists changes for the GoVPP releases.
 - improved [simple client](examples/simple-client) example to work properly even with multiple runs
 - added [multi-vpp](examples/multi-vpp) example displaying management of two VPP instances from single
   application
+- added [stream-client](examples/stream-client) example showing usage of the new stream API  
 
 #### Dependencies
 - updated `github.com/sirupsen/logrus` dep to `v1.6.0`
index 977b02e..93f2b42 100644 (file)
@@ -25,7 +25,7 @@ import (
 type Connection interface {
        // NewStream creates a new stream for sending and receiving messages.
        // Context can be used to close the stream using cancel or timeout.
-       NewStream(ctx context.Context) (Stream, error)
+       NewStream(ctx context.Context, options ...StreamOption) (Stream, error)
 
        // Invoke can be used for a simple request-reply RPC.
        // It creates stream and calls SendMsg with req and RecvMsg with reply.
@@ -57,6 +57,12 @@ type Stream interface {
        Close() error
 }
 
+// StreamOption allows customizing a Stream. Available options are:
+// - WithRequestSize
+// - WithReplySize
+// - WithReplyTimeout
+type StreamOption func(Stream)
+
 // ChannelProvider provides the communication channel with govpp core.
 type ChannelProvider interface {
        // NewAPIChannel returns a new channel for communication with VPP via govpp core.
index 61a9965..abe9d55 100644 (file)
@@ -20,6 +20,7 @@ import (
        "fmt"
        "reflect"
        "sync/atomic"
+       "time"
 
        "git.fd.io/govpp.git/api"
 )
@@ -29,36 +30,43 @@ type Stream struct {
        conn    *Connection
        ctx     context.Context
        channel *Channel
+       // available options
+       requestSize  int
+       replySize    int
+       replyTimeout time.Duration
 }
 
-func (c *Connection) NewStream(ctx context.Context) (api.Stream, error) {
+func (c *Connection) NewStream(ctx context.Context, options ...api.StreamOption) (api.Stream, error) {
        if c == nil {
                return nil, errors.New("nil connection passed in")
        }
-       // TODO: add stream options as variadic parameters for customizing:
-       // - request/reply channel size
-       // - reply timeout
-       // - retries
-       // - ???
+       s := &Stream{
+               conn: c,
+               ctx:  ctx,
+               // default options
+               requestSize:  RequestChanBufSize,
+               replySize:    ReplyChanBufSize,
+               replyTimeout: DefaultReplyTimeout,
+       }
 
-       // create new channel
-       chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
-       channel := newChannel(chID, c, c.codec, c, 10, 10)
+       // parse custom options
+       for _, option := range options {
+               option(s)
+       }
+       // create and store a new channel
+       s.id = atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff
+       s.channel = newChannel(uint16(s.id), c, c.codec, c, s.requestSize, s.replySize)
+       s.channel.SetReplyTimeout(s.replyTimeout)
 
        // store API channel within the client
        c.channelsLock.Lock()
-       c.channels[chID] = channel
+       c.channels[uint16(s.id)] = s.channel
        c.channelsLock.Unlock()
 
        // Channel.watchRequests are not started here intentionally, because
        // requests are sent directly by SendMsg.
 
-       return &Stream{
-               id:      uint32(chID),
-               conn:    c,
-               ctx:     ctx,
-               channel: channel,
-       }, nil
+       return s, nil
 }
 
 func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Message) error {
@@ -123,6 +131,24 @@ func (s *Stream) RecvMsg() (api.Message, error) {
        return msg, nil
 }
 
+func WithRequestSize(size int) api.StreamOption {
+       return func(stream api.Stream) {
+               stream.(*Stream).requestSize = size
+       }
+}
+
+func WithReplySize(size int) api.StreamOption {
+       return func(stream api.Stream) {
+               stream.(*Stream).replySize = size
+       }
+}
+
+func WithReplyTimeout(timeout time.Duration) api.StreamOption {
+       return func(stream api.Stream) {
+               stream.(*Stream).replyTimeout = timeout
+       }
+}
+
 func (s *Stream) recvReply() (*vppReply, error) {
        if s.conn == nil {
                return nil, errors.New("stream closed")
index d823273..0898c0a 100644 (file)
@@ -17,7 +17,6 @@
 package main
 
 import (
-       "context"
        "encoding/json"
        "flag"
        "fmt"
@@ -31,7 +30,6 @@ import (
        "git.fd.io/govpp.git/binapi/interface_types"
        "git.fd.io/govpp.git/binapi/ip"
        "git.fd.io/govpp.git/binapi/ip_types"
-       "git.fd.io/govpp.git/binapi/mactime"
        "git.fd.io/govpp.git/binapi/vpe"
        "git.fd.io/govpp.git/core"
 )
@@ -44,9 +42,10 @@ func main() {
        flag.Parse()
 
        fmt.Println("Starting simple client example")
+       fmt.Println()
 
        // connect to VPP asynchronously
-       conn, conev, err := govpp.AsyncConnect(*sockAddr, core.DefaultMaxReconnectAttempts, core.DefaultReconnectInterval)
+       conn, connEv, err := govpp.AsyncConnect(*sockAddr, core.DefaultMaxReconnectAttempts, core.DefaultReconnectInterval)
        if err != nil {
                log.Fatalln("ERROR:", err)
        }
@@ -54,56 +53,46 @@ func main() {
 
        // wait for Connected event
        select {
-       case e := <-conev:
+       case e := <-connEv:
                if e.State != core.Connected {
                        log.Fatalln("ERROR: connecting to VPP failed:", e.Error)
                }
        }
 
-       // create an API channel that will be used in the examples
+       // check compatibility of used messages
        ch, err := conn.NewAPIChannel()
        if err != nil {
                log.Fatalln("ERROR: creating channel failed:", err)
        }
        defer ch.Close()
-
        if err := ch.CheckCompatiblity(vpe.AllMessages()...); err != nil {
                log.Fatal(err)
        }
-
-       vppVersion(ch)
-
        if err := ch.CheckCompatiblity(interfaces.AllMessages()...); err != nil {
                log.Fatal(err)
        }
 
+       // process errors encountered during the example
+       defer func() {
+               if len(Errors) > 0 {
+                       fmt.Printf("finished with %d errors\n", len(Errors))
+                       os.Exit(1)
+               } else {
+                       fmt.Println("finished successfully")
+               }
+       }()
+
+       // use request/reply (channel API)
+       getVppVersion(ch)
        idx := createLoopback(ch)
        interfaceDump(ch)
-
        addIPAddress(ch, idx)
        ipAddressDump(ch, idx)
        interfaceNotifications(ch, idx)
-
-       mactimeDump(conn)
-
-       if len(Errors) > 0 {
-               fmt.Printf("finished with %d errors\n", len(Errors))
-               os.Exit(1)
-       } else {
-               fmt.Println("finished successfully")
-       }
 }
 
-var Errors []error
-
-func logError(err error, msg string) {
-       fmt.Printf("ERROR: %s: %v\n", msg, err)
-       Errors = append(Errors, err)
-}
-
-// vppVersion is the simplest API example - it retrieves VPP version.
-func vppVersion(ch api.Channel) {
-       fmt.Println("Retrieving version")
+func getVppVersion(ch api.Channel) {
+       fmt.Println("Retrieving version..")
 
        req := &vpe.ShowVersion{}
        reply := &vpe.ShowVersionReply{}
@@ -112,16 +101,14 @@ func vppVersion(ch api.Channel) {
                logError(err, "retrieving version")
                return
        }
-       fmt.Printf("reply: %+v\n", reply)
 
        fmt.Printf("VPP version: %q\n", reply.Version)
        fmt.Println("OK")
        fmt.Println()
 }
 
-// createLoopback sends request to create loopback interface.
 func createLoopback(ch api.Channel) interface_types.InterfaceIndex {
-       fmt.Println("Creating loopback interface")
+       fmt.Println("Creating loopback interface..")
 
        req := &interfaces.CreateLoopback{}
        reply := &interfaces.CreateLoopbackReply{}
@@ -130,7 +117,6 @@ func createLoopback(ch api.Channel) interface_types.InterfaceIndex {
                logError(err, "creating loopback interface")
                return 0
        }
-       fmt.Printf("reply: %+v\n", reply)
 
        fmt.Printf("interface index: %v\n", reply.SwIfIndex)
        fmt.Println("OK")
@@ -139,9 +125,8 @@ func createLoopback(ch api.Channel) interface_types.InterfaceIndex {
        return reply.SwIfIndex
 }
 
-// interfaceDump shows an example of multipart request (multiple replies are expected).
 func interfaceDump(ch api.Channel) {
-       fmt.Println("Dumping interfaces")
+       fmt.Println("Dumping interfaces..")
 
        n := 0
        reqCtx := ch.SendMultiRequest(&interfaces.SwInterfaceDump{
@@ -166,9 +151,8 @@ func interfaceDump(ch api.Channel) {
        fmt.Println()
 }
 
-// addIPAddress sends request to add IP address to interface.
 func addIPAddress(ch api.Channel, index interface_types.InterfaceIndex) {
-       fmt.Printf("Adding IP address to interface to interface index %d\n", index)
+       fmt.Printf("Adding IP address to interface index %d\n", index)
 
        req := &interfaces.SwInterfaceAddDelAddress{
                SwIfIndex: index,
@@ -188,14 +172,13 @@ func addIPAddress(ch api.Channel, index interface_types.InterfaceIndex) {
                logError(err, "adding IP address to interface")
                return
        }
-       fmt.Printf("reply: %+v\n", reply)
 
        fmt.Println("OK")
        fmt.Println()
 }
 
 func ipAddressDump(ch api.Channel, index interface_types.InterfaceIndex) {
-       fmt.Printf("Dumping IP addresses for interface index %d\n", index)
+       fmt.Printf("Dumping IP addresses for interface index %d..\n", index)
 
        req := &ip.IPAddressDump{
                SwIfIndex: index,
@@ -293,48 +276,6 @@ func interfaceNotifications(ch api.Channel, index interface_types.InterfaceIndex
        fmt.Println()
 }
 
-func mactimeDump(conn api.Connection) {
-       fmt.Println("Sending mactime dump")
-
-       ctx := context.Background()
-
-       stream, err := conn.NewStream(ctx)
-       if err != nil {
-               panic(err)
-       }
-       defer stream.Close()
-
-       if err := stream.SendMsg(&mactime.MactimeDump{}); err != nil {
-               logError(err, "sending mactime dump")
-               return
-       }
-
-Loop:
-       for {
-               msg, err := stream.RecvMsg()
-               if err != nil {
-                       logError(err, "dumping mactime")
-                       return
-               }
-
-               switch msg.(type) {
-               case *mactime.MactimeDetails:
-                       fmt.Printf(" - MactimeDetails: %+v\n", msg)
-
-               case *mactime.MactimeDumpReply:
-                       fmt.Printf(" - MactimeDumpReply: %+v\n", msg)
-                       break Loop
-
-               default:
-                       logError(err, "unexpected message")
-                       return
-               }
-       }
-
-       fmt.Println("OK")
-       fmt.Println()
-}
-
 func marshal(v interface{}) {
        fmt.Printf("GO: %#v\n", v)
        b, err := json.MarshalIndent(v, "", "  ")
@@ -343,3 +284,10 @@ func marshal(v interface{}) {
        }
        fmt.Printf("JSON: %s\n", b)
 }
+
+var Errors []error
+
+func logError(err error, msg string) {
+       fmt.Printf("ERROR: %s: %v\n", msg, err)
+       Errors = append(Errors, err)
+}
diff --git a/examples/stream-client/stream_client.go b/examples/stream-client/stream_client.go
new file mode 100644 (file)
index 0000000..fadfe23
--- /dev/null
@@ -0,0 +1,302 @@
+// Copyright (c) 2017 Cisco and/or its affiliates.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// stream-client is an example VPP management application that exercises the
+// govpp API on real-world use-cases.
+package main
+
+import (
+       "context"
+       "flag"
+       "fmt"
+       "log"
+       "os"
+       "time"
+
+       "git.fd.io/govpp.git"
+       "git.fd.io/govpp.git/adapter/socketclient"
+       "git.fd.io/govpp.git/api"
+       interfaces "git.fd.io/govpp.git/binapi/interface"
+       "git.fd.io/govpp.git/binapi/interface_types"
+       "git.fd.io/govpp.git/binapi/ip"
+       "git.fd.io/govpp.git/binapi/ip_types"
+       "git.fd.io/govpp.git/binapi/mactime"
+       "git.fd.io/govpp.git/binapi/vpe"
+       "git.fd.io/govpp.git/core"
+)
+
+var (
+       sockAddr = flag.String("sock", socketclient.DefaultSocketName, "Path to VPP binary API socket file")
+)
+
+func main() {
+       flag.Parse()
+
+       fmt.Println("Starting stream client example")
+       fmt.Println()
+
+       // connect to VPP asynchronously
+       conn, connEv, err := govpp.AsyncConnect(*sockAddr, core.DefaultMaxReconnectAttempts, core.DefaultReconnectInterval)
+       if err != nil {
+               log.Fatalln("ERROR:", err)
+       }
+       defer conn.Disconnect()
+
+       // wait for Connected event
+       select {
+       case e := <-connEv:
+               if e.State != core.Connected {
+                       log.Fatalln("ERROR: connecting to VPP failed:", e.Error)
+               }
+       }
+
+       // check compatibility of used messages
+       ch, err := conn.NewAPIChannel()
+       if err != nil {
+               log.Fatalln("ERROR: creating channel failed:", err)
+       }
+       defer ch.Close()
+       if err := ch.CheckCompatiblity(vpe.AllMessages()...); err != nil {
+               log.Fatal(err)
+       }
+       if err := ch.CheckCompatiblity(interfaces.AllMessages()...); err != nil {
+               log.Fatal(err)
+       }
+
+       // process errors encountered during the example
+       defer func() {
+               if len(Errors) > 0 {
+                       fmt.Printf("finished with %d errors\n", len(Errors))
+                       os.Exit(1)
+               } else {
+                       fmt.Println("finished successfully")
+               }
+       }()
+
+       // send and receive messages using stream (low-low level API)
+       stream, err := conn.NewStream(context.Background(),
+               core.WithRequestSize(50),
+               core.WithReplySize(50),
+               core.WithReplyTimeout(2*time.Second))
+       if err != nil {
+               panic(err)
+       }
+       defer func() {
+               if err := stream.Close(); err != nil {
+                       logError(err, "closing the stream")
+               }
+       }()
+       getVppVersionStream(stream)
+       idx := createLoopbackStream(stream)
+       interfaceDumpStream(stream)
+       addIPAddressStream(stream, idx)
+       ipAddressDumpStream(stream, idx)
+       mactimeDump(stream)
+       return
+}
+
+func getVppVersionStream(stream api.Stream) {
+       fmt.Println("Retrieving version..")
+
+       req := &vpe.ShowVersion{}
+       if err := stream.SendMsg(req); err != nil {
+               logError(err, "ShowVersion sending message")
+               return
+       }
+       recv, err := stream.RecvMsg()
+       if err != nil {
+               logError(err, "ShowVersion receive message")
+               return
+       }
+       recvMsg := recv.(*vpe.ShowVersionReply)
+
+       fmt.Printf("Retrieved VPP version: %q\n", recvMsg.Version)
+       fmt.Println("OK")
+       fmt.Println()
+}
+
+func createLoopbackStream(stream api.Stream) (ifIdx interface_types.InterfaceIndex) {
+       fmt.Println("Creating the loopback interface..")
+
+       req := &interfaces.CreateLoopback{}
+       if err := stream.SendMsg(req); err != nil {
+               logError(err, "CreateLoopback sending message")
+               return
+       }
+       recv, err := stream.RecvMsg()
+       if err != nil {
+               logError(err, "CreateLoopback receive message")
+               return
+       }
+       recvMsg := recv.(*interfaces.CreateLoopbackReply)
+
+       fmt.Printf("Loopback interface index: %v\n", recvMsg.SwIfIndex)
+       fmt.Println("OK")
+       fmt.Println()
+
+       return recvMsg.SwIfIndex
+}
+
+func interfaceDumpStream(stream api.Stream) {
+       fmt.Println("Dumping interfaces..")
+
+       if err := stream.SendMsg(&interfaces.SwInterfaceDump{
+               SwIfIndex: ^interface_types.InterfaceIndex(0),
+       }); err != nil {
+               logError(err, "SwInterfaceDump sending message")
+               return
+       }
+       if err := stream.SendMsg(&vpe.ControlPing{}); err != nil {
+               logError(err, "ControlPing sending message")
+               return
+       }
+
+Loop:
+       for {
+               msg, err := stream.RecvMsg()
+               if err != nil {
+                       logError(err, "SwInterfaceDump receiving message ")
+                       return
+               }
+
+               switch msg.(type) {
+               case *interfaces.SwInterfaceDetails:
+                       fmt.Printf(" - SwInterfaceDetails: %+v\n", msg)
+
+               case *vpe.ControlPingReply:
+                       fmt.Printf(" - ControlPingReply: %+v\n", msg)
+                       break Loop
+
+               default:
+                       logError(err, "unexpected message")
+                       return
+               }
+       }
+
+       fmt.Println("OK")
+       fmt.Println()
+}
+
+func addIPAddressStream(stream api.Stream, index interface_types.InterfaceIndex) {
+       fmt.Printf("Adding IP address to the interface index %d..\n", index)
+
+       if err := stream.SendMsg(&interfaces.SwInterfaceAddDelAddress{
+               SwIfIndex: index,
+               IsAdd:     true,
+               Prefix: ip_types.AddressWithPrefix{
+                       Address: ip_types.Address{
+                               Af: ip_types.ADDRESS_IP4,
+                               Un: ip_types.AddressUnionIP4(ip_types.IP4Address{10, 10, 0, uint8(index)}),
+                       },
+                       Len: 32,
+               },
+       }); err != nil {
+               logError(err, "SwInterfaceAddDelAddress sending message")
+               return
+       }
+
+       recv, err := stream.RecvMsg()
+       if err != nil {
+               logError(err, "SwInterfaceAddDelAddressReply receiving message")
+               return
+       }
+       recvMsg := recv.(*interfaces.SwInterfaceAddDelAddressReply)
+
+       fmt.Printf("Added IP address to interface: %v (return value: %d)\n", index, recvMsg.Retval)
+       fmt.Println("OK")
+       fmt.Println()
+}
+
+func ipAddressDumpStream(stream api.Stream, index interface_types.InterfaceIndex) {
+       fmt.Printf("Dumping IP addresses for interface index %d..\n", index)
+
+       if err := stream.SendMsg(&ip.IPAddressDump{
+               SwIfIndex: index,
+       }); err != nil {
+               logError(err, "IPAddressDump sending message")
+               return
+       }
+       if err := stream.SendMsg(&vpe.ControlPing{}); err != nil {
+               logError(err, "ControlPing sending sending message")
+               return
+       }
+
+Loop:
+       for {
+               msg, err := stream.RecvMsg()
+               if err != nil {
+                       logError(err, "IPAddressDump receiving message ")
+                       return
+               }
+
+               switch msg.(type) {
+               case *ip.IPAddressDetails:
+                       fmt.Printf(" - IPAddressDetails: %+v\n", msg)
+
+               case *vpe.ControlPingReply:
+                       fmt.Printf(" - ControlPingReply: %+v\n", msg)
+                       break Loop
+
+               default:
+                       logError(err, "unexpected message")
+                       return
+               }
+       }
+
+       fmt.Println("OK")
+       fmt.Println()
+}
+
+// Mactime dump uses MactimeDumpReply message as an end of the stream
+// notification instead of the control ping.
+func mactimeDump(stream api.Stream, ) {
+       fmt.Println("Sending mactime dump..")
+
+       if err := stream.SendMsg(&mactime.MactimeDump{}); err != nil {
+               logError(err, "sending mactime dump")
+               return
+       }
+
+Loop:
+       for {
+               msg, err := stream.RecvMsg()
+               if err != nil {
+                       logError(err, "MactimeDump receiving message")
+                       return
+               }
+
+               switch msg.(type) {
+               case *mactime.MactimeDetails:
+                       fmt.Printf(" - MactimeDetails: %+v\n", msg)
+
+               case *mactime.MactimeDumpReply:
+                       fmt.Printf(" - MactimeDumpReply: %+v\n", msg)
+                       break Loop
+
+               default:
+                       logError(err, "unexpected message")
+                       return
+               }
+       }
+
+       fmt.Println("OK")
+       fmt.Println()
+}
+
+var Errors []error
+
+func logError(err error, msg string) {
+       fmt.Printf("ERROR: %s: %v\n", msg, err)
+       Errors = append(Errors, err)
+}