X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=proxy%2Fclient.go;h=d1c5d73d6896e88b0a46c654ee71aca45ac8af98;hb=fa7a6d1cee22dae3544f2a6fb6759faf588e603a;hp=7f92946f444e0bad0a8808564af76d01d3d6aa2e;hpb=58601b470bbd4e5ef534fed83511aa5a7f1c2d1e;p=govpp.git diff --git a/proxy/client.go b/proxy/client.go index 7f92946..d1c5d73 100644 --- a/proxy/client.go +++ b/proxy/client.go @@ -1,4 +1,4 @@ -// Copyright (c) 2019 Cisco and/or its affiliates. +// Copyright (c) 2021 Cisco and/or its affiliates. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,13 +15,14 @@ package proxy import ( + "context" "fmt" - "git.fd.io/govpp.git/core" "net/rpc" "reflect" "time" "git.fd.io/govpp.git/api" + "git.fd.io/govpp.git/core" ) type Client struct { @@ -117,11 +118,82 @@ func (s *StatsClient) GetBufferStats(bufStats *api.BufferStats) error { return nil } +func (s *StatsClient) GetMemoryStats(memStats *api.MemoryStats) error { + req := StatsRequest{StatsType: "memory"} + resp := StatsResponse{MemStats: new(api.MemoryStats)} + if err := s.rpc.Call("StatsRPC.GetStats", req, &resp); err != nil { + return err + } + *memStats = *resp.MemStats + return nil +} + type BinapiClient struct { rpc *rpc.Client timeout time.Duration } +// RPCStream is a stream for forwarding requests to BinapiRPC's stream. +type RPCStream struct { + rpc *rpc.Client + id uint32 +} + +func (s *RPCStream) SendMsg(msg api.Message) error { + req := RPCStreamReqResp{ + ID: s.id, + Msg: msg, + } + resp := RPCStreamReqResp{} + err := s.rpc.Call("BinapiRPC.SendMessage", req, &resp) + if err != nil { + return fmt.Errorf("RPC SendMessage call failed: %v", err) + } + return nil +} + +func (s *RPCStream) RecvMsg() (api.Message, error) { + req := RPCStreamReqResp{ + ID: s.id, + } + resp := RPCStreamReqResp{} + err := s.rpc.Call("BinapiRPC.ReceiveMessage", req, &resp) + if err != nil { + return nil, fmt.Errorf("RPC ReceiveMessage call failed: %v", err) + } + return resp.Msg, nil +} + +func (s *RPCStream) Close() error { + req := RPCStreamReqResp{ + ID: s.id, + } + resp := RPCStreamReqResp{} + err := s.rpc.Call("BinapiRPC.CloseStream", req, &resp) + if err != nil { + return fmt.Errorf("RPC CloseStream call failed: %v", err) + } + return nil +} + +func (b *BinapiClient) NewStream(_ context.Context, _ ...api.StreamOption) (api.Stream, error) { + stream := &RPCStream{ + rpc: b.rpc, + } + req := RPCStreamReqResp{} + resp := RPCStreamReqResp{} + err := stream.rpc.Call("BinapiRPC.NewAPIStream", req, &resp) + if err != nil { + return nil, fmt.Errorf("RPC NewAPIStream call failed: %v", err) + } + stream.id = resp.ID + return stream, err +} + +func (b *BinapiClient) Invoke(_ context.Context, request api.Message, reply api.Message) error { + return invokeInternal(b.rpc, request, reply, b.timeout) +} + func (b *BinapiClient) SendRequest(msg api.Message) api.RequestCtx { req := &requestCtx{ rpc: b.rpc, @@ -139,20 +211,24 @@ type requestCtx struct { } func (r *requestCtx) ReceiveReply(msg api.Message) error { + return invokeInternal(r.rpc, r.req, msg, r.timeout) +} + +func invokeInternal(rpc *rpc.Client, msgIn, msgOut api.Message, timeout time.Duration) error { req := BinapiRequest{ - Msg: r.req, - ReplyMsg: msg, - Timeout: r.timeout, + Msg: msgIn, + ReplyMsg: msgOut, + Timeout: timeout, } resp := BinapiResponse{} - err := r.rpc.Call("BinapiRPC.Invoke", req, &resp) + err := rpc.Call("BinapiRPC.Invoke", req, &resp) if err != nil { return fmt.Errorf("RPC call failed: %v", err) } // we set the value of msg to the value from response - reflect.ValueOf(msg).Elem().Set(reflect.ValueOf(resp.Msg).Elem()) + reflect.ValueOf(msgOut).Elem().Set(reflect.ValueOf(resp.Msg).Elem()) return nil }