many bugs uncovered by the PCAP injection:
authorimarom <[email protected]>
Mon, 15 Feb 2016 08:12:29 +0000 (03:12 -0500)
committerimarom <[email protected]>
Mon, 15 Feb 2016 12:11:26 +0000 (07:11 -0500)
1. NamedTuple constructor
2. Scappy
3. zlib for server

12 files changed:
linux/ws_main.py
linux_dpdk/ws_main.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_sim.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_types.py
src/rpc-server/trex_rpc_req_resp_server.cpp
src/rpc-server/trex_rpc_req_resp_server.h
src/rpc-server/trex_rpc_zip.cpp [new file with mode: 0644]
src/rpc-server/trex_rpc_zip.h [new file with mode: 0644]

index 5636965..ce7140b 100755 (executable)
@@ -170,6 +170,7 @@ rpc_server_src = SrcGroup(dir='src/rpc-server/',
                               'trex_rpc_jsonrpc_v2_parser.cpp',
                               'trex_rpc_cmds_table.cpp',
                               'trex_rpc_cmd.cpp',
+                              'trex_rpc_zip.cpp',
 
                               'commands/trex_rpc_cmd_test.cpp',
                               'commands/trex_rpc_cmd_general.cpp',
@@ -428,6 +429,7 @@ def build_prog (bld, build_obj):
                 linkflags = build_obj.get_link_flags(),
                 source = build_obj.get_src(),
                 use = build_obj.get_use_libs(),
+                lib = ['z'],
                 rpath  = bld.env.RPATH + build_obj.get_rpath(),
                 target = build_obj.get_target())
 
index 81bc6b2..aec325d 100755 (executable)
@@ -153,6 +153,7 @@ rpc_server_src = SrcGroup(dir='src/rpc-server/',
                               'trex_rpc_jsonrpc_v2_parser.cpp',
                               'trex_rpc_cmds_table.cpp',
                               'trex_rpc_cmd.cpp',
+                              'trex_rpc_zip.cpp',
 
                               'commands/trex_rpc_cmd_test.cpp',
                               'commands/trex_rpc_cmd_general.cpp',
@@ -696,7 +697,7 @@ def build_prog (bld, build_obj):
                 includes =includes_path,
                 cxxflags =(build_obj.get_cxx_flags()+['-std=gnu++11',]),
                 linkflags = build_obj.get_link_flags() ,
-                lib=['pthread','dl'],
+                lib=['pthread','dl', 'z'],
                 use =[build_obj.get_dpdk_target(),'zmq'],
                 source = bp.file_list(top),
                 target = build_obj.get_target())
index 5096e33..46d33e3 100644 (file)
@@ -2006,6 +2006,7 @@ class STLClient(object):
         try:
             # pcap injection removes all previous streams from the ports
             self.remove_all_streams(ports = opts.ports)
