X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=proxy%2Fclient.go;h=d1c5d73d6896e88b0a46c654ee71aca45ac8af98;hb=c0da1f2999a6b08d003c0fed1a23e1ca60dd1571;hp=aea9a943313d8ab990221b2326f3c219982e3b4f;hpb=24f179dbb9534ed7c05bee2a80f18b55443706ab;p=govpp.git diff --git a/proxy/client.go b/proxy/client.go index aea9a94..d1c5d73 100644 --- a/proxy/client.go +++ b/proxy/client.go @@ -1,4 +1,4 @@ -// Copyright (c) 2019 Cisco and/or its affiliates. +// Copyright (c) 2021 Cisco and/or its affiliates. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,6 +15,7 @@ package proxy import ( + "context" "fmt" "net/rpc" "reflect" @@ -132,6 +133,67 @@ type BinapiClient struct { timeout time.Duration } +// RPCStream is a stream for forwarding requests to BinapiRPC's stream. +type RPCStream struct { + rpc *rpc.Client + id uint32 +} + +func (s *RPCStream) SendMsg(msg api.Message) error { + req := RPCStreamReqResp{ + ID: s.id, + Msg: msg, + } + resp := RPCStreamReqResp{} + err := s.rpc.Call("BinapiRPC.SendMessage", req, &resp) + if err != nil { + return fmt.Errorf("RPC SendMessage call failed: %v", err) + } + return nil +} + +func (s *RPCStream) RecvMsg() (api.Message, error) { + req := RPCStreamReqResp{ + ID: s.id, + } + resp := RPCStreamReqResp{} + err := s.rpc.Call("BinapiRPC.ReceiveMessage", req, &resp) + if err != nil { + return nil, fmt.Errorf("RPC ReceiveMessage call failed: %v", err) + } + return resp.Msg, nil +} + +func (s *RPCStream) Close() error { + req := RPCStreamReqResp{ + ID: s.id, + } + resp := RPCStreamReqResp{} + err := s.rpc.Call("BinapiRPC.CloseStream", req, &resp) + if err != nil { + return fmt.Errorf("RPC CloseStream call failed: %v", err) + } + return nil +} + +func (b *BinapiClient) NewStream(_ context.Context, _ ...api.StreamOption) (api.Stream, error) { + stream := &RPCStream{ + rpc: b.rpc, + } + req := RPCStreamReqResp{} + resp := RPCStreamReqResp{} + err := stream.rpc.Call("BinapiRPC.NewAPIStream", req, &resp) + if err != nil { + return nil, fmt.Errorf("RPC NewAPIStream call failed: %v", err) + } + stream.id = resp.ID + return stream, err +} + +func (b *BinapiClient) Invoke(_ context.Context, request api.Message, reply api.Message) error { + return invokeInternal(b.rpc, request, reply, b.timeout) +} + func (b *BinapiClient) SendRequest(msg api.Message) api.RequestCtx { req := &requestCtx{ rpc: b.rpc, @@ -149,20 +211,24 @@ type requestCtx struct { } func (r *requestCtx) ReceiveReply(msg api.Message) error { + return invokeInternal(r.rpc, r.req, msg, r.timeout) +} + +func invokeInternal(rpc *rpc.Client, msgIn, msgOut api.Message, timeout time.Duration) error { req := BinapiRequest{ - Msg: r.req, - ReplyMsg: msg, - Timeout: r.timeout, + Msg: msgIn, + ReplyMsg: msgOut, + Timeout: timeout, } resp := BinapiResponse{} - err := r.rpc.Call("BinapiRPC.Invoke", req, &resp) + err := rpc.Call("BinapiRPC.Invoke", req, &resp) if err != nil { return fmt.Errorf("RPC call failed: %v", err) } // we set the value of msg to the value from response - reflect.ValueOf(msg).Elem().Set(reflect.ValueOf(resp.Msg).Elem()) + reflect.ValueOf(msgOut).Elem().Set(reflect.ValueOf(resp.Msg).Elem()) return nil }