added ownership to RPC server
authorimarom <[email protected]>
Wed, 9 Sep 2015 15:06:12 +0000 (18:06 +0300)
committerimarom <[email protected]>
Wed, 9 Sep 2015 15:56:11 +0000 (18:56 +0300)
src/gtest/rpc_test.cpp
src/rpc-server/commands/trex_rpc_cmd_general.cpp
src/rpc-server/commands/trex_rpc_cmd_stream.cpp
src/rpc-server/commands/trex_rpc_cmds.h
src/rpc-server/trex_rpc_cmd.cpp
src/rpc-server/trex_rpc_cmd_api.h
src/rpc-server/trex_rpc_cmds_table.cpp
src/rpc-server/trex_rpc_server.cpp
src/rpc-server/trex_rpc_server_api.h
src/stateless/trex_stream_api.h

index 8a7e917..767560b 100644 (file)
@@ -30,6 +30,32 @@ using namespace std;
 
 class RpcTest : public testing::Test {
 
+    void take_ownership(void) {
+        Json::Value request;
+        Json::Value response;
+
+        create_request(request, "aquire", 1 , false);
+
+        request["params"]["user"] = "test";
+        request["params"]["force"] = true;
+
+        send_request(request, response);
+
+        EXPECT_TRUE(response["result"] != Json::nullValue);
+        m_ownership_handler = response["result"].asString();
+    }
+
+    void release_ownership() {
+        Json::Value request;
+        Json::Value response;
+
+        create_request(request, "release", 1 , false);
+        request["params"]["handler"] = m_ownership_handler;
+
+        send_request(request, response);
+        EXPECT_TRUE(response["result"] == "ACK");
+    }
+
     virtual void SetUp() {
         TrexRpcServerConfig cfg = TrexRpcServerConfig(TrexRpcServerConfig::RPC_PROT_TCP, 5050);
 
@@ -39,6 +65,8 @@ class RpcTest : public testing::Test {
         m_context = zmq_ctx_new ();
         m_socket = zmq_socket (m_context, ZMQ_REQ);
         zmq_connect (m_socket, "tcp://localhost:5050");
+
+        take_ownership();
     }
 
     virtual void TearDown() {
@@ -50,6 +78,33 @@ class RpcTest : public testing::Test {
     }
 
 public:
+
+    void create_request(Json::Value &request, const string &method, int id = 1, bool ownership = false) {
+        request.clear();
+
+        request["jsonrpc"] = "2.0";
+        request["id"] = id;
+        request["method"] = method;
+
+        if (ownership) {
+            request["params"]["handler"] = m_ownership_handler; 
+        }
+    }
+
+    void send_request(const Json::Value &request, Json::Value &response) {
+        Json::FastWriter writer;
+        Json::Reader reader;
+
+        response.clear();
+
+        string request_str = writer.write(request);
+        string ret = send_msg(request_str);
+
+        EXPECT_TRUE(reader.parse(ret, response, false));
+        EXPECT_EQ(response["jsonrpc"], "2.0");
+        EXPECT_EQ(response["id"], request["id"]);
+    }
+
     string send_msg(const string &msg) {
         char buffer[512];
 
@@ -62,6 +117,7 @@ public:
     TrexRpcServer *m_rpc;
     void *m_context;
     void *m_socket;
+    string m_ownership_handler;
 };
 
 TEST_F(RpcTest, basic_rpc_test) {
@@ -230,42 +286,48 @@ TEST_F(RpcTest, add_stream) {
     Json::Value response;
     Json::Reader reader;
 
-    string resp_str;
+    create_request(request, "get_stream", 1, true);
 
-    // check the stream does not exists
-    string lookup_str = "{\"jsonrpc\":\"2.0\", \"id\":1, \"method\":\"get_stream\", \"params\":{\"port_id\":1, \"stream_id\":5}}";
-    resp_str = send_msg(lookup_str);
+    request["params"]["port_id"] = 1;
+    request["params"]["stream_id"] = 5;
+
+    send_request(request, response);
 
-    EXPECT_TRUE(reader.parse(resp_str, response, false));
     EXPECT_EQ(response["jsonrpc"], "2.0");
     EXPECT_EQ(response["id"], 1);
-
     EXPECT_EQ(response["error"]["code"], -32000);
 
     // add it
+    create_request(request, "add_stream", 1, true);
+    request["params"]["port_id"] = 1;
+    request["params"]["stream_id"] = 5;
+    request["params"]["stream"]["mode"]["type"] = "continuous";
+    request["params"]["stream"]["mode"]["pps"] = 3; 
+    request["params"]["stream"]["isg"] = 4.3;
+    request["params"]["stream"]["enabled"] = true;
+    request["params"]["stream"]["self_start"] = true;
+    request["params"]["stream"]["next_stream_id"] = -1;
+
+    request["params"]["stream"]["packet"]["meta"] = "dummy";
+    request["params"]["stream"]["packet"]["binary"][0] = 4;
+    request["params"]["stream"]["packet"]["binary"][1] = 1;
+    request["params"]["stream"]["packet"]["binary"][2] = 255;
+
+    request["params"]["stream"]["vm"] = Json::arrayValue;
+    request["params"]["stream"]["rx_stats"]["enabled"] = false;
+
+    send_request(request, response);
 
-    string add_str = "{\"jsonrpc\":\"2.0\", \"id\":1, \"method\":\"add_stream\", \"params\":"
-              "{\"port_id\":1, \"stream_id\":5, \"stream\":{"
-              "\"mode\": {\"type\":\"continuous\", \"pps\":3},"
-              "\"isg\":4.3, \"enabled\":true, \"self_start\":true,"
-              "\"next_stream_id\":-1,"
-              "\"packet\":{\"binary\":[4,1,255], \"meta\":\"dummy\"},"
-              "\"vm\":[],"
-              "\"rx_stats\":{\"enabled\":false}}}}";
+    EXPECT_EQ(response["result"], "ACK");
 
-    resp_str = send_msg(add_str);
+    /* get it */
 
-    EXPECT_TRUE(reader.parse(resp_str, response, false));
-    EXPECT_EQ(response["jsonrpc"], "2.0");
-    EXPECT_EQ(response["id"], 1);
+    create_request(request, "get_stream", 1, true);
 
-    EXPECT_EQ(response["result"], "ACK");
+    request["params"]["port_id"] = 1;
+    request["params"]["stream_id"] = 5;
 
-    resp_str = send_msg(lookup_str);
-
-    EXPECT_TRUE(reader.parse(resp_str, response, false));
-    EXPECT_EQ(response["jsonrpc"], "2.0");
-    EXPECT_EQ(response["id"], 1);
+    send_request(request, response);
 
     const Json::Value &stream = response["result"]["stream"];
 
@@ -286,23 +348,17 @@ TEST_F(RpcTest, add_stream) {
     EXPECT_EQ(stream["mode"]["pps"], 3);
 
     // remove it
+    create_request(request, "remove_stream", 1, true);
 
-    string remove_str = "{\"jsonrpc\":\"2.0\", \"id\":1, \"method\":\"remove_stream\", \"params\":{\"port_id\":1, \"stream_id\":5}}";
-    resp_str = send_msg(remove_str);
+    request["params"]["port_id"] = 1;
+    request["params"]["stream_id"] = 5;
 
-    EXPECT_TRUE(reader.parse(resp_str, response, false));
-    EXPECT_EQ(response["jsonrpc"], "2.0");
-    EXPECT_EQ(response["id"], 1);
+    send_request(request, response);
 
     EXPECT_EQ(response["result"], "ACK");
 
-    resp_str = send_msg(remove_str);
-
     // should not be present anymore
-
-    EXPECT_TRUE(reader.parse(resp_str, response, false));
-    EXPECT_EQ(response["jsonrpc"], "2.0");
-    EXPECT_EQ(response["id"], 1);
+    send_request(request, response);
 
     EXPECT_EQ(response["error"]["code"], -32000);
 
index 32952b1..d5aa3a9 100644 (file)
@@ -63,3 +63,56 @@ TrexRpcCmdGetStatus::_run(const Json::Value &params, Json::Value &result) {
     return (TREX_RPC_CMD_OK);
 }
 
+/**
+ * returns the current owner of the device
+ * 
+ * @author imarom (08-Sep-15)
+ * 
+ * @param params 
+ * @param result 
+ * 
+ * @return trex_rpc_cmd_rc_e 
+ */
+trex_rpc_cmd_rc_e
+TrexRpcCmdGetOwner::_run(const Json::Value &params, Json::Value &result) {
+    Json::Value &section = result["result"];
+
+    section["owner"] = TrexRpcServer::get_owner();
+
+    return (TREX_RPC_CMD_OK);
+}
+
+/**
+ * acquire device
+ * 
+ */
+trex_rpc_cmd_rc_e
+TrexRpcCmdAcquire::_run(const Json::Value &params, Json::Value &result) {
+
+    const string &new_owner = parse_string(params, "user", result);
+    bool force = parse_bool(params, "force", result);
+
+    /* if not free and not you and not force - fail */
+    if ( (!TrexRpcServer::is_free_to_aquire()) && (TrexRpcServer::get_owner() != new_owner) && (!force)) {
+        generate_execute_err(result, "device is already taken by '" + TrexRpcServer::get_owner() + "'");
+    }
+
+    string handle = TrexRpcServer::set_owner(new_owner);
+
+    result["result"] = handle;
+
+    return (TREX_RPC_CMD_OK);
+}
+
+/**
+ * release device
+ * 
+ */
+trex_rpc_cmd_rc_e
+TrexRpcCmdRelease::_run(const Json::Value &params, Json::Value &result) {
+    TrexRpcServer::clear_owner();
+
+    result["result"] = "ACK";
+
+    return (TREX_RPC_CMD_OK);
+}
index 90b55ea..b62d213 100644 (file)
@@ -136,13 +136,13 @@ TrexRpcCmdAddStream::allocate_new_stream(const Json::Value &section, uint8_t por
     } else if (type == "single_burst") {
 
         uint32_t total_pkts      = parse_int(mode, "total_pkts", result);
-        uint32_t pps             = parse_int(mode, "pps", result);
+        double pps               = parse_double(mode, "pps", result);
 
         stream = new TrexStreamBurst(port_id, stream_id, total_pkts, pps);
 
     } else if (type == "multi_burst") {
 
-        uint32_t  pps              = parse_int(mode, "pps", result);
+        double    pps              = parse_double(mode, "pps", result);
         double    ibg_usec         = parse_double(mode, "ibg", result);
         uint32_t  num_bursts       = parse_int(mode, "number_of_bursts", result);
         uint32_t  pkts_per_burst   = parse_int(mode, "pkts_per_burst", result);
index f88631b..3213819 100644 (file)
@@ -35,37 +35,45 @@ class TrexStream;
  * syntactic sugar for creating a simple command
  */
 
-#define TREX_RPC_CMD_DEFINE_EXTENED(class_name, cmd_name, param_count, ext)                               \
+#define TREX_RPC_CMD_DEFINE_EXTENED(class_name, cmd_name, param_count, needs_ownership, ext)              \
     class class_name : public TrexRpcCommand {                                                            \
     public:                                                                                               \
-        class_name () : TrexRpcCommand(cmd_name, param_count) {}                                          \
+        class_name () : TrexRpcCommand(cmd_name, param_count, needs_ownership) {}                         \
     protected:                                                                                            \
         virtual trex_rpc_cmd_rc_e _run(const Json::Value &params, Json::Value &result);                   \
         ext                                                                                               \
     }
 
-#define TREX_RPC_CMD_DEFINE(class_name, cmd_name, param_count) TREX_RPC_CMD_DEFINE_EXTENED(class_name, cmd_name, param_count, ;)
+#define TREX_RPC_CMD_DEFINE(class_name, cmd_name, param_count, needs_ownership) TREX_RPC_CMD_DEFINE_EXTENED(class_name, cmd_name, param_count, needs_ownership, ;)
 
 /**
  * test cmds
  */
-TREX_RPC_CMD_DEFINE(TrexRpcCmdTestAdd,    "test_add", 2);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdTestSub,    "test_sub", 2);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdTestAdd,    "test_add", 2, false);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdTestSub,    "test_sub", 2, false);
 
 /**
  * general cmds
  */
-TREX_RPC_CMD_DEFINE(TrexRpcCmdPing,       "ping",            0);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdGetReg,     "get_reg_cmds",    0);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStatus,  "get_status",      0);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdPing,       "ping",            0, false);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetReg,     "get_reg_cmds",    0, false);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStatus,  "get_status",      0, false);
+
+/**
+ * ownership
+ */
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetOwner,   "get_owner",       0, false);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdAcquire,    "acquire",         2, false);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdRelease,    "release",         0, true);
+
 
 /**
  * stream cmds
  */
