core mask - first phase
authorimarom <[email protected]>
Wed, 10 Aug 2016 14:45:36 +0000 (17:45 +0300)
committerimarom <[email protected]>
Mon, 15 Aug 2016 13:03:59 +0000 (16:03 +0300)
20 files changed:
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py
src/bp_sim.h
src/main_dpdk.cpp
src/main_dpdk.h
src/rpc-server/commands/trex_rpc_cmd_general.cpp
src/rpc-server/commands/trex_rpc_cmd_stream.cpp
src/rpc-server/commands/trex_rpc_cmds.h
src/sim/trex_sim.h
src/sim/trex_sim_stateless.cpp
src/stateless/cp/trex_stateless_port.cpp
src/stateless/cp/trex_stateless_port.h
src/stateless/cp/trex_streams_compiler.cpp
src/stateless/cp/trex_streams_compiler.h
src/stateless/dp/trex_stateless_dp_core.h
src/trex_defs.h
src/utl_cpuu.cpp
src/utl_cpuu.h

index 611e48f..f0201d6 100755 (executable)
@@ -453,6 +453,10 @@ class CCommLink(object):
 class STLClient(object):
     """TRex Stateless client object - gives operations per TRex/user"""
 
+    # different modes for attaching traffic to ports
+    CORE_MASK_SPLIT = 1
+    CORE_MASK_PIN   = 2
+
     def __init__(self,
                  username = common.get_current_user(),
                  server = "localhost",
@@ -675,14 +679,31 @@ class STLClient(object):
         return self.ports[port_id].get_stream_id_list()
 
 
-    def __start (self, multiplier, duration, port_id_list = None, force = False):
+    def __start (self,
+                 multiplier,
+                 duration,
+                 port_id_list,
+                 force,
+                 core_mask):
 
         port_id_list = self.__ports(port_id_list)
 
         rc = RC()
 
+        ports_mask = {}
+        for port_id in port_id_list:
+            # a pin mode was requested and we have
+            # the second port from the group in the start list
+            if (core_mask == self.CORE_MASK_PIN) and ( (port_id ^ 0x1) in port_id_list ):
+                ports_mask[port_id] = 0x55555555 if( port_id % 2) == 0 else 0xAAAAAAAA
+            else:
+                ports_mask[port_id] = None
+
         for port_id in port_id_list:
-            rc.add(self.ports[port_id].start(multiplier, duration, force))
+            rc.add(self.ports[port_id].start(multiplier,
+                                             duration,
+                                             force,
+                                             ports_mask[port_id]))
 
         return rc
 
@@ -800,13 +821,14 @@ class STLClient(object):
 
         self.server_version = rc.data()
         self.global_stats.server_version = rc.data()
-
+        
         # cache system info
         rc = self._transmit("get_system_info")
         if not rc:
             return rc
 
         self.system_info = rc.data()
+        self.global_stats.system_info = rc.data()
 
         # cache supported commands
         rc = self._transmit("get_supported_cmds")
@@ -1901,7 +1923,8 @@ class STLClient(object):
                mult = "1",
                force = False,
                duration = -1,
-               total = False):
+               total = False,
+               core_mask = CORE_MASK_SPLIT):
         """
             Start traffic on port(s)
 
@@ -1927,6 +1950,12 @@ class STLClient(object):
                     True: Divide bandwidth among the ports
                     False: Duplicate
 
+                core_mask: CORE_MASK_SPLIT, CORE_MASK_PIN
+                    Determine the allocation of cores per port
+                    In CORE_MASK_SPLIT all the traffic will be divided equally between all the cores
+                    associated with each port
+                    In CORE_MASK_PIN, for each dual ports (a group that shares the same cores)
+                    the cores will be divided half pinned for each port
 
             :raises:
                 + :exc:`STLError`
@@ -1964,7 +1993,7 @@ class STLClient(object):
 
         # start traffic
         self.logger.pre_cmd("Starting traffic on port(s) {0}:".format(ports))
-        rc = self.__start(mult_obj, duration, ports, force)
+        rc = self.__start(mult_obj, duration, ports, force, core_mask)
         self.logger.post_cmd(rc)
 
         if not rc:
@@ -2647,7 +2676,8 @@ class STLClient(object):
                                          parsing_opts.DURATION,
                                          parsing_opts.TUNABLES,
                                          parsing_opts.MULTIPLIER_STRICT,
-                                         parsing_opts.DRY_RUN)
+                                         parsing_opts.DRY_RUN,
+                                         parsing_opts.PIN_CORES)
 
         opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True)
         if not opts:
@@ -2712,7 +2742,8 @@ class STLClient(object):
                        opts.mult,
                        opts.force,
                        opts.duration,
-                       opts.total)
+                       opts.total,
+                       core_mask = self.CORE_MASK_PIN if opts.pin_cores else self.CORE_MASK_SPLIT)
 
         return RC_OK()
 
index d239fc5..556a14d 100644 (file)
@@ -100,7 +100,7 @@ class Port(object):
 
     # decorator to check server is readable (port not down and etc.)
     def writeable(func):
-        def func_wrapper(*args):
+        def func_wrapper(*args, **kwargs):
             port = args[0]
 
             if not port.is_up():
@@ -112,7 +112,7 @@ class Port(object):
             if not port.is_writeable():
                 return port.err("{0} - port is not in a writeable state".format(func.__name__))
 
-            return func(*args)
+            return func(*args, **kwargs)
 
         return func_wrapper
 
@@ -396,16 +396,17 @@ class Port(object):
 
 
     @writeable
-    def start (self, mul, duration, force):
+    def start (self, mul, duration, force, mask):
 
         if self.state == self.STATE_IDLE:
             return self.err("unable to start traffic - no streams attached to port")
 
-        params = {"handler":  self.handler,
-                  "port_id":  self.port_id,
-                  "mul":      mul,
-                  "duration": duration,
-                  "force":    force}
+        params = {"handler":    self.handler,
+                  "port_id":    self.port_id,
+                  "mul":        mul,
+                  "duration":   duration,
+                  "force":      force,
+                  "core_mask":  mask if mask is not None else ((1 << 64) - 1)}
    
         # must set this before to avoid race with the async response
         last_state = self.state
index 1bf0a9a..b321c00 100644 (file)
@@ -210,8 +210,11 @@ class CTRexInfoGenerator(object):
                              ("version", "{ver}, UUID: {uuid}".format(ver=global_stats.server_version.get("version", "N/A"),
                                                                       uuid="N/A")),
 
-                             ("cpu_util.", "{0}% {1}".format( format_threshold(round_float(global_stats.get("m_cpu_util")), [85, 100], [0, 85]),
-                                                              global_stats.get_trend_gui("m_cpu_util", use_raw = True))),
+                             ("cpu_util.", "{0}% @ {2} cores ({3} per port) {1}".format( format_threshold(round_float(global_stats.get("m_cpu_util")), [85, 100], [0, 85]),
+                                                                                         global_stats.get_trend_gui("m_cpu_util", use_raw = True),
+                                                                                         global_stats.system_info.get('dp_core_count'),
+                                                                                         global_stats.system_info.get('dp_core_count_per_port'),
+                                                                                         )),
 
                              ("rx_cpu_util.", "{0}% {1}".format( format_threshold(round_float(global_stats.get("m_rx_cpu_util")), [85, 100], [0, 85]),
                                                                 global_stats.get_trend_gui("m_rx_cpu_util", use_raw = True))),
@@ -234,7 +237,7 @@ class CTRexInfoGenerator(object):
                              ("total_pps", "{0} {1}".format( global_stats.get("m_tx_pps", format=True, suffix="pkt/sec"),
                                                               global_stats.get_trend_gui("m_tx_pps"))),
 
-                             ("  ", ""),
+                             #("  ", ""),
 
                              ("drop_rate", "{0}".format( format_num(global_stats.get("m_rx_drop_bps"),
                                                                     suffix = 'b/sec',
@@ -422,21 +425,39 @@ class CTRexInfoGenerator(object):
 
     def _generate_cpu_util_stats(self):
         util_stats = self._util_stats_ref.get_stats(use_1sec_cache = True)
+        
         stats_table = text_tables.TRexTextTable()
         if util_stats:
             if 'cpu' not in util_stats:
                 raise Exception("Excepting 'cpu' section in stats %s" % util_stats)
             cpu_stats = util_stats['cpu']
-            hist_len = len(cpu_stats[0])
+            hist_len = len(cpu_stats[0]["history"])
             avg_len = min(5, hist_len)
             show_len = min(15, hist_len)
             stats_table.header(['Thread', 'Avg', 'Latest'] + list(range(-1, 0 - show_len, -1)))
             stats_table.set_cols_align(['l'] + ['r'] * (show_len + 1))
-            stats_table.set_cols_width([8, 3, 6] + [3] * (show_len - 1))
+            stats_table.set_cols_width([10, 3, 6] + [3] * (show_len - 1))
             stats_table.set_cols_dtype(['t'] * (show_len + 2))
+
             for i in range(min(14, len(cpu_stats))):
-                avg = int(round(sum(cpu_stats[i][:avg_len]) / avg_len))
-                stats_table.add_row([i, avg] + cpu_stats[i][:show_len])
+                history = cpu_stats[i]["history"]
+                ports = cpu_stats[i]["ports"]
+                if not len(ports) == 2:
+                    sys.__stdout__.write(str(util_stats["cpu"]))
+                    exit(-1)
+
+                avg = int(round(sum(history[:avg_len]) / avg_len))
+
+                # decode active ports for core
+                if ports == [-1, -1]:
+                    interfaces = "(IDLE)"
+                elif not -1 in ports:
+                    interfaces = "({:},{:})".format(ports[0], ports[1])
+                else:
+                    interfaces = "({:})".format(ports[0] if ports[0] != -1 else ports[1])
+
+                thread = "{:2} {:^7}".format(i, interfaces)
+                stats_table.add_row([thread, avg] + history[:show_len])
         else:
             stats_table.add_row(['No Data.'])
         return {'cpu_util(%)': ExportableStats(None, stats_table)}
@@ -542,6 +563,7 @@ class CTRexInfoGenerator(object):
         per_field_stats = OrderedDict([("owner", []),
                                        ("state", []),
                                        ("speed", []),
+                                       ("CPU util.", []),
                                        ("--", []),
                                        ("Tx bps L2", []),
                                        ("Tx bps L1", []),
@@ -1037,7 +1059,8 @@ class CPortStats(CTRexStats):
         return {"owner": owner,
                 "state": "{0}".format(state),
                 "speed": self._port_obj.get_formatted_speed() if self._port_obj else '',
-
+                "CPU util.": "{0} {1}%".format(self.get_trend_gui("m_cpu_util", use_raw = True),
+                                               format_threshold(round_float(self.get("m_cpu_util")), [85, 100], [0, 85])) if self._port_obj else '' ,
                 "--": " ",
                 "---": " ",
                 "----": " ",
index cfe8a93..5126525 100755 (executable)
@@ -37,6 +37,7 @@ PROMISCUOUS_SWITCH = 21
 TUNABLES = 22
 REMOTE_FILE = 23
 LOCKED = 24
+PIN_CORES = 25
 
 GLOBAL_STATS = 50
 PORT_STATS = 51
@@ -323,6 +324,11 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'],
                                      'default': False,
                                      'help': "Dry run - no traffic will be injected"}),
 
+              PIN_CORES: ArgumentPack(['--pin'],
+                                      {'action': 'store_true',
+                                       'dest': 'pin_cores',
+                                       'default': False,
+                                       'help': "Pin cores to interfaces - cores will be divided between interfaces (performance boot for symetric profiles)"}),
 
               XTERM: ArgumentPack(['-x', '--xterm'],
                                   {'action': 'store_true',
index bfdd90f..e1852da 100755 (executable)
@@ -3835,6 +3835,45 @@ public:
         m_stateless_dp_info.stop_traffic(port_id, false, 0);
     }
 
+    /**
+     * return true if a core currently has some pending CP 
+     * messages 
+     */
+    bool are_any_pending_cp_messages() {
+        if (get_is_stateless()) {
+            return m_stateless_dp_info.are_any_pending_cp_messages();
+        } else {
+            /* for stateful this is always false */
+            return false;
+        }
+    }
+
+    /**
+     * a core provides services for two interfaces
+     * it can either be idle, active for one port 
+     * or active for both 
+     */
+    bool is_port_active(uint8_t port_id) {
+        /* for stateful (batch) core is always active,
+           for stateless relay the query to the next level
+         */
+        if (get_is_stateless()) {
+            return m_stateless_dp_info.is_port_active(port_id);
+        } else {
+            return true;
+        }
+    }
+
+
+    /**
+     * returns the two ports associated with this core
+     * 
+     */
+    void get_port_ids(uint8_t &p1, uint8_t &p2) {
+        p1 = 2 * getDualPortId();
+        p2 = p1 + 1;
+    }
+
     void Dump(FILE *fd);
     void DumpCsv(FILE *fd);
     void DumpStats(FILE *fd);
index caec511..349db11 100644 (file)
@@ -1316,6 +1316,8 @@ int CPhyEthIF::get_rx_stat_capabilities() {
     return get_ex_drv()->get_rx_stat_capabilities();
 }
 
+
+
 void CPhyEthIF::configure(uint16_t nb_rx_queue,
                           uint16_t nb_tx_queue,
                           const struct rte_eth_conf *eth_conf){
@@ -2436,6 +2438,8 @@ public:
 
     float     m_total_rx_bps;
     float     m_total_rx_pps;
+
+    float     m_cpu_util;
 };
 
 class CGlobalStats {
@@ -2601,6 +2605,7 @@ void CGlobalStats::dump_json(std::string & json, bool baseline){
         json+=GET_FIELD_PORT(i,m_total_tx_pps);
         json+=GET_FIELD_PORT(i,m_total_rx_bps);
         json+=GET_FIELD_PORT(i,m_total_rx_pps);
+        json+=GET_FIELD_PORT(i,m_cpu_util);
     }
     json+=m_template.dump_as_json("template");
     json+="\"unknown\":0}}"  ;
@@ -2906,6 +2911,7 @@ public:
     tx_per_flow_t get_flow_tx_stats(uint8_t port, uint16_t hw_id);
     tx_per_flow_t clear_flow_tx_stats(uint8_t port, uint16_t index, bool is_lat);
     void get_stats(CGlobalStats & stats);
+    float get_cpu_util_per_interface(uint8_t port_id);
     void dump_post_test_stats(FILE *fd);
     void dump_config(FILE *fd);
     void dump_links_status(FILE *fd);
@@ -3744,6 +3750,9 @@ void CGlobalTRex::get_stats(CGlobalStats & stats){
         for (uint16_t flow = MAX_FLOW_STATS; flow <= MAX_FLOW_STATS + max_stat_hw_id_seen_payload; flow++) {
             stats.m_port[i].m_tx_per_flow[flow].clear();
         }
+
+        stp->m_cpu_util = get_cpu_util_per_interface(i);
+
     }
 
     uint64_t total_open_flows=0;
@@ -3858,6 +3867,25 @@ void CGlobalTRex::get_stats(CGlobalStats & stats){
     stats.m_tx_expected_bps        = m_expected_bps*pf;
 }
 
+float
+CGlobalTRex::get_cpu_util_per_interface(uint8_t port_id) {
+    CPhyEthIF * _if = &m_ports[port_id];
+
+    float    tmp = 0;
+    uint8_t  cnt = 0;
+    for (const auto &p : _if->get_core_list()) {
+        uint8_t core_id = p.first;
+        CFlowGenListPerThread *lp = m_fl.m_threads_info[core_id];
+        if (lp->is_port_active(port_id)) {
+            tmp += lp->m_cpu_cp_u.GetVal();
+            cnt++;
+        }
+    }
+
+    return ( (cnt > 0) ? (tmp / cnt) : 0);
+
+}
+
 bool CGlobalTRex::sanity_check(){
 
     CFlowGenListPerThread   * lpt;
@@ -4442,6 +4470,36 @@ int CGlobalTRex::start_master_statefull() {
 ////////////////////////////////////////////
 static CGlobalTRex g_trex;
 
+bool CPhyEthIF::Create(uint8_t portid) {
+    m_port_id      = portid;
+    m_last_rx_rate = 0.0;
+    m_last_tx_rate = 0.0;
+    m_last_tx_pps  = 0.0;
+
+    return true;
+}
+
+const std::vector<std::pair<uint8_t, uint8_t>> &
+CPhyEthIF::get_core_list() {
+
+    /* lazy find */
+    if (m_core_id_list.size() == 0) {
+
+        for (uint8_t core_id = 0; core_id < g_trex.get_cores_tx(); core_id++) {
+
+            /* iterate over all the directions*/
+            for (uint8_t dir = 0 ; dir < CS_NUM; dir++) {
+                if (g_trex.m_cores_vif[core_id + 1]->get_ports()[dir].m_port->get_port_id() == m_port_id) {
+                    m_core_id_list.push_back(std::make_pair(core_id, dir));
+                }
+            }
+        }
+    }
+
+    return m_core_id_list;
+
+}
+
 int CPhyEthIF::reset_hw_flow_stats() {
     if (get_ex_drv()->hw_rx_stat_supported()) {
         get_ex_drv()->reset_rx_stats(this, m_stats.m_fdir_prev_pkts, 0, MAX_FLOW_STATS);
@@ -6015,25 +6073,17 @@ TrexDpdkPlatformApi::get_interface_stats(uint8_t interface_id, TrexPlatformInter
 
 uint8_t
 TrexDpdkPlatformApi::get_dp_core_count() const {
-    return CGlobalInfo::m_options.preview.getCores();
+    return CGlobalInfo::m_options.get_number_of_dp_cores_needed();
 }
 
 
 void
 TrexDpdkPlatformApi::port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list) const {
 
-    cores_id_list.clear();
-
-    /* iterate over all DP cores */
-    for (uint8_t core_id = 0; core_id < g_trex.get_cores_tx(); core_id++) {
+    CPhyEthIF *lpt = &g_trex.m_ports[port_id];
 
-        /* iterate over all the directions*/
-        for (uint8_t dir = 0 ; dir < CS_NUM; dir++) {
-            if (g_trex.m_cores_vif[core_id + 1]->get_ports()[dir].m_port->get_port_id() == port_id) {
-                cores_id_list.push_back(std::make_pair(core_id, dir));
-            }
-        }
-    }
+    /* copy data from the interface */
+    cores_id_list = lpt->get_core_list();
 }
 
 void
@@ -6133,10 +6183,23 @@ int TrexDpdkPlatformApi::get_active_pgids(flow_stat_active_t &result) const {
 }
 
 int TrexDpdkPlatformApi::get_cpu_util_full(cpu_util_full_t &cpu_util_full) const {
+    uint8_t p1;
+    uint8_t p2;
+
     cpu_util_full.resize((int)g_trex.m_fl.m_threads_info.size());
     for (int thread_id=0; thread_id<(int)g_trex.m_fl.m_threads_info.size(); thread_id++) {
-        CFlowGenListPerThread * lp=g_trex.m_fl.m_threads_info[thread_id];
-        lp->m_cpu_cp_u.GetHistory(cpu_util_full[thread_id]);
+
+        /* history */
+        CFlowGenListPerThread *lp = g_trex.m_fl.m_threads_info[thread_id];
+        cpu_vct_st &per_cpu = cpu_util_full[thread_id];
+        lp->m_cpu_cp_u.GetHistory(per_cpu);
+
+
+        /* active ports */
+        lp->get_port_ids(p1, p2);
+        per_cpu.m_port1 = (lp->is_port_active(p1) ? p1 : -1);
+        per_cpu.m_port2 = (lp->is_port_active(p2) ? p2 : -1);
+
     }
     return 0;
 }
@@ -6158,3 +6221,4 @@ CFlowStatParser *TrexDpdkPlatformApi::get_flow_stat_parser() const {
 void TrexDpdkPlatformApi::mark_for_shutdown() const {
     g_trex.mark_for_shutdown(CGlobalTRex::SHUTDOWN_RPC_REQ);
 }
+
index f8c3573..9161807 100644 (file)
@@ -55,13 +55,7 @@ class CPhyEthIF  {
         m_port_id=0;
         m_rx_queue=0;
     }
-    bool Create(uint8_t portid){
-        m_port_id      = portid;
-        m_last_rx_rate = 0.0;
-        m_last_tx_rate = 0.0;
-        m_last_tx_pps  = 0.0;
-        return (true);
-    }
+    bool Create(uint8_t portid);
     void Delete();
 
     void set_rx_queue(uint8_t rx_queue){
@@ -156,6 +150,9 @@ class CPhyEthIF  {
         return m_port_id;
     }
     int get_rx_stat_capabilities();
+
+    const std::vector<std::pair<uint8_t, uint8_t>> & get_core_list();
+
  private:
     uint8_t                  m_port_id;
     uint8_t                  m_rx_queue;
@@ -171,6 +168,10 @@ class CPhyEthIF  {
     float                    m_last_rx_rate;
     float                    m_last_tx_pps;
     float                    m_last_rx_pps;
+
+    /* holds the core ID list for this port - (core, dir) list*/
+    std::vector<std::pair<uint8_t, uint8_t>> m_core_id_list;
+
  public:
     struct rte_eth_dev_info  m_dev_info;
 };
index 080856c..21f64e2 100644 (file)
@@ -199,10 +199,18 @@ TrexRpcCmdGetUtilization::_run(const Json::Value &params, Json::Value &result) {
     }
 
     for (int thread_id = 0; thread_id < cpu_util_full.size(); thread_id++) {
-        for (int history_id = 0; history_id < cpu_util_full[thread_id].size(); history_id++) {
-            section["cpu"][thread_id].append(cpu_util_full[thread_id][history_id]);
+
+        /* history */
+        for (int history_id = 0; history_id < cpu_util_full[thread_id].m_history.size(); history_id++) {
+            section["cpu"][thread_id]["history"].append(cpu_util_full[thread_id].m_history[history_id]);
         }
+
+        /* ports */
+        section["cpu"][thread_id]["ports"] = Json::arrayValue;
+        section["cpu"][thread_id]["ports"].append(cpu_util_full[thread_id].m_port1);
+        section["cpu"][thread_id]["ports"].append(cpu_util_full[thread_id].m_port2);
     }
+    
     return (TREX_RPC_CMD_OK);
 }
 
@@ -270,6 +278,7 @@ TrexRpcCmdGetSysInfo::_run(const Json::Value &params, Json::Value &result) {
 
     /* FIXME: core count */
     section["dp_core_count"] = main->get_dp_core_count();
+    section["dp_core_count_per_port"] = main->get_dp_core_count() / (main->get_port_count() / 2);
     section["core_type"] = get_cpu_model();
 
     /* ports */
index 736f3d0..7e973e6 100644 (file)
@@ -542,8 +542,13 @@ TrexRpcCmdStartTraffic::_run(const Json::Value &params, Json::Value &result) {
     uint8_t port_id = parse_port(params, result);
     TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
 
-    double duration  = parse_double(params, "duration", result);
-    bool   force     = parse_bool(params, "force", result);
+    double    duration    = parse_double(params, "duration", result);
+    bool      force       = parse_bool(params, "force", result);
+    uint64_t  core_mask   = parse_uint64(params, "core_mask", result);
+
+    if (!TrexDPCoreMask::is_valid_mask(port->get_dp_core_count(), core_mask)) {
+        generate_parse_err(result, "invalid core mask provided");
+    }
 
     /* multiplier */
     const Json::Value &mul_obj  = parse_object(params, "mul", result);
@@ -551,7 +556,7 @@ TrexRpcCmdStartTraffic::_run(const Json::Value &params, Json::Value &result) {
     std::string type   = parse_choice(mul_obj, "type", TrexPortMultiplier::g_types, result);
     std::string op     = parse_string(mul_obj, "op", result);
     double      value  = parse_double(mul_obj, "value", result);
-
+    
     if ( value <=0 ){
         generate_parse_err(result, "multiplier can't be zero");
     }
@@ -563,7 +568,7 @@ TrexRpcCmdStartTraffic::_run(const Json::Value &params, Json::Value &result) {
     TrexPortMultiplier mul(type, op, value);
 
     try {
-        port->start_traffic(mul, duration, force);
+        port->start_traffic(mul, duration, force, core_mask);
 
     } catch (const TrexException &ex) {
         generate_execute_err(result, ex.what());
index 24b9522..9dde61d 100644 (file)
@@ -121,7 +121,7 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStream, "get_stream", 3, false, APIClass::API_C
 
 
 
-TREX_RPC_CMD_DEFINE(TrexRpcCmdStartTraffic,    "start_traffic",     4, true, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdStartTraffic,    "start_traffic",     5, true, APIClass::API_CLASS_TYPE_CORE);
 TREX_RPC_CMD_DEFINE(TrexRpcCmdStopTraffic,     "stop_traffic",      1, true, APIClass::API_CLASS_TYPE_CORE);
 TREX_RPC_CMD_DEFINE(TrexRpcCmdRemoveRXFilters, "remove_rx_filters", 1, true, APIClass::API_CLASS_TYPE_CORE);
 TREX_RPC_CMD_DEFINE(TrexRpcCmdPauseTraffic,    "pause_traffic",     1, true, APIClass::API_CLASS_TYPE_CORE);
index 0c34326..f52ad9b 100644 (file)
@@ -143,6 +143,7 @@ private:
     void prepare_control_plane();
     void prepare_dataplane();
     void execute_json(const std::string &json_filename);
+    void find_active_dp_cores();
 
     void run_dp(const std::string &out_filename);
 
@@ -179,6 +180,8 @@ private:
     int              m_dp_core_index;
     uint64_t         m_limit;
     bool             m_is_dry_run;
+
+    std::vector<uint8_t> m_active_dp_cores;
 };
 
 #endif /* __TREX_SIM_H__ */
index 77bd4d7..20041c2 100644 (file)
@@ -123,14 +123,14 @@ public:
 ************************/
 
 SimStateless::SimStateless() {
-    m_publisher         = NULL;
-    m_dp_to_cp_handler  = NULL;
-    m_verbose           = false;
-    m_dp_core_count     = -1;
-    m_dp_core_index     = -1;
-    m_port_count        = -1;
-    m_limit             = 0;
-    m_is_dry_run        = false;
+    m_publisher                   = NULL;
+    m_dp_to_cp_handler            = NULL;
+    m_verbose                     = false;
+    m_dp_core_count               = -1;
+    m_dp_core_index               = -1;
+    m_port_count                  = -1;
+    m_limit                       = 0;
+    m_is_dry_run                  = false;
 
     /* override ownership checks */
     TrexRpcCommand::test_set_override_ownership(true);
@@ -138,6 +138,23 @@ SimStateless::SimStateless() {
 }
 
 
+/**
+ * on the simulation we first construct CP and then DP 
+ * the only way to "assume" which DP will be active during 
+ * the run is by checking for pending CP messages on the cores 
+ * 
+ * @author imarom (8/10/2016)
+ */
+void
+SimStateless::find_active_dp_cores() {
+    for (int core_index = 0; core_index < m_dp_core_count; core_index++) {
+        CFlowGenListPerThread *lpt = m_fl.m_threads_info[core_index];
+        if (lpt->are_any_pending_cp_messages()) {
+            m_active_dp_cores.push_back(core_index);
+        }
+    }
+}
+
 int
 SimStateless::run(const string &json_filename,
                   const string &out_filename,
@@ -168,6 +185,8 @@ SimStateless::run(const string &json_filename,
         return (-1);
     }
 
+    find_active_dp_cores();
+
     run_dp(out_filename);
 
     return 0;
@@ -353,14 +372,14 @@ SimStateless::run_dp(const std::string &out_filename) {
     show_intro(out_filename);
 
     if (is_multiple_capture()) {
-        for (int i = 0; i < m_dp_core_count; i++) {
+        for (int i : m_active_dp_cores) {
             std::stringstream ss;
             ss << out_filename << "-" << i;
             run_dp_core(i, ss.str(), core_stats, total);
         }
 
     } else {
-        for (int i = 0; i < m_dp_core_count; i++) {
+        for (int i : m_active_dp_cores) {
             run_dp_core(i, out_filename, core_stats, total);
         }
     }
@@ -414,9 +433,9 @@ SimStateless::get_limit_per_core(int core_index) {
     if (m_limit == 0) {
         return (0);
     } else {
-        uint64_t l = std::max((uint64_t)1, m_limit / m_dp_core_count);
-        if (core_index == 0) {
-            l += (m_limit % m_dp_core_count);
+        uint64_t l = std::max((uint64_t)1, m_limit / m_active_dp_cores.size());
+        if (core_index == m_active_dp_cores[0]) {
+            l += (m_limit % m_active_dp_cores.size());
         }
         return l;
     }
index 376453b..134d4c9 100644 (file)
@@ -241,7 +241,7 @@ TrexStatelessPort::release(void) {
  * 
  */
 void
-TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration, bool force) {
+TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration, bool force, uint64_t core_mask) {
 
     /* command allowed only on state stream */
     verify_state(PORT_STATE_STREAMS);
@@ -262,10 +262,12 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration,
     std::string fail_msg;
 
     TrexStreamsCompiler compiler;
+    TrexDPCoreMask mask(get_dp_core_count(), core_mask);
+
     bool rc = compiler.compile(m_port_id,
                                feeder.get_streams(),
                                compiled_objs,
-                               get_dp_core_count(),
+                               mask,
                                factor,
                                &fail_msg);
 
@@ -282,7 +284,7 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration,
 
     /* update object status */
     m_factor = factor;
-    m_last_all_streams_continues = compiled_objs[0]->get_all_streams_continues();
+    m_last_all_streams_continues = compiled_objs[mask.get_active_cores()[0]]->get_all_streams_continues();
     m_last_duration = duration;
 
     change_state(PORT_STATE_TX);
@@ -484,7 +486,7 @@ TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul, bool force) {
     }
 
     TrexStatelessCpToDpMsgBase *update_msg = new TrexStatelessDpUpdate(m_port_id, factor);
-    send_message_to_all_dp(update_msg);
+    send_message_to_all_dp(update_msg, true);
 
     m_factor *= factor;
 
@@ -820,13 +822,17 @@ TrexStatelessPort::validate(void) {
     }
 
     TrexStreamsCompiler compiler;
+
+    /* TODO: think of this mask...*/
+    TrexDPCoreMask core_mask(get_dp_core_count(), TrexDPCoreMask::MASK_ALL);
+
     std::vector<TrexStreamsCompiledObj *> compiled_objs;
 
     std::string fail_msg;
     bool rc = compiler.compile(m_port_id,
                                streams,
                                compiled_objs,
-                               get_dp_core_count(),
+                               core_mask,
                                1.0,
                                &fail_msg);
     if (!rc) {
index b1f6ddf..7d976e4 100644 (file)
@@ -178,7 +178,7 @@ public:
      * start traffic
      * throws TrexException in case of an error
      */
-    void start_traffic(const TrexPortMultiplier &mul, double duration, bool force = false);
+    void start_traffic(const TrexPortMultiplier &mul, double duration, bool force = false, uint64_t core_mask = UINT64_MAX);
 
     /**
      * stop traffic
@@ -376,11 +376,12 @@ public:
 
     void get_pci_info(std::string &pci_addr, int &numa_node);
 
+
 private:
 
     bool is_core_active(int core_id);
 
-    const std::vector<int> get_core_id_list () {
+    const std::vector<uint8_t> get_core_id_list () {
         return m_cores_id_list;
     }
 
@@ -446,7 +447,7 @@ private:
     uint16_t           m_rx_caps;
 
     /* holds the DP cores associated with this port */
-    std::vector<int>   m_cores_id_list;
+    std::vector<uint8_t>   m_cores_id_list;
 
     bool               m_last_all_streams_continues;
     double             m_last_duration;
index e54c5f9..97f7a25 100644 (file)
@@ -375,19 +375,29 @@ bool
 TrexStreamsCompiler::compile(uint8_t                                port_id,
                              const std::vector<TrexStream *>        &streams,
                              std::vector<TrexStreamsCompiledObj *>  &objs,
-                             uint8_t                                dp_core_count,
+                             TrexDPCoreMask                         &core_mask,
                              double                                 factor,
                              std::string                            *fail_msg) {
 
-    assert(dp_core_count > 0);
+    assert(core_mask.get_active_count() > 0);
+
+    uint8_t indirect_core_count = core_mask.get_active_count();
+    std::vector<TrexStreamsCompiledObj *> indirect_objs(indirect_core_count);
+    
+    for (int i = 0; i < indirect_core_count; i++) {
+        indirect_objs[i] = NULL;
+    }
 
     try {
-        return compile_internal(port_id,
-                                streams,
-                                objs,
-                                dp_core_count,
-                                factor,
-                                fail_msg);
+        bool rc = compile_internal(port_id,
+                                   streams,
+                                   indirect_objs,
+                                   indirect_core_count,
+                                   factor,
+                                   fail_msg);
+        if (!rc) {
+            return rc;
+        }
 
     } catch (const TrexException &ex) {
         if (fail_msg) {
@@ -398,6 +408,21 @@ TrexStreamsCompiler::compile(uint8_t                                port_id,
         return false;
     }
 
+    /* prepare the result */
+    objs.resize(core_mask.get_total_count());
+    for (int i = 0; i < core_mask.get_total_count(); i++) {
+        objs.push_back(NULL);
+    }
+
+    uint8_t index = 0;
+    for (uint8_t active_core_id : core_mask.get_active_cores()) {
+        if (indirect_objs[index] == NULL) {
+            break;
+        }
+        objs[active_core_id] = indirect_objs[index++];
+    }
+
+    return true;
 }
 
 bool 
@@ -471,12 +496,7 @@ TrexStreamsCompiler::compile_on_single_core(uint8_t
     /* allocate object only for core 0 */
     TrexStreamsCompiledObj *obj = new TrexStreamsCompiledObj(port_id);
     obj->m_all_continues = all_continues;
-    objs.push_back(obj);
-
-    /* put NULL for the rest */
-    for (uint8_t i = 1; i < dp_core_count; i++) {
-        objs.push_back(NULL);
-    }
+    objs[0] = obj;
 
      /* compile all the streams */
     for (auto const stream : streams) {
@@ -508,7 +528,7 @@ TrexStreamsCompiler::compile_on_all_cores(uint8_t
     for (uint8_t i = 0; i < dp_core_count; i++) {
         TrexStreamsCompiledObj *obj = new TrexStreamsCompiledObj(port_id);
         obj->m_all_continues = all_continues;
-        objs.push_back(obj);
+        objs[i] = obj;
     }
 
     /* compile all the streams */
index 171e3af..b95bf17 100644 (file)
@@ -32,6 +32,66 @@ class TrexStreamsCompiler;
 class TrexStream;
 class GraphNodeMap;
 
+class TrexDPCoreMask {
+
+public:
+
+    TrexDPCoreMask(uint8_t dp_core_count, uint64_t dp_core_mask) {
+        assert(is_valid_mask(dp_core_count, dp_core_mask));
+
+        m_dp_core_count = dp_core_count;
+        m_dp_core_mask  = dp_core_mask;
+
+        for (int i = 0; i < m_dp_core_count; i++) {
+            if (is_core_active(i)) {
+                m_active_cores.push_back(i);
+            }
+        }
+    }
+
+   
+    uint8_t get_total_count() const {
+        return m_dp_core_count;
+    }
+
+    bool is_core_active(uint8_t core_id) const {
+        assert(core_id < m_dp_core_count);
+        return ( (1 << core_id) & m_dp_core_mask );
+    }
+
+    bool is_core_disabled(uint8_t core_id) const {
+        return (!is_core_active(core_id));
+    }
+
+    uint8_t get_active_count() const {
+        return m_active_cores.size();
+    }
+
+    const std::vector<uint8_t> & get_active_cores() const {
+        return m_active_cores;
+    }
+
+    static bool is_valid_mask(uint8_t dp_core_count, uint64_t dp_core_mask) {
+        if ( (dp_core_count < 1) || (dp_core_count > 64) ) {
+            return false;
+        }
+        /* highest bit pushed to left and then -1 will give all the other bits on */
+        return ( (dp_core_mask & ( (1 << dp_core_count) - 1 ) ) != 0);
+    }
+private:
+
+
+    uint8_t   m_dp_core_count;
+    uint64_t  m_dp_core_mask;
+
+    std::vector<uint8_t> m_active_cores;
+
+public:
+    static const uint64_t MASK_ALL = UINT64_MAX;
+
+};
+
+
 /**
  * compiled object for a table of streams
  * 
@@ -92,7 +152,7 @@ public:
     bool compile(uint8_t                                port_id,
                  const std::vector<TrexStream *>        &streams,
                  std::vector<TrexStreamsCompiledObj *>  &objs,
-                 uint8_t                                dp_core_count = 1,
+                 TrexDPCoreMask                         &core_mask,
                  double                                 factor = 1.0,
                  std::string                            *fail_msg = NULL);
 
index 31cb0be..9babb17 100644 (file)
@@ -83,10 +83,14 @@ public:
 
     bool update_number_of_active_streams(uint32_t d);
 
-    state_e get_state() {
+    state_e get_state() const {
         return m_state;
     }
 
+    bool is_active() const {
+        return (get_state() != ppSTATE_IDLE);
+    }
+
     void set_event_id(int event_id) {
         m_event_id = event_id;
     }
@@ -261,6 +265,11 @@ public:
     /* simply sends a message back (acts as a barrier for previous messages) */
     void barrier(uint8_t port_id, int event_id);
 
+    bool is_port_active(uint8_t port_id) {
+        return get_port_db(port_id)->is_active();
+    }
+
+
 private:
 
     void schedule_exit();
index 9abb38f..c659c33 100644 (file)
@@ -39,9 +39,18 @@ limitations under the License.
     #define UINT64_MAX 0xFFFFFFFFFFFFFFFF
 #endif
 
+struct cpu_vct_st {
+    cpu_vct_st() {
+        m_port1 = -1;
+        m_port2 = -1;
+    }
+    std::vector<uint8_t> m_history;
+    int m_port1;
+    int m_port2;
+};
+
 typedef std::set<uint32_t> flow_stat_active_t;
 typedef std::set<uint32_t>::iterator flow_stat_active_it_t;
-typedef std::vector<std::vector<uint8_t>> cpu_util_full_t;
-typedef std::vector<uint8_t> cpu_vct_t;
+typedef std::vector<cpu_vct_st> cpu_util_full_t;
 
 #endif
index 7786356..47c78c8 100755 (executable)
@@ -62,10 +62,10 @@ uint8_t CCpuUtlCp::GetValRaw(){
 }
 
 /* get cpu % utilization history */
-void CCpuUtlCp::GetHistory(cpu_vct_t &cpu_vct){
-    cpu_vct.clear();
+void CCpuUtlCp::GetHistory(cpu_vct_st &cpu_vct){
+    cpu_vct.m_history.clear();
     for (int i = m_history_latest_index + m_history_size; i > m_history_latest_index; i--) {
-        cpu_vct.push_back(m_cpu_util[i % m_history_size]);
+        cpu_vct.m_history.push_back(m_cpu_util[i % m_history_size]);
     }
 }
 
index 109fff4..b0a76fc 100755 (executable)
@@ -59,7 +59,7 @@ public:
     /* return cpu % */
     double GetVal();
     uint8_t GetValRaw();
-    void GetHistory(cpu_vct_t &cpu_vct);
+    void GetHistory(cpu_vct_st &cpu_vct);
 private:
     void AppendHistory(uint8_t);
     CCpuUtlDp *         m_dpcpu;