Introduce proxy for VPP 06/22606/3
authorOndrej Fabry <ofabry@cisco.com>
Tue, 8 Oct 2019 12:49:16 +0000 (14:49 +0200)
committerOndrej Fabry <ofabry@cisco.com>
Thu, 10 Oct 2019 09:30:49 +0000 (11:30 +0200)
- proxy server defines RPC service for proxying binapi/stats to VPP
- use cmd/vpp-proxy for proxy server/client

Change-Id: I6e698e166ecf6db7109ae5adf8a93f308d3f3f2a
Signed-off-by: Ondrej Fabry <ofabry@cisco.com>
cmd/vpp-proxy/main.go [new file with mode: 0644]
proxy/client.go [new file with mode: 0644]
proxy/proxy.go [new file with mode: 0644]
proxy/server.go [new file with mode: 0644]

diff --git a/cmd/vpp-proxy/main.go b/cmd/vpp-proxy/main.go
new file mode 100644 (file)
index 0000000..7aea885
--- /dev/null
@@ -0,0 +1,131 @@
+//  Copyright (c) 2019 Cisco and/or its affiliates.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at:
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+
+package main
+
+import (
+       "context"
+       "encoding/gob"
+       "flag"
+       "io"
+       "log"
+
+       "git.fd.io/govpp.git/adapter/socketclient"
+       "git.fd.io/govpp.git/adapter/statsclient"
+       "git.fd.io/govpp.git/api"
+       "git.fd.io/govpp.git/examples/binapi/interfaces"
+       "git.fd.io/govpp.git/examples/binapi/vpe"
+       "git.fd.io/govpp.git/proxy"
+)
+
+var (
+       binapiSocket = flag.String("binapi-socket", socketclient.DefaultSocketName, "Path to VPP binapi socket")
+       statsSocket  = flag.String("stats-socket", statsclient.DefaultSocketName, "Path to VPP stats socket")
+       proxyAddr    = flag.String("addr", ":7878", "Address on which proxy serves RPC.")
+)
+
+func init() {
+       for _, msg := range api.GetRegisteredMessages() {
+               gob.Register(msg)
+       }
+}
+
+func main() {
+       flag.Parse()
+
+       switch cmd := flag.Arg(0); cmd {
+       case "server":
+               runServer()
+       case "client":
+               runClient()
+       default:
+               log.Printf("invalid command: %q, (available commands: client, server)", cmd)
+       }
+}
+
+func runClient() {
+       // connect to proxy server
+       client, err := proxy.Connect(*proxyAddr)
+       if err != nil {
+               log.Fatalln("connecting to proxy failed:", err)
+       }
+
+       // proxy stats
+       statsProvider, err := client.NewStatsClient()
+       if err != nil {
+               log.Fatalln(err)
+       }
+
+       var sysStats api.SystemStats
+       if err := statsProvider.GetSystemStats(&sysStats); err != nil {
+               log.Fatalln("getting stats failed:", err)
+       }
+       log.Printf("SystemStats: %+v", sysStats)
+
+       var ifaceStats api.InterfaceStats
+       if err := statsProvider.GetInterfaceStats(&ifaceStats); err != nil {
+               log.Fatalln("getting stats failed:", err)
+       }
+       log.Printf("InterfaceStats: %+v", ifaceStats)
+
+       // proxy binapi
+       binapiChannel, err := client.NewBinapiClient()
+       if err != nil {
+               log.Fatalln(err)
+       }
+
+       // - using binapi message directly
+       req := &vpe.CliInband{Cmd: "show version"}
+       reply := new(vpe.CliInbandReply)
+       if err := binapiChannel.SendRequest(req).ReceiveReply(reply); err != nil {
+               log.Fatalln("binapi request failed:", err)
+       }
+       log.Printf("VPP version: %+v", reply.Reply)
+
+       // - or using generated rpc service
+       svc := interfaces.NewServiceClient(binapiChannel)
+       stream, err := svc.DumpSwInterface(context.Background(), &interfaces.SwInterfaceDump{})
+       if err != nil {
+               log.Fatalln("binapi request failed:", err)
+       }
+       for {
+               iface, err := stream.Recv()
+               if err == io.EOF {
+                       break
+               }
+               if err != nil {
+                       log.Fatalln(err)
+               }
+               log.Printf("- interface: %+v", iface)
+       }
+}
+
+func runServer() {
+       p := proxy.NewServer()
+
+       statsAdapter := statsclient.NewStatsClient(*statsSocket)
+       binapiAdapter := socketclient.NewVppClient(*binapiSocket)
+
+       if err := p.ConnectStats(statsAdapter); err != nil {
+               log.Fatalln("connecting to stats failed:", err)
+       }
+       defer p.DisconnectStats()
+
+       if err := p.ConnectBinapi(binapiAdapter); err != nil {
+               log.Fatalln("connecting to binapi failed:", err)
+       }
+       defer p.DisconnectBinapi()
+
+       p.ListenAndServe(*proxyAddr)
+}
diff --git a/proxy/client.go b/proxy/client.go
new file mode 100644 (file)
index 0000000..72ee9e9
--- /dev/null
@@ -0,0 +1,178 @@
+package proxy
+
+import (
+       "fmt"
+       "log"
+       "net/rpc"
+       "reflect"
+       "time"
+
+       "git.fd.io/govpp.git/api"
+)
+
+type Client struct {
+       serverAddr string
+       rpc        *rpc.Client
+}
+
+// Connect dials remote proxy server on given address and
+// returns new client if successful.
+func Connect(addr string) (*Client, error) {
+       client, err := rpc.DialHTTP("tcp", addr)
+       if err != nil {
+               log.Fatal("Connection error: ", err)
+       }
+       c := &Client{
+               serverAddr: addr,
+               rpc:        client,
+       }
+       return c, nil
+}
+
+// NewStatsClient returns new StatsClient which implements api.StatsProvider.
+func (c *Client) NewStatsClient() (*StatsClient, error) {
+       stats := &StatsClient{
+               rpc: c.rpc,
+       }
+       return stats, nil
+}
+
+// NewBinapiClient returns new BinapiClient which implements api.Channel.
+func (c *Client) NewBinapiClient() (*BinapiClient, error) {
+       binapi := &BinapiClient{
+               rpc: c.rpc,
+       }
+       return binapi, nil
+}
+
+type StatsClient struct {
+       rpc *rpc.Client
+}
+
+func (s *StatsClient) GetSystemStats(sysStats *api.SystemStats) error {
+       req := StatsRequest{StatsType: "system"}
+       resp := StatsResponse{SysStats: sysStats}
+       return s.rpc.Call("StatsRPC.GetStats", req, &resp)
+}
+
+func (s *StatsClient) GetNodeStats(nodeStats *api.NodeStats) error {
+       req := StatsRequest{StatsType: "node"}
+       resp := StatsResponse{NodeStats: nodeStats}
+       return s.rpc.Call("StatsRPC.GetStats", req, &resp)
+}
+
+func (s *StatsClient) GetInterfaceStats(ifaceStats *api.InterfaceStats) error {
+       req := StatsRequest{StatsType: "interface"}
+       resp := StatsResponse{IfaceStats: ifaceStats}
+       return s.rpc.Call("StatsRPC.GetStats", req, &resp)
+}
+
+func (s *StatsClient) GetErrorStats(errStats *api.ErrorStats) error {
+       req := StatsRequest{StatsType: "error"}
+       resp := StatsResponse{ErrStats: errStats}
+       return s.rpc.Call("StatsRPC.GetStats", req, &resp)
+}
+
+func (s *StatsClient) GetBufferStats(bufStats *api.BufferStats) error {
+       req := StatsRequest{StatsType: "buffer"}
+       resp := StatsResponse{BufStats: bufStats}
+       return s.rpc.Call("StatsRPC.GetStats", req, &resp)
+}
+
+type BinapiClient struct {
+       rpc *rpc.Client
+}
+
+func (b *BinapiClient) SendRequest(msg api.Message) api.RequestCtx {
+       req := &requestCtx{
+               rpc: b.rpc,
+               req: msg,
+       }
+       log.Printf("SendRequest: %T %+v", msg, msg)
+       return req
+}
+
+type requestCtx struct {
+       rpc *rpc.Client
+       req api.Message
+}
+
+func (r *requestCtx) ReceiveReply(msg api.Message) error {
+       req := BinapiRequest{
+               Msg:      r.req,
+               ReplyMsg: msg,
+       }
+       resp := BinapiResponse{}
+
+       err := r.rpc.Call("BinapiRPC.Invoke", req, &resp)
+       if err != nil {
+               return fmt.Errorf("RPC call failed: %v", err)
+       }
+
+       // we set the value of msg to the value from response
+       reflect.ValueOf(msg).Elem().Set(reflect.ValueOf(resp.Msg).Elem())
+
+       return nil
+}
+
+func (b *BinapiClient) SendMultiRequest(msg api.Message) api.MultiRequestCtx {
+       req := &multiRequestCtx{
+               rpc: b.rpc,
+               req: msg,
+       }
+       log.Printf("SendMultiRequest: %T %+v", msg, msg)
+       return req
+}
+
+type multiRequestCtx struct {
+       rpc *rpc.Client
+       req api.Message
+
+       index   int
+       replies []api.Message
+}
+
+func (r *multiRequestCtx) ReceiveReply(msg api.Message) (stop bool, err error) {
+       // we call Invoke only on first ReceiveReply
+       if r.index == 0 {
+               req := BinapiRequest{
+                       Msg:      r.req,
+                       ReplyMsg: msg,
+                       IsMulti:  true,
+               }
+               resp := BinapiResponse{}
+
+               err := r.rpc.Call("BinapiRPC.Invoke", req, &resp)
+               if err != nil {
+                       return false, fmt.Errorf("RPC call failed: %v", err)
+               }
+
+               r.replies = resp.Msgs
+       }
+
+       if r.index >= len(r.replies) {
+               return true, nil
+       }
+
+       // we set the value of msg to the value from response
+       reflect.ValueOf(msg).Elem().Set(reflect.ValueOf(r.replies[r.index]).Elem())
+       r.index++
+
+       return false, nil
+}
+
+func (b *BinapiClient) SubscribeNotification(notifChan chan api.Message, event api.Message) (api.SubscriptionCtx, error) {
+       panic("implement me")
+}
+
+func (b *BinapiClient) SetReplyTimeout(timeout time.Duration) {
+       panic("implement me")
+}
+
+func (b *BinapiClient) CheckCompatiblity(msgs ...api.Message) error {
+       return nil // TODO: proxy this
+}
+
+func (b *BinapiClient) Close() {
+       b.rpc.Close()
+}
diff --git a/proxy/proxy.go b/proxy/proxy.go
new file mode 100644 (file)
index 0000000..1f8f824
--- /dev/null
@@ -0,0 +1,102 @@
+//  Copyright (c) 2019 Cisco and/or its affiliates.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at:
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+
+package proxy
+
+import (
+       "log"
+       "net"
+       "net/http"
+       "net/rpc"
+
+       "git.fd.io/govpp.git/adapter"
+       "git.fd.io/govpp.git/core"
+)
+
+// Server defines a proxy server that serves client requests to stats and binapi.
+type Server struct {
+       rpc *rpc.Server
+
+       statsConn  *core.StatsConnection
+       binapiConn *core.Connection
+}
+
+func NewServer() *Server {
+       return &Server{
+               rpc: rpc.NewServer(),
+       }
+}
+
+func (p *Server) ConnectStats(stats adapter.StatsAPI) error {
+       var err error
+       p.statsConn, err = core.ConnectStats(stats)
+       if err != nil {
+               return err
+       }
+       return nil
+}
+
+func (p *Server) DisconnectStats() {
+       if p.statsConn != nil {
+               p.statsConn.Disconnect()
+       }
+}
+
+func (p *Server) ConnectBinapi(binapi adapter.VppAPI) error {
+       var err error
+       p.binapiConn, err = core.Connect(binapi)
+       if err != nil {
+               return err
+       }
+       return nil
+}
+
+func (p *Server) DisconnectBinapi() {
+       if p.binapiConn != nil {
+               p.binapiConn.Disconnect()
+       }
+}
+
+func (p *Server) ListenAndServe(addr string) {
+       if p.statsConn != nil {
+               statsRPC := NewStatsRPC(p.statsConn)
+               if err := p.rpc.Register(statsRPC); err != nil {
+                       panic(err)
+               }
+       }
+       if p.binapiConn != nil {
+               ch, err := p.binapiConn.NewAPIChannel()
+               if err != nil {
+                       panic(err)
+               }
+               binapiRPC := NewBinapiRPC(ch)
+               if err := p.rpc.Register(binapiRPC); err != nil {
+                       panic(err)
+               }
+       }
+
+       p.rpc.HandleHTTP(rpc.DefaultRPCPath, rpc.DefaultDebugPath)
+
+       l, e := net.Listen("tcp", addr)
+       if e != nil {
+               log.Fatal("listen error:", e)
+       }
+       defer l.Close()
+
+       log.Printf("proxy serving on: %v", addr)
+
+       if err := http.Serve(l, nil); err != nil {
+               log.Fatalln(err)
+       }
+}
diff --git a/proxy/server.go b/proxy/server.go
new file mode 100644 (file)
index 0000000..df62356
--- /dev/null
@@ -0,0 +1,109 @@
+package proxy
+
+import (
+       "fmt"
+       "log"
+       "reflect"
+
+       "git.fd.io/govpp.git/api"
+)
+
+type StatsRequest struct {
+       StatsType string
+}
+
+type StatsResponse struct {
+       SysStats   *api.SystemStats
+       NodeStats  *api.NodeStats
+       IfaceStats *api.InterfaceStats
+       ErrStats   *api.ErrorStats
+       BufStats   *api.BufferStats
+}
+
+// StatsRPC is a RPC server for proxying client request to api.StatsProvider.
+type StatsRPC struct {
+       stats api.StatsProvider
+}
+
+// NewStatsRPC returns new StatsRPC to be used as RPC server
+// proxying request to given api.StatsProvider.
+func NewStatsRPC(stats api.StatsProvider) *StatsRPC {
+       return &StatsRPC{stats: stats}
+}
+
+func (s *StatsRPC) GetStats(req StatsRequest, resp *StatsResponse) error {
+       log.Printf("StatsRPC.GetStats - REQ: %+v", req)
+
+       switch req.StatsType {
+       case "system":
+               resp.SysStats = new(api.SystemStats)
+               return s.stats.GetSystemStats(resp.SysStats)
+       case "node":
+               resp.NodeStats = new(api.NodeStats)
+               return s.stats.GetNodeStats(resp.NodeStats)
+       case "interface":
+               resp.IfaceStats = new(api.InterfaceStats)
+               return s.stats.GetInterfaceStats(resp.IfaceStats)
+       case "error":
+               resp.ErrStats = new(api.ErrorStats)
+               return s.stats.GetErrorStats(resp.ErrStats)
+       case "buffer":
+               resp.BufStats = new(api.BufferStats)
+               return s.stats.GetBufferStats(resp.BufStats)
+       default:
+               return fmt.Errorf("unknown stats type: %s", req.StatsType)
+       }
+}
+
+type BinapiRequest struct {
+       Msg      api.Message
+       IsMulti  bool
+       ReplyMsg api.Message
+}
+
+type BinapiResponse struct {
+       Msg  api.Message
+       Msgs []api.Message
+}
+
+// BinapiRPC is a RPC server for proxying client request to api.Channel.
+type BinapiRPC struct {
+       binapi api.Channel
+}
+
+// NewBinapiRPC returns new BinapiRPC to be used as RPC server
+// proxying request to given api.Channel.
+func NewBinapiRPC(binapi api.Channel) *BinapiRPC {
+       return &BinapiRPC{binapi: binapi}
+}
+
+func (s *BinapiRPC) Invoke(req BinapiRequest, resp *BinapiResponse) error {
+       log.Printf("BinapiRPC.Invoke - REQ: %#v", req)
+
+       if req.IsMulti {
+               multi := s.binapi.SendMultiRequest(req.Msg)
+               for {
+                       // create new message in response of type ReplyMsg
+                       msg := reflect.New(reflect.TypeOf(req.ReplyMsg).Elem()).Interface().(api.Message)
+
+                       stop, err := multi.ReceiveReply(msg)
+                       if err != nil {
+                               return err
+                       } else if stop {
+                               break
+                       }
+
+                       resp.Msgs = append(resp.Msgs, msg)
+               }
+       } else {
+               // create new message in response of type ReplyMsg
+               resp.Msg = reflect.New(reflect.TypeOf(req.ReplyMsg).Elem()).Interface().(api.Message)
+
+               err := s.binapi.SendRequest(req.Msg).ReceiveReply(resp.Msg)
+               if err != nil {
+                       return err
+               }
+       }
+
+       return nil
+}