-TREX_RPC_CMD_DEFINE(TrexRpcCmdRemoveAllStreams,   "remove_all_streams",   1);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdRemoveStream,       "remove_stream",        2);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdRemoveAllStreams,   "remove_all_streams",   1, true);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdRemoveStream,       "remove_stream",        2, true);
 
-TREX_RPC_CMD_DEFINE_EXTENED(TrexRpcCmdAddStream, "add_stream", 3,
+TREX_RPC_CMD_DEFINE_EXTENED(TrexRpcCmdAddStream, "add_stream", 3, true, 
 
 /* extended part */
 TrexStream * allocate_new_stream(const Json::Value &section, uint8_t port_id, uint32_t stream_id, Json::Value &result);
@@ -77,11 +85,11 @@ void parse_vm_instr_write_flow_var(const Json::Value &inst, TrexStream *stream,
 );
 
 
-TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStreamList, "get_stream_list", 1);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStreamList, "get_stream_list", 1, true);
 
-TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStream, "get_stream", 2);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStream, "get_stream", 2, true);
 
-TREX_RPC_CMD_DEFINE(TrexRpcCmdStartTraffic, "start_traffic", 1);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdStopTraffic, "stop_traffic", 1);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdStartTraffic, "start_traffic", 1, true);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdStopTraffic, "stop_traffic", 1, true);
 
 #endif /* __TREX_RPC_CMD_H__ */
