added more commands to RPC server
authorimarom <[email protected]>
Sun, 6 Sep 2015 14:55:00 +0000 (17:55 +0300)
committerimarom <[email protected]>
Sun, 6 Sep 2015 14:55:00 +0000 (17:55 +0300)
src/gtest/rpc_test.cpp
src/rpc-server/commands/trex_rpc_cmd_stream.cpp
src/rpc-server/commands/trex_rpc_cmds.h
src/rpc-server/trex_rpc_cmds_table.cpp
src/rpc-server/trex_rpc_server_mock.cpp
src/stateless/trex_stateless.cpp
src/stateless/trex_stateless_api.h
src/stateless/trex_stream.cpp
src/stateless/trex_stream_api.h

index a3df2a6..5d3c473 100644 (file)
@@ -230,13 +230,81 @@ TEST_F(RpcTest, add_stream) {
     Json::Value response;
     Json::Reader reader;
 
-    string req_str;
     string resp_str;
 
-    req_str = "{'stream':{'port_id':7,'stream_id':12,'enable':True,'start':True,'Is':10.0,'packet':[0,1,2,3,4],"
-              "'vm_data':[{'Name':'ip_cnt','Size':4,'big_edian':True,'type':'inc','core_mask':'split','init_val':'10.0.0.7','min':'10.0.0.1','max':'10.0.0.10',}],"
-              "'vm_program':[{'op_core':['read_to_reg_mem','write_reg_offet','write_rand_offset'],'read_name':'nameofopecodetoread','pkt_offset':20}],"
-              "'mode':{'type':'continues','pps':1000},'next_stream':17,'next_stream_loop':100,'rx_stats':{'enable':True,'rx_stream_id':71,'seq_enable':True,'latency':True}}}";
+    // check the stream does not exists
+    string lookup_str = "{\"jsonrpc\":\"2.0\", \"id\":1, \"method\":\"get_stream\", \"params\":{\"port_id\":1, \"stream_id\":5}}";
+    resp_str = send_msg(lookup_str);
+
+    EXPECT_TRUE(reader.parse(resp_str, response, false));
+    EXPECT_EQ(response["jsonrpc"], "2.0");
+    EXPECT_EQ(response["id"], 1);
+
+    EXPECT_EQ(response["error"]["code"], -32000);
+
+    // add it
+
+    string add_str = "{\"jsonrpc\":\"2.0\", \"id\":1, \"method\":\"add_stream\", \"params\":"
+              "{\"port_id\":1, \"stream_id\":5, \"stream\":{"
+              "\"mode\": {\"type\":\"continuous\", \"pps\":3},"
+              "\"isg\":4.3, \"enabled\":true, \"self_start\":true,"
+              "\"next_stream_id\":-1,"
+              "\"packet\":{\"binary\":[4,1,255], \"meta\":\"dummy\"},"
+              "\"rx_stats\":{\"enabled\":false}}}}";
+
+    resp_str = send_msg(add_str);
+
+    EXPECT_TRUE(reader.parse(resp_str, response, false));
+    EXPECT_EQ(response["jsonrpc"], "2.0");
+    EXPECT_EQ(response["id"], 1);
+
+    EXPECT_EQ(response["result"], "ACK");
+
+    resp_str = send_msg(lookup_str);
+
+    EXPECT_TRUE(reader.parse(resp_str, response, false));
+    EXPECT_EQ(response["jsonrpc"], "2.0");
+    EXPECT_EQ(response["id"], 1);
+
+    const Json::Value &stream = response["result"]["stream"];
+
+    EXPECT_EQ(stream["enabled"], true);
+    EXPECT_EQ(stream["self_start"], true);
+
+    EXPECT_EQ(stream["packet"]["binary"][0], 4);
+    EXPECT_EQ(stream["packet"]["binary"][1], 1);
+    EXPECT_EQ(stream["packet"]["binary"][2], 255);
+
+    EXPECT_EQ(stream["packet"]["meta"], "dummy");
+    EXPECT_EQ(stream["next_stream_id"], -1);
+
+    double delta = stream["isg"].asDouble() - 4.3;
+    EXPECT_TRUE(delta < 0.0001);
+
+    EXPECT_EQ(stream["mode"]["type"], "continuous");
+    EXPECT_EQ(stream["mode"]["pps"], 3);
+
+    // remove it
+
+    string remove_str = "{\"jsonrpc\":\"2.0\", \"id\":1, \"method\":\"remove_stream\", \"params\":{\"port_id\":1, \"stream_id\":5}}";
+    resp_str = send_msg(remove_str);
+
+    EXPECT_TRUE(reader.parse(resp_str, response, false));
+    EXPECT_EQ(response["jsonrpc"], "2.0");
+    EXPECT_EQ(response["id"], 1);
+
+    EXPECT_EQ(response["result"], "ACK");
+
+    resp_str = send_msg(remove_str);
+
+    // should not be present anymore
+
+    EXPECT_TRUE(reader.parse(resp_str, response, false));
+    EXPECT_EQ(response["jsonrpc"], "2.0");
+    EXPECT_EQ(response["id"], 1);
+
+    EXPECT_EQ(response["error"]["code"], -32000);
 
-    resp_str = send_msg(req_str);
 }
