'trex_rpc_jsonrpc_v2_parser.cpp',
'trex_rpc_cmds_table.cpp',
'trex_rpc_cmd.cpp',
+ 'trex_rpc_zip.cpp',
'commands/trex_rpc_cmd_test.cpp',
'commands/trex_rpc_cmd_general.cpp',
linkflags = build_obj.get_link_flags(),
source = build_obj.get_src(),
use = build_obj.get_use_libs(),
+ lib = ['z'],
rpath = bld.env.RPATH + build_obj.get_rpath(),
target = build_obj.get_target())
'trex_rpc_jsonrpc_v2_parser.cpp',
'trex_rpc_cmds_table.cpp',
'trex_rpc_cmd.cpp',
+ 'trex_rpc_zip.cpp',
'commands/trex_rpc_cmd_test.cpp',
'commands/trex_rpc_cmd_general.cpp',
includes =includes_path,
cxxflags =(build_obj.get_cxx_flags()+['-std=gnu++11',]),
linkflags = build_obj.get_link_flags() ,
- lib=['pthread','dl'],
+ lib=['pthread','dl', 'z'],
use =[build_obj.get_dpdk_target(),'zmq'],
source = bp.file_list(top),
target = build_obj.get_target())
try:
# pcap injection removes all previous streams from the ports
self.remove_all_streams(ports = opts.ports)
+
profile = STLProfile.load_pcap(opts.file[0],
opts.ipg_usec,
opts.speedup,
print e.brief()
return
+ return True
import zmq
import json
import re
-from time import sleep
from collections import namedtuple
from trex_stl_types import *
from utils.common import random_id_gen
+import zlib
+import struct
+
class bcolors:
BLUE = '\033[94m'
msg = json.dumps(self.batch_list)
- return self.rpc_client.send_raw_msg(msg)
+ return self.rpc_client.send_msg(msg)
# JSON RPC v2.0 client
class JsonRpcClient(object):
+ MSG_COMPRESS_THRESHOLD = 4096
+ MSG_COMPRESS_HEADER_MAGIC = 0xABE85CEA
+
def __init__ (self, default_server, default_port, logger):
self.logger = logger
self.connected = False
id, msg = self.create_jsonrpc_v2(method_name, params)
- return self.send_raw_msg(msg)
+ return self.send_msg(msg)
+
+
+ def compress_msg (self, msg):
+ # compress
+ compressed = zlib.compress(msg)
+ new_msg = struct.pack(">II", self.MSG_COMPRESS_HEADER_MAGIC, len(msg)) + compressed
+ return new_msg
+
+
+ def decompress_msg (self, msg):
+ if len(msg) < 8:
+ return None
+
+ t = struct.unpack(">II", msg[:8])
+ if (t[0] != self.MSG_COMPRESS_HEADER_MAGIC):
+ return None
+
+ x = zlib.decompress(msg[8:])
+ if len(x) != t[1]:
+ return None
+
+ return x
+
+ def send_msg (self, msg):
+ # print before
+ if self.logger.check_verbose(self.logger.VERBOSE_HIGH):
+ self.verbose_msg("Sending Request To Server:\n\n" + self.pretty_json(msg) + "\n")
+
+ if len(msg) > self.MSG_COMPRESS_THRESHOLD:
+ response = self.send_raw_msg(self.compress_msg(msg))
+ if response:
+ response = self.decompress_msg(response)
+ else:
+ response = self.send_raw_msg(msg)
+
+ if response == None:
+ return RC_ERR("*** [RPC] - Failed to decode response from server")
+
+
+ # print after
+ if self.logger.check_verbose(self.logger.VERBOSE_HIGH):
+ self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n")
+
+ # process response (batch and regular)
+
+ response_json = json.loads(response)
+
+ if isinstance(response_json, list):
+ return self.process_batch_response(response_json)
+ else:
+ return self.process_single_response(response_json)
+
# low level send of string message
def send_raw_msg (self, msg):
- self.verbose_msg("Sending Request To Server:\n\n" + self.pretty_json(msg) + "\n")
-
tries = 0
while True:
try:
return RC_ERR("*** [RPC] - Failed to get server response at {0}".format(self.transport))
- self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n")
-
- # decode
-
- # batch ?
- response_json = json.loads(response)
-
- if isinstance(response_json, list):
- rc_batch = RC()
-
- for single_response in response_json:
- rc = self.process_single_response(single_response)
- rc_batch.add(rc)
-
- return rc_batch
-
- else:
- return self.process_single_response(response_json)
-
+ return response
+
+
+ # processs a single response from server
def process_single_response (self, response_json):
if (response_json.get("jsonrpc") != "2.0"):
+ # process a batch response
+ def process_batch_response (self, response_json):
+ rc_batch = RC()
+
+ for single_response in response_json:
+ rc = self.process_single_response(single_response)
+ rc_batch.add(rc)
+
+ return rc_batch
+
+
def disconnect (self):
if self.connected:
self.socket.close(linger = 0)
import trex_stl_stats
from trex_stl_types import *
+import time
StreamOnPort = namedtuple('StreamOnPort', ['compiled_stream', 'metadata'])
batch.append(cmd)
# meta data for show streams
- self.streams[stream.get_id()] = StreamOnPort(stream.to_json(),
- Port._generate_stream_metadata(stream))
+ #self.streams[stream.get_id()] = StreamOnPort(stream.to_json(),
+ # Port._generate_stream_metadata(stream))
rc = self.transmit_batch(batch)
if not rc:
return self.err(rc.err())
-
# the only valid state now
self.state = self.STATE_STREAMS
import re
import json
-
+import zlib
+import struct
import argparse
# write to temp file
f = tempfile.NamedTemporaryFile(delete = False)
- f.write(json.dumps(cmds_json))
+
+ msg = json.dumps(cmds_json)
+ compressed = zlib.compress(msg)
+ new_msg = struct.pack(">II", 0xABE85CEA, len(msg)) + compressed
+
+ f.write(new_msg)
f.close()
# launch bp-sim
# packet and VM
self.fields['packet'] = packet.dump_pkt()
self.fields['vm'] = packet.get_vm_data()
- self.packet_desc = packet.pkt_layers_desc()
+
+
+ # this is heavy, calculate lazy
+ self.packet_desc = None
if not rx_stats:
self.fields['rx_stats'] = {}
return self.next
def get_pkt_type (self):
+ if self.packet_desc == None:
+ self.packet_desc = CScapyTRexPktBuilder.pkt_layers_desc_from_buffer(base64.b64decode(self.fields['packet']['binary']))
+
return self.packet_desc
def get_pkt_len (self, count_crc = True):
else:
next = i + 1
+
streams.append(STLStream(name = i,
packet = CScapyTRexPktBuilder(pkt_buffer = cap),
mode = STLTXSingleBurst(total_pkts = 1),
self_start = True if (i == 1) else False,
isg = (ts_usec - last_ts_usec), # seconds to usec
next = next))
+
last_ts_usec = ts_usec
+
return STLProfile(streams)
from trex_stl_exceptions import *
RpcCmdData = namedtuple('RpcCmdData', ['method', 'params'])
+TupleRC = namedtuple('RC', ['rc', 'data', 'is_warn'])
class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])):
__slots__ = ()
self.rc_list = []
if (rc != None):
- tuple_rc = namedtuple('RC', ['rc', 'data', 'is_warn'])
- self.rc_list.append(tuple_rc(rc, data, is_warn))
+ self.rc_list.append(TupleRC(rc, data, is_warn))
def __nonzero__ (self):
return self.good()
#include <trex_rpc_server_api.h>
#include <trex_rpc_req_resp_server.h>
#include <trex_rpc_jsonrpc_v2_parser.h>
+#include <trex_rpc_zip.h>
#include <unistd.h>
#include <sstream>
* respondes to the request
*/
void TrexRpcServerReqRes::handle_request(const std::string &request) {
- std::string response_str = process_request(request);
- zmq_send(m_socket, response_str.c_str(), response_str.size(), 0);
+ std::string response;
+
+ process_request(request, response);
+
+ zmq_send(m_socket, response.c_str(), response.size(), 0);
+}
+
+void TrexRpcServerReqRes::process_request(const std::string &request, std::string &response) {
+
+ if (TrexRpcZip::is_compressed(request)) {
+ process_zipped_request(request, response);
+ } else {
+ process_request_raw(request, response);
+ }
+
}
/**
* main processing of the request
*
*/
-std::string TrexRpcServerReqRes::process_request(const std::string &request) {
+void TrexRpcServerReqRes::process_request_raw(const std::string &request, std::string &response) {
std::vector<TrexJsonRpcV2ParsedObject *> commands;
Json::FastWriter writer;
- Json::Value response;
+ Json::Value response_json;
/* first parse the request using JSON RPC V2 parser */
TrexJsonRpcV2Parser rpc_request(request);
command->execute(single_response);
delete command;
- response[index++] = single_response;
+ response_json[index++] = single_response;
}
}
/* write the JSON to string and sever on ZMQ */
- std::string response_str;
if (response.size() == 1) {
- response_str = writer.write(response[0]);
+ response = writer.write(response_json[0]);
} else {
- response_str = writer.write(response);
+ response = writer.write(response_json);
}
- verbose_json("Server Replied: ", response_str);
+ verbose_json("Server Replied: ", response);
+
+}
+
+void TrexRpcServerReqRes::process_zipped_request(const std::string &request, std::string &response) {
+ std::string unzipped;
+
+ /* try to uncomrpess - if fails, last shot is the JSON RPC */
+ bool rc = TrexRpcZip::uncompress(request, unzipped);
+ if (!rc) {
+ return process_request_raw(request, response);
+ }
+
+ /* process the request */
+ std::string raw_response;
+ process_request_raw(unzipped, raw_response);
+
+ TrexRpcZip::compress(raw_response, response);
- return response_str;
}
/**
std::string
TrexRpcServerReqRes::test_inject_request(const std::string &req) {
- return process_request(req);
+ std::string response;
+
+ process_request(req, response);
+
+ return response;
}
bool fetch_one_request(std::string &msg);
void handle_request(const std::string &request);
- std::string process_request(const std::string &request);
+ void process_request(const std::string &request, std::string &response);
+ void process_request_raw(const std::string &request, std::string &response);
+ void process_zipped_request(const std::string &request, std::string &response);
void handle_server_error(const std::string &specific_err);
--- /dev/null
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#include "trex_rpc_zip.h"
+#include <zlib.h>
+#include <arpa/inet.h>
+#include <iostream>
+
+bool
+TrexRpcZip::is_compressed(const std::string &input) {
+ /* check for minimum size */
+ if (input.size() < sizeof(header_st)) {
+ return false;
+ }
+
+ /* cast */
+ const header_st *header = (header_st *)input.c_str();
+
+ /* check magic */
+ uint32_t magic = ntohl(header->magic);
+ if (magic != G_HEADER_MAGIC) {
+ return false;
+ }
+
+ return true;
+}
+
+bool
+TrexRpcZip::uncompress(const std::string &input, std::string &output) {
+
+ /* sanity check first */
+ if (!is_compressed(input)) {
+ return false;
+ }
+
+ /* cast */
+ const header_st *header = (header_st *)input.c_str();
+
+ /* original size */
+ uint32_t uncmp_size = ntohl(header->uncmp_size);
+
+ /* alocate dynamic space for the uncomrpessed buffer */
+ Bytef *u_buffer = new Bytef[uncmp_size];
+ if (!u_buffer) {
+ return false;
+ }
+
+ /* set the target buffer size */
+ uLongf dest_len = uncmp_size;
+
+ /* try to uncompress */
+ int z_err = ::uncompress(u_buffer,
+ &dest_len,
+ (const Bytef *)header->data,
+ (uLong)input.size() - sizeof(header_st));
+
+ if (z_err != Z_OK) {
+ delete [] u_buffer;
+ return false;
+ }
+
+ output.append((const char *)u_buffer, dest_len);
+
+ delete [] u_buffer;
+ return true;
+}
+
+
+bool
+TrexRpcZip::compress(const std::string &input, std::string &output) {
+
+ /* get a bound */
+ int bound_size = compressBound((uLong)input.size()) + sizeof(header_st);
+
+ /* alocate dynamic space for the uncomrpessed buffer */
+ char *buffer = new char[bound_size];
+ if (!buffer) {
+ return false;
+ }
+
+ header_st *header = (header_st *)buffer;
+ uLongf destLen = bound_size;
+
+ int z_err = ::compress((Bytef *)header->data,
+ &destLen,
+ (const Bytef *)input.c_str(),
+ (uLong)input.size());
+
+ if (z_err != Z_OK) {
+ delete [] buffer;
+ return false;
+ }
+
+ /* terminate string */
+ header->data[destLen] = 0;
+
+ /* add the header */
+ header->magic = htonl(G_HEADER_MAGIC);
+ header->uncmp_size = htonl(input.size());
+
+ output.append((const char *)header, bound_size);
+
+ delete [] buffer;
+
+ return true;
+}
--- /dev/null
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#include <string>
+
+class TrexRpcZip {
+public:
+
+ /**
+ * return true if message is compressed
+ *
+ */
+ static bool is_compressed(const std::string &input);
+
+ /**
+ * uncompress an 'input' to 'output'
+ * on success return true else returns false
+ */
+ static bool uncompress(const std::string &input, std::string &output);
+
+ /**
+ * compress 'input' to 'output'
+ * on success return true else returns false
+ */
+ static bool compress(const std::string &input, std::string &output);
+
+private:
+
+ /**
+ * packed header for reading binary compressed messages
+ *
+ * @author imarom (15-Feb-16)
+ */
+ struct header_st {
+ uint32_t magic;
+ uint32_t uncmp_size;
+ char data[0];
+ } __attribute__((packed));
+
+
+ static const uint32_t G_HEADER_MAGIC = 0xABE85CEA;
+};
+