index 3fc77f7..437e8b1 100644 (file)
@@ -19,6 +19,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 #include <trex_rpc_cmd_api.h>
+#include <trex_rpc_server_api.h>
 
 trex_rpc_cmd_rc_e 
 TrexRpcCommand::run(const Json::Value &params, Json::Value &result) {
@@ -26,8 +27,16 @@ TrexRpcCommand::run(const Json::Value &params, Json::Value &result) {
 
     /* the internal run can throw a parser error / other error */
     try {
+
         check_param_count(params, m_param_count, result);
+
+        if (m_needs_ownership) {
+            verify_ownership(params, result);
+        }
+
+        /* run the command itself*/
         rc = _run(params, result);
+
     } catch (TrexRpcCommandException &e) {
         return e.get_rc();
     }
@@ -45,6 +54,15 @@ TrexRpcCommand::check_param_count(const Json::Value &params, int expected, Json:
     }
 }
 
+void
+TrexRpcCommand::verify_ownership(const Json::Value &params, Json::Value &result) {
+    std::string handler = parse_string(params, "handler", result);
+
+    if (!TrexRpcServer::verify_owner_handler(handler)) {
+        generate_execute_err(result, "invalid handler provided. please pass the handler given when calling 'acquire' or take ownership");
+    }
+}
+
 const char *
 TrexRpcCommand::type_to_str(field_type_e type) {
     switch (type) {
index def52fc..c72b3e3 100644 (file)
@@ -68,8 +68,15 @@ public:
     /**
      * method name and params
      */
-    TrexRpcCommand(const std::string &method_name, int param_count) : m_name(method_name), m_param_count(param_count) {
-
+    TrexRpcCommand(const std::string &method_name, int param_count, bool needs_ownership) : 
+                                                                    m_name(method_name),
+                                                                    m_param_count(param_count),
+                                                                    m_needs_ownership(needs_ownership) {
+
+        /* if needs ownership - another field is needed (handler) */
+        if (m_needs_ownership) {
+            m_param_count++;
+        }
     }
 
     /**
@@ -111,6 +118,12 @@ protected:
      */
     void check_param_count(const Json::Value &params, int expected, Json::Value &result);
 
+    /**
+     * verify ownership
+     * 
+     */
+    void verify_ownership(const Json::Value &params, Json::Value &result);
+
     /**
      * parse functions
      * 
@@ -209,6 +222,7 @@ protected:
     /* RPC command name */
     std::string   m_name;
     int           m_param_count;
+    bool          m_needs_ownership;
 };
 
 #endif /* __TREX_RPC_CMD_API_H__ */
index 7166899..f968bb7 100644 (file)
@@ -30,9 +30,15 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() {
     /* add the test command (for gtest) */
     register_command(new TrexRpcCmdTestAdd());
     register_command(new TrexRpcCmdTestSub());
+    
+
+    /* general */
     register_command(new TrexRpcCmdPing());
     register_command(new TrexRpcCmdGetReg());
     register_command(new TrexRpcCmdGetStatus());
+    register_command(new TrexRpcCmdGetOwner());
+    register_command(new TrexRpcCmdAcquire());
+    register_command(new TrexRpcCmdRelease());
 
     /* stream commands */
     register_command(new TrexRpcCmdAddStream());
index 149bb66..a5988ca 100644 (file)
@@ -112,6 +112,7 @@ get_current_date_time() {
 
 const std::string TrexRpcServer::s_server_uptime = get_current_date_time();
 std::string TrexRpcServer::s_owner = "none";
+std::string TrexRpcServer::s_owner_handler = "";
 
 TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig &req_resp_cfg) {
 
@@ -157,3 +158,22 @@ void TrexRpcServer::set_verbose(bool verbose) {
     }
 }
 
+/**
+ * generate a random connection handler
+ * 
+ */
+std::string TrexRpcServer::generate_handler() {
+    std::stringstream ss;
+
+    static const char alphanum[] =
+        "0123456789"
+        "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+        "abcdefghijklmnopqrstuvwxyz";
+
+    /* generate 8 bytes of random handler */
+    for (int i = 0; i < 8; ++i) {
+        ss << alphanum[rand() % (sizeof(alphanum) - 1)];
+    }
+
+    return (ss.str());
+}
index b431367..c34ac0f 100644 (file)
@@ -172,25 +172,51 @@ public:
         return s_owner;
     }
 
+    /**
+     * owner handler 
+     * for the connection 
+     * 
+     */
+    static const std::string &get_owner_handler() {
+        return s_owner_handler;
+    }
+
+    static bool is_free_to_aquire() {
+        return (s_owner == "none");
+    }
+
     /**
     * take ownership of the server array 
     * this is static 
     * ownership is total 
     * 
     */
-    static void set_owner(const std::string &owner) {
+    static std::string set_owner(const std::string &owner) {
         s_owner = owner;
+        s_owner_handler = generate_handler();
+        return (s_owner_handler);
     }
 
     static void clear_owner() {
         s_owner = "none";
+        s_owner_handler = "";
+    }
+
+    static bool verify_owner_handler(const std::string &handler) {
+
+        return ( (s_owner != "none") && (s_owner_handler == handler) );
+
     }
 
 private:
+    static std::string generate_handler();
+
     std::vector<TrexRpcServerInterface *>   m_servers;
     bool                                    m_verbose;
     static const std::string                s_server_uptime;
+
     static std::string                      s_owner;
+    static std::string                      s_owner_handler;
 };
 
 #endif /* __TREX_RPC_SERVER_API_H__ */
index 2699975..0a955ff 100644 (file)
@@ -90,15 +90,15 @@ private:
  */
 class TrexStreamContinuous : public TrexStream {
 public:
-    TrexStreamContinuous(uint8_t port_id, uint32_t stream_id, uint32_t pps) : TrexStream(port_id, stream_id), m_pps(pps) {
+    TrexStreamContinuous(uint8_t port_id, uint32_t stream_id, double pps) : TrexStream(port_id, stream_id), m_pps(pps) {
     }
 
-    uint32_t get_pps() {
+    double get_pps() {
         return m_pps;
     }
 
 protected:
-    uint32_t m_pps;
+    double m_pps;
 };
 
 /**
@@ -107,7 +107,7 @@ protected:
  */
 class TrexStreamBurst : public TrexStream {
 public:
-    TrexStreamBurst(uint8_t port_id, uint32_t stream_id, uint32_t total_pkts, uint32_t pps) : 
+    TrexStreamBurst(uint8_t port_id, uint32_t stream_id, uint32_t total_pkts, double pps) : 
         TrexStream(port_id, stream_id),
         m_total_pkts(total_pkts),
         m_pps(pps) {
@@ -115,7 +115,7 @@ public:
 
 protected:
     uint32_t   m_total_pkts;
-    uint32_t   m_pps;
+    double     m_pps;
 };
 
 /**
@@ -127,7 +127,7 @@ public:
     TrexStreamMultiBurst(uint8_t  port_id,
                          uint32_t stream_id,
                          uint32_t pkts_per_burst,
-                         uint32_t pps,
+                         double   pps,
                          uint32_t num_bursts,
                          double   ibg_usec) : TrexStreamBurst(port_id, stream_id, pkts_per_burst, pps), m_num_bursts(num_bursts), m_ibg_usec(ibg_usec) {