proxy update to vpp 20.05
[govpp.git] / proxy / client.go
index aea9a94..d1c5d73 100644 (file)
@@ -1,4 +1,4 @@
-//  Copyright (c) 2019 Cisco and/or its affiliates.
+//  Copyright (c) 2021 Cisco and/or its affiliates.
 //
 //  Licensed under the Apache License, Version 2.0 (the "License");
 //  you may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@
 package proxy
 
 import (
+       "context"
        "fmt"
        "net/rpc"
        "reflect"
@@ -132,6 +133,67 @@ type BinapiClient struct {
        timeout time.Duration
 }
 
+// RPCStream is a stream for forwarding requests to BinapiRPC's stream.
+type RPCStream struct {
+       rpc *rpc.Client
+       id  uint32
+}
+
+func (s *RPCStream) SendMsg(msg api.Message) error {
+       req := RPCStreamReqResp{
+               ID:  s.id,
+               Msg: msg,
+       }
+       resp := RPCStreamReqResp{}
+       err := s.rpc.Call("BinapiRPC.SendMessage", req, &resp)
+       if err != nil {
+               return fmt.Errorf("RPC SendMessage call failed: %v", err)
+       }
+       return nil
+}
+
+func (s *RPCStream) RecvMsg() (api.Message, error) {
+       req := RPCStreamReqResp{
+               ID: s.id,
+       }
+       resp := RPCStreamReqResp{}
+       err := s.rpc.Call("BinapiRPC.ReceiveMessage", req, &resp)
+       if err != nil {
+               return nil, fmt.Errorf("RPC ReceiveMessage call failed: %v", err)
+       }
+       return resp.Msg, nil
+}
+
+func (s *RPCStream) Close() error {
+       req := RPCStreamReqResp{
+               ID: s.id,
+       }
+       resp := RPCStreamReqResp{}
+       err := s.rpc.Call("BinapiRPC.CloseStream", req, &resp)
+       if err != nil {
+               return fmt.Errorf("RPC CloseStream call failed: %v", err)
+       }
+       return nil
+}
+
+func (b *BinapiClient) NewStream(_ context.Context, _ ...api.StreamOption) (api.Stream, error) {
+       stream := &RPCStream{
+               rpc: b.rpc,
+       }
+       req := RPCStreamReqResp{}
+       resp := RPCStreamReqResp{}
+       err := stream.rpc.Call("BinapiRPC.NewAPIStream", req, &resp)
+       if err != nil {
+               return nil, fmt.Errorf("RPC NewAPIStream call failed: %v", err)
+       }
+       stream.id = resp.ID
+       return stream, err
+}
+
+func (b *BinapiClient) Invoke(_ context.Context, request api.Message, reply api.Message) error {
+       return invokeInternal(b.rpc, request, reply, b.timeout)
+}
+
 func (b *BinapiClient) SendRequest(msg api.Message) api.RequestCtx {
        req := &requestCtx{
                rpc:     b.rpc,
@@ -149,20 +211,24 @@ type requestCtx struct {
 }
 
 func (r *requestCtx) ReceiveReply(msg api.Message) error {
+       return invokeInternal(r.rpc, r.req, msg, r.timeout)
+}
+
+func invokeInternal(rpc *rpc.Client, msgIn, msgOut api.Message, timeout time.Duration) error {
        req := BinapiRequest{
-               Msg:      r.req,
-               ReplyMsg: msg,
-               Timeout:  r.timeout,
+               Msg:      msgIn,
+               ReplyMsg: msgOut,
+               Timeout:  timeout,
        }
        resp := BinapiResponse{}
 
-       err := r.rpc.Call("BinapiRPC.Invoke", req, &resp)
+       err := rpc.Call("BinapiRPC.Invoke", req, &resp)
        if err != nil {
                return fmt.Errorf("RPC call failed: %v", err)
        }
 
        // we set the value of msg to the value from response
-       reflect.ValueOf(msg).Elem().Set(reflect.ValueOf(resp.Msg).Elem())
+       reflect.ValueOf(msgOut).Elem().Set(reflect.ValueOf(resp.Msg).Elem())
 
        return nil
 }