+            
             profile = STLProfile.load_pcap(opts.file[0],
                                            opts.ipg_usec,
                                            opts.speedup,
@@ -2018,4 +2019,5 @@ class STLClient(object):
             print e.brief()
             return
 
+        return True
 
index ab3c728..96d1854 100644 (file)
@@ -3,10 +3,12 @@
 import zmq
 import json
 import re
-from time import sleep
 from collections import namedtuple
 from trex_stl_types import *
 from utils.common import random_id_gen
+import zlib
+import struct
+
 
 class bcolors:
     BLUE = '\033[94m'
@@ -35,12 +37,15 @@ class BatchMessage(object):
 
         msg = json.dumps(self.batch_list)
 
-        return self.rpc_client.send_raw_msg(msg)
+        return self.rpc_client.send_msg(msg)
 
 
 # JSON RPC v2.0 client
 class JsonRpcClient(object):
 
+    MSG_COMPRESS_THRESHOLD = 4096
+    MSG_COMPRESS_HEADER_MAGIC = 0xABE85CEA
+
     def __init__ (self, default_server, default_port, logger):
         self.logger = logger
         self.connected = False
@@ -109,14 +114,64 @@ class JsonRpcClient(object):
 
         id, msg = self.create_jsonrpc_v2(method_name, params)
 
-        return self.send_raw_msg(msg)
+        return self.send_msg(msg)
+
+
+    def compress_msg (self, msg):
+        # compress
+        compressed = zlib.compress(msg)
+        new_msg = struct.pack(">II", self.MSG_COMPRESS_HEADER_MAGIC, len(msg)) + compressed
+        return new_msg
+
+
+    def decompress_msg (self, msg):
+        if len(msg) < 8:
+            return None
+
+        t = struct.unpack(">II", msg[:8])
+        if (t[0] != self.MSG_COMPRESS_HEADER_MAGIC):
+            return None
+
+        x = zlib.decompress(msg[8:])
+        if len(x) != t[1]:
+            return None
+
+        return x
+
+    def send_msg (self, msg):
+        # print before
+        if self.logger.check_verbose(self.logger.VERBOSE_HIGH):
+            self.verbose_msg("Sending Request To Server:\n\n" + self.pretty_json(msg) + "\n")
+
+        if len(msg) > self.MSG_COMPRESS_THRESHOLD:
+            response = self.send_raw_msg(self.compress_msg(msg))
+            if response:
+                response = self.decompress_msg(response)
+        else:
+            response = self.send_raw_msg(msg)
+
+        if response == None:
+            return RC_ERR("*** [RPC] - Failed to decode response from server")
+
+
+        # print after
+        if self.logger.check_verbose(self.logger.VERBOSE_HIGH):
+            self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n")
+
+        # process response (batch and regular)
+       
+        response_json = json.loads(response)
+
+        if isinstance(response_json, list):
+            return self.process_batch_response(response_json)
+        else:
+            return self.process_single_response(response_json)
+
 
 
     # low level send of string message
     def send_raw_msg (self, msg):
 
-        self.verbose_msg("Sending Request To Server:\n\n" + self.pretty_json(msg) + "\n")
-
         tries = 0
         while True:
             try:
@@ -141,26 +196,11 @@ class JsonRpcClient(object):
                     return RC_ERR("*** [RPC] - Failed to get server response at {0}".format(self.transport))
 
 
-        self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n")
-
-        # decode
-
-        # batch ?
-        response_json = json.loads(response)
-
-        if isinstance(response_json, list):
-            rc_batch = RC()
-
-            for single_response in response_json:
-                rc = self.process_single_response(single_response)
-                rc_batch.add(rc)
-
-            return rc_batch
-
-        else:
-            return self.process_single_response(response_json)
-
+        return response
+       
+     
 
+    # processs a single response from server
     def process_single_response (self, response_json):
 
         if (response_json.get("jsonrpc") != "2.0"):
@@ -182,6 +222,17 @@ class JsonRpcClient(object):
 
   
 
+    # process a batch response
+    def process_batch_response (self, response_json):
+        rc_batch = RC()
+
+        for single_response in response_json:
+            rc = self.process_single_response(single_response)
+            rc_batch.add(rc)
+
+        return rc_batch
+
+
     def disconnect (self):
         if self.connected:
             self.socket.close(linger = 0)
index 56657e2..ce7a630 100644 (file)
@@ -3,6 +3,7 @@ from collections import namedtuple, OrderedDict
 
 import trex_stl_stats
 from trex_stl_types import *
+import time
 
 StreamOnPort = namedtuple('StreamOnPort', ['compiled_stream', 'metadata'])
 
@@ -199,14 +200,13 @@ class Port(object):
             batch.append(cmd)
 
             # meta data for show streams
-            self.streams[stream.get_id()] = StreamOnPort(stream.to_json(),
-                                                         Port._generate_stream_metadata(stream))
+            #self.streams[stream.get_id()] = StreamOnPort(stream.to_json(),
+            #                                             Port._generate_stream_metadata(stream))
 
         rc = self.transmit_batch(batch)
         if not rc:
             return self.err(rc.err())
 
-        
 
         # the only valid state now
         self.state = self.STATE_STREAMS
index 4d720aa..086e46a 100644 (file)
@@ -27,7 +27,8 @@ from trex_stl_client import STLClient
 
 import re
 import json
-
+import zlib
+import struct
 
 
 import argparse
@@ -221,7 +222,12 @@ class STLSim(object):
 
         # write to temp file
         f = tempfile.NamedTemporaryFile(delete = False)
-        f.write(json.dumps(cmds_json))
+
+        msg = json.dumps(cmds_json)
+        compressed = zlib.compress(msg)
+        new_msg = struct.pack(">II", 0xABE85CEA, len(msg)) + compressed
+
+        f.write(new_msg)
         f.close()
 
         # launch bp-sim
index 1a46aae..efeb5c8 100644 (file)
@@ -157,7 +157,10 @@ class STLStream(object):
         # packet and VM
         self.fields['packet'] = packet.dump_pkt()
         self.fields['vm']     = packet.get_vm_data()
-        self.packet_desc      = packet.pkt_layers_desc()
+
+
+        # this is heavy, calculate lazy
+        self.packet_desc = None
 
         if not rx_stats:
             self.fields['rx_stats'] = {}
@@ -194,6 +197,9 @@ class STLStream(object):
         return self.next
 
     def get_pkt_type (self):
+        if self.packet_desc == None:
+            self.packet_desc = CScapyTRexPktBuilder.pkt_layers_desc_from_buffer(base64.b64decode(self.fields['packet']['binary']))
+
         return self.packet_desc
 
     def get_pkt_len (self, count_crc = True):
@@ -432,14 +438,17 @@ class STLProfile(object):
             else:
                 next = i + 1
 
+            
             streams.append(STLStream(name = i,
                                      packet = CScapyTRexPktBuilder(pkt_buffer = cap),
                                      mode = STLTXSingleBurst(total_pkts = 1),
                                      self_start = True if (i == 1) else False,
                                      isg = (ts_usec - last_ts_usec),  # seconds to usec
                                      next = next))
+        
             last_ts_usec = ts_usec
 
+
         return STLProfile(streams)
 
       
index d387ac9..0a6e64f 100644 (file)
@@ -4,6 +4,7 @@ from utils.text_opts import *
 from trex_stl_exceptions import *
 
 RpcCmdData = namedtuple('RpcCmdData', ['method', 'params'])
+TupleRC    = namedtuple('RC', ['rc', 'data', 'is_warn'])
 
 class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])):
         __slots__ = ()
