7 Copyright (c) 2015-2015 Cisco Systems, Inc.
9 Licensed under the Apache License, Version 2.0 (the "License");
10 you may not use this file except in compliance with the License.
11 You may obtain a copy of the License at
13 http://www.apache.org/licenses/LICENSE-2.0
15 Unless required by applicable law or agreed to in writing, software
16 distributed under the License is distributed on an "AS IS" BASIS,
17 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 See the License for the specific language governing permissions and
19 limitations under the License.
23 #include "stateful_rx_core.h"
26 #include "msg_manager.h"
27 #include "trex_watchdog.h"
29 #include <common/basic_utils.h>
31 #include <trex_stream_node.h>
32 #include <trex_stateless_messaging.h>
37 #include <valgrind/callgrind.h>
41 CPluginCallback * CPluginCallback::callback;
44 uint32_t getDualPortId(uint32_t thread_id){
45 return ( thread_id % (CGlobalInfo::m_options.get_expected_dual_ports()) );
50 CRteMemPool CGlobalInfo::m_mem_pool[MAX_SOCKETS_SUPPORTED];
52 uint32_t CGlobalInfo::m_nodes_pool_size = 10*1024;
53 CParserOption CGlobalInfo::m_options;
54 CGlobalMemory CGlobalInfo::m_memory_cfg;
55 CPlatformSocketInfo CGlobalInfo::m_socket;
61 void CGlobalMemory::Dump(FILE *fd){
62 fprintf(fd," Total Memory : \n");
64 const std::string * names =get_mbuf_names();
70 for (i=0; i<MBUF_ELM_SIZE; i++) {
71 if ( (i>MBUF_9k) && (i<MBUF_DP_FLOWS)){
74 if ( i<TRAFFIC_MBUF_64 ){
75 c_total= m_mbuf[i] *c_size;
79 fprintf(fd," %-40s : %lu \n",names[i].c_str(),(ulong)m_mbuf[i]);
81 c_total += (m_mbuf[MBUF_DP_FLOWS] * sizeof(CGenNode));
83 fprintf(fd," %-40s : %lu \n","get_each_core_dp_flows",(ulong)get_each_core_dp_flows());
84 fprintf(fd," %-40s : %s \n","Total memory",double_to_human_str(c_total,"bytes",KBYE_1024).c_str() );
88 void CGlobalMemory::set(const CPlatformMemoryYamlInfo &info,float mul){
90 for (i=0; i<MBUF_ELM_SIZE; i++) {
91 m_mbuf[i]=(uint32_t)((float)info.m_mbuf[i]*mul);
93 /* no need to multiply */
94 m_mbuf[MBUF_64] += info.m_mbuf[TRAFFIC_MBUF_64];
95 m_mbuf[MBUF_128] += info.m_mbuf[TRAFFIC_MBUF_128];
96 m_mbuf[MBUF_256] += info.m_mbuf[TRAFFIC_MBUF_256];
97 m_mbuf[MBUF_512] += info.m_mbuf[TRAFFIC_MBUF_512];
98 m_mbuf[MBUF_1024] += info.m_mbuf[TRAFFIC_MBUF_1024];
99 m_mbuf[MBUF_2048] += info.m_mbuf[TRAFFIC_MBUF_2048];
100 m_mbuf[MBUF_4096] += info.m_mbuf[TRAFFIC_MBUF_4096];
101 m_mbuf[MBUF_9k] += info.m_mbuf[MBUF_9k];
103 for (i=0; i<MBUF_1024; i++) {
104 float per_queue_factor= (float)m_mbuf[i]/((float)m_pool_cache_size*(float)m_num_cores);
105 if (per_queue_factor<2.0) {
106 printf("WARNING not enough mbuf memory for this configuration trying to auto update\n");
107 printf(" %d : %f \n",(int)i,per_queue_factor);
108 m_mbuf[i]=(uint32_t)(m_mbuf[i]*2.0/per_queue_factor);
114 ////////////////////////////////////////
117 bool CPlatformSocketInfoNoConfig::is_sockets_enable(socket_id_t socket){
124 socket_id_t CPlatformSocketInfoNoConfig::max_num_active_sockets(){
129 socket_id_t CPlatformSocketInfoNoConfig::port_to_socket(port_id_t port){
134 void CPlatformSocketInfoNoConfig::set_rx_thread_is_enabled(bool enable) {
135 m_rx_is_enabled = enable;
138 void CPlatformSocketInfoNoConfig::set_number_of_dual_ports(uint8_t num_dual_ports){
139 m_dual_if = num_dual_ports;
143 void CPlatformSocketInfoNoConfig::set_number_of_threads_per_ports(uint8_t num_threads){
144 m_threads_per_dual_if = num_threads;
147 bool CPlatformSocketInfoNoConfig::sanity_check(){
151 /* return the core mask */
152 uint64_t CPlatformSocketInfoNoConfig::get_cores_mask(){
154 uint32_t cores_number = m_threads_per_dual_if*m_dual_if;
155 if ( m_rx_is_enabled ) {
158 cores_number += 1; /* only MASTER*/
164 uint64_t mask=(1LL<<(offset+1));
165 for (i=0; i<(cores_number-1); i++) {
172 virtual_thread_id_t CPlatformSocketInfoNoConfig::thread_phy_to_virt(physical_thread_id_t phy_id){
176 physical_thread_id_t CPlatformSocketInfoNoConfig::thread_virt_to_phy(virtual_thread_id_t virt_id){
180 physical_thread_id_t CPlatformSocketInfoNoConfig::get_master_phy_id() {
184 bool CPlatformSocketInfoNoConfig::thread_phy_is_rx(physical_thread_id_t phy_id){
185 return (phy_id==(m_threads_per_dual_if*m_dual_if+1));
189 void CPlatformSocketInfoNoConfig::dump(FILE *fd){
190 fprintf(fd," there is no configuration file given \n");
193 ////////////////////////////////////////
195 bool CPlatformSocketInfoConfig::Create(CPlatformCoresYamlInfo * platform){
198 assert(m_platform->m_is_exists);
203 bool CPlatformSocketInfoConfig::init(){
205 /* iterate the sockets */
206 uint32_t num_threads=0;
207 uint32_t num_dual_if = m_platform->m_dual_if.size();
209 if ( m_num_dual_if > num_dual_if ){
210 printf("ERROR number of dual if %d is higher than defined in configuration file %d\n",
216 for (i=0; i<m_num_dual_if; i++) {
217 CPlatformDualIfYamlInfo * lp=&m_platform->m_dual_if[i];
218 if ( lp->m_socket>=MAX_SOCKETS_SUPPORTED ){
219 printf("ERROR socket %d is bigger than max %d \n",lp->m_socket,MAX_SOCKETS_SUPPORTED);
223 if (!m_sockets_enable[lp->m_socket] ) {
224 m_sockets_enable[lp->m_socket]=true;
228 m_socket_per_dual_if[i]=lp->m_socket;
230 /* learn how many threads per dual-if */
232 num_threads = lp->m_threads.size();
233 m_max_threads_per_dual_if = num_threads;
235 if (lp->m_threads.size() != num_threads) {
236 printf("ERROR, the number of threads per dual ports should be the same for all dual ports\n");
241 if (m_threads_per_dual_if > m_max_threads_per_dual_if) {
242 printf("ERROR: Maximum threads in platform section of config file is %d, unable to run with -c %d.\n",
243 m_max_threads_per_dual_if, m_threads_per_dual_if);
244 printf("Please increase the pool in config or use lower -c.\n");
250 for (j=0; j<m_threads_per_dual_if; j++) {
251 uint8_t virt_thread = 1+ i + j*m_num_dual_if; /* virtual thread */
252 uint8_t phy_thread = lp->m_threads[j];
254 if (phy_thread>MAX_THREADS_SUPPORTED) {
255 printf("ERROR, physical thread id is %d higher than max %d \n",phy_thread,MAX_THREADS_SUPPORTED);
259 if (virt_thread>MAX_THREADS_SUPPORTED) {
260 printf("ERROR virtual thread id is %d higher than max %d \n",virt_thread,MAX_THREADS_SUPPORTED);
264 if ( m_thread_phy_to_virtual[phy_thread] ){
265 printf("ERROR physical thread %d defined twice\n",phy_thread);
268 m_thread_phy_to_virtual[phy_thread]=virt_thread;
269 m_thread_virt_to_phy[virt_thread] =phy_thread;
273 if ( m_thread_phy_to_virtual[m_platform->m_master_thread] ){
274 printf("ERROR physical master thread %d already defined \n",m_platform->m_master_thread);
278 if ( m_thread_phy_to_virtual[m_platform->m_rx_thread] ){
279 printf("ERROR physical latency thread %d already defined \n",m_platform->m_rx_thread);
283 if (m_max_threads_per_dual_if < m_threads_per_dual_if ) {
284 printf("ERROR number of threads asked per dual if is %d lower than max %d \n",
285 (int)m_threads_per_dual_if,
286 (int)m_max_threads_per_dual_if);
293 void CPlatformSocketInfoConfig::dump(FILE *fd){
294 fprintf(fd," core_mask %llx \n",(unsigned long long)get_cores_mask());
295 fprintf(fd," sockets :");
297 for (i=0; i<MAX_SOCKETS_SUPPORTED; i++) {
298 if ( is_sockets_enable(i) ){
299 fprintf(fd," %d ",i);
303 fprintf(fd," active sockets : %d \n",max_num_active_sockets());
305 fprintf(fd," ports_sockets : %d \n",max_num_active_sockets());
307 for (i = 0; i < TREX_MAX_PORTS; i++) {
308 fprintf(fd,"%d,",port_to_socket(i));
312 fprintf(fd," phy | virt \n");
313 for (i=0; i<MAX_THREADS_SUPPORTED; i++) {
314 virtual_thread_id_t virt=thread_phy_to_virt(i);
316 fprintf(fd," %d %d \n",i,virt);
322 void CPlatformSocketInfoConfig::reset(){
325 for (i=0; i<MAX_SOCKETS_SUPPORTED; i++) {
326 m_sockets_enable[i]=false;
329 for (i=0; i<MAX_THREADS_SUPPORTED; i++) {
330 m_thread_virt_to_phy[i]=0;
332 for (i=0; i<MAX_THREADS_SUPPORTED; i++) {
333 m_thread_phy_to_virtual[i]=0;
335 for (i = 0; i < TREX_MAX_PORTS >> 1; i++) {
336 m_socket_per_dual_if[i]=0;
341 m_threads_per_dual_if=0;
342 m_rx_is_enabled=false;
343 m_max_threads_per_dual_if=0;
347 void CPlatformSocketInfoConfig::Delete(){
351 bool CPlatformSocketInfoConfig::is_sockets_enable(socket_id_t socket){
352 assert(socket<MAX_SOCKETS_SUPPORTED);
353 return ( m_sockets_enable[socket] );
356 socket_id_t CPlatformSocketInfoConfig::max_num_active_sockets(){
357 return ((socket_id_t)m_sockets_enabled);
360 socket_id_t CPlatformSocketInfoConfig::port_to_socket(port_id_t port){
361 return ( m_socket_per_dual_if[(port>>1)]);
364 void CPlatformSocketInfoConfig::set_rx_thread_is_enabled(bool enable){
365 m_rx_is_enabled =enable;
368 void CPlatformSocketInfoConfig::set_number_of_dual_ports(uint8_t num_dual_ports){
369 m_num_dual_if = num_dual_ports;
372 void CPlatformSocketInfoConfig::set_number_of_threads_per_ports(uint8_t num_threads){
373 m_threads_per_dual_if =num_threads;
376 bool CPlatformSocketInfoConfig::sanity_check(){
380 /* return the core mask */
381 uint64_t CPlatformSocketInfoConfig::get_cores_mask(){
384 for (i=0; i<MAX_THREADS_SUPPORTED; i++) {
385 if ( m_thread_phy_to_virtual[i] ) {
388 printf(" ERROR phy threads can't be higher than 64 \n");
395 mask |=(1LL<<m_platform->m_master_thread);
396 assert(m_platform->m_master_thread<64);
397 if (m_rx_is_enabled) {
398 mask |=(1LL<<m_platform->m_rx_thread);
399 assert(m_platform->m_rx_thread<64);
404 virtual_thread_id_t CPlatformSocketInfoConfig::thread_phy_to_virt(physical_thread_id_t phy_id){
405 return (m_thread_phy_to_virtual[phy_id]);
408 physical_thread_id_t CPlatformSocketInfoConfig::thread_virt_to_phy(virtual_thread_id_t virt_id){
409 return ( m_thread_virt_to_phy[virt_id]);
412 physical_thread_id_t CPlatformSocketInfoConfig::get_master_phy_id() {
413 return m_platform->m_master_thread;
416 bool CPlatformSocketInfoConfig::thread_phy_is_rx(physical_thread_id_t phy_id){
417 return (m_platform->m_rx_thread == phy_id?true:false);
422 ////////////////////////////////////////
425 bool CPlatformSocketInfo::Create(CPlatformCoresYamlInfo * platform){
426 if ( (platform) && (platform->m_is_exists) ) {
427 CPlatformSocketInfoConfig * lp=new CPlatformSocketInfoConfig();
429 lp->Create(platform);
432 m_obj= new CPlatformSocketInfoNoConfig();
437 void CPlatformSocketInfo::Delete(){
444 bool CPlatformSocketInfo::is_sockets_enable(socket_id_t socket){
445 return ( m_obj->is_sockets_enable(socket) );
448 socket_id_t CPlatformSocketInfo::max_num_active_sockets(){
449 return ( m_obj->max_num_active_sockets() );
453 socket_id_t CPlatformSocketInfo::port_to_socket(port_id_t port){
454 return ( m_obj->port_to_socket(port) );
458 void CPlatformSocketInfo::set_rx_thread_is_enabled(bool enable){
459 m_obj->set_rx_thread_is_enabled(enable);
462 void CPlatformSocketInfo::set_number_of_dual_ports(uint8_t num_dual_ports){
463 m_obj->set_number_of_dual_ports(num_dual_ports);
466 void CPlatformSocketInfo::set_number_of_threads_per_ports(uint8_t num_threads){
467 m_obj->set_number_of_threads_per_ports(num_threads);
470 bool CPlatformSocketInfo::sanity_check(){
471 return ( m_obj->sanity_check());
474 /* return the core mask */
475 uint64_t CPlatformSocketInfo::get_cores_mask(){
476 return ( m_obj->get_cores_mask());
479 virtual_thread_id_t CPlatformSocketInfo::thread_phy_to_virt(physical_thread_id_t phy_id){
480 return ( m_obj->thread_phy_to_virt(phy_id));
483 physical_thread_id_t CPlatformSocketInfo::thread_virt_to_phy(virtual_thread_id_t virt_id){
484 return ( m_obj->thread_virt_to_phy(virt_id));
487 bool CPlatformSocketInfo::thread_phy_is_master(physical_thread_id_t phy_id){
488 return ( m_obj->thread_phy_is_master(phy_id));
491 physical_thread_id_t CPlatformSocketInfo::get_master_phy_id() {
492 return ( m_obj->get_master_phy_id());
495 bool CPlatformSocketInfo::thread_phy_is_rx(physical_thread_id_t phy_id) {
496 return ( m_obj->thread_phy_is_rx(phy_id));
499 void CPlatformSocketInfo::dump(FILE *fd){
503 ////////////////////////////////////////
506 void CRteMemPool::dump_in_case_of_error(FILE *fd){
507 fprintf(fd," ERROR,there is no enough memory in socket %d \n",m_pool_id);
508 fprintf(fd," Try to enlarge the memory values in the configuration file -/etc/trex_cfg.yaml ,see manual for more detail \n");
512 void CRteMemPool::add_to_json(Json::Value &json, std::string name, rte_mempool_t * pool){
513 uint32_t p_free = rte_mempool_count(pool);
514 uint32_t p_size = pool->size;
515 json[name].append((unsigned long long)p_free);
516 json[name].append((unsigned long long)p_size);
520 void CRteMemPool::dump_as_json(Json::Value &json){
521 add_to_json(json, "64b", m_small_mbuf_pool);
522 add_to_json(json, "128b", m_mbuf_pool_128);
523 add_to_json(json, "256b", m_mbuf_pool_256);
524 add_to_json(json, "512b", m_mbuf_pool_512);
525 add_to_json(json, "1024b", m_mbuf_pool_1024);
526 add_to_json(json, "2048b", m_mbuf_pool_2048);
527 add_to_json(json, "4096b", m_mbuf_pool_4096);
528 add_to_json(json, "9kb", m_mbuf_pool_9k);
532 void CRteMemPool::dump(FILE *fd){
533 #define DUMP_MBUF(a,b) { float p=(100.0*(float)rte_mempool_count(b)/(float)b->size); fprintf(fd," %-30s : %.2f %% %s \n",a,p,(p<5.0?"<-":"OK") ); }
535 DUMP_MBUF("mbuf_64",m_small_mbuf_pool);
536 DUMP_MBUF("mbuf_128",m_mbuf_pool_128);
537 DUMP_MBUF("mbuf_256",m_mbuf_pool_256);
538 DUMP_MBUF("mbuf_512",m_mbuf_pool_512);
539 DUMP_MBUF("mbuf_1024",m_mbuf_pool_1024);
540 DUMP_MBUF("mbuf_2048",m_mbuf_pool_2048);
541 DUMP_MBUF("mbuf_4096",m_mbuf_pool_4096);
542 DUMP_MBUF("mbuf_9k",m_mbuf_pool_9k);
546 ////////////////////////////////////////
548 void CGlobalInfo::dump_pool_as_json(Json::Value &json){
549 CPlatformSocketInfo * lpSocket =&m_socket;
551 for (int i=0; i<(int)MAX_SOCKETS_SUPPORTED; i++) {
552 if (lpSocket->is_sockets_enable((socket_id_t)i)) {
553 std::string socket_id = "cpu-socket-" + std::to_string(i);
554 m_mem_pool[i].dump_as_json(json["mbuf_stats"][socket_id]);
559 std::string CGlobalInfo::dump_pool_as_json_str(void){
561 dump_pool_as_json(json);
562 return (json.toStyledString());
565 void CGlobalInfo::free_pools(){
566 CPlatformSocketInfo * lpSocket =&m_socket;
569 for (i=0; i<(int)MAX_SOCKETS_SUPPORTED; i++) {
570 if (lpSocket->is_sockets_enable((socket_id_t)i)) {
571 lpmem= &m_mem_pool[i];
572 utl_rte_mempool_delete(lpmem->m_small_mbuf_pool);
573 utl_rte_mempool_delete(lpmem->m_mbuf_pool_128);
574 utl_rte_mempool_delete(lpmem->m_mbuf_pool_256);
575 utl_rte_mempool_delete(lpmem->m_mbuf_pool_512);
576 utl_rte_mempool_delete(lpmem->m_mbuf_pool_1024);
577 utl_rte_mempool_delete(lpmem->m_mbuf_pool_2048);
578 utl_rte_mempool_delete(lpmem->m_mbuf_pool_4096);
579 utl_rte_mempool_delete(lpmem->m_mbuf_pool_9k);
581 utl_rte_mempool_delete(m_mem_pool[0].m_mbuf_global_nodes);
586 void CGlobalInfo::init_pools(uint32_t rx_buffers){
587 /* this include the pkt from 64- */
588 CGlobalMemory * lp=&CGlobalInfo::m_memory_cfg;
589 CPlatformSocketInfo * lpSocket =&m_socket;
594 for (i=0; i<(int)MAX_SOCKETS_SUPPORTED; i++) {
595 if (lpSocket->is_sockets_enable((socket_id_t)i)) {
596 lpmem= &m_mem_pool[i];
600 /* this include the packet from 0-64 this is for small packets */
601 lpmem->m_small_mbuf_pool =utl_rte_mempool_create("small-pkt-const",
603 CONST_SMALL_MBUF_SIZE,
605 assert(lpmem->m_small_mbuf_pool);
610 lpmem->m_mbuf_pool_128=utl_rte_mempool_create("_128-pkt-const",
611 lp->m_mbuf[MBUF_128],
616 assert(lpmem->m_mbuf_pool_128);
619 lpmem->m_mbuf_pool_256=utl_rte_mempool_create("_256-pkt-const",
620 lp->m_mbuf[MBUF_256],
624 assert(lpmem->m_mbuf_pool_256);
626 lpmem->m_mbuf_pool_512=utl_rte_mempool_create("_512_-pkt-const",
627 lp->m_mbuf[MBUF_512],
630 assert(lpmem->m_mbuf_pool_512);
632 lpmem->m_mbuf_pool_1024=utl_rte_mempool_create("_1024-pkt-const",
633 lp->m_mbuf[MBUF_1024],
634 CONST_1024_MBUF_SIZE,
637 assert(lpmem->m_mbuf_pool_1024);
639 lpmem->m_mbuf_pool_2048=utl_rte_mempool_create("_2048-pkt-const",
640 lp->m_mbuf[MBUF_2048],
641 CONST_2048_MBUF_SIZE,
644 assert(lpmem->m_mbuf_pool_2048);
646 lpmem->m_mbuf_pool_4096=utl_rte_mempool_create("_4096-pkt-const",
647 lp->m_mbuf[MBUF_4096],
648 CONST_4096_MBUF_SIZE,
651 assert(lpmem->m_mbuf_pool_4096);
653 lpmem->m_mbuf_pool_9k=utl_rte_mempool_create("_9k-pkt-const",
654 lp->m_mbuf[MBUF_9k]+rx_buffers,
658 assert(lpmem->m_mbuf_pool_9k);
663 /* global always from socket 0 */
664 m_mem_pool[0].m_mbuf_global_nodes = utl_rte_mempool_create_non_pkt("global-nodes",
665 lp->m_mbuf[MBUF_GLOBAL_FLOWS],
671 assert(m_mem_pool[0].m_mbuf_global_nodes);
678 void CFlowYamlInfo::Dump(FILE *fd){
679 fprintf(fd,"name : %s \n",m_name.c_str());
680 fprintf(fd,"cps : %f \n",m_k_cps);
681 fprintf(fd,"ipg : %f \n",m_ipg_sec);
682 fprintf(fd,"rtt : %f \n",m_rtt_sec);
683 fprintf(fd,"w : %d \n",m_w);
684 fprintf(fd,"wlength : %d \n",m_wlength);
685 fprintf(fd,"limit : %d \n",m_limit);
686 fprintf(fd,"limit_was_set : %d \n",m_limit_was_set?1:0);
687 fprintf(fd,"cap_mode : %d \n",m_cap_mode?1:0);
688 fprintf(fd,"plugin_id : %d \n",m_plugin_id);
689 fprintf(fd,"one_server : %d \n",m_one_app_server?1:0);
690 fprintf(fd,"one_server_was_set : %d \n",m_one_app_server_was_set?1:0);
699 void dump_mac_addr(FILE* fd,uint8_t *p){
701 for (i=0; i<6; i++) {
704 fprintf(fd,"%02x",a);
706 fprintf(fd,"%02x:",a);
714 static uint8_t human_tbl[]={
722 std::string double_to_human_str(double num,
724 human_kbyte_t etype){
730 int max_cnt=sizeof(human_tbl)/sizeof(human_tbl[0]);
733 if (etype ==KBYE_1024){
736 while ((abs_num > f ) && (i < max_cnt - 1)){
743 sprintf(buf,"%10.2f %c%s",num/div,human_tbl[i],units.c_str());
744 std::string res(buf);
749 void CPreviewMode::Dump(FILE *fd){
750 fprintf(fd," flags : %x\n", m_flags);
751 fprintf(fd," write_file : %d\n", getFileWrite()?1:0);
752 fprintf(fd," verbose : %d\n", (int)getVMode() );
753 fprintf(fd," realtime : %d\n", (int)getRealTime() );
754 fprintf(fd," flip : %d\n", (int)getClientServerFlip() );
755 fprintf(fd," cores : %d\n", (int)getCores() );
756 fprintf(fd," single core : %d\n", (int)getSingleCore() );
757 fprintf(fd," flow-flip : %d\n", (int)getClientServerFlowFlip() );
758 fprintf(fd," no clean close : %d\n", (int)getNoCleanFlowClose() );
759 fprintf(fd," zmq_publish : %d\n", (int)get_zmq_publish_enable() );
760 fprintf(fd," vlan_enable : %d\n", (int)get_vlan_mode_enable() );
761 fprintf(fd," client_cfg : %d\n", (int)get_is_client_cfg_enable() );
762 fprintf(fd," mbuf_cache_disable : %d\n", (int)isMbufCacheDisabled() );
763 fprintf(fd," vm mode : %d\n", (int)get_vm_one_queue_enable()?1:0 );
766 void CFlowGenStats::clear(){
767 m_nat_lookup_no_flow_id=0;
770 m_total_open_flows =0;
771 m_total_close_flows =0;
772 m_nat_lookup_no_flow_id=0;
773 m_nat_lookup_remove_flow_id=0;
774 m_nat_lookup_wait_ack_state = 0;
775 m_nat_lookup_add_flow_id=0;
776 m_nat_flow_timeout=0;
777 m_nat_flow_timeout_wait_ack = 0;
778 m_nat_flow_learn_error=0;
781 void CFlowGenStats::dump(FILE *fd){
782 std::string s_bytes=double_to_human_str((double )(m_total_bytes),
786 std::string s_pkt=double_to_human_str((double )(m_total_pkt),
790 std::string s_flows=double_to_human_str((double )(m_total_open_flows),
794 DP_S(m_total_bytes,s_bytes);
795 DP_S(m_total_pkt,s_pkt);
796 DP_S(m_total_open_flows,s_flows);
798 DP(m_total_open_flows);
799 DP(m_total_close_flows);
800 DP_name("active",(m_total_open_flows-m_total_close_flows));
802 DP(m_nat_lookup_no_flow_id);
804 DP(m_nat_lookup_no_flow_id);
805 DP(m_nat_lookup_remove_flow_id);
806 DP(m_nat_lookup_wait_ack_state);
807 DP(m_nat_lookup_add_flow_id);
808 DP(m_nat_flow_timeout);
809 DP(m_nat_flow_timeout_wait_ack);
810 DP_name("active_nat",(m_nat_lookup_add_flow_id-m_nat_lookup_remove_flow_id));
811 DP_name("active_nat_wait_syn", (m_nat_lookup_add_flow_id - m_nat_lookup_wait_ack_state));
812 DP(m_nat_flow_learn_error);
817 int CErfIF::open_file(std::string file_name){
818 BP_ASSERT(m_writer==0);
820 if ( m_preview_mode->getFileWrite() ){
821 capture_type_e file_type=ERF;
822 if ( m_preview_mode->get_pcap_mode_enable() ){
825 m_writer = CCapWriterFactory::CreateWriter(file_type,(char *)file_name.c_str());
826 if (m_writer == NULL) {
827 fprintf(stderr,"ERROR can't create cap file %s ",(char *)file_name.c_str());
831 m_raw = new CCapPktRaw();
836 int CErfIF::write_pkt(CCapPktRaw *pkt_raw){
840 if ( m_preview_mode->getFileWrite() ){
842 bool res=m_writer->write_packet(pkt_raw);
844 fprintf(stderr,"ERROR can't write to cap file");
852 int CErfIF::close_file(void){
859 if ( m_preview_mode->getFileWrite() ){
871 void CFlowKey::Clean(){
881 void CFlowKey::Dump(FILE *fd){
882 fprintf(fd," %x:%x:%x:%x:%x:%x:%x\n",m_ipaddr1,m_ipaddr2,m_port1,m_port2,m_ip_proto,m_l2_proto,m_vrfid);
887 void CPacketDescriptor::Dump(FILE *fd){
888 fprintf(fd," IsSwapTuple : %d \n",IsSwapTuple()?1:0);
889 fprintf(fd," IsSInitDir : %d \n",IsInitSide()?1:0);
890 fprintf(fd," Isvalid : %d ",IsValidPkt()?1:0);
891 fprintf(fd," IsRtt : %d ",IsRtt()?1:0);
892 fprintf(fd," IsLearn : %d ",IsLearn()?1:0);
899 fprintf(fd," IsLast Pkt : %d ", IsLastPkt() ?1:0);
900 fprintf(fd," id : %d \n",getId() );
902 fprintf(fd," flow_ID : %d , max_pkts : %u, max_aging: %d sec , pkt_id : %u, init: %d ( dir:%d dir_max :%d ) bid:%d \n",getFlowId(),
904 GetMaxFlowTimeout() ,
907 GetDirInfo()->GetPktNum(),
908 GetDirInfo()->GetMaxPkts(),
909 IsBiDirectionalFlow()?1:0
916 void CPacketIndication::UpdateOffsets(){
917 m_ether_offset = getEtherOffset();
918 m_ip_offset = getIpOffset();
919 m_udp_tcp_offset = getTcpOffset();
920 m_payload_offset = getPayloadOffset();
923 void CPacketIndication::UpdatePacketPadding(){
924 m_packet_padding = m_packet->getTotalLen() - (l3.m_ipv4->getTotalLength()+ getIpOffset());
928 void CPacketIndication::RefreshPointers(){
930 char *pobase=getBasePtr();
932 m_ether = (EthernetHeader *) (pobase + m_ether_offset);
933 l3.m_ipv4 = (IPHeader *) (pobase + m_ip_offset);
934 l4.m_tcp= (TCPHeader *)(pobase + m_udp_tcp_offset);
935 if ( m_payload_offset ){
936 m_payload =(uint8_t *)(pobase + m_payload_offset);
938 m_payload =(uint8_t *)(0);
942 // copy ref assume pkt point to a fresh
943 void CPacketIndication::Clone(CPacketIndication * obj,CCapPktRaw * pkt){
945 m_cap_ipg = obj->m_cap_ipg;
947 char *pobase=getBasePtr();
948 m_flow = obj->m_flow;
950 m_ether = (EthernetHeader *) (pobase + obj->getEtherOffset());
951 l3.m_ipv4 = (IPHeader *) (pobase + obj->getIpOffset());
952 m_is_ipv6 = obj->m_is_ipv6;
953 l4.m_tcp= (TCPHeader *)(pobase + obj->getTcpOffset());
954 if ( obj->getPayloadOffset() ){
955 m_payload =(uint8_t *)(pobase + obj->getPayloadOffset());
957 m_payload =(uint8_t *)(0);
959 m_payload_len = obj->m_payload_len;
960 m_flow_key = obj->m_flow_key;
961 m_desc = obj->m_desc;
963 m_packet_padding = obj->m_packet_padding;
965 m_ether_offset = obj->m_ether_offset;
966 m_ip_offset = obj->m_ip_offset;
967 m_udp_tcp_offset = obj->m_udp_tcp_offset;;
968 m_payload_offset = obj->m_payload_offset;
973 void CPacketIndication::Dump(FILE *fd,int verbose){
974 fprintf(fd," ipg : %f \n",m_cap_ipg);
975 fprintf(fd," key \n");
976 fprintf(fd," ------\n");
979 fprintf(fd," L2 info \n");
980 fprintf(fd," ------\n");
981 m_packet->Dump(fd,verbose);
983 fprintf(fd," Descriptor \n");
984 fprintf(fd," ------\n");
987 if ( m_desc.IsValidPkt() ) {
988 fprintf(fd," ipv4 \n");
990 if ( m_desc.IsUdp() ) {
995 fprintf(fd," payload len : %d \n",m_payload_len);
997 fprintf(fd," not valid packet \n");
1001 void CPacketIndication::Clean(){
1012 uint64_t CCPacketParserCounters::getTotalErrors(){
1017 m_non_valid_ipv4_ver+
1018 m_ip_checksum_error+
1020 m_ip_not_first_fragment_error+
1021 m_ip_ttl_is_zero_error+
1022 m_ip_multicast_error+
1029 m_tcp_udp_pkt_length_error;
1033 void CCPacketParserCounters::Clear(){
1040 m_non_valid_ipv4_ver=0;
1041 m_ip_checksum_error=0;
1042 m_ip_length_error=0;
1043 m_ip_not_first_fragment_error=0;
1044 m_ip_ttl_is_zero_error=0;
1045 m_ip_multicast_error=0;
1046 m_ip_header_options=0;
1050 m_non_tcp_udp_esp=0;
1051 m_non_tcp_udp_icmp=0;
1052 m_non_tcp_udp_gre=0;
1054 m_tcp_header_options=0;
1055 m_tcp_udp_pkt_length_error=0;
1062 void CCPacketParserCounters::Dump(FILE *fd){
1070 DP (m_non_valid_ipv4_ver);
1071 DP (m_ip_checksum_error);
1072 DP (m_ip_length_error);
1073 DP (m_ip_not_first_fragment_error);
1074 DP (m_ip_ttl_is_zero_error);
1075 DP (m_ip_multicast_error);
1076 DP (m_ip_header_options);
1079 DP (m_non_tcp_udp_ah);
1080 DP (m_non_tcp_udp_esp);
1081 DP (m_non_tcp_udp_icmp);
1082 DP (m_non_tcp_udp_gre);
1083 DP (m_non_tcp_udp_ip);
1084 DP (m_tcp_header_options);
1085 DP (m_tcp_udp_pkt_length_error);
1088 DP (m_valid_udp_tcp);
1092 bool CPacketParser::Create(){
1097 void CPacketParser::Delete(){
1101 bool CPacketParser::ProcessPacket(CPacketIndication * pkt_indication,
1102 CCapPktRaw * raw_packet){
1103 BP_ASSERT(pkt_indication);
1104 pkt_indication->ProcessPacket(this,raw_packet);
1105 if (pkt_indication->m_desc.IsValidPkt()) {
1111 void CPacketParser::Dump(FILE *fd){
1112 fprintf(fd," parser statistic \n");
1113 fprintf(fd," ===================== \n");
1118 void CPacketIndication::SetKey(void){
1119 uint32_t ip_src, ip_dst;
1121 m_desc.SetIsValidPkt(true);
1123 uint16_t ipv6_src[8];
1124 uint16_t ipv6_dst[8];
1126 l3.m_ipv6->getSourceIpv6(&ipv6_src[0]);
1127 l3.m_ipv6->getDestIpv6(&ipv6_dst[0]);
1128 ip_src=(ipv6_src[6] << 16) | ipv6_src[7];
1129 ip_dst=(ipv6_dst[6] << 16) | ipv6_dst[7];
1130 m_flow_key.m_ip_proto = l3.m_ipv6->getNextHdr();
1132 ip_src=l3.m_ipv4->getSourceIp();
1133 ip_dst=l3.m_ipv4->getDestIp();
1134 m_flow_key.m_ip_proto = l3.m_ipv4->getProtocol();
1137 /* UDP/TCP has same place */
1138 uint16_t src_port = l4.m_udp->getSourcePort();
1139 uint16_t dst_port = l4.m_udp->getDestPort();
1140 if (ip_src > ip_dst ) {
1141 m_flow_key.m_ipaddr1 =ip_dst;
1142 m_flow_key.m_ipaddr2 =ip_src;
1143 m_flow_key.m_port1 = dst_port;
1144 m_flow_key.m_port2 = src_port;
1146 m_desc.SetSwapTuple(true);
1147 m_flow_key.m_ipaddr1 = ip_src;
1148 m_flow_key.m_ipaddr2 = ip_dst;
1149 m_flow_key.m_port1 = src_port;
1150 m_flow_key.m_port2 = dst_port;
1152 m_flow_key.m_l2_proto = 0;
1153 m_flow_key.m_vrfid = 0;
1156 uint8_t CPacketIndication::ProcessIpPacketProtocol(CCPacketParserCounters *m_cnt,
1157 uint8_t protocol, int *offset){
1159 char * packetBase = m_packet->raw;
1162 uint16_t tcp_header_len=0;
1165 case IPHeader::Protocol::TCP :
1166 m_desc.SetIsTcp(true);
1167 tcp =(TCPHeader *)(packetBase +*offset);
1170 tcp_header_len = tcp->getHeaderLength();
1171 if ( tcp_header_len > (5*4) ){
1172 // we have ip option
1173 m_cnt->m_tcp_header_options++;
1175 *offset += tcp_header_len;
1178 case IPHeader::Protocol::UDP :
1179 m_desc.SetIsUdp(true);
1180 udp =(UDPHeader *)(packetBase +*offset);
1185 case IPHeader::Protocol::AH:
1186 m_cnt->m_non_tcp_udp_ah++;
1189 case IPHeader::Protocol::ESP:
1190 m_cnt->m_non_tcp_udp_esp++;
1193 case IPHeader::Protocol::ICMP:
1194 case IPHeader::Protocol::IPV6_ICMP:
1195 m_cnt->m_non_tcp_udp_icmp++;
1198 case IPHeader::Protocol::GRE:
1199 m_cnt->m_non_tcp_udp_gre++;
1202 case IPHeader::Protocol::IP:
1208 m_cnt->m_non_tcp_udp++;
1214 if ( *offset > m_packet->getTotalLen() ) {
1215 m_cnt->m_tcp_udp_pkt_length_error++;
1222 void CPacketIndication::ProcessIpPacket(CPacketParser *parser,
1226 CCPacketParserCounters * m_cnt=&parser->m_counter;
1227 packetBase = m_packet->raw;
1229 BP_ASSERT(l3.m_ipv4);
1231 parser->m_counter.m_pkt++;
1233 if ( l3.m_ipv4->getVersion() == 4 ){
1236 m_cnt->m_non_valid_ipv4_ver++;
1239 // check the IP Length
1240 if( (uint32_t)(l3.m_ipv4->getTotalLength()+offset) > (uint32_t)( m_packet->getTotalLen()) ){
1241 m_cnt->m_ip_length_error++;
1245 uint16_t ip_offset=offset;
1246 uint16_t ip_header_length = l3.m_ipv4->getHeaderLength();
1248 if ( ip_header_length >(5*4) ){
1249 m_cnt->m_ip_header_options++;
1252 if ( (uint32_t)(ip_header_length + offset) > (uint32_t)m_packet->getTotalLen() ) {
1253 m_cnt->m_ip_length_error++;
1256 offset += ip_header_length;
1259 if( l3.m_ipv4->getTimeToLive() ==0 ){
1260 m_cnt->m_ip_ttl_is_zero_error++;
1264 if( l3.m_ipv4->isNotFirstFragment() ) {
1265 m_cnt->m_ip_not_first_fragment_error++;
1269 protocol = l3.m_ipv4->getProtocol();
1270 if (ProcessIpPacketProtocol(m_cnt,protocol,&offset) != 0) {
1274 uint16_t payload_offset_from_ip = offset-ip_offset;
1275 if ( payload_offset_from_ip > l3.m_ipv4->getTotalLength() ) {
1276 m_cnt->m_tcp_udp_pkt_length_error++;
1280 if ( m_packet->pkt_len > MAX_PKT_SIZE ){
1281 m_cnt->m_tcp_udp_pkt_length_error++;
1282 printf("ERROR packet is too big, not supported jumbo packets that larger than %d \n",MAX_PKT_SIZE);
1286 // Set packet length and include padding if needed
1287 m_packet->pkt_len = l3.m_ipv4->getTotalLength() + getIpOffset();
1288 if (m_packet->pkt_len < 60) { m_packet->pkt_len = 60; }
1290 m_cnt->m_valid_udp_tcp++;
1291 m_payload_len = l3.m_ipv4->getTotalLength() - (payload_offset_from_ip);
1292 m_payload = (uint8_t *)(packetBase +offset);
1293 UpdatePacketPadding();
1299 void CPacketIndication::ProcessIpv6Packet(CPacketParser *parser,
1302 char * packetBase = m_packet->raw;
1303 CCPacketParserCounters * m_cnt=&parser->m_counter;
1304 uint16_t src_ipv6[6];
1305 uint16_t dst_ipv6[6];
1308 BP_ASSERT(l3.m_ipv6);
1310 parser->m_counter.m_pkt++;
1312 if ( l3.m_ipv6->getVersion() == 6 ){
1315 m_cnt->m_non_valid_ipv6_ver++;
1320 if ((uint32_t)(l3.m_ipv6->getPayloadLen()+offset+l3.m_ipv6->getHeaderLength()) >
1321 (uint32_t)( m_packet->getTotalLen()) ){
1322 m_cnt->m_ipv6_length_error++;
1326 for (idx=0; idx<6; idx++){
1327 src_ipv6[idx] = CGlobalInfo::m_options.m_src_ipv6[idx];
1328 dst_ipv6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
1330 l3.m_ipv6->updateMSBIpv6Src(&src_ipv6[0]);
1331 l3.m_ipv6->updateMSBIpv6Dst(&dst_ipv6[0]);
1333 offset += l3.m_ipv6->getHeaderLength();
1334 protocol = l3.m_ipv6->getNextHdr();
1335 if (ProcessIpPacketProtocol(m_cnt,protocol,&offset) != 0) {
1339 // Set packet length and include padding if needed
1340 uint16_t real_pkt_size = l3.m_ipv6->getPayloadLen()+ getIpOffset() + l3.m_ipv6->getHeaderLength();
1341 m_packet->pkt_len = real_pkt_size;
1342 if (m_packet->pkt_len < 60) { m_packet->pkt_len = 60; }
1344 m_cnt->m_valid_udp_tcp++;
1345 m_payload_len = l3.m_ipv6->getPayloadLen();
1346 m_payload = (uint8_t *)(packetBase +offset);
1348 m_packet_padding = m_packet->getTotalLen() - real_pkt_size;
1349 assert( m_packet->getTotalLen()>= real_pkt_size );
1354 static uint8_t cbuff[MAX_PKT_SIZE];
1356 bool CPacketIndication::ConvertPacketToIpv6InPlace(CCapPktRaw * pkt,
1359 // Copy l2 data and set l2 type to ipv6
1360 memcpy(cbuff, pkt->raw, offset);
1361 *(uint16_t*)(cbuff+12) = PKT_HTONS(EthernetHeader::Protocol::IPv6);
1363 // Create the ipv6 header
1364 IPHeader *ipv4 = (IPHeader *) (pkt->raw+offset);
1365 IPv6Header *ipv6 = (IPv6Header *) (cbuff+offset);
1366 uint8_t ipv6_hdrlen = ipv6->getHeaderLength();
1367 memset(ipv6,0,ipv6_hdrlen);
1368 ipv6->setVersion(6);
1369 if (ipv4->getTotalLength() < ipv4->getHeaderLength()) {
1372 // Calculate the payload length
1373 uint16_t p_len = ipv4->getTotalLength() - ipv4->getHeaderLength();
1374 ipv6->setPayloadLen(p_len);
1375 uint8_t l4_proto = ipv4->getProtocol();
1376 ipv6->setNextHdr(l4_proto);
1377 ipv6->setHopLimit(64);
1379 // Update the least signficant 32-bits of ipv6 address
1380 // using the ipv4 address
1381 ipv6->updateLSBIpv6Src(ipv4->getSourceIp());
1382 ipv6->updateLSBIpv6Dst(ipv4->getDestIp());
1384 // Copy rest of packet
1385 uint16_t ipv4_offset = offset + ipv4->getHeaderLength();
1386 uint16_t ipv6_offset = offset + ipv6_hdrlen;
1387 memcpy(cbuff+ipv6_offset,pkt->raw+ipv4_offset,p_len);
1390 memcpy(pkt->raw,cbuff,ipv6_offset);
1392 // Set packet length
1393 pkt->pkt_len = ipv6_offset;
1399 void CPacketIndication::ProcessPacket(CPacketParser *parser,
1401 _ProcessPacket(parser,pkt);
1402 if ( m_desc.IsValidPkt() ){
1403 UpdateOffsets(); /* update fast offsets */
1407 /* process packet */
1408 void CPacketIndication::_ProcessPacket(CPacketParser *parser,
1414 CCPacketParserCounters * m_cnt=&parser->m_counter;
1418 packetBase = m_packet->raw;
1419 BP_ASSERT(packetBase);
1420 m_ether = (EthernetHeader *)packetBase;
1424 switch( m_ether->getNextProtocol() ) {
1425 case EthernetHeader::Protocol::IP :
1427 l3.m_ipv4 =(IPHeader *)(packetBase+offset);
1429 case EthernetHeader::Protocol::IPv6 :
1431 l3.m_ipv6 =(IPv6Header *)(packetBase+offset);
1434 case EthernetHeader::Protocol::VLAN :
1436 switch ( m_ether->getVlanProtocol() ){
1437 case EthernetHeader::Protocol::IP:
1439 l3.m_ipv4 =(IPHeader *)(packetBase+offset);
1441 case EthernetHeader::Protocol::IPv6 :
1443 l3.m_ipv6 =(IPv6Header *)(packetBase+offset);
1446 case EthernetHeader::Protocol::MPLS_Multicast :
1447 case EthernetHeader::Protocol::MPLS_Unicast :
1451 case EthernetHeader::Protocol::ARP :
1457 return ; /* Non IP */
1460 case EthernetHeader::Protocol::ARP :
1462 return; /* Non IP */
1465 case EthernetHeader::Protocol::MPLS_Multicast :
1466 case EthernetHeader::Protocol::MPLS_Unicast :
1468 return; /* Non IP */
1473 return; /* Non IP */
1476 if (is_ipv6() == false) {
1477 if( (14+20) > (uint32_t)( m_packet->getTotalLen()) ){
1478 m_cnt->m_ip_length_error++;
1483 // For now, we can not mix ipv4 and ipv4 packets
1484 // so we require --ipv6 option be set for ipv6 packets
1485 if ((m_is_ipv6) && (CGlobalInfo::is_ipv6_enable() == false)){
1486 fprintf(stderr,"ERROR --ipv6 must be set to process ipv6 packets\n");
1490 // Convert to Ipv6 if requested and not already Ipv6
1491 if ((CGlobalInfo::is_ipv6_enable()) && (is_ipv6() == false )) {
1492 if (ConvertPacketToIpv6InPlace(pkt, offset) == false){
1493 /* Move to next packet as this was erroneous */
1494 printf(" unable to convert packet to IPv6, skipping...\n");
1500 ProcessIpv6Packet(parser,offset);
1502 ProcessIpPacket(parser,offset);
1508 void CFlowTableStats::Clear(){
1518 void CFlowTableStats::Dump(FILE *fd){
1529 void CFlow::Dump(FILE *fd){
1530 fprintf(fd," fif is swap : %d \n",is_fif_swap);
1534 void CFlowTableManagerBase::Dump(FILE *fd){
1538 // Return flow that has given key. If flow does not exist, create one, and add to CFlow data structure.
1539 // key - key to lookup by.
1540 // is_fif - return: true if flow did not exist (This is the first packet we see in this flow).
1541 // false if flow already existed
1542 CFlow * CFlowTableManagerBase::process(const CFlowKey & key, bool & is_fif) {
1545 CFlow * lp=lookup(key);
1557 m_stats.m_fif_err++;
1563 bool CFlowTableMap::Create(int max_size){
1568 void CFlowTableMap::Delete(){
1572 void CFlowTableMap::remove(const CFlowKey & key ) {
1573 CFlow *lp=lookup(key);
1585 CFlow * CFlowTableMap::lookup(const CFlowKey & key ) {
1586 flow_map_t::iterator iter;
1587 iter = m_map.find(key);
1588 if (iter != m_map.end() ) {
1589 return ( (*iter).second );
1591 return (( CFlow*)0);
1595 CFlow * CFlowTableMap::add(const CFlowKey & key ) {
1596 CFlow * flow = new CFlow();
1597 m_map.insert(flow_map_t::value_type(key,flow));
1601 void CFlowTableMap::remove_all(){
1602 if ( m_map.empty() )
1605 for (it= m_map.begin(); it != m_map.end(); ++it) {
1606 CFlow *lp = it->second;
1612 uint64_t CFlowTableMap::count(){
1613 return ( m_map.size());
1618 * This function will insert an IP option header containing metadata for the
1621 * An mbuf is created to hold the new option header plus the portion of the
1622 * packet after the base IP header (includes any IP options header that might
1623 * exist). This mbuf is then linked into the existing mbufs (becoming the
1626 * Note that the rxcheck option header is inserted as the first option header,
1627 * and any existing IP option headers are placed after it.
1629 void CFlowPktInfo::do_generate_new_mbuf_rxcheck(rte_mbuf_t * m,
1633 /* retrieve size of rx-check header, must be multiple of 8 */
1634 uint16_t opt_len = RX_CHECK_LEN;
1635 uint16_t current_opt_len = 0;
1636 assert( (opt_len % 8) == 0 );
1638 /* determine starting move location */
1639 char *mp1 = rte_pktmbuf_mtod(m, char*);
1640 uint16_t mp1_offset = m_pkt_indication.getFastIpOffsetFast();
1641 if (unlikely (m_pkt_indication.is_ipv6()) ) {
1642 mp1_offset += IPv6Header::DefaultSize;
1644 mp1_offset += IPHeader::DefaultSize;
1646 char *move_from = mp1 + mp1_offset;
1648 /* determine size of new mbuf required */
1649 uint16_t move_len = m->data_len - mp1_offset;
1650 uint16_t new_mbuf_size = move_len + opt_len;
1651 uint16_t mp2_offset = opt_len;
1653 /* obtain a new mbuf */
1654 rte_mbuf_t * new_mbuf = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), new_mbuf_size);
1656 char * mp2 = rte_pktmbuf_append(new_mbuf, new_mbuf_size);
1657 char * move_to = mp2 + mp2_offset;
1659 /* move part of packet from first mbuf to new mbuf */
1660 memmove(move_to, move_from, move_len);
1662 /* trim first mbuf and set pointer to option header*/
1663 CRx_check_header *rxhdr;
1664 uint16_t buf_adjust = move_len;
1665 rxhdr = (CRx_check_header *)mp2;
1666 m->data_len -= buf_adjust;
1668 /* insert rx-check data as an IPv4 option header or IPv6 extension header*/
1669 CFlowPktInfo * lp=node->m_pkt_info;
1670 CPacketDescriptor * desc=&lp->m_pkt_indication.m_desc;
1672 /* set option type and update ip header length */
1673 IPHeader * ipv4=(IPHeader *)(mp1 + 14);
1674 if (unlikely (m_pkt_indication.is_ipv6()) ) {
1675 IPv6Header * ipv6=(IPv6Header *)(mp1 + 14);
1676 uint8_t save_header= ipv6->getNextHdr();
1677 ipv6->setNextHdr(RX_CHECK_V6_OPT_TYPE);
1678 ipv6->setHopLimit(TTL_RESERVE_DUPLICATE);
1679 ipv6->setTrafficClass(ipv6->getTrafficClass()|TOS_TTL_RESERVE_DUPLICATE);
1680 ipv6->setPayloadLen( ipv6->getPayloadLen() +
1681 sizeof(CRx_check_header));
1682 rxhdr->m_option_type = save_header;
1683 rxhdr->m_option_len = RX_CHECK_V6_OPT_LEN;
1685 current_opt_len = ipv4->getHeaderLength();
1686 ipv4->setHeaderLength(current_opt_len+opt_len);
1687 ipv4->setTotalLength(ipv4->getTotalLength()+opt_len);
1688 ipv4->setTimeToLive(TTL_RESERVE_DUPLICATE);
1689 ipv4->setTOS(ipv4->getTOS()|TOS_TTL_RESERVE_DUPLICATE);
1691 rxhdr->m_option_type = RX_CHECK_V4_OPT_TYPE;
1692 rxhdr->m_option_len = RX_CHECK_V4_OPT_LEN;
1695 /* fill in the rx-check metadata in the options header */
1696 if ( CGlobalInfo::m_options.is_rxcheck_const_ts() ){
1697 /* Runtime flag to use a constant value for the timestamp field. */
1698 /* This is used by simulation to provide consistency across runs. */
1699 rxhdr->m_time_stamp = 0xB3B2B1B0;
1701 rxhdr->m_time_stamp = os_get_hr_tick_32();
1703 rxhdr->m_magic = RX_CHECK_MAGIC;
1704 rxhdr->m_flow_id = node->m_flow_id | ( ( (uint64_t)(desc->getFlowId() & 0xf))<<52 ) ; // include thread_id, node->flow_id, sub_flow in case of multi-flow template
1706 rxhdr->m_aging_sec = desc->GetMaxFlowTimeout();
1707 rxhdr->m_template_id = (uint8_t)desc->getId();
1709 /* add the flow packets goes to the same port */
1711 rxhdr->m_pkt_id = desc->getFlowPktNum();
1712 rxhdr->m_flow_size = desc->GetMaxPktsPerFlow();
1715 rxhdr->m_pkt_id = desc->GetDirInfo()->GetPktNum();
1716 rxhdr->m_flow_size = desc->GetDirInfo()->GetMaxPkts();
1718 rxhdr->set_dir(desc->IsInitSide()?1:0);
1719 rxhdr->set_both_dir(desc->IsBiDirectionalFlow()?1:0);
1722 /* update checksum for IPv4, split across 2 mbufs */
1723 if (likely ( ! m_pkt_indication.is_ipv6()) ) {
1724 ipv4->updateCheckSum2((uint8_t *)ipv4, current_opt_len, (uint8_t *)rxhdr, opt_len);
1728 new_mbuf->next = m->next;
1729 new_mbuf->nb_segs++;
1732 m->pkt_len += opt_len;
1736 char * CFlowPktInfo::push_ipv4_option_offline(uint8_t bytes){
1737 /* must be align by 4*/
1738 assert( (bytes % 4)== 0 );
1739 assert(m_pkt_indication.is_ipv6()==false);
1740 if ( m_pkt_indication.l3.m_ipv4->getHeaderLength()+bytes>60 ){
1741 printf(" ERROR ipv4 options size is too big, should be able to add %d bytes for internal info \n",bytes);
1744 /* now we can do that !*/
1746 /* add more bytes to the packet */
1747 m_packet->append(bytes);
1748 uint8_t ip_offset_to_move= m_pkt_indication.getFastIpOffsetFast()+IPHeader::DefaultSize;
1749 char *p=m_packet->raw+ip_offset_to_move;
1750 uint16_t bytes_to_move= m_packet->pkt_len - ip_offset_to_move -bytes;
1752 /* move the start of ipv4 options */
1753 memmove(p+bytes ,p, bytes_to_move);
1755 /* fix all other stuff */
1756 if ( m_pkt_indication.m_udp_tcp_offset ){
1757 m_pkt_indication.m_udp_tcp_offset+=bytes;
1759 if ( m_pkt_indication.m_payload_offset ) {
1760 m_pkt_indication.m_payload_offset+=bytes;
1763 m_pkt_indication.RefreshPointers();
1764 /* now pointer are updated we can manipulate ipv4 header */
1765 IPHeader * ipv4=m_pkt_indication.l3.m_ipv4;
1767 ipv4->setTotalLength(ipv4->getTotalLength()+bytes);
1768 ipv4->setHeaderLength(ipv4->getHeaderLength()+(bytes));
1770 m_pkt_indication.UpdatePacketPadding();
1772 /* refresh the global mbuf */
1778 void CFlowPktInfo::mask_as_learn(){
1780 if ( m_pkt_indication.is_ipv6() ) {
1781 lpNat=(CNatOption *)push_ipv6_option_offline(CNatOption::noOPTION_LEN);
1782 lpNat->set_init_ipv6_header();
1784 lpNat->set_thread_id(0);
1786 if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_IP_OPTION)) {
1787 // Make space in IPv4 header for NAT option
1788 lpNat=(CNatOption *)push_ipv4_option_offline(CNatOption::noOPTION_LEN);
1789 lpNat->set_init_ipv4_header();
1791 lpNat->set_thread_id(0);
1792 m_pkt_indication.l3.m_ipv4->updateCheckSum();
1795 m_pkt_indication.m_desc.SetLearn(true);
1799 char * CFlowPktInfo::push_ipv6_option_offline(uint8_t bytes){
1801 /* must be align by 8*/
1802 assert( (bytes % 8)== 0 );
1803 assert(m_pkt_indication.is_ipv6()==true);
1805 /* add more bytes to the packet */
1806 m_packet->append(bytes);
1807 uint8_t ip_offset_to_move= m_pkt_indication.getFastIpOffsetFast()+IPv6Header::DefaultSize;
1808 char *p=m_packet->raw+ip_offset_to_move;
1809 uint16_t bytes_to_move= m_packet->pkt_len - ip_offset_to_move -bytes;
1811 /* move the start of ipv4 options */
1812 memmove(p+bytes ,p, bytes_to_move);
1814 /* fix all other stuff */
1815 if ( m_pkt_indication.m_udp_tcp_offset ){
1816 m_pkt_indication.m_udp_tcp_offset+=bytes;
1818 if ( m_pkt_indication.m_payload_offset ) {
1819 m_pkt_indication.m_payload_offset+=bytes;
1822 m_pkt_indication.RefreshPointers();
1823 /* now pointer are updated we can manipulate ipv6 header */
1824 IPv6Header * ipv6=m_pkt_indication.l3.m_ipv6;
1826 ipv6->setPayloadLen(ipv6->getPayloadLen()+bytes);
1827 uint8_t save_header= ipv6->getNextHdr();
1828 *p=save_header; /* copy next header */
1829 ipv6->setNextHdr(CNatOption::noIPV6_OPTION);
1831 m_pkt_indication.UpdatePacketPadding();
1833 /* refresh the global mbuf */
1840 void CFlowPktInfo::alloc_const_mbuf(){
1842 if ( m_packet->pkt_len > FIRST_PKT_SIZE ) {
1843 /* pkt size is bigger than FIRST_PKT_SIZE let's create an offline buffer */
1845 for (i=0; i<MAX_SOCKETS_SUPPORTED; i++) {
1846 if ( CGlobalInfo::m_socket.is_sockets_enable(i) ){
1849 uint16_t pkt_s=(m_packet->pkt_len - FIRST_PKT_SIZE);
1851 m = CGlobalInfo::pktmbuf_alloc(i,pkt_s);
1853 char *p=rte_pktmbuf_append(m, pkt_s);
1854 rte_memcpy(p,(m_packet->raw+FIRST_PKT_SIZE),pkt_s);
1856 assert(m_big_mbuf[i]==NULL);
1863 void CFlowPktInfo::free_const_mbuf(){
1865 for (i=0; i<MAX_SOCKETS_SUPPORTED; i++) {
1866 rte_mbuf_t * m=m_big_mbuf[i];
1868 rte_pktmbuf_free(m );
1875 bool CFlowPktInfo::Create(CPacketIndication * pkt_ind){
1876 /* clone the packet*/
1877 m_packet = new CCapPktRaw(pkt_ind->m_packet);
1878 /* clone of the offsets */
1879 m_pkt_indication.Clone(pkt_ind,m_packet);
1882 for (i=0; i<MAX_SOCKETS_SUPPORTED; i++) {
1883 m_big_mbuf[i] = NULL;
1889 void CFlowPktInfo::Delete(){
1894 void CFlowPktInfo::Dump(FILE *fd){
1895 m_pkt_indication.Dump(fd,0);
1901 void CCapFileFlowInfo::save_to_erf(std::string cap_file_name,int pcap){
1903 fprintf(stderr,"ERROR no info for this flow ");
1906 capture_type_e file_type=ERF;
1912 CFileWriterBase * lpWriter=CCapWriterFactory::CreateWriter(file_type,(char *)cap_file_name.c_str());
1913 if (lpWriter == NULL) {
1914 fprintf(stderr,"ERROR can't create cap file %s ",(char *)cap_file_name.c_str());
1919 for (i=0; i<(int)Size(); i++) {
1920 CFlowPktInfo * lp=GetPacket((uint32_t)i);
1921 bool res=lpWriter->write_packet(lp->m_packet);
1929 struct CTmpFlowPerDirInfo {
1930 CTmpFlowPerDirInfo(){
1937 class CTmpFlowInfo {
1941 m_max_aging_sec=0.0;
1948 uint32_t m_max_pkts;
1949 dsec_t m_max_aging_sec;
1952 CTmpFlowPerDirInfo m_per_dir[CS_NUM];
1955 typedef CTmpFlowInfo * flow_tmp_t;
1956 typedef std::map<uint16_t, flow_tmp_t> flow_tmp_map_t;
1957 typedef flow_tmp_map_t::iterator flow_tmp_map_iter_t;
1959 enum CCapFileFlowInfo::load_cap_file_err CCapFileFlowInfo::is_valid_template_load_time(){
1961 for (i=0; i<Size(); i++) {
1962 CFlowPktInfo * lp= GetPacket((uint32_t)i);
1963 CPacketIndication * lpd=&lp->m_pkt_indication;
1964 if ( lpd->getEtherOffset() !=0 ){
1965 fprintf(stderr, "Error: Bad CAP file. Ether offset start is not 0 in packet %d \n", i+1);
1969 if ( CGlobalInfo::is_learn_mode() ) {
1970 // We change TCP ports. Need them to be in first 64 byte mbuf.
1971 // Since we also add IP option, and rx-check feature might add another IP option, better not allow
1972 // OP options in this mode. If needed this limitation can be refined a bit.
1973 if ( lpd->getTcpOffset() - lpd->getIpOffset() != 20 ) {
1974 fprintf(stderr, "Error: Bad CAP file. In learn (NAT) mode, no IP options allowed \n");
1975 return kIPOptionNotAllowed;
1977 if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_TCP)) {
1978 if (lpd->getIpProto() != IPPROTO_TCP && !lpd->m_desc.IsInitSide()) {
1979 fprintf(stderr, "Error: In the chosen learn mode, all packets from server to client in CAP file should be TCP.\n");
1980 fprintf(stderr, " Please give different CAP file, or try different --learn-mode\n");
1981 return kNoTCPFromServer;
1987 if ( CGlobalInfo::is_learn_mode() ) {
1988 CPacketIndication &pkt_0_indication = GetPacket(0)->m_pkt_indication;
1990 if ( pkt_0_indication.m_desc.IsPluginEnable() ) {
1991 fprintf(stderr, "Error: plugins are not supported with --learn mode \n");
1992 return kPlugInWithLearn;
1995 if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_TCP)) {
1996 if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_TCP_ACK)) {
1999 , "Error: In the chosen learn mode, need at least the 3 TCP handshake packets.\n");
2001 , " Please give different CAP file, or try different --learn-mode\n");
2002 return kTCPLearnModeBadFlow;
2005 CPacketIndication &pkt_1_indication = GetPacket(1)->m_pkt_indication;
2008 // verify first packet is TCP SYN from client
2009 TCPHeader *tcp = (TCPHeader *)(pkt_0_indication.getBasePtr() + pkt_0_indication.getTcpOffset());
2010 if ( (! pkt_0_indication.m_desc.IsInitSide()) || (! tcp->getSynFlag()) ) {
2011 fprintf(stderr, "Error: In the chosen learn mode, first TCP packet should be SYN from client side.\n");
2012 fprintf(stderr, " In cap file, first packet side direction is %s. TCP header is:\n"
2013 , pkt_0_indication.m_desc.IsInitSide() ? "outside":"inside");
2015 fprintf(stderr, " Please give different CAP file, or try different --learn-mode\n");
2019 // We want at least the TCP flags to be inside first mbuf
2020 if (pkt_0_indication.getTcpOffset() + 14 > FIRST_PKT_SIZE) {
2022 , "Error: In the chosen learn mode, TCP flags offset should be less than %d, but it is %d.\n"
2023 , FIRST_PKT_SIZE, pkt_0_indication.getTcpOffset() + 14);
2024 fprintf(stderr, " Please give different CAP file, or try different --learn-mode\n");
2025 return kTCPOffsetTooBig;
2027 if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_TCP_ACK)) {
2028 // To support TCP seq randomization from server to client, we need second packet in flow to be the server SYN+ACK
2030 if (pkt_1_indication.getIpProto() != IPPROTO_TCP) {
2033 TCPHeader *tcp = (TCPHeader *)(pkt_1_indication.getBasePtr() + pkt_1_indication.getTcpOffset());
2034 if ( (! tcp->getSynFlag()) || (! tcp->getAckFlag()) || ( pkt_1_indication.m_desc.IsInitSide())) {
2039 fprintf(stderr, "Error: In the chosen learn mode, second packet should be SYN+ACK from server.\n");
2040 fprintf(stderr, " Please give different CAP file, or try different --learn-mode\n");
2041 return kNoTCPSynAck;
2044 CPacketIndication &pkt_2_indication = GetPacket(2)->m_pkt_indication;
2045 if ( (! pkt_2_indication.m_desc.IsInitSide()) ) {
2047 , "Error: Wrong third packet. In the chosen learn mode, need at least the 3 TCP handshake packets.\n");
2049 , " Please give different CAP file, or try different --learn-mode\n");
2050 return kTCPLearnModeBadFlow;
2052 if ((pkt_0_indication.m_cap_ipg < (double)LEARN_MODE_MIN_IPG / 1000)
2053 || (pkt_1_indication.m_cap_ipg < (double)LEARN_MODE_MIN_IPG / 1000)) {
2055 , "Error: Bad cap file timings. In the chosen learn mode");
2056 fprintf(stderr, "IPG between TCP handshake packets should be at least %d msec.\n", LEARN_MODE_MIN_IPG);
2057 fprintf(stderr, " Current delay is %f between second and first, %f between third and second"
2058 , pkt_0_indication.m_cap_ipg, pkt_1_indication.m_cap_ipg);
2060 , " Please give different CAP file, try different --learn-mode, or edit ipg parameters in template file\n");
2061 return kTCPIpgTooLow;
2072 * update global info
2074 * 2. per sub-flow pkt_num/max-pkt per dir and per global
2076 void CCapFileFlowInfo::update_info(){
2077 flow_tmp_map_iter_t iter;
2079 CTmpFlowInfo * lpFlow;
2083 // first iteration, lern all the info into a temp flow table
2084 for (i=0; i<Size(); i++) {
2085 CFlowPktInfo * lp= GetPacket((uint32_t)i);
2087 CPacketDescriptor * desc=&lp->m_pkt_indication.m_desc;
2088 uint16_t flow_id = desc->getFlowId();
2089 CPacketDescriptorPerDir * lpCurPacket = desc->GetDirInfo();
2090 pkt_dir_t dir=desc->IsInitSide()?CLIENT_SIDE:SERVER_SIDE; // with respect to the first sub-flow in the template
2093 iter = ft.find(flow_id);
2094 if (iter != ft.end() ) {
2095 lpFlow=(*iter).second;
2097 lpFlow = new CTmpFlowInfo();
2099 ft.insert(flow_tmp_map_t::value_type(flow_id,lpFlow));
2105 lpCurPacket->SetPktNum(lpFlow->m_per_dir[dir].m_pkt_id);
2106 lpFlow->m_max_pkts++;
2107 lpFlow->m_per_dir[dir].m_pkt_id++;
2109 dsec_t delta = ctime - lpFlow->m_last_pkt ;
2110 lpFlow->m_last_pkt = ctime;
2111 if (delta > lpFlow->m_max_aging_sec) {
2112 lpFlow->m_max_aging_sec = delta;
2114 // per direction info
2117 ctime += lp->m_pkt_indication.m_cap_ipg;
2122 for (i=0; i<Size(); i++) {
2123 CFlowPktInfo * lp= GetPacket((uint32_t)i);
2125 CPacketDescriptor * desc=&lp->m_pkt_indication.m_desc;
2126 uint16_t flow_id = desc->getFlowId();
2127 CPacketDescriptorPerDir * lpCurPacket = desc->GetDirInfo();
2128 pkt_dir_t dir=desc->IsInitSide()?CLIENT_SIDE:SERVER_SIDE; // with respect to the first sub-flow in the template
2130 iter = ft.find(flow_id);
2131 assert( iter != ft.end() );
2132 lpFlow=(*iter).second;
2134 if ( (lpFlow->m_per_dir[0].m_pkt_id >0) &&
2135 (lpFlow->m_per_dir[1].m_pkt_id >0) ) {
2136 /* we have both dir */
2137 lp->m_pkt_indication.m_desc.SetBiPluginEnable(true);
2141 lpCurPacket->SetMaxPkts(lpFlow->m_per_dir[dir].m_pkt_id);
2142 lp->m_pkt_indication.m_desc.SetMaxPktsPerFlow(lpFlow->m_max_pkts);
2143 lp->m_pkt_indication.m_desc.SetMaxFlowTimeout(lpFlow->m_max_aging_sec);
2147 /* in case of learn mode , we need to mark the first packet */
2148 if ( CGlobalInfo::is_learn_mode() ) {
2149 CFlowPktInfo * lp= GetPacket(0);
2151 /* only for bi directionl traffic mask the learn flag , only for the first packet */
2152 if ( lp->m_pkt_indication.m_desc.IsBiDirectionalFlow() ){
2153 lp->mask_as_learn();
2156 if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_TCP_ACK)) {
2157 // In this mode, we need to see the SYN+ACK as well.
2160 lp->m_pkt_indication.setTTL(TTL_RESERVE_DUPLICATE);
2161 lp->m_pkt_indication.setTOSReserve();
2168 flow_tmp_map_iter_t it;
2169 for (it= ft.begin(); it != ft.end(); ++it) {
2170 CTmpFlowInfo *lp = it->second;
2178 enum CCapFileFlowInfo::load_cap_file_err CCapFileFlowInfo::load_cap_file(std::string cap_file, uint16_t _id, uint8_t plugin_id) {
2181 fprintf(stdout," -- loading cap file %s \n",cap_file.c_str());
2182 CPacketParser parser;
2183 CPacketIndication pkt_indication;
2184 CCapReaderBase * lp=CCapReaderFactory::CreateReader((char *)cap_file.c_str(),0);
2187 printf(" ERROR file %s does not exist or not supported \n",(char *)cap_file.c_str());
2188 return kFileNotExist;
2190 bool multi_flow_enable =( (plugin_id!=0)?true:false);
2200 CFlow * first_flow=0;
2201 bool first_flow_fif_is_swap=false;
2202 bool time_was_set=false;
2203 double last_time=0.0;
2204 CCapPktRaw raw_packet;
2208 if ( lp->ReadPacket(&raw_packet) ==false ){
2213 if ( !time_was_set ){
2214 last_time=raw_packet.get_time();
2217 if (raw_packet.get_time()<last_time) {
2218 fprintf(stderr, "Error: Non valid pcap file. Timestamp is negative at packet %d\n", cnt);
2219 return kNegTimestamp;
2221 last_time=raw_packet.get_time();
2224 if ( parser.ProcessPacket(&pkt_indication, &raw_packet) ){
2226 if ( pkt_indication.m_desc.IsValidPkt() ) {
2227 pkt_indication.m_desc.SetPluginEnable(multi_flow_enable);
2228 pkt_indication.m_desc.SetPluginId(plugin_id);
2230 pkt_indication.m_desc.SetId(_id);
2232 CFlow * lpflow=flow.process(pkt_indication.m_flow_key,is_fif);
2233 m_total_bytes += pkt_indication.m_packet->pkt_len;
2234 pkt_indication.m_cap_ipg = raw_packet.get_time();
2236 pkt_indication.m_flow =lpflow;
2237 pkt_indication.m_desc.SetFlowPktNum(lpflow->pkt_id);
2238 /* inc pkt_id inside the flow */
2241 /* check that we don't have reserve TTL for duplication */
2242 uint8_t ttl = pkt_indication.getTTL();
2243 if ( (ttl == TTL_RESERVE_DUPLICATE) ||
2244 (ttl == (TTL_RESERVE_DUPLICATE-1)) ) {
2245 pkt_indication.setTTL(TTL_RESERVE_DUPLICATE-4);
2248 pkt_indication.clearTOSReserve();
2251 // Validation for first packet in flow
2253 lpflow->flow_id = m_total_flows;
2254 pkt_indication.m_desc.SetFlowId(lpflow->flow_id);
2256 if (m_total_flows == 0) {
2258 first_flow =lpflow;/* save it for single flow support , to signal error */
2259 lpflow->is_fif_swap =pkt_indication.m_desc.IsSwapTuple();
2260 first_flow_fif_is_swap = pkt_indication.m_desc.IsSwapTuple();
2261 pkt_indication.m_desc.SetInitSide(true);
2262 Append(&pkt_indication);
2265 if ( multi_flow_enable ) {
2266 lpflow->is_fif_swap = pkt_indication.m_desc.IsSwapTuple();
2267 /* in respect to the first flow */
2268 bool init_side_in_repect_to_first_flow =
2269 ((first_flow_fif_is_swap?true:false) == lpflow->is_fif_swap)?true:false;
2270 pkt_indication.m_desc.SetInitSide(init_side_in_repect_to_first_flow);
2271 Append(&pkt_indication);
2274 printf("More than one flow in this cap. Ignoring it !! \n");
2275 pkt_indication.m_flow_key.Dump(stderr);
2280 pkt_indication.m_desc.SetFlowId(lpflow->flow_id);
2282 if ( multi_flow_enable ==false ){
2283 if (lpflow == first_flow) {
2286 ((lpflow->is_fif_swap?true:false) == pkt_indication.m_desc.IsSwapTuple())?true:false;
2287 pkt_indication.m_desc.SetInitSide( init_side );
2288 Append(&pkt_indication);
2290 //printf(" more than one flow in this cap ignot it !! \n");
2294 /* support multi-flow, */
2296 /* work in respect to first flow */
2298 ((first_flow_fif_is_swap?true:false) == pkt_indication.m_desc.IsSwapTuple())?true:false;
2299 pkt_indication.m_desc.SetInitSide( init_side );
2300 Append(&pkt_indication);
2305 fprintf(stderr, "ERROR packet %d is not supported, should be Ethernet/IP(0x0800)/(TCP|UDP) format try to convert it using Wireshark !\n",cnt);
2309 fprintf(stderr, "ERROR packet %d is not supported, should be Ethernet/IP(0x0800)/(TCP|UDP) format try to convert it using Wireshark !\n",cnt);
2310 return kPktProcessFail;
2315 CFlowPktInfo * last_pkt =GetPacket((uint32_t)(Size()-1));
2316 last_pkt->m_pkt_indication.m_desc.SetIsLastPkt(true);
2320 for (i=1; i<Size(); i++) {
2321 CFlowPktInfo * lp_prev= GetPacket((uint32_t)i-1);
2322 CFlowPktInfo * lp= GetPacket((uint32_t)i);
2324 lp_prev->m_pkt_indication.m_cap_ipg = lp->m_pkt_indication.m_cap_ipg-
2325 lp_prev->m_pkt_indication.m_cap_ipg;
2326 if ( lp->m_pkt_indication.m_desc.IsInitSide() !=
2327 lp_prev->m_pkt_indication.m_desc.IsInitSide()) {
2328 lp_prev->m_pkt_indication.m_desc.SetRtt(true);
2332 GetPacket((uint32_t)Size()-1)->m_pkt_indication.m_cap_ipg=0.0;
2333 m_total_errors += parser.m_counter.getTotalErrors();
2339 //flow.Dump(stdout);
2341 //parser.Dump(stdout);
2343 //fprintf(stdout," -- finish loading cap file \n");
2344 //fprintf(stdout,"\n");
2346 if ( m_total_errors > 0 ) {
2347 parser.m_counter.Dump(stdout);
2348 fprintf(stderr, " ERORR in one of the cap file, you should have one flow per cap file or valid plugin \n");
2354 void CCapFileFlowInfo::update_pcap_mode(){
2356 for (i=0; i<(int)Size(); i++) {
2357 CFlowPktInfo * lp=GetPacket((uint32_t)i);
2358 lp->m_pkt_indication.m_desc.SetPcapTiming(true);
2362 void CCapFileFlowInfo::get_total_memory(CCCapFileMemoryUsage & memory){
2365 for (i=0; i<(int)Size(); i++) {
2366 CFlowPktInfo * lp=GetPacket((uint32_t)i);
2367 if ( lp->m_packet->pkt_len > FIRST_PKT_SIZE ) {
2368 memory.add_size(lp->m_packet->pkt_len - FIRST_PKT_SIZE);
2374 double CCapFileFlowInfo::get_cap_file_length_sec(){
2377 for (i=0; i<(int)Size(); i++) {
2378 CFlowPktInfo * lp=GetPacket((uint32_t)i);
2379 sum+=lp->m_pkt_indication.m_cap_ipg;
2385 void CCapFileFlowInfo::update_min_ipg(dsec_t min_ipg,
2386 dsec_t override_ipg){
2389 for (i=0; i<(int)Size(); i++) {
2390 CFlowPktInfo * lp=GetPacket((uint32_t)i);
2391 if ( lp->m_pkt_indication.m_cap_ipg < min_ipg ){
2392 lp->m_pkt_indication.m_cap_ipg=override_ipg;
2394 if ( lp->m_pkt_indication.m_cap_ipg < override_ipg ){
2395 lp->m_pkt_indication.m_cap_ipg=override_ipg;
2401 void CCapFileFlowInfo::Dump(FILE *fd){
2405 //CCapPacket::DumpHeader(fd);
2406 for (i=0; i<(int)Size(); i++) {
2407 fprintf(fd,"pkt_id : %d \n",i+1);
2408 fprintf(fd,"-----------\n");
2409 CFlowPktInfo * lp=GetPacket((uint32_t)i);
2414 // add pkt indication
2415 void CCapFileFlowInfo::Append(CPacketIndication * pkt_indication){
2418 lp = new CFlowPktInfo();
2419 lp->Create( pkt_indication );
2420 m_flow_pkts.push_back(lp);
2425 void CCCapFileMemoryUsage::Add(const CCCapFileMemoryUsage & obj){
2427 for (i=0; i<CCCapFileMemoryUsage::MASK_SIZE; i++) {
2428 m_buf[i] += obj.m_buf[i];
2430 m_total_bytes +=obj.m_total_bytes;
2435 void CCCapFileMemoryUsage::dump(FILE *fd){
2436 fprintf(fd, " Memory usage \n");
2438 int c_size=CCCapFileMemoryUsage::SIZE_MIN;
2441 for (i=0; i<CCCapFileMemoryUsage::MASK_SIZE; i++) {
2442 fprintf(fd," size_%-7d : %lu \n",c_size, (ulong)m_buf[i]);
2443 c_total +=m_buf[i]*c_size;
2446 fprintf(fd," Total : %s %.0f%% util due to buckets \n",double_to_human_str(c_total,"bytes",KBYE_1024).c_str(),100.0*float(c_total)/float(m_total_bytes) );
2450 bool CCapFileFlowInfo::Create(){
2458 void CCapFileFlowInfo::dump_pkt_sizes(void){
2460 for (i=0; i<(int)Size(); i++) {
2461 flow_pkt_info_t lp=GetPacket((uint32_t)i);
2463 node.m_dest_ip = 0x10000110;
2464 node.m_src_ip = 0x20000110;
2465 node.m_src_port = 12;
2466 rte_mbuf_t * buf=lp->generate_new_mbuf(&node);
2467 //rte_pktmbuf_dump(buf, buf->pkt_len);
2468 rte_pktmbuf_free(buf);
2472 void CCapFileFlowInfo::RemoveAll(){
2477 for (i=0; i<(int)Size(); i++) {
2478 flow_pkt_info_t lp=GetPacket((uint32_t)i);
2482 // free all the pointers
2483 m_flow_pkts.clear();
2486 void CCapFileFlowInfo::Delete(){
2490 void operator >> (const YAML::Node& node, CFlowYamlDpPkt & fi) {
2492 node["pkt_id"] >> val;
2493 fi.m_pkt_id =(uint8_t)val;
2494 node["pyld_offset"] >> val;
2495 fi.m_pyld_offset =(uint8_t)val;
2496 node["type"] >> val;
2497 fi.m_type =(uint8_t)val;
2499 fi.m_len =(uint8_t)val;
2500 node["mask"] >> val;
2504 void operator >> (const YAML::Node& node, CVlanYamlInfo & fi) {
2507 if ( node.FindValue("enable") ){
2508 node["enable"] >> tmp ;
2510 node["vlan0"] >> tmp;
2511 fi.m_vlan_per_port[0] = tmp;
2512 node["vlan1"] >> tmp;
2513 fi.m_vlan_per_port[1] = tmp;
2519 void operator >> (const YAML::Node& node, CFlowYamlInfo & fi) {
2520 node["name"] >> fi.m_name;
2522 if ( node.FindValue("client_pool") ){
2523 node["client_pool"] >> fi.m_client_pool_name;
2525 fi.m_client_pool_name = "default";
2527 if ( node.FindValue("server_pool") ){
2528 node["server_pool"] >> fi.m_server_pool_name;
2530 fi.m_server_pool_name = "default";
2533 node["cps"] >> fi.m_k_cps;
2534 fi.m_k_cps = fi.m_k_cps/1000.0;
2537 fi.m_ipg_sec =t/1000000.0;
2539 fi.m_rtt_sec = t/1000000.0;
2540 node["w"] >> fi.m_w;
2542 if ( node.FindValue("cap_ipg") ){
2543 node["cap_ipg"] >> fi.m_cap_mode;
2544 fi.m_cap_mode_was_set =true;
2546 fi.m_cap_mode_was_set =false;
2549 if ( node.FindValue("wlength") ){
2550 node["wlength"] >> fi.m_wlength;
2551 fi.m_wlength_set=true;
2553 fi.m_wlength_set=false;
2557 if ( node.FindValue("limit") ){
2558 node["limit"] >> fi.m_limit;
2559 fi.m_limit_was_set = true;
2561 fi.m_limit_was_set = false;
2565 if ( node.FindValue("plugin_id") ){
2566 uint32_t plugin_val;
2567 node["plugin_id"] >> plugin_val;
2568 fi.m_plugin_id=plugin_val;
2574 fi.m_one_app_server_was_set = false;
2575 fi.m_one_app_server = false;
2576 if ( utl_yaml_read_ip_addr(node,
2578 fi.m_server_addr) ){
2580 node["one_app_server"] >> fi.m_one_app_server;
2581 fi.m_one_app_server_was_set=true;
2582 } catch ( const std::exception& e ) {
2583 fi.m_one_app_server_was_set = false;
2584 fi.m_one_app_server = false;
2590 if ( ( fi.m_limit_was_set ) && (fi.m_plugin_id !=0) ){
2591 fprintf(stderr," limit can't be non zero when plugin is set, you must have only one of the options set");
2596 if ( node.FindValue("dyn_pyload") ){
2597 const YAML::Node& dyn_pyload = node["dyn_pyload"];
2598 for(unsigned i=0;i<dyn_pyload.size();i++) {
2600 dyn_pyload[i] >> fd;
2601 if ( fi.m_dpPkt == 0 ){
2602 fi.m_dpPkt = new CFlowYamlDynamicPyloadPlugin();
2603 if (fi.m_plugin_id == 0) {
2604 fi.m_plugin_id = mpDYN_PYLOAD;
2606 fprintf(stderr," plugin should be zero with dynamic pyload program");
2610 fi.m_dpPkt->Add(fd);
2619 void operator >> (const YAML::Node& node, CFlowsYamlInfo & flows_info) {
2621 node["duration"] >> flows_info.m_duration_sec;
2623 if ( node.FindValue("generator") ) {
2624 node["generator"] >> flows_info.m_tuple_gen;
2625 flows_info.m_tuple_gen_was_set =true;
2627 flows_info.m_tuple_gen_was_set =false;
2630 // m_ipv6_set will be true if and only if both src_ipv6
2631 // and dst_ipv6 are provided. These are used to set
2632 // the most significant 96-bits of the IPv6 address; the
2633 // least significant 32-bits come from the ipv4 address
2634 // (what is set above).
2636 // If the IPv6 src/dst is not provided in the yaml file,
2637 // then the most significant 96-bits will be set to 0
2638 // which represents an IPv4-compatible IPv6 address.
2640 // If desired, an IPv4-mapped IPv6 address can be
2641 // formed by providing src_ipv6,dst_ipv6 and specifying
2642 // {0,0,0,0,0,0xffff}
2643 flows_info.m_ipv6_set=true;
2645 if ( node.FindValue("src_ipv6") ) {
2646 const YAML::Node& src_ipv6_info = node["src_ipv6"];
2647 if (src_ipv6_info.size() == 6 ){
2648 for(unsigned i=0;i<src_ipv6_info.size();i++) {
2650 const YAML::Node & node =src_ipv6_info;
2652 flows_info.m_src_ipv6.push_back(fi);
2656 flows_info.m_ipv6_set=false;
2660 if ( node.FindValue("dst_ipv6") ) {
2661 const YAML::Node& dst_ipv6_info = node["dst_ipv6"];
2662 if (dst_ipv6_info.size() == 6 ){
2663 for(unsigned i=0;i<dst_ipv6_info.size();i++) {
2665 const YAML::Node & node =dst_ipv6_info;
2667 flows_info.m_dst_ipv6.push_back(fi);
2671 flows_info.m_ipv6_set=false;
2674 if ( node.FindValue("cap_ipg") ) {
2675 node["cap_ipg"] >> flows_info.m_cap_mode;
2676 flows_info.m_cap_mode_set=true;
2678 flows_info.m_cap_mode=false;
2679 flows_info.m_cap_mode_set=false;
2684 if ( node.FindValue("cap_ipg_min") ) {
2685 node["cap_ipg_min"] >> t ;
2686 flows_info.m_cap_ipg_min = t/1000000.0;
2687 flows_info.m_cap_ipg_min_set=true;
2689 flows_info.m_cap_ipg_min_set=false;
2690 flows_info.m_cap_ipg_min = 20;
2693 if ( node.FindValue("cap_override_ipg") ) {
2694 node["cap_override_ipg"] >> t;
2695 flows_info.m_cap_overide_ipg = t/1000000.0;
2696 flows_info.m_cap_overide_ipg_set = true;
2698 flows_info.m_cap_overide_ipg_set = false;
2699 flows_info.m_cap_overide_ipg = 0;
2702 if (node.FindValue("wlength")) {
2703 node["wlength"] >> flows_info.m_wlength;
2704 flows_info.m_wlength_set=true;
2706 flows_info.m_wlength_set=false;
2707 flows_info.m_wlength =100;
2710 if (node.FindValue("one_app_server")) {
2711 printf("one_app_server should be configured per template. \n"
2712 "Will ignore this configuration\n");
2714 flows_info.m_one_app_server =false;
2715 flows_info.m_one_app_server_was_set=false;
2717 if (node.FindValue("vlan")) {
2718 node["vlan"] >> flows_info.m_vlan_info;
2721 if (node.FindValue("mac_override_by_ip")) {
2722 node["mac_override_by_ip"] >> flows_info.m_mac_replace_by_ip;
2724 flows_info.m_mac_replace_by_ip =false;
2727 const YAML::Node& mac_info = node["mac"];
2728 for(unsigned i=0;i<mac_info.size();i++) {
2730 const YAML::Node & node =mac_info;
2732 flows_info.m_mac_base.push_back(fi);
2735 const YAML::Node& cap_info = node["cap_info"];
2736 for(unsigned i=0;i<cap_info.size();i++) {
2739 fi.m_client_pool_idx =
2740 flows_info.m_tuple_gen.get_client_pool_id(fi.m_client_pool_name);
2741 fi.m_server_pool_idx =
2742 flows_info.m_tuple_gen.get_server_pool_id(fi.m_server_pool_name);
2743 flows_info.m_vec.push_back(fi);
2747 void CVlanYamlInfo::Dump(FILE *fd){
2748 fprintf(fd," vlan enable : %d \n",m_enable);
2749 fprintf(fd," vlan val : %d ,%d \n",m_vlan_per_port[0],m_vlan_per_port[1]);
2753 void CFlowsYamlInfo::Dump(FILE *fd){
2754 fprintf(fd," duration : %f sec \n",m_duration_sec);
2757 if (CGlobalInfo::is_ipv6_enable()) {
2759 fprintf(fd," src_ipv6 : ");
2760 for (idx=0; idx<5; idx++){
2761 fprintf(fd,"%04x:", CGlobalInfo::m_options.m_src_ipv6[idx]);
2763 fprintf(fd,"%04x\n", CGlobalInfo::m_options.m_src_ipv6[5]);
2764 fprintf(fd," dst_ipv6 : ");
2765 for (idx=0; idx<5; idx++){
2766 fprintf(fd,"%04x:", CGlobalInfo::m_options.m_dst_ipv6[idx]);
2768 fprintf(fd,"%04x\n", CGlobalInfo::m_options.m_dst_ipv6[5]);
2770 if ( !m_cap_mode_set ) {
2771 fprintf(fd," cap_ipg : wasn't set \n");
2773 fprintf(fd," cap_ipg : %d \n",m_cap_mode?1:0);
2776 if ( !m_cap_ipg_min_set ){
2777 fprintf(fd," cap_ipg_min : wasn't set \n");
2779 fprintf(fd," cap_ipg_min : %f \n",m_cap_ipg_min);
2782 if ( !m_cap_overide_ipg_set ){
2783 fprintf(fd," cap_override_ipg : wasn't set \n");
2785 fprintf(fd," cap_override_ipg : %f \n",m_cap_overide_ipg);
2788 if ( !m_wlength_set ){
2789 fprintf(fd," wlength : wasn't set \n");
2791 fprintf(fd," m_wlength : %d \n",m_wlength);
2793 fprintf(fd," one_server_for_application : %d \n",m_one_app_server?1:0);
2794 fprintf(fd," one_server_for_application_was_set : %d \n",m_one_app_server_was_set?1:0);
2796 m_vlan_info.Dump(fd);
2798 fprintf(fd," mac base : ");
2800 for (i=0; i<(int)m_mac_base.size(); i++) {
2801 if (i< (int)(m_mac_base.size()-1) ) {
2802 fprintf(fd,"0x%02x,",m_mac_base[i]);
2804 fprintf(fd,"0x%02x",m_mac_base[i]);
2809 fprintf(fd," cap file info \n");
2810 fprintf(fd," ------------- \n");
2811 for (i=0; i<(int)m_vec.size(); i++) {
2819 example for YAML file
2833 bool CFlowsYamlInfo::verify_correctness(uint32_t num_threads) {
2834 if ( m_tuple_gen_was_set ==false ){
2835 printf(" ERROR there must be a generator field in YAML , the old format is deprecated \n");
2836 printf(" This is not supported : \n");
2837 printf(" min_src_ip : 0x10000001 \n");
2838 printf(" max_src_ip : 0x50000001 \n");
2839 printf(" min_dst_ip : 0x60000001 \n");
2840 printf(" max_dst_ip : 0x60000010 \n");
2841 printf(" This is supported : \n");
2842 printf("generator : \n");
2843 printf(" distribution : \"seq\" \n");
2844 printf(" clients_start : \"16.0.0.1\" \n");
2845 printf(" clients_end : \"16.0.1.255\" \n");
2846 printf(" servers_start : \"48.0.0.1\" \n");
2847 printf(" servers_end : \"48.0.0.255\" \n");
2848 printf(" clients_per_gb : 201 \n");
2849 printf(" min_clients : 101 \n");
2850 printf(" dual_port_mask : \"1.0.0.0\" \n");
2851 printf(" tcp_aging : 1 \n");
2852 printf(" udp_aging : 1 \n");
2855 if ( !m_tuple_gen.is_valid(num_threads,is_any_plugin_configured()) ){
2858 /* patch defect trex-54 */
2859 if ( is_any_plugin_configured() ){
2860 /*Plugin is configured. in that case due to a limitation ( defect trex-54 )
2861 the number of servers should be bigger than number of clients */
2864 for (i=0; i<(int)m_vec.size(); i++) {
2865 CFlowYamlInfo * lp=&m_vec[i];
2866 if ( lp->m_plugin_id ){
2867 uint8_t c_idx = lp->m_client_pool_idx;
2868 uint8_t s_idx = lp->m_server_pool_idx;
2869 uint32_t total_clients = m_tuple_gen.m_client_pool[c_idx].getTotalIps();
2870 uint32_t total_servers = m_tuple_gen.m_server_pool[s_idx].getTotalIps();
2871 if ( total_servers < total_clients ){
2872 printf(" Plugin is configured. in that case due to a limitation ( defect trex-54 ) \n");
2873 printf(" the number of servers should be bigger than number of clients \n");
2874 printf(" client_pool_name : %s \n", lp->m_client_pool_name.c_str());
2875 printf(" server_pool_name : %s \n", lp->m_server_pool_name.c_str());
2878 uint32_t mul = total_servers / total_clients;
2879 uint32_t new_server_num = mul * total_clients;
2880 if ( new_server_num != total_servers ) {
2881 printf(" Plugin is configured. in that case due to a limitation ( defect trex-54 ) \n");
2882 printf(" the number of servers should be exact multiplication of the number of clients \n");
2883 printf(" client_pool_name : %s clients %d \n", lp->m_client_pool_name.c_str(),total_clients);
2884 printf(" server_pool_name : %s servers %d should be %d \n", lp->m_server_pool_name.c_str(),total_servers,new_server_num);
2896 int CFlowsYamlInfo::load_from_yaml_file(std::string file_name){
2899 if ( !utl_is_file_exists (file_name) ){
2900 printf(" ERROR file %s does not exist \n",file_name.c_str());
2905 std::ifstream fin((char *)file_name.c_str());
2906 YAML::Parser parser(fin);
2909 parser.GetNextDocument(doc);
2910 for(unsigned i=0;i<doc.size();i++) {
2914 } catch ( const std::exception& e ) {
2915 std::cout << e.what() << "\n";
2919 /* update from user input */
2920 if (CGlobalInfo::m_options.m_duration > 0.1) {
2921 m_duration_sec = CGlobalInfo::m_options.m_duration;
2924 m_is_plugin_configured=false;
2925 for (i=0; i<(int)m_vec.size(); i++) {
2926 m_vec[i].m_k_cps =m_vec[i].m_k_cps*CGlobalInfo::m_options.m_factor;
2927 if (( ! m_vec[i].m_cap_mode_was_set ) && (m_cap_mode_set ) ){
2928 m_vec[i].m_cap_mode = m_cap_mode;
2930 if (( ! m_vec[i].m_wlength_set ) && (m_wlength_set ) ){
2931 m_vec[i].m_wlength = m_wlength;
2934 if (( ! m_vec[i].m_one_app_server_was_set ) && (m_one_app_server_was_set ) ){
2935 m_vec[i].m_one_app_server = m_one_app_server;
2938 if ( m_cap_overide_ipg_set ){
2939 m_vec[i].m_ipg_sec = m_cap_overide_ipg;
2940 m_vec[i].m_rtt_sec = m_cap_overide_ipg;
2943 if ( m_vec[i].m_plugin_id ){
2944 m_is_plugin_configured=true;
2952 void CFlowStats::Clear(){
2969 void CFlowStats::Add(const CFlowStats & obj){
2971 m_pkt += obj.m_pkt ;
2972 m_bytes += obj.m_bytes ;
2973 m_cps += obj.m_cps ;
2974 m_mb_sec += obj.m_mb_sec ;
2975 m_mB_sec += obj.m_mB_sec ;
2976 m_c_flows += obj.m_c_flows ;
2977 m_pps += obj.m_pps ;
2978 m_total_Mbytes +=obj.m_total_Mbytes ;
2979 m_errors +=obj.m_errors;
2980 m_flows +=obj.m_flows ;
2982 m_memory.Add(obj.m_memory);
2986 void CFlowStats::DumpHeader(FILE *fd){
2987 fprintf(fd," %2s,%-40s,%4s,%4s,%5s,%7s,%9s,%9s,%9s,%10s,%5s,%7s,%4s,%4s \n",
2988 "id","name","tps","cps","f-pkts","f-bytes","duration","Mb/sec","MB/sec","c-flows","PPS","total-Mbytes-duration","errors","flows");
2990 void CFlowStats::Dump(FILE *fd){
2991 //"name","cps","f-pkts","f-bytes","Mb/sec","MB/sec","c-flows","PPS","total-Mbytes-duration","errors","flows"
2992 fprintf(fd," %02d, %-40s ,%4.2f,%4.2f, %5.0f , %7.0f ,%7.2f ,%7.2f , %7.2f , %10.0f , %5.0f , %7.0f , %llu , %llu \n",
3005 (unsigned long long)m_errors,
3006 (unsigned long long)m_flows);
3009 bool CFlowGeneratorRecPerThread::Create(CTupleGeneratorSmart * global_gen,
3010 CFlowYamlInfo * info,
3011 CFlowsYamlInfo * yaml_flow_info,
3012 CCapFileFlowInfo * flow_info,
3014 uint32_t thread_id){
3017 m_thread_id =thread_id ;
3019 tuple_gen.Create(global_gen, info->m_client_pool_idx,
3020 info->m_server_pool_idx);
3021 CTupleGenYamlInfo * lpt;
3022 lpt = &yaml_flow_info->m_tuple_gen;
3024 tuple_gen.SetSingleServer(info->m_one_app_server,
3025 info->m_server_addr,
3026 getDualPortId(thread_id),
3027 lpt->m_client_pool[info->m_client_pool_idx].getDualMask()
3030 tuple_gen.SetW(info->m_w);
3036 m_flows_info = yaml_flow_info;
3037 // set policer give bucket size for bursts
3038 m_policer.set_cir(info->m_k_cps*1000.0);
3039 m_policer.set_level(0.0);
3040 m_policer.set_bucket_size(100.0);
3041 /* pointer to global */
3042 m_flow_info = flow_info;
3047 void CFlowGeneratorRecPerThread::Delete(){
3054 void CFlowGeneratorRecPerThread::Dump(FILE *fd){
3055 fprintf(fd," configuration info ");
3056 fprintf(fd," -----------------");
3058 fprintf(fd," -----------------");
3059 m_flow_info->Dump(fd);
3063 void CFlowGeneratorRecPerThread::getFlowStats(CFlowStats * stats){
3065 double t_pkt=(double)m_flow_info->Size();
3066 double t_bytes=(double)m_flow_info->get_total_bytes();
3067 double cps=m_info->m_k_cps *1000.0;
3068 double mb_sec = (cps*t_bytes*8.0)/(_1Mb_DOUBLE);
3069 double mB_sec = (cps*t_bytes)/(_1Mb_DOUBLE);
3071 double c_flow_windows_sec=0.0;
3073 if (m_info->m_cap_mode) {
3074 c_flow_windows_sec = m_flow_info->get_cap_file_length_sec();
3076 c_flow_windows_sec = t_pkt * m_info->m_ipg_sec;
3080 double c_flows = cps*c_flow_windows_sec*m_flow_info->get_total_flows();
3081 double pps =cps*t_pkt;
3082 double total_Mbytes = mB_sec * m_flows_info->m_duration_sec;
3083 uint64_t errors = m_flow_info->get_total_errors();
3084 uint64_t flows = m_flow_info->get_total_flows();
3088 stats->m_pkt = t_pkt;
3089 stats->m_bytes = t_bytes;
3090 stats->duration_sec = c_flow_windows_sec;
3091 stats->m_name = m_info->m_name.c_str();
3093 stats->m_mb_sec = mb_sec;
3094 stats->m_mB_sec = mB_sec;
3095 stats->m_c_flows = c_flows;
3097 stats->m_total_Mbytes = total_Mbytes;
3098 stats->m_errors = errors;
3099 stats->m_flows = flows;
3104 void CFlowGeneratorRec::Dump(FILE *fd){
3105 fprintf(fd," configuration info ");
3106 fprintf(fd," -----------------");
3108 fprintf(fd," -----------------");
3109 m_flow_info.Dump(fd);
3113 void CFlowGeneratorRec::getFlowStats(CFlowStats * stats){
3115 double t_pkt=(double)m_flow_info.Size();
3116 double t_bytes=(double)m_flow_info.get_total_bytes();
3117 double cps=m_info->m_k_cps *1000.0;
3118 double mb_sec = (cps*t_bytes*8.0)/(_1Mb_DOUBLE);
3119 double mB_sec = (cps*t_bytes)/(_1Mb_DOUBLE);
3121 double c_flow_windows_sec=0.0;
3123 if (m_info->m_cap_mode) {
3124 c_flow_windows_sec = m_flow_info.get_cap_file_length_sec();
3126 c_flow_windows_sec = t_pkt * m_info->m_ipg_sec;
3129 m_flow_info.get_total_memory(stats->m_memory);
3132 double c_flows = cps*c_flow_windows_sec;
3133 double pps =cps*t_pkt;
3134 double total_Mbytes = mB_sec * m_flows_info->m_duration_sec;
3135 uint64_t errors = m_flow_info.get_total_errors();
3136 uint64_t flows = m_flow_info.get_total_flows();
3140 stats->m_pkt = t_pkt;
3141 stats->m_bytes = t_bytes;
3142 stats->duration_sec = c_flow_windows_sec;
3143 stats->m_name = m_info->m_name.c_str();
3145 stats->m_mb_sec = mb_sec;
3146 stats->m_mB_sec = mB_sec;
3147 stats->m_c_flows = c_flows;
3149 stats->m_total_Mbytes = total_Mbytes;
3150 stats->m_errors = errors;
3151 stats->m_flows = flows;
3155 void CFlowGeneratorRec::fixup_ipg_if_needed(void){
3156 if ( m_flows_info->m_cap_mode ) {
3157 m_flow_info.update_pcap_mode();
3160 if ( (m_flows_info->m_cap_mode) &&
3161 (m_flows_info->m_cap_ipg_min_set) &&
3162 (m_flows_info->m_cap_overide_ipg_set)
3164 m_flow_info.update_min_ipg(m_flows_info->m_cap_ipg_min,
3165 m_flows_info->m_cap_overide_ipg);
3170 bool CFlowGeneratorRec::Create(CFlowYamlInfo * info,
3171 CFlowsYamlInfo * flows_info,
3176 m_flows_info=flows_info;
3177 m_flow_info.Create();
3179 // set policer give bucket size for bursts
3180 m_policer.set_cir(info->m_k_cps*1000.0);
3181 m_policer.set_level(0.0);
3182 m_policer.set_bucket_size(100.0);
3184 int res=m_flow_info.load_cap_file(info->m_name.c_str(),_id,m_info->m_plugin_id);
3186 fixup_ipg_if_needed();
3188 if (m_flow_info.is_valid_template_load_time() != 0) {
3191 m_flow_info.update_info();
3198 void CFlowGeneratorRec::Delete(){
3199 m_flow_info.Delete();
3203 void CGenNode::DumpHeader(FILE *fd){
3204 fprintf(fd," pkt_id,time,fid,pkt_info,pkt,len,type,is_init,is_last,type,thread_id,src_ip,dest_ip,src_port \n");
3208 void CGenNode::free_gen_node(){
3209 rte_mbuf_t * m=get_cache_mbuf();
3210 if ( unlikely(m != NULL) ) {
3211 rte_pktmbuf_free(m);
3217 void CGenNode::Dump(FILE *fd){
3218 fprintf(fd,"%.6f,%llx,%p,%llu,%d,%d,%d,%d,%d,%d,%x,%x,%d\n",
3220 (unsigned long long)m_flow_id,
3222 (unsigned long long)m_pkt_info->m_pkt_indication.m_packet->pkt_cnt,
3223 m_pkt_info->m_pkt_indication.m_packet->pkt_len,
3224 m_pkt_info->m_pkt_indication.m_desc.getId(),
3225 (m_pkt_info->m_pkt_indication.m_desc.IsInitSide()?1:0),
3226 m_pkt_info->m_pkt_indication.m_desc.IsLastPkt(),
3235 void CNodeGenerator::set_vif(CVirtualIF * v_if){
3239 bool CNodeGenerator::Create(CFlowGenListPerThread * parent){
3243 m_realtime_his.Create();
3244 m_last_sync_time_sec = 0;
3249 void CNodeGenerator::Delete(){
3250 m_realtime_his.Delete();
3254 void CNodeGenerator::add_node(CGenNode * mynode){
3255 m_p_queue.push(mynode);
3260 void CNodeGenerator::remove_all(CFlowGenListPerThread * thread){
3262 while (!m_p_queue.empty()) {
3263 node = m_p_queue.top();
3266 if (node->m_type == CGenNode::STATELESS_PKT) {
3267 CGenNodeStateless * p=(CGenNodeStateless *)node;
3268 /* need to be changed in Pause support */
3269 assert(p->is_mask_for_free());
3272 thread->free_node( node);
3276 int CNodeGenerator::open_file(std::string file_name,
3277 CPreviewMode * preview_mode){
3279 m_preview_mode =*preview_mode;
3280 /* ser preview mode */
3281 m_v_if->set_review_mode(preview_mode);
3282 m_v_if->open_file(file_name);
3290 int CNodeGenerator::close_file(CFlowGenListPerThread * thread){
3293 m_v_if->close_file();
3297 int CNodeGenerator::update_stl_stats(CGenNodeStateless *node_sl){
3299 if (!node_sl->is_node_active()) {
3303 if ( m_preview_mode.getVMode() >2 ){
3304 fprintf(stdout," %4lu ,", (ulong)m_cnt);
3305 fprintf(stdout," %4lu ,", (ulong)m_non_active);
3306 node_sl->Dump(stdout);
3314 int CNodeGenerator::update_stats(CGenNode * node){
3315 if ( m_preview_mode.getVMode() >2 ){
3316 fprintf(stdout," %llu ,", (unsigned long long)m_cnt);
3323 bool CNodeGenerator::has_limit_reached() {
3324 /* do we have a limit and has it passed ? */
3325 return ( (m_limit > 0) && (m_cnt >= m_limit) );
3328 bool CFlowGenListPerThread::Create(uint32_t thread_id,
3330 CFlowGenList * flow_list,
3331 uint32_t max_threads){
3334 m_non_active_nodes = 0;
3335 m_terminated_by_master=false;
3336 m_flow_list =flow_list;
3340 m_max_threads=max_threads;
3341 m_thread_id=thread_id;
3343 m_cpu_cp_u.Create(&m_cpu_dp_u);
3345 uint32_t socket_id=rte_lcore_to_socket_id(m_core_id);
3348 sprintf(name,"nodes-%d",m_core_id);
3350 //printf(" create thread %d %s socket: %d \n",m_core_id,name,socket_id);
3352 m_node_pool = utl_rte_mempool_create_non_pkt(name,
3353 CGlobalInfo::m_memory_cfg.get_each_core_dp_flows(),
3359 //printf(" pool %p \n",m_node_pool);
3361 m_node_gen.Create(this);
3362 m_flow_id_to_node_lookup.Create();
3364 /* split the clients to threads */
3365 CTupleGenYamlInfo * tuple_gen = &m_flow_list->m_yaml_info.m_tuple_gen;
3367 m_smart_gen.Create(0,m_thread_id);
3369 /* split the clients to threads using the mask */
3371 for (int i=0;i<tuple_gen->m_client_pool.size();i++) {
3372 split_ips(m_thread_id, m_max_threads, getDualPortId(),
3373 tuple_gen->m_client_pool[i],
3376 m_smart_gen.add_client_pool(tuple_gen->m_client_pool[i].m_dist,
3379 get_longest_flow(i,true),
3380 get_total_kcps(i,true)*1000,
3381 m_flow_list->m_client_config_info,
3382 tuple_gen->m_client_pool[i].m_tcp_aging_sec,
3383 tuple_gen->m_client_pool[i].m_udp_aging_sec
3386 for (int i=0;i<tuple_gen->m_server_pool.size();i++) {
3387 split_ips(m_thread_id, m_max_threads, getDualPortId(),
3388 tuple_gen->m_server_pool[i],
3390 m_smart_gen.add_server_pool(tuple_gen->m_server_pool[i].m_dist,
3393 get_longest_flow(i,false),
3394 get_total_kcps(i,false)*1000,
3395 tuple_gen->m_server_pool[i].m_is_bundling);
3399 init_from_global(portion);
3401 CMessagingManager * rx_dp=CMsgIns::Ins()->getRxDp();
3403 m_ring_from_rx = rx_dp->getRingCpToDp(thread_id);
3404 m_ring_to_rx =rx_dp->getRingDpToCp(thread_id);
3406 assert(m_ring_from_rx);
3407 assert(m_ring_to_rx);
3409 /* create the info required for stateless DP core */
3410 m_stateless_dp_info.create(thread_id, this);
3415 /* return the client ip , port */
3416 FORCE_NO_INLINE void CFlowGenListPerThread::handler_defer_job(CGenNode *p){
3417 CGenNodeDeferPort * defer=(CGenNodeDeferPort *)p;
3419 for (i=0; i<defer->m_cnt; i++) {
3420 m_smart_gen.FreePort(defer->m_pool_idx[i],
3421 defer->m_clients[i],defer->m_ports[i]);
3425 FORCE_NO_INLINE void CFlowGenListPerThread::handler_defer_job_flush(void){
3426 /* flush the pending job of free ports */
3428 handler_defer_job((CGenNode *)m_tcp_dpc);
3429 free_node((CGenNode *)m_tcp_dpc);
3433 handler_defer_job((CGenNode *)m_udp_dpc);
3434 free_node((CGenNode *)m_udp_dpc);
3440 void CFlowGenListPerThread::defer_client_port_free(bool is_tcp,
3444 CTupleGeneratorSmart * gen){
3445 /* free is not required in this case */
3446 if (!gen->IsFreePortRequired(c_pool_idx) ){
3449 CGenNodeDeferPort * defer;
3451 if (gen->get_tcp_aging(c_pool_idx)==0) {
3452 gen->FreePort(c_pool_idx,c_idx,port);
3455 defer=get_tcp_defer();
3457 if (gen->get_udp_aging(c_pool_idx)==0) {
3458 gen->FreePort(c_pool_idx, c_idx,port);
3461 defer=get_udp_defer();
3463 if ( defer->add_client(c_pool_idx, c_idx,port) ){
3465 m_node_gen.schedule_node((CGenNode *)defer,gen->get_tcp_aging(c_pool_idx));
3468 m_node_gen.schedule_node((CGenNode *)defer,gen->get_udp_aging(c_pool_idx));
3475 void CFlowGenListPerThread::defer_client_port_free(CGenNode *p){
3476 defer_client_port_free(p->m_pkt_info->m_pkt_indication.m_desc.IsTcp(),
3477 p->m_src_idx,p->m_src_port,p->m_template_info->m_client_pool_idx,
3483 /* copy all info from global and div by num of threads */
3484 void CFlowGenListPerThread::init_from_global(CIpPortion& portion){
3485 /* copy generator , it is the same */
3486 m_yaml_info =m_flow_list->m_yaml_info;
3488 /* copy first the flow info */
3490 for (i=0; i<(int)m_flow_list->m_cap_gen.size(); i++) {
3491 CFlowGeneratorRec * lp=m_flow_list->m_cap_gen[i];
3492 CFlowGeneratorRecPerThread * lp_thread=new CFlowGeneratorRecPerThread();
3493 /* TBD leak of memory */
3494 CFlowYamlInfo * yaml_info =new CFlowYamlInfo();
3496 yaml_info->m_name = lp->m_info->m_name;
3497 yaml_info->m_k_cps = lp->m_info->m_k_cps/(double)m_max_threads;
3498 yaml_info->m_ipg_sec = lp->m_info->m_ipg_sec;
3499 yaml_info->m_rtt_sec = lp->m_info->m_rtt_sec;
3500 yaml_info->m_w = lp->m_info->m_w;
3501 yaml_info->m_cap_mode =lp->m_info->m_cap_mode;
3502 yaml_info->m_wlength =lp->m_info->m_wlength;
3503 yaml_info->m_plugin_id = lp->m_info->m_plugin_id;
3504 yaml_info->m_one_app_server = lp->m_info->m_one_app_server;
3505 yaml_info->m_server_addr = lp->m_info->m_server_addr;
3506 yaml_info->m_dpPkt =lp->m_info->m_dpPkt;
3507 yaml_info->m_server_pool_idx=lp->m_info->m_server_pool_idx;
3508 yaml_info->m_client_pool_idx=lp->m_info->m_client_pool_idx;
3509 yaml_info->m_server_pool_name=lp->m_info->m_server_pool_name;
3510 yaml_info->m_client_pool_name=lp->m_info->m_client_pool_name;
3512 assert(m_max_threads>0);
3513 if ( m_max_threads == 1 ) {
3514 /* we have one thread the limit */
3515 yaml_info->m_limit = lp->m_info->m_limit;
3517 yaml_info->m_limit = lp->m_info->m_limit/m_max_threads;
3518 /* thread is zero base */
3519 if ( m_thread_id == 0){
3520 yaml_info->m_limit += lp->m_info->m_limit % m_max_threads;
3522 if (yaml_info->m_limit==0) {
3523 yaml_info->m_limit=1;
3527 yaml_info->m_limit_was_set = lp->m_info->m_limit_was_set;
3528 yaml_info->m_flowcnt = 0;
3529 yaml_info->m_restart_time = ( yaml_info->m_limit_was_set ) ?
3530 (yaml_info->m_limit / (yaml_info->m_k_cps * 1000.0)) : 0;
3532 lp_thread->Create(&m_smart_gen,
3539 m_cap_gen.push_back(lp_thread);
3543 static void free_map_flow_id_to_node(CGenNode *p){
3544 CGlobalInfo::free_node(p);
3548 void CFlowGenListPerThread::Delete(){
3550 // free all current maps
3551 m_flow_id_to_node_lookup.remove_all(free_map_flow_id_to_node);
3553 m_flow_id_to_node_lookup.Delete();
3555 m_smart_gen.Delete();
3556 m_node_gen.Delete();
3558 m_cpu_cp_u.Delete();
3560 utl_rte_mempool_delete(m_node_pool);
3565 void CFlowGenListPerThread::Clean(){
3567 for (i=0; i<(int)m_cap_gen.size(); i++) {
3568 CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
3569 if (lp->m_tuple_gen_was_set) {
3570 CTupleGeneratorSmart *gen;
3571 gen = lp->tuple_gen.get_gen();
3581 //uint64_t _start_time;
3583 void CNodeGenerator::dump_json(std::string & json){
3585 json="{\"name\":\"tx-gen\",\"type\":0,\"data\":{";
3586 m_realtime_his.dump_json("realtime-hist",json);
3587 json+="\"unknown\":0}}" ;
3590 void CNodeGenerator::add_exit_node(CFlowGenListPerThread * thread,
3593 if ( max_time > 0 ) {
3594 CGenNode *exit_node = thread->create_node();
3595 exit_node->m_type = CGenNode::EXIT_SCHED;
3596 exit_node->m_time = max_time;
3597 add_node(exit_node);
3601 inline bool CNodeGenerator::handle_stl_node(CGenNode * node,
3602 CFlowGenListPerThread * thread){
3603 uint8_t type=node->m_type;
3605 if ( likely( type == CGenNode::STATELESS_PKT ) ) {
3607 CGenNodeStateless *node_sl = (CGenNodeStateless *)node;
3608 /* if the stream has been deactivated - end */
3609 if ( unlikely( node_sl->is_mask_for_free() ) ) {
3610 thread->free_node(node);
3612 /* count before handle - node might be destroyed */
3614 update_stl_stats(node_sl);
3617 node_sl->handle(thread);
3620 if (has_limit_reached()) {
3621 thread->m_stateless_dp_info.stop_traffic(node_sl->get_port_id(), false, 0);
3631 inline bool CNodeGenerator::do_work_stl(CGenNode * node,
3632 CFlowGenListPerThread * thread,
3635 if ( handle_stl_node(node,thread)){
3638 return (handle_slow_messages(node->m_type,node,thread,always));
3642 inline bool CNodeGenerator::do_work_both(CGenNode * node,
3643 CFlowGenListPerThread * thread,
3648 bool exit_scheduler=false;
3649 uint8_t type=node->m_type;
3652 if ( handle_stl_node (node,thread) ){
3654 if ( likely( type == CGenNode::FLOW_PKT ) ) {
3656 if ( !(node->is_repeat_flow()) || (always==false)) {
3657 flush_one_node_to_file(node);
3663 if ( node->is_last_in_flow() ) {
3664 if ((node->is_repeat_flow()) && (always==false)) {
3665 /* Flow is repeated, reschedule it */
3666 thread->reschedule_flow( node);
3668 /* Flow will not be repeated, so free node */
3669 thread->free_last_flow_node( node);
3672 node->update_next_pkt_in_flow();
3673 m_p_queue.push(node);
3676 if ((type == CGenNode::FLOW_FIF)) {
3677 /* callback to our method */
3679 if ( always == false) {
3680 thread->m_cur_time_sec = node->m_time ;
3682 thread->generate_flows_roundrobin(&done);
3685 node->m_time +=d_time;
3686 m_p_queue.push(node);
3688 thread->free_node(node);
3691 thread->free_node(node);
3695 exit_scheduler = handle_slow_messages(type,node,thread,always);
3700 return (exit_scheduler);
3705 template<int SCH_MODE>
3706 inline bool CNodeGenerator::do_work(CGenNode * node,
3707 CFlowGenListPerThread * thread,
3711 /* template filter in compile time */
3712 if ( SCH_MODE == smSTATELESS ) {
3713 return ( do_work_stl(node,thread,always) );
3716 return ( do_work_both(node,thread,d_time,always) );
3721 inline void CNodeGenerator::do_sleep(dsec_t & cur_time,
3722 CFlowGenListPerThread * thread,
3724 thread->m_cpu_dp_u.commit1();
3727 /* TBD make this better using calculation, minimum now_sec() */
3729 cur_time = now_sec();
3730 dt = cur_time - n_time ;
3732 if (dt> WAIT_WINDOW_SIZE ) {
3739 thread->m_cpu_dp_u.start_work1();
3743 inline int CNodeGenerator::teardown(CFlowGenListPerThread * thread,
3748 thread->m_cpu_dp_u.commit1();
3751 if ( thread->is_terminated_by_master() ) {
3758 // free the left other
3759 thread->handler_defer_job_flush();
3766 template<int SCH_MODE>
3767 inline int CNodeGenerator::flush_file_realtime(dsec_t max_time,
3770 CFlowGenListPerThread * thread,
3771 double &old_offset) {
3779 add_exit_node(thread,max_time);
3782 thread->m_cpu_dp_u.start_work1();
3784 sch_state_t state = scINIT;
3785 node = m_p_queue.top();
3786 n_time = node->m_time + offset;
3787 cur_time = now_sec();
3789 while (state!=scTERMINATE) {
3793 cur_time = now_sec();
3795 dsec_t dt = cur_time - n_time ;
3797 if (dt > BURST_OFFSET_DTIME) {
3799 } else if (dt > 0) {
3813 bool s=do_work<SCH_MODE>(node,thread,d_time,always);
3814 if (s) { // can we remove this IF ?
3818 node = m_p_queue.top();
3819 n_time = node->m_time + offset;
3822 /* we either out of the time frame or every 1024 nodes we get out for time checking */
3823 if ( ( (n_time - cur_time) > EAT_WINDOW_DTIME ) || (node_count > 1024) ) {
3833 do_sleep(cur_time,thread,n_time); // estimate loop
3839 handle_slow_operations(state, node, cur_time, n_time, offset, thread);
3845 return (teardown(thread,always,old_offset,offset));
3849 FORCE_NO_INLINE void CNodeGenerator::handle_slow_operations(sch_state_t &state,
3854 CFlowGenListPerThread *thread) {
3858 handle_time_strech(node, cur_time, n_time, offset, thread);
3860 /* go back to work */
3873 * when time is streched - the flow_sync node
3874 * might be postpond too much
3875 * this can result a watchdog crash and lack
3876 * of responsivness from the DP core
3877 * (no handling of messages)
3879 * @author imarom (7/31/2016)
3882 void CNodeGenerator::handle_time_strech(CGenNode * &node,
3886 CFlowGenListPerThread *thread) {
3889 /* fix the time offset */
3890 dsec_t dt = cur_time - n_time;
3893 /* check if flow sync message was delayed too much */
3894 if ( (cur_time - m_last_sync_time_sec) > SYNC_TIME_OUT ) {
3895 handle_maintenance(thread);
3897 /* re-read the top of the queue - it might have changed with messaging */
3898 node = m_p_queue.top();
3899 n_time = node->m_time + offset;
3904 int CNodeGenerator::flush_file_sim(dsec_t max_time,
3907 CFlowGenListPerThread * thread,
3908 double &old_offset){
3912 add_exit_node(thread,max_time);
3916 node = m_p_queue.top();
3919 if ( get_is_stateless() ) {
3920 do_exit=do_work<smSTATELESS>(node,thread,d_time,always);
3922 do_exit=do_work<smSTATEFUL>(node,thread,d_time,always);
3928 return (teardown(thread,always,old_offset,0));
3931 int CNodeGenerator::flush_file(dsec_t max_time,
3934 CFlowGenListPerThread * thread,
3935 double &old_offset){
3937 return ( flush_file_sim(max_time, d_time,always,thread,old_offset) );
3939 if ( get_is_stateless() ) {
3940 return ( flush_file_realtime<smSTATELESS>(max_time, d_time,always,thread,old_offset) );
3942 return ( flush_file_realtime<smSTATEFUL>(max_time, d_time,always,thread,old_offset) );
3950 void CNodeGenerator::handle_flow_pkt(CGenNode *node, CFlowGenListPerThread *thread) {
3952 /*repeat and NAT is not supported together */
3953 if ( node->is_nat_first_state() ) {
3954 node->set_nat_wait_state();
3955 flush_one_node_to_file(node);
3960 if ( node->is_nat_wait_state() ) {
3961 if (node->is_responder_pkt()) {
3963 /* time out, need to free the flow and remove the association , we didn't get conversion yet*/
3964 thread->terminate_nat_flows(node);
3968 flush_one_node_to_file(node);
3974 if ( node->is_nat_wait_ack_state() ) {
3975 if (node->is_initiator_pkt()) {
3977 /* time out, need to free the flow and remove the association , we didn't get conversion yet*/
3978 thread->terminate_nat_flows(node);
3982 flush_one_node_to_file(node);
3993 if ( node->is_last_in_flow() ) {
3994 thread->free_last_flow_node( node);
3996 node->update_next_pkt_in_flow();
3997 m_p_queue.push(node);
4001 void CNodeGenerator::handle_flow_sync(CGenNode *node, CFlowGenListPerThread *thread, bool &exit_scheduler) {
4004 /* flow sync message is a sync point for time */
4005 thread->m_cur_time_sec = node->m_time;
4007 /* first pop the node */
4010 /* call all the maintenance required */
4011 handle_maintenance(thread);
4013 /* exit in case this is the last node*/
4014 if ( m_p_queue.size() == m_parent->m_non_active_nodes ) {
4015 thread->free_node(node);
4016 exit_scheduler = true;
4018 /* schedule for next maintenace */
4019 node->m_time += SYNC_TIME_OUT;
4020 m_p_queue.push(node);
4026 CNodeGenerator::handle_maintenance(CFlowGenListPerThread *thread) {
4028 thread->tickle(); /* tickle the watchdog */
4029 thread->check_msgs(); /* check messages */
4030 m_v_if->flush_tx_queue(); /* flush pkt each timeout */
4032 /* save last sync time as realtime */
4033 m_last_sync_time_sec = now_sec();
4037 void CNodeGenerator::handle_command(CGenNode *node, CFlowGenListPerThread *thread, bool &exit_scheduler) {
4039 CGenNodeCommand *node_cmd = (CGenNodeCommand *)node;
4040 TrexStatelessCpToDpMsgBase * cmd=node_cmd->m_cmd;
4041 cmd->handle(&thread->m_stateless_dp_info);
4042 exit_scheduler = cmd->is_quit();
4043 thread->free_node((CGenNode *)node_cmd);/* free the node */
4046 void CNodeGenerator::handle_pcap_pkt(CGenNode *node, CFlowGenListPerThread *thread) {
4049 CGenNodePCAP *node_pcap = (CGenNodePCAP *)node;
4051 /* might have been marked for free */
4052 if ( unlikely( node_pcap->is_marked_for_free() ) ) {
4053 thread->free_node(node);
4055 node_pcap->handle(thread);
4060 CNodeGenerator::handle_slow_messages(uint8_t type,
4062 CFlowGenListPerThread * thread,
4065 /* should we continue after */
4066 bool exit_scheduler = false;
4069 case CGenNode::PCAP_PKT:
4070 handle_pcap_pkt(node, thread);
4073 case CGenNode::FLOW_DEFER_PORT_RELEASE:
4075 thread->handler_defer_job(node);
4076 thread->free_node(node);
4079 case CGenNode::FLOW_PKT_NAT:
4080 handle_flow_pkt(node, thread);
4083 case CGenNode::FLOW_SYNC:
4084 handle_flow_sync(node, thread, exit_scheduler);
4087 case CGenNode::EXIT_SCHED:
4089 thread->free_node(node);
4090 exit_scheduler = true;
4094 case CGenNode::COMMAND:
4095 handle_command(node, thread, exit_scheduler);
4102 return (exit_scheduler);
4106 void CFlowGenListPerThread::Dump(FILE *fd){
4107 fprintf(fd,"yaml info ");
4108 m_yaml_info.Dump(fd);
4111 fprintf(fd,"cap file info");
4113 for (i=0; i<(int)m_cap_gen.size(); i++) {
4114 CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
4122 void CFlowGenListPerThread::DumpStats(FILE *fd){
4127 void CFlowGenListPerThread::DumpCsv(FILE *fd){
4128 CFlowStats::DumpHeader(fd);
4134 for (i=0; i<(int)m_cap_gen.size(); i++) {
4135 CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
4136 lp->getFlowStats(&stats);
4146 uint32_t CFlowGenListPerThread::getDualPortId(){
4147 return ( ::getDualPortId(m_thread_id) );
4150 double CFlowGenListPerThread::get_longest_flow(uint8_t pool_idx, bool is_client){
4152 double longest_flow = 0.0;
4153 for (i=0;i<(int)m_cap_gen.size(); i++) {
4154 CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
4156 lp->m_info->m_client_pool_idx != pool_idx)
4159 lp->m_info->m_server_pool_idx != pool_idx)
4162 tmp_len = lp->m_flow_info->get_cap_file_length_sec();
4163 if (longest_flow < tmp_len ) {
4164 longest_flow = tmp_len;
4167 return longest_flow;
4171 double CFlowGenListPerThread::get_longest_flow(){
4173 double longest_flow = 0.0;
4174 for (i=0;i<(int)m_cap_gen.size(); i++) {
4175 CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
4177 tmp_len = lp->m_flow_info->get_cap_file_length_sec();
4178 if (longest_flow < tmp_len ) {
4179 longest_flow = tmp_len;
4182 return longest_flow;
4185 double CFlowGenListPerThread::get_total_kcps(uint8_t pool_idx, bool is_client){
4188 for (i=0; i<(int)m_cap_gen.size(); i++) {
4189 CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
4191 lp->m_info->m_client_pool_idx != pool_idx)
4194 lp->m_info->m_server_pool_idx != pool_idx)
4196 total +=lp->m_info->m_k_cps;
4201 double CFlowGenListPerThread::get_total_kcps(){
4204 for (i=0; i<(int)m_cap_gen.size(); i++) {
4205 CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
4206 total +=lp->m_info->m_k_cps;
4211 double CFlowGenListPerThread::get_delta_flow_is_sec(){
4212 return (1.0/(1000.0*get_total_kcps()));
4216 void CFlowGenListPerThread::inc_current_template(void){
4218 if (m_cur_template == m_cap_gen.size()) {
4224 int CFlowGenListPerThread::generate_flows_roundrobin(bool *done){
4227 CFlowGeneratorRecPerThread * cur;
4232 for (i=0;i<(int)m_cap_gen.size();i++ ) {
4233 cur=m_cap_gen[m_cur_template];
4234 if (!(cur->m_info->m_limit_was_set) ||
4235 (cur->m_info->m_flowcnt < cur->m_info->m_limit)) {
4237 if ( cur->m_policer.update(1.0,m_cur_time_sec) ){
4238 cur->m_info->m_flowcnt++;
4243 inc_current_template();
4247 /* generate the flow into the generator*/
4248 CGenNode * node= create_node() ;
4250 cur->generate_flow(&m_node_gen,m_cur_time_sec,m_cur_flow_id,node);
4253 /* this is estimation */
4254 m_stats.m_total_open_flows += cur->m_flow_info->get_total_flows();
4255 m_stats.m_total_bytes += cur->m_flow_info->get_total_bytes();
4256 m_stats.m_total_pkt += cur->m_flow_info->Size();
4257 inc_current_template();
4263 int CFlowGenListPerThread::reschedule_flow(CGenNode *node){
4265 // Re-schedule the node
4266 node->reset_pkt_in_flow();
4267 node->m_time += node->m_template_info->m_restart_time;
4268 m_node_gen.add_node(node);
4270 m_stats.m_total_bytes += node->m_flow_info->get_total_bytes();
4271 m_stats.m_total_pkt += node->m_flow_info->Size();
4276 void CFlowGenListPerThread::terminate_nat_flows(CGenNode *p){
4277 m_stats.m_nat_flow_timeout++;
4278 m_stats.m_nat_lookup_remove_flow_id++;
4279 if (p->is_nat_wait_ack_state()) {
4280 m_stats.m_nat_flow_timeout_wait_ack++;
4282 m_stats.m_nat_lookup_wait_ack_state++;
4284 m_flow_id_to_node_lookup.remove_no_lookup(p->get_short_fid());
4285 free_last_flow_node( p);
4289 void CFlowGenListPerThread::handle_latency_pkt_msg(CGenNodeLatencyPktInfo * msg){
4290 /* send the packet */
4291 #ifdef RX_QUEUE_TRACE_
4292 printf(" latency msg dir %d\n",msg->m_dir);
4293 struct rte_mbuf * m;
4295 rte_pktmbuf_dump(stdout,m, rte_pktmbuf_pkt_len(m));
4298 /* update timestamp */
4299 struct rte_mbuf * m;
4301 uint8_t *p=rte_pktmbuf_mtod(m, uint8_t*);
4302 latency_header * h=(latency_header *)(p+msg->m_latency_offset);
4303 h->time_stamp = os_get_hr_tick_64();
4305 m_node_gen.m_v_if->send_one_pkt((pkt_dir_t)msg->m_dir,msg->m_pkt);
4308 void CFlowGenListPerThread::handle_nat_msg(CGenNodeNatInfo * msg){
4310 bool first = true, second = true;
4312 for (i=0; i<msg->m_cnt; i++) {
4315 CNatFlowInfo * nat_msg=&msg->m_data[i];
4316 CGenNode * node=m_flow_id_to_node_lookup.lookup(nat_msg->m_fid);
4318 /* this should be moved to a notification module */
4320 printf(" ERORR not valid flow_id %d probably flow was aged \n",nat_msg->m_fid);
4322 m_stats.m_nat_lookup_no_flow_id++;
4326 // Calculate diff between tcp seq of SYN packet, and TCP ack of SYN+ACK packet
4327 // For supporting firewalls who do TCP seq num randomization
4328 if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_TCP)) {
4329 if (node->is_nat_wait_state()) {
4330 char *syn_pkt = node->m_flow_info->GetPacket(0)->m_packet->raw;
4331 TCPHeader *tcp = (TCPHeader *)(syn_pkt + node->m_pkt_info->m_pkt_indication.getFastTcpOffset());
4332 node->set_nat_tcp_seq_diff_client(nat_msg->m_tcp_seq - tcp->getSeqNumber());
4333 if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_TCP_ACK)) {
4334 node->set_nat_wait_ack_state();
4335 m_stats.m_nat_lookup_wait_ack_state++;
4338 node->set_nat_learn_state();
4341 char *syn_ack_pkt = node->m_flow_info->GetPacket(1)->m_packet->raw;
4342 TCPHeader *tcp = (TCPHeader *)(syn_ack_pkt + node->m_pkt_info->m_pkt_indication.getFastTcpOffset());
4343 node->set_nat_tcp_seq_diff_server(nat_msg->m_tcp_seq - tcp->getSeqNumber());
4344 assert(node->is_nat_wait_ack_state());
4345 node->set_nat_learn_state();
4349 assert(node->is_nat_wait_state());
4350 node->set_nat_learn_state();
4355 printf(" %.03f RX :set node %p:%x %x:%x TCP diff %x\n"
4356 , now_sec(), node,nat_msg->m_fid, nat_msg->m_external_ip, nat_msg->m_external_port
4357 , node->get_nat_tcp_seq_diff_client());
4360 node->set_nat_ipv4_addr(nat_msg->m_external_ip);
4361 node->set_nat_ipv4_port(nat_msg->m_external_port);
4363 if ( CGlobalInfo::is_learn_verify_mode() ){
4364 if (!node->is_external_is_eq_to_internal_ip() ||
4365 node->get_nat_tcp_seq_diff_client() != 0) {
4366 m_stats.m_nat_flow_learn_error++;
4372 /* remove from the hash */
4373 m_flow_id_to_node_lookup.remove_no_lookup(nat_msg->m_fid);
4374 m_stats.m_nat_lookup_remove_flow_id++;
4379 void CFlowGenListPerThread::check_msgs(void) {
4381 /* inlined for performance */
4382 m_stateless_dp_info.periodic_check_for_cp_messages();
4384 if ( likely ( m_ring_from_rx->isEmpty() ) ) {
4389 printf(" %.03f got message from RX \n",now_sec());
4393 if ( m_ring_from_rx->Dequeue(node)!=0 ){
4397 //printf ( " message: thread %d, node->m_flow_id : %d \n", m_thread_id,node->m_flow_id);
4398 /* only one message is supported right now */
4400 CGenNodeMsgBase * msg=(CGenNodeMsgBase *)node;
4402 uint8_t msg_type = msg->m_msg_type;
4403 switch (msg_type ) {
4404 case CGenNodeMsgBase::NAT_FIRST:
4405 handle_nat_msg((CGenNodeNatInfo * )msg);
4408 case CGenNodeMsgBase::LATENCY_PKT:
4409 handle_latency_pkt_msg((CGenNodeLatencyPktInfo *) msg);
4413 printf("ERROR pkt-thread message type is not valid %d \n",msg_type);
4417 CGlobalInfo::free_node(node);
4423 void CFlowGenListPerThread::start_stateless_simulation_file(std::string erf_file_name,
4424 CPreviewMode &preview,
4426 m_preview_mode = preview;
4427 m_node_gen.open_file(erf_file_name,&m_preview_mode);
4428 m_node_gen.set_packet_limit(limit);
4431 void CFlowGenListPerThread::stop_stateless_simulation_file(){
4432 m_node_gen.m_v_if->close_file();
4435 void CFlowGenListPerThread::start_stateless_daemon_simulation(){
4436 CGlobalInfo::m_options.m_run_mode = CParserOption::RUN_MODE_INTERACTIVE;
4439 /* if no pending CP messages - the core will simply be stuck forever */
4440 if (m_stateless_dp_info.are_any_pending_cp_messages()) {
4441 m_stateless_dp_info.run_once();
4446 /* return true if we need to shedule next_stream, */
4448 bool CFlowGenListPerThread::set_stateless_next_node( CGenNodeStateless * cur_node,
4449 CGenNodeStateless * next_node){
4450 return ( m_stateless_dp_info.set_stateless_next_node(cur_node,next_node) );
4454 void CFlowGenListPerThread::start_stateless_daemon(CPreviewMode &preview){
4455 CGlobalInfo::m_options.m_run_mode = CParserOption::RUN_MODE_INTERACTIVE;
4457 /* set per thread global info, for performance */
4458 m_preview_mode = preview;
4459 m_node_gen.open_file("",&m_preview_mode);
4461 m_stateless_dp_info.start();
4465 void CFlowGenListPerThread::start_generate_stateful(std::string erf_file_name,
4466 CPreviewMode & preview){
4467 /* now we are ready to generate*/
4468 if ( m_cap_gen.size()==0 ){
4469 fprintf(stderr," nothing to generate no template loaded \n");
4473 m_preview_mode = preview;
4474 m_node_gen.open_file(erf_file_name,&m_preview_mode);
4475 dsec_t d_time_flow=get_delta_flow_is_sec();
4477 m_cur_time_sec = 0.01 + m_thread_id*m_flow_list->get_delta_flow_is_sec();
4480 if ( CGlobalInfo::is_realtime() ){
4481 if (m_cur_time_sec > 0.2 ) {
4482 m_cur_time_sec = 0.01 + m_thread_id*0.01;
4484 m_cur_time_sec += now_sec() + 0.1 ;
4486 dsec_t c_stop_sec = m_cur_time_sec + m_yaml_info.m_duration_sec;
4487 m_stop_time_sec =c_stop_sec;
4489 m_cur_template =(m_thread_id % m_cap_gen.size());
4492 fprintf(stdout," Generating erf file ... \n");
4493 CGenNode * node= create_node() ;
4495 node->m_type = CGenNode::FLOW_FIF;
4496 node->m_time = m_cur_time_sec;
4497 m_node_gen.add_node(node);
4499 double old_offset=0.0;
4501 node= create_node() ;
4502 node->m_type = CGenNode::FLOW_SYNC;
4503 node->m_time = m_cur_time_sec + SYNC_TIME_OUT ;
4505 m_node_gen.add_node(node);
4508 if ( m_preview_mode.getVMode() >2 ){
4510 CGenNode::DumpHeader(stdout);
4514 m_node_gen.flush_file(c_stop_sec,d_time_flow, false,this,old_offset);
4518 CALLGRIND_STOP_INSTRUMENTATION;
4519 printf (" %llu \n",os_get_hr_tick_64()-_start_time);
4521 if ( !CGlobalInfo::m_options.preview.getNoCleanFlowClose() && (is_terminated_by_master()==false) ){
4523 m_node_gen.flush_file(m_cur_time_sec, d_time_flow, true,this,old_offset);
4526 if (m_preview_mode.getVMode() > 1 ) {
4527 fprintf(stdout,"\n\n");
4528 fprintf(stdout,"\n\n");
4529 fprintf(stdout,"file stats \n");
4530 fprintf(stdout,"=================\n");
4531 m_stats.dump(stdout);
4533 m_node_gen.close_file(this);
4536 void CFlowGenList::Delete(){
4537 clean_p_thread_info();
4539 if (CPluginCallback::callback) {
4540 delete CPluginCallback::callback;
4541 CPluginCallback::callback = NULL;
4546 bool CFlowGenList::Create(){
4547 check_objects_sizes();
4548 CPluginCallback::callback= new CPluginCallbackSimple();
4553 void CFlowGenList::generate_p_thread_info(uint32_t num_threads){
4554 clean_p_thread_info();
4555 BP_ASSERT(num_threads < 64);
4557 for (i=0; i<(int)num_threads; i++) {
4558 CFlowGenListPerThread * lp= new CFlowGenListPerThread();
4559 lp->Create(i,i,this,num_threads);
4560 m_threads_info.push_back(lp);
4565 void CFlowGenList::clean_p_thread_info(void){
4567 for (i=0; i<(int)m_threads_info.size(); i++) {
4568 CFlowGenListPerThread * lp=m_threads_info[i];
4572 m_threads_info.clear();
4575 int CFlowGenList::load_client_config_file(std::string file_name) {
4576 m_client_config_info.load_yaml_file(file_name);
4580 void CFlowGenList::set_client_config_tuple_gen_info(CTupleGenYamlInfo * tg) {
4581 m_client_config_info.set_tuple_gen_info(tg);
4584 std::vector<ClientCfgCompactEntry *> CFlowGenList::get_client_cfg_ip_list() {
4585 return m_client_config_info.get_entry_list();
4588 void CFlowGenList::set_client_config_resolved_macs(CManyIPInfo &pretest_result) {
4589 m_client_config_info.set_resolved_macs(pretest_result);
4592 void CFlowGenList::dump_client_config(FILE *fd) {
4593 m_client_config_info.dump(fd);
4596 int CFlowGenList::load_from_yaml(std::string file_name,
4597 uint32_t num_threads){
4599 m_yaml_info.load_from_yaml_file(file_name);
4600 if (m_yaml_info.verify_correctness(num_threads) ==false){
4604 /* move it to global info, better CPU D-cache usage */
4605 CGlobalInfo::m_options.preview.set_vlan_mode_enable(m_yaml_info.m_vlan_info.m_enable);
4606 CGlobalInfo::m_options.m_vlan_port[0] = m_yaml_info.m_vlan_info.m_vlan_per_port[0];
4607 CGlobalInfo::m_options.m_vlan_port[1] = m_yaml_info.m_vlan_info.m_vlan_per_port[1];
4608 CGlobalInfo::m_options.preview.set_mac_ip_overide_enable(m_yaml_info.m_mac_replace_by_ip);
4610 if ( m_yaml_info.m_mac_base.size() != 6 ){
4611 printf(" mac addr is not valid \n");
4615 if (m_yaml_info.m_ipv6_set == true) {
4616 // Copy the most significant 96-bits from yaml data
4617 for (idx=0; idx<6; idx++){
4618 CGlobalInfo::m_options.m_src_ipv6[idx] = m_yaml_info.m_src_ipv6[idx];
4619 CGlobalInfo::m_options.m_dst_ipv6[idx] = m_yaml_info.m_dst_ipv6[idx];
4622 // Set the most signifcant 96-bits to zero which represents an
4623 // IPv4-compatible IPv6 address
4624 for (idx=0; idx<6; idx++){
4625 CGlobalInfo::m_options.m_src_ipv6[idx] = 0;
4626 CGlobalInfo::m_options.m_dst_ipv6[idx] = 0;
4632 bool all_template_has_one_direction=true;
4633 for (i=0; i<(int)m_yaml_info.m_vec.size(); i++) {
4634 CFlowGeneratorRec * lp=new CFlowGeneratorRec();
4635 if ( lp->Create(&m_yaml_info.m_vec[i],&m_yaml_info,i) == false){
4636 fprintf(stdout,"\n ERROR reading YAML template files, please verify that they are valid \n\n");
4640 m_cap_gen.push_back(lp);
4642 if (lp->m_flow_info.GetPacket(0)->m_pkt_indication.m_desc.IsBiDirectionalFlow() ) {
4643 all_template_has_one_direction=false;
4647 if ( CGlobalInfo::is_learn_mode() && all_template_has_one_direction ) {
4648 fprintf(stdout,"\n Warning --learn mode has nothing to do when all templates are one directional, please remove it \n");
4655 void CFlowGenList::Clean(){
4657 for (i=0; i<(int)m_cap_gen.size(); i++) {
4658 CFlowGeneratorRec * lp=m_cap_gen[i];
4665 double CFlowGenList::GetCpuUtil(){
4668 for (i=0; i<(int)m_threads_info.size(); i++) {
4669 CFlowGenListPerThread * lp=m_threads_info[i];
4670 c+=lp->m_cpu_cp_u.GetVal();
4672 return (c/m_threads_info.size());
4675 double CFlowGenList::GetCpuUtilRaw(){
4678 for (i=0; i<(int)m_threads_info.size(); i++) {
4679 CFlowGenListPerThread * lp=m_threads_info[i];
4680 c+=lp->m_cpu_cp_u.GetValRaw();
4682 return (c/m_threads_info.size());
4686 void CFlowGenList::UpdateFast(){
4688 for (int i=0; i<(int)m_threads_info.size(); i++) {
4689 CFlowGenListPerThread * lp=m_threads_info[i];
4696 void CFlowGenList::Dump(FILE *fd){
4697 fprintf(fd,"yaml info \n");
4698 fprintf(fd,"--------------\n");
4699 m_yaml_info.Dump(fd);
4702 fprintf(fd,"cap file info \n");
4703 fprintf(fd,"----------------------\n");
4705 for (i=0; i<(int)m_cap_gen.size(); i++) {
4706 CFlowGeneratorRec * lp=m_cap_gen[i];
4712 void CFlowGenList::DumpPktSize(){
4715 for (i=0; i<(int)m_cap_gen.size(); i++) {
4716 CFlowGeneratorRec * lp=m_cap_gen[i];
4717 lp->m_flow_info.dump_pkt_sizes();
4722 void CFlowGenList::DumpCsv(FILE *fd){
4723 CFlowStats::DumpHeader(fd);
4729 for (i=0; i<(int)m_cap_gen.size(); i++) {
4730 CFlowGeneratorRec * lp=m_cap_gen[i];
4731 lp->getFlowStats(&stats);
4738 sum.m_memory.dump(fd);
4742 uint32_t CFlowGenList::get_total_repeat_flows(){
4745 for (i=0; i<(int)m_cap_gen.size(); i++) {
4746 CFlowGeneratorRec * lp=m_cap_gen[i];
4747 flows+=lp->m_info->m_limit ;
4753 double CFlowGenList::get_total_tx_bps(){
4758 for (i=0; i<(int)m_cap_gen.size(); i++) {
4759 CFlowGeneratorRec * lp=m_cap_gen[i];
4760 lp->getFlowStats(&stats);
4761 total+=(stats.m_mb_sec);
4763 return (_1Mb_DOUBLE*total);
4766 double CFlowGenList::get_total_pps(){
4772 for (i=0; i<(int)m_cap_gen.size(); i++) {
4773 CFlowGeneratorRec * lp=m_cap_gen[i];
4774 lp->getFlowStats(&stats);
4781 double CFlowGenList::get_total_kcps(){
4787 for (i=0; i<(int)m_cap_gen.size(); i++) {
4788 CFlowGeneratorRec * lp=m_cap_gen[i];
4789 lp->getFlowStats(&stats);
4790 total+= stats.get_normal_cps();
4792 return ((total/1000.0));
4795 double CFlowGenList::get_delta_flow_is_sec(){
4796 return (1.0/(1000.0*get_total_kcps()));
4801 bool CPolicer::update(double dsize,double now_sec){
4802 if ( m_last_time ==0.0 ) {
4804 m_last_time = now_sec;
4811 // check if there is a need to add tokens
4812 if(now_sec > m_last_time) {
4813 dsec_t dtime=(now_sec - m_last_time);
4814 dsec_t dsize =dtime*m_cir;
4816 if (m_level > m_bucket_size) {
4817 m_level = m_bucket_size;
4819 m_last_time = now_sec;
4822 if (m_level > dsize) {
4831 float CPPSMeasure::add(uint64_t pkts){
4832 if ( false == m_start ){
4834 m_last_time_msec = os_get_time_msec() ;
4839 uint32_t ctime=os_get_time_msec();
4840 if ((ctime - m_last_time_msec) <os_get_time_freq() ) {
4841 return (m_last_result);
4844 uint32_t dtime_msec = ctime-m_last_time_msec;
4845 uint32_t dpkts = (pkts - m_last_pkts);
4847 m_last_time_msec = ctime;
4850 m_last_result= 0.5*calc_pps(dtime_msec,dpkts) +0.5*(m_last_result);
4851 return ( m_last_result );
4856 CBwMeasure::CBwMeasure() {
4860 void CBwMeasure::reset(void) {
4867 double CBwMeasure::calc_MBsec(uint32_t dtime_msec,
4869 double rate=0.000008*( ( (double)dbytes*(double)os_get_time_freq())/((double)dtime_msec) );
4873 double CBwMeasure::add(uint64_t size) {
4874 if ( false == m_start ){
4876 m_last_time_msec = os_get_time_msec() ;
4881 uint32_t ctime=os_get_time_msec();
4882 if ((ctime - m_last_time_msec) <os_get_time_freq() ) {
4883 return (m_last_result);
4886 uint32_t dtime_msec = ctime-m_last_time_msec;
4887 uint64_t dbytes = size - m_last_bytes;
4889 m_last_time_msec = ctime;
4890 m_last_bytes = size;
4892 m_last_result= 0.5*calc_MBsec(dtime_msec,dbytes) +0.5*(m_last_result);
4893 return ( m_last_result );
4899 * Test if option value is within allowed range.
4900 * val - Value to test
4901 * min, max - minimum, maximum allowed values.
4902 * opt_name - option name for error report.
4904 bool CParserOption::is_valid_opt_val(int val, int min, int max, const std::string &opt_name) {
4905 if (val < min || val > max) {
4906 std::cerr << "Value " << val << " for option " << opt_name << " is out of range. Should be (" << min << "-" << max << ")." << std::endl;
4913 void CParserOption::dump(FILE *fd){
4915 fprintf(fd," cfg file : %s \n",cfg_file.c_str());
4916 fprintf(fd," mac file : %s \n",client_cfg_file.c_str());
4917 fprintf(fd," out file : %s \n",out_file.c_str());
4918 fprintf(fd," client cfg file : %s \n",out_file.c_str());
4919 fprintf(fd," duration : %.0f \n",m_duration);
4920 fprintf(fd," factor : %.0f \n",m_factor);
4921 fprintf(fd," mbuf_factor : %.0f \n",m_mbuf_factor);
4922 fprintf(fd," latency : %d pkt/sec \n",m_latency_rate);
4923 fprintf(fd," zmq_port : %d \n",m_zmq_port);
4924 fprintf(fd," telnet_port : %d \n",m_telnet_port);
4925 fprintf(fd," expected_ports : %d \n",m_expected_portd);
4926 if (preview.get_vlan_mode_enable() ) {
4927 fprintf(fd," vlans : [%d,%d] \n",m_vlan_port[0],m_vlan_port[1]);
4931 for (i = 0; i < TREX_MAX_PORTS; i++) {
4932 fprintf(fd," port : %d dst:",i);
4933 CMacAddrCfg * lp=&m_mac_addr[i];
4934 dump_mac_addr(fd,lp->u.m_mac.dest);
4935 fprintf(fd," src:");
4936 dump_mac_addr(fd,lp->u.m_mac.src);
4941 void CParserOption::verify() {
4942 /* check for mutual exclusion options */
4943 if (preview.get_is_client_cfg_enable()) {
4944 if (preview.get_vlan_mode_enable() || preview.get_mac_ip_overide_enable()) {
4945 throw std::runtime_error("VLAN / MAC override cannot be combined with client configuration");
4952 void CTupleGlobalGenerator::Dump(FILE *fd){
4953 fprintf(fd," src:%x dest: %x \n",m_result_src_ip,m_result_dest_ip);
4956 bool CTupleGlobalGenerator::Create(){
4957 was_generated=false;
4962 void CTupleGlobalGenerator::Copy(CTupleGlobalGenerator * gen){
4963 was_generated=false;
4964 m_min_src_ip = gen->m_min_src_ip;
4965 m_max_src_ip = gen->m_max_src_ip;
4966 m_min_dest_ip = gen->m_min_dest_ip;
4967 m_max_dest_ip = gen->m_max_dest_ip;
4971 void CTupleGlobalGenerator::Delete(){
4972 was_generated=false;
4977 static uint32_t get_rand_32(uint32_t MinimumRange ,
4978 uint32_t MaximumRange );
4982 void CTupleGlobalGenerator::Generate(uint32_t thread_id,
4983 uint32_t num_addr ){
4984 if ( was_generated == false) {
4986 was_generated = true;
4987 cur_src_ip = m_min_src_ip;
4988 cur_dst_ip = m_min_dest_ip;
4991 if ( ( cur_src_ip + num_addr ) > m_max_src_ip ) {
4992 cur_src_ip = m_min_src_ip;
4995 /* copy the results */
4996 m_result_src_ip = cur_src_ip;
4997 m_result_dest_ip = cur_dst_ip;
4998 cur_src_ip += num_addr;
5000 if (cur_dst_ip > m_max_dest_ip ) {
5001 cur_dst_ip = m_min_dest_ip;
5008 void CTupleTemplateGenerator::Dump(FILE *fd){
5009 fprintf(fd," id: %x, %x:%x - %x \n",m_id,m_result_src_ip,m_result_dest_ip,m_result_src_port);
5013 bool CTupleTemplateGenerator::Create(CTupleGlobalGenerator * global_gen,
5017 uint32_t thread_id){
5018 m_was_generated = false;
5019 m_thread_id = thread_id;
5020 m_lp_global_gen = global_gen;
5021 BP_ASSERT(m_lp_global_gen);
5023 m_cur_src_port_cnt=0;
5026 m_wlength = wlength;
5033 void CTupleTemplateGenerator::Delete(){
5034 m_was_generated = false;
5038 void CTupleTemplateGenerator::Generate_src_dest(){
5039 /* TBD need to fix the 100*/
5040 m_lp_global_gen->Generate(m_thread_id,m_wlength);
5041 m_result_src_ip = m_lp_global_gen->m_result_src_ip;
5043 m_dest_ip = m_lp_global_gen->m_result_dest_ip;
5044 m_result_dest_ip = update_dest_ip(m_dest_ip );
5048 uint16_t CTupleTemplateGenerator::GenerateOneSourcePort(){
5051 /* do not use port zero */
5052 if (m_cur_src_port == 0) {
5055 m_result_src_port=m_cur_src_port;
5056 return (m_cur_src_port);
5059 void CTupleTemplateGenerator::Generate(){
5060 BP_ASSERT(m_was_init);
5061 if ( m_was_generated == false ) {
5063 Generate_src_dest();
5064 m_was_generated = true;
5066 /* ip+cnt,dest+cnt*/
5068 if ( m_cnt >= m_wlength ) {
5070 m_result_src_ip -=m_wlength;
5071 m_result_dest_ip = m_dest_ip;
5072 m_cur_src_port_cnt++;
5073 if (m_cur_src_port_cnt >= m_w ) {
5074 Generate_src_dest();
5075 m_cur_src_port_cnt=0;
5078 m_result_src_ip += 1;
5079 m_result_dest_ip = update_dest_ip(m_dest_ip +m_cnt );
5085 /* do not use port zero */
5086 if (m_cur_src_port == 0) {
5089 m_result_src_ip =update_src_ip( m_result_src_ip );
5090 m_result_src_port=m_cur_src_port;
5095 static uint32_t get_rand_32(uint32_t MinimumRange,
5096 uint32_t MaximumRange) __attribute__ ((unused));
5098 static uint32_t get_rand_32(uint32_t MinimumRange,
5099 uint32_t MaximumRange) {
5100 enum {RANDS_NUM = 2 , RAND_MAX_BITS = 0xf , UNSIGNED_INT_BITS = 0x20 , TWO_BITS_MASK = 0x3};
5101 const double TWO_POWER_32_BITS = 0x10000000 * (double)0x10;
5102 uint32_t RandomNumber = 0;
5104 for (int i = 0 ; i < RANDS_NUM;i++) {
5105 RandomNumber = (RandomNumber<<RAND_MAX_BITS) + rand();
5107 RandomNumber = (RandomNumber<<(UNSIGNED_INT_BITS - RAND_MAX_BITS * RANDS_NUM)) + (rand() | TWO_BITS_MASK);
5110 if ((Range = MaximumRange - MinimumRange) == 0xffffffff) {
5111 return RandomNumber;
5113 return (uint32_t)(((Range + 1) / TWO_POWER_32_BITS * RandomNumber) + MinimumRange );
5118 int CNullIF::send_node(CGenNode * node){
5120 CFlowPktInfo * lp=node->m_pkt_info;
5121 rte_mbuf_t * buf=lp->generate_new_mbuf(node);
5122 //rte_pktmbuf_dump(buf, buf->pkt_len);
5124 // free it here as if driver does
5125 rte_pktmbuf_free(buf);
5132 void CErfIF::fill_raw_packet(rte_mbuf_t * m,CGenNode * node,pkt_dir_t dir){
5136 CPktNsecTimeStamp t_c(node->m_time);
5137 m_raw->time_nsec = t_c.m_time_nsec;
5138 m_raw->time_sec = t_c.m_time_sec;
5139 uint8_t p_id = (uint8_t)dir;
5140 m_raw->setInterface(p_id);
5144 pkt_dir_t CErfIFStl::port_id_to_dir(uint8_t port_id) {
5145 return ((pkt_dir_t)(port_id&1));
5149 int CErfIFStl::update_mac_addr_from_global_cfg(pkt_dir_t dir, uint8_t * p){
5150 memcpy(p,CGlobalInfo::m_options.get_dst_src_mac_addr(dir),12);
5154 int CErfIFStl::send_sl_node(CGenNodeStateless *node_sl) {
5155 pkt_dir_t dir=(pkt_dir_t)node_sl->get_mbuf_cache_dir();
5158 if ( likely(node_sl->is_cache_mbuf_array()) ) {
5159 m=node_sl->cache_mbuf_array_get_cur();
5160 fill_raw_packet(m,(CGenNode *)node_sl,dir);
5162 m=node_sl->get_cache_mbuf();
5163 bool is_const = false;
5166 rte_pktmbuf_refcnt_update(m,1);
5168 m=node_sl->alloc_node_with_vm();
5172 if (node_sl->is_stat_needed() && (node_sl->get_stat_hw_id() >= MAX_FLOW_STATS) ) {
5173 /* latency packet. flow stat without latency handled like normal packet in simulation */
5174 uint16_t hw_id = node_sl->get_stat_hw_id();
5176 struct flow_stat_payload_header *fsp_head;
5177 mi = node_sl->alloc_flow_stat_mbuf(m, fsp_head, is_const);
5178 fsp_head->seq = 0x12345678;
5179 fsp_head->hw_id = hw_id - MAX_FLOW_STATS;
5180 fsp_head->magic = FLOW_STAT_PAYLOAD_MAGIC;
5181 fsp_head->flow_seq = FLOW_STAT_PAYLOAD_INITIAL_FLOW_SEQ;
5182 fsp_head->time_stamp = 0x8899aabbccddeeff;
5183 fill_raw_packet(mi, (CGenNode *)node_sl, dir);
5184 rte_pktmbuf_free(mi);
5186 fill_raw_packet(m,(CGenNode *)node_sl,dir);
5187 rte_pktmbuf_free(m);
5190 /* check that we have mbuf */
5191 int rc = write_pkt(m_raw);
5197 int CErfIFStl::send_pcap_node(CGenNodePCAP *pcap_node) {
5198 rte_mbuf_t *m = pcap_node->get_pkt();
5203 pkt_dir_t dir = (pkt_dir_t)pcap_node->get_mbuf_dir();
5204 fill_raw_packet(m, (CGenNode*)pcap_node, dir);
5205 rte_pktmbuf_free(m);
5207 int rc = write_pkt(m_raw);
5214 * This is the simulation stateless send_node.
5215 * in simulation (bp-sim-64) it is called instead of CCoreEthIFStateless::send_node
5216 * Purpose is to test the mbuf manipulation functions which are the same in simulation and "real" code
5218 int CErfIFStl::send_node(CGenNode * _no_to_use){
5220 if ( m_preview_mode->getFileWrite() ) {
5222 switch (_no_to_use->m_type) {
5223 case CGenNode::STATELESS_PKT:
5224 return send_sl_node((CGenNodeStateless *) _no_to_use);
5226 case CGenNode::PCAP_PKT:
5227 return send_pcap_node((CGenNodePCAP *) _no_to_use);
5236 void CErfIF::add_vlan(uint16_t vlan_id) {
5237 uint8_t *buffer =(uint8_t *)m_raw->raw;
5239 uint16_t vlan_protocol = EthernetHeader::Protocol::VLAN;
5240 uint32_t vlan_tag = (vlan_protocol << 16) | vlan_id;
5241 vlan_tag = PKT_HTONL(vlan_tag);
5243 /* insert vlan tag and adjust packet size */
5244 memcpy(cbuff+4, buffer + 12, m_raw->pkt_len - 12);
5245 memcpy(cbuff, &vlan_tag, 4);
5246 memcpy(buffer + 12, cbuff, m_raw->pkt_len - 8);
5248 m_raw->pkt_len += 4;
5251 void CErfIF::apply_client_config(const ClientCfg *cfg, pkt_dir_t dir) {
5253 uint8_t *p = (uint8_t *)m_raw->raw;
5255 const ClientCfgDir &cfg_dir = ( (dir == CLIENT_SIDE) ? cfg->m_initiator : cfg->m_responder);
5258 if (cfg_dir.has_dst_mac_addr()) {
5259 memcpy(p, cfg_dir.get_dst_mac_addr(), 6);
5263 if (cfg_dir.has_src_mac_addr()) {
5264 memcpy(p + 6, cfg_dir.get_src_mac_addr(), 6);
5268 if (cfg_dir.has_vlan()) {
5269 add_vlan(cfg_dir.get_vlan());
5273 int CErfIF::send_node(CGenNode *node){
5275 if (!m_preview_mode->getFileWrite()) {
5279 CFlowPktInfo *lp = node->m_pkt_info;
5280 rte_mbuf_t *m = lp->generate_new_mbuf(node);
5281 pkt_dir_t dir = node->cur_interface_dir();
5283 fill_raw_packet(m, node, dir);
5285 /* update mac addr dest/src 12 bytes */
5286 uint8_t *p=(uint8_t *)m_raw->raw;
5288 memcpy(p,CGlobalInfo::m_options.get_dst_src_mac_addr(p_id),12);
5290 /* if a client configuration was provided - apply the config */
5291 if (CGlobalInfo::m_options.preview.get_is_client_cfg_enable()) {
5292 apply_client_config(node->m_client_cfg, dir);
5294 } else if (CGlobalInfo::m_options.preview.get_vlan_mode_enable()) {
5295 uint8_t vlan_port = (node->m_src_ip & 1);
5296 uint16_t vlan_id = CGlobalInfo::m_options.m_vlan_port[vlan_port];
5300 //utl_DumpBuffer(stdout,p, 12,0);
5302 int rc = write_pkt(m_raw);
5305 rte_pktmbuf_free(m);
5310 int CErfIF::flush_tx_queue(void){
5314 void CTcpSeq::update(uint8_t *p, CFlowPktInfo *pkt_info, int16_t s_size){
5315 TCPHeader *tcp= (TCPHeader *)(p+pkt_info->m_pkt_indication.getFastTcpOffset());
5316 uint32_t seqnum, acknum;
5318 // This routine will adjust the TCP segment size for packets
5319 // based on the modifications made by the plugins.
5320 // Basically it will keep track of the size changes
5321 // and adjust the TCP sequence numbers accordingly.
5323 bool is_init=pkt_info->m_pkt_indication.m_desc.IsInitSide();
5325 // Update TCP seq number
5326 seqnum = tcp->getSeqNumber();
5327 acknum = tcp->getAckNumber();
5329 // Packet is from client
5330 seqnum += client_seq_delta;
5331 acknum += server_seq_delta;
5333 // Packet is from server
5334 seqnum += server_seq_delta;
5335 acknum += client_seq_delta;
5337 tcp->setSeqNumber(seqnum);
5338 tcp->setAckNumber(acknum);
5340 // Adjust delta being tracked
5342 client_seq_delta += s_size;
5344 server_seq_delta += s_size;
5349 void on_node_first(uint8_t plugin_id,CGenNode * node,
5350 CFlowYamlInfo * template_info,
5351 CTupleTemplateGeneratorSmart * tuple_gen,
5352 CFlowGenListPerThread * flow_gen){
5354 if (CPluginCallback::callback) {
5355 CPluginCallback::callback->on_node_first(plugin_id,node,template_info, tuple_gen,flow_gen);
5359 void on_node_last(uint8_t plugin_id,CGenNode * node){
5360 if (CPluginCallback::callback) {
5361 CPluginCallback::callback->on_node_last(plugin_id,node);
5366 rte_mbuf_t * on_node_generate_mbuf(uint8_t plugin_id,CGenNode * node,CFlowPktInfo * pkt_info){
5368 assert(CPluginCallback::callback);
5369 m=CPluginCallback::callback->on_node_generate_mbuf(plugin_id,node,pkt_info);
5375 class CPlugin_rtsp : public CTcpSeq {
5378 uint16_t rtp_client_0;
5379 uint16_t rtp_client_1;
5383 void CPluginCallbackSimple::on_node_first(uint8_t plugin_id,
5385 CFlowYamlInfo * template_info,
5386 CTupleTemplateGeneratorSmart * tuple_gen,
5387 CFlowGenListPerThread * flow_gen ){
5388 //printf(" on on_node_first callback %d node %x! \n",(int)plugin_id,node);
5389 /* generate 2 ports from client side */
5391 if ( (plugin_id == mpRTSP) || (plugin_id == mpSIP_VOICE) ) {
5392 CPlugin_rtsp * lpP=new CPlugin_rtsp();
5395 /* TBD need to be fixed using new API */
5396 lpP->rtp_client_0 = tuple_gen->GenerateOneSourcePort();
5397 lpP->rtp_client_1 = tuple_gen->GenerateOneSourcePort();
5398 lpP->m_gen=flow_gen;
5399 node->m_plugin_info = (void *)lpP;
5401 if (plugin_id ==mpDYN_PYLOAD) {
5404 if (plugin_id ==mpAVL_HTTP_BROWSIN) {
5405 CTcpSeq * lpP=new CTcpSeq();
5407 node->m_plugin_info = (void *)lpP;
5409 /* do not support this */
5416 void CPluginCallbackSimple::on_node_last(uint8_t plugin_id,CGenNode * node){
5417 //printf(" on on_node_last callback %d %x! \n",(int)plugin_id,node);
5418 if ( (plugin_id == mpRTSP) || (plugin_id == mpSIP_VOICE) ) {
5419 CPlugin_rtsp * lpP=(CPlugin_rtsp * )node->m_plugin_info;
5420 /* free the ports */
5421 CFlowGenListPerThread * flow_gen=(CFlowGenListPerThread *) lpP->m_gen;
5422 bool is_tcp=node->m_pkt_info->m_pkt_indication.m_desc.IsTcp();
5423 flow_gen->defer_client_port_free(is_tcp,node->m_src_idx,lpP->rtp_client_0,
5424 node->m_template_info->m_client_pool_idx,node->m_tuple_gen);
5425 flow_gen->defer_client_port_free(is_tcp,node->m_src_idx,lpP->rtp_client_1,
5426 node->m_template_info->m_client_pool_idx, node->m_tuple_gen);
5430 node->m_plugin_info=0;
5432 if (plugin_id ==mpDYN_PYLOAD) {
5435 if (plugin_id ==mpAVL_HTTP_BROWSIN) {
5437 CTcpSeq * lpP=(CTcpSeq * )node->m_plugin_info;
5439 node->m_plugin_info=0;
5441 /* do not support this */
5448 rte_mbuf_t * CPluginCallbackSimple::http_plugin(uint8_t plugin_id,
5450 CFlowPktInfo * pkt_info){
5451 CPacketDescriptor * lpd=&pkt_info->m_pkt_indication.m_desc;
5452 assert(lpd->getFlowId()==0); /* only one flow */
5453 CMiniVMCmdBase * program[2];
5454 CMiniVMReplaceIP replace_cmd;
5455 CMiniVMCmdBase eop_cmd;
5456 CTcpSeq * lpP=(CTcpSeq * )node->m_plugin_info;
5461 if ( likely (lpd->getFlowPktNum() != 3) ){
5462 if (unlikely (CGlobalInfo::is_ipv6_enable()) ) {
5463 // Request a larger initial segment for IPv6
5464 mbuf = pkt_info->do_generate_new_mbuf_big(node);
5466 mbuf = pkt_info->do_generate_new_mbuf(node);
5470 CFlowInfo flow_info;
5471 flow_info.vm_program=0;
5473 flow_info.client_ip = node->m_src_ip;
5474 flow_info.server_ip = node->m_dest_ip;
5475 flow_info.client_port = node->m_src_port;
5476 flow_info.server_port = 0;
5477 flow_info.replace_server_port =false;
5478 flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
5479 flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
5482 replace_cmd.m_cmd = VM_REPLACE_IP_OFFSET;
5483 replace_cmd.m_flags = 0;
5485 // Determine how much larger the packet needs to be to
5486 // handle the largest IP address. There is a single address
5487 // string of 8 bytes that needs to be replaced.
5488 if (CGlobalInfo::is_ipv6_enable() ) {
5489 // For IPv6, accomodate use of brackets (+2 bytes)
5490 replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 8;
5492 // Mark as IPv6 and set the upper 96-bits
5493 replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
5494 for (uint8_t idx=0; idx<6; idx++){
5495 replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
5498 replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 8;
5501 // Set m_start_0/m_stop_1 at start/end of IP address to be replaced.
5502 // For this packet we know the IP addr string length is 8 bytes.
5503 replace_cmd.m_start_0 = 10+16;
5504 replace_cmd.m_stop_1 = replace_cmd.m_start_0 + 8;
5506 replace_cmd.m_server_ip.v4 = flow_info.server_ip;
5508 eop_cmd.m_cmd = VM_EOP;
5510 program[0] = &replace_cmd;
5511 program[1] = &eop_cmd;
5513 flow_info.vm_program = program;
5515 mbuf = pkt_info->do_generate_new_mbuf_ex_vm(node,&flow_info, &s_size);
5518 // Fixup the TCP sequence numbers
5519 uint8_t *p=rte_pktmbuf_mtod(mbuf, uint8_t*);
5521 // Update TCP sequence numbers
5522 lpP->update(p, pkt_info, s_size);
5527 rte_mbuf_t * CPluginCallbackSimple::dyn_pyload_plugin(uint8_t plugin_id,
5529 CFlowPktInfo * pkt_info){
5531 CMiniVMCmdBase * program[2];
5533 CMiniVMDynPyload dyn_cmd;
5534 CMiniVMCmdBase eop_cmd;
5536 CPacketDescriptor * lpd=&pkt_info->m_pkt_indication.m_desc;
5537 CFlowYamlDynamicPyloadPlugin * lpt = node->m_template_info->m_dpPkt;
5539 CFlowInfo flow_info;
5540 flow_info.vm_program=0;
5543 // IPv6 packets are not supported
5544 if (CGlobalInfo::is_ipv6_enable() ) {
5545 fprintf (stderr," IPv6 is not supported for dynamic pyload change\n");
5549 if ( lpd->getFlowId() == 0 ) {
5551 flow_info.client_ip = node->m_src_ip;
5552 flow_info.server_ip = node->m_dest_ip;
5553 flow_info.client_port = node->m_src_port;
5554 flow_info.server_port = 0;
5555 flow_info.replace_server_port =false;
5556 flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
5557 flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
5559 uint32_t pkt_num = lpd->getFlowPktNum();
5560 if (pkt_num < 253) {
5563 for (i=0; i<lpt->m_num; i++) {
5564 if (lpt->m_pkt_ids[i] == pkt_num ) {
5565 //add a program here
5566 dyn_cmd.m_cmd = VM_DYN_PYLOAD;
5567 dyn_cmd.m_ptr= &lpt->m_program[i];
5568 dyn_cmd.m_flags = 0;
5569 dyn_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 8;
5570 dyn_cmd.m_ip.v4=node->m_src_ip;
5572 eop_cmd.m_cmd = VM_EOP;
5573 program[0] = &dyn_cmd;
5574 program[1] = &eop_cmd;
5576 flow_info.vm_program = program;
5580 // only for the first flow
5582 fprintf (stderr," only one flow is allowed for dynamic pyload change \n");
5584 }/* only for the first flow */
5586 if ( unlikely( flow_info.vm_program != 0 ) ) {
5588 return ( pkt_info->do_generate_new_mbuf_ex_vm(node,&flow_info, &s_size) );
5590 return ( pkt_info->do_generate_new_mbuf_ex(node,&flow_info) );
5594 rte_mbuf_t * CPluginCallbackSimple::sip_voice_plugin(uint8_t plugin_id,CGenNode * node,CFlowPktInfo * pkt_info){
5595 CMiniVMCmdBase * program[2];
5597 CMiniVMReplaceIP_PORT_IP_IP_Port via_replace_cmd;
5598 CMiniVMCmdBase eop_cmd;
5600 CPacketDescriptor * lpd=&pkt_info->m_pkt_indication.m_desc;
5601 CPlugin_rtsp * lpP=(CPlugin_rtsp * )node->m_plugin_info;
5603 // printf(" %d %d \n",lpd->getFlowId(),lpd->getFlowPktNum());
5604 CFlowInfo flow_info;
5605 flow_info.vm_program=0;
5608 switch ( lpd->getFlowId() ) {
5609 /* flow - SIP , packet #0,#1 control */
5611 flow_info.client_ip = node->m_src_ip;
5612 flow_info.server_ip = node->m_dest_ip;
5613 flow_info.client_port = node->m_src_port;
5614 flow_info.server_port = 0;
5615 flow_info.replace_server_port =false;
5616 flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
5617 flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
5620 /* program to replace ip server */
5621 switch ( lpd->getFlowPktNum() ) {
5624 via_replace_cmd.m_cmd = VM_REPLACE_IPVIA_IP_IP_PORT;
5625 via_replace_cmd.m_flags = 0;
5626 via_replace_cmd.m_start_0 = 0;
5627 via_replace_cmd.m_stop_1 = 0;
5629 // Determine how much larger the packet needs to be to
5630 // handle the largest IP address. There are 3 address
5631 // strings (each 9 bytes) that needs to be replaced.
5632 // We also need to accomodate IPv6 use of brackets
5633 // (+2 bytes) in a URI.
5634 // There are also 2 port strings that needs to be
5635 // replaced (1 is 4 bytes the other is 5 bytes).
5636 if (CGlobalInfo::is_ipv6_enable() ) {
5637 via_replace_cmd.m_add_pkt_len = (((INET6_ADDRSTRLEN + 2) - 9) * 3) +
5638 ((INET_PORTSTRLEN * 2) - 9);
5640 // Mark as IPv6 and set the upper 96-bits
5641 via_replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
5642 for (uint8_t idx=0; idx<6; idx++){
5643 via_replace_cmd.m_ip.v6[idx] = CGlobalInfo::m_options.m_src_ipv6[idx];
5644 via_replace_cmd.m_ip_via.v6[idx] = CGlobalInfo::m_options.m_src_ipv6[idx];
5647 via_replace_cmd.m_add_pkt_len = ((INET_ADDRSTRLEN - 9) * 3) +
5648 ((INET_PORTSTRLEN * 2) - 9);
5650 via_replace_cmd.m_ip.v4 =node->m_src_ip;
5651 via_replace_cmd.m_ip0_start = 377;
5652 via_replace_cmd.m_ip0_stop = 377+9;
5654 via_replace_cmd.m_ip1_start = 409;
5655 via_replace_cmd.m_ip1_stop = 409+9;
5658 via_replace_cmd.m_port =lpP->rtp_client_0;
5659 via_replace_cmd.m_port_start = 435;
5660 via_replace_cmd.m_port_stop = 435+5;
5662 via_replace_cmd.m_ip_via.v4 = node->m_src_ip;
5663 via_replace_cmd.m_port_via = node->m_src_port;
5665 via_replace_cmd.m_ip_via_start = 208;
5666 via_replace_cmd.m_ip_via_stop = 208+9+5;
5669 eop_cmd.m_cmd = VM_EOP;
5671 program[0] = &via_replace_cmd;
5672 program[1] = &eop_cmd;
5674 flow_info.vm_program = program;
5679 via_replace_cmd.m_cmd = VM_REPLACE_IPVIA_IP_IP_PORT;
5680 via_replace_cmd.m_flags = 0;
5681 via_replace_cmd.m_start_0 = 0;
5682 via_replace_cmd.m_stop_1 = 0;
5684 // Determine how much larger the packet needs to be to
5685 // handle the largest IP address. There are 3 address
5686 // strings (each 9 bytes) that needs to be replaced.
5687 // We also need to accomodate IPv6 use of brackets
5688 // (+2 bytes) in a URI.
5689 // There are also 2 port strings that needs to be
5690 // replaced (1 is 4 bytes the other is 5 bytes).
5691 if (CGlobalInfo::is_ipv6_enable() ) {
5692 via_replace_cmd.m_add_pkt_len = (((INET6_ADDRSTRLEN + 2) - 9) * 3) +
5693 ((INET_PORTSTRLEN * 2) - 9);
5695 // Mark as IPv6 and set the upper 96-bits
5696 via_replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
5697 for (uint8_t idx=0; idx<6; idx++){
5698 via_replace_cmd.m_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
5699 via_replace_cmd.m_ip_via.v6[idx] = CGlobalInfo::m_options.m_src_ipv6[idx];
5702 via_replace_cmd.m_add_pkt_len = ((INET_ADDRSTRLEN - 9) * 3) +
5703 ((INET_PORTSTRLEN * 2) - 9);
5706 via_replace_cmd.m_ip.v4 =node->m_dest_ip;
5707 via_replace_cmd.m_ip0_start = 370;
5708 via_replace_cmd.m_ip0_stop = 370+8;
5710 via_replace_cmd.m_ip1_start = 401;
5711 via_replace_cmd.m_ip1_stop = 401+8;
5714 via_replace_cmd.m_port =lpP->rtp_client_0;
5715 via_replace_cmd.m_port_start = 426;
5716 via_replace_cmd.m_port_stop = 426+5;
5719 via_replace_cmd.m_ip_via.v4 = node->m_src_ip;
5720 via_replace_cmd.m_port_via = node->m_src_port;
5722 via_replace_cmd.m_ip_via_start = 207;
5723 via_replace_cmd.m_ip_via_stop = 207+9+5;
5725 eop_cmd.m_cmd = VM_EOP;
5727 program[0] = &via_replace_cmd;
5728 program[1] = &eop_cmd;
5730 flow_info.vm_program = program;
5735 }/* end of big switch on packet */
5739 flow_info.client_ip = node->m_src_ip ;
5740 flow_info.server_ip = node->m_dest_ip;
5741 flow_info.client_port = lpP->rtp_client_0;
5742 /* this is tricky ..*/
5743 flow_info.server_port = lpP->rtp_client_0;
5744 flow_info.replace_server_port = true;
5745 flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
5746 flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
5754 //printf(" c_ip:%x s_ip:%x c_po:%x s_po:%x init:%x replace:%x \n",flow_info.client_ip,flow_info.server_ip,flow_info.client_port,flow_info.server_port,flow_info.is_init_dir,flow_info.replace_server_port);
5756 //printf(" program %p \n",flow_info.vm_program);
5757 if ( unlikely( flow_info.vm_program != 0 ) ) {
5759 return ( pkt_info->do_generate_new_mbuf_ex_vm(node,&flow_info, &s_size) );
5761 return ( pkt_info->do_generate_new_mbuf_ex(node,&flow_info) );
5765 rte_mbuf_t * CPluginCallbackSimple::rtsp_plugin(uint8_t plugin_id,CGenNode * node,CFlowPktInfo * pkt_info){
5767 CMiniVMCmdBase * program[2];
5769 CMiniVMReplaceIP replace_cmd;
5770 CMiniVMCmdBase eop_cmd;
5771 CMiniVMReplaceIPWithPort replace_port_cmd;
5774 CPacketDescriptor * lpd=&pkt_info->m_pkt_indication.m_desc;
5775 CPlugin_rtsp * lpP=(CPlugin_rtsp * )node->m_plugin_info;
5778 // printf(" %d %d \n",lpd->getFlowId(),lpd->getFlowPktNum());
5779 CFlowInfo flow_info;
5780 flow_info.vm_program=0;
5783 switch ( lpd->getFlowId() ) {
5784 /* flow - control */
5786 flow_info.client_ip = node->m_src_ip;
5787 flow_info.server_ip = node->m_dest_ip;
5788 flow_info.client_port = node->m_src_port;
5789 flow_info.server_port = 0;
5790 flow_info.replace_server_port =false;
5791 flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
5792 flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
5795 /* program to replace ip server */
5796 switch ( lpd->getFlowPktNum() ) {
5799 replace_cmd.m_cmd = VM_REPLACE_IP_OFFSET;
5800 replace_cmd.m_flags = 0;
5801 replace_cmd.m_start_0 = 16;
5802 replace_cmd.m_stop_1 = 16+9;
5804 // Determine how much larger the packet needs to be to
5805 // handle the largest IP address. There is a single address
5806 // string of 9 bytes that needs to be replaced.
5807 if (CGlobalInfo::is_ipv6_enable() ) {
5808 // For IPv6, accomodate use of brackets (+2 bytes)
5809 replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
5811 // Mark as IPv6 and set the upper 96-bits
5812 replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
5813 for (uint8_t idx=0; idx<6; idx++){
5814 replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
5817 replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
5819 replace_cmd.m_server_ip.v4 = flow_info.server_ip;
5821 eop_cmd.m_cmd = VM_EOP;
5823 program[0] = &replace_cmd;
5824 program[1] = &eop_cmd;
5826 flow_info.vm_program = program;
5831 replace_cmd.m_cmd = VM_REPLACE_IP_OFFSET;
5832 replace_cmd.m_flags = 0;
5833 replace_cmd.m_start_0 = 46;
5834 replace_cmd.m_stop_1 = 46+9;
5836 // Determine how much larger the packet needs to be to
5837 // handle the largest IP address. There is a single address
5838 // string of 9 bytes that needs to be replaced.
5839 if (CGlobalInfo::is_ipv6_enable() ) {
5840 // For IPv6, accomodate use of brackets (+2 bytes)
5841 replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
5843 // Mark as IPv6 and set the upper 96-bits
5844 replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
5845 for (uint8_t idx=0; idx<6; idx++){
5846 replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
5849 replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
5851 replace_cmd.m_server_ip.v4 = flow_info.server_ip;
5853 eop_cmd.m_cmd = VM_EOP;
5855 program[0] = &replace_cmd;
5856 program[1] = &eop_cmd;
5858 flow_info.vm_program = program;
5865 replace_port_cmd.m_cmd = VM_REPLACE_IP_PORT_OFFSET;
5866 replace_port_cmd.m_flags = 0;
5867 replace_port_cmd.m_start_0 = 13;
5868 replace_port_cmd.m_stop_1 = 13+9;
5870 // Determine how much larger the packet needs to be to
5871 // handle the largest IP address. There is a single address
5872 // string of 9 bytes that needs to be replaced.
5873 // There are also 2 port strings (8 bytes) that needs to be
5875 if (CGlobalInfo::is_ipv6_enable() ) {
5876 // For IPv6, accomodate use of brackets (+2 bytes)
5877 replace_port_cmd.m_add_pkt_len = ((INET6_ADDRSTRLEN + 2) - 9) +
5878 ((INET_PORTSTRLEN * 2) - 8);
5880 // Mark as IPv6 and set the upper 96-bits
5881 replace_port_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
5882 for (uint8_t idx=0; idx<6; idx++){
5883 replace_port_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
5886 replace_port_cmd.m_add_pkt_len = (INET_ADDRSTRLEN - 9) +
5887 ((INET_PORTSTRLEN * 2) - 8);
5889 replace_port_cmd.m_server_ip.v4 = flow_info.server_ip;
5890 replace_port_cmd.m_start_port = 164;
5891 replace_port_cmd.m_stop_port = 164+(4*2)+1;
5892 replace_port_cmd.m_client_port = lpP->rtp_client_0;
5893 replace_port_cmd.m_server_port =0;
5896 eop_cmd.m_cmd = VM_EOP;
5898 program[0] = &replace_port_cmd;
5899 program[1] = &eop_cmd;
5901 flow_info.vm_program = program;
5908 replace_port_cmd.m_cmd = VM_REPLACE_IP_PORT_RESPONSE_OFFSET;
5909 replace_port_cmd.m_flags = 0;
5910 replace_port_cmd.m_start_0 = 0;
5911 replace_port_cmd.m_stop_1 = 0;
5913 // Determine how much larger the packet needs to be to
5914 // handle the largest port addresses. There are 4 port address
5915 // strings (16 bytes) that needs to be replaced.
5916 replace_port_cmd.m_add_pkt_len = ((INET_PORTSTRLEN * 4) - 16);
5918 replace_port_cmd.m_server_ip.v4 = flow_info.server_ip;
5919 replace_port_cmd.m_start_port = 247;
5920 replace_port_cmd.m_stop_port = 247+(4*4)+2+13;
5921 replace_port_cmd.m_client_port = lpP->rtp_client_0;
5922 replace_port_cmd.m_server_port = lpP->rtp_client_0;
5925 eop_cmd.m_cmd = VM_EOP;
5927 program[0] = &replace_port_cmd;
5928 program[1] = &eop_cmd;
5930 flow_info.vm_program = program;
5938 replace_port_cmd.m_cmd = VM_REPLACE_IP_PORT_OFFSET;
5939 replace_port_cmd.m_flags = 0;
5940 replace_port_cmd.m_start_0 = 13;
5941 replace_port_cmd.m_stop_1 = 13+9;
5943 // Determine how much larger the packet needs to be to
5944 // handle the largest IP address. There is a single address
5945 // string of 9 bytes that needs to be replaced.
5946 // There are also 2 port strings (8 bytes) that needs to be
5948 if (CGlobalInfo::is_ipv6_enable() ) {
5949 // For IPv6, accomodate use of brackets (+2 bytes)
5950 replace_port_cmd.m_add_pkt_len = ((INET6_ADDRSTRLEN + 2) - 9) +
5951 ((INET_PORTSTRLEN * 2) - 8);
5953 // Mark as IPv6 and set the upper 96-bits
5954 replace_port_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
5955 for (uint8_t idx=0; idx<6; idx++){
5956 replace_port_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
5959 replace_port_cmd.m_add_pkt_len = (INET_ADDRSTRLEN - 9) +
5960 ((INET_PORTSTRLEN * 2) - 8);
5962 replace_port_cmd.m_server_ip.v4 = flow_info.server_ip;
5963 replace_port_cmd.m_start_port = 164;
5964 replace_port_cmd.m_stop_port = 164+(4*2)+1;
5965 replace_port_cmd.m_client_port = lpP->rtp_client_1;
5966 replace_port_cmd.m_server_port =0;
5969 eop_cmd.m_cmd = VM_EOP;
5971 program[0] = &replace_port_cmd;
5972 program[1] = &eop_cmd;
5974 flow_info.vm_program = program;
5982 replace_port_cmd.m_cmd = VM_REPLACE_IP_PORT_RESPONSE_OFFSET;
5983 replace_port_cmd.m_flags = 0;
5984 replace_port_cmd.m_start_0 = 0;
5985 replace_port_cmd.m_stop_1 = 0;
5987 // Determine how much larger the packet needs to be to
5988 // handle the largest port addresses. There are 4 port address
5989 // strings (16 bytes) that needs to be replaced.
5990 replace_port_cmd.m_add_pkt_len = ((INET_PORTSTRLEN * 4) - 16);
5992 replace_port_cmd.m_server_ip.v4 = flow_info.server_ip;
5993 replace_port_cmd.m_start_port = 247;
5994 replace_port_cmd.m_stop_port = 247+(4*4)+2+13;
5995 replace_port_cmd.m_client_port = lpP->rtp_client_1;
5996 replace_port_cmd.m_server_port = lpP->rtp_client_1;
5999 eop_cmd.m_cmd = VM_EOP;
6001 program[0] = &replace_port_cmd;
6002 program[1] = &eop_cmd;
6004 flow_info.vm_program = program;
6012 replace_cmd.m_cmd = VM_REPLACE_IP_OFFSET;
6013 replace_cmd.m_flags = 0;
6014 replace_cmd.m_start_0 = 12;
6015 replace_cmd.m_stop_1 = 12+9;
6017 // Determine how much larger the packet needs to be to
6018 // handle the largest IP address. There is a single address
6019 // string of 9 bytes that needs to be replaced.
6020 if (CGlobalInfo::is_ipv6_enable() ) {
6021 // For IPv6, accomodate use of brackets (+2 bytes)
6022 replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
6024 // Mark as IPv6 and set the upper 96-bits
6025 replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
6026 for (uint8_t idx=0; idx<6; idx++){
6027 replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
6030 replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
6032 replace_cmd.m_server_ip.v4 = flow_info.server_ip;
6034 eop_cmd.m_cmd = VM_EOP;
6036 program[0] = &replace_cmd;
6037 program[1] = &eop_cmd;
6039 flow_info.vm_program = program;
6047 replace_cmd.m_cmd = VM_REPLACE_IP_OFFSET;
6048 replace_cmd.m_flags = 0;
6049 replace_cmd.m_start_0 = 15;
6050 replace_cmd.m_stop_1 = 15+9;
6052 // Determine how much larger the packet needs to be to
6053 // handle the largest IP address. There is a single address
6054 // string of 9 bytes that needs to be replaced.
6055 if (CGlobalInfo::is_ipv6_enable() ) {
6056 // For IPv6, accomodate use of brackets (+2 bytes)
6057 replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
6059 // Mark as IPv6 and set the upper 96-bits
6060 replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
6061 for (uint8_t idx=0; idx<6; idx++){
6062 replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
6065 replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
6067 replace_cmd.m_server_ip.v4 = flow_info.server_ip;
6069 eop_cmd.m_cmd = VM_EOP;
6071 program[0] = &replace_cmd;
6072 program[1] = &eop_cmd;
6074 flow_info.vm_program = program;
6082 replace_cmd.m_cmd = VM_REPLACE_IP_OFFSET;
6083 replace_cmd.m_flags = 0;
6084 replace_cmd.m_start_0 = 15;
6085 replace_cmd.m_stop_1 = 15+9;
6087 // Determine how much larger the packet needs to be to
6088 // handle the largest IP address. There is a single address
6089 // string of 9 bytes that needs to be replaced.
6090 if (CGlobalInfo::is_ipv6_enable() ) {
6091 // For IPv6, accomodate use of brackets (+2 bytes)
6092 replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
6094 // Mark as IPv6 and set the upper 96-bits
6095 replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
6096 for (uint8_t idx=0; idx<6; idx++){
6097 replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
6100 replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
6102 replace_cmd.m_server_ip.v4 = flow_info.server_ip;
6104 eop_cmd.m_cmd = VM_EOP;
6106 program[0] = &replace_cmd;
6107 program[1] = &eop_cmd;
6109 flow_info.vm_program = program;
6116 replace_cmd.m_cmd = VM_REPLACE_IP_OFFSET;
6117 replace_cmd.m_flags = 0;
6118 replace_cmd.m_start_0 = 16;
6119 replace_cmd.m_stop_1 = 16+9;
6121 // Determine how much larger the packet needs to be to
6122 // handle the largest IP address. There is a single address
6123 // string of 9 bytes that needs to be replaced.
6124 if (CGlobalInfo::is_ipv6_enable() ) {
6125 // For IPv6, accomodate use of brackets (+2 bytes)
6126 replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
6128 // Mark as IPv6 and set the upper 96-bits
6129 replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
6130 for (uint8_t idx=0; idx<6; idx++){
6131 replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
6134 replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
6136 replace_cmd.m_server_ip.v4 = flow_info.server_ip;
6138 eop_cmd.m_cmd = VM_EOP;
6140 program[0] = &replace_cmd;
6141 program[1] = &eop_cmd;
6143 flow_info.vm_program = program;
6148 }/* end of big switch on packet */
6152 flow_info.client_ip = node->m_src_ip ;
6153 flow_info.server_ip = node->m_dest_ip;
6154 flow_info.client_port = lpP->rtp_client_0;
6155 /* this is tricky ..*/
6156 flow_info.server_port = lpP->rtp_client_0;
6157 flow_info.replace_server_port = true;
6158 flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
6159 flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
6163 flow_info.client_ip = node->m_src_ip ;
6164 flow_info.server_ip = node->m_dest_ip;
6165 flow_info.client_port = lpP->rtp_client_1;
6166 /* this is tricky ..*/
6167 flow_info.server_port = lpP->rtp_client_1;
6168 flow_info.replace_server_port =true;
6169 flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
6170 flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
6178 //printf(" c_ip:%x s_ip:%x c_po:%x s_po:%x init:%x replace:%x \n",flow_info.client_ip,flow_info.server_ip,flow_info.client_port,flow_info.server_port,flow_info.is_init_dir,flow_info.replace_server_port);
6180 //printf(" program %p \n",flow_info.vm_program);
6181 if ( unlikely( flow_info.vm_program != 0 ) ) {
6183 mbuf = pkt_info->do_generate_new_mbuf_ex_vm(node,&flow_info, &s_size);
6185 if (unlikely (CGlobalInfo::is_ipv6_enable()) ) {
6186 // Request a larger initial segment for IPv6
6187 mbuf = pkt_info->do_generate_new_mbuf_ex_big(node,&flow_info);
6189 mbuf = pkt_info->do_generate_new_mbuf_ex(node,&flow_info);
6193 // Fixup the TCP sequence numbers for the TCP flow
6194 if ( lpd->getFlowId() == 0 ) {
6195 uint8_t *p=rte_pktmbuf_mtod(mbuf, uint8_t*);
6197 // Update TCP sequence numbers
6198 lpP->update(p, pkt_info, s_size);
6205 /* replace the tuples */
6206 rte_mbuf_t * CPluginCallbackSimple::on_node_generate_mbuf(uint8_t plugin_id,CGenNode * node,CFlowPktInfo * pkt_info){
6208 rte_mbuf_t * m=NULL;
6209 switch (plugin_id) {
6211 m=rtsp_plugin(plugin_id,node,pkt_info);
6214 m=sip_voice_plugin(plugin_id,node,pkt_info);
6217 m=dyn_pyload_plugin(plugin_id,node,pkt_info);
6219 case mpAVL_HTTP_BROWSIN:
6220 m=http_plugin(plugin_id,node,pkt_info);
6229 int CMiniVM::mini_vm_run(CMiniVMCmdBase * cmds[]){
6232 bool need_to_stop=false;
6234 CMiniVMCmdBase * cmd=cmds[cnt];
6235 while (! need_to_stop) {
6236 switch (cmd->m_cmd) {
6237 case VM_REPLACE_IP_OFFSET:
6238 mini_vm_replace_ip((CMiniVMReplaceIP *)cmd);
6240 case VM_REPLACE_IP_PORT_OFFSET:
6241 mini_vm_replace_port_ip((CMiniVMReplaceIPWithPort *)cmd);
6243 case VM_REPLACE_IP_PORT_RESPONSE_OFFSET:
6244 mini_vm_replace_ports((CMiniVMReplaceIPWithPort *)cmd);
6247 case VM_REPLACE_IP_IP_PORT:
6248 mini_vm_replace_ip_ip_ports((CMiniVMReplaceIP_IP_Port * )cmd);
6251 case VM_REPLACE_IPVIA_IP_IP_PORT:
6252 mini_vm_replace_ip_via_ip_ip_ports((CMiniVMReplaceIP_PORT_IP_IP_Port *)cmd);
6256 mini_vm_dyn_payload((CMiniVMDynPyload *)cmd);
6263 printf(" vm cmd %d does not exist \n",cmd->m_cmd);
6272 inline int cp_pkt_len(char *to,char *from,uint16_t from_offset,uint16_t len){
6273 memcpy(to, from+from_offset , len);
6277 /* not including the to_offset
6283 inline int cp_pkt_to_from(char *to,char *from,uint16_t from_offset,uint16_t to_offset){
6284 memcpy(to, from+from_offset , to_offset-from_offset) ;
6285 return (to_offset-from_offset);
6289 int CMiniVM::mini_vm_dyn_payload( CMiniVMDynPyload * cmd){
6290 /* copy all the packet */
6291 CFlowYamlDpPkt * dyn=(CFlowYamlDpPkt *)cmd->m_ptr;
6292 uint16_t l7_offset = m_pkt_info->m_pkt_indication.getFastPayloadOffset();
6293 uint16_t len = m_pkt_info->m_packet->pkt_len - l7_offset ;
6294 char * original_l7_ptr=m_pkt_info->m_packet->raw+l7_offset;
6295 char * p=m_pyload_mbuf_ptr;
6297 memcpy(p,original_l7_ptr,len);
6298 if ( ( dyn->m_pyld_offset+ (dyn->m_len*4)) < ( len-4) ){
6299 // we can change the packet
6301 uint32_t *l=(uint32_t *)(p+dyn->m_pyld_offset);
6302 for (i=0; i<dyn->m_len; i++) {
6303 if ( dyn->m_type==0 ) {
6304 *l=(rand() & dyn->m_pkt_mask);
6305 }else if (dyn->m_type==1){
6306 *l=(PKT_NTOHL(cmd->m_ip.v4) & dyn->m_pkt_mask);
6313 // Return packet size which hasn't changed
6314 m_new_pkt_size = m_pkt_info->m_packet->pkt_len;
6320 int CMiniVM::mini_vm_replace_ip_via_ip_ip_ports(CMiniVMReplaceIP_PORT_IP_IP_Port * cmd){
6321 uint16_t l7_offset = m_pkt_info->m_pkt_indication.getFastPayloadOffset();
6322 uint16_t len = m_pkt_info->m_packet->pkt_len - l7_offset;
6323 char * original_l7_ptr=m_pkt_info->m_packet->raw+l7_offset;
6324 char * p=m_pyload_mbuf_ptr;
6326 p+=cp_pkt_to_from(p,original_l7_ptr,
6328 cmd->m_ip_via_start);
6330 if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
6331 p+=ipv6_to_str(&cmd->m_ip_via,p);
6333 p+=ip_to_str(cmd->m_ip_via.v4,p);
6335 p+=sprintf(p,":%u",cmd->m_port_via);
6338 p+=cp_pkt_to_from(p,original_l7_ptr,
6342 if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
6344 p+=ipv6_to_str(&cmd->m_ip,p);
6346 p+=ip_to_str(cmd->m_ip.v4,p);
6349 p+=cp_pkt_to_from(p, original_l7_ptr ,
6353 if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
6355 p+=ipv6_to_str(&cmd->m_ip,p);
6357 p+=ip_to_str(cmd->m_ip.v4,p);
6361 p+=cp_pkt_to_from(p, original_l7_ptr ,
6365 p+=sprintf(p,"%u",cmd->m_port);
6368 p+=cp_pkt_to_from(p, original_l7_ptr ,
6372 // Determine new packet size
6373 m_new_pkt_size= ((p+l7_offset) - m_pyload_mbuf_ptr);
6379 int CMiniVM::mini_vm_replace_ip_ip_ports(CMiniVMReplaceIP_IP_Port * cmd){
6380 uint16_t l7_offset = m_pkt_info->m_pkt_indication.getFastPayloadOffset();
6381 uint16_t len = m_pkt_info->m_packet->pkt_len - l7_offset;
6382 char * original_l7_ptr=m_pkt_info->m_packet->raw+l7_offset;
6383 char * p=m_pyload_mbuf_ptr;
6386 p+=cp_pkt_to_from(p,original_l7_ptr,
6390 if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
6391 p+=ipv6_to_str(&cmd->m_ip,p);
6393 p+=ip_to_str(cmd->m_ip.v4,p);
6396 p+=cp_pkt_to_from(p, original_l7_ptr ,
6400 if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
6401 p+=ipv6_to_str(&cmd->m_ip,p);
6403 p+=ip_to_str(cmd->m_ip.v4,p);
6407 p+=cp_pkt_to_from(p, original_l7_ptr ,
6411 p+=sprintf(p,"%u",cmd->m_port);
6414 p+=cp_pkt_to_from(p, original_l7_ptr ,
6418 // Determine new packet size
6419 m_new_pkt_size= ((p+l7_offset) - m_pyload_mbuf_ptr);
6424 int CMiniVM::mini_vm_replace_ports(CMiniVMReplaceIPWithPort * cmd){
6425 uint16_t l7_offset = m_pkt_info->m_pkt_indication.getFastPayloadOffset();
6426 uint16_t len = m_pkt_info->m_packet->pkt_len - l7_offset;
6427 char * original_l7_ptr=m_pkt_info->m_packet->raw+l7_offset;
6429 memcpy(m_pyload_mbuf_ptr, original_l7_ptr,cmd->m_start_port);
6430 char * p=m_pyload_mbuf_ptr+cmd->m_start_port;
6431 p+=sprintf(p,"%u-%u;server_port=%u-%u",cmd->m_client_port,cmd->m_client_port+1,cmd->m_server_port,cmd->m_server_port+1);
6432 memcpy(p, original_l7_ptr+cmd->m_stop_port,len-cmd->m_stop_port);
6433 p+=(len-cmd->m_stop_port);
6435 // Determine new packet size
6436 m_new_pkt_size= ((p+l7_offset) - m_pyload_mbuf_ptr);
6442 int CMiniVM::mini_vm_replace_port_ip(CMiniVMReplaceIPWithPort * cmd){
6443 uint16_t l7_offset=m_pkt_info->m_pkt_indication.getFastPayloadOffset();
6444 uint16_t len = m_pkt_info->m_packet->pkt_len - l7_offset - 0;
6445 char * original_l7_ptr=m_pkt_info->m_packet->raw+l7_offset;
6447 memcpy(m_pyload_mbuf_ptr, original_l7_ptr,cmd->m_start_0);
6448 char *p=m_pyload_mbuf_ptr+cmd->m_start_0;
6449 if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
6450 p+=ipv6_to_str(&cmd->m_server_ip,p);
6452 p+=ip_to_str(cmd->m_server_ip.v4,p);
6454 /* copy until the port start offset */
6455 int len1=cmd->m_start_port-cmd->m_stop_1 ;
6456 memcpy(p, original_l7_ptr+cmd->m_stop_1,len1);
6458 p+=sprintf(p,"%u-%u",cmd->m_client_port,cmd->m_client_port+1);
6459 memcpy(p, original_l7_ptr+cmd->m_stop_port,len-cmd->m_stop_port);
6460 p+=len-cmd->m_stop_port;
6462 // Determine new packet size
6463 m_new_pkt_size= ((p+l7_offset) - m_pyload_mbuf_ptr);
6468 int CMiniVM::mini_vm_replace_ip(CMiniVMReplaceIP * cmd){
6469 uint16_t l7_offset = m_pkt_info->m_pkt_indication.getFastPayloadOffset();
6470 uint16_t len = m_pkt_info->m_packet->pkt_len - l7_offset;
6471 char * original_l7_ptr = m_pkt_info->m_packet->raw+l7_offset;
6473 memcpy(m_pyload_mbuf_ptr, original_l7_ptr,cmd->m_start_0);
6474 char *p=m_pyload_mbuf_ptr+cmd->m_start_0;
6477 if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
6478 n_size=ipv6_to_str(&cmd->m_server_ip,p);
6480 n_size=ip_to_str(cmd->m_server_ip.v4,p);
6483 memcpy(p, original_l7_ptr+cmd->m_stop_1,len-cmd->m_stop_1);
6485 // Determine new packet size
6486 m_new_pkt_size= ((p+l7_offset+(len-cmd->m_stop_1)) - m_pyload_mbuf_ptr);
6492 void CFlowYamlDpPkt::Dump(FILE *fd){
6493 fprintf(fd," pkt_id : %d \n",(int)m_pkt_id);
6494 fprintf(fd," offset : %d \n",(int)m_pyld_offset);
6495 fprintf(fd," offset : %d \n",(int)m_type);
6496 fprintf(fd," len : %d \n",(int)m_len);
6497 fprintf(fd," mask : 0x%x \n",(int)m_pkt_mask);
6501 void CFlowYamlDynamicPyloadPlugin::Add(CFlowYamlDpPkt & fd){
6502 if (m_num ==MAX_PYLOAD_PKT_CHANGE) {
6503 fprintf (stderr,"ERROR can set only %d rules \n",MAX_PYLOAD_PKT_CHANGE);
6506 m_pkt_ids[m_num]=fd.m_pkt_id;
6507 m_program[m_num]=fd;
6511 void CFlowYamlDynamicPyloadPlugin::Dump(FILE *fd){
6513 fprintf(fd," pkts :");
6514 for (i=0; i<m_num; i++) {
6515 fprintf(fd," %d ",m_pkt_ids[i]);
6518 for (i=0; i<m_num; i++) {
6519 fprintf(fd," program : %d \n",i);
6520 fprintf(fd,"---------------- \n");
6521 m_program[i].Dump(fd);
6525 /* free the right object.
6526 it is classic to use virtual function but we can't do it here and we don't even want to use callback function
6527 as we want to save space and in most cases there is nothing to free.
6528 this might be changed in the future
6530 void CGenNodeBase::free_base(){
6531 if ( m_type == FLOW_PKT ) {
6532 CGenNode* p=(CGenNode*)this;
6536 if (m_type==STATELESS_PKT) {
6537 CGenNodeStateless* p=(CGenNodeStateless*)this;
6542 if (m_type == PCAP_PKT) {
6543 CGenNodePCAP *p = (CGenNodePCAP *)this;
6548 if ( m_type == COMMAND ) {
6549 CGenNodeCommand* p=(CGenNodeCommand*)this;