proxy update to vpp 20.05 33/33233/4
authormhalaj1 <matus.halaj@pantheon.tech>
Thu, 22 Jul 2021 09:58:59 +0000 (11:58 +0200)
committerOndrej Fabry <ofabry@cisco.com>
Tue, 10 Aug 2021 10:00:27 +0000 (10:00 +0000)
Signed-off-by: mhalaj1 <matus.halaj@pantheon.tech>
Change-Id: I1c7b11950756d0fe789eb7badc3e883c12826671

api/api.go
proxy/client.go
proxy/server.go

index 93f2b42..3089f24 100644 (file)
@@ -1,4 +1,4 @@
-//  Copyright (c) 2020 Cisco and/or its affiliates.
+//  Copyright (c) 2021 Cisco and/or its affiliates.
 //
 //  Licensed under the Apache License, Version 2.0 (the "License");
 //  you may not use this file except in compliance with the License.
@@ -28,7 +28,8 @@ type Connection interface {
        NewStream(ctx context.Context, options ...StreamOption) (Stream, error)
 
        // Invoke can be used for a simple request-reply RPC.
-       // It creates stream and calls SendMsg with req and RecvMsg with reply.
+       // It creates stream and calls SendMsg with req and RecvMsg which returns
+       // reply.
        Invoke(ctx context.Context, req Message, reply Message) error
 }
 
