adding some files
authorimarom <[email protected]>
Wed, 12 Aug 2015 12:13:25 +0000 (15:13 +0300)
committerimarom <[email protected]>
Wed, 12 Aug 2015 12:13:25 +0000 (15:13 +0300)
unsteady version

src/gtest/rpc_test.cpp [new file with mode: 0644]
src/rpc-server/include/trex_rpc_req_resp.h [new file with mode: 0644]
src/rpc-server/include/trex_rpc_server_api.h [new file with mode: 0644]
src/rpc-server/src/trex_rpc_req_resp.cpp [new file with mode: 0644]
src/rpc-server/src/trex_rpc_server.cpp [new file with mode: 0644]

diff --git a/src/gtest/rpc_test.cpp b/src/gtest/rpc_test.cpp
new file mode 100644 (file)
index 0000000..7f78efd
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#include <common/gtest.h>
+#include <trex_rpc_server_api.h>
+#include <zmq.h>
+#include <json/json.h>
+#include <sstream>
+
+class RpcTest : public testing::Test {
+
+    virtual void SetUp() {
+    }
+
+    virtual void TearDown() {
+    }
+};
+
+TEST_F(RpcTest, basic_rpc_test) {
+    TrexRpcServerArray rpc(TrexRpcServerArray::RPC_PROT_TCP, 5050);
+    rpc.start();
+
+    sleep(1);
+
+    printf ("Connecting to hello world server\85\n");
+    void *context = zmq_ctx_new ();
+    void *requester = zmq_socket (context, ZMQ_REQ);
+    zmq_connect (requester, "tcp://localhost:5050");
+
+    
+    char buffer[50];
+    Json::Value request;
+
+    int id = 1;
+    request["jsonrpc"] = "2.0";
+    request["method"]  = "test_func";
+
+    Json::Value &params = request["params"];
+    params["num"] = 12;
+    params["msg"] = "hello, method test_func";
+
+    for (int request_nbr = 0; request_nbr != 10; request_nbr++) {
+        request["id"] = id++;
+
+        std::stringstream ss;
+        ss << request;
+
+        std::cout << "Sending : '" << ss.str() << "'\n";
+        
+        zmq_send (requester, ss.str().c_str(), ss.str().size(), 0);
+
+        zmq_recv (requester, buffer, 50, 0);
+        printf ("Received ACK\n");
+    }
+    zmq_close (requester);
+    zmq_ctx_destroy (context);
+
+    sleep(1);
+
+    rpc.stop();
+}
+
diff --git a/src/rpc-server/include/trex_rpc_req_resp.h b/src/rpc-server/include/trex_rpc_req_resp.h
new file mode 100644 (file)
index 0000000..46d0157
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#ifndef __TREX_RPC_REQ_RESP_API_H__
+#define __TREX_RPC_REQ_RESP_API_H__
+
+#include <trex_rpc_server_api.h>
+
+/**
+ * request-response RPC server
+ * 
+ * @author imarom (11-Aug-15)
+ */
+class TrexRpcServerReqRes : public TrexRpcServerInterface  {
+public:
+
+    TrexRpcServerReqRes(TrexRpcServerArray::protocol_type_e protocol, uint16_t port);
+
+protected:
+    void _rpc_thread_cb();
+    void _stop_rpc_thread();
+
+private:
+    void handle_request(const uint8_t *msg, uint32_t size);
+
+    static const int RPC_MAX_MSG_SIZE = 2048;
+    void            *m_context;
+    void            *m_socket;
+    uint8_t          m_msg_buffer[RPC_MAX_MSG_SIZE];
+};
+
+
+#endif /* __TREX_RPC_REQ_RESP_API_H__ */
diff --git a/src/rpc-server/include/trex_rpc_server_api.h b/src/rpc-server/include/trex_rpc_server_api.h
new file mode 100644 (file)
index 0000000..98e5f97
--- /dev/null
@@ -0,0 +1,114 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#ifndef __TREX_RPC_SERVER_API_H__
+#define __TREX_RPC_SERVER_API_H__
+
+#include <stdint.h>
+#include <vector>
+#include <thread>
+#include <string>
+#include <stdexcept>
+
+/**
+ * generic exception for RPC errors
+ * 
+ */
+class TrexRpcException : public std::runtime_error 
+{
+public:
+    TrexRpcException(const std::string &what) : std::runtime_error(what) {
+    }
+};
+
+/* forward decl. of class */
+class TrexRpcServerInterface;
+
+/**
+ * servers array
+ * 
+ * @author imarom (12-Aug-15)
+ */
+class TrexRpcServerArray {
+public:
+   /**
+    * different types the RPC server supports
+    */
+    enum protocol_type_e {
+        RPC_PROT_TCP
+    };
+
+    TrexRpcServerArray(protocol_type_e protocol, uint16_t port);
+    ~TrexRpcServerArray();
+
+    void start();
+    void stop();
+
+private:
+    std::vector<TrexRpcServerInterface *>  m_servers;
+    protocol_type_e                        m_protocol;
+    uint16_t                               m_port;
+};
+
+/**
+ * generic type RPC server instance
+ * 
+ * @author imarom (12-Aug-15)
+ */
+class TrexRpcServerInterface {
+public:
+  
+    TrexRpcServerInterface(TrexRpcServerArray::protocol_type_e protocol, uint16_t port);
+    virtual ~TrexRpcServerInterface();
+
+    /**
+     * starts the server
+     * 
+     */
+    void start();
+
+    /**
+     * stops the server
+     * 
+     */
+    void stop();
+
+    /**
+     * return TRUE if server is active
+     * 
+     */
+    bool is_running();
+
+protected:
+    /**
+     * instances implement this
+     * 
+     */
+    virtual void _rpc_thread_cb() = 0;
+    virtual void _stop_rpc_thread()  = 0;
+
+    TrexRpcServerArray::protocol_type_e  m_protocol;
+    uint16_t                             m_port;
+    bool                                 m_is_running;
+    std::thread                          *m_thread;
+};
+
+#endif /* __TREX_RPC_SERVER_API_H__ */
diff --git a/src/rpc-server/src/trex_rpc_req_resp.cpp b/src/rpc-server/src/trex_rpc_req_resp.cpp
new file mode 100644 (file)
index 0000000..a64129a
--- /dev/null
@@ -0,0 +1,112 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#include <trex_rpc_server_api.h>
+#include <trex_rpc_req_resp.h>
+
+#include <unistd.h>
+#include <sstream>
+#include <iostream>
+
+#include <zmq.h>
+#include <json/json.h>
+
+
+TrexRpcServerReqRes::TrexRpcServerReqRes(TrexRpcServerArray::protocol_type_e protocol, uint16_t port) : TrexRpcServerInterface(protocol, port) {
+    /* ZMQ is not thread safe - this should be outside */
+    m_context = zmq_ctx_new();
+}
+
+void TrexRpcServerReqRes::_rpc_thread_cb() {
+    std::stringstream ss;
+
+    //  Socket to talk to clients
+    m_socket  = zmq_socket (m_context, ZMQ_REP);
+
+    switch (m_protocol) {
+    case TrexRpcServerArray::RPC_PROT_TCP:
+        ss << "tcp://*:";
+        break;
+    default:
+        throw TrexRpcException("unknown protocol for RPC");
+    }
+
+    ss << m_port;
+
+    int rc = zmq_bind (m_socket, ss.str().c_str());
+    if (rc != 0) {
+        throw TrexRpcException("Unable to start ZMQ server at: " + ss.str());
+    }
+
+    printf("listening on %s\n", ss.str().c_str());
+
+    /* server main loop */
+    while (m_is_running) {
+        int msg_size = zmq_recv (m_socket, m_msg_buffer, sizeof(m_msg_buffer), 0);
+
+        if (msg_size == -1) {
+            /* normal shutdown and zmq_term was called */
+            if (errno == ETERM) {
+                break;
+            } else {
+                throw TrexRpcException("Unhandled error of zmq_recv");
+            }
+        }
+
+        handle_request(m_msg_buffer, msg_size);
+    }
+
+    /* must be done from the same thread */
+    zmq_close(m_socket);
+}
+
+void TrexRpcServerReqRes::_stop_rpc_thread() {
+    /* by calling zmq_term we signal the blocked thread to exit */
+    zmq_term(m_context);
+
+}
+
+void TrexRpcServerReqRes::handle_request(const uint8_t *msg, uint32_t msg_size) {
+    Json::Reader  reader;
+    Json::Value   request;
+    std::string   response;
+
+    /* parse the json request */
+    bool rc = reader.parse( (const char *)msg, (const char *)msg + msg_size, request, false);
+    if (!rc) {
+        throw TrexRpcException("Unable to decode JSON RPC request: " + std::string( (const char *)msg, msg_size));
+    }
+    std::cout << request << std::endl;
+
+    #if 0
+    TrexJsonRpcRequest rpc_request(msg, msg_size);
+
+    rpc_request->parse();
+    rpc_request->execute();
+
+    rpc_request->get_response(response);
+
+    zmq_send(m_socket, response, response.size(), 0);
+    #endif
+
+    zmq_send(m_socket, "ACK", 3 ,0);
+    
+}
diff --git a/src/rpc-server/src/trex_rpc_server.cpp b/src/rpc-server/src/trex_rpc_server.cpp
new file mode 100644 (file)
index 0000000..3f6f510
--- /dev/null
@@ -0,0 +1,100 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#include <trex_rpc_server_api.h>
+#include <trex_rpc_req_resp.h>
+#include <unistd.h>
+#include <zmq.h>
+#include <sstream>
+
+/************** RPC server array *************/
+
+TrexRpcServerArray::TrexRpcServerArray(protocol_type_e protocol, uint16_t prot) {
+
+    /* add the request response server */
+    m_servers.push_back(new TrexRpcServerReqRes(protocol, prot));
+}
+
+TrexRpcServerArray::~TrexRpcServerArray() {
+
+    /* make sure they are all stopped */
+    TrexRpcServerArray::stop();
+
+    for (auto server : m_servers) {
+        delete server;
+    }
+}
+
+/**
+ * start the server array
+ * 
+ */
+void TrexRpcServerArray::start() {
+    for (auto server : m_servers) {
+        server->start();
+    }
+}
+
+/**
+ * stop the server array
+ * 
+ */
+void TrexRpcServerArray::stop() {
+    for (auto server : m_servers) {
+        if (server->is_running()) {
+            server->stop();
+        }
+    }
+}
+
+/************** RPC server interface ***************/
+
+TrexRpcServerInterface::TrexRpcServerInterface(TrexRpcServerArray::protocol_type_e protocol, uint16_t port) : m_protocol(protocol), m_port(port) {
+    m_is_running = false;
+}
+
+TrexRpcServerInterface::~TrexRpcServerInterface() {
+
+}
+
+void TrexRpcServerInterface::start() {
+    m_is_running = true;
+
+    m_thread = new std::thread(&TrexRpcServerInterface::_rpc_thread_cb, this);
+    if (!m_thread) {
+        throw TrexRpcException("unable to create RPC thread");
+    }
+}
+
+void TrexRpcServerInterface::stop() {
+    m_is_running = false;
+
+    /* call the dynamic type class stop */
+    _stop_rpc_thread();
+    
+    /* hold until thread has joined */    
+    m_thread->join();
+}
+
+bool TrexRpcServerInterface::is_running() {
+    return m_is_running;
+}
+