+
+
index 6dfebe6..1688941 100644 (file)
@@ -34,6 +34,9 @@ using namespace std;
 trex_rpc_cmd_rc_e
 TrexRpcCmdAddStream::_run(const Json::Value &params, Json::Value &result) {
 
+    uint8_t  port_id    = parse_int(params, "port_id", result);
+    uint32_t stream_id  = parse_int(params, "stream_id", result);
+
     const Json::Value &section = parse_object(params, "stream", result);
 
     /* get the type of the stream */
@@ -41,7 +44,7 @@ TrexRpcCmdAddStream::_run(const Json::Value &params, Json::Value &result) {
     string type = parse_string(mode, "type", result);
 
     /* allocate a new stream based on the type */
-    TrexStream *stream = allocate_new_stream(section, result);
+    TrexStream *stream = allocate_new_stream(section, port_id, stream_id, result);
 
     /* some fields */
     stream->m_enabled         = parse_bool(section, "enabled", result);
@@ -97,10 +100,7 @@ TrexRpcCmdAddStream::_run(const Json::Value &params, Json::Value &result) {
 
 
 TrexStream *
-TrexRpcCmdAddStream::allocate_new_stream(const Json::Value &section, Json::Value &result) {
-
-    uint8_t  port_id    = parse_int(section, "port_id", result);
-    uint32_t stream_id  = parse_int(section, "stream_id", result);
+TrexRpcCmdAddStream::allocate_new_stream(const Json::Value &section, uint8_t port_id, uint32_t stream_id, Json::Value &result) {
 
     TrexStream *stream;
 
@@ -200,6 +200,7 @@ TrexRpcCmdRemoveStream::_run(const Json::Value &params, Json::Value &result) {
     }
 
     port->get_stream_table()->remove_stream(stream);
+    delete stream;
 
     result["result"] = "ACK";
 
@@ -231,7 +232,7 @@ TrexRpcCmdRemoveAllStreams::_run(const Json::Value &params, Json::Value &result)
 
 /***************************
  * get all streams configured 
- * on specific port 
+ * on specific port 
  * 
  **************************/
 trex_rpc_cmd_rc_e
@@ -261,3 +262,118 @@ TrexRpcCmdGetStreamList::_run(const Json::Value &params, Json::Value &result) {
        return (TREX_RPC_CMD_OK);
 }
 
+/***************************
+ * get stream by id
+ * on a specific port 
+ * 
+ **************************/
+trex_rpc_cmd_rc_e
+TrexRpcCmdGetStream::_run(const Json::Value &params, Json::Value &result) {
+    uint8_t  port_id = parse_byte(params, "port_id", result);
+
+    uint32_t stream_id = parse_int(params, "stream_id", result);
+
+    if (port_id >= TrexStateless::get_instance().get_port_count()) {
+        std::stringstream ss;
+        ss << "invalid port id - should be between 0 and " << (int)TrexStateless::get_instance().get_port_count() - 1;
+        generate_execute_err(result, ss.str());
+    }
+
+    TrexStatelessPort *port = TrexStateless::get_instance().get_port_by_id(port_id);
+
+    TrexStream *stream = port->get_stream_table()->get_stream_by_id(stream_id);
+
+    if (!stream) {
+        std::stringstream ss;
+        ss << "stream id " << stream_id << " on port " << (int)port_id << " does not exists";
+        generate_execute_err(result, ss.str());
+    }
+
+    Json::Value stream_json;
+
+    stream_json["enabled"]     = stream->m_enabled;
+    stream_json["self_start"]  = stream->m_self_start;
+
+    stream_json["isg"]            = stream->m_isg_usec;
+    stream_json["next_stream_id"] = stream->m_next_stream_id;
+
+    stream_json["packet"]["binary"]      = Json::arrayValue;
+    for (int i = 0; i < stream->m_pkt.len; i++) {
+        stream_json["packet"]["binary"].append(stream->m_pkt.binary[i]);
+    }
+
+    stream_json["packet"]["meta"] = stream->m_pkt.meta;
+
+    if (TrexStreamContinuous *cont = dynamic_cast<TrexStreamContinuous *>(stream)) {
+        stream_json["mode"]["type"] = "continuous";
+        stream_json["mode"]["pps"]  = cont->get_pps();
+
+    }
+
+    result["result"]["stream"] = stream_json;
+
+    return (TREX_RPC_CMD_OK);
+}
+
+/***************************
+ * start traffic on port
+ * 
+ **************************/
+trex_rpc_cmd_rc_e
+TrexRpcCmdStartTraffic::_run(const Json::Value &params, Json::Value &result) {
+    uint8_t  port_id = parse_byte(params, "port_id", result);
+
+    if (port_id >= TrexStateless::get_instance().get_port_count()) {
+        std::stringstream ss;
+        ss << "invalid port id - should be between 0 and " << (int)TrexStateless::get_instance().get_port_count() - 1;
+        generate_execute_err(result, ss.str());
+    }
+
+    TrexStatelessPort *port = TrexStateless::get_instance().get_port_by_id(port_id);
+
+    TrexStatelessPort::traffic_rc_e rc = port->start_traffic();
+
+    if (rc == TrexStatelessPort::TRAFFIC_OK) {
+        result["result"] = "ACK";
+    } else {
+        std::stringstream ss;
+        switch (rc) {
+        case TrexStatelessPort::TRAFFIC_ERR_ALREADY_STARTED:
+            ss << "traffic has already started on that port";
+            break;
+        case TrexStatelessPort::TRAFFIC_ERR_NO_STREAMS:
+            ss << "no active streams on that port";
+            break;
+        default:
+            ss << "failed to start traffic";
+            break;
+        }
+
+        generate_execute_err(result, ss.str());
+    }
+
+    return (TREX_RPC_CMD_OK);
+}
+
+/***************************
+ * start traffic on port
+ * 
+ **************************/
+trex_rpc_cmd_rc_e
+TrexRpcCmdStopTraffic::_run(const Json::Value &params, Json::Value &result) {
+    uint8_t  port_id = parse_byte(params, "port_id", result);
+
+    if (port_id >= TrexStateless::get_instance().get_port_count()) {
+        std::stringstream ss;
+        ss << "invalid port id - should be between 0 and " << (int)TrexStateless::get_instance().get_port_count() - 1;
+        generate_execute_err(result, ss.str());
+    }
+
+    TrexStatelessPort *port = TrexStateless::get_instance().get_port_by_id(port_id);
+
+    port->stop_traffic();
+    result["result"] = "ACK";
+
+    return (TREX_RPC_CMD_OK);
+}
+
index e9666f2..428d48c 100644 (file)
@@ -65,10 +65,10 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStatus,  "get_status",      0);
 TREX_RPC_CMD_DEFINE(TrexRpcCmdRemoveAllStreams,   "remove_all_streams",   1);
 TREX_RPC_CMD_DEFINE(TrexRpcCmdRemoveStream,       "remove_stream",        2);
 
-TREX_RPC_CMD_DEFINE_EXTENED(TrexRpcCmdAddStream, "add_stream", 1,
+TREX_RPC_CMD_DEFINE_EXTENED(TrexRpcCmdAddStream, "add_stream", 3,
 
 /* extended part */
-TrexStream * allocate_new_stream(const Json::Value &section, Json::Value &result);
+TrexStream * allocate_new_stream(const Json::Value &section, uint8_t port_id, uint32_t stream_id, Json::Value &result);
 void validate_stream(const TrexStream *stream, Json::Value &result);
 
 );
@@ -76,4 +76,9 @@ void validate_stream(const TrexStream *stream, Json::Value &result);
 
 TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStreamList, "get_stream_list", 1);
 
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStream, "get_stream", 2);
+
+TREX_RPC_CMD_DEFINE(TrexRpcCmdStartTraffic, "start_traffic", 1);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdStopTraffic, "stop_traffic", 1);
+
 #endif /* __TREX_RPC_CMD_H__ */
index fc6d7b8..7166899 100644 (file)
@@ -39,6 +39,9 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() {
     register_command(new TrexRpcCmdRemoveStream());
     register_command(new TrexRpcCmdRemoveAllStreams());
     register_command(new TrexRpcCmdGetStreamList());
+    register_command(new TrexRpcCmdGetStream());
+    register_command(new TrexRpcCmdStartTraffic());
+    register_command(new TrexRpcCmdStopTraffic());
 }
 
 TrexRpcCommandsTable::~TrexRpcCommandsTable() {
index a248558..835e28b 100644 (file)
@@ -44,6 +44,9 @@ int gtest_main(int argc, char **argv);
 
 int main(int argc, char *argv[]) {
 
+    /* configure the stateless object with 4 ports */
+    TrexStateless::configure(4);
+
     // gtest ?
     if (argc > 1) {
         if (string(argv[1]) != "--ut") {
@@ -56,9 +59,6 @@ int main(int argc, char *argv[]) {
     cout << "\n-= Starting RPC Server Mock =-\n\n";
     cout << "Listening on tcp://localhost:5050 [ZMQ]\n\n";
 
-    /* configure the stateless object with 4 ports */
-    TrexStateless::configure(4);
-
     TrexRpcServerConfig rpc_cfg(TrexRpcServerConfig::RPC_PROT_TCP, 5050);
     TrexRpcServer rpc(rpc_cfg);
 
index ff469d7..2ab0c5d 100644 (file)
@@ -55,7 +55,7 @@ TrexStateless::~TrexStateless() {
         delete m_ports[i];
     }
 
-    delete m_ports;
+    delete [] m_ports;
 }
 
 TrexStatelessPort * TrexStateless::get_port_by_id(uint8_t port_id) {
@@ -71,3 +71,47 @@ uint8_t TrexStateless::get_port_count() {
     return m_port_count;
 }
 
+/***************************
+ * trex stateless port
+ * 
+ **************************/
+TrexStatelessPort::TrexStatelessPort(uint8_t port_id) : m_port_id(port_id) {
+    m_started = false;
+}
+
+
+/**
+ * starts the traffic on the port
+ * 
+ */
+TrexStatelessPort::traffic_rc_e
+TrexStatelessPort::start_traffic(void) {
+    if (m_started) {
+        return (TRAFFIC_ERR_ALREADY_STARTED);
+    }
+
+    if (get_stream_table()->size() == 0) {
+        return (TRAFFIC_ERR_NO_STREAMS);
+    }
+
+    m_started = true;
+
+    return (TRAFFIC_OK);
+}
+
+void 
+TrexStatelessPort::stop_traffic(void) {
+    if (m_started) {
+        m_started = false;
+    }
+}
+
+/**
+* access the stream table
+* 
+*/
+TrexStreamTable * TrexStatelessPort::get_stream_table() {
+    return &m_stream_table;
+}
+
+
index edd7b05..358ab33 100644 (file)
@@ -49,20 +49,32 @@ public:
 class TrexStatelessPort {
 public:
 
-    TrexStatelessPort(uint8_t port_id) : m_port_id(port_id) {
-    }
+    /**
+     * describess error codes for starting traffic
+     */
+    enum traffic_rc_e {
+        TRAFFIC_OK,
+        TRAFFIC_ERR_ALREADY_STARTED,
+        TRAFFIC_ERR_NO_STREAMS,
+        TRAFFIC_ERR_FAILED_TO_COMPILE_STREAMS
+    };
+
+    TrexStatelessPort(uint8_t port_id);
+
+    traffic_rc_e start_traffic(void);
+
+    void stop_traffic(void);
 
     /**
      * access the stream table
      * 
      */
-    TrexStreamTable *get_stream_table() {
-        return &m_stream_table;
-    }
+    TrexStreamTable *get_stream_table();
 
 private:
     TrexStreamTable  m_stream_table;
     uint8_t          m_port_id;
+    bool             m_started;
 };
 
 /**
index b391977..2b5b242 100644 (file)
@@ -99,3 +99,7 @@ void TrexStreamTable::get_stream_list(std::vector<uint32_t> &stream_list) {
         stream_list.push_back(stream.first);
     }
 }
+
+int TrexStreamTable::size() {
+    return m_stream_table.size();
+}
index 97e0b7f..c924899 100644 (file)
@@ -35,6 +35,7 @@ class TrexRpcCmdAddStream;
 class TrexStream {
     /* provide the RPC parser a way to access private fields */
     friend class TrexRpcCmdAddStream;
+    friend class TrexRpcCmdGetStream;
     friend class TrexStreamTable;
 
 public:
@@ -53,7 +54,7 @@ private:
 
     /* config fields */
     double        m_isg_usec;
-    uint32_t      m_next_stream_id;
+    int           m_next_stream_id;
 
     /* indicators */
     bool          m_enabled;
@@ -87,6 +88,11 @@ class TrexStreamContinuous : public TrexStream {
 public:
     TrexStreamContinuous(uint8_t port_id, uint32_t stream_id, uint32_t pps) : TrexStream(port_id, stream_id), m_pps(pps) {
     }
+
+    uint32_t get_pps() {
+        return m_pps;
+    }
+
 protected:
     uint32_t m_pps;
 };
@@ -171,6 +177,12 @@ public:
      */
     void get_stream_list(std::vector<uint32_t> &stream_list);
 
+    /**
+     * get the table size
+     * 
+     */
+    int size();
+
 private:
     /**
      * holds all the stream in a hash table by stream id