index aea9a94..d1c5d73 100644 (file)
@@ -1,4 +1,4 @@
-//  Copyright (c) 2019 Cisco and/or its affiliates.
+//  Copyright (c) 2021 Cisco and/or its affiliates.
 //
 //  Licensed under the Apache License, Version 2.0 (the "License");
 //  you may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@
 package proxy
 
 import (
+       "context"
        "fmt"
        "net/rpc"
        "reflect"
@@ -132,6 +133,67 @@ type BinapiClient struct {
        timeout time.Duration
 }
 
+// RPCStream is a stream for forwarding requests to BinapiRPC's stream.
+type RPCStream struct {
+       rpc *rpc.Client
+       id  uint32
+}
+
+func (s *RPCStream) SendMsg(msg api.Message) error {
+       req := RPCStreamReqResp{
+               ID:  s.id,
+               Msg: msg,
+       }
+       resp := RPCStreamReqResp{}
+       err := s.rpc.Call("BinapiRPC.SendMessage", req, &resp)
+       if err != nil {
+               return fmt.Errorf("RPC SendMessage call failed: %v", err)
+       }
+       return nil
+}
+
+func (s *RPCStream) RecvMsg() (api.Message, error) {
+       req := RPCStreamReqResp{
+               ID: s.id,
+       }
+       resp := RPCStreamReqResp{}
+       err := s.rpc.Call("BinapiRPC.ReceiveMessage", req, &resp)
+       if err != nil {
+               return nil, fmt.Errorf("RPC ReceiveMessage call failed: %v", err)
+       }
+       return resp.Msg, nil
+}
+
+func (s *RPCStream) Close() error {
+       req := RPCStreamReqResp{
+               ID: s.id,
+       }
+       resp := RPCStreamReqResp{}
+       err := s.rpc.Call("BinapiRPC.CloseStream", req, &resp)
+       if err != nil {
+               return fmt.Errorf("RPC CloseStream call failed: %v", err)
+       }
+       return nil
+}
+
+func (b *BinapiClient) NewStream(_ context.Context, _ ...api.StreamOption) (api.Stream, error) {
+       stream := &RPCStream{
+               rpc: b.rpc,
+       }
+       req := RPCStreamReqResp{}
+       resp := RPCStreamReqResp{}
+       err := stream.rpc.Call("BinapiRPC.NewAPIStream", req, &resp)
+       if err != nil {
+               return nil, fmt.Errorf("RPC NewAPIStream call failed: %v", err)
+       }
+       stream.id = resp.ID
+       return stream, err
+}
+
+func (b *BinapiClient) Invoke(_ context.Context, request api.Message, reply api.Message) error {
+       return invokeInternal(b.rpc, request, reply, b.timeout)
+}
+
 func (b *BinapiClient) SendRequest(msg api.Message) api.RequestCtx {
        req := &requestCtx{
                rpc:     b.rpc,
@@ -149,20 +211,24 @@ type requestCtx struct {
 }
 
 func (r *requestCtx) ReceiveReply(msg api.Message) error {
+       return invokeInternal(r.rpc, r.req, msg, r.timeout)
+}
+
+func invokeInternal(rpc *rpc.Client, msgIn, msgOut api.Message, timeout time.Duration) error {
        req := BinapiRequest{
-               Msg:      r.req,
-               ReplyMsg: msg,
-               Timeout:  r.timeout,
+               Msg:      msgIn,
+               ReplyMsg: msgOut,
+               Timeout:  timeout,
        }
        resp := BinapiResponse{}
 
-       err := r.rpc.Call("BinapiRPC.Invoke", req, &resp)
+       err := rpc.Call("BinapiRPC.Invoke", req, &resp)
        if err != nil {
                return fmt.Errorf("RPC call failed: %v", err)
        }
 
        // we set the value of msg to the value from response
-       reflect.ValueOf(msg).Elem().Set(reflect.ValueOf(resp.Msg).Elem())
+       reflect.ValueOf(msgOut).Elem().Set(reflect.ValueOf(resp.Msg).Elem())
 
        return nil
 }
index e395468..243001a 100644 (file)
@@ -1,4 +1,4 @@
-//  Copyright (c) 2019 Cisco and/or its affiliates.
+//  Copyright (c) 2021 Cisco and/or its affiliates.
 //
 //  Licensed under the Apache License, Version 2.0 (the "License");
 //  you may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@
 package proxy
 
 import (
+       "context"
        "errors"
        "fmt"
        "reflect"
@@ -230,11 +231,17 @@ type BinapiCompatibilityResponse struct {
        IncompatibleMsgs map[string][]string
 }
 
-// BinapiRPC is a RPC server for proxying client request to api.Channel.
+// BinapiRPC is a RPC server for proxying client request to api.Channel
+// or api.Stream.
 type BinapiRPC struct {
        binapiConn *core.Connection
        binapi     adapter.VppAPI
 
+       streamsLock sync.Mutex
+       // local ID, different from api.Stream ID
+       maxStreamID uint32
+       streams     map[uint32]api.Stream
+
        events chan core.ConnectionEvent
        done   chan struct{}
        // non-zero if the RPC service is available
@@ -324,6 +331,101 @@ func (s *BinapiRPC) serviceAvailable() bool {
        return atomic.LoadUint32(&s.available) == 1
 }
 
+type RPCStreamReqResp struct {
+       ID  uint32
+       Msg api.Message
+}
+
+func (s *BinapiRPC) NewAPIStream(req RPCStreamReqResp, resp *RPCStreamReqResp) error {
+       if !s.serviceAvailable() {
+               log.Println(binapiErrorMsg)
+               return errors.New("server does not support RPC calls at this time, try again later")
+       }
+       log.Debugf("BinapiRPC.NewAPIStream - REQ: %#v", req)
+
+       stream, err := s.binapiConn.NewStream(context.Background())
+       if err != nil {
+               return err
+       }
+
+       if s.streams == nil {
+               s.streams = make(map[uint32]api.Stream)
+       }
+
+       s.streamsLock.Lock()
+       s.maxStreamID++
+       s.streams[s.maxStreamID] = stream
+       resp.ID = s.maxStreamID
+       s.streamsLock.Unlock()
+
+       return nil
+}
+
+func (s *BinapiRPC) SendMessage(req RPCStreamReqResp, resp *RPCStreamReqResp) error {
+       if !s.serviceAvailable() {
+               log.Println(binapiErrorMsg)
+               return errors.New("server does not support RPC calls at this time, try again later")
+       }
+       log.Debugf("BinapiRPC.SendMessage - REQ: %#v", req)
+
+       stream, err := s.getStream(req.ID)
+       if err != nil {
+               return err
+       }
+
+       return stream.SendMsg(req.Msg)
+}
+
+func (s *BinapiRPC) ReceiveMessage(req RPCStreamReqResp, resp *RPCStreamReqResp) error {
+       if !s.serviceAvailable() {
+               log.Println(binapiErrorMsg)
+               return errors.New("server does not support RPC calls at this time, try again later")
+       }
+       log.Debugf("BinapiRPC.ReceiveMessage - REQ: %#v", req)
+
+       stream, err := s.getStream(req.ID)
+       if err != nil {
+               return err
+       }
+
+       resp.Msg, err = stream.RecvMsg()
+       return err
+}
+
+func (s *BinapiRPC) CloseStream(req RPCStreamReqResp, resp *RPCStreamReqResp) error {
+       if !s.serviceAvailable() {
+               log.Println(binapiErrorMsg)
+               return errors.New("server does not support RPC calls at this time, try again later")
+       }
+       log.Debugf("BinapiRPC.CloseStream - REQ: %#v", req)
+
+       stream, err := s.getStream(req.ID)
+       if err != nil {
+               return err
+       }
+
+       s.streamsLock.Lock()
+       delete(s.streams, req.ID)
+       s.streamsLock.Unlock()
+
+       return stream.Close()
+}
+
+func (s *BinapiRPC) getStream(id uint32) (api.Stream, error) {
+       s.streamsLock.Lock()
+       stream := s.streams[id]
+       s.streamsLock.Unlock()
+
+       if stream == nil || reflect.ValueOf(stream).IsNil() {
+               s.streamsLock.Lock()
+               // delete the stream in case it is still in the map
+               delete(s.streams, id)
+               s.streamsLock.Unlock()
+               return nil, errors.New("BinapiRPC stream closed")
+       }
+       return stream, nil
+}
+
 func (s *BinapiRPC) Invoke(req BinapiRequest, resp *BinapiResponse) error {
        if !s.serviceAvailable() {
                log.Println(binapiErrorMsg)
@@ -383,12 +485,9 @@ func (s *BinapiRPC) Compatibility(req BinapiCompatibilityRequest, resp *BinapiCo
        resp.IncompatibleMsgs = make(map[string][]string)
 
        for path, messages := range api.GetRegisteredMessages() {
-               if resp.IncompatibleMsgs[path] == nil {
-                       resp.IncompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs))
-               }
-               if resp.CompatibleMsgs[path] == nil {
-                       resp.CompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs))
-               }
+               resp.IncompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs))
+               resp.CompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs))
+
                for _, msg := range req.MsgNameCrcs {
                        val, ok := messages[msg]
                        if !ok {
@@ -402,11 +501,18 @@ func (s *BinapiRPC) Compatibility(req BinapiCompatibilityRequest, resp *BinapiCo
                        }
                }
        }
-       for _, messages := range resp.IncompatibleMsgs {
-               if len(messages) > 0 {
-                       return fmt.Errorf("compatibility check failed for messages: %v", resp.IncompatibleMsgs)
+
+       compatible := false
+       for path, incompatibleMsgs := range resp.IncompatibleMsgs {
+               if len(incompatibleMsgs) == 0 {
+                       compatible = true
+               } else {
+                       log.Debugf("messages are incompatible for path %s", path)
                }
        }
+       if !compatible {
+               return errors.New("compatibility check failed")
+       }
 
        return nil
 }