@@ -19,8 +20,7 @@ class RC():
         self.rc_list = []
 
         if (rc != None):
-            tuple_rc = namedtuple('RC', ['rc', 'data', 'is_warn'])
-            self.rc_list.append(tuple_rc(rc, data, is_warn))
+            self.rc_list.append(TupleRC(rc, data, is_warn))
 
     def __nonzero__ (self):
         return self.good()
index 1e8e177..da7e8c5 100644 (file)
@@ -22,6 +22,7 @@ limitations under the License.
 #include <trex_rpc_server_api.h>
 #include <trex_rpc_req_resp_server.h>
 #include <trex_rpc_jsonrpc_v2_parser.h>
+#include <trex_rpc_zip.h>
 
 #include <unistd.h>
 #include <sstream>
@@ -138,20 +139,33 @@ void TrexRpcServerReqRes::_stop_rpc_thread() {
  * respondes to the request
  */
 void TrexRpcServerReqRes::handle_request(const std::string &request) {
-    std::string response_str = process_request(request);
-    zmq_send(m_socket, response_str.c_str(), response_str.size(), 0);
+    std::string response;
+
+    process_request(request, response);
+
+    zmq_send(m_socket, response.c_str(), response.size(), 0);
+}
+
+void TrexRpcServerReqRes::process_request(const std::string &request, std::string &response) {
+
+    if (TrexRpcZip::is_compressed(request)) {
+        process_zipped_request(request, response);
+    } else {
+        process_request_raw(request, response);
+    }
+
 }
 
 /**
  * main processing of the request
  * 
  */
-std::string TrexRpcServerReqRes::process_request(const std::string &request) {
+void TrexRpcServerReqRes::process_request_raw(const std::string &request, std::string &response) {
 
     std::vector<TrexJsonRpcV2ParsedObject *> commands;
 
     Json::FastWriter writer;
-    Json::Value response;
+    Json::Value response_json;
 
     /* first parse the request using JSON RPC V2 parser */
     TrexJsonRpcV2Parser rpc_request(request);
@@ -171,7 +185,7 @@ std::string TrexRpcServerReqRes::process_request(const std::string &request) {
         command->execute(single_response);
         delete command;
 
-        response[index++] = single_response;
+        response_json[index++] = single_response;
 
     }
 
@@ -181,17 +195,32 @@ std::string TrexRpcServerReqRes::process_request(const std::string &request) {
     }
 
     /* write the JSON to string and sever on ZMQ */
-    std::string response_str;
 
     if (response.size() == 1) {
-        response_str = writer.write(response[0]);
+        response = writer.write(response_json[0]);
     } else {
-        response_str = writer.write(response);
+        response = writer.write(response_json);
     }
     
-    verbose_json("Server Replied:  ", response_str);
+    verbose_json("Server Replied:  ", response);
+
+}
+
+void TrexRpcServerReqRes::process_zipped_request(const std::string &request, std::string &response) {
+    std::string unzipped;
+
+    /* try to uncomrpess - if fails, last shot is the JSON RPC */
+    bool rc = TrexRpcZip::uncompress(request, unzipped);
+    if (!rc) {
+        return process_request_raw(request, response);
+    }
+
+    /* process the request */
+    std::string raw_response;
+    process_request_raw(unzipped, raw_response);
+
+    TrexRpcZip::compress(raw_response, response);
 
-    return response_str;
 }
 
 /**
@@ -218,7 +247,11 @@ TrexRpcServerReqRes::handle_server_error(const std::string &specific_err) {
 
 std::string
 TrexRpcServerReqRes::test_inject_request(const std::string &req) {
-    return process_request(req);
+    std::string response;
+
+    process_request(req, response);
+
+    return response;
 }
 
 
index 97efbe0..979bf9a 100644 (file)
@@ -45,7 +45,9 @@ protected:
 
     bool fetch_one_request(std::string &msg);
     void handle_request(const std::string &request);
-    std::string process_request(const std::string &request);
+    void process_request(const std::string &request, std::string &response);
+    void process_request_raw(const std::string &request, std::string &response);
+    void process_zipped_request(const std::string &request, std::string &response);
 
     void handle_server_error(const std::string &specific_err);
 
diff --git a/src/rpc-server/trex_rpc_zip.cpp b/src/rpc-server/trex_rpc_zip.cpp
new file mode 100644 (file)
index 0000000..ef5c483
--- /dev/null
@@ -0,0 +1,124 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#include "trex_rpc_zip.h"
+#include <zlib.h>
+#include <arpa/inet.h>
+#include <iostream>
+
+bool
+TrexRpcZip::is_compressed(const std::string &input) {
+    /* check for minimum size */
+    if (input.size() < sizeof(header_st)) {
+        return false;
+    }
+
+    /* cast */
+    const header_st *header = (header_st *)input.c_str();
+
+    /* check magic */
+    uint32_t magic = ntohl(header->magic);
+    if (magic != G_HEADER_MAGIC) {
+        return false;
+    }
+
+    return true;
+}
+
+bool
+TrexRpcZip::uncompress(const std::string &input, std::string &output) {
+
+    /* sanity check first */
+    if (!is_compressed(input)) {
+        return false;
+    }
+
+    /* cast */
+    const header_st *header = (header_st *)input.c_str();
+
+    /* original size */
+    uint32_t uncmp_size = ntohl(header->uncmp_size);
+
+    /* alocate dynamic space for the uncomrpessed buffer */
+    Bytef *u_buffer = new Bytef[uncmp_size];
+    if (!u_buffer) {
+        return false;
+    }
+
+    /* set the target buffer size */
+    uLongf dest_len = uncmp_size;
+
+    /* try to uncompress */
+    int z_err = ::uncompress(u_buffer,
+                             &dest_len,
+                             (const Bytef *)header->data,
+                             (uLong)input.size() - sizeof(header_st));
+
+    if (z_err != Z_OK) {
+        delete [] u_buffer;
+        return false;
+    }
+
+    output.append((const char *)u_buffer, dest_len);
+
+    delete [] u_buffer;
+    return true;
+}
+
+
+bool
+TrexRpcZip::compress(const std::string &input, std::string &output) {
+
+    /* get a bound */
+    int bound_size = compressBound((uLong)input.size()) + sizeof(header_st);
+
+    /* alocate dynamic space for the uncomrpessed buffer */
+    char *buffer = new char[bound_size];
+    if (!buffer) {
+        return false;
+    }
+
+    header_st *header = (header_st *)buffer;
+    uLongf destLen    = bound_size;
+    
+    int z_err = ::compress((Bytef *)header->data,
+                           &destLen,
+                           (const Bytef *)input.c_str(),
+                           (uLong)input.size());
+
+    if (z_err != Z_OK) {
+        delete [] buffer;
+        return false;
+    }
+
+    /* terminate string */
+    header->data[destLen] = 0;
+
+    /* add the header */
+    header->magic      = htonl(G_HEADER_MAGIC);
+    header->uncmp_size = htonl(input.size());
+    
+    output.append((const char *)header, bound_size);
+
+    delete [] buffer;
+
+    return true;    
+}
diff --git a/src/rpc-server/trex_rpc_zip.h b/src/rpc-server/trex_rpc_zip.h
new file mode 100644 (file)
index 0000000..4b9930b
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#include <string>
+
+class TrexRpcZip {
+public:
+
+    /**
+     * return true if message is compressed
+     * 
+     */
+    static bool is_compressed(const std::string &input);
+
+    /**
+     * uncompress an 'input' to 'output'
+     * on success return true else returns false
+     */
+    static bool uncompress(const std::string &input, std::string &output);
+
+    /**
+     * compress 'input' to 'output'
+     * on success return true else returns false
+     */
+    static bool compress(const std::string &input, std::string &output);
+
+private:
+
+    /**
+     * packed header for reading binary compressed messages
+     * 
+     * @author imarom (15-Feb-16)
+     */
+    struct header_st {
+        uint32_t    magic;
+        uint32_t    uncmp_size;
+        char        data[0];
+    } __attribute__((packed));
+
+  
+    static const uint32_t G_HEADER_MAGIC = 0xABE85CEA;
+};
+