client config ARP resolve working. Still missing IPv6 support.
[trex.git] / src / bp_sim.cpp
1 /*
2  Hanoh Haim
3  Cisco Systems, Inc.
4 */
5
6 /*
7 Copyright (c) 2015-2015 Cisco Systems, Inc.
8
9 Licensed under the Apache License, Version 2.0 (the "License");
10 you may not use this file except in compliance with the License.
11 You may obtain a copy of the License at
12
13     http://www.apache.org/licenses/LICENSE-2.0
14
15 Unless required by applicable law or agreed to in writing, software
16 distributed under the License is distributed on an "AS IS" BASIS,
17 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 See the License for the specific language governing permissions and
19 limitations under the License.
20 */
21
22 #include "bp_sim.h"
23 #include "stateful_rx_core.h"
24 #include "utl_json.h"
25 #include "utl_yaml.h"
26 #include "msg_manager.h"
27 #include "trex_watchdog.h"
28
29 #include <common/basic_utils.h>
30
31 #include <trex_stream_node.h>
32 #include <trex_stateless_messaging.h>
33
34 #undef VALG
35
36 #ifdef VALG
37 #include <valgrind/callgrind.h>
38 #endif
39
40
41 CPluginCallback * CPluginCallback::callback;
42
43
44 uint32_t getDualPortId(uint32_t thread_id){
45     return  ( thread_id % (CGlobalInfo::m_options.get_expected_dual_ports()) );
46 }
47
48
49
50 CRteMemPool       CGlobalInfo::m_mem_pool[MAX_SOCKETS_SUPPORTED];
51
52 uint32_t           CGlobalInfo::m_nodes_pool_size = 10*1024;
53 CParserOption      CGlobalInfo::m_options;
54 CGlobalMemory      CGlobalInfo::m_memory_cfg;
55 CPlatformSocketInfo CGlobalInfo::m_socket;
56
57
58
59
60
61 void CGlobalMemory::Dump(FILE *fd){
62     fprintf(fd," Total Memory : \n");
63
64     const std::string * names =get_mbuf_names();
65
66     uint32_t c_size=64;
67     uint32_t c_total=0;
68
69     int i=0;
70     for (i=0; i<MBUF_ELM_SIZE; i++) {
71         if ( (i>MBUF_9k) && (i<MBUF_DP_FLOWS)){
72             continue;
73         }
74         if ( i<TRAFFIC_MBUF_64 ){
75             c_total= m_mbuf[i] *c_size;
76             c_size=c_size*2;
77         }
78
79         fprintf(fd," %-40s  : %lu \n",names[i].c_str(),(ulong)m_mbuf[i]);
80     }
81     c_total += (m_mbuf[MBUF_DP_FLOWS] * sizeof(CGenNode));
82
83     fprintf(fd," %-40s  : %lu \n","get_each_core_dp_flows",(ulong)get_each_core_dp_flows());
84     fprintf(fd," %-40s  : %s  \n","Total memory",double_to_human_str(c_total,"bytes",KBYE_1024).c_str() );
85 }
86
87
88 void CGlobalMemory::set(const CPlatformMemoryYamlInfo &info,float mul){
89     int i;
90     for (i=0; i<MBUF_ELM_SIZE; i++) {
91         m_mbuf[i]=(uint32_t)((float)info.m_mbuf[i]*mul);
92     }
93     /* no need to multiply */
94     m_mbuf[MBUF_64]   += info.m_mbuf[TRAFFIC_MBUF_64];
95     m_mbuf[MBUF_128]  += info.m_mbuf[TRAFFIC_MBUF_128];
96     m_mbuf[MBUF_256]  += info.m_mbuf[TRAFFIC_MBUF_256];
97     m_mbuf[MBUF_512]  += info.m_mbuf[TRAFFIC_MBUF_512];
98     m_mbuf[MBUF_1024] += info.m_mbuf[TRAFFIC_MBUF_1024];
99     m_mbuf[MBUF_2048] += info.m_mbuf[TRAFFIC_MBUF_2048];
100     m_mbuf[MBUF_4096] += info.m_mbuf[TRAFFIC_MBUF_4096];
101     m_mbuf[MBUF_9k]   += info.m_mbuf[MBUF_9k];
102
103     for (i=0; i<MBUF_1024; i++) {
104         float per_queue_factor= (float)m_mbuf[i]/((float)m_pool_cache_size*(float)m_num_cores);
105         if (per_queue_factor<2.0) {
106             printf("WARNING not enough mbuf memory for this configuration trying to auto update\n");
107             printf(" %d : %f \n",(int)i,per_queue_factor);
108             m_mbuf[i]=(uint32_t)(m_mbuf[i]*2.0/per_queue_factor);
109         }
110     }
111 }
112
113
114 ////////////////////////////////////////
115
116
117 bool CPlatformSocketInfoNoConfig::is_sockets_enable(socket_id_t socket){
118     if ( socket==0 ) {
119         return(true);
120     }
121     return (false);
122 }
123
124 socket_id_t CPlatformSocketInfoNoConfig::max_num_active_sockets(){
125     return (1);
126 }
127
128
129 socket_id_t CPlatformSocketInfoNoConfig::port_to_socket(port_id_t port){
130     return (0);
131 }
132
133
134 void CPlatformSocketInfoNoConfig::set_rx_thread_is_enabled(bool enable) {
135     m_rx_is_enabled = enable;
136 }
137
138 void CPlatformSocketInfoNoConfig::set_number_of_dual_ports(uint8_t num_dual_ports){
139     m_dual_if   = num_dual_ports;
140 }
141
142
143 void CPlatformSocketInfoNoConfig::set_number_of_threads_per_ports(uint8_t num_threads){
144     m_threads_per_dual_if = num_threads;
145 }
146
147 bool CPlatformSocketInfoNoConfig::sanity_check(){
148     return (true);
149 }
150
151 /* return the core mask */
152 uint64_t CPlatformSocketInfoNoConfig::get_cores_mask(){
153
154     uint32_t cores_number = m_threads_per_dual_if*m_dual_if;
155     if ( m_rx_is_enabled ) {
156         cores_number +=   2;
157     }else{
158         cores_number += 1; /* only MASTER*/
159     }
160     int i;
161     int offset=0;
162     /* master */
163     uint64_t res=1;
164     uint64_t mask=(1LL<<(offset+1));
165     for (i=0; i<(cores_number-1); i++) {
166         res |= mask ;
167         mask = mask <<1;
168    }
169    return (res);
170 }
171
172 virtual_thread_id_t CPlatformSocketInfoNoConfig::thread_phy_to_virt(physical_thread_id_t  phy_id){
173     return (phy_id);
174 }
175
176 physical_thread_id_t CPlatformSocketInfoNoConfig::thread_virt_to_phy(virtual_thread_id_t virt_id){
177     return (virt_id);
178 }
179
180 physical_thread_id_t CPlatformSocketInfoNoConfig::get_master_phy_id() {
181     return (0);
182 }
183
184 bool CPlatformSocketInfoNoConfig::thread_phy_is_rx(physical_thread_id_t  phy_id){
185     return (phy_id==(m_threads_per_dual_if*m_dual_if+1));
186 }
187
188
189 void CPlatformSocketInfoNoConfig::dump(FILE *fd){
190     fprintf(fd," there is no configuration file given \n");
191 }
192
193 ////////////////////////////////////////
194
195 bool CPlatformSocketInfoConfig::Create(CPlatformCoresYamlInfo * platform){
196     m_platform=platform;
197     assert(m_platform);
198     assert(m_platform->m_is_exists);
199     reset();
200     return (true);
201 }
202
203 bool CPlatformSocketInfoConfig::init(){
204
205     /* iterate the sockets */
206     uint32_t num_threads=0;
207     uint32_t num_dual_if = m_platform->m_dual_if.size();
208
209     if ( m_num_dual_if > num_dual_if ){
210         printf("ERROR number of dual if %d is higher than defined in configuration file %d\n",
211                (int)m_num_dual_if,
212                (int)num_dual_if);
213     }
214
215     int i;
216     for (i=0; i<m_num_dual_if; i++) {
217         CPlatformDualIfYamlInfo * lp=&m_platform->m_dual_if[i];
218         if ( lp->m_socket>=MAX_SOCKETS_SUPPORTED ){
219             printf("ERROR socket %d is bigger than max %d \n",lp->m_socket,MAX_SOCKETS_SUPPORTED);
220             exit(1);
221         }
222
223         if (!m_sockets_enable[lp->m_socket] ) {
224             m_sockets_enable[lp->m_socket]=true;
225             m_sockets_enabled++;
226         }
227
228         m_socket_per_dual_if[i]=lp->m_socket;
229
230         /* learn how many threads per dual-if */
231         if (i==0) {
232             num_threads = lp->m_threads.size();
233             m_max_threads_per_dual_if = num_threads;
234         }else{
235             if (lp->m_threads.size() != num_threads) {
236                 printf("ERROR, the number of threads per dual ports should be the same for all dual ports\n");
237                 exit(1);
238             }
239         }
240
241         if (m_threads_per_dual_if > m_max_threads_per_dual_if) {
242             printf("ERROR: Maximum threads in platform section of config file is %d, unable to run with -c %d.\n",
243                     m_max_threads_per_dual_if, m_threads_per_dual_if);
244             printf("Please increase the pool in config or use lower -c.\n");
245             exit(1);
246         }
247
248             int j;
249
250             for (j=0; j<m_threads_per_dual_if; j++) {
251                 uint8_t virt_thread = 1+ i + j*m_num_dual_if; /* virtual thread */
252                 uint8_t phy_thread  = lp->m_threads[j];
253
254                 if (phy_thread>MAX_THREADS_SUPPORTED) {
255                     printf("ERROR, physical thread id is %d higher than max %d \n",phy_thread,MAX_THREADS_SUPPORTED);
256                     exit(1);
257                 }
258
259                 if (virt_thread>MAX_THREADS_SUPPORTED) {
260                     printf("ERROR virtual thread id is %d higher than max %d \n",virt_thread,MAX_THREADS_SUPPORTED);
261                     exit(1);
262                 }
263
264                 if ( m_thread_phy_to_virtual[phy_thread] ){
265                     printf("ERROR physical thread %d defined twice\n",phy_thread);
266                     exit(1);
267                 }
268                 m_thread_phy_to_virtual[phy_thread]=virt_thread;
269                 m_thread_virt_to_phy[virt_thread] =phy_thread;
270             }
271     }
272
273     if ( m_thread_phy_to_virtual[m_platform->m_master_thread] ){
274         printf("ERROR physical master thread %d already defined  \n",m_platform->m_master_thread);
275         exit(1);
276     }
277
278     if ( m_thread_phy_to_virtual[m_platform->m_rx_thread] ){
279         printf("ERROR physical latency thread %d already defined \n",m_platform->m_rx_thread);
280         exit(1);
281     }
282
283     if (m_max_threads_per_dual_if < m_threads_per_dual_if ) {
284         printf("ERROR number of threads asked per dual if is %d lower than max %d \n",
285                (int)m_threads_per_dual_if,
286                (int)m_max_threads_per_dual_if);
287         exit(1);
288     }
289     return (true);
290 }
291
292
293 void CPlatformSocketInfoConfig::dump(FILE *fd){
294     fprintf(fd," core_mask  %llx  \n",(unsigned long long)get_cores_mask());
295     fprintf(fd," sockets :");
296     int i;
297     for (i=0; i<MAX_SOCKETS_SUPPORTED; i++) {
298         if ( is_sockets_enable(i) ){
299             fprintf(fd," %d ",i);
300         }
301     }
302     fprintf(fd," \n");
303     fprintf(fd," active sockets : %d \n",max_num_active_sockets());
304
305     fprintf(fd," ports_sockets : %d \n",max_num_active_sockets());
306
307     for (i = 0; i <  TREX_MAX_PORTS; i++) {
308         fprintf(fd,"%d,",port_to_socket(i));
309     }
310     fprintf(fd,"\n");
311
312     fprintf(fd," phy   |   virt   \n");
313     for (i=0; i<MAX_THREADS_SUPPORTED; i++) {
314         virtual_thread_id_t virt=thread_phy_to_virt(i);
315         if ( virt ){
316             fprintf(fd," %d      %d   \n",i,virt);
317         }
318     }
319 }
320
321
322 void CPlatformSocketInfoConfig::reset(){
323     m_sockets_enabled=0;
324     int i;
325     for (i=0; i<MAX_SOCKETS_SUPPORTED; i++) {
326         m_sockets_enable[i]=false;
327     }
328
329     for (i=0; i<MAX_THREADS_SUPPORTED; i++) {
330         m_thread_virt_to_phy[i]=0;
331     }
332     for (i=0; i<MAX_THREADS_SUPPORTED; i++) {
333         m_thread_phy_to_virtual[i]=0;
334     }
335     for (i = 0; i < TREX_MAX_PORTS >> 1; i++) {
336         m_socket_per_dual_if[i]=0;
337     }
338
339     m_num_dual_if=0;
340
341     m_threads_per_dual_if=0;
342     m_rx_is_enabled=false;
343     m_max_threads_per_dual_if=0;
344 }
345
346
347 void CPlatformSocketInfoConfig::Delete(){
348
349 }
350
351 bool CPlatformSocketInfoConfig::is_sockets_enable(socket_id_t socket){
352     assert(socket<MAX_SOCKETS_SUPPORTED);
353     return ( m_sockets_enable[socket] );
354 }
355
356 socket_id_t CPlatformSocketInfoConfig::max_num_active_sockets(){
357     return  ((socket_id_t)m_sockets_enabled);
358 }
359
360 socket_id_t CPlatformSocketInfoConfig::port_to_socket(port_id_t port){
361     return ( m_socket_per_dual_if[(port>>1)]);
362 }
363
364 void CPlatformSocketInfoConfig::set_rx_thread_is_enabled(bool enable){
365     m_rx_is_enabled =enable;
366 }
367
368 void CPlatformSocketInfoConfig::set_number_of_dual_ports(uint8_t num_dual_ports){
369     m_num_dual_if = num_dual_ports;
370 }
371
372 void CPlatformSocketInfoConfig::set_number_of_threads_per_ports(uint8_t num_threads){
373      m_threads_per_dual_if =num_threads;
374 }
375
376 bool CPlatformSocketInfoConfig::sanity_check(){
377     return (init());
378 }
379
380 /* return the core mask */
381 uint64_t CPlatformSocketInfoConfig::get_cores_mask(){
382     int i;
383     uint64_t mask=0;
384     for (i=0; i<MAX_THREADS_SUPPORTED; i++) {
385         if ( m_thread_phy_to_virtual[i] ) {
386
387             if (i>=64) {
388                 printf(" ERROR phy threads can't be higher than 64 \n");
389                 exit(1);
390             }
391             mask |=(1LL<<i);
392         }
393     }
394
395     mask |=(1LL<<m_platform->m_master_thread);
396     assert(m_platform->m_master_thread<64);
397     if (m_rx_is_enabled) {
398         mask |=(1LL<<m_platform->m_rx_thread);
399         assert(m_platform->m_rx_thread<64);
400     }
401     return (mask);
402 }
403
404 virtual_thread_id_t CPlatformSocketInfoConfig::thread_phy_to_virt(physical_thread_id_t  phy_id){
405     return (m_thread_phy_to_virtual[phy_id]);
406 }
407
408 physical_thread_id_t CPlatformSocketInfoConfig::thread_virt_to_phy(virtual_thread_id_t virt_id){
409     return ( m_thread_virt_to_phy[virt_id]);
410 }
411
412 physical_thread_id_t CPlatformSocketInfoConfig::get_master_phy_id() {
413     return m_platform->m_master_thread;
414 }
415
416 bool CPlatformSocketInfoConfig::thread_phy_is_rx(physical_thread_id_t  phy_id){
417     return (m_platform->m_rx_thread == phy_id?true:false);
418 }
419
420
421
422 ////////////////////////////////////////
423
424
425 bool CPlatformSocketInfo::Create(CPlatformCoresYamlInfo * platform){
426     if ( (platform) && (platform->m_is_exists) ) {
427         CPlatformSocketInfoConfig * lp=new CPlatformSocketInfoConfig();
428         assert(lp);
429         lp->Create(platform);
430         m_obj= lp;
431     }else{
432         m_obj= new CPlatformSocketInfoNoConfig();
433     }
434     return(true);
435 }
436
437 void CPlatformSocketInfo::Delete(){
438     if ( m_obj ){
439         delete m_obj;
440         m_obj=NULL;
441     }
442 }
443
444 bool CPlatformSocketInfo::is_sockets_enable(socket_id_t socket){
445      return ( m_obj->is_sockets_enable(socket) );
446 }
447
448 socket_id_t CPlatformSocketInfo::max_num_active_sockets(){
449     return ( m_obj->max_num_active_sockets() );
450 }
451
452
453 socket_id_t CPlatformSocketInfo::port_to_socket(port_id_t port){
454     return ( m_obj->port_to_socket(port) );
455 }
456
457
458 void CPlatformSocketInfo::set_rx_thread_is_enabled(bool enable){
459     m_obj->set_rx_thread_is_enabled(enable);
460 }
461
462 void CPlatformSocketInfo::set_number_of_dual_ports(uint8_t num_dual_ports){
463     m_obj->set_number_of_dual_ports(num_dual_ports);
464 }
465
466 void CPlatformSocketInfo::set_number_of_threads_per_ports(uint8_t num_threads){
467     m_obj->set_number_of_threads_per_ports(num_threads);
468 }
469
470 bool CPlatformSocketInfo::sanity_check(){
471     return ( m_obj->sanity_check());
472 }
473
474 /* return the core mask */
475 uint64_t CPlatformSocketInfo::get_cores_mask(){
476     return ( m_obj->get_cores_mask());
477 }
478
479 virtual_thread_id_t CPlatformSocketInfo::thread_phy_to_virt(physical_thread_id_t  phy_id){
480     return ( m_obj->thread_phy_to_virt(phy_id));
481 }
482
483 physical_thread_id_t CPlatformSocketInfo::thread_virt_to_phy(virtual_thread_id_t virt_id){
484     return ( m_obj->thread_virt_to_phy(virt_id));
485 }
486
487 bool CPlatformSocketInfo::thread_phy_is_master(physical_thread_id_t  phy_id){
488     return ( m_obj->thread_phy_is_master(phy_id));
489 }
490
491 physical_thread_id_t CPlatformSocketInfo::get_master_phy_id() {
492     return ( m_obj->get_master_phy_id());
493 }
494
495 bool CPlatformSocketInfo::thread_phy_is_rx(physical_thread_id_t  phy_id) {
496     return ( m_obj->thread_phy_is_rx(phy_id));
497 }
498
499 void CPlatformSocketInfo::dump(FILE *fd){
500     m_obj->dump(fd);
501 }
502
503 ////////////////////////////////////////
504
505
506 void CRteMemPool::dump_in_case_of_error(FILE *fd){
507     fprintf(fd," ERROR,there is no enough memory in socket  %d \n",m_pool_id);
508     fprintf(fd," Try to enlarge the memory values in the configuration file -/etc/trex_cfg.yaml ,see manual for more detail  \n");
509     dump(fd);
510 }
511
512 void CRteMemPool::add_to_json(Json::Value &json, std::string name, rte_mempool_t * pool){
513     uint32_t p_free = rte_mempool_count(pool);
514     uint32_t p_size = pool->size;
515     json[name].append((unsigned long long)p_free);
516     json[name].append((unsigned long long)p_size);
517 }
518
519
520 void CRteMemPool::dump_as_json(Json::Value &json){
521     add_to_json(json, "64b", m_small_mbuf_pool);
522     add_to_json(json, "128b", m_mbuf_pool_128);
523     add_to_json(json, "256b", m_mbuf_pool_256);
524     add_to_json(json, "512b", m_mbuf_pool_512);
525     add_to_json(json, "1024b", m_mbuf_pool_1024);
526     add_to_json(json, "2048b", m_mbuf_pool_2048);
527     add_to_json(json, "4096b", m_mbuf_pool_4096);
528     add_to_json(json, "9kb", m_mbuf_pool_9k);
529 }
530
531
532 void CRteMemPool::dump(FILE *fd){
533     #define DUMP_MBUF(a,b)  { float p=(100.0*(float)rte_mempool_count(b)/(float)b->size); fprintf(fd," %-30s  : %.2f %%   %s \n",a,p,(p<5.0?"<-":"OK") ); }
534
535     DUMP_MBUF("mbuf_64",m_small_mbuf_pool);
536     DUMP_MBUF("mbuf_128",m_mbuf_pool_128);
537     DUMP_MBUF("mbuf_256",m_mbuf_pool_256);
538     DUMP_MBUF("mbuf_512",m_mbuf_pool_512);
539     DUMP_MBUF("mbuf_1024",m_mbuf_pool_1024);
540     DUMP_MBUF("mbuf_2048",m_mbuf_pool_2048);
541     DUMP_MBUF("mbuf_4096",m_mbuf_pool_4096);
542     DUMP_MBUF("mbuf_9k",m_mbuf_pool_9k);
543
544 }
545
546 ////////////////////////////////////////
547
548 void CGlobalInfo::dump_pool_as_json(Json::Value &json){
549     CPlatformSocketInfo * lpSocket =&m_socket;
550
551     for (int i=0; i<(int)MAX_SOCKETS_SUPPORTED; i++) {
552         if (lpSocket->is_sockets_enable((socket_id_t)i)) {
553             std::string socket_id = "cpu-socket-" + std::to_string(i);
554             m_mem_pool[i].dump_as_json(json["mbuf_stats"][socket_id]);
555         }
556     }
557 }
558
559 std::string CGlobalInfo::dump_pool_as_json_str(void){
560     Json::Value json;
561     dump_pool_as_json(json);
562     return (json.toStyledString());
563 }
564
565 void CGlobalInfo::free_pools(){
566     CPlatformSocketInfo * lpSocket =&m_socket;
567     CRteMemPool * lpmem;
568     int i;
569     for (i=0; i<(int)MAX_SOCKETS_SUPPORTED; i++) {
570         if (lpSocket->is_sockets_enable((socket_id_t)i)) {
571             lpmem= &m_mem_pool[i];
572             utl_rte_mempool_delete(lpmem->m_small_mbuf_pool);
573             utl_rte_mempool_delete(lpmem->m_mbuf_pool_128);
574             utl_rte_mempool_delete(lpmem->m_mbuf_pool_256);
575             utl_rte_mempool_delete(lpmem->m_mbuf_pool_512);
576             utl_rte_mempool_delete(lpmem->m_mbuf_pool_1024);
577             utl_rte_mempool_delete(lpmem->m_mbuf_pool_2048);
578             utl_rte_mempool_delete(lpmem->m_mbuf_pool_4096);
579             utl_rte_mempool_delete(lpmem->m_mbuf_pool_9k);
580     }
581     utl_rte_mempool_delete(m_mem_pool[0].m_mbuf_global_nodes);
582   }
583 }
584
585
586 void CGlobalInfo::init_pools(uint32_t rx_buffers){
587         /* this include the pkt from 64- */
588     CGlobalMemory * lp=&CGlobalInfo::m_memory_cfg;
589     CPlatformSocketInfo * lpSocket =&m_socket;
590
591    CRteMemPool * lpmem;
592
593     int i;
594     for (i=0; i<(int)MAX_SOCKETS_SUPPORTED; i++) {
595         if (lpSocket->is_sockets_enable((socket_id_t)i)) {
596             lpmem= &m_mem_pool[i];
597             lpmem->m_pool_id=i;
598
599
600             /* this include the packet from 0-64 this is for small packets */
601             lpmem->m_small_mbuf_pool =utl_rte_mempool_create("small-pkt-const",
602                                                       lp->m_mbuf[MBUF_64],
603                                                        CONST_SMALL_MBUF_SIZE,
604                                                        32,(i<<5)+ 2,i);
605             assert(lpmem->m_small_mbuf_pool);
606
607
608
609
610             lpmem->m_mbuf_pool_128=utl_rte_mempool_create("_128-pkt-const",
611                                                        lp->m_mbuf[MBUF_128],
612                                                        CONST_128_MBUF_SIZE,
613                                                        32,(i<<5)+ 6,i);
614
615
616             assert(lpmem->m_mbuf_pool_128);
617
618
619             lpmem->m_mbuf_pool_256=utl_rte_mempool_create("_256-pkt-const",
620                                                    lp->m_mbuf[MBUF_256],
621                                                    CONST_256_MBUF_SIZE,
622                                                    32,(i<<5)+ 3,i);
623
624             assert(lpmem->m_mbuf_pool_256);
625
626             lpmem->m_mbuf_pool_512=utl_rte_mempool_create("_512_-pkt-const",
627                                                    lp->m_mbuf[MBUF_512],
628                                                    CONST_512_MBUF_SIZE,
629                                                    32,(i<<5)+ 4,i);
630             assert(lpmem->m_mbuf_pool_512);
631
632             lpmem->m_mbuf_pool_1024=utl_rte_mempool_create("_1024-pkt-const",
633                                                     lp->m_mbuf[MBUF_1024],
634                                                     CONST_1024_MBUF_SIZE,
635                                                     32,(i<<5)+ 5,i);
636
637             assert(lpmem->m_mbuf_pool_1024);
638
639             lpmem->m_mbuf_pool_2048=utl_rte_mempool_create("_2048-pkt-const",
640                                                     lp->m_mbuf[MBUF_2048],
641                                                     CONST_2048_MBUF_SIZE,
642                                                     32,(i<<5)+ 5,i);
643
644             assert(lpmem->m_mbuf_pool_2048);
645
646             lpmem->m_mbuf_pool_4096=utl_rte_mempool_create("_4096-pkt-const",
647                                                     lp->m_mbuf[MBUF_4096],
648                                                     CONST_4096_MBUF_SIZE,
649                                                     32,(i<<5)+ 5,i);
650
651             assert(lpmem->m_mbuf_pool_4096);
652
653             lpmem->m_mbuf_pool_9k=utl_rte_mempool_create("_9k-pkt-const",
654                                                     lp->m_mbuf[MBUF_9k]+rx_buffers,
655                                                     CONST_9k_MBUF_SIZE,
656                                                     32,(i<<5)+ 5,i);
657
658             assert(lpmem->m_mbuf_pool_9k);
659
660         }
661     }
662
663     /* global always from socket 0 */
664     m_mem_pool[0].m_mbuf_global_nodes = utl_rte_mempool_create_non_pkt("global-nodes",
665                                                          lp->m_mbuf[MBUF_GLOBAL_FLOWS],
666                                                          sizeof(CGenNode),
667                                                          128,
668                                                          0 ,
669                                                          SOCKET_ID_ANY);
670
671     assert(m_mem_pool[0].m_mbuf_global_nodes);
672
673
674 }
675
676
677
678 void CFlowYamlInfo::Dump(FILE *fd){
679     fprintf(fd,"name        : %s \n",m_name.c_str());
680     fprintf(fd,"cps         : %f \n",m_k_cps);
681     fprintf(fd,"ipg         : %f \n",m_ipg_sec);
682     fprintf(fd,"rtt         : %f \n",m_rtt_sec);
683     fprintf(fd,"w           : %d \n",m_w);
684     fprintf(fd,"wlength     : %d \n",m_wlength);
685     fprintf(fd,"limit       : %d \n",m_limit);
686     fprintf(fd,"limit_was_set       : %d \n",m_limit_was_set?1:0);
687     fprintf(fd,"cap_mode    : %d \n",m_cap_mode?1:0);
688     fprintf(fd,"plugin_id   : %d \n",m_plugin_id);
689     fprintf(fd,"one_server  : %d \n",m_one_app_server?1:0);
690     fprintf(fd,"one_server_was_set  : %d \n",m_one_app_server_was_set?1:0);
691     if (m_dpPkt) {
692         m_dpPkt->Dump(fd);
693     }
694 }
695
696
697
698
699 void  dump_mac_addr(FILE* fd,uint8_t *p){
700     int i;
701     for (i=0; i<6; i++) {
702         uint8_t a=p[i];
703         if (i==5) {
704             fprintf(fd,"%02x",a);
705         }else{
706             fprintf(fd,"%02x:",a);
707         }
708     }
709
710 }
711
712
713
714 static uint8_t human_tbl[]={
715     ' ',
716     'K',
717     'M',
718     'G',
719     'T'
720 };
721
722 std::string double_to_human_str(double num,
723                                 std::string units,
724                                 human_kbyte_t etype){
725     double abs_num=num;
726     if (num<0.0) {
727         abs_num=-num;
728     }
729     int i=0;
730     int max_cnt=sizeof(human_tbl)/sizeof(human_tbl[0]);
731     double div =1.0;
732     double f=1000.0;
733     if (etype ==KBYE_1024){
734         f=1024.0;
735     }
736     while ((abs_num > f ) && (i < max_cnt - 1)){
737         abs_num/=f;
738         div*=f;
739         i++;
740     }
741
742     char buf [100];
743     sprintf(buf,"%10.2f %c%s",num/div,human_tbl[i],units.c_str());
744     std::string res(buf);
745     return (res);
746 }
747
748
749 void CPreviewMode::Dump(FILE *fd){
750     fprintf(fd," flags           : %x\n", m_flags);
751     fprintf(fd," write_file      : %d\n", getFileWrite()?1:0);
752     fprintf(fd," verbose         : %d\n", (int)getVMode() );
753     fprintf(fd," realtime        : %d\n", (int)getRealTime() );
754     fprintf(fd," flip            : %d\n", (int)getClientServerFlip() );
755     fprintf(fd," cores           : %d\n", (int)getCores()  );
756     fprintf(fd," single core     : %d\n", (int)getSingleCore() );
757     fprintf(fd," flow-flip       : %d\n", (int)getClientServerFlowFlip() );
758     fprintf(fd," no clean close  : %d\n", (int)getNoCleanFlowClose() );
759     fprintf(fd," zmq_publish     : %d\n", (int)get_zmq_publish_enable() );
760     fprintf(fd," vlan_enable     : %d\n", (int)get_vlan_mode_enable() );
761     fprintf(fd," client_cfg      : %d\n", (int)get_is_client_cfg_enable() );
762     fprintf(fd," mbuf_cache_disable  : %d\n", (int)isMbufCacheDisabled() );
763     fprintf(fd," vm mode         : %d\n", (int)get_vm_one_queue_enable()?1:0 );
764 }
765
766 void CFlowGenStats::clear(){
767    m_nat_lookup_no_flow_id=0;
768    m_total_bytes=0;
769    m_total_pkt=0;
770    m_total_open_flows =0;
771    m_total_close_flows =0;
772    m_nat_lookup_no_flow_id=0;
773    m_nat_lookup_remove_flow_id=0;
774    m_nat_lookup_wait_ack_state = 0;
775    m_nat_lookup_add_flow_id=0;
776    m_nat_flow_timeout=0;
777    m_nat_flow_timeout_wait_ack = 0;
778    m_nat_flow_learn_error=0;
779 }
780
781 void CFlowGenStats::dump(FILE *fd){
782     std::string s_bytes=double_to_human_str((double )(m_total_bytes),
783                                     "bytes",
784                                     KBYE_1024);
785
786     std::string s_pkt=double_to_human_str((double )(m_total_pkt),
787                                     "pkt",
788                                     KBYE_1000);
789
790     std::string s_flows=double_to_human_str((double )(m_total_open_flows),
791                                     "flows",
792                                     KBYE_1000);
793
794    DP_S(m_total_bytes,s_bytes);
795    DP_S(m_total_pkt,s_pkt);
796    DP_S(m_total_open_flows,s_flows);
797    DP(m_total_pkt);
798    DP(m_total_open_flows);
799    DP(m_total_close_flows);
800    DP_name("active",(m_total_open_flows-m_total_close_flows));
801    DP(m_total_bytes);
802    DP(m_nat_lookup_no_flow_id);
803
804    DP(m_nat_lookup_no_flow_id);
805    DP(m_nat_lookup_remove_flow_id);
806    DP(m_nat_lookup_wait_ack_state);
807    DP(m_nat_lookup_add_flow_id);
808    DP(m_nat_flow_timeout);
809    DP(m_nat_flow_timeout_wait_ack);
810    DP_name("active_nat",(m_nat_lookup_add_flow_id-m_nat_lookup_remove_flow_id));
811    DP_name("active_nat_wait_syn", (m_nat_lookup_add_flow_id - m_nat_lookup_wait_ack_state));
812    DP(m_nat_flow_learn_error);
813 }
814
815
816
817 int CErfIF::open_file(std::string file_name){
818     BP_ASSERT(m_writer==0);
819
820     if ( m_preview_mode->getFileWrite() ){
821         capture_type_e file_type=ERF;
822         if ( m_preview_mode->get_pcap_mode_enable() ){
823             file_type=LIBPCAP;
824         }
825         m_writer = CCapWriterFactory::CreateWriter(file_type,(char *)file_name.c_str());
826         if (m_writer == NULL) {
827             fprintf(stderr,"ERROR can't create cap file %s ",(char *)file_name.c_str());
828             return (-1);
829         }
830     }
831     m_raw = new CCapPktRaw();
832     return (0);
833 }
834
835
836 int CErfIF::write_pkt(CCapPktRaw *pkt_raw){
837
838     BP_ASSERT(m_writer);
839
840     if ( m_preview_mode->getFileWrite() ){
841         BP_ASSERT(m_writer);
842         bool res=m_writer->write_packet(pkt_raw);
843         if (res != true) {
844             fprintf(stderr,"ERROR can't write to cap file");
845             return (-1);
846         }
847     }
848     return (0);
849 }
850
851
852 int CErfIF::close_file(void){
853
854     if (m_raw) {
855         delete m_raw;
856         m_raw = NULL;
857     }
858
859     if ( m_preview_mode->getFileWrite() ){
860         if (m_writer) {
861             delete m_writer;
862             m_writer = NULL;
863         }
864     }
865
866     return (0);
867 }
868
869
870
871 void CFlowKey::Clean(){
872     m_ipaddr1=0;
873     m_ipaddr2=0;
874     m_port1=0;
875     m_port2=0;
876     m_ip_proto=0;
877     m_l2_proto=0;
878     m_vrfid=0;
879 }
880
881 void CFlowKey::Dump(FILE *fd){
882     fprintf(fd," %x:%x:%x:%x:%x:%x:%x\n",m_ipaddr1,m_ipaddr2,m_port1,m_port2,m_ip_proto,m_l2_proto,m_vrfid);
883 }
884
885
886
887 void CPacketDescriptor::Dump(FILE *fd){
888     fprintf(fd," IsSwapTuple : %d \n",IsSwapTuple()?1:0);
889     fprintf(fd," IsSInitDir  : %d \n",IsInitSide()?1:0);
890     fprintf(fd," Isvalid : %d ",IsValidPkt()?1:0);
891     fprintf(fd," IsRtt   : %d ",IsRtt()?1:0);
892     fprintf(fd," IsLearn   : %d ",IsLearn()?1:0);
893
894     if (IsTcp() ) {
895         fprintf(fd," TCP ");
896     }else{
897         fprintf(fd," UDP ");
898     }
899     fprintf(fd," IsLast Pkt   : %d ", IsLastPkt() ?1:0);
900     fprintf(fd," id           : %d \n",getId() );
901
902     fprintf(fd," flow_ID      : %d , max_pkts : %u, max_aging: %d sec , pkt_id : %u, init: %d   ( dir:%d  dir_max :%d  ) bid:%d \n",getFlowId(),
903             GetMaxPktsPerFlow(),
904             GetMaxFlowTimeout() ,
905             getFlowPktNum(),
906             IsInitSide(),
907             GetDirInfo()->GetPktNum(),
908             GetDirInfo()->GetMaxPkts(),
909             IsBiDirectionalFlow()?1:0
910
911             );
912     fprintf(fd,"\n");
913 }
914
915
916 void CPacketIndication::UpdateOffsets(){
917     m_ether_offset   = getEtherOffset();
918     m_ip_offset      = getIpOffset();
919     m_udp_tcp_offset = getTcpOffset();
920     m_payload_offset = getPayloadOffset();
921 }
922
923 void CPacketIndication::UpdatePacketPadding(){
924     m_packet_padding = m_packet->getTotalLen() - (l3.m_ipv4->getTotalLength()+ getIpOffset());
925 }
926
927
928 void CPacketIndication::RefreshPointers(){
929
930     char *pobase=getBasePtr();
931
932     m_ether = (EthernetHeader *) (pobase + m_ether_offset);
933     l3.m_ipv4  = (IPHeader       *) (pobase + m_ip_offset);
934     l4.m_tcp=  (TCPHeader *)(pobase + m_udp_tcp_offset);
935     if ( m_payload_offset ){
936         m_payload =(uint8_t *)(pobase + m_payload_offset);
937     }else{
938         m_payload =(uint8_t *)(0);
939     }
940 }
941
942 // copy ref assume pkt point to a fresh
943 void CPacketIndication::Clone(CPacketIndication * obj,CCapPktRaw * pkt){
944     Clean();
945     m_cap_ipg = obj->m_cap_ipg;
946     m_packet  = pkt;
947     char *pobase=getBasePtr();
948     m_flow = obj->m_flow;
949
950     m_ether = (EthernetHeader *) (pobase + obj->getEtherOffset());
951     l3.m_ipv4  = (IPHeader       *) (pobase + obj->getIpOffset());
952     m_is_ipv6 = obj->m_is_ipv6;
953     l4.m_tcp=  (TCPHeader *)(pobase + obj->getTcpOffset());
954     if ( obj->getPayloadOffset() ){
955         m_payload =(uint8_t *)(pobase + obj->getPayloadOffset());
956     }else{
957         m_payload =(uint8_t *)(0);
958     }
959     m_payload_len = obj->m_payload_len;
960     m_flow_key    = obj->m_flow_key;
961     m_desc        = obj->m_desc;
962
963     m_packet_padding = obj->m_packet_padding;
964     /* copy offsets*/
965     m_ether_offset   = obj->m_ether_offset;
966     m_ip_offset      = obj->m_ip_offset;
967     m_udp_tcp_offset = obj->m_udp_tcp_offset;;
968     m_payload_offset = obj->m_payload_offset;
969 }
970
971
972
973 void CPacketIndication::Dump(FILE *fd,int verbose){
974     fprintf(fd," ipg : %f \n",m_cap_ipg);
975     fprintf(fd," key \n");
976     fprintf(fd," ------\n");
977     m_flow_key.Dump(fd);
978
979     fprintf(fd," L2 info \n");
980     fprintf(fd," ------\n");
981     m_packet->Dump(fd,verbose);
982
983     fprintf(fd," Descriptor \n");
984     fprintf(fd," ------\n");
985     m_desc.Dump(fd);
986
987     if ( m_desc.IsValidPkt() ) {
988         fprintf(fd," ipv4 \n");
989         l3.m_ipv4->dump(fd);
990         if ( m_desc.IsUdp() ) {
991             l4.m_udp->dump(fd);
992         }else{
993             l4.m_tcp->dump(fd);
994         }
995         fprintf(fd," payload len : %d \n",m_payload_len);
996     }else{
997         fprintf(fd," not valid packet \n");
998     }
999 }
1000
1001 void CPacketIndication::Clean(){
1002     m_desc.Clear();
1003     m_ether=0;
1004     l3.m_ipv4=0;
1005     l4.m_tcp=0;
1006     m_payload=0;
1007     m_payload_len=0;
1008 }
1009
1010
1011
1012 uint64_t CCPacketParserCounters::getTotalErrors(){
1013     uint64_t res=
1014     m_non_ip+
1015     m_arp+
1016     m_mpls+
1017     m_non_valid_ipv4_ver+
1018     m_ip_checksum_error+
1019     m_ip_length_error+
1020     m_ip_not_first_fragment_error+
1021     m_ip_ttl_is_zero_error+
1022     m_ip_multicast_error+
1023
1024     m_non_tcp_udp_ah+
1025     m_non_tcp_udp_esp+
1026     m_non_tcp_udp_icmp+
1027     m_non_tcp_udp_gre+
1028     m_non_tcp_udp_ip+
1029     m_tcp_udp_pkt_length_error;
1030     return (res);
1031 }
1032
1033 void CCPacketParserCounters::Clear(){
1034     m_pkt=0;
1035     m_non_ip=0;
1036     m_vlan=0;
1037     m_arp=0;
1038     m_mpls=0;
1039
1040     m_non_valid_ipv4_ver=0;
1041     m_ip_checksum_error=0;
1042     m_ip_length_error=0;
1043     m_ip_not_first_fragment_error=0;
1044     m_ip_ttl_is_zero_error=0;
1045     m_ip_multicast_error=0;
1046     m_ip_header_options=0;
1047
1048     m_non_tcp_udp=0;
1049     m_non_tcp_udp_ah=0;
1050     m_non_tcp_udp_esp=0;
1051     m_non_tcp_udp_icmp=0;
1052     m_non_tcp_udp_gre=0;
1053     m_non_tcp_udp_ip=0;
1054     m_tcp_header_options=0;
1055     m_tcp_udp_pkt_length_error=0;
1056     m_tcp=0;
1057     m_udp=0;
1058     m_valid_udp_tcp=0;
1059 }
1060
1061
1062 void CCPacketParserCounters::Dump(FILE *fd){
1063
1064     DP (m_pkt);
1065     DP (m_non_ip);
1066     DP (m_vlan);
1067     DP (m_arp);
1068     DP (m_mpls);
1069
1070     DP (m_non_valid_ipv4_ver);
1071     DP (m_ip_checksum_error);
1072     DP (m_ip_length_error);
1073     DP (m_ip_not_first_fragment_error);
1074     DP (m_ip_ttl_is_zero_error);
1075     DP (m_ip_multicast_error);
1076     DP (m_ip_header_options);
1077
1078     DP (m_non_tcp_udp);
1079     DP (m_non_tcp_udp_ah);
1080     DP (m_non_tcp_udp_esp);
1081     DP (m_non_tcp_udp_icmp);
1082     DP (m_non_tcp_udp_gre);
1083     DP (m_non_tcp_udp_ip);
1084     DP (m_tcp_header_options);
1085     DP (m_tcp_udp_pkt_length_error);
1086     DP (m_tcp);
1087     DP (m_udp);
1088     DP (m_valid_udp_tcp);
1089 }
1090
1091
1092 bool CPacketParser::Create(){
1093     m_counter.Clear();
1094     return (true);
1095 }
1096
1097 void CPacketParser::Delete(){
1098 }
1099
1100
1101 bool CPacketParser::ProcessPacket(CPacketIndication * pkt_indication,
1102                                   CCapPktRaw * raw_packet){
1103     BP_ASSERT(pkt_indication);
1104     pkt_indication->ProcessPacket(this,raw_packet);
1105     if (pkt_indication->m_desc.IsValidPkt()) {
1106         return (true);
1107     }
1108     return (false);
1109 }
1110
1111 void CPacketParser::Dump(FILE *fd){
1112     fprintf(fd," parser statistic \n");
1113     fprintf(fd," ===================== \n");
1114     m_counter.Dump(fd);
1115 }
1116
1117
1118 void CPacketIndication::SetKey(void){
1119     uint32_t ip_src, ip_dst;
1120
1121     m_desc.SetIsValidPkt(true);
1122     if (is_ipv6()){
1123         uint16_t ipv6_src[8];
1124         uint16_t ipv6_dst[8];
1125
1126         l3.m_ipv6->getSourceIpv6(&ipv6_src[0]);
1127         l3.m_ipv6->getDestIpv6(&ipv6_dst[0]);
1128         ip_src=(ipv6_src[6] << 16) | ipv6_src[7];
1129         ip_dst=(ipv6_dst[6] << 16) | ipv6_dst[7];
1130         m_flow_key.m_ip_proto   = l3.m_ipv6->getNextHdr();
1131     }else{
1132         ip_src=l3.m_ipv4->getSourceIp();
1133         ip_dst=l3.m_ipv4->getDestIp();
1134         m_flow_key.m_ip_proto   = l3.m_ipv4->getProtocol();
1135     }
1136
1137     /* UDP/TCP has same place */
1138     uint16_t src_port = l4.m_udp->getSourcePort();
1139     uint16_t dst_port = l4.m_udp->getDestPort();
1140     if (ip_src > ip_dst ) {
1141        m_flow_key.m_ipaddr1 =ip_dst;
1142        m_flow_key.m_ipaddr2 =ip_src;
1143        m_flow_key.m_port1   = dst_port;
1144        m_flow_key.m_port2   = src_port;
1145     }else{
1146         m_desc.SetSwapTuple(true);
1147         m_flow_key.m_ipaddr1 = ip_src;
1148         m_flow_key.m_ipaddr2 = ip_dst;
1149         m_flow_key.m_port1   = src_port;
1150         m_flow_key.m_port2   = dst_port;
1151     }
1152     m_flow_key.m_l2_proto   = 0;
1153     m_flow_key.m_vrfid      = 0;
1154 }
1155
1156 uint8_t CPacketIndication::ProcessIpPacketProtocol(CCPacketParserCounters *m_cnt,
1157                                   uint8_t protocol, int *offset){
1158
1159     char * packetBase = m_packet->raw;
1160     TCPHeader * tcp=0;
1161     UDPHeader * udp=0;
1162     uint16_t tcp_header_len=0;
1163
1164     switch (protocol) {
1165     case IPHeader::Protocol::TCP :
1166         m_desc.SetIsTcp(true);
1167         tcp =(TCPHeader *)(packetBase +*offset);
1168         l4.m_tcp = tcp;
1169
1170         tcp_header_len = tcp->getHeaderLength();
1171         if ( tcp_header_len  > (5*4) ){
1172             // we have ip option
1173             m_cnt->m_tcp_header_options++;
1174         }
1175         *offset += tcp_header_len;
1176         m_cnt->m_tcp++;
1177         break;
1178     case IPHeader::Protocol::UDP :
1179         m_desc.SetIsUdp(true);
1180         udp =(UDPHeader *)(packetBase +*offset);
1181         l4.m_udp = udp;
1182         *offset += 8;
1183         m_cnt->m_udp++;
1184         break;
1185     case         IPHeader::Protocol::AH:
1186         m_cnt->m_non_tcp_udp_ah++;
1187         return (1);
1188         break;
1189     case         IPHeader::Protocol::ESP:
1190         m_cnt->m_non_tcp_udp_esp++;
1191         return (1);
1192         break;
1193     case         IPHeader::Protocol::ICMP:
1194     case         IPHeader::Protocol::IPV6_ICMP:
1195         m_cnt->m_non_tcp_udp_icmp++;
1196         return (1);
1197         break;
1198     case         IPHeader::Protocol::GRE:
1199         m_cnt->m_non_tcp_udp_gre++;
1200         return (1);
1201         break;
1202     case         IPHeader::Protocol::IP:
1203         m_cnt->m_non_ip++;
1204         return (1);
1205         break;
1206
1207     default:
1208         m_cnt->m_non_tcp_udp++;
1209         return (1);
1210         break;
1211     }
1212
1213     /* out of packet */
1214     if ( *offset > m_packet->getTotalLen() ) {
1215         m_cnt->m_tcp_udp_pkt_length_error++;
1216         return (1);
1217     }
1218     return (0);
1219 }
1220
1221
1222 void CPacketIndication::ProcessIpPacket(CPacketParser *parser,
1223                                         int offset){
1224
1225     char * packetBase;
1226     CCPacketParserCounters * m_cnt=&parser->m_counter;
1227     packetBase = m_packet->raw;
1228     uint8_t protocol;
1229     BP_ASSERT(l3.m_ipv4);
1230
1231     parser->m_counter.m_pkt++;
1232
1233     if ( l3.m_ipv4->getVersion() == 4 ){
1234         m_cnt->m_ipv4++;
1235     }else{
1236         m_cnt->m_non_valid_ipv4_ver++;
1237         return;
1238     }
1239     // check the IP Length
1240     if( (uint32_t)(l3.m_ipv4->getTotalLength()+offset) > (uint32_t)( m_packet->getTotalLen())   ){
1241         m_cnt->m_ip_length_error++;
1242         return;
1243     }
1244
1245     uint16_t ip_offset=offset;
1246     uint16_t ip_header_length = l3.m_ipv4->getHeaderLength();
1247
1248     if ( ip_header_length >(5*4) ){
1249         m_cnt->m_ip_header_options++;
1250     }
1251
1252     if ( (uint32_t)(ip_header_length + offset) > (uint32_t)m_packet->getTotalLen() ) {
1253         m_cnt->m_ip_length_error++;
1254         return;
1255     }
1256     offset += ip_header_length;
1257
1258
1259     if( l3.m_ipv4->getTimeToLive() ==0 ){
1260         m_cnt->m_ip_ttl_is_zero_error++;
1261         return;
1262     }
1263
1264     if( l3.m_ipv4->isNotFirstFragment() ) {
1265         m_cnt->m_ip_not_first_fragment_error++;
1266         return;
1267     }
1268
1269     protocol = l3.m_ipv4->getProtocol();
1270     if (ProcessIpPacketProtocol(m_cnt,protocol,&offset) != 0) {
1271         return;
1272     };
1273
1274     uint16_t payload_offset_from_ip = offset-ip_offset;
1275     if ( payload_offset_from_ip >  l3.m_ipv4->getTotalLength() ) {
1276         m_cnt->m_tcp_udp_pkt_length_error++;
1277         return;
1278     }
1279
1280     if ( m_packet->pkt_len > MAX_PKT_SIZE ){
1281         m_cnt->m_tcp_udp_pkt_length_error++;
1282         printf("ERROR packet is too big, not supported jumbo packets that larger than %d \n",MAX_PKT_SIZE);
1283         return;
1284     }
1285
1286     // Set packet length and include padding if needed
1287     m_packet->pkt_len = l3.m_ipv4->getTotalLength() + getIpOffset();
1288     if (m_packet->pkt_len < 60) { m_packet->pkt_len = 60; }
1289
1290     m_cnt->m_valid_udp_tcp++;
1291     m_payload_len = l3.m_ipv4->getTotalLength() - (payload_offset_from_ip);
1292     m_payload     = (uint8_t *)(packetBase +offset);
1293     UpdatePacketPadding();
1294     SetKey();
1295 }
1296
1297
1298
1299 void CPacketIndication::ProcessIpv6Packet(CPacketParser *parser,
1300                                         int offset){
1301
1302     char * packetBase = m_packet->raw;
1303     CCPacketParserCounters * m_cnt=&parser->m_counter;
1304     uint16_t src_ipv6[6];
1305     uint16_t dst_ipv6[6];
1306     uint16_t idx;
1307     uint8_t protocol;
1308     BP_ASSERT(l3.m_ipv6);
1309
1310     parser->m_counter.m_pkt++;
1311
1312     if ( l3.m_ipv6->getVersion() == 6 ){
1313         m_cnt->m_ipv6++;
1314     }else{
1315         m_cnt->m_non_valid_ipv6_ver++;
1316         return;
1317     }
1318
1319      // Check length
1320     if ((uint32_t)(l3.m_ipv6->getPayloadLen()+offset+l3.m_ipv6->getHeaderLength()) >
1321         (uint32_t)( m_packet->getTotalLen())   ){
1322         m_cnt->m_ipv6_length_error++;
1323         return;
1324     }
1325
1326     for (idx=0; idx<6; idx++){
1327         src_ipv6[idx] = CGlobalInfo::m_options.m_src_ipv6[idx];
1328         dst_ipv6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
1329     }
1330     l3.m_ipv6->updateMSBIpv6Src(&src_ipv6[0]);
1331     l3.m_ipv6->updateMSBIpv6Dst(&dst_ipv6[0]);
1332
1333     offset += l3.m_ipv6->getHeaderLength();
1334     protocol = l3.m_ipv6->getNextHdr();
1335     if (ProcessIpPacketProtocol(m_cnt,protocol,&offset) != 0) {
1336         return;
1337     };
1338
1339     // Set packet length and include padding if needed
1340     uint16_t real_pkt_size = l3.m_ipv6->getPayloadLen()+ getIpOffset() + l3.m_ipv6->getHeaderLength();
1341     m_packet->pkt_len = real_pkt_size;
1342     if (m_packet->pkt_len < 60) { m_packet->pkt_len = 60; }
1343
1344     m_cnt->m_valid_udp_tcp++;
1345     m_payload_len = l3.m_ipv6->getPayloadLen();
1346     m_payload     = (uint8_t *)(packetBase +offset);
1347
1348     m_packet_padding = m_packet->getTotalLen() -  real_pkt_size;
1349     assert( m_packet->getTotalLen()>= real_pkt_size );
1350     SetKey();
1351 }
1352
1353
1354 static uint8_t cbuff[MAX_PKT_SIZE];
1355
1356 bool CPacketIndication::ConvertPacketToIpv6InPlace(CCapPktRaw * pkt,
1357                                                        int offset){
1358
1359     // Copy l2 data and set l2 type to ipv6
1360     memcpy(cbuff, pkt->raw, offset);
1361     *(uint16_t*)(cbuff+12) = PKT_HTONS(EthernetHeader::Protocol::IPv6);
1362
1363     // Create the ipv6 header
1364     IPHeader *ipv4 = (IPHeader *) (pkt->raw+offset);
1365     IPv6Header *ipv6 = (IPv6Header *) (cbuff+offset);
1366     uint8_t ipv6_hdrlen = ipv6->getHeaderLength();
1367     memset(ipv6,0,ipv6_hdrlen);
1368     ipv6->setVersion(6);
1369     if (ipv4->getTotalLength() < ipv4->getHeaderLength()) {
1370         return(false);
1371     }
1372     // Calculate the payload length
1373     uint16_t p_len = ipv4->getTotalLength() - ipv4->getHeaderLength();
1374     ipv6->setPayloadLen(p_len);
1375     uint8_t l4_proto = ipv4->getProtocol();
1376     ipv6->setNextHdr(l4_proto);
1377     ipv6->setHopLimit(64);
1378
1379     // Update the least signficant 32-bits of ipv6 address
1380     //   using the ipv4 address
1381     ipv6->updateLSBIpv6Src(ipv4->getSourceIp());
1382     ipv6->updateLSBIpv6Dst(ipv4->getDestIp());
1383
1384     // Copy rest of packet
1385     uint16_t ipv4_offset = offset + ipv4->getHeaderLength();
1386     uint16_t ipv6_offset = offset + ipv6_hdrlen;
1387     memcpy(cbuff+ipv6_offset,pkt->raw+ipv4_offset,p_len);
1388
1389     ipv6_offset+=p_len;
1390     memcpy(pkt->raw,cbuff,ipv6_offset);
1391
1392     // Set packet length
1393     pkt->pkt_len = ipv6_offset;
1394     m_is_ipv6 = true;
1395
1396     return (true);
1397 }
1398
1399 void CPacketIndication::ProcessPacket(CPacketParser *parser,
1400                                       CCapPktRaw * pkt){
1401     _ProcessPacket(parser,pkt);
1402     if ( m_desc.IsValidPkt() ){
1403         UpdateOffsets(); /* update fast offsets */
1404     }
1405 }
1406
1407 /* process packet */
1408 void CPacketIndication::_ProcessPacket(CPacketParser *parser,
1409                                       CCapPktRaw * pkt){
1410
1411     BP_ASSERT(pkt);
1412     m_packet =pkt;
1413     Clean();
1414     CCPacketParserCounters * m_cnt=&parser->m_counter;
1415
1416     int offset = 0;
1417     char * packetBase;
1418     packetBase = m_packet->raw;
1419     BP_ASSERT(packetBase);
1420     m_ether = (EthernetHeader *)packetBase;
1421     m_is_ipv6 = false;
1422
1423     // IP
1424     switch( m_ether->getNextProtocol() ) {
1425     case EthernetHeader::Protocol::IP :
1426         offset = 14;
1427         l3.m_ipv4 =(IPHeader *)(packetBase+offset);
1428         break;
1429     case EthernetHeader::Protocol::IPv6 :
1430         offset = 14;
1431         l3.m_ipv6 =(IPv6Header *)(packetBase+offset);
1432         m_is_ipv6 = true;
1433         break;
1434     case EthernetHeader::Protocol::VLAN :
1435         m_cnt->m_vlan++;
1436         switch ( m_ether->getVlanProtocol() ){
1437           case EthernetHeader::Protocol::IP:
1438                offset = 18;
1439                l3.m_ipv4 =(IPHeader *)(packetBase+offset);
1440               break;
1441           case EthernetHeader::Protocol::IPv6 :
1442               offset = 18;
1443               l3.m_ipv6 =(IPv6Header *)(packetBase+offset);
1444               m_is_ipv6 = true;
1445               break;
1446           case EthernetHeader::Protocol::MPLS_Multicast   :
1447           case EthernetHeader::Protocol::MPLS_Unicast  :
1448               m_cnt->m_mpls++;
1449               return;
1450
1451         case EthernetHeader::Protocol::ARP :
1452             m_cnt->m_arp++;
1453             return;
1454
1455         default:
1456             m_cnt->m_non_ip++;
1457             return ; /* Non IP */
1458             }
1459         break;
1460     case EthernetHeader::Protocol::ARP  :
1461         m_cnt->m_arp++;
1462         return; /* Non IP */
1463         break;
1464
1465     case EthernetHeader::Protocol::MPLS_Multicast   :
1466     case EthernetHeader::Protocol::MPLS_Unicast  :
1467         m_cnt->m_mpls++;
1468         return; /* Non IP */
1469         break;
1470
1471     default:
1472         m_cnt->m_non_ip++;
1473         return; /* Non IP */
1474     }
1475
1476     if (is_ipv6() == false) {
1477         if( (14+20) > (uint32_t)( m_packet->getTotalLen())   ){
1478             m_cnt->m_ip_length_error++;
1479             return;
1480         }
1481     }
1482
1483     // For now, we can not mix ipv4 and ipv4 packets
1484     // so we require --ipv6 option be set for ipv6 packets
1485     if ((m_is_ipv6) && (CGlobalInfo::is_ipv6_enable() == false)){
1486         fprintf(stderr,"ERROR  --ipv6 must be set to process ipv6 packets\n");
1487         exit(-1);
1488     }
1489
1490     // Convert to Ipv6 if requested and not already Ipv6
1491     if ((CGlobalInfo::is_ipv6_enable()) && (is_ipv6() == false )) {
1492         if (ConvertPacketToIpv6InPlace(pkt, offset) == false){
1493             /* Move to next packet as this was erroneous */
1494             printf(" unable to convert packet to IPv6, skipping...\n");
1495             return;
1496         }
1497     }
1498
1499     if (is_ipv6()){
1500         ProcessIpv6Packet(parser,offset);
1501     }else{
1502         ProcessIpPacket(parser,offset);
1503     }
1504 }
1505
1506
1507
1508 void CFlowTableStats::Clear(){
1509   m_lookup=0;
1510   m_found=0;
1511   m_fif=0;
1512   m_add=0;
1513   m_remove=0;
1514   m_fif_err=0;
1515   m_active=0;
1516 }
1517
1518 void CFlowTableStats::Dump(FILE *fd){
1519     DP (m_lookup);
1520     DP (m_found);
1521     DP (m_fif);
1522     DP (m_add);
1523     DP (m_remove);
1524     DP (m_fif_err);
1525     DP (m_active);
1526 }
1527
1528
1529 void CFlow::Dump(FILE *fd){
1530     fprintf(fd," fif is swap : %d \n",is_fif_swap);
1531 }
1532
1533
1534 void CFlowTableManagerBase::Dump(FILE *fd){
1535     m_stats.Dump(fd);
1536 }
1537
1538 // Return flow that has given key. If flow does not exist, create one, and add to CFlow data structure.
1539 // key - key to lookup by.
1540 // is_fif - return: true if flow did not exist (This is the first packet we see in this flow).
1541 //                  false if flow already existed
1542 CFlow * CFlowTableManagerBase::process(const CFlowKey & key, bool & is_fif) {
1543     m_stats.m_lookup++;
1544     is_fif=false;
1545     CFlow * lp=lookup(key);
1546     if ( lp  ) {
1547         m_stats.m_found++;
1548         return (lp);
1549     }else{
1550         m_stats.m_fif++;
1551         m_stats.m_active++;
1552         m_stats.m_add++;
1553         is_fif=true;
1554         lp= add(key );
1555         if (lp) {
1556         }else{
1557             m_stats.m_fif_err++;
1558         }
1559     }
1560     return (lp);
1561 }
1562
1563 bool CFlowTableMap::Create(int max_size){
1564     m_stats.Clear();
1565     return (true);
1566 }
1567
1568 void CFlowTableMap::Delete(){
1569     remove_all();
1570 }
1571
1572 void CFlowTableMap::remove(const CFlowKey & key ) {
1573     CFlow *lp=lookup(key);
1574     if ( lp ) {
1575         delete lp;
1576         m_stats.m_remove++;
1577         m_stats.m_active--;
1578         m_map.erase(key);
1579     }else{
1580         BP_ASSERT(0);
1581     }
1582 }
1583
1584
1585 CFlow * CFlowTableMap::lookup(const CFlowKey & key ) {
1586     flow_map_t::iterator iter;
1587     iter = m_map.find(key);
1588     if (iter != m_map.end() ) {
1589         return ( (*iter).second );
1590     }else{
1591         return (( CFlow*)0);
1592     }
1593 }
1594
1595 CFlow * CFlowTableMap::add(const CFlowKey & key ) {
1596     CFlow * flow = new CFlow();
1597     m_map.insert(flow_map_t::value_type(key,flow));
1598     return (flow);
1599 }
1600
1601 void CFlowTableMap::remove_all(){
1602     if ( m_map.empty() )
1603         return;
1604     flow_map_iter_t it;
1605     for (it= m_map.begin(); it != m_map.end(); ++it) {
1606         CFlow *lp = it->second;
1607         delete lp;
1608     }
1609     m_map.clear();
1610 }
1611
1612 uint64_t CFlowTableMap::count(){
1613     return ( m_map.size());
1614 }
1615
1616
1617 /*
1618  * This function will insert an IP option header containing metadata for the
1619  * rx-check feature.
1620  *
1621  * An mbuf is created to hold the new option header plus the portion of the
1622  * packet after the base IP header (includes any IP options header that might
1623  * exist).  This mbuf is then linked into the existing mbufs (becoming the
1624  * second mbuf).
1625  *
1626  * Note that the rxcheck option header is inserted as the first option header,
1627  * and any existing IP option headers are placed after it.
1628  */
1629 void CFlowPktInfo::do_generate_new_mbuf_rxcheck(rte_mbuf_t * m,
1630                                  CGenNode * node,
1631                                  bool single_port){
1632
1633     /* retrieve size of rx-check header, must be multiple of 8 */
1634     uint16_t opt_len =  RX_CHECK_LEN;
1635     uint16_t current_opt_len =  0;
1636     assert( (opt_len % 8) == 0 );
1637
1638     /* determine starting move location */
1639     char *mp1 = rte_pktmbuf_mtod(m, char*);
1640     uint16_t mp1_offset = m_pkt_indication.getFastIpOffsetFast();
1641     if (unlikely (m_pkt_indication.is_ipv6()) ) {
1642         mp1_offset += IPv6Header::DefaultSize;
1643     }else{
1644         mp1_offset += IPHeader::DefaultSize;
1645     }
1646     char *move_from = mp1 + mp1_offset;
1647
1648     /* determine size of new mbuf required */
1649     uint16_t move_len = m->data_len - mp1_offset;
1650     uint16_t new_mbuf_size = move_len + opt_len;
1651     uint16_t mp2_offset = opt_len;
1652
1653     /* obtain a new mbuf */
1654     rte_mbuf_t * new_mbuf = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), new_mbuf_size);
1655     assert(new_mbuf);
1656     char * mp2 = rte_pktmbuf_append(new_mbuf, new_mbuf_size);
1657     char * move_to = mp2 + mp2_offset;
1658
1659     /* move part of packet from first mbuf to new mbuf */
1660     memmove(move_to, move_from, move_len);
1661
1662     /* trim first mbuf and set pointer to option header*/
1663     CRx_check_header *rxhdr;
1664     uint16_t buf_adjust = move_len;
1665     rxhdr = (CRx_check_header *)mp2;
1666     m->data_len -= buf_adjust;
1667
1668     /* insert rx-check data as an IPv4 option header or IPv6 extension header*/
1669     CFlowPktInfo *  lp=node->m_pkt_info;
1670     CPacketDescriptor   * desc=&lp->m_pkt_indication.m_desc;
1671
1672     /* set option type and update ip header length */
1673     IPHeader * ipv4=(IPHeader *)(mp1 + 14);
1674     if (unlikely (m_pkt_indication.is_ipv6()) ) {
1675         IPv6Header * ipv6=(IPv6Header *)(mp1 + 14);
1676         uint8_t save_header= ipv6->getNextHdr();
1677         ipv6->setNextHdr(RX_CHECK_V6_OPT_TYPE);
1678          ipv6->setHopLimit(TTL_RESERVE_DUPLICATE);
1679         ipv6->setTrafficClass(ipv6->getTrafficClass()|TOS_TTL_RESERVE_DUPLICATE);
1680         ipv6->setPayloadLen( ipv6->getPayloadLen() +
1681                                   sizeof(CRx_check_header));
1682         rxhdr->m_option_type = save_header;
1683         rxhdr->m_option_len = RX_CHECK_V6_OPT_LEN;
1684     }else{
1685         current_opt_len = ipv4->getHeaderLength();
1686         ipv4->setHeaderLength(current_opt_len+opt_len);
1687         ipv4->setTotalLength(ipv4->getTotalLength()+opt_len);
1688         ipv4->setTimeToLive(TTL_RESERVE_DUPLICATE);
1689         ipv4->setTOS(ipv4->getTOS()|TOS_TTL_RESERVE_DUPLICATE);
1690
1691         rxhdr->m_option_type = RX_CHECK_V4_OPT_TYPE;
1692         rxhdr->m_option_len = RX_CHECK_V4_OPT_LEN;
1693     }
1694
1695     /* fill in the rx-check metadata in the options header */
1696     if ( CGlobalInfo::m_options.is_rxcheck_const_ts() ){
1697         /* Runtime flag to use a constant value for the timestamp field. */
1698         /* This is used by simulation to provide consistency across runs. */
1699         rxhdr->m_time_stamp = 0xB3B2B1B0;
1700     }else{
1701         rxhdr->m_time_stamp = os_get_hr_tick_32();
1702     }
1703     rxhdr->m_magic      = RX_CHECK_MAGIC;
1704     rxhdr->m_flow_id     = node->m_flow_id | ( ( (uint64_t)(desc->getFlowId() & 0xf))<<52 ) ; // include thread_id, node->flow_id, sub_flow in case of multi-flow template
1705     rxhdr->m_flags       =  0;
1706     rxhdr->m_aging_sec   =  desc->GetMaxFlowTimeout();
1707     rxhdr->m_template_id    = (uint8_t)desc->getId();
1708
1709     /* add the flow packets goes to the same port */
1710     if (single_port) {
1711         rxhdr->m_pkt_id     = desc->getFlowPktNum();
1712         rxhdr->m_flow_size  = desc->GetMaxPktsPerFlow();
1713
1714     }else{
1715         rxhdr->m_pkt_id     = desc->GetDirInfo()->GetPktNum();
1716         rxhdr->m_flow_size  = desc->GetDirInfo()->GetMaxPkts();
1717         /* set dir */
1718         rxhdr->set_dir(desc->IsInitSide()?1:0);
1719         rxhdr->set_both_dir(desc->IsBiDirectionalFlow()?1:0);
1720     }
1721
1722     /* update checksum for IPv4, split across 2 mbufs */
1723     if (likely ( ! m_pkt_indication.is_ipv6()) ) {
1724         ipv4->updateCheckSum2((uint8_t *)ipv4, current_opt_len, (uint8_t *)rxhdr, opt_len);
1725     }
1726
1727     /* link new mbuf */
1728     new_mbuf->next = m->next;
1729     new_mbuf->nb_segs++;
1730     m->next = new_mbuf;
1731     m->nb_segs++;
1732     m->pkt_len += opt_len;
1733 }
1734
1735
1736 char * CFlowPktInfo::push_ipv4_option_offline(uint8_t bytes){
1737     /* must be align by 4*/
1738     assert( (bytes % 4)== 0 );
1739     assert(m_pkt_indication.is_ipv6()==false);
1740     if ( m_pkt_indication.l3.m_ipv4->getHeaderLength()+bytes>60 ){
1741         printf(" ERROR ipv4 options size is too big, should be able to add %d bytes for internal info \n",bytes);
1742         return((char *)0);
1743     }
1744     /* now we can do that !*/
1745
1746     /* add more bytes to the packet */
1747     m_packet->append(bytes);
1748     uint8_t ip_offset_to_move= m_pkt_indication.getFastIpOffsetFast()+IPHeader::DefaultSize;
1749     char *p=m_packet->raw+ip_offset_to_move;
1750     uint16_t bytes_to_move= m_packet->pkt_len - ip_offset_to_move -bytes;
1751
1752     /* move the start of ipv4 options */
1753     memmove(p+bytes ,p, bytes_to_move);
1754
1755     /* fix all other stuff */
1756     if ( m_pkt_indication.m_udp_tcp_offset ){
1757         m_pkt_indication.m_udp_tcp_offset+=bytes;
1758     }
1759     if ( m_pkt_indication.m_payload_offset ) {
1760         m_pkt_indication.m_payload_offset+=bytes;
1761     }
1762
1763     m_pkt_indication.RefreshPointers();
1764     /* now pointer are updated we can manipulate ipv4 header */
1765     IPHeader       * ipv4=m_pkt_indication.l3.m_ipv4;
1766
1767     ipv4->setTotalLength(ipv4->getTotalLength()+bytes);
1768     ipv4->setHeaderLength(ipv4->getHeaderLength()+(bytes));
1769
1770     m_pkt_indication.UpdatePacketPadding();
1771
1772     /* refresh the global mbuf */
1773     free_const_mbuf();
1774     alloc_const_mbuf();
1775     return (p);
1776 }
1777
1778 void   CFlowPktInfo::mask_as_learn(){
1779     CNatOption *lpNat;
1780     if ( m_pkt_indication.is_ipv6() ) {
1781         lpNat=(CNatOption *)push_ipv6_option_offline(CNatOption::noOPTION_LEN);
1782         lpNat->set_init_ipv6_header();
1783         lpNat->set_fid(0);
1784         lpNat->set_thread_id(0);
1785     } else {
1786         if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_IP_OPTION)) {
1787             // Make space in IPv4 header for NAT option
1788             lpNat=(CNatOption *)push_ipv4_option_offline(CNatOption::noOPTION_LEN);
1789             lpNat->set_init_ipv4_header();
1790             lpNat->set_fid(0);
1791             lpNat->set_thread_id(0);
1792             m_pkt_indication.l3.m_ipv4->updateCheckSum();
1793         }
1794         /* learn is true */
1795         m_pkt_indication.m_desc.SetLearn(true);
1796     }
1797 }
1798
1799 char * CFlowPktInfo::push_ipv6_option_offline(uint8_t bytes){
1800
1801     /* must be align by 8*/
1802     assert( (bytes % 8)== 0 );
1803     assert(m_pkt_indication.is_ipv6()==true);
1804
1805     /* add more bytes to the packet */
1806     m_packet->append(bytes);
1807     uint8_t ip_offset_to_move= m_pkt_indication.getFastIpOffsetFast()+IPv6Header::DefaultSize;
1808     char *p=m_packet->raw+ip_offset_to_move;
1809     uint16_t bytes_to_move= m_packet->pkt_len - ip_offset_to_move -bytes;
1810
1811     /* move the start of ipv4 options */
1812     memmove(p+bytes ,p, bytes_to_move);
1813
1814     /* fix all other stuff */
1815     if ( m_pkt_indication.m_udp_tcp_offset ){
1816         m_pkt_indication.m_udp_tcp_offset+=bytes;
1817     }
1818     if ( m_pkt_indication.m_payload_offset ) {
1819         m_pkt_indication.m_payload_offset+=bytes;
1820     }
1821
1822     m_pkt_indication.RefreshPointers();
1823     /* now pointer are updated we can manipulate ipv6 header */
1824     IPv6Header       * ipv6=m_pkt_indication.l3.m_ipv6;
1825
1826     ipv6->setPayloadLen(ipv6->getPayloadLen()+bytes);
1827     uint8_t save_header= ipv6->getNextHdr();
1828     *p=save_header; /* copy next header */
1829     ipv6->setNextHdr(CNatOption::noIPV6_OPTION);
1830
1831     m_pkt_indication.UpdatePacketPadding();
1832
1833     /* refresh the global mbuf */
1834     free_const_mbuf();
1835     alloc_const_mbuf();
1836     return (p);
1837 }
1838
1839
1840 void CFlowPktInfo::alloc_const_mbuf(){
1841
1842     if ( m_packet->pkt_len > FIRST_PKT_SIZE ) {
1843         /* pkt size is bigger than FIRST_PKT_SIZE let's create an offline buffer */
1844         int i;
1845         for (i=0; i<MAX_SOCKETS_SUPPORTED; i++) {
1846             if ( CGlobalInfo::m_socket.is_sockets_enable(i) ){
1847
1848                 rte_mbuf_t        * m;
1849                 uint16_t pkt_s=(m_packet->pkt_len - FIRST_PKT_SIZE);
1850
1851                 m = CGlobalInfo::pktmbuf_alloc(i,pkt_s);
1852                 BP_ASSERT(m);
1853                 char *p=rte_pktmbuf_append(m, pkt_s);
1854                 rte_memcpy(p,(m_packet->raw+FIRST_PKT_SIZE),pkt_s);
1855
1856                 assert(m_big_mbuf[i]==NULL);
1857                 m_big_mbuf[i]=m;
1858             }
1859         }
1860     }
1861 }
1862
1863 void CFlowPktInfo::free_const_mbuf(){
1864     int i;
1865     for (i=0; i<MAX_SOCKETS_SUPPORTED; i++) {
1866         rte_mbuf_t   * m=m_big_mbuf[i];
1867         if (m) {
1868             rte_pktmbuf_free(m );
1869             m_big_mbuf[i]=NULL;
1870         }
1871     }
1872 }
1873
1874
1875 bool CFlowPktInfo::Create(CPacketIndication  * pkt_ind){
1876     /* clone the packet*/
1877     m_packet = new CCapPktRaw(pkt_ind->m_packet);
1878     /* clone of the offsets */
1879     m_pkt_indication.Clone(pkt_ind,m_packet);
1880
1881     int i;
1882     for (i=0; i<MAX_SOCKETS_SUPPORTED; i++) {
1883         m_big_mbuf[i] = NULL;
1884     }
1885     alloc_const_mbuf();
1886     return (true);
1887 }
1888
1889 void CFlowPktInfo::Delete(){
1890     free_const_mbuf();
1891     delete m_packet;
1892 }
1893
1894 void CFlowPktInfo::Dump(FILE *fd){
1895     m_pkt_indication.Dump(fd,0);
1896 }
1897
1898
1899
1900
1901 void CCapFileFlowInfo::save_to_erf(std::string cap_file_name,int pcap){
1902     if (Size() ==0) {
1903         fprintf(stderr,"ERROR no info for this flow ");
1904         return ;
1905     }
1906     capture_type_e file_type=ERF;
1907     if ( pcap ){
1908         file_type=LIBPCAP;
1909     }
1910
1911
1912     CFileWriterBase * lpWriter=CCapWriterFactory::CreateWriter(file_type,(char *)cap_file_name.c_str());
1913     if (lpWriter == NULL) {
1914         fprintf(stderr,"ERROR can't create cap file %s ",(char *)cap_file_name.c_str());
1915         return ;
1916     }
1917     int i;
1918
1919     for (i=0; i<(int)Size(); i++) {
1920         CFlowPktInfo *  lp=GetPacket((uint32_t)i);
1921         bool res=lpWriter->write_packet(lp->m_packet);
1922         BP_ASSERT(res);
1923     }
1924     delete lpWriter;
1925 }
1926
1927
1928
1929 struct CTmpFlowPerDirInfo {
1930     CTmpFlowPerDirInfo(){
1931         m_pkt_id=0;
1932     }
1933
1934     uint16_t    m_pkt_id;
1935 };
1936
1937 class CTmpFlowInfo {
1938 public:
1939     CTmpFlowInfo(){
1940         m_max_pkts=0;
1941         m_max_aging_sec=0.0;
1942         m_last_pkt=0.0;
1943
1944     }
1945     ~CTmpFlowInfo(){
1946        }
1947 public:
1948     uint32_t  m_max_pkts;
1949     dsec_t    m_max_aging_sec;
1950     dsec_t    m_last_pkt;
1951
1952     CTmpFlowPerDirInfo  m_per_dir[CS_NUM];
1953 };
1954
1955 typedef CTmpFlowInfo * flow_tmp_t;
1956 typedef std::map<uint16_t, flow_tmp_t> flow_tmp_map_t;
1957 typedef flow_tmp_map_t::iterator flow_tmp_map_iter_t;
1958
1959 enum CCapFileFlowInfo::load_cap_file_err CCapFileFlowInfo::is_valid_template_load_time(){
1960    int i;
1961     for (i=0; i<Size(); i++) {
1962         CFlowPktInfo * lp= GetPacket((uint32_t)i);
1963         CPacketIndication * lpd=&lp->m_pkt_indication;
1964         if ( lpd->getEtherOffset() !=0 ){
1965             fprintf(stderr, "Error: Bad CAP file. Ether offset start is not 0 in packet %d \n", i+1);
1966             return kPktNotSupp;
1967         }
1968
1969         if  ( CGlobalInfo::is_learn_mode() ) {
1970             // We change TCP ports. Need them to be in first 64 byte mbuf.
1971             // Since we also add IP option, and rx-check feature might add another IP option, better not allow
1972             // OP options in this mode. If needed this limitation can be refined a bit.
1973             if ( lpd->getTcpOffset() - lpd->getIpOffset() != 20 ) {
1974                 fprintf(stderr, "Error: Bad CAP file. In learn (NAT) mode, no IP options allowed \n");
1975                 return kIPOptionNotAllowed;
1976             }
1977             if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_TCP)) {
1978                 if (lpd->getIpProto() != IPPROTO_TCP && !lpd->m_desc.IsInitSide()) {
1979                     fprintf(stderr, "Error: In the chosen learn mode, all packets from server to client in CAP file should be TCP.\n");
1980                     fprintf(stderr, "       Please give different CAP file, or try different --learn-mode\n");
1981                     return kNoTCPFromServer;
1982                 }
1983             }
1984         }
1985     }
1986
1987     if  ( CGlobalInfo::is_learn_mode() ) {
1988         CPacketIndication &pkt_0_indication = GetPacket(0)->m_pkt_indication;
1989
1990         if ( pkt_0_indication.m_desc.IsPluginEnable() ) {
1991             fprintf(stderr, "Error: plugins are not supported with --learn mode \n");
1992             return kPlugInWithLearn;
1993         }
1994
1995         if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_TCP)) {
1996             if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_TCP_ACK)) {
1997                 if (Size() < 3) {
1998                     fprintf(stderr
1999                             , "Error: In the chosen learn mode, need at least the 3 TCP handshake packets.\n");
2000                     fprintf(stderr
2001                             , "       Please give different CAP file, or try different --learn-mode\n");
2002                     return kTCPLearnModeBadFlow;
2003                 }
2004             }
2005             CPacketIndication &pkt_1_indication = GetPacket(1)->m_pkt_indication;
2006
2007
2008             // verify first packet is TCP SYN from client
2009             TCPHeader *tcp = (TCPHeader *)(pkt_0_indication.getBasePtr() + pkt_0_indication.getTcpOffset());
2010             if ( (! pkt_0_indication.m_desc.IsInitSide()) || (! tcp->getSynFlag()) ) {
2011                 fprintf(stderr, "Error: In the chosen learn mode, first TCP packet should be SYN from client side.\n");
2012                 fprintf(stderr, "       In cap file, first packet side direction is %s. TCP header is:\n"
2013                         , pkt_0_indication.m_desc.IsInitSide() ? "outside":"inside");
2014                 tcp->dump(stderr);
2015                 fprintf(stderr, "       Please give different CAP file, or try different --learn-mode\n");
2016                 return kNoSyn;
2017             }
2018
2019             // We want at least the TCP flags to be inside first mbuf
2020             if (pkt_0_indication.getTcpOffset() + 14 > FIRST_PKT_SIZE) {
2021                 fprintf(stderr
2022                         , "Error: In the chosen learn mode, TCP flags offset should be less than %d, but it is %d.\n"
2023                         , FIRST_PKT_SIZE, pkt_0_indication.getTcpOffset() + 14);
2024                 fprintf(stderr, "       Please give different CAP file, or try different --learn-mode\n");
2025                 return kTCPOffsetTooBig;
2026             }
2027             if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_TCP_ACK)) {
2028                 // To support TCP seq randomization from server to client, we need second packet in flow to be the server SYN+ACK
2029                 bool error = false;
2030                 if (pkt_1_indication.getIpProto() != IPPROTO_TCP) {
2031                     error = true;
2032                 } else {
2033                     TCPHeader *tcp = (TCPHeader *)(pkt_1_indication.getBasePtr() + pkt_1_indication.getTcpOffset());
2034                     if ( (! tcp->getSynFlag()) ||  (! tcp->getAckFlag()) || ( pkt_1_indication.m_desc.IsInitSide())) {
2035                         error = true;
2036                     }
2037                 }
2038                 if (error) {
2039                     fprintf(stderr, "Error: In the chosen learn mode, second packet should be SYN+ACK from server.\n");
2040                     fprintf(stderr, "       Please give different CAP file, or try different --learn-mode\n");
2041                     return kNoTCPSynAck;
2042                 }
2043
2044                 CPacketIndication &pkt_2_indication = GetPacket(2)->m_pkt_indication;
2045                 if ( (! pkt_2_indication.m_desc.IsInitSide()) ) {
2046                     fprintf(stderr
2047                             , "Error: Wrong third packet. In the chosen learn mode, need at least the 3 TCP handshake packets.\n");
2048                     fprintf(stderr
2049                             , "       Please give different CAP file, or try different --learn-mode\n");
2050                     return kTCPLearnModeBadFlow;
2051                 }
2052                 if ((pkt_0_indication.m_cap_ipg < (double)LEARN_MODE_MIN_IPG / 1000)
2053                     || (pkt_1_indication.m_cap_ipg < (double)LEARN_MODE_MIN_IPG / 1000)) {
2054                     fprintf(stderr
2055                             , "Error: Bad cap file timings. In the chosen learn mode");
2056                     fprintf(stderr, "IPG between TCP handshake packets should be at least %d msec.\n", LEARN_MODE_MIN_IPG);
2057                     fprintf(stderr, "       Current delay is %f between second and first, %f between third and second"
2058                             , pkt_0_indication.m_cap_ipg, pkt_1_indication.m_cap_ipg);
2059                     fprintf(stderr
2060                             , "       Please give different CAP file, try different --learn-mode, or edit ipg parameters in template file\n");
2061                     return kTCPIpgTooLow;
2062                 }
2063             }
2064         }
2065     }
2066
2067     return(kOK);
2068 }
2069
2070
2071 /**
2072  * update global info
2073  * 1. maximum aging
2074  * 2. per sub-flow pkt_num/max-pkt per dir and per global
2075  */
2076 void CCapFileFlowInfo::update_info(){
2077     flow_tmp_map_iter_t iter;
2078     flow_tmp_map_t      ft;
2079     CTmpFlowInfo *      lpFlow;
2080     int i;
2081     dsec_t ctime=0.0;
2082
2083     // first iteration, lern all the info into a temp flow table
2084     for (i=0; i<Size(); i++) {
2085         CFlowPktInfo * lp= GetPacket((uint32_t)i);
2086         // extract flow_id
2087         CPacketDescriptor * desc=&lp->m_pkt_indication.m_desc;
2088         uint16_t flow_id = desc->getFlowId();
2089         CPacketDescriptorPerDir * lpCurPacket = desc->GetDirInfo();
2090         pkt_dir_t dir=desc->IsInitSide()?CLIENT_SIDE:SERVER_SIDE; // with respect to the first sub-flow in the template
2091
2092         //update lpFlow
2093         iter = ft.find(flow_id);
2094         if (iter != ft.end() ) {
2095             lpFlow=(*iter).second;
2096         }else{
2097             lpFlow = new CTmpFlowInfo();
2098             assert(lpFlow);
2099             ft.insert(flow_tmp_map_t::value_type(flow_id,lpFlow));
2100             //add it
2101
2102         }
2103
2104         // main info
2105         lpCurPacket->SetPktNum(lpFlow->m_per_dir[dir].m_pkt_id);
2106         lpFlow->m_max_pkts++;
2107         lpFlow->m_per_dir[dir].m_pkt_id++;
2108
2109         dsec_t delta = ctime - lpFlow->m_last_pkt ;
2110         lpFlow->m_last_pkt = ctime;
2111         if (delta > lpFlow->m_max_aging_sec) {
2112             lpFlow->m_max_aging_sec = delta;
2113         }
2114         // per direction info
2115
2116         if (i<Size()) {
2117              ctime += lp->m_pkt_indication.m_cap_ipg;
2118         }
2119     }
2120
2121
2122     for (i=0; i<Size(); i++) {
2123         CFlowPktInfo * lp= GetPacket((uint32_t)i);
2124
2125         CPacketDescriptor * desc=&lp->m_pkt_indication.m_desc;
2126         uint16_t flow_id = desc->getFlowId();
2127         CPacketDescriptorPerDir * lpCurPacket = desc->GetDirInfo();
2128         pkt_dir_t dir=desc->IsInitSide()?CLIENT_SIDE:SERVER_SIDE; // with respect to the first sub-flow in the template
2129
2130         iter = ft.find(flow_id);
2131         assert( iter != ft.end() );
2132         lpFlow=(*iter).second;
2133
2134         if ( (lpFlow->m_per_dir[0].m_pkt_id >0) &&
2135              (lpFlow->m_per_dir[1].m_pkt_id >0) ) {
2136             /* we have both dir */
2137             lp->m_pkt_indication.m_desc.SetBiPluginEnable(true);
2138         }
2139
2140
2141         lpCurPacket->SetMaxPkts(lpFlow->m_per_dir[dir].m_pkt_id);
2142         lp->m_pkt_indication.m_desc.SetMaxPktsPerFlow(lpFlow->m_max_pkts);
2143         lp->m_pkt_indication.m_desc.SetMaxFlowTimeout(lpFlow->m_max_aging_sec);
2144     }
2145
2146
2147     /* in case of learn mode , we need to mark the first packet */
2148     if ( CGlobalInfo::is_learn_mode() ) {
2149         CFlowPktInfo * lp= GetPacket(0);
2150         assert(lp);
2151         /* only for bi directionl traffic mask the learn flag , only for the first packet */
2152         if ( lp->m_pkt_indication.m_desc.IsBiDirectionalFlow() ){
2153             lp->mask_as_learn();
2154         }
2155
2156         if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_TCP_ACK)) {
2157             // In this mode, we need to see the SYN+ACK as well.
2158             lp = GetPacket(1);
2159             assert(lp);
2160             lp->m_pkt_indication.setTTL(TTL_RESERVE_DUPLICATE);
2161             lp->m_pkt_indication.setTOSReserve();
2162         }
2163     }
2164
2165     if ( ft.empty() )
2166         return;
2167
2168     flow_tmp_map_iter_t it;
2169     for (it= ft.begin(); it != ft.end(); ++it) {
2170         CTmpFlowInfo *lp = it->second;
2171         assert(lp);
2172         delete lp;
2173     }
2174     ft.clear();
2175 }
2176
2177
2178 enum CCapFileFlowInfo::load_cap_file_err CCapFileFlowInfo::load_cap_file(std::string cap_file, uint16_t _id, uint8_t plugin_id) {
2179     RemoveAll();
2180
2181     fprintf(stdout," -- loading cap file %s \n",cap_file.c_str());
2182     CPacketParser parser;
2183     CPacketIndication pkt_indication;
2184     CCapReaderBase * lp=CCapReaderFactory::CreateReader((char *)cap_file.c_str(),0);
2185
2186     if (lp == 0) {
2187         printf(" ERROR file %s does not exist or not supported \n",(char *)cap_file.c_str());
2188         return kFileNotExist;
2189     }
2190     bool multi_flow_enable =( (plugin_id!=0)?true:false);
2191
2192
2193     CFlowTableMap flow;
2194
2195     parser.Create();
2196     flow.Create(0);
2197     m_total_bytes=0;
2198     m_total_flows=0;
2199     m_total_errors=0;
2200     CFlow * first_flow=0;
2201     bool first_flow_fif_is_swap=false;
2202     bool time_was_set=false;
2203     double last_time=0.0;
2204     CCapPktRaw raw_packet;
2205     int cnt=0;
2206     while ( true ) {
2207         /* read packet */
2208         if ( lp->ReadPacket(&raw_packet) ==false ){
2209             break;
2210         }
2211         cnt++;
2212
2213         if ( !time_was_set ){
2214             last_time=raw_packet.get_time();
2215             time_was_set=true;
2216         }else{
2217             if (raw_packet.get_time()<last_time) {
2218                 fprintf(stderr, "Error: Non valid pcap file. Timestamp is negative at packet %d\n", cnt);
2219                 return kNegTimestamp;
2220             }
2221             last_time=raw_packet.get_time();
2222         }
2223
2224         if ( parser.ProcessPacket(&pkt_indication, &raw_packet) ){
2225
2226             if ( pkt_indication.m_desc.IsValidPkt() ) {
2227                 pkt_indication.m_desc.SetPluginEnable(multi_flow_enable);
2228                 pkt_indication.m_desc.SetPluginId(plugin_id);
2229
2230                 pkt_indication.m_desc.SetId(_id);
2231                 bool is_fif;
2232                 CFlow * lpflow=flow.process(pkt_indication.m_flow_key,is_fif);
2233                 m_total_bytes += pkt_indication.m_packet->pkt_len;
2234                 pkt_indication.m_cap_ipg = raw_packet.get_time();
2235
2236                 pkt_indication.m_flow =lpflow;
2237                 pkt_indication.m_desc.SetFlowPktNum(lpflow->pkt_id);
2238                 /* inc pkt_id inside the flow */
2239                 lpflow->pkt_id++;
2240
2241                 /* check that we don't have reserve TTL for duplication  */
2242                 uint8_t ttl = pkt_indication.getTTL();
2243                 if ( (ttl == TTL_RESERVE_DUPLICATE) ||
2244                      (ttl == (TTL_RESERVE_DUPLICATE-1)) ) {
2245                         pkt_indication.setTTL(TTL_RESERVE_DUPLICATE-4);
2246                 }
2247
2248                 pkt_indication.clearTOSReserve();
2249
2250
2251                 // Validation for first packet in flow
2252                 if (is_fif) {
2253                     lpflow->flow_id = m_total_flows;
2254                     pkt_indication.m_desc.SetFlowId(lpflow->flow_id);
2255
2256                     if (m_total_flows == 0) {
2257                         /* first flow */
2258                         first_flow =lpflow;/* save it for single flow support , to signal error */
2259                         lpflow->is_fif_swap =pkt_indication.m_desc.IsSwapTuple();
2260                         first_flow_fif_is_swap = pkt_indication.m_desc.IsSwapTuple();
2261                         pkt_indication.m_desc.SetInitSide(true);
2262                         Append(&pkt_indication);
2263                         m_total_flows++;
2264                     } else {
2265                         if ( multi_flow_enable ) {
2266                             lpflow->is_fif_swap = pkt_indication.m_desc.IsSwapTuple();
2267                             /* in respect to the first flow */
2268                             bool init_side_in_repect_to_first_flow =
2269                                 ((first_flow_fif_is_swap?true:false) == lpflow->is_fif_swap)?true:false;
2270                             pkt_indication.m_desc.SetInitSide(init_side_in_repect_to_first_flow);
2271                             Append(&pkt_indication);
2272                             m_total_flows++;
2273                         } else {
2274                             printf("More than one flow in this cap. Ignoring it !! \n");
2275                             pkt_indication.m_flow_key.Dump(stderr);
2276                             m_total_errors++;
2277                         }
2278                     }
2279                 }else{ /* no FIF */
2280                     pkt_indication.m_desc.SetFlowId(lpflow->flow_id);
2281
2282                     if ( multi_flow_enable ==false ){
2283                         if (lpflow == first_flow) {
2284                             // add to
2285                             bool init_side=
2286                                 ((lpflow->is_fif_swap?true:false) == pkt_indication.m_desc.IsSwapTuple())?true:false;
2287                             pkt_indication.m_desc.SetInitSide( init_side  );
2288                             Append(&pkt_indication);
2289                         }else{
2290                             //printf(" more than one flow in this cap ignot it !! \n");
2291                             m_total_errors++;
2292                         }
2293                     }else{
2294                         /* support multi-flow,  */
2295
2296                         /* work in respect to first flow */
2297                         bool init_side=
2298                             ((first_flow_fif_is_swap?true:false) == pkt_indication.m_desc.IsSwapTuple())?true:false;
2299                         pkt_indication.m_desc.SetInitSide( init_side  );
2300                         Append(&pkt_indication);
2301
2302                     }
2303                 }
2304             }else{
2305                 fprintf(stderr, "ERROR packet %d is not supported, should be Ethernet/IP(0x0800)/(TCP|UDP) format try to convert it using Wireshark !\n",cnt);
2306                 return kPktNotSupp;
2307             }
2308         }else{
2309             fprintf(stderr, "ERROR packet %d is not supported, should be Ethernet/IP(0x0800)/(TCP|UDP) format try to convert it using Wireshark !\n",cnt);
2310             return kPktProcessFail;
2311         }
2312     }
2313
2314     /* set the last */
2315     CFlowPktInfo * last_pkt =GetPacket((uint32_t)(Size()-1));
2316     last_pkt->m_pkt_indication.m_desc.SetIsLastPkt(true);
2317
2318     int i;
2319
2320     for (i=1; i<Size(); i++) {
2321         CFlowPktInfo * lp_prev= GetPacket((uint32_t)i-1);
2322         CFlowPktInfo * lp= GetPacket((uint32_t)i);
2323
2324         lp_prev->m_pkt_indication.m_cap_ipg = lp->m_pkt_indication.m_cap_ipg-
2325                                               lp_prev->m_pkt_indication.m_cap_ipg;
2326         if ( lp->m_pkt_indication.m_desc.IsInitSide() !=
2327              lp_prev->m_pkt_indication.m_desc.IsInitSide()) {
2328             lp_prev->m_pkt_indication.m_desc.SetRtt(true);
2329         }
2330     }
2331
2332     GetPacket((uint32_t)Size()-1)->m_pkt_indication.m_cap_ipg=0.0;
2333     m_total_errors += parser.m_counter.getTotalErrors();
2334
2335
2336     /* dump the flow */
2337     //Dump(stdout);
2338
2339     //flow.Dump(stdout);
2340     flow.Delete();
2341     //parser.Dump(stdout);
2342     parser.Delete();
2343     //fprintf(stdout," -- finish loading cap file \n");
2344     //fprintf(stdout,"\n");
2345     delete lp;
2346     if ( m_total_errors > 0 ) {
2347         parser.m_counter.Dump(stdout);
2348         fprintf(stderr, " ERORR in one of the cap file, you should have one flow per cap file or valid plugin \n");
2349         return kCapFileErr;
2350     }
2351     return kOK;
2352 }
2353
2354 void CCapFileFlowInfo::update_pcap_mode(){
2355     int i;
2356     for (i=0; i<(int)Size(); i++) {
2357         CFlowPktInfo * lp=GetPacket((uint32_t)i);
2358         lp->m_pkt_indication.m_desc.SetPcapTiming(true);
2359     }
2360 }
2361
2362 void CCapFileFlowInfo::get_total_memory(CCCapFileMemoryUsage & memory){
2363     memory.clear();
2364     int i;
2365     for (i=0; i<(int)Size(); i++) {
2366         CFlowPktInfo * lp=GetPacket((uint32_t)i);
2367         if ( lp->m_packet->pkt_len > FIRST_PKT_SIZE ) {
2368             memory.add_size(lp->m_packet->pkt_len - FIRST_PKT_SIZE);
2369         }
2370     }
2371 }
2372
2373
2374 double CCapFileFlowInfo::get_cap_file_length_sec(){
2375     dsec_t sum=0.0;
2376     int i;
2377     for (i=0; i<(int)Size(); i++) {
2378         CFlowPktInfo * lp=GetPacket((uint32_t)i);
2379         sum+=lp->m_pkt_indication.m_cap_ipg;
2380     }
2381     return (sum);
2382 }
2383
2384
2385 void CCapFileFlowInfo::update_min_ipg(dsec_t min_ipg,
2386                                       dsec_t override_ipg){
2387
2388     int i;
2389     for (i=0; i<(int)Size(); i++) {
2390         CFlowPktInfo * lp=GetPacket((uint32_t)i);
2391         if ( lp->m_pkt_indication.m_cap_ipg  < min_ipg ){
2392             lp->m_pkt_indication.m_cap_ipg=override_ipg;
2393         }
2394         if ( lp->m_pkt_indication.m_cap_ipg  < override_ipg ){
2395             lp->m_pkt_indication.m_cap_ipg=override_ipg;
2396         }
2397     }
2398 }
2399
2400
2401 void CCapFileFlowInfo::Dump(FILE *fd){
2402
2403
2404     int i;
2405     //CCapPacket::DumpHeader(fd);
2406     for (i=0; i<(int)Size(); i++) {
2407         fprintf(fd,"pkt_id : %d \n",i+1);
2408         fprintf(fd,"-----------\n");
2409         CFlowPktInfo * lp=GetPacket((uint32_t)i);
2410         lp->Dump(fd);
2411     }
2412 }
2413
2414 // add pkt indication
2415 void CCapFileFlowInfo::Append(CPacketIndication * pkt_indication){
2416
2417     CFlowPktInfo * lp;
2418     lp = new CFlowPktInfo();
2419     lp->Create( pkt_indication );
2420     m_flow_pkts.push_back(lp);
2421 }
2422
2423
2424
2425 void CCCapFileMemoryUsage::Add(const CCCapFileMemoryUsage & obj){
2426     int i;
2427     for (i=0; i<CCCapFileMemoryUsage::MASK_SIZE; i++) {
2428         m_buf[i] += obj.m_buf[i];
2429     }
2430     m_total_bytes +=obj.m_total_bytes;
2431
2432 }
2433
2434
2435 void CCCapFileMemoryUsage::dump(FILE *fd){
2436     fprintf(fd, " Memory usage \n");
2437     int i;
2438     int c_size=CCCapFileMemoryUsage::SIZE_MIN;
2439     int c_total=0;
2440
2441     for (i=0; i<CCCapFileMemoryUsage::MASK_SIZE; i++) {
2442         fprintf(fd," size_%-7d   : %lu \n",c_size, (ulong)m_buf[i]);
2443         c_total +=m_buf[i]*c_size;
2444         c_size = c_size*2;
2445     }
2446     fprintf(fd," Total    : %s  %.0f%% util due to buckets \n",double_to_human_str(c_total,"bytes",KBYE_1024).c_str(),100.0*float(c_total)/float(m_total_bytes) );
2447 }
2448
2449
2450 bool CCapFileFlowInfo::Create(){
2451     m_total_bytes=0;
2452     m_total_errors = 0;
2453     m_total_flows  = 0;
2454     return (true);
2455 }
2456
2457
2458 void CCapFileFlowInfo::dump_pkt_sizes(void){
2459     int i;
2460     for (i=0; i<(int)Size(); i++) {
2461         flow_pkt_info_t lp=GetPacket((uint32_t)i);
2462         CGenNode node;
2463         node.m_dest_ip  = 0x10000110;
2464         node.m_src_ip   = 0x20000110;
2465         node.m_src_port = 12;
2466         rte_mbuf_t * buf=lp->generate_new_mbuf(&node);
2467         //rte_pktmbuf_dump(buf, buf->pkt_len);
2468         rte_pktmbuf_free(buf);
2469     }
2470 }
2471
2472 void CCapFileFlowInfo::RemoveAll(){
2473     int i;
2474     m_total_bytes=0;
2475     m_total_errors = 0;
2476     m_total_flows  = 0;
2477     for (i=0; i<(int)Size(); i++) {
2478         flow_pkt_info_t lp=GetPacket((uint32_t)i);
2479         lp->Delete();
2480         delete lp;
2481     }
2482     // free all the pointers
2483     m_flow_pkts.clear();
2484 }
2485
2486 void CCapFileFlowInfo::Delete(){
2487     RemoveAll();
2488 }
2489
2490 void operator >> (const YAML::Node& node, CFlowYamlDpPkt & fi) {
2491     uint32_t val;
2492     node["pkt_id"]      >> val;
2493     fi.m_pkt_id =(uint8_t)val;
2494     node["pyld_offset"] >>  val;
2495     fi.m_pyld_offset =(uint8_t)val;
2496     node["type"]        >>  val;
2497     fi.m_type =(uint8_t)val;
2498     node["len"]         >>  val;
2499     fi.m_len =(uint8_t)val;
2500     node["mask"]        >>  val;
2501     fi.m_pkt_mask =val;
2502 }
2503
2504 void operator >> (const YAML::Node& node, CVlanYamlInfo & fi) {
2505
2506     uint32_t tmp;
2507     if ( node.FindValue("enable") ){
2508         node["enable"] >> tmp ;
2509         fi.m_enable=tmp;
2510         node["vlan0"] >>  tmp;
2511         fi.m_vlan_per_port[0] = tmp;
2512         node["vlan1"] >>  tmp;
2513         fi.m_vlan_per_port[1] = tmp;
2514     }
2515 }
2516
2517
2518
2519 void operator >> (const YAML::Node& node, CFlowYamlInfo & fi) {
2520    node["name"] >> fi.m_name;
2521
2522    if ( node.FindValue("client_pool") ){
2523        node["client_pool"] >> fi.m_client_pool_name;
2524    }else{
2525        fi.m_client_pool_name = "default";
2526    }
2527    if ( node.FindValue("server_pool") ){
2528        node["server_pool"] >> fi.m_server_pool_name;
2529    }else{
2530        fi.m_server_pool_name = "default";
2531    }
2532
2533    node["cps"] >>  fi.m_k_cps;
2534    fi.m_k_cps = fi.m_k_cps/1000.0;
2535    double t;
2536    node["ipg"] >>  t;
2537    fi.m_ipg_sec =t/1000000.0;
2538    node["rtt"] >>  t;
2539    fi.m_rtt_sec = t/1000000.0;
2540    node["w"] >>  fi.m_w;
2541
2542    if ( node.FindValue("cap_ipg") ){
2543        node["cap_ipg"] >> fi.m_cap_mode;
2544        fi.m_cap_mode_was_set =true;
2545    }else{
2546        fi.m_cap_mode_was_set =false;
2547    }
2548
2549    if ( node.FindValue("wlength") ){
2550        node["wlength"] >> fi.m_wlength;
2551        fi.m_wlength_set=true;
2552    }else{
2553        fi.m_wlength_set=false;
2554        fi.m_wlength =500;
2555    }
2556
2557    if ( node.FindValue("limit") ){
2558        node["limit"] >>  fi.m_limit;
2559        fi.m_limit_was_set = true;
2560    }else{
2561        fi.m_limit_was_set = false;
2562        fi.m_limit = 0;
2563    }
2564
2565    if ( node.FindValue("plugin_id") ){
2566        uint32_t plugin_val;
2567        node["plugin_id"] >> plugin_val;
2568        fi.m_plugin_id=plugin_val;
2569    }else{
2570        fi.m_plugin_id=0;
2571    }
2572
2573
2574    fi.m_one_app_server_was_set = false;
2575    fi.m_one_app_server = false;
2576    if ( utl_yaml_read_ip_addr(node,
2577                          "server_addr",
2578                          fi.m_server_addr) ){
2579        try {
2580            node["one_app_server"] >> fi.m_one_app_server;
2581            fi.m_one_app_server_was_set=true;
2582        } catch ( const std::exception& e ) {
2583            fi.m_one_app_server_was_set = false;
2584            fi.m_one_app_server = false;
2585        }
2586    }
2587
2588
2589
2590    if ( ( fi.m_limit_was_set ) && (fi.m_plugin_id !=0) ){
2591        fprintf(stderr," limit can't be non zero when plugin is set, you must have only one of the options set");
2592        exit(-1);
2593    }
2594
2595
2596     if ( node.FindValue("dyn_pyload") ){
2597         const YAML::Node& dyn_pyload = node["dyn_pyload"];
2598         for(unsigned i=0;i<dyn_pyload.size();i++) {
2599             CFlowYamlDpPkt fd;
2600             dyn_pyload[i] >> fd;
2601             if ( fi.m_dpPkt == 0 ){
2602                 fi.m_dpPkt = new CFlowYamlDynamicPyloadPlugin();
2603                 if (fi.m_plugin_id == 0) {
2604                     fi.m_plugin_id = mpDYN_PYLOAD;
2605                 }else{
2606                     fprintf(stderr," plugin should be zero with dynamic pyload program");
2607                     exit(-1);
2608                 }
2609             }
2610             fi.m_dpPkt->Add(fd);
2611         }
2612     }else{
2613         fi.m_dpPkt=0;
2614     }
2615 }
2616
2617
2618
2619 void operator >> (const YAML::Node& node, CFlowsYamlInfo & flows_info) {
2620
2621    node["duration"] >> flows_info.m_duration_sec;
2622
2623    if ( node.FindValue("generator") ) {
2624        node["generator"] >> flows_info.m_tuple_gen;
2625        flows_info.m_tuple_gen_was_set =true;
2626    }else{
2627        flows_info.m_tuple_gen_was_set =false;
2628    }
2629
2630    // m_ipv6_set will be true if and only if both src_ipv6
2631    // and dst_ipv6 are provided.  These are used to set
2632    // the most significant 96-bits of the IPv6 address; the
2633    // least significant 32-bits come from the ipv4 address
2634    // (what is set above).
2635    //
2636    // If the IPv6 src/dst is not provided in the yaml file,
2637    // then the  most significant 96-bits will be set to 0
2638    // which represents an IPv4-compatible IPv6 address.
2639    //
2640    // If desired, an IPv4-mapped IPv6 address can be
2641    // formed by providing src_ipv6,dst_ipv6 and specifying
2642    // {0,0,0,0,0,0xffff}
2643    flows_info.m_ipv6_set=true;
2644
2645    if ( node.FindValue("src_ipv6") ) {
2646        const YAML::Node& src_ipv6_info = node["src_ipv6"];
2647        if (src_ipv6_info.size() == 6 ){
2648             for(unsigned i=0;i<src_ipv6_info.size();i++) {
2649                uint32_t fi;
2650                const YAML::Node & node =src_ipv6_info;
2651                node[i]  >> fi;
2652                flows_info.m_src_ipv6.push_back(fi);
2653            }
2654        }
2655    }else{
2656        flows_info.m_ipv6_set=false;
2657    }
2658
2659
2660    if ( node.FindValue("dst_ipv6") ) {
2661        const YAML::Node& dst_ipv6_info = node["dst_ipv6"];
2662        if (dst_ipv6_info.size() == 6 ){
2663             for(unsigned i=0;i<dst_ipv6_info.size();i++) {
2664                uint32_t fi;
2665                const YAML::Node & node =dst_ipv6_info;
2666                node[i]  >> fi;
2667                flows_info.m_dst_ipv6.push_back(fi);
2668            }
2669        }
2670    }else{
2671        flows_info.m_ipv6_set=false;
2672    }
2673
2674    if ( node.FindValue("cap_ipg") ) {
2675        node["cap_ipg"] >> flows_info.m_cap_mode;
2676        flows_info.m_cap_mode_set=true;
2677    }else{
2678        flows_info.m_cap_mode=false;
2679        flows_info.m_cap_mode_set=false;
2680    }
2681
2682    double t=0.0;
2683
2684    if ( node.FindValue("cap_ipg_min") ) {
2685        node["cap_ipg_min"] >>  t  ;
2686        flows_info.m_cap_ipg_min = t/1000000.0;
2687        flows_info.m_cap_ipg_min_set=true;
2688    }else{
2689        flows_info.m_cap_ipg_min_set=false;
2690        flows_info.m_cap_ipg_min = 20;
2691    }
2692
2693    if ( node.FindValue("cap_override_ipg") ) {
2694        node["cap_override_ipg"] >> t;
2695        flows_info.m_cap_overide_ipg = t/1000000.0;
2696        flows_info.m_cap_overide_ipg_set = true;
2697    }else{
2698        flows_info.m_cap_overide_ipg_set = false;
2699        flows_info.m_cap_overide_ipg = 0;
2700    }
2701
2702    if (node.FindValue("wlength")) {
2703        node["wlength"] >> flows_info.m_wlength;
2704        flows_info.m_wlength_set=true;
2705    }else{
2706        flows_info.m_wlength_set=false;
2707        flows_info.m_wlength =100;
2708    }
2709
2710    if (node.FindValue("one_app_server")) {
2711        printf("one_app_server should be configured per template. \n"
2712               "Will ignore this configuration\n");
2713    }
2714    flows_info.m_one_app_server =false;
2715    flows_info.m_one_app_server_was_set=false;
2716
2717    if (node.FindValue("vlan")) {
2718        node["vlan"] >> flows_info.m_vlan_info;
2719    }
2720
2721    if (node.FindValue("mac_override_by_ip")) {
2722        node["mac_override_by_ip"] >> flows_info.m_mac_replace_by_ip;
2723    }else{
2724        flows_info.m_mac_replace_by_ip =false;
2725    }
2726
2727    const YAML::Node& mac_info = node["mac"];
2728    for(unsigned i=0;i<mac_info.size();i++) {
2729        uint32_t fi;
2730        const YAML::Node & node =mac_info;
2731        node[i]  >> fi;
2732        flows_info.m_mac_base.push_back(fi);
2733    }
2734
2735    const YAML::Node& cap_info = node["cap_info"];
2736    for(unsigned i=0;i<cap_info.size();i++) {
2737        CFlowYamlInfo fi;
2738        cap_info[i] >> fi;
2739        fi.m_client_pool_idx =
2740            flows_info.m_tuple_gen.get_client_pool_id(fi.m_client_pool_name);
2741        fi.m_server_pool_idx =
2742            flows_info.m_tuple_gen.get_server_pool_id(fi.m_server_pool_name);
2743        flows_info.m_vec.push_back(fi);
2744    }
2745 }
2746
2747 void CVlanYamlInfo::Dump(FILE *fd){
2748     fprintf(fd," vlan enable : %d  \n",m_enable);
2749     fprintf(fd," vlan val    : %d ,%d \n",m_vlan_per_port[0],m_vlan_per_port[1]);
2750 }
2751
2752
2753 void CFlowsYamlInfo::Dump(FILE *fd){
2754     fprintf(fd," duration : %f sec \n",m_duration_sec);
2755
2756     fprintf(fd,"\n");
2757     if (CGlobalInfo::is_ipv6_enable()) {
2758         int idx;
2759         fprintf(fd," src_ipv6 : ");
2760         for (idx=0; idx<5; idx++){
2761             fprintf(fd,"%04x:", CGlobalInfo::m_options.m_src_ipv6[idx]);
2762         }
2763         fprintf(fd,"%04x\n", CGlobalInfo::m_options.m_src_ipv6[5]);
2764         fprintf(fd," dst_ipv6 : ");
2765         for (idx=0; idx<5; idx++){
2766             fprintf(fd,"%04x:", CGlobalInfo::m_options.m_dst_ipv6[idx]);
2767         }
2768         fprintf(fd,"%04x\n", CGlobalInfo::m_options.m_dst_ipv6[5]);
2769     }
2770     if ( !m_cap_mode_set ) {
2771         fprintf(fd," cap_ipg : wasn't set  \n");
2772     }else{
2773         fprintf(fd," cap_ipg : %d \n",m_cap_mode?1:0);
2774     }
2775
2776     if ( !m_cap_ipg_min_set ){
2777         fprintf(fd," cap_ipg_min  : wasn't set  \n");
2778     }else{
2779         fprintf(fd," cap_ipg_min       : %f \n",m_cap_ipg_min);
2780     }
2781
2782     if ( !m_cap_overide_ipg_set ){
2783         fprintf(fd," cap_override_ipg  : wasn't set  \n");
2784     }else{
2785         fprintf(fd," cap_override_ipg  : %f \n",m_cap_overide_ipg);
2786     }
2787
2788     if ( !m_wlength_set ){
2789         fprintf(fd," wlength      : wasn't set  \n");
2790     }else{
2791         fprintf(fd," m_wlength  : %d  \n",m_wlength);
2792     }
2793     fprintf(fd," one_server_for_application : %d  \n",m_one_app_server?1:0);
2794     fprintf(fd," one_server_for_application_was_set : %d  \n",m_one_app_server_was_set?1:0);
2795
2796     m_vlan_info.Dump(fd);
2797
2798     fprintf(fd," mac base   : ");
2799     int i;
2800     for (i=0; i<(int)m_mac_base.size(); i++) {
2801         if (i< (int)(m_mac_base.size()-1) ) {
2802             fprintf(fd,"0x%02x,",m_mac_base[i]);
2803         }else{
2804             fprintf(fd,"0x%02x",m_mac_base[i]);
2805         }
2806     }
2807     fprintf(fd,"\n");
2808
2809     fprintf(fd," cap file info \n");
2810     fprintf(fd," ------------- \n");
2811     for (i=0; i<(int)m_vec.size(); i++) {
2812         m_vec[i].Dump(fd);
2813     }
2814 }
2815
2816
2817 /*
2818
2819 example for YAML file
2820
2821 - duration : 10.0
2822   cap_info :
2823      - name: hey1.pcap
2824        cps : 12.0
2825        ipg : 0.0001
2826      - name: hey2.pcap
2827        cps : 11.0
2828        ipg : 0.0001
2829
2830
2831 */
2832
2833 bool CFlowsYamlInfo::verify_correctness(uint32_t num_threads) {
2834     if ( m_tuple_gen_was_set ==false ){
2835         printf(" ERROR there must be a generator field in YAML , the old format is deprecated \n");
2836         printf(" This is not supported : \n");
2837         printf("       min_src_ip : 0x10000001  \n");
2838         printf("       max_src_ip : 0x50000001  \n");
2839         printf("       min_dst_ip : 0x60000001  \n");
2840         printf("       max_dst_ip : 0x60000010  \n");
2841         printf(" This is  supported : \n");
2842         printf("generator :  \n");
2843         printf("   distribution : \"seq\"         \n");
2844         printf("   clients_start : \"16.0.0.1\"   \n");
2845         printf("   clients_end   : \"16.0.1.255\" \n");
2846         printf("   servers_start : \"48.0.0.1\"   \n");
2847         printf("   servers_end   : \"48.0.0.255\" \n");
2848         printf("   clients_per_gb : 201           \n");
2849         printf("   min_clients    : 101           \n");
2850         printf("   dual_port_mask : \"1.0.0.0\"   \n");
2851         printf("   tcp_aging      : 1             \n");
2852         printf("   udp_aging      : 1             \n");
2853         return(false);
2854     }
2855     if ( !m_tuple_gen.is_valid(num_threads,is_any_plugin_configured()) ){
2856         return (false);
2857     }
2858     /* patch defect trex-54 */
2859     if ( is_any_plugin_configured() ){
2860          /*Plugin is configured. in that case due to a limitation ( defect trex-54 )
2861           the number of servers should be bigger than number of clients   */
2862
2863         int i;
2864         for (i=0; i<(int)m_vec.size(); i++) {
2865             CFlowYamlInfo * lp=&m_vec[i];
2866             if ( lp->m_plugin_id ){
2867                 uint8_t c_idx = lp->m_client_pool_idx;
2868                 uint8_t s_idx = lp->m_server_pool_idx;
2869                 uint32_t total_clients = m_tuple_gen.m_client_pool[c_idx].getTotalIps();
2870                 uint32_t total_servers = m_tuple_gen.m_server_pool[s_idx].getTotalIps();
2871                 if ( total_servers < total_clients ){
2872                     printf(" Plugin is configured. in that case due to a limitation ( defect trex-54 ) \n");
2873                     printf(" the number of servers should be bigger than number of clients  \n");
2874                     printf(" client_pool_name : %s \n", lp->m_client_pool_name.c_str());
2875                     printf(" server_pool_name : %s \n", lp->m_server_pool_name.c_str());
2876                     return (false);
2877                 }
2878             uint32_t mul = total_servers / total_clients;
2879             uint32_t new_server_num = mul * total_clients;
2880             if ( new_server_num != total_servers ) {
2881                 printf(" Plugin is configured. in that case due to a limitation ( defect trex-54 ) \n");
2882                 printf(" the number of servers should be exact multiplication of the number of clients  \n");
2883                 printf(" client_pool_name : %s  clients %d \n", lp->m_client_pool_name.c_str(),total_clients);
2884                 printf(" server_pool_name : %s  servers %d should be %d \n", lp->m_server_pool_name.c_str(),total_servers,new_server_num);
2885                 return (false);
2886             }
2887           }
2888         }
2889     }
2890
2891     return(true);
2892 }
2893
2894
2895
2896 int CFlowsYamlInfo::load_from_yaml_file(std::string file_name){
2897     m_vec.clear();
2898
2899     if ( !utl_is_file_exists (file_name)  ){
2900         printf(" ERROR file %s does not exist \n",file_name.c_str());
2901         exit(-1);
2902     }
2903
2904     try {
2905        std::ifstream fin((char *)file_name.c_str());
2906        YAML::Parser parser(fin);
2907        YAML::Node doc;
2908
2909        parser.GetNextDocument(doc);
2910        for(unsigned i=0;i<doc.size();i++) {
2911           doc[i] >> *this;
2912           break;
2913        }
2914     } catch ( const std::exception& e ) {
2915         std::cout << e.what() << "\n";
2916         exit(-1);
2917     }
2918
2919     /* update from user input */
2920     if (CGlobalInfo::m_options.m_duration > 0.1) {
2921         m_duration_sec = CGlobalInfo::m_options.m_duration;
2922     }
2923     int i;
2924     m_is_plugin_configured=false;
2925     for (i=0; i<(int)m_vec.size(); i++) {
2926       m_vec[i].m_k_cps =m_vec[i].m_k_cps*CGlobalInfo::m_options.m_factor;
2927       if (( ! m_vec[i].m_cap_mode_was_set  ) && (m_cap_mode_set ) ){
2928           m_vec[i].m_cap_mode = m_cap_mode;
2929       }
2930       if (( ! m_vec[i].m_wlength_set  ) && (m_wlength_set ) ){
2931           m_vec[i].m_wlength = m_wlength;
2932       }
2933
2934       if (( ! m_vec[i].m_one_app_server_was_set  ) && (m_one_app_server_was_set ) ){
2935           m_vec[i].m_one_app_server = m_one_app_server;
2936       }
2937
2938       if ( m_cap_overide_ipg_set ){
2939           m_vec[i].m_ipg_sec  = m_cap_overide_ipg;
2940           m_vec[i].m_rtt_sec  = m_cap_overide_ipg;
2941       }
2942
2943       if ( m_vec[i].m_plugin_id ){
2944           m_is_plugin_configured=true;
2945       }
2946     }
2947    return 0;
2948 }
2949
2950
2951
2952 void CFlowStats::Clear(){
2953
2954     m_id=0;
2955     m_name="";
2956     m_pkt=0.0;
2957     m_bytes=0.0;
2958     m_cps=0.0;
2959     m_mb_sec=0.0;
2960     m_mB_sec=0.0;
2961     m_c_flows=0.0;
2962     m_pps =0.0;
2963     m_total_Mbytes=00 ;
2964     m_errors =0;
2965     m_flows =0 ;
2966     m_memory.clear();
2967 }
2968
2969 void CFlowStats::Add(const CFlowStats & obj){
2970
2971     m_pkt     += obj.m_pkt     ;
2972     m_bytes   += obj.m_bytes   ;
2973     m_cps     += obj.m_cps     ;
2974     m_mb_sec  += obj.m_mb_sec  ;
2975     m_mB_sec  += obj.m_mB_sec  ;
2976     m_c_flows += obj.m_c_flows ;
2977     m_pps     += obj.m_pps     ;
2978     m_total_Mbytes +=obj.m_total_Mbytes ;
2979     m_errors  +=obj.m_errors;
2980     m_flows   +=obj.m_flows ;
2981
2982     m_memory.Add(obj.m_memory);
2983 }
2984
2985
2986 void CFlowStats::DumpHeader(FILE *fd){
2987     fprintf(fd," %2s,%-40s,%4s,%4s,%5s,%7s,%9s,%9s,%9s,%10s,%5s,%7s,%4s,%4s \n",
2988                  "id","name","tps","cps","f-pkts","f-bytes","duration","Mb/sec","MB/sec","c-flows","PPS","total-Mbytes-duration","errors","flows");
2989 }
2990 void CFlowStats::Dump(FILE *fd){
2991                  //"name","cps","f-pkts","f-bytes","Mb/sec","MB/sec","c-flows","PPS","total-Mbytes-duration","errors","flows"
2992     fprintf(fd," %02d, %-40s ,%4.2f,%4.2f, %5.0f , %7.0f ,%7.2f ,%7.2f , %7.2f , %10.0f , %5.0f , %7.0f , %llu , %llu \n",
2993             m_id,
2994             m_name.c_str(),
2995             m_cps,
2996             get_normal_cps(),
2997             m_pkt,
2998             m_bytes,
2999             duration_sec,
3000             m_mb_sec,
3001             m_mB_sec,
3002             m_c_flows,
3003             m_pps,
3004             m_total_Mbytes,
3005             (unsigned long long)m_errors,
3006             (unsigned long long)m_flows);
3007 }
3008
3009 bool CFlowGeneratorRecPerThread::Create(CTupleGeneratorSmart  * global_gen,
3010                                         CFlowYamlInfo *         info,
3011                                         CFlowsYamlInfo *        yaml_flow_info,
3012                                         CCapFileFlowInfo *      flow_info,
3013                                         uint16_t _id,
3014                                         uint32_t thread_id){
3015
3016     BP_ASSERT(info);
3017     m_thread_id =thread_id ;
3018
3019     tuple_gen.Create(global_gen, info->m_client_pool_idx,
3020                      info->m_server_pool_idx);
3021     CTupleGenYamlInfo * lpt;
3022     lpt = &yaml_flow_info->m_tuple_gen;
3023
3024     tuple_gen.SetSingleServer(info->m_one_app_server,
3025                               info->m_server_addr,
3026                               getDualPortId(thread_id),
3027                               lpt->m_client_pool[info->m_client_pool_idx].getDualMask()
3028                               );
3029
3030     tuple_gen.SetW(info->m_w);
3031
3032
3033
3034     m_id   =_id;
3035     m_info =info;
3036     m_flows_info = yaml_flow_info;
3037     // set policer give bucket size for bursts
3038     m_policer.set_cir(info->m_k_cps*1000.0);
3039     m_policer.set_level(0.0);
3040     m_policer.set_bucket_size(100.0);
3041     /* pointer to global */
3042     m_flow_info = flow_info;
3043     return (true);
3044 }
3045
3046
3047 void CFlowGeneratorRecPerThread::Delete(){
3048     tuple_gen.Delete();
3049 }
3050
3051
3052
3053
3054 void CFlowGeneratorRecPerThread::Dump(FILE *fd){
3055     fprintf(fd," configuration info ");
3056     fprintf(fd," -----------------");
3057     m_info->Dump(fd);
3058     fprintf(fd," -----------------");
3059     m_flow_info->Dump(fd);
3060 }
3061
3062
3063 void CFlowGeneratorRecPerThread::getFlowStats(CFlowStats * stats){
3064
3065     double t_pkt=(double)m_flow_info->Size();
3066     double t_bytes=(double)m_flow_info->get_total_bytes();
3067     double cps=m_info->m_k_cps *1000.0;
3068     double mb_sec   = (cps*t_bytes*8.0)/(_1Mb_DOUBLE);
3069     double mB_sec   = (cps*t_bytes)/(_1Mb_DOUBLE);
3070
3071     double c_flow_windows_sec=0.0;
3072
3073     if (m_info->m_cap_mode) {
3074         c_flow_windows_sec  = m_flow_info->get_cap_file_length_sec();
3075     }else{
3076         c_flow_windows_sec  = t_pkt * m_info->m_ipg_sec;
3077     }
3078
3079
3080     double c_flows  = cps*c_flow_windows_sec*m_flow_info->get_total_flows();
3081     double pps =cps*t_pkt;
3082     double total_Mbytes = mB_sec * m_flows_info->m_duration_sec;
3083     uint64_t errors = m_flow_info->get_total_errors();
3084     uint64_t flows  = m_flow_info->get_total_flows();
3085
3086
3087     stats->m_id     = m_id;
3088     stats->m_pkt    = t_pkt;
3089     stats->m_bytes  = t_bytes;
3090     stats->duration_sec =  c_flow_windows_sec;
3091     stats->m_name   = m_info->m_name.c_str();
3092     stats->m_cps    =  cps;
3093     stats->m_mb_sec =  mb_sec;
3094     stats->m_mB_sec =  mB_sec;
3095     stats->m_c_flows  =  c_flows;
3096     stats->m_pps    =  pps;
3097     stats->m_total_Mbytes    =  total_Mbytes;
3098     stats->m_errors    =  errors;
3099     stats->m_flows    =  flows;
3100 }
3101
3102
3103
3104 void CFlowGeneratorRec::Dump(FILE *fd){
3105     fprintf(fd," configuration info ");
3106     fprintf(fd," -----------------");
3107     m_info->Dump(fd);
3108     fprintf(fd," -----------------");
3109     m_flow_info.Dump(fd);
3110 }
3111
3112
3113 void CFlowGeneratorRec::getFlowStats(CFlowStats * stats){
3114
3115     double t_pkt=(double)m_flow_info.Size();
3116     double t_bytes=(double)m_flow_info.get_total_bytes();
3117     double cps=m_info->m_k_cps *1000.0;
3118     double mb_sec   = (cps*t_bytes*8.0)/(_1Mb_DOUBLE);
3119     double mB_sec   = (cps*t_bytes)/(_1Mb_DOUBLE);
3120
3121     double c_flow_windows_sec=0.0;
3122
3123     if (m_info->m_cap_mode) {
3124         c_flow_windows_sec  = m_flow_info.get_cap_file_length_sec();
3125     }else{
3126         c_flow_windows_sec  = t_pkt * m_info->m_ipg_sec;
3127     }
3128
3129     m_flow_info.get_total_memory(stats->m_memory);
3130
3131
3132     double c_flows  = cps*c_flow_windows_sec;
3133     double pps =cps*t_pkt;
3134     double total_Mbytes = mB_sec * m_flows_info->m_duration_sec;
3135     uint64_t errors = m_flow_info.get_total_errors();
3136     uint64_t flows  = m_flow_info.get_total_flows();
3137
3138
3139     stats->m_id     = m_id;
3140     stats->m_pkt    = t_pkt;
3141     stats->m_bytes  = t_bytes;
3142     stats->duration_sec =  c_flow_windows_sec;
3143     stats->m_name   = m_info->m_name.c_str();
3144     stats->m_cps    =  cps;
3145     stats->m_mb_sec =  mb_sec;
3146     stats->m_mB_sec =  mB_sec;
3147     stats->m_c_flows  =  c_flows;
3148     stats->m_pps    =  pps;
3149     stats->m_total_Mbytes    =  total_Mbytes;
3150     stats->m_errors    =  errors;
3151     stats->m_flows    =  flows;
3152 }
3153
3154
3155 void CFlowGeneratorRec::fixup_ipg_if_needed(void){
3156     if  ( m_flows_info->m_cap_mode ) {
3157         m_flow_info.update_pcap_mode();
3158     }
3159
3160     if ( (m_flows_info->m_cap_mode) &&
3161          (m_flows_info->m_cap_ipg_min_set) &&
3162          (m_flows_info->m_cap_overide_ipg_set)
3163          ){
3164         m_flow_info.update_min_ipg(m_flows_info->m_cap_ipg_min,
3165                                    m_flows_info->m_cap_overide_ipg);
3166     }
3167 }
3168
3169
3170 bool CFlowGeneratorRec::Create(CFlowYamlInfo * info,
3171                                CFlowsYamlInfo * flows_info,
3172                                uint16_t _id){
3173     BP_ASSERT(info);
3174     m_id=_id;
3175     m_info=info;
3176     m_flows_info=flows_info;
3177     m_flow_info.Create();
3178
3179     // set policer give bucket size for bursts
3180     m_policer.set_cir(info->m_k_cps*1000.0);
3181     m_policer.set_level(0.0);
3182     m_policer.set_bucket_size(100.0);
3183
3184     int res=m_flow_info.load_cap_file(info->m_name.c_str(),_id,m_info->m_plugin_id);
3185     if ( res==0 ) {
3186         fixup_ipg_if_needed();
3187
3188         if (m_flow_info.is_valid_template_load_time() != 0) {
3189             return (false);
3190         }
3191         m_flow_info.update_info();
3192         return (true);
3193     }else{
3194         return (false);
3195     }
3196 }
3197
3198 void CFlowGeneratorRec::Delete(){
3199     m_flow_info.Delete();
3200 }
3201
3202
3203 void CGenNode::DumpHeader(FILE *fd){
3204     fprintf(fd," pkt_id,time,fid,pkt_info,pkt,len,type,is_init,is_last,type,thread_id,src_ip,dest_ip,src_port \n");
3205 }
3206
3207
3208 void CGenNode::free_gen_node(){
3209     rte_mbuf_t * m=get_cache_mbuf();
3210     if ( unlikely(m != NULL) ) {
3211         rte_pktmbuf_free(m);
3212         m_plugin_info=0;
3213     }
3214 }
3215
3216
3217 void CGenNode::Dump(FILE *fd){
3218     fprintf(fd,"%.6f,%llx,%p,%llu,%d,%d,%d,%d,%d,%d,%x,%x,%d\n",
3219             m_time,
3220             (unsigned long long)m_flow_id,
3221             m_pkt_info,
3222             (unsigned long long)m_pkt_info->m_pkt_indication.m_packet->pkt_cnt,
3223             m_pkt_info->m_pkt_indication.m_packet->pkt_len,
3224             m_pkt_info->m_pkt_indication.m_desc.getId(),
3225             (m_pkt_info->m_pkt_indication.m_desc.IsInitSide()?1:0),
3226             m_pkt_info->m_pkt_indication.m_desc.IsLastPkt(),
3227             m_type,
3228             m_thread_id,
3229             m_src_ip,
3230             m_dest_ip,
3231             m_src_port);
3232
3233 }
3234
3235 void  CNodeGenerator::set_vif(CVirtualIF * v_if){
3236     m_v_if = v_if;
3237 }
3238
3239 bool  CNodeGenerator::Create(CFlowGenListPerThread  *  parent){
3240    m_v_if =0;
3241    m_parent=parent;
3242    m_socket_id =0;
3243    m_realtime_his.Create();
3244    m_last_sync_time_sec = 0;
3245
3246    return(true);
3247 }
3248
3249 void  CNodeGenerator::Delete(){
3250     m_realtime_his.Delete();
3251 }
3252
3253
3254 void  CNodeGenerator::add_node(CGenNode * mynode){
3255     m_p_queue.push(mynode);
3256 }
3257
3258
3259
3260 void CNodeGenerator::remove_all(CFlowGenListPerThread * thread){
3261     CGenNode *node;
3262     while (!m_p_queue.empty()) {
3263         node = m_p_queue.top();
3264         m_p_queue.pop();
3265         /* sanity check */
3266         if (node->m_type == CGenNode::STATELESS_PKT) {
3267             CGenNodeStateless * p=(CGenNodeStateless *)node;
3268             /* need to be changed in Pause support */
3269             assert(p->is_mask_for_free());
3270         }
3271
3272         thread->free_node( node);
3273     }
3274 }
3275
3276 int CNodeGenerator::open_file(std::string file_name,
3277                               CPreviewMode * preview_mode){
3278     BP_ASSERT(m_v_if);
3279     m_preview_mode =*preview_mode;
3280     /* ser preview mode */
3281     m_v_if->set_review_mode(preview_mode);
3282     m_v_if->open_file(file_name);
3283     m_cnt   = 0;
3284     m_non_active = 0;
3285     m_limit = 0;
3286     return (0);
3287 }
3288
3289
3290 int CNodeGenerator::close_file(CFlowGenListPerThread * thread){
3291     remove_all(thread);
3292     BP_ASSERT(m_v_if);
3293     m_v_if->close_file();
3294     return (0);
3295 }
3296
3297 int CNodeGenerator::update_stl_stats(CGenNodeStateless *node_sl){
3298     m_cnt++;
3299     if (!node_sl->is_node_active()) {
3300         m_non_active++;
3301     }
3302     #ifdef _DEBUG
3303     if ( m_preview_mode.getVMode() >2 ){
3304         fprintf(stdout," %4lu ,", (ulong)m_cnt);
3305         fprintf(stdout," %4lu ,", (ulong)m_non_active);
3306         node_sl->Dump(stdout);
3307     }
3308     #endif
3309
3310     return (0);
3311 }
3312
3313
3314 int CNodeGenerator::update_stats(CGenNode * node){
3315     if ( m_preview_mode.getVMode() >2 ){
3316         fprintf(stdout," %llu ,", (unsigned long long)m_cnt);
3317         node->Dump(stdout);
3318         m_cnt++;
3319     }
3320     return (0);
3321 }
3322
3323 bool CNodeGenerator::has_limit_reached() {
3324     /* do we have a limit and has it passed ? */
3325     return ( (m_limit > 0) && (m_cnt >= m_limit) );
3326 }
3327
3328 bool CFlowGenListPerThread::Create(uint32_t           thread_id,
3329                                    uint32_t           core_id,
3330                                    CFlowGenList  *    flow_list,
3331                                    uint32_t           max_threads){
3332
3333
3334     m_non_active_nodes = 0;
3335     m_terminated_by_master=false;
3336     m_flow_list =flow_list;
3337     m_core_id= core_id;
3338     m_tcp_dpc= 0;
3339     m_udp_dpc=0;
3340     m_max_threads=max_threads;
3341     m_thread_id=thread_id;
3342
3343     m_cpu_cp_u.Create(&m_cpu_dp_u);
3344
3345     uint32_t socket_id=rte_lcore_to_socket_id(m_core_id);
3346
3347     char name[100];
3348     sprintf(name,"nodes-%d",m_core_id);
3349
3350     //printf(" create thread %d %s socket: %d \n",m_core_id,name,socket_id);
3351
3352     m_node_pool = utl_rte_mempool_create_non_pkt(name,
3353                                                  CGlobalInfo::m_memory_cfg.get_each_core_dp_flows(),
3354                                                  sizeof(CGenNode),
3355                                                  128,
3356                                                  0 ,
3357                                                  socket_id);
3358
3359     //printf(" pool %p \n",m_node_pool);
3360
3361     m_node_gen.Create(this);
3362     m_flow_id_to_node_lookup.Create();
3363
3364     /* split the clients to threads */
3365     CTupleGenYamlInfo * tuple_gen = &m_flow_list->m_yaml_info.m_tuple_gen;
3366
3367     m_smart_gen.Create(0,m_thread_id);
3368
3369     /* split the clients to threads using the mask */
3370     CIpPortion  portion;
3371     for (int i=0;i<tuple_gen->m_client_pool.size();i++) {
3372         split_ips(m_thread_id, m_max_threads, getDualPortId(),
3373                   tuple_gen->m_client_pool[i],
3374                   portion);
3375
3376         m_smart_gen.add_client_pool(tuple_gen->m_client_pool[i].m_dist,
3377                                     portion.m_ip_start,
3378                                     portion.m_ip_end,
3379                                     get_longest_flow(i,true),
3380                                     get_total_kcps(i,true)*1000,
3381                                     m_flow_list->m_client_config_info,
3382                                     tuple_gen->m_client_pool[i].m_tcp_aging_sec,
3383                                     tuple_gen->m_client_pool[i].m_udp_aging_sec
3384                                     );
3385     }
3386     for (int i=0;i<tuple_gen->m_server_pool.size();i++) {
3387         split_ips(m_thread_id, m_max_threads, getDualPortId(),
3388                   tuple_gen->m_server_pool[i],
3389                   portion);
3390         m_smart_gen.add_server_pool(tuple_gen->m_server_pool[i].m_dist,
3391                         portion.m_ip_start,
3392                         portion.m_ip_end,
3393                         get_longest_flow(i,false),
3394                         get_total_kcps(i,false)*1000,
3395                         tuple_gen->m_server_pool[i].m_is_bundling);
3396     }
3397
3398
3399     init_from_global(portion);
3400
3401     CMessagingManager * rx_dp=CMsgIns::Ins()->getRxDp();
3402
3403     m_ring_from_rx = rx_dp->getRingCpToDp(thread_id);
3404     m_ring_to_rx =rx_dp->getRingDpToCp(thread_id);
3405
3406     assert(m_ring_from_rx);
3407     assert(m_ring_to_rx);
3408
3409     /* create the info required for stateless DP core */
3410     m_stateless_dp_info.create(thread_id, this);
3411
3412     return (true);
3413 }
3414
3415 /* return  the client ip , port */
3416 FORCE_NO_INLINE void CFlowGenListPerThread::handler_defer_job(CGenNode *p){
3417     CGenNodeDeferPort     *   defer=(CGenNodeDeferPort     *)p;
3418     int i;
3419     for (i=0; i<defer->m_cnt; i++) {
3420         m_smart_gen.FreePort(defer->m_pool_idx[i],
3421                                  defer->m_clients[i],defer->m_ports[i]);
3422     }
3423 }
3424
3425 FORCE_NO_INLINE void CFlowGenListPerThread::handler_defer_job_flush(void){
3426     /* flush the pending job of free ports */
3427     if (m_tcp_dpc) {
3428         handler_defer_job((CGenNode *)m_tcp_dpc);
3429         free_node((CGenNode *)m_tcp_dpc);
3430         m_tcp_dpc=0;
3431     }
3432     if (m_udp_dpc) {
3433         handler_defer_job((CGenNode *)m_udp_dpc);
3434         free_node((CGenNode *)m_udp_dpc);
3435         m_udp_dpc=0;
3436     }
3437 }
3438
3439
3440 void CFlowGenListPerThread::defer_client_port_free(bool is_tcp,
3441                                                    uint32_t c_idx,
3442                                                    uint16_t port,
3443                                                    uint8_t c_pool_idx,
3444                                                    CTupleGeneratorSmart * gen){
3445     /* free is not required in this case */
3446     if (!gen->IsFreePortRequired(c_pool_idx) ){
3447         return;
3448     }
3449     CGenNodeDeferPort     *   defer;
3450     if (is_tcp) {
3451         if (gen->get_tcp_aging(c_pool_idx)==0) {
3452             gen->FreePort(c_pool_idx,c_idx,port);
3453             return;
3454         }
3455         defer=get_tcp_defer();
3456     }else{
3457         if (gen->get_udp_aging(c_pool_idx)==0) {
3458             gen->FreePort(c_pool_idx, c_idx,port);
3459             return;
3460         }
3461         defer=get_udp_defer();
3462     }
3463     if ( defer->add_client(c_pool_idx, c_idx,port) ){
3464         if (is_tcp) {
3465             m_node_gen.schedule_node((CGenNode *)defer,gen->get_tcp_aging(c_pool_idx));
3466             m_tcp_dpc=0;
3467         }else{
3468             m_node_gen.schedule_node((CGenNode *)defer,gen->get_udp_aging(c_pool_idx));
3469             m_udp_dpc=0;
3470         }
3471     }
3472 }
3473
3474
3475 void CFlowGenListPerThread::defer_client_port_free(CGenNode *p){
3476     defer_client_port_free(p->m_pkt_info->m_pkt_indication.m_desc.IsTcp(),
3477                            p->m_src_idx,p->m_src_port,p->m_template_info->m_client_pool_idx,
3478                            p->m_tuple_gen);
3479 }
3480
3481
3482
3483 /* copy all info from global and div by num of threads */
3484 void CFlowGenListPerThread::init_from_global(CIpPortion& portion){
3485     /* copy generator , it is the same */
3486     m_yaml_info =m_flow_list->m_yaml_info;
3487
3488     /* copy first the flow info */
3489     int i;
3490     for (i=0; i<(int)m_flow_list->m_cap_gen.size(); i++) {
3491         CFlowGeneratorRec * lp=m_flow_list->m_cap_gen[i];
3492         CFlowGeneratorRecPerThread * lp_thread=new CFlowGeneratorRecPerThread();
3493         /* TBD leak of memory */
3494         CFlowYamlInfo *         yaml_info =new CFlowYamlInfo();
3495
3496         yaml_info->m_name    = lp->m_info->m_name;
3497         yaml_info->m_k_cps   = lp->m_info->m_k_cps/(double)m_max_threads;
3498         yaml_info->m_ipg_sec = lp->m_info->m_ipg_sec;
3499         yaml_info->m_rtt_sec = lp->m_info->m_rtt_sec;
3500         yaml_info->m_w   = lp->m_info->m_w;
3501         yaml_info->m_cap_mode =lp->m_info->m_cap_mode;
3502         yaml_info->m_wlength  =lp->m_info->m_wlength;
3503         yaml_info->m_plugin_id = lp->m_info->m_plugin_id;
3504         yaml_info->m_one_app_server = lp->m_info->m_one_app_server;
3505         yaml_info->m_server_addr = lp->m_info->m_server_addr;
3506         yaml_info->m_dpPkt          =lp->m_info->m_dpPkt;
3507         yaml_info->m_server_pool_idx=lp->m_info->m_server_pool_idx;
3508         yaml_info->m_client_pool_idx=lp->m_info->m_client_pool_idx;
3509         yaml_info->m_server_pool_name=lp->m_info->m_server_pool_name;
3510         yaml_info->m_client_pool_name=lp->m_info->m_client_pool_name;
3511         /* fix this */
3512         assert(m_max_threads>0);
3513         if ( m_max_threads == 1 ) {
3514             /* we have one thread the limit */
3515             yaml_info->m_limit         = lp->m_info->m_limit;
3516         }else{
3517             yaml_info->m_limit = lp->m_info->m_limit/m_max_threads;
3518             /* thread is zero base */
3519             if ( m_thread_id == 0){
3520                 yaml_info->m_limit += lp->m_info->m_limit % m_max_threads;
3521             }
3522             if (yaml_info->m_limit==0) {
3523                 yaml_info->m_limit=1;
3524             }
3525         }
3526
3527         yaml_info->m_limit_was_set = lp->m_info->m_limit_was_set;
3528         yaml_info->m_flowcnt       = 0;
3529         yaml_info->m_restart_time = ( yaml_info->m_limit_was_set ) ?
3530             (yaml_info->m_limit / (yaml_info->m_k_cps * 1000.0)) : 0;
3531
3532         lp_thread->Create(&m_smart_gen,
3533                            yaml_info,
3534                            lp->m_flows_info,
3535                            &lp->m_flow_info,
3536                            lp->m_id,
3537                            m_thread_id);
3538
3539         m_cap_gen.push_back(lp_thread);
3540     }
3541 }
3542
3543 static void free_map_flow_id_to_node(CGenNode *p){
3544     CGlobalInfo::free_node(p);
3545 }
3546
3547
3548 void CFlowGenListPerThread::Delete(){
3549
3550     // free all current maps
3551     m_flow_id_to_node_lookup.remove_all(free_map_flow_id_to_node);
3552     // free object
3553     m_flow_id_to_node_lookup.Delete();
3554
3555     m_smart_gen.Delete();
3556     m_node_gen.Delete();
3557     Clean();
3558     m_cpu_cp_u.Delete();
3559
3560     utl_rte_mempool_delete(m_node_pool);
3561 }
3562
3563
3564
3565 void CFlowGenListPerThread::Clean(){
3566     int i;
3567     for (i=0; i<(int)m_cap_gen.size(); i++) {
3568         CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
3569         if (lp->m_tuple_gen_was_set) {
3570             CTupleGeneratorSmart *gen;
3571             gen = lp->tuple_gen.get_gen();
3572             gen->Delete();
3573             delete gen;
3574         }
3575         lp->Delete();
3576         delete lp;
3577     }
3578     m_cap_gen.clear();
3579 }
3580
3581 //uint64_t _start_time;
3582
3583 void CNodeGenerator::dump_json(std::string & json){
3584
3585     json="{\"name\":\"tx-gen\",\"type\":0,\"data\":{";
3586     m_realtime_his.dump_json("realtime-hist",json);
3587     json+="\"unknown\":0}}" ;
3588 }
3589
3590 void CNodeGenerator::add_exit_node(CFlowGenListPerThread * thread,
3591                                   dsec_t max_time){
3592
3593     if ( max_time > 0 ) {
3594         CGenNode *exit_node = thread->create_node();
3595         exit_node->m_type = CGenNode::EXIT_SCHED;
3596         exit_node->m_time = max_time;
3597         add_node(exit_node);
3598     }
3599 }
3600
3601 inline bool CNodeGenerator::handle_stl_node(CGenNode * node,
3602                                              CFlowGenListPerThread * thread){
3603     uint8_t type=node->m_type;
3604
3605     if ( likely( type == CGenNode::STATELESS_PKT ) ) {
3606        m_p_queue.pop();
3607        CGenNodeStateless *node_sl = (CGenNodeStateless *)node;
3608         /* if the stream has been deactivated - end */
3609         if ( unlikely( node_sl->is_mask_for_free() ) ) {
3610             thread->free_node(node);
3611         } else {
3612             /* count before handle - node might be destroyed */
3613             #ifdef TREX_SIM
3614             update_stl_stats(node_sl);
3615             #endif
3616
3617             node_sl->handle(thread);
3618
3619             #ifdef TREX_SIM
3620             if (has_limit_reached()) {
3621                 thread->m_stateless_dp_info.stop_traffic(node_sl->get_port_id(), false, 0);
3622             }
3623             #endif
3624         }
3625         return (true);
3626     }
3627     return(false);
3628 }
3629
3630
3631 inline bool CNodeGenerator::do_work_stl(CGenNode * node,
3632                                               CFlowGenListPerThread * thread,
3633                                               bool always){
3634
3635     if ( handle_stl_node(node,thread)){
3636         return (false);
3637     }else{
3638         return (handle_slow_messages(node->m_type,node,thread,always));
3639     }
3640 }
3641
3642 inline bool CNodeGenerator::do_work_both(CGenNode * node,
3643                                               CFlowGenListPerThread * thread,
3644                                               dsec_t d_time,
3645                                               bool always
3646                                               ){
3647
3648     bool exit_scheduler=false;
3649     uint8_t type=node->m_type;
3650     bool done;
3651
3652     if ( handle_stl_node (node,thread) ){
3653     }else{
3654         if ( likely( type == CGenNode::FLOW_PKT ) ) {
3655             /* PKT */
3656             if ( !(node->is_repeat_flow()) || (always==false)) {
3657                 flush_one_node_to_file(node);
3658                 #ifdef _DEBUG
3659                 update_stats(node);
3660                 #endif
3661             }
3662             m_p_queue.pop();
3663             if ( node->is_last_in_flow() ) {
3664                 if ((node->is_repeat_flow()) && (always==false)) {
3665                     /* Flow is repeated, reschedule it */
3666                     thread->reschedule_flow( node);
3667                 }else{
3668                     /* Flow will not be repeated, so free node */
3669                     thread->free_last_flow_node( node);
3670                 }
3671             }else{
3672                 node->update_next_pkt_in_flow();
3673                 m_p_queue.push(node);
3674             }
3675         }else{
3676             if ((type == CGenNode::FLOW_FIF)) {
3677                /* callback to our method */
3678                 m_p_queue.pop();
3679                 if ( always == false) {
3680                     thread->m_cur_time_sec = node->m_time ;
3681
3682                     thread->generate_flows_roundrobin(&done);
3683
3684                     if (!done) {
3685                         node->m_time +=d_time;
3686                         m_p_queue.push(node);
3687                     }else{
3688                         thread->free_node(node);
3689                     }
3690                 }else{
3691                     thread->free_node(node);
3692                 }
3693
3694             }else{
3695                 exit_scheduler = handle_slow_messages(type,node,thread,always);
3696             }
3697         }
3698     }
3699
3700     return (exit_scheduler);
3701 }
3702
3703
3704
3705 template<int SCH_MODE>
3706 inline bool CNodeGenerator::do_work(CGenNode * node,
3707                                           CFlowGenListPerThread * thread,
3708                                           dsec_t d_time,
3709                                           bool always
3710                                           ){
3711     /* template filter in compile time */
3712     if ( SCH_MODE == smSTATELESS  ) {
3713         return ( do_work_stl(node,thread,always) );
3714     }else{
3715         /* smSTATEFUL */
3716         return ( do_work_both(node,thread,d_time,always) );
3717     }
3718 }
3719
3720
3721 inline void CNodeGenerator::do_sleep(dsec_t & cur_time,
3722                                      CFlowGenListPerThread * thread,
3723                                      dsec_t n_time){
3724     thread->m_cpu_dp_u.commit1();
3725     dsec_t dt;
3726
3727     /* TBD make this better using calculation, minimum now_sec() */
3728     while ( true ) {
3729         cur_time = now_sec();
3730         dt = cur_time - n_time ;
3731
3732         if (dt> WAIT_WINDOW_SIZE ) {
3733             break;
3734         }
3735
3736         rte_pause();
3737     }
3738
3739     thread->m_cpu_dp_u.start_work1();
3740 }
3741
3742
3743 inline int CNodeGenerator::teardown(CFlowGenListPerThread * thread,
3744                                            bool always,
3745                                            double &old_offset,
3746                                            double offset){
3747
3748     thread->m_cpu_dp_u.commit1();
3749
3750         /* to do */
3751     if ( thread->is_terminated_by_master() ) {
3752         return (0);
3753     }
3754
3755     if (!always) {
3756         old_offset =offset;
3757     }else{
3758         // free the left other
3759         thread->handler_defer_job_flush();
3760     }
3761     return (0);
3762 }
3763
3764
3765
3766 template<int SCH_MODE>
3767 inline int CNodeGenerator::flush_file_realtime(dsec_t max_time,
3768                                                dsec_t d_time,
3769                                                bool always,
3770                                                CFlowGenListPerThread * thread,
3771                                                double &old_offset) {
3772     CGenNode * node;
3773     dsec_t offset=0.0;
3774     dsec_t cur_time;
3775     dsec_t n_time;
3776     if (always) {
3777          offset=old_offset;
3778     }else{
3779         add_exit_node(thread,max_time);
3780     }
3781
3782     thread->m_cpu_dp_u.start_work1();
3783
3784     sch_state_t state = scINIT;
3785     node = m_p_queue.top();
3786     n_time = node->m_time + offset;
3787     cur_time = now_sec();
3788
3789     while (state!=scTERMINATE) {
3790
3791          switch (state) {
3792          case scINIT:
3793             cur_time = now_sec();
3794             {
3795                 dsec_t dt = cur_time - n_time ;
3796
3797                 if (dt > BURST_OFFSET_DTIME) {
3798                     state = scSTRECH;
3799                 } else if (dt > 0) {
3800                     state = scWORK;
3801                 } else {
3802                     state = scWAIT;
3803                 }
3804
3805             }
3806             break;
3807
3808          case scWORK:
3809             {
3810                 int node_count = 0;
3811                 do {
3812
3813                     bool s=do_work<SCH_MODE>(node,thread,d_time,always);
3814                     if (s) { // can we remove this IF ?
3815                         state=scTERMINATE;
3816                         break;
3817                     }
3818                     node = m_p_queue.top();
3819                     n_time = node->m_time + offset;
3820                     node_count++;
3821
3822                     /* we either out of the time frame or every 1024 nodes we get out for time checking */
3823                     if ( ( (n_time - cur_time) > EAT_WINDOW_DTIME ) || (node_count > 1024) ) {
3824                         state = scINIT;
3825                         break;
3826                     }
3827
3828                 } while (true);
3829                 break;
3830             }
3831
3832          case scWAIT:
3833                 do_sleep(cur_time,thread,n_time); // estimate  loop
3834                 state=scWORK;
3835                 break;
3836
3837
3838          default:
3839              handle_slow_operations(state, node, cur_time, n_time, offset, thread);
3840              break;
3841         } /* switch */
3842
3843     }/* while*/
3844
3845     return (teardown(thread,always,old_offset,offset));
3846 }
3847
3848
3849 FORCE_NO_INLINE void CNodeGenerator::handle_slow_operations(sch_state_t &state,
3850                                                             CGenNode * &node,
3851                                                             dsec_t &cur_time,
3852                                                             dsec_t &n_time,
3853                                                             dsec_t &offset,
3854                                                             CFlowGenListPerThread *thread) {
3855     switch (state) {
3856     case scSTRECH:
3857         {
3858             handle_time_strech(node, cur_time, n_time, offset, thread);
3859
3860             /* go back to work */
3861             state = scWORK;
3862
3863         }
3864         break;
3865
3866     default:
3867         assert(0);
3868     }
3869
3870 }
3871
3872 /**
3873  * when time is streched - the flow_sync node 
3874  * might be postpond too much 
3875  * this can result a watchdog crash and lack 
3876  * of responsivness from the DP core 
3877  * (no handling of messages) 
3878  * 
3879  * @author imarom (7/31/2016)
3880  * 
3881  */
3882 void CNodeGenerator::handle_time_strech(CGenNode * &node,
3883                                         dsec_t &cur_time,
3884                                         dsec_t &n_time,
3885                                         dsec_t &offset,
3886                                         CFlowGenListPerThread *thread) {
3887
3888
3889     /* fix the time offset */
3890     dsec_t dt = cur_time - n_time;
3891     offset += dt;
3892
3893     /* check if flow sync message was delayed too much */
3894     if ( (cur_time - m_last_sync_time_sec) > SYNC_TIME_OUT ) {
3895         handle_maintenance(thread);
3896
3897         /* re-read the top of the queue - it might have changed with messaging */
3898         node = m_p_queue.top();
3899         n_time = node->m_time + offset;
3900     }
3901
3902 }
3903
3904 int CNodeGenerator::flush_file_sim(dsec_t max_time,
3905                                    dsec_t d_time,
3906                                    bool always,
3907                                    CFlowGenListPerThread * thread,
3908                                    double &old_offset){
3909     CGenNode * node;
3910
3911     if (!always) {
3912         add_exit_node(thread,max_time);
3913     }
3914
3915     while (true) {
3916         node = m_p_queue.top();
3917
3918         bool do_exit;
3919         if ( get_is_stateless() ) {
3920             do_exit=do_work<smSTATELESS>(node,thread,d_time,always);
3921         }else{
3922             do_exit=do_work<smSTATEFUL>(node,thread,d_time,always);
3923         }
3924         if ( do_exit ){
3925             break;
3926         }
3927     }
3928     return (teardown(thread,always,old_offset,0));
3929 }
3930
3931 int CNodeGenerator::flush_file(dsec_t max_time,
3932                                dsec_t d_time,
3933                                bool always,
3934                                CFlowGenListPerThread * thread,
3935                                double &old_offset){
3936     #ifdef TREX_SIM
3937       return ( flush_file_sim(max_time, d_time,always,thread,old_offset) );
3938     #else
3939       if ( get_is_stateless() ) {
3940           return ( flush_file_realtime<smSTATELESS>(max_time, d_time,always,thread,old_offset) );
3941       }else{
3942           return ( flush_file_realtime<smSTATEFUL>(max_time, d_time,always,thread,old_offset) );
3943       }
3944
3945     #endif
3946 }
3947
3948
3949
3950 void CNodeGenerator::handle_flow_pkt(CGenNode *node, CFlowGenListPerThread *thread) {
3951
3952     /*repeat and NAT is not supported together */
3953     if ( node->is_nat_first_state()  ) {
3954         node->set_nat_wait_state();
3955         flush_one_node_to_file(node);
3956         #ifdef _DEBUG
3957         update_stats(node);
3958         #endif
3959     } else {
3960         if ( node->is_nat_wait_state() ) {
3961             if (node->is_responder_pkt()) {
3962                 m_p_queue.pop();
3963                 /* time out, need to free the flow and remove the association , we didn't get conversion yet*/
3964                 thread->terminate_nat_flows(node);
3965                 return;
3966
3967             } else {
3968                 flush_one_node_to_file(node);
3969                 #ifdef _DEBUG
3970                 update_stats(node);
3971                 #endif
3972             }
3973         } else {
3974             if ( node->is_nat_wait_ack_state() ) {
3975                 if (node->is_initiator_pkt()) {
3976                     m_p_queue.pop();
3977                     /* time out, need to free the flow and remove the association , we didn't get conversion yet*/
3978                     thread->terminate_nat_flows(node);
3979                     return;
3980
3981                 } else {
3982                     flush_one_node_to_file(node);
3983 #ifdef _DEBUG
3984                     update_stats(node);
3985 #endif
3986                 }
3987             } else {
3988                 assert(0);
3989             }
3990         }
3991     }
3992     m_p_queue.pop();
3993     if ( node->is_last_in_flow() ) {
3994         thread->free_last_flow_node( node);
3995     } else {
3996         node->update_next_pkt_in_flow();
3997         m_p_queue.push(node);
3998     }
3999 }
4000
4001 void CNodeGenerator::handle_flow_sync(CGenNode *node, CFlowGenListPerThread *thread, bool &exit_scheduler) {
4002
4003     
4004     /* flow sync message is a sync point for time */
4005     thread->m_cur_time_sec = node->m_time;
4006
4007     /* first pop the node */
4008     m_p_queue.pop();
4009
4010     /* call all the maintenance required */
4011     handle_maintenance(thread);
4012
4013     /* exit in case this is the last node*/
4014     if ( m_p_queue.size() == m_parent->m_non_active_nodes ) {
4015         thread->free_node(node);
4016         exit_scheduler = true;
4017     } else {
4018         /* schedule for next maintenace */
4019         node->m_time += SYNC_TIME_OUT;
4020         m_p_queue.push(node);
4021     }
4022
4023 }
4024
4025 void
4026 CNodeGenerator::handle_maintenance(CFlowGenListPerThread *thread) {
4027
4028     thread->tickle();         /* tickle the watchdog */
4029     thread->check_msgs();     /* check messages */
4030     m_v_if->flush_tx_queue(); /* flush pkt each timeout */
4031
4032     /* save last sync time as realtime */
4033     m_last_sync_time_sec = now_sec();
4034 }
4035
4036
4037 void CNodeGenerator::handle_command(CGenNode *node, CFlowGenListPerThread *thread, bool &exit_scheduler) {
4038     m_p_queue.pop();
4039     CGenNodeCommand *node_cmd = (CGenNodeCommand *)node;
4040     TrexStatelessCpToDpMsgBase * cmd=node_cmd->m_cmd;
4041     cmd->handle(&thread->m_stateless_dp_info);
4042     exit_scheduler = cmd->is_quit();
4043     thread->free_node((CGenNode *)node_cmd);/* free the node */
4044 }
4045
4046 void CNodeGenerator::handle_pcap_pkt(CGenNode *node, CFlowGenListPerThread *thread) {
4047     m_p_queue.pop();
4048
4049     CGenNodePCAP *node_pcap = (CGenNodePCAP *)node;
4050
4051     /* might have been marked for free */
4052     if ( unlikely( node_pcap->is_marked_for_free() ) ) {
4053         thread->free_node(node);
4054     } else {
4055         node_pcap->handle(thread);
4056     }
4057 }
4058
4059 bool
4060 CNodeGenerator::handle_slow_messages(uint8_t type,
4061                                      CGenNode * node,
4062                                      CFlowGenListPerThread * thread,
4063                                      bool always){
4064
4065     /* should we continue after */
4066     bool exit_scheduler = false;
4067
4068     switch (type) {
4069     case CGenNode::PCAP_PKT:
4070         handle_pcap_pkt(node, thread);
4071         break;
4072
4073     case CGenNode::FLOW_DEFER_PORT_RELEASE:
4074         m_p_queue.pop();
4075         thread->handler_defer_job(node);
4076         thread->free_node(node);
4077         break;
4078
4079     case CGenNode::FLOW_PKT_NAT:
4080         handle_flow_pkt(node, thread);
4081         break;
4082
4083     case CGenNode::FLOW_SYNC:
4084         handle_flow_sync(node, thread, exit_scheduler);
4085         break;
4086
4087     case CGenNode::EXIT_SCHED:
4088         m_p_queue.pop();
4089         thread->free_node(node);
4090         exit_scheduler = true;
4091         break;
4092
4093
4094     case CGenNode::COMMAND:
4095         handle_command(node, thread, exit_scheduler);
4096         break;
4097
4098     default:
4099         assert(0);
4100     }
4101
4102     return (exit_scheduler);
4103
4104 }
4105
4106 void CFlowGenListPerThread::Dump(FILE *fd){
4107     fprintf(fd,"yaml info ");
4108     m_yaml_info.Dump(fd);
4109
4110     fprintf(fd,"\n");
4111     fprintf(fd,"cap file info");
4112     int i;
4113     for (i=0; i<(int)m_cap_gen.size(); i++) {
4114         CFlowGeneratorRecPerThread  * lp=m_cap_gen[i];
4115         lp->Dump(stdout);
4116     }
4117 }
4118
4119
4120
4121
4122 void CFlowGenListPerThread::DumpStats(FILE *fd){
4123     m_stats.dump(fd);
4124 }
4125
4126
4127 void CFlowGenListPerThread::DumpCsv(FILE *fd){
4128     CFlowStats::DumpHeader(fd);
4129
4130     CFlowStats stats;
4131     CFlowStats sum;
4132     int i;
4133
4134     for (i=0; i<(int)m_cap_gen.size(); i++) {
4135         CFlowGeneratorRecPerThread  * lp=m_cap_gen[i];
4136         lp->getFlowStats(&stats);
4137         stats.Dump(fd);
4138         sum.Add(stats);
4139     }
4140     fprintf(fd,"\n");
4141     sum.m_name= "sum";
4142     sum.Dump(fd);
4143 }
4144
4145
4146 uint32_t CFlowGenListPerThread::getDualPortId(){
4147     return ( ::getDualPortId(m_thread_id) );
4148 }
4149
4150 double CFlowGenListPerThread::get_longest_flow(uint8_t pool_idx, bool is_client){
4151     int i;
4152     double longest_flow = 0.0;
4153     for (i=0;i<(int)m_cap_gen.size(); i++) {
4154         CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
4155         if (is_client &&
4156             lp->m_info->m_client_pool_idx != pool_idx)
4157             continue;
4158         if (!is_client &&
4159             lp->m_info->m_server_pool_idx != pool_idx)
4160             continue;
4161         double tmp_len;
4162         tmp_len = lp->m_flow_info->get_cap_file_length_sec();
4163         if (longest_flow < tmp_len ) {
4164             longest_flow = tmp_len;
4165         }
4166     }
4167     return longest_flow;
4168 }
4169
4170
4171 double CFlowGenListPerThread::get_longest_flow(){
4172     int i;
4173     double longest_flow = 0.0;
4174     for (i=0;i<(int)m_cap_gen.size(); i++) {
4175         CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
4176         double tmp_len;
4177         tmp_len = lp->m_flow_info->get_cap_file_length_sec();
4178         if (longest_flow < tmp_len ) {
4179             longest_flow = tmp_len;
4180         }
4181     }
4182     return longest_flow;
4183 }
4184
4185 double CFlowGenListPerThread::get_total_kcps(uint8_t pool_idx, bool is_client){
4186     int i;
4187     double total=0.0;
4188     for (i=0; i<(int)m_cap_gen.size(); i++) {
4189         CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
4190         if (is_client &&
4191             lp->m_info->m_client_pool_idx != pool_idx)
4192             continue;
4193         if (!is_client &&
4194             lp->m_info->m_server_pool_idx != pool_idx)
4195             continue;
4196         total +=lp->m_info->m_k_cps;
4197     }
4198     return (total);
4199 }
4200
4201 double CFlowGenListPerThread::get_total_kcps(){
4202     int i;
4203     double total=0.0;
4204     for (i=0; i<(int)m_cap_gen.size(); i++) {
4205         CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
4206         total +=lp->m_info->m_k_cps;
4207     }
4208     return (total);
4209 }
4210
4211 double CFlowGenListPerThread::get_delta_flow_is_sec(){
4212     return (1.0/(1000.0*get_total_kcps()));
4213 }
4214
4215
4216 void CFlowGenListPerThread::inc_current_template(void){
4217     m_cur_template++;
4218     if (m_cur_template == m_cap_gen.size()) {
4219         m_cur_template=0;
4220     }
4221 }
4222
4223
4224 int CFlowGenListPerThread::generate_flows_roundrobin(bool *done){
4225     // round robin
4226
4227     CFlowGeneratorRecPerThread * cur;
4228     bool found=false;
4229     // try current
4230     int i;
4231     *done = true;
4232     for (i=0;i<(int)m_cap_gen.size();i++ ) {
4233         cur=m_cap_gen[m_cur_template];
4234         if (!(cur->m_info->m_limit_was_set) ||
4235             (cur->m_info->m_flowcnt < cur->m_info->m_limit)) {
4236             *done = false;
4237             if ( cur->m_policer.update(1.0,m_cur_time_sec) ){
4238                 cur->m_info->m_flowcnt++;
4239                 found=true;
4240                 break;
4241             }
4242         }
4243         inc_current_template();
4244     }
4245
4246     if (found) {
4247         /* generate the flow into the generator*/
4248         CGenNode * node= create_node() ;
4249
4250         cur->generate_flow(&m_node_gen,m_cur_time_sec,m_cur_flow_id,node);
4251         m_cur_flow_id++;
4252
4253         /* this is estimation */
4254         m_stats.m_total_open_flows += cur->m_flow_info->get_total_flows();
4255         m_stats.m_total_bytes += cur->m_flow_info->get_total_bytes();
4256         m_stats.m_total_pkt   += cur->m_flow_info->Size();
4257         inc_current_template();
4258     }
4259     return (0);
4260 }
4261
4262
4263 int CFlowGenListPerThread::reschedule_flow(CGenNode *node){
4264
4265     // Re-schedule the node
4266     node->reset_pkt_in_flow();
4267     node->m_time += node->m_template_info->m_restart_time;
4268     m_node_gen.add_node(node);
4269
4270     m_stats.m_total_bytes += node->m_flow_info->get_total_bytes();
4271     m_stats.m_total_pkt   += node->m_flow_info->Size();
4272
4273     return (0);
4274 }
4275
4276 void CFlowGenListPerThread::terminate_nat_flows(CGenNode *p){
4277     m_stats.m_nat_flow_timeout++;
4278     m_stats.m_nat_lookup_remove_flow_id++;
4279     if (p->is_nat_wait_ack_state()) {
4280         m_stats.m_nat_flow_timeout_wait_ack++;
4281     } else {
4282         m_stats.m_nat_lookup_wait_ack_state++;
4283     }
4284     m_flow_id_to_node_lookup.remove_no_lookup(p->get_short_fid());
4285     free_last_flow_node( p);
4286 }
4287
4288
4289 void CFlowGenListPerThread::handle_latency_pkt_msg(CGenNodeLatencyPktInfo * msg){
4290     /* send the packet */
4291     #ifdef RX_QUEUE_TRACE_
4292     printf(" latency  msg dir %d\n",msg->m_dir);
4293     struct rte_mbuf * m;
4294     m=msg->m_pkt;
4295     rte_pktmbuf_dump(stdout,m, rte_pktmbuf_pkt_len(m));
4296     #endif
4297
4298     /* update timestamp */
4299     struct rte_mbuf * m;
4300     m=msg->m_pkt;
4301     uint8_t *p=rte_pktmbuf_mtod(m, uint8_t*);
4302     latency_header * h=(latency_header *)(p+msg->m_latency_offset);
4303     h->time_stamp = os_get_hr_tick_64();
4304
4305     m_node_gen.m_v_if->send_one_pkt((pkt_dir_t)msg->m_dir,msg->m_pkt);
4306 }
4307
4308 void CFlowGenListPerThread::handle_nat_msg(CGenNodeNatInfo * msg){
4309     int i;
4310     bool first = true, second = true;
4311
4312     for (i=0; i<msg->m_cnt; i++) {
4313         first = true;
4314         second = true;
4315         CNatFlowInfo * nat_msg=&msg->m_data[i];
4316         CGenNode * node=m_flow_id_to_node_lookup.lookup(nat_msg->m_fid);
4317         if (!node) {
4318             /* this should be moved to a notification module */
4319 #ifdef NAT_TRACE_
4320             printf(" ERORR not valid flow_id %d probably flow was aged  \n",nat_msg->m_fid);
4321 #endif
4322             m_stats.m_nat_lookup_no_flow_id++;
4323             continue;
4324         }
4325
4326         // Calculate diff between tcp seq of SYN packet, and TCP ack of SYN+ACK packet
4327         // For supporting firewalls who do TCP seq num randomization
4328         if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_TCP)) {
4329             if (node->is_nat_wait_state()) {
4330                 char *syn_pkt = node->m_flow_info->GetPacket(0)->m_packet->raw;
4331                 TCPHeader *tcp = (TCPHeader *)(syn_pkt + node->m_pkt_info->m_pkt_indication.getFastTcpOffset());
4332                 node->set_nat_tcp_seq_diff_client(nat_msg->m_tcp_seq - tcp->getSeqNumber());
4333                 if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_TCP_ACK)) {
4334                     node->set_nat_wait_ack_state();
4335                     m_stats.m_nat_lookup_wait_ack_state++;
4336                     second = false;
4337                 } else {
4338                     node->set_nat_learn_state();
4339                 }
4340             } else {
4341                 char *syn_ack_pkt = node->m_flow_info->GetPacket(1)->m_packet->raw;
4342                 TCPHeader *tcp = (TCPHeader *)(syn_ack_pkt + node->m_pkt_info->m_pkt_indication.getFastTcpOffset());
4343                 node->set_nat_tcp_seq_diff_server(nat_msg->m_tcp_seq - tcp->getSeqNumber());
4344                 assert(node->is_nat_wait_ack_state());
4345                 node->set_nat_learn_state();
4346                 first = false;
4347             }
4348         } else {
4349             assert(node->is_nat_wait_state());
4350             node->set_nat_learn_state();
4351         }
4352
4353         if (first) {
4354 #ifdef NAT_TRACE_
4355             printf(" %.03f  RX :set node %p:%x  %x:%x TCP diff %x\n"
4356                    , now_sec(), node,nat_msg->m_fid, nat_msg->m_external_ip, nat_msg->m_external_port
4357                    , node->get_nat_tcp_seq_diff_client());
4358 #endif
4359
4360             node->set_nat_ipv4_addr(nat_msg->m_external_ip);
4361             node->set_nat_ipv4_port(nat_msg->m_external_port);
4362
4363             if ( CGlobalInfo::is_learn_verify_mode() ){
4364                 if (!node->is_external_is_eq_to_internal_ip() ||
4365                     node->get_nat_tcp_seq_diff_client() != 0) {
4366                     m_stats.m_nat_flow_learn_error++;
4367                 }
4368             }
4369         }
4370
4371         if (second) {
4372             /* remove from the hash */
4373             m_flow_id_to_node_lookup.remove_no_lookup(nat_msg->m_fid);
4374             m_stats.m_nat_lookup_remove_flow_id++;
4375         }
4376     }
4377 }
4378
4379 void CFlowGenListPerThread::check_msgs(void) {
4380
4381     /* inlined for performance */
4382     m_stateless_dp_info.periodic_check_for_cp_messages();
4383
4384     if ( likely ( m_ring_from_rx->isEmpty() ) ) {
4385         return;
4386     }
4387
4388     #ifdef  NAT_TRACE_
4389     printf(" %.03f got message from RX \n",now_sec());
4390     #endif
4391     while ( true ) {
4392         CGenNode * node;
4393         if ( m_ring_from_rx->Dequeue(node)!=0 ){
4394             break;
4395         }
4396         assert(node);
4397         //printf ( " message: thread %d,  node->m_flow_id : %d \n", m_thread_id,node->m_flow_id);
4398         /* only one message is supported right now */
4399
4400         CGenNodeMsgBase * msg=(CGenNodeMsgBase *)node;
4401
4402         uint8_t   msg_type =  msg->m_msg_type;
4403         switch (msg_type ) {
4404         case CGenNodeMsgBase::NAT_FIRST:
4405             handle_nat_msg((CGenNodeNatInfo * )msg);
4406             break;
4407
4408         case CGenNodeMsgBase::LATENCY_PKT:
4409             handle_latency_pkt_msg((CGenNodeLatencyPktInfo *) msg);
4410             break;
4411
4412         default:
4413             printf("ERROR pkt-thread message type is not valid %d \n",msg_type);
4414             assert(0);
4415         }
4416
4417         CGlobalInfo::free_node(node);
4418     }
4419 }
4420
4421
4422
4423 void CFlowGenListPerThread::start_stateless_simulation_file(std::string erf_file_name,
4424                                                             CPreviewMode &preview,
4425                                                             uint64_t limit){
4426     m_preview_mode = preview;
4427     m_node_gen.open_file(erf_file_name,&m_preview_mode);
4428     m_node_gen.set_packet_limit(limit);
4429 }
4430
4431 void CFlowGenListPerThread::stop_stateless_simulation_file(){
4432     m_node_gen.m_v_if->close_file();
4433 }
4434
4435 void CFlowGenListPerThread::start_stateless_daemon_simulation(){
4436     CGlobalInfo::m_options.m_run_mode = CParserOption::RUN_MODE_INTERACTIVE;
4437     m_cur_time_sec = 0;
4438
4439     /* if no pending CP messages - the core will simply be stuck forever */
4440     if (m_stateless_dp_info.are_any_pending_cp_messages()) {
4441         m_stateless_dp_info.run_once();
4442     }
4443 }
4444
4445
4446 /* return true if we need to shedule next_stream,  */
4447
4448 bool CFlowGenListPerThread::set_stateless_next_node( CGenNodeStateless * cur_node,
4449                                                      CGenNodeStateless * next_node){
4450     return ( m_stateless_dp_info.set_stateless_next_node(cur_node,next_node) );
4451 }
4452
4453
4454 void CFlowGenListPerThread::start_stateless_daemon(CPreviewMode &preview){
4455     CGlobalInfo::m_options.m_run_mode = CParserOption::RUN_MODE_INTERACTIVE;
4456     m_cur_time_sec = 0;
4457     /* set per thread global info, for performance */
4458     m_preview_mode = preview;
4459     m_node_gen.open_file("",&m_preview_mode);
4460
4461     m_stateless_dp_info.start();
4462 }
4463
4464
4465 void CFlowGenListPerThread::start_generate_stateful(std::string erf_file_name,
4466                                 CPreviewMode & preview){
4467     /* now we are ready to generate*/
4468     if ( m_cap_gen.size()==0 ){
4469         fprintf(stderr," nothing to generate no template loaded \n");
4470         return;
4471     }
4472
4473     m_preview_mode = preview;
4474     m_node_gen.open_file(erf_file_name,&m_preview_mode);
4475     dsec_t d_time_flow=get_delta_flow_is_sec();
4476
4477     m_cur_time_sec =  0.01 + m_thread_id*m_flow_list->get_delta_flow_is_sec();
4478
4479
4480     if ( CGlobalInfo::is_realtime()  ){
4481         if (m_cur_time_sec > 0.2 ) {
4482             m_cur_time_sec =  0.01 + m_thread_id*0.01;
4483         }
4484         m_cur_time_sec += now_sec() + 0.1 ;
4485     }
4486     dsec_t c_stop_sec = m_cur_time_sec + m_yaml_info.m_duration_sec;
4487     m_stop_time_sec =c_stop_sec;
4488     m_cur_flow_id =1;
4489     m_cur_template =(m_thread_id % m_cap_gen.size());
4490     m_stats.clear();
4491
4492     fprintf(stdout," Generating erf file ...  \n");
4493     CGenNode * node= create_node() ;
4494     /* add periodic */
4495     node->m_type = CGenNode::FLOW_FIF;
4496     node->m_time = m_cur_time_sec;
4497     m_node_gen.add_node(node);
4498
4499     double old_offset=0.0;
4500
4501     node= create_node() ;
4502     node->m_type = CGenNode::FLOW_SYNC;
4503     node->m_time = m_cur_time_sec + SYNC_TIME_OUT ;
4504
4505     m_node_gen.add_node(node);
4506
4507     #ifdef _DEBUG
4508     if ( m_preview_mode.getVMode() >2 ){
4509
4510         CGenNode::DumpHeader(stdout);
4511     }
4512     #endif
4513
4514     m_node_gen.flush_file(c_stop_sec,d_time_flow, false,this,old_offset);
4515
4516
4517 #ifdef VALG
4518     CALLGRIND_STOP_INSTRUMENTATION;
4519     printf (" %llu \n",os_get_hr_tick_64()-_start_time);
4520 #endif
4521     if ( !CGlobalInfo::m_options.preview.getNoCleanFlowClose() &&  (is_terminated_by_master()==false) ){
4522         /* clean close */
4523         m_node_gen.flush_file(m_cur_time_sec, d_time_flow, true,this,old_offset);
4524     }
4525
4526     if (m_preview_mode.getVMode() > 1 ) {
4527         fprintf(stdout,"\n\n");
4528         fprintf(stdout,"\n\n");
4529         fprintf(stdout,"file stats \n");
4530         fprintf(stdout,"=================\n");
4531         m_stats.dump(stdout);
4532     }
4533     m_node_gen.close_file(this);
4534 }
4535
4536 void CFlowGenList::Delete(){
4537     clean_p_thread_info();
4538     Clean();
4539     if (CPluginCallback::callback) {
4540         delete  CPluginCallback::callback;
4541         CPluginCallback::callback = NULL;
4542     }
4543 }
4544
4545
4546 bool CFlowGenList::Create(){
4547     check_objects_sizes();
4548     CPluginCallback::callback=  new CPluginCallbackSimple();
4549     return (true);
4550 }
4551
4552
4553 void CFlowGenList::generate_p_thread_info(uint32_t num_threads){
4554     clean_p_thread_info();
4555     BP_ASSERT(num_threads < 64);
4556     int i;
4557     for (i=0; i<(int)num_threads; i++) {
4558         CFlowGenListPerThread   * lp= new CFlowGenListPerThread();
4559         lp->Create(i,i,this,num_threads);
4560         m_threads_info.push_back(lp);
4561     }
4562 }
4563
4564
4565 void CFlowGenList::clean_p_thread_info(void){
4566     int i;
4567     for (i=0; i<(int)m_threads_info.size(); i++) {
4568         CFlowGenListPerThread * lp=m_threads_info[i];
4569         lp->Delete();
4570         delete lp;
4571     }
4572     m_threads_info.clear();
4573 }
4574
4575 int CFlowGenList::load_client_config_file(std::string file_name) {
4576     m_client_config_info.load_yaml_file(file_name);
4577     return (0);
4578 }
4579
4580 void CFlowGenList::set_client_config_tuple_gen_info(CTupleGenYamlInfo * tg) {
4581     m_client_config_info.set_tuple_gen_info(tg);
4582 }
4583
4584 std::vector<ClientCfgCompactEntry *> CFlowGenList::get_client_cfg_ip_list() {
4585     return m_client_config_info.get_entry_list();
4586 }
4587
4588 void CFlowGenList::set_client_config_resolved_macs(CManyIPInfo &pretest_result) {
4589     m_client_config_info.set_resolved_macs(pretest_result);
4590 }
4591
4592 void CFlowGenList::dump_client_config(FILE *fd) {
4593     m_client_config_info.dump(fd);
4594 }
4595
4596 int CFlowGenList::load_from_yaml(std::string file_name,
4597                                  uint32_t num_threads){
4598     uint8_t idx;
4599     m_yaml_info.load_from_yaml_file(file_name);
4600     if (m_yaml_info.verify_correctness(num_threads) ==false){
4601         exit(0);
4602     }
4603
4604     /* move it to global info, better CPU D-cache  usage */
4605     CGlobalInfo::m_options.preview.set_vlan_mode_enable(m_yaml_info.m_vlan_info.m_enable);
4606     CGlobalInfo::m_options.m_vlan_port[0] =   m_yaml_info.m_vlan_info.m_vlan_per_port[0];
4607     CGlobalInfo::m_options.m_vlan_port[1] =   m_yaml_info.m_vlan_info.m_vlan_per_port[1];
4608     CGlobalInfo::m_options.preview.set_mac_ip_overide_enable(m_yaml_info.m_mac_replace_by_ip);
4609
4610     if ( m_yaml_info.m_mac_base.size() != 6 ){
4611         printf(" mac addr is not valid \n");
4612         exit(0);
4613     }
4614
4615     if (m_yaml_info.m_ipv6_set == true) {
4616         // Copy the most significant 96-bits from yaml data
4617         for (idx=0; idx<6; idx++){
4618             CGlobalInfo::m_options.m_src_ipv6[idx] = m_yaml_info.m_src_ipv6[idx];
4619             CGlobalInfo::m_options.m_dst_ipv6[idx] = m_yaml_info.m_dst_ipv6[idx];
4620         }
4621     }else{
4622         // Set the most signifcant 96-bits to zero which represents an
4623         // IPv4-compatible IPv6 address
4624         for (idx=0; idx<6; idx++){
4625             CGlobalInfo::m_options.m_src_ipv6[idx] = 0;
4626             CGlobalInfo::m_options.m_dst_ipv6[idx] = 0;
4627         }
4628     }
4629
4630     int i=0;
4631     Clean();
4632     bool all_template_has_one_direction=true;
4633     for (i=0; i<(int)m_yaml_info.m_vec.size(); i++) {
4634         CFlowGeneratorRec * lp=new CFlowGeneratorRec();
4635         if ( lp->Create(&m_yaml_info.m_vec[i],&m_yaml_info,i) == false){
4636             fprintf(stdout,"\n ERROR reading YAML template files, please verify that they are valid \n\n");
4637             exit(-1);
4638             return (-1);
4639         }
4640         m_cap_gen.push_back(lp);
4641
4642         if (lp->m_flow_info.GetPacket(0)->m_pkt_indication.m_desc.IsBiDirectionalFlow() ) {
4643             all_template_has_one_direction=false;
4644         }
4645     }
4646
4647     if ( CGlobalInfo::is_learn_mode() && all_template_has_one_direction ) {
4648         fprintf(stdout,"\n Warning --learn mode has nothing to do when all templates are one directional, please remove it  \n");
4649     }
4650     return (0);
4651 }
4652
4653
4654
4655 void CFlowGenList::Clean(){
4656     int i;
4657     for (i=0; i<(int)m_cap_gen.size(); i++) {
4658         CFlowGeneratorRec * lp=m_cap_gen[i];
4659         lp->Delete();
4660         delete lp;
4661     }
4662     m_cap_gen.clear();
4663 }
4664
4665 double CFlowGenList::GetCpuUtil(){
4666     int i;
4667     double c=0.0;
4668     for (i=0; i<(int)m_threads_info.size(); i++) {
4669         CFlowGenListPerThread * lp=m_threads_info[i];
4670         c+=lp->m_cpu_cp_u.GetVal();
4671     }
4672     return (c/m_threads_info.size());
4673 }
4674
4675 double CFlowGenList::GetCpuUtilRaw(){
4676     int i;
4677     double c=0.0;
4678     for (i=0; i<(int)m_threads_info.size(); i++) {
4679         CFlowGenListPerThread * lp=m_threads_info[i];
4680         c+=lp->m_cpu_cp_u.GetValRaw();
4681     }
4682     return (c/m_threads_info.size());
4683 }
4684
4685
4686 void CFlowGenList::UpdateFast(){
4687
4688     for (int i=0; i<(int)m_threads_info.size(); i++) {
4689         CFlowGenListPerThread * lp=m_threads_info[i];
4690         lp->Update();
4691     }
4692 }
4693
4694
4695
4696 void CFlowGenList::Dump(FILE *fd){
4697     fprintf(fd,"yaml info \n");
4698     fprintf(fd,"--------------\n");
4699     m_yaml_info.Dump(fd);
4700
4701     fprintf(fd,"\n");
4702     fprintf(fd,"cap file info \n");
4703     fprintf(fd,"----------------------\n");
4704     int i;
4705     for (i=0; i<(int)m_cap_gen.size(); i++) {
4706         CFlowGeneratorRec * lp=m_cap_gen[i];
4707         lp->Dump(fd);
4708     }
4709 }
4710
4711
4712 void CFlowGenList::DumpPktSize(){
4713
4714     int i;
4715     for (i=0; i<(int)m_cap_gen.size(); i++) {
4716         CFlowGeneratorRec * lp=m_cap_gen[i];
4717         lp->m_flow_info.dump_pkt_sizes();
4718     }
4719 }
4720
4721
4722 void CFlowGenList::DumpCsv(FILE *fd){
4723     CFlowStats::DumpHeader(fd);
4724
4725     CFlowStats stats;
4726     CFlowStats sum;
4727     int i;
4728
4729     for (i=0; i<(int)m_cap_gen.size(); i++) {
4730         CFlowGeneratorRec * lp=m_cap_gen[i];
4731         lp->getFlowStats(&stats);
4732         stats.Dump(fd);
4733         sum.Add(stats);
4734     }
4735     fprintf(fd,"\n");
4736     sum.m_name= "sum";
4737     sum.Dump(fd);
4738     sum.m_memory.dump(fd);
4739 }
4740
4741
4742 uint32_t CFlowGenList::get_total_repeat_flows(){
4743     uint32_t flows=0;
4744     int i;
4745     for (i=0; i<(int)m_cap_gen.size(); i++) {
4746         CFlowGeneratorRec * lp=m_cap_gen[i];
4747         flows+=lp->m_info->m_limit ;
4748     }
4749     return (flows);
4750 }
4751
4752
4753 double CFlowGenList::get_total_tx_bps(){
4754     CFlowStats stats;
4755     double total=0.0;
4756     int i;
4757
4758     for (i=0; i<(int)m_cap_gen.size(); i++) {
4759         CFlowGeneratorRec * lp=m_cap_gen[i];
4760         lp->getFlowStats(&stats);
4761         total+=(stats.m_mb_sec);
4762     }
4763     return (_1Mb_DOUBLE*total);
4764 }
4765
4766 double CFlowGenList::get_total_pps(){
4767
4768     CFlowStats stats;
4769     double total=0.0;
4770     int i;
4771
4772     for (i=0; i<(int)m_cap_gen.size(); i++) {
4773         CFlowGeneratorRec * lp=m_cap_gen[i];
4774         lp->getFlowStats(&stats);
4775         total+=stats.m_pps;
4776     }
4777     return (total);
4778 }
4779
4780
4781 double CFlowGenList::get_total_kcps(){
4782
4783     CFlowStats stats;
4784     double total=0.0;
4785     int i;
4786
4787     for (i=0; i<(int)m_cap_gen.size(); i++) {
4788         CFlowGeneratorRec * lp=m_cap_gen[i];
4789         lp->getFlowStats(&stats);
4790         total+= stats.get_normal_cps();
4791     }
4792     return ((total/1000.0));
4793 }
4794
4795 double CFlowGenList::get_delta_flow_is_sec(){
4796     return (1.0/(1000.0*get_total_kcps()));
4797 }
4798
4799
4800
4801 bool CPolicer::update(double dsize,double now_sec){
4802     if ( m_last_time ==0.0 ) {
4803         /* first time */
4804         m_last_time = now_sec;
4805         return (true);
4806     }
4807     if (m_cir == 0.0) {
4808         return (false);
4809     }
4810
4811         // check if there is a need to add tokens
4812     if(now_sec > m_last_time) {
4813         dsec_t dtime=(now_sec - m_last_time);
4814         dsec_t dsize =dtime*m_cir;
4815         m_level +=dsize;
4816         if (m_level > m_bucket_size) {
4817             m_level = m_bucket_size;
4818         }
4819         m_last_time = now_sec;
4820     }
4821
4822     if (m_level > dsize) {
4823         m_level -= dsize;
4824         return (true);
4825     }else{
4826         return (false);
4827     }
4828 }
4829
4830
4831 float CPPSMeasure::add(uint64_t pkts){
4832     if ( false == m_start ){
4833         m_start=true;
4834         m_last_time_msec = os_get_time_msec() ;
4835         m_last_pkts=pkts;
4836         return (0.0);
4837     }
4838
4839     uint32_t ctime=os_get_time_msec();
4840     if ((ctime - m_last_time_msec) <os_get_time_freq() )  {
4841         return  (m_last_result);
4842     }
4843
4844     uint32_t dtime_msec = ctime-m_last_time_msec;
4845     uint32_t dpkts     = (pkts - m_last_pkts);
4846
4847     m_last_time_msec    = ctime;
4848     m_last_pkts        = pkts;
4849
4850     m_last_result= 0.5*calc_pps(dtime_msec,dpkts) +0.5*(m_last_result);
4851     return ( m_last_result );
4852 }
4853
4854
4855
4856 CBwMeasure::CBwMeasure() {
4857     reset();
4858 }
4859
4860 void CBwMeasure::reset(void) {
4861    m_start=false;
4862    m_last_time_msec=0;
4863    m_last_bytes=0;
4864    m_last_result=0.0;
4865 };
4866
4867 double CBwMeasure::calc_MBsec(uint32_t dtime_msec,
4868                              uint64_t dbytes){
4869     double rate=0.000008*( (  (double)dbytes*(double)os_get_time_freq())/((double)dtime_msec) );
4870     return (rate);
4871 }
4872
4873 double CBwMeasure::add(uint64_t size) {
4874     if ( false == m_start ){
4875         m_start=true;
4876         m_last_time_msec = os_get_time_msec() ;
4877         m_last_bytes=size;
4878         return (0.0);
4879     }
4880
4881     uint32_t ctime=os_get_time_msec();
4882     if ((ctime - m_last_time_msec) <os_get_time_freq() )  {
4883         return  (m_last_result);
4884     }
4885
4886     uint32_t dtime_msec = ctime-m_last_time_msec;
4887     uint64_t dbytes     = size - m_last_bytes;
4888
4889     m_last_time_msec    = ctime;
4890     m_last_bytes        = size;
4891
4892     m_last_result= 0.5*calc_MBsec(dtime_msec,dbytes) +0.5*(m_last_result);
4893     return ( m_last_result );
4894 }
4895
4896
4897
4898 /*
4899  * Test if option value is within allowed range.
4900  * val - Value to test
4901  * min, max - minimum, maximum allowed values.
4902  * opt_name - option name for error report.
4903  */
4904 bool CParserOption::is_valid_opt_val(int val, int min, int max, const std::string &opt_name) {
4905     if (val < min || val > max) {
4906     std::cerr << "Value " << val << " for option " << opt_name << " is out of range. Should be (" <<  min << "-" << max << ")." << std::endl;
4907     return false;
4908     }
4909
4910     return true;
4911 }
4912
4913 void CParserOption::dump(FILE *fd){
4914     preview.Dump(fd);
4915     fprintf(fd," cfg file        : %s \n",cfg_file.c_str());
4916     fprintf(fd," mac file        : %s \n",client_cfg_file.c_str());
4917     fprintf(fd," out file        : %s \n",out_file.c_str());
4918     fprintf(fd," client cfg file : %s \n",out_file.c_str());
4919     fprintf(fd," duration        : %.0f \n",m_duration);
4920     fprintf(fd," factor          : %.0f \n",m_factor);
4921     fprintf(fd," mbuf_factor     : %.0f \n",m_mbuf_factor);
4922     fprintf(fd," latency         : %d pkt/sec \n",m_latency_rate);
4923     fprintf(fd," zmq_port        : %d \n",m_zmq_port);
4924     fprintf(fd," telnet_port     : %d \n",m_telnet_port);
4925     fprintf(fd," expected_ports  : %d \n",m_expected_portd);
4926     if (preview.get_vlan_mode_enable() ) {
4927        fprintf(fd," vlans     : [%d,%d] \n",m_vlan_port[0],m_vlan_port[1]);
4928     }
4929
4930     int i;
4931     for (i = 0; i < TREX_MAX_PORTS; i++) {
4932         fprintf(fd," port : %d dst:",i);
4933         CMacAddrCfg * lp=&m_mac_addr[i];
4934         dump_mac_addr(fd,lp->u.m_mac.dest);
4935         fprintf(fd,"  src:");
4936         dump_mac_addr(fd,lp->u.m_mac.src);
4937         fprintf(fd,"\n");
4938     }
4939 }
4940
4941 void CParserOption::verify() {
4942     /* check for mutual exclusion options */
4943     if (preview.get_is_client_cfg_enable()) {
4944         if (preview.get_vlan_mode_enable() || preview.get_mac_ip_overide_enable()) {
4945             throw std::runtime_error("VLAN / MAC override cannot be combined with client configuration");
4946         }
4947     }
4948 }
4949
4950 #if 0
4951
4952 void CTupleGlobalGenerator::Dump(FILE *fd){
4953     fprintf(fd," src:%x  dest: %x \n",m_result_src_ip,m_result_dest_ip);
4954 }
4955
4956 bool CTupleGlobalGenerator::Create(){
4957     was_generated=false;
4958     return (true);
4959 }
4960
4961
4962 void CTupleGlobalGenerator::Copy(CTupleGlobalGenerator * gen){
4963    was_generated=false;
4964    m_min_src_ip  = gen->m_min_src_ip;
4965    m_max_src_ip  = gen->m_max_src_ip;
4966    m_min_dest_ip = gen->m_min_dest_ip;
4967    m_max_dest_ip = gen->m_max_dest_ip;
4968 }
4969
4970
4971 void CTupleGlobalGenerator::Delete(){
4972     was_generated=false;
4973 }
4974
4975 #endif
4976
4977 static uint32_t  get_rand_32(uint32_t MinimumRange ,
4978                       uint32_t MaximumRange );
4979
4980
4981 #if 0
4982 void CTupleGlobalGenerator::Generate(uint32_t thread_id,
4983                                      uint32_t num_addr ){
4984     if ( was_generated == false) {
4985         /* first time */
4986         was_generated = true;
4987         cur_src_ip = m_min_src_ip;
4988         cur_dst_ip = m_min_dest_ip;
4989     }
4990
4991     if ( ( cur_src_ip + num_addr ) > m_max_src_ip ) {
4992         cur_src_ip = m_min_src_ip;
4993     }
4994
4995     /* copy the results */
4996     m_result_src_ip  = cur_src_ip;
4997     m_result_dest_ip = cur_dst_ip;
4998     cur_src_ip += num_addr;
4999     cur_dst_ip += 1;
5000     if (cur_dst_ip > m_max_dest_ip ) {
5001         cur_dst_ip = m_min_dest_ip;
5002     }
5003 }
5004
5005
5006
5007
5008 void CTupleTemplateGenerator::Dump(FILE  *fd){
5009     fprintf(fd," id: %x,  %x:%x -  %x \n",m_id,m_result_src_ip,m_result_dest_ip,m_result_src_port);
5010 }
5011
5012
5013 bool CTupleTemplateGenerator::Create(CTupleGlobalGenerator * global_gen,
5014                                      uint16_t w,
5015                                      uint16_t wlength,
5016                                      uint32_t _id,
5017                                      uint32_t thread_id){
5018     m_was_generated = false;
5019     m_thread_id     = thread_id;
5020     m_lp_global_gen = global_gen;
5021     BP_ASSERT(m_lp_global_gen);
5022     m_cur_src_port  = 1;
5023     m_cur_src_port_cnt=0;
5024
5025     m_w = w;
5026     m_wlength = wlength;
5027
5028     m_id = _id;
5029     m_was_init=true;
5030     return(true);
5031 }
5032
5033 void CTupleTemplateGenerator::Delete(){
5034     m_was_generated = false;
5035     m_was_init=false;
5036 }
5037
5038 void CTupleTemplateGenerator::Generate_src_dest(){
5039     /* TBD need to fix the 100*/
5040     m_lp_global_gen->Generate(m_thread_id,m_wlength);
5041     m_result_src_ip  = m_lp_global_gen->m_result_src_ip;
5042
5043     m_dest_ip        = m_lp_global_gen->m_result_dest_ip;
5044     m_result_dest_ip = update_dest_ip(m_dest_ip );
5045     m_cnt=0;
5046 }
5047
5048 uint16_t CTupleTemplateGenerator::GenerateOneSourcePort(){
5049         /* handle port */
5050     m_cur_src_port++;
5051     /* do not use port zero */
5052     if (m_cur_src_port == 0) {
5053         m_cur_src_port=1;
5054     }
5055     m_result_src_port=m_cur_src_port;
5056     return (m_cur_src_port);
5057 }
5058
5059 void CTupleTemplateGenerator::Generate(){
5060     BP_ASSERT(m_was_init);
5061     if ( m_was_generated == false  ) {
5062         /* first time */
5063         Generate_src_dest();
5064         m_was_generated = true;
5065     }else{
5066         /*  ip+cnt,dest+cnt*/
5067         m_cnt++;
5068         if ( m_cnt >= m_wlength ) {
5069             m_cnt =0;
5070             m_result_src_ip -=m_wlength;
5071             m_result_dest_ip = m_dest_ip;
5072             m_cur_src_port_cnt++;
5073             if (m_cur_src_port_cnt >= m_w ) {
5074                 Generate_src_dest();
5075                 m_cur_src_port_cnt=0;
5076             }
5077         }
5078         m_result_src_ip += 1;
5079         m_result_dest_ip = update_dest_ip(m_dest_ip +m_cnt );
5080     }
5081
5082
5083     /* handle port */
5084     m_cur_src_port++;
5085     /* do not use port zero */
5086     if (m_cur_src_port == 0) {
5087         m_cur_src_port=1;
5088     }
5089     m_result_src_ip =update_src_ip( m_result_src_ip );
5090     m_result_src_port=m_cur_src_port;
5091 }
5092
5093 #endif
5094
5095 static uint32_t get_rand_32(uint32_t MinimumRange,
5096                             uint32_t MaximumRange) __attribute__ ((unused));
5097
5098 static uint32_t get_rand_32(uint32_t MinimumRange,
5099                             uint32_t MaximumRange) {
5100     enum {RANDS_NUM = 2 , RAND_MAX_BITS = 0xf , UNSIGNED_INT_BITS = 0x20 , TWO_BITS_MASK = 0x3};
5101     const double TWO_POWER_32_BITS = 0x10000000 * (double)0x10;
5102     uint32_t RandomNumber = 0;
5103
5104     for (int i = 0 ; i < RANDS_NUM;i++) {
5105         RandomNumber = (RandomNumber<<RAND_MAX_BITS) + rand();
5106     }
5107     RandomNumber = (RandomNumber<<(UNSIGNED_INT_BITS - RAND_MAX_BITS * RANDS_NUM)) + (rand() | TWO_BITS_MASK);
5108
5109     uint32_t Range;
5110     if ((Range = MaximumRange - MinimumRange) == 0xffffffff) {
5111         return RandomNumber;
5112     }
5113     return (uint32_t)(((Range + 1) / TWO_POWER_32_BITS * RandomNumber) + MinimumRange );
5114 }
5115
5116
5117
5118 int CNullIF::send_node(CGenNode * node){
5119     #if 0
5120     CFlowPktInfo * lp=node->m_pkt_info;
5121     rte_mbuf_t * buf=lp->generate_new_mbuf(node);
5122     //rte_pktmbuf_dump(buf, buf->pkt_len);
5123     //sending it ??
5124     // free it here as if driver does
5125     rte_pktmbuf_free(buf);
5126     #endif
5127     return (0);
5128 }
5129
5130
5131
5132 void CErfIF::fill_raw_packet(rte_mbuf_t * m,CGenNode * node,pkt_dir_t dir){
5133
5134     fill_pkt(m_raw,m);
5135
5136     CPktNsecTimeStamp t_c(node->m_time);
5137     m_raw->time_nsec = t_c.m_time_nsec;
5138     m_raw->time_sec  = t_c.m_time_sec;
5139     uint8_t p_id = (uint8_t)dir;
5140     m_raw->setInterface(p_id);
5141 }
5142
5143
5144 pkt_dir_t CErfIFStl::port_id_to_dir(uint8_t port_id) {
5145      return ((pkt_dir_t)(port_id&1));
5146 }
5147
5148
5149 int CErfIFStl::update_mac_addr_from_global_cfg(pkt_dir_t  dir, uint8_t * p){
5150     memcpy(p,CGlobalInfo::m_options.get_dst_src_mac_addr(dir),12);
5151     return (0);
5152 }
5153
5154 int CErfIFStl::send_sl_node(CGenNodeStateless *node_sl) {
5155     pkt_dir_t dir=(pkt_dir_t)node_sl->get_mbuf_cache_dir();
5156
5157     rte_mbuf_t *    m;
5158     if ( likely(node_sl->is_cache_mbuf_array()) ) {
5159         m=node_sl->cache_mbuf_array_get_cur();
5160         fill_raw_packet(m,(CGenNode *)node_sl,dir);
5161     }else{
5162         m=node_sl->get_cache_mbuf();
5163         bool is_const = false;
5164         if (m) {
5165             is_const = true;
5166             rte_pktmbuf_refcnt_update(m,1);
5167         }else{
5168             m=node_sl->alloc_node_with_vm();
5169             assert(m);
5170         }
5171
5172         if (node_sl->is_stat_needed() && (node_sl->get_stat_hw_id() >= MAX_FLOW_STATS) ) {
5173             /* latency packet. flow stat without latency handled like normal packet in simulation */
5174             uint16_t hw_id = node_sl->get_stat_hw_id();
5175             rte_mbuf_t *mi;
5176             struct flow_stat_payload_header *fsp_head;
5177             mi = node_sl->alloc_flow_stat_mbuf(m, fsp_head, is_const);
5178             fsp_head->seq = 0x12345678;
5179             fsp_head->hw_id = hw_id - MAX_FLOW_STATS;
5180             fsp_head->magic = FLOW_STAT_PAYLOAD_MAGIC;
5181             fsp_head->flow_seq = FLOW_STAT_PAYLOAD_INITIAL_FLOW_SEQ;
5182             fsp_head->time_stamp = 0x8899aabbccddeeff;
5183             fill_raw_packet(mi, (CGenNode *)node_sl, dir);
5184             rte_pktmbuf_free(mi);
5185         } else {
5186             fill_raw_packet(m,(CGenNode *)node_sl,dir);
5187             rte_pktmbuf_free(m);
5188         }
5189     }
5190         /* check that we have mbuf  */
5191     int rc = write_pkt(m_raw);
5192     BP_ASSERT(rc == 0);
5193
5194     return (rc);
5195 }
5196
5197 int CErfIFStl::send_pcap_node(CGenNodePCAP *pcap_node) {
5198     rte_mbuf_t *m = pcap_node->get_pkt();
5199     if (!m) {
5200         return (-1);
5201     }
5202
5203     pkt_dir_t dir = (pkt_dir_t)pcap_node->get_mbuf_dir();
5204     fill_raw_packet(m, (CGenNode*)pcap_node, dir);
5205     rte_pktmbuf_free(m);
5206
5207     int rc = write_pkt(m_raw);
5208     BP_ASSERT(rc == 0);
5209
5210     return (rc);
5211 }
5212
5213 /*
5214  * This is the simulation stateless send_node.
5215  * in simulation (bp-sim-64) it is called instead of CCoreEthIFStateless::send_node
5216  * Purpose is to test the mbuf manipulation functions which are the same in simulation and "real" code
5217  */
5218 int CErfIFStl::send_node(CGenNode * _no_to_use){
5219
5220     if ( m_preview_mode->getFileWrite() ) {
5221
5222         switch (_no_to_use->m_type) {
5223         case CGenNode::STATELESS_PKT:
5224             return send_sl_node((CGenNodeStateless *) _no_to_use);
5225
5226         case CGenNode::PCAP_PKT:
5227             return send_pcap_node((CGenNodePCAP *) _no_to_use);
5228
5229         default:
5230             assert(0);
5231         }
5232     }
5233     return (0);
5234 }
5235
5236 void CErfIF::add_vlan(uint16_t vlan_id) {
5237     uint8_t *buffer =(uint8_t *)m_raw->raw;
5238
5239     uint16_t vlan_protocol = EthernetHeader::Protocol::VLAN;
5240     uint32_t vlan_tag = (vlan_protocol << 16) | vlan_id;
5241     vlan_tag = PKT_HTONL(vlan_tag);
5242
5243     /* insert vlan tag and adjust packet size */
5244     memcpy(cbuff+4, buffer + 12, m_raw->pkt_len - 12);
5245     memcpy(cbuff, &vlan_tag, 4);
5246     memcpy(buffer + 12, cbuff, m_raw->pkt_len - 8);
5247
5248     m_raw->pkt_len += 4;
5249 }
5250
5251 void CErfIF::apply_client_config(const ClientCfg *cfg, pkt_dir_t dir) {
5252     assert(cfg);
5253     uint8_t *p = (uint8_t *)m_raw->raw;
5254
5255     const ClientCfgDir &cfg_dir = ( (dir == CLIENT_SIDE) ? cfg->m_initiator : cfg->m_responder);
5256
5257     /* dst mac */
5258     if (cfg_dir.has_dst_mac_addr()) {
5259         memcpy(p, cfg_dir.get_dst_mac_addr(), 6);
5260     }
5261
5262     /* src mac */
5263     if (cfg_dir.has_src_mac_addr()) {
5264         memcpy(p + 6, cfg_dir.get_src_mac_addr(), 6);
5265     }
5266
5267     /* VLAN */
5268     if (cfg_dir.has_vlan()) {
5269         add_vlan(cfg_dir.get_vlan());
5270     }
5271 }
5272
5273 int CErfIF::send_node(CGenNode *node){
5274
5275     if (!m_preview_mode->getFileWrite()) {
5276         return (0);
5277     }
5278
5279     CFlowPktInfo *lp = node->m_pkt_info;
5280     rte_mbuf_t   *m  = lp->generate_new_mbuf(node);
5281     pkt_dir_t    dir = node->cur_interface_dir();
5282
5283     fill_raw_packet(m, node, dir);
5284
5285     /* update mac addr dest/src 12 bytes */
5286     uint8_t *p=(uint8_t *)m_raw->raw;
5287     int p_id=(int)dir;
5288     memcpy(p,CGlobalInfo::m_options.get_dst_src_mac_addr(p_id),12);
5289
5290     /* if a client configuration was provided - apply the config */
5291     if (CGlobalInfo::m_options.preview.get_is_client_cfg_enable()) {
5292         apply_client_config(node->m_client_cfg, dir);
5293
5294     } else if (CGlobalInfo::m_options.preview.get_vlan_mode_enable()) {
5295         uint8_t vlan_port = (node->m_src_ip & 1);
5296         uint16_t vlan_id = CGlobalInfo::m_options.m_vlan_port[vlan_port];
5297         add_vlan(vlan_id);
5298     }
5299
5300     //utl_DumpBuffer(stdout,p,  12,0);
5301
5302     int rc = write_pkt(m_raw);
5303     BP_ASSERT(rc == 0);
5304
5305     rte_pktmbuf_free(m);
5306
5307     return (0);
5308 }
5309
5310 int CErfIF::flush_tx_queue(void){
5311     return (0);
5312 }
5313
5314 void CTcpSeq::update(uint8_t *p, CFlowPktInfo *pkt_info, int16_t s_size){
5315     TCPHeader *tcp= (TCPHeader *)(p+pkt_info->m_pkt_indication.getFastTcpOffset());
5316     uint32_t seqnum, acknum;
5317
5318     // This routine will adjust the TCP segment size for packets
5319     // based on the modifications made by the plugins.
5320     // Basically it will keep track of the size changes
5321     // and adjust the TCP sequence numbers accordingly.
5322
5323     bool is_init=pkt_info->m_pkt_indication.m_desc.IsInitSide();
5324
5325     // Update TCP seq number
5326     seqnum = tcp->getSeqNumber();
5327     acknum = tcp->getAckNumber();
5328     if (is_init) {
5329         // Packet is from client
5330         seqnum += client_seq_delta;
5331         acknum += server_seq_delta;
5332     } else {
5333         // Packet is from server
5334         seqnum += server_seq_delta;
5335         acknum += client_seq_delta;
5336     }
5337     tcp->setSeqNumber(seqnum);
5338     tcp->setAckNumber(acknum);
5339
5340     // Adjust delta being tracked
5341     if (is_init) {
5342         client_seq_delta += s_size;
5343     } else {
5344         server_seq_delta += s_size;
5345     }
5346 }
5347
5348
5349 void on_node_first(uint8_t plugin_id,CGenNode *     node,
5350                    CFlowYamlInfo *  template_info,
5351                    CTupleTemplateGeneratorSmart * tuple_gen,
5352                    CFlowGenListPerThread  * flow_gen){
5353
5354     if (CPluginCallback::callback) {
5355         CPluginCallback::callback->on_node_first(plugin_id,node,template_info, tuple_gen,flow_gen);
5356     }
5357 }
5358
5359 void on_node_last(uint8_t plugin_id,CGenNode *     node){
5360     if (CPluginCallback::callback) {
5361         CPluginCallback::callback->on_node_last(plugin_id,node);
5362     }
5363
5364 }
5365
5366 rte_mbuf_t * on_node_generate_mbuf(uint8_t plugin_id,CGenNode *     node,CFlowPktInfo * pkt_info){
5367     rte_mbuf_t * m;
5368     assert(CPluginCallback::callback);
5369     m=CPluginCallback::callback->on_node_generate_mbuf(plugin_id,node,pkt_info);
5370     assert(m);
5371     return(m);
5372 }
5373
5374
5375 class CPlugin_rtsp : public CTcpSeq {
5376 public:
5377     void *   m_gen;
5378     uint16_t rtp_client_0;
5379     uint16_t rtp_client_1;
5380 };
5381
5382
5383 void CPluginCallbackSimple::on_node_first(uint8_t plugin_id,
5384                                           CGenNode *     node,
5385                                           CFlowYamlInfo *  template_info,
5386                                           CTupleTemplateGeneratorSmart * tuple_gen,
5387                                           CFlowGenListPerThread  * flow_gen ){
5388     //printf(" on on_node_first callback  %d  node %x! \n",(int)plugin_id,node);
5389     /* generate 2 ports from client side */
5390
5391     if ( (plugin_id == mpRTSP) || (plugin_id == mpSIP_VOICE) ) {
5392         CPlugin_rtsp * lpP=new CPlugin_rtsp();
5393         assert(lpP);
5394
5395         /* TBD need to be fixed using new API */
5396         lpP->rtp_client_0 = tuple_gen->GenerateOneSourcePort();
5397         lpP->rtp_client_1 = tuple_gen->GenerateOneSourcePort();
5398         lpP->m_gen=flow_gen;
5399         node->m_plugin_info = (void *)lpP;
5400     }else{
5401         if (plugin_id ==mpDYN_PYLOAD) {
5402             /* nothing to do */
5403         }else{
5404             if (plugin_id ==mpAVL_HTTP_BROWSIN) {
5405                 CTcpSeq * lpP=new CTcpSeq();
5406                 assert(lpP);
5407                 node->m_plugin_info = (void *)lpP;
5408             }else{
5409                 /* do not support this */
5410                 assert(0);
5411             }
5412         }
5413     }
5414 }
5415
5416 void CPluginCallbackSimple::on_node_last(uint8_t plugin_id,CGenNode *     node){
5417     //printf(" on on_node_last callback  %d  %x! \n",(int)plugin_id,node);
5418     if ( (plugin_id == mpRTSP) || (plugin_id == mpSIP_VOICE) ) {
5419         CPlugin_rtsp * lpP=(CPlugin_rtsp * )node->m_plugin_info;
5420         /* free the ports */
5421         CFlowGenListPerThread  * flow_gen=(CFlowGenListPerThread  *) lpP->m_gen;
5422         bool is_tcp=node->m_pkt_info->m_pkt_indication.m_desc.IsTcp();
5423         flow_gen->defer_client_port_free(is_tcp,node->m_src_idx,lpP->rtp_client_0,
5424                                 node->m_template_info->m_client_pool_idx,node->m_tuple_gen);
5425         flow_gen->defer_client_port_free(is_tcp,node->m_src_idx,lpP->rtp_client_1,
5426                                 node->m_template_info->m_client_pool_idx,  node->m_tuple_gen);
5427
5428         assert(lpP);
5429         delete lpP;
5430         node->m_plugin_info=0;
5431     }else{
5432         if (plugin_id ==mpDYN_PYLOAD) {
5433             /* nothing to do */
5434         }else{
5435             if (plugin_id ==mpAVL_HTTP_BROWSIN) {
5436                 /* nothing to do */
5437                 CTcpSeq * lpP=(CTcpSeq * )node->m_plugin_info;
5438                 delete lpP;
5439                 node->m_plugin_info=0;
5440             }else{
5441                 /* do not support this */
5442                 assert(0);
5443             }
5444         }
5445     }
5446 }
5447
5448 rte_mbuf_t * CPluginCallbackSimple::http_plugin(uint8_t plugin_id,
5449                                                 CGenNode *     node,
5450                                                 CFlowPktInfo * pkt_info){
5451     CPacketDescriptor * lpd=&pkt_info->m_pkt_indication.m_desc;
5452     assert(lpd->getFlowId()==0); /* only one flow */
5453     CMiniVMCmdBase * program[2];
5454     CMiniVMReplaceIP  replace_cmd;
5455     CMiniVMCmdBase    eop_cmd;
5456     CTcpSeq * lpP=(CTcpSeq * )node->m_plugin_info;
5457     assert(lpP);
5458     rte_mbuf_t *mbuf;
5459     int16_t s_size=0;
5460
5461     if ( likely (lpd->getFlowPktNum() != 3) ){
5462         if (unlikely (CGlobalInfo::is_ipv6_enable()) ) {
5463             // Request a larger initial segment for IPv6
5464             mbuf = pkt_info->do_generate_new_mbuf_big(node);
5465         }else{
5466             mbuf = pkt_info->do_generate_new_mbuf(node);
5467         }
5468
5469     }else{
5470         CFlowInfo flow_info;
5471         flow_info.vm_program=0;
5472
5473         flow_info.client_ip = node->m_src_ip;
5474         flow_info.server_ip = node->m_dest_ip;
5475         flow_info.client_port = node->m_src_port;
5476         flow_info.server_port = 0;
5477         flow_info.replace_server_port =false;
5478         flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
5479         flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
5480
5481
5482         replace_cmd.m_cmd     = VM_REPLACE_IP_OFFSET;
5483         replace_cmd.m_flags   = 0;
5484
5485         // Determine how much larger the packet needs to be to
5486         // handle the largest IP address. There is a single address
5487         // string of 8 bytes that needs to be replaced.
5488         if (CGlobalInfo::is_ipv6_enable() ) {
5489             // For IPv6, accomodate use of brackets (+2 bytes)
5490             replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 8;
5491
5492             // Mark as IPv6 and set the upper 96-bits
5493             replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
5494             for (uint8_t idx=0; idx<6; idx++){
5495                 replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
5496             }
5497         } else {
5498            replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 8;
5499         }
5500
5501         // Set m_start_0/m_stop_1 at start/end of IP address to be replaced.
5502         // For this packet we know the IP addr string length is 8 bytes.
5503         replace_cmd.m_start_0 = 10+16;
5504         replace_cmd.m_stop_1  = replace_cmd.m_start_0 + 8;
5505
5506         replace_cmd.m_server_ip.v4 = flow_info.server_ip;
5507
5508         eop_cmd.m_cmd = VM_EOP;
5509
5510         program[0] = &replace_cmd;
5511         program[1] = &eop_cmd;
5512
5513         flow_info.vm_program = program;
5514
5515         mbuf = pkt_info->do_generate_new_mbuf_ex_vm(node,&flow_info, &s_size);
5516     }
5517
5518     // Fixup the TCP sequence numbers
5519     uint8_t *p=rte_pktmbuf_mtod(mbuf, uint8_t*);
5520
5521     // Update TCP sequence numbers
5522     lpP->update(p, pkt_info, s_size);
5523
5524     return(mbuf);
5525 }
5526
5527 rte_mbuf_t * CPluginCallbackSimple::dyn_pyload_plugin(uint8_t plugin_id,
5528                                                       CGenNode *     node,
5529                                                       CFlowPktInfo * pkt_info){
5530
5531     CMiniVMCmdBase * program[2];
5532
5533     CMiniVMDynPyload  dyn_cmd;
5534     CMiniVMCmdBase    eop_cmd;
5535
5536     CPacketDescriptor * lpd=&pkt_info->m_pkt_indication.m_desc;
5537     CFlowYamlDynamicPyloadPlugin   *  lpt = node->m_template_info->m_dpPkt;
5538     assert(lpt);
5539     CFlowInfo flow_info;
5540     flow_info.vm_program=0;
5541     int16_t s_size=0;
5542
5543     // IPv6 packets are not supported
5544     if (CGlobalInfo::is_ipv6_enable() ) {
5545          fprintf (stderr," IPv6 is not supported for dynamic pyload change\n");
5546          exit(-1);
5547     }
5548
5549     if ( lpd->getFlowId() == 0 ) {
5550
5551         flow_info.client_ip = node->m_src_ip;
5552         flow_info.server_ip = node->m_dest_ip;
5553         flow_info.client_port = node->m_src_port;
5554         flow_info.server_port = 0;
5555         flow_info.replace_server_port =false;
5556         flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
5557         flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
5558
5559         uint32_t pkt_num = lpd->getFlowPktNum();
5560         if (pkt_num < 253) {
5561             int i;
5562             /* fast filter */
5563             for (i=0; i<lpt->m_num; i++) {
5564                 if (lpt->m_pkt_ids[i] == pkt_num ) {
5565                      //add a program here
5566                     dyn_cmd.m_cmd     = VM_DYN_PYLOAD;
5567                     dyn_cmd.m_ptr= &lpt->m_program[i];
5568                     dyn_cmd.m_flags   = 0;
5569                     dyn_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 8;
5570                     dyn_cmd.m_ip.v4=node->m_src_ip;
5571
5572                     eop_cmd.m_cmd = VM_EOP;
5573                     program[0] = &dyn_cmd;
5574                     program[1] = &eop_cmd;
5575
5576                     flow_info.vm_program = program;
5577                 }
5578             }
5579         }
5580         // only for the first flow
5581      }else{
5582          fprintf (stderr," only one flow is allowed for dynamic pyload change \n");
5583          exit(-1);
5584      }/* only for the first flow */
5585
5586     if ( unlikely( flow_info.vm_program != 0 ) ) {
5587
5588         return (  pkt_info->do_generate_new_mbuf_ex_vm(node,&flow_info, &s_size) );
5589     }else{
5590         return (  pkt_info->do_generate_new_mbuf_ex(node,&flow_info) );
5591     }
5592 }
5593
5594 rte_mbuf_t * CPluginCallbackSimple::sip_voice_plugin(uint8_t plugin_id,CGenNode *     node,CFlowPktInfo * pkt_info){
5595     CMiniVMCmdBase * program[2];
5596
5597     CMiniVMReplaceIP_PORT_IP_IP_Port  via_replace_cmd;
5598     CMiniVMCmdBase    eop_cmd;
5599
5600     CPacketDescriptor * lpd=&pkt_info->m_pkt_indication.m_desc;
5601     CPlugin_rtsp * lpP=(CPlugin_rtsp * )node->m_plugin_info;
5602     assert(lpP);
5603   //  printf(" %d %d \n",lpd->getFlowId(),lpd->getFlowPktNum());
5604     CFlowInfo flow_info;
5605     flow_info.vm_program=0;
5606     int16_t s_size=0;
5607
5608     switch ( lpd->getFlowId() ) {
5609     /* flow - SIP , packet #0,#1 control  */
5610     case 0:
5611         flow_info.client_ip = node->m_src_ip;
5612         flow_info.server_ip = node->m_dest_ip;
5613         flow_info.client_port = node->m_src_port;
5614         flow_info.server_port = 0;
5615         flow_info.replace_server_port =false;
5616         flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
5617         flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
5618
5619
5620         /* program to replace ip server */
5621         switch ( lpd->getFlowPktNum() ) {
5622         case 0:
5623             {
5624                 via_replace_cmd.m_cmd     = VM_REPLACE_IPVIA_IP_IP_PORT;
5625                 via_replace_cmd.m_flags   = 0;
5626                 via_replace_cmd.m_start_0 = 0;
5627                 via_replace_cmd.m_stop_1  = 0;
5628
5629                 // Determine how much larger the packet needs to be to
5630                 // handle the largest IP address. There are 3 address
5631                 // strings (each 9 bytes) that needs to be replaced.
5632                 // We also need to accomodate IPv6 use of brackets
5633                 // (+2 bytes) in a URI.
5634                 // There are also 2 port strings that needs to be
5635                 //  replaced (1 is 4 bytes the other is 5 bytes).
5636                 if (CGlobalInfo::is_ipv6_enable() ) {
5637                     via_replace_cmd.m_add_pkt_len = (((INET6_ADDRSTRLEN + 2) - 9)  * 3) +
5638                          ((INET_PORTSTRLEN * 2) - 9);
5639
5640                     // Mark as IPv6 and set the upper 96-bits
5641                     via_replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
5642                     for (uint8_t idx=0; idx<6; idx++){
5643                         via_replace_cmd.m_ip.v6[idx] = CGlobalInfo::m_options.m_src_ipv6[idx];
5644                         via_replace_cmd.m_ip_via.v6[idx] = CGlobalInfo::m_options.m_src_ipv6[idx];
5645                     }
5646                 } else {
5647                     via_replace_cmd.m_add_pkt_len = ((INET_ADDRSTRLEN - 9)  * 3) +
5648                          ((INET_PORTSTRLEN * 2) - 9);
5649                 }
5650                 via_replace_cmd.m_ip.v4      =node->m_src_ip;
5651                 via_replace_cmd.m_ip0_start  = 377;
5652                 via_replace_cmd.m_ip0_stop   = 377+9;
5653
5654                 via_replace_cmd.m_ip1_start  = 409;
5655                 via_replace_cmd.m_ip1_stop   = 409+9;
5656
5657
5658                 via_replace_cmd.m_port       =lpP->rtp_client_0;
5659                 via_replace_cmd.m_port_start = 435;
5660                 via_replace_cmd.m_port_stop  = 435+5;
5661
5662                 via_replace_cmd.m_ip_via.v4     =  node->m_src_ip;
5663                 via_replace_cmd.m_port_via   =  node->m_src_port;
5664
5665                 via_replace_cmd.m_ip_via_start = 208;
5666                 via_replace_cmd.m_ip_via_stop  = 208+9+5;
5667
5668
5669                 eop_cmd.m_cmd = VM_EOP;
5670
5671                 program[0] = &via_replace_cmd;
5672                 program[1] = &eop_cmd;
5673
5674                 flow_info.vm_program = program;
5675             }
5676             break;
5677         case 1:
5678             {
5679                 via_replace_cmd.m_cmd     = VM_REPLACE_IPVIA_IP_IP_PORT;
5680                 via_replace_cmd.m_flags   = 0;
5681                 via_replace_cmd.m_start_0 = 0;
5682                 via_replace_cmd.m_stop_1  = 0;
5683
5684                 // Determine how much larger the packet needs to be to
5685                 // handle the largest IP address. There are 3 address
5686                 // strings (each 9 bytes) that needs to be replaced.
5687                 // We also need to accomodate IPv6 use of brackets
5688                 // (+2 bytes) in a URI.
5689                 // There are also 2 port strings that needs to be
5690                 //  replaced (1 is 4 bytes the other is 5 bytes).
5691                 if (CGlobalInfo::is_ipv6_enable() ) {
5692                     via_replace_cmd.m_add_pkt_len = (((INET6_ADDRSTRLEN + 2) - 9)  * 3) +
5693                          ((INET_PORTSTRLEN * 2) - 9);
5694
5695                     // Mark as IPv6 and set the upper 96-bits
5696                     via_replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
5697                     for (uint8_t idx=0; idx<6; idx++){
5698                         via_replace_cmd.m_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
5699                         via_replace_cmd.m_ip_via.v6[idx] = CGlobalInfo::m_options.m_src_ipv6[idx];
5700                    }
5701                 } else {
5702                     via_replace_cmd.m_add_pkt_len = ((INET_ADDRSTRLEN - 9)  * 3) +
5703                          ((INET_PORTSTRLEN * 2) - 9);
5704                 }
5705
5706                 via_replace_cmd.m_ip.v4      =node->m_dest_ip;
5707                 via_replace_cmd.m_ip0_start  = 370;
5708                 via_replace_cmd.m_ip0_stop   = 370+8;
5709
5710                 via_replace_cmd.m_ip1_start  = 401;
5711                 via_replace_cmd.m_ip1_stop   = 401+8;
5712
5713
5714                 via_replace_cmd.m_port       =lpP->rtp_client_0;
5715                 via_replace_cmd.m_port_start = 426;
5716                 via_replace_cmd.m_port_stop  = 426+5;
5717
5718
5719                 via_replace_cmd.m_ip_via.v4  =  node->m_src_ip;
5720                 via_replace_cmd.m_port_via   =  node->m_src_port;
5721
5722                 via_replace_cmd.m_ip_via_start = 207;
5723                 via_replace_cmd.m_ip_via_stop  = 207+9+5;
5724
5725                 eop_cmd.m_cmd = VM_EOP;
5726
5727                 program[0] = &via_replace_cmd;
5728                 program[1] = &eop_cmd;
5729
5730                 flow_info.vm_program = program;
5731             }
5732             break;
5733
5734
5735         }/* end of big switch on packet */
5736         break;
5737
5738     case 1:
5739         flow_info.client_ip = node->m_src_ip ;
5740         flow_info.server_ip = node->m_dest_ip;
5741         flow_info.client_port = lpP->rtp_client_0;
5742         /* this is tricky ..*/
5743         flow_info.server_port = lpP->rtp_client_0;
5744         flow_info.replace_server_port = true;
5745         flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
5746         flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
5747
5748         break;
5749     default:
5750         assert(0);
5751         break;
5752     };
5753
5754     //printf(" c_ip:%x s_ip:%x c_po:%x s_po:%x  init:%x  replace:%x \n",flow_info.client_ip,flow_info.server_ip,flow_info.client_port,flow_info.server_port,flow_info.is_init_dir,flow_info.replace_server_port);
5755
5756     //printf(" program %p  \n",flow_info.vm_program);
5757     if ( unlikely( flow_info.vm_program != 0 ) ) {
5758
5759         return (  pkt_info->do_generate_new_mbuf_ex_vm(node,&flow_info, &s_size) );
5760     }else{
5761         return (  pkt_info->do_generate_new_mbuf_ex(node,&flow_info) );
5762     }
5763 }
5764
5765 rte_mbuf_t * CPluginCallbackSimple::rtsp_plugin(uint8_t plugin_id,CGenNode *     node,CFlowPktInfo * pkt_info){
5766
5767     CMiniVMCmdBase * program[2];
5768
5769     CMiniVMReplaceIP  replace_cmd;
5770     CMiniVMCmdBase    eop_cmd;
5771     CMiniVMReplaceIPWithPort  replace_port_cmd;
5772     rte_mbuf_t *mbuf;
5773
5774     CPacketDescriptor * lpd=&pkt_info->m_pkt_indication.m_desc;
5775     CPlugin_rtsp * lpP=(CPlugin_rtsp * )node->m_plugin_info;
5776
5777     assert(lpP);
5778   //  printf(" %d %d \n",lpd->getFlowId(),lpd->getFlowPktNum());
5779     CFlowInfo flow_info;
5780     flow_info.vm_program=0;
5781     int16_t s_size=0;
5782
5783     switch ( lpd->getFlowId() ) {
5784     /* flow - control  */
5785     case 0:
5786         flow_info.client_ip = node->m_src_ip;
5787         flow_info.server_ip = node->m_dest_ip;
5788         flow_info.client_port = node->m_src_port;
5789         flow_info.server_port = 0;
5790         flow_info.replace_server_port =false;
5791         flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
5792         flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
5793
5794
5795         /* program to replace ip server */
5796         switch ( lpd->getFlowPktNum() ) {
5797         case 3:
5798             {
5799                 replace_cmd.m_cmd     = VM_REPLACE_IP_OFFSET;
5800                 replace_cmd.m_flags   = 0;
5801                 replace_cmd.m_start_0 = 16;
5802                 replace_cmd.m_stop_1  = 16+9;
5803
5804                 // Determine how much larger the packet needs to be to
5805                 // handle the largest IP address. There is a single address
5806                 // string of 9 bytes that needs to be replaced.
5807                 if (CGlobalInfo::is_ipv6_enable() ) {
5808                     // For IPv6, accomodate use of brackets (+2 bytes)
5809                     replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
5810
5811                     // Mark as IPv6 and set the upper 96-bits
5812                     replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
5813                     for (uint8_t idx=0; idx<6; idx++){
5814                         replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
5815                     }
5816                 } else {
5817                     replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
5818                 }
5819                 replace_cmd.m_server_ip.v4 = flow_info.server_ip;
5820
5821                 eop_cmd.m_cmd = VM_EOP;
5822
5823                 program[0] = &replace_cmd;
5824                 program[1] = &eop_cmd;
5825
5826                 flow_info.vm_program = program;
5827             }
5828             break;
5829         case 4:
5830             {
5831                 replace_cmd.m_cmd     = VM_REPLACE_IP_OFFSET;
5832                 replace_cmd.m_flags   = 0;
5833                 replace_cmd.m_start_0 = 46;
5834                 replace_cmd.m_stop_1  = 46+9;
5835
5836                 // Determine how much larger the packet needs to be to
5837                 // handle the largest IP address. There is a single address
5838                 // string of 9 bytes that needs to be replaced.
5839                 if (CGlobalInfo::is_ipv6_enable() ) {
5840                     // For IPv6, accomodate use of brackets (+2 bytes)
5841                     replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
5842
5843                     // Mark as IPv6 and set the upper 96-bits
5844                     replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
5845                     for (uint8_t idx=0; idx<6; idx++){
5846                         replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
5847                     }
5848                 } else {
5849                     replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
5850                 }
5851                 replace_cmd.m_server_ip.v4 = flow_info.server_ip;
5852
5853                 eop_cmd.m_cmd = VM_EOP;
5854
5855                 program[0] = &replace_cmd;
5856                 program[1] = &eop_cmd;
5857
5858                 flow_info.vm_program = program;
5859             }
5860             break;
5861
5862         case 5:
5863             {
5864
5865                 replace_port_cmd.m_cmd     = VM_REPLACE_IP_PORT_OFFSET;
5866                 replace_port_cmd.m_flags   = 0;
5867                 replace_port_cmd.m_start_0 = 13;
5868                 replace_port_cmd.m_stop_1  = 13+9;
5869
5870                 // Determine how much larger the packet needs to be to
5871                 // handle the largest IP address. There is a single address
5872                 // string of 9 bytes that needs to be replaced.
5873                 // There are also 2 port strings (8 bytes) that needs to be
5874                 //  replaced.
5875                 if (CGlobalInfo::is_ipv6_enable() ) {
5876                     // For IPv6, accomodate use of brackets (+2 bytes)
5877                     replace_port_cmd.m_add_pkt_len = ((INET6_ADDRSTRLEN + 2) - 9) +
5878                          ((INET_PORTSTRLEN * 2) - 8);
5879
5880                     // Mark as IPv6 and set the upper 96-bits
5881                     replace_port_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
5882                     for (uint8_t idx=0; idx<6; idx++){
5883                         replace_port_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
5884                     }
5885                 } else {
5886                     replace_port_cmd.m_add_pkt_len = (INET_ADDRSTRLEN - 9) +
5887                          ((INET_PORTSTRLEN * 2) - 8);
5888                 }
5889                 replace_port_cmd.m_server_ip.v4 = flow_info.server_ip;
5890                 replace_port_cmd.m_start_port =  164;
5891                 replace_port_cmd.m_stop_port  =  164+(4*2)+1;
5892                 replace_port_cmd.m_client_port = lpP->rtp_client_0;
5893                 replace_port_cmd.m_server_port =0;
5894
5895
5896                 eop_cmd.m_cmd = VM_EOP;
5897
5898                 program[0] = &replace_port_cmd;
5899                 program[1] = &eop_cmd;
5900
5901                 flow_info.vm_program = program;
5902             }
5903             break;
5904
5905         case 6:
5906             {
5907
5908                 replace_port_cmd.m_cmd     = VM_REPLACE_IP_PORT_RESPONSE_OFFSET;
5909                 replace_port_cmd.m_flags   = 0;
5910                 replace_port_cmd.m_start_0 = 0;
5911                 replace_port_cmd.m_stop_1  = 0;
5912
5913                 // Determine how much larger the packet needs to be to
5914                 // handle the largest port addresses. There are 4 port address
5915                 // strings (16 bytes) that needs to be replaced.
5916                 replace_port_cmd.m_add_pkt_len = ((INET_PORTSTRLEN * 4) - 16);
5917
5918                 replace_port_cmd.m_server_ip.v4 = flow_info.server_ip;
5919                 replace_port_cmd.m_start_port =  247;
5920                 replace_port_cmd.m_stop_port  = 247+(4*4)+2+13;
5921                 replace_port_cmd.m_client_port = lpP->rtp_client_0;
5922                 replace_port_cmd.m_server_port = lpP->rtp_client_0;
5923
5924
5925                 eop_cmd.m_cmd = VM_EOP;
5926
5927                 program[0] = &replace_port_cmd;
5928                 program[1] = &eop_cmd;
5929
5930                 flow_info.vm_program = program;
5931             }
5932             break;
5933
5934
5935         case 7:
5936             {
5937
5938                 replace_port_cmd.m_cmd     = VM_REPLACE_IP_PORT_OFFSET;
5939                 replace_port_cmd.m_flags   = 0;
5940                 replace_port_cmd.m_start_0 = 13;
5941                 replace_port_cmd.m_stop_1  = 13+9;
5942
5943                 // Determine how much larger the packet needs to be to
5944                 // handle the largest IP address. There is a single address
5945                 // string of 9 bytes that needs to be replaced.
5946                 // There are also 2 port strings (8 bytes) that needs to be
5947                 //  replaced.
5948                 if (CGlobalInfo::is_ipv6_enable() ) {
5949                     // For IPv6, accomodate use of brackets (+2 bytes)
5950                     replace_port_cmd.m_add_pkt_len = ((INET6_ADDRSTRLEN + 2) - 9) +
5951                          ((INET_PORTSTRLEN * 2) - 8);
5952
5953                     // Mark as IPv6 and set the upper 96-bits
5954                     replace_port_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
5955                     for (uint8_t idx=0; idx<6; idx++){
5956                         replace_port_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
5957                     }
5958                 } else {
5959                     replace_port_cmd.m_add_pkt_len = (INET_ADDRSTRLEN - 9) +
5960                          ((INET_PORTSTRLEN * 2) - 8);
5961                 }
5962                 replace_port_cmd.m_server_ip.v4 = flow_info.server_ip;
5963                 replace_port_cmd.m_start_port =  164;
5964                 replace_port_cmd.m_stop_port  =  164+(4*2)+1;
5965                 replace_port_cmd.m_client_port = lpP->rtp_client_1;
5966                 replace_port_cmd.m_server_port =0;
5967
5968
5969                 eop_cmd.m_cmd = VM_EOP;
5970
5971                 program[0] = &replace_port_cmd;
5972                 program[1] = &eop_cmd;
5973
5974                 flow_info.vm_program = program;
5975             }
5976             break;
5977
5978         case 8:
5979
5980             {
5981
5982                 replace_port_cmd.m_cmd     = VM_REPLACE_IP_PORT_RESPONSE_OFFSET;
5983                 replace_port_cmd.m_flags   = 0;
5984                 replace_port_cmd.m_start_0 = 0;
5985                 replace_port_cmd.m_stop_1  = 0;
5986
5987                 // Determine how much larger the packet needs to be to
5988                 // handle the largest port addresses. There are 4 port address
5989                 // strings (16 bytes) that needs to be replaced.
5990                 replace_port_cmd.m_add_pkt_len = ((INET_PORTSTRLEN * 4) - 16);
5991
5992                 replace_port_cmd.m_server_ip.v4 = flow_info.server_ip;
5993                 replace_port_cmd.m_start_port =  247;
5994                 replace_port_cmd.m_stop_port  = 247+(4*4)+2+13;
5995                 replace_port_cmd.m_client_port = lpP->rtp_client_1;
5996                 replace_port_cmd.m_server_port = lpP->rtp_client_1;
5997
5998
5999                 eop_cmd.m_cmd = VM_EOP;
6000
6001                 program[0] = &replace_port_cmd;
6002                 program[1] = &eop_cmd;
6003
6004                 flow_info.vm_program = program;
6005             }
6006             break;
6007
6008         /* PLAY */
6009         case 9:
6010             {
6011
6012                 replace_cmd.m_cmd     = VM_REPLACE_IP_OFFSET;
6013                 replace_cmd.m_flags   = 0;
6014                 replace_cmd.m_start_0 = 12;
6015                 replace_cmd.m_stop_1  = 12+9;
6016
6017                 // Determine how much larger the packet needs to be to
6018                 // handle the largest IP address. There is a single address
6019                 // string of 9 bytes that needs to be replaced.
6020                 if (CGlobalInfo::is_ipv6_enable() ) {
6021                     // For IPv6, accomodate use of brackets (+2 bytes)
6022                     replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
6023
6024                     // Mark as IPv6 and set the upper 96-bits
6025                     replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
6026                     for (uint8_t idx=0; idx<6; idx++){
6027                         replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
6028                     }
6029                 } else {
6030                     replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
6031                 }
6032                 replace_cmd.m_server_ip.v4 = flow_info.server_ip;
6033
6034                 eop_cmd.m_cmd = VM_EOP;
6035
6036                 program[0] = &replace_cmd;
6037                 program[1] = &eop_cmd;
6038
6039                 flow_info.vm_program = program;
6040             }
6041             break;
6042
6043         /*OPTION 0*/
6044         case 12:
6045             {
6046
6047                 replace_cmd.m_cmd     = VM_REPLACE_IP_OFFSET;
6048                 replace_cmd.m_flags   = 0;
6049                 replace_cmd.m_start_0 = 15;
6050                 replace_cmd.m_stop_1  = 15+9;
6051
6052                 // Determine how much larger the packet needs to be to
6053                 // handle the largest IP address. There is a single address
6054                 // string of 9 bytes that needs to be replaced.
6055                 if (CGlobalInfo::is_ipv6_enable() ) {
6056                     // For IPv6, accomodate use of brackets (+2 bytes)
6057                     replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
6058
6059                     // Mark as IPv6 and set the upper 96-bits
6060                     replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
6061                     for (uint8_t idx=0; idx<6; idx++){
6062                         replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
6063                     }
6064                 } else {
6065                     replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
6066                 }
6067                 replace_cmd.m_server_ip.v4 = flow_info.server_ip;
6068
6069                 eop_cmd.m_cmd = VM_EOP;
6070
6071                 program[0] = &replace_cmd;
6072                 program[1] = &eop_cmd;
6073
6074                 flow_info.vm_program = program;
6075             }
6076             break;
6077
6078         /* option #2*/
6079         case 15:
6080             {
6081
6082                 replace_cmd.m_cmd     = VM_REPLACE_IP_OFFSET;
6083                 replace_cmd.m_flags   = 0;
6084                 replace_cmd.m_start_0 = 15;
6085                 replace_cmd.m_stop_1  = 15+9;
6086
6087                 // Determine how much larger the packet needs to be to
6088                 // handle the largest IP address. There is a single address
6089                 // string of 9 bytes that needs to be replaced.
6090                 if (CGlobalInfo::is_ipv6_enable() ) {
6091                     // For IPv6, accomodate use of brackets (+2 bytes)
6092                     replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
6093
6094                     // Mark as IPv6 and set the upper 96-bits
6095                     replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
6096                     for (uint8_t idx=0; idx<6; idx++){
6097                         replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
6098                     }
6099                 } else {
6100                     replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
6101                 }
6102                 replace_cmd.m_server_ip.v4 = flow_info.server_ip;
6103
6104                 eop_cmd.m_cmd = VM_EOP;
6105
6106                 program[0] = &replace_cmd;
6107                 program[1] = &eop_cmd;
6108
6109                 flow_info.vm_program = program;
6110             }
6111             break;
6112
6113         case 18:
6114             {
6115
6116                 replace_cmd.m_cmd     = VM_REPLACE_IP_OFFSET;
6117                 replace_cmd.m_flags   = 0;
6118                 replace_cmd.m_start_0 = 16;
6119                 replace_cmd.m_stop_1  = 16+9;
6120
6121                 // Determine how much larger the packet needs to be to
6122                 // handle the largest IP address. There is a single address
6123                 // string of 9 bytes that needs to be replaced.
6124                 if (CGlobalInfo::is_ipv6_enable() ) {
6125                     // For IPv6, accomodate use of brackets (+2 bytes)
6126                     replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
6127
6128                     // Mark as IPv6 and set the upper 96-bits
6129                     replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
6130                     for (uint8_t idx=0; idx<6; idx++){
6131                         replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
6132                     }
6133                 } else {
6134                     replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
6135                 }
6136                 replace_cmd.m_server_ip.v4 = flow_info.server_ip;
6137
6138                 eop_cmd.m_cmd = VM_EOP;
6139
6140                 program[0] = &replace_cmd;
6141                 program[1] = &eop_cmd;
6142
6143                 flow_info.vm_program = program;
6144             }
6145             break;
6146
6147
6148         }/* end of big switch on packet */
6149         break;
6150
6151     case 1:
6152         flow_info.client_ip = node->m_src_ip ;
6153         flow_info.server_ip = node->m_dest_ip;
6154         flow_info.client_port = lpP->rtp_client_0;
6155         /* this is tricky ..*/
6156         flow_info.server_port = lpP->rtp_client_0;
6157         flow_info.replace_server_port = true;
6158         flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
6159         flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
6160
6161         break;
6162     case 2:
6163         flow_info.client_ip = node->m_src_ip ;
6164         flow_info.server_ip = node->m_dest_ip;
6165         flow_info.client_port = lpP->rtp_client_1;
6166         /* this is tricky ..*/
6167         flow_info.server_port = lpP->rtp_client_1;
6168         flow_info.replace_server_port =true;
6169         flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
6170         flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
6171
6172         break;
6173     default:
6174         assert(0);
6175         break;
6176     };
6177
6178     //printf(" c_ip:%x s_ip:%x c_po:%x s_po:%x  init:%x  replace:%x \n",flow_info.client_ip,flow_info.server_ip,flow_info.client_port,flow_info.server_port,flow_info.is_init_dir,flow_info.replace_server_port);
6179
6180     //printf(" program %p  \n",flow_info.vm_program);
6181     if ( unlikely( flow_info.vm_program != 0 ) ) {
6182
6183         mbuf = pkt_info->do_generate_new_mbuf_ex_vm(node,&flow_info, &s_size);
6184     }else{
6185         if (unlikely (CGlobalInfo::is_ipv6_enable()) ) {
6186             // Request a larger initial segment for IPv6
6187             mbuf = pkt_info->do_generate_new_mbuf_ex_big(node,&flow_info);
6188         }else{
6189             mbuf = pkt_info->do_generate_new_mbuf_ex(node,&flow_info);
6190         }
6191     }
6192
6193     // Fixup the TCP sequence numbers for the TCP flow
6194     if ( lpd->getFlowId() == 0 ) {
6195         uint8_t *p=rte_pktmbuf_mtod(mbuf, uint8_t*);
6196
6197         // Update TCP sequence numbers
6198         lpP->update(p, pkt_info, s_size);
6199     }
6200
6201     return(mbuf);
6202 }
6203
6204
6205 /* replace the tuples */
6206 rte_mbuf_t * CPluginCallbackSimple::on_node_generate_mbuf(uint8_t plugin_id,CGenNode *     node,CFlowPktInfo * pkt_info){
6207
6208     rte_mbuf_t * m=NULL;
6209     switch (plugin_id) {
6210     case mpRTSP:
6211         m=rtsp_plugin(plugin_id,node,pkt_info);
6212         break;
6213     case mpSIP_VOICE:
6214         m=sip_voice_plugin(plugin_id,node,pkt_info);
6215         break;
6216     case  mpDYN_PYLOAD:
6217         m=dyn_pyload_plugin(plugin_id,node,pkt_info);
6218         break;
6219     case mpAVL_HTTP_BROWSIN:
6220         m=http_plugin(plugin_id,node,pkt_info);
6221         break;
6222     default:
6223         assert(0);
6224     }
6225     return (m);
6226 }
6227
6228
6229 int CMiniVM::mini_vm_run(CMiniVMCmdBase * cmds[]){
6230
6231     m_new_pkt_size=0;
6232     bool need_to_stop=false;
6233     int cnt=0;
6234     CMiniVMCmdBase * cmd=cmds[cnt];
6235     while (! need_to_stop) {
6236         switch (cmd->m_cmd) {
6237         case VM_REPLACE_IP_OFFSET:
6238             mini_vm_replace_ip((CMiniVMReplaceIP *)cmd);
6239             break;
6240         case VM_REPLACE_IP_PORT_OFFSET:
6241             mini_vm_replace_port_ip((CMiniVMReplaceIPWithPort *)cmd);
6242             break;
6243         case  VM_REPLACE_IP_PORT_RESPONSE_OFFSET:
6244             mini_vm_replace_ports((CMiniVMReplaceIPWithPort *)cmd);
6245             break;
6246
6247         case  VM_REPLACE_IP_IP_PORT:
6248             mini_vm_replace_ip_ip_ports((CMiniVMReplaceIP_IP_Port * )cmd);
6249             break;
6250
6251         case  VM_REPLACE_IPVIA_IP_IP_PORT:
6252             mini_vm_replace_ip_via_ip_ip_ports((CMiniVMReplaceIP_PORT_IP_IP_Port *)cmd);
6253             break;
6254
6255         case VM_DYN_PYLOAD:
6256             mini_vm_dyn_payload((CMiniVMDynPyload *)cmd);
6257             break;
6258
6259         case VM_EOP:
6260             need_to_stop=true;
6261             break;
6262         default:
6263             printf(" vm cmd %d does not exist \n",cmd->m_cmd);
6264             assert(0);
6265         }
6266         cnt++;
6267         cmd=cmds[cnt];
6268     }
6269     return (0);
6270 }
6271
6272 inline int cp_pkt_len(char *to,char *from,uint16_t from_offset,uint16_t len){
6273     memcpy(to, from+from_offset , len);
6274     return (len);
6275 }
6276
6277 /* not including the to_offset
6278
6279  0 1
6280  x
6281
6282 */
6283 inline int cp_pkt_to_from(char *to,char *from,uint16_t from_offset,uint16_t to_offset){
6284     memcpy(to, from+from_offset , to_offset-from_offset) ;
6285     return (to_offset-from_offset);
6286 }
6287
6288
6289 int CMiniVM::mini_vm_dyn_payload( CMiniVMDynPyload * cmd){
6290     /* copy all the packet */
6291     CFlowYamlDpPkt * dyn=(CFlowYamlDpPkt *)cmd->m_ptr;
6292     uint16_t l7_offset = m_pkt_info->m_pkt_indication.getFastPayloadOffset();
6293     uint16_t len = m_pkt_info->m_packet->pkt_len - l7_offset ;
6294     char * original_l7_ptr=m_pkt_info->m_packet->raw+l7_offset;
6295     char * p=m_pyload_mbuf_ptr;
6296     /* copy payload */
6297     memcpy(p,original_l7_ptr,len);
6298     if ( ( dyn->m_pyld_offset+ (dyn->m_len*4)) < ( len-4) ){
6299         // we can change the packet
6300         int i;
6301         uint32_t *l=(uint32_t *)(p+dyn->m_pyld_offset);
6302         for (i=0; i<dyn->m_len; i++) {
6303             if ( dyn->m_type==0 ) {
6304                 *l=(rand() & dyn->m_pkt_mask);
6305             }else if (dyn->m_type==1){
6306                 *l=(PKT_NTOHL(cmd->m_ip.v4) & dyn->m_pkt_mask);
6307             }
6308             l++;
6309         }
6310
6311     }
6312
6313     // Return packet size which hasn't changed
6314     m_new_pkt_size = m_pkt_info->m_packet->pkt_len;
6315
6316     return (0);
6317 }
6318
6319
6320 int CMiniVM::mini_vm_replace_ip_via_ip_ip_ports(CMiniVMReplaceIP_PORT_IP_IP_Port * cmd){
6321     uint16_t l7_offset = m_pkt_info->m_pkt_indication.getFastPayloadOffset();
6322     uint16_t len = m_pkt_info->m_packet->pkt_len - l7_offset;
6323     char * original_l7_ptr=m_pkt_info->m_packet->raw+l7_offset;
6324     char * p=m_pyload_mbuf_ptr;
6325
6326     p+=cp_pkt_to_from(p,original_l7_ptr,
6327                       0,
6328                       cmd->m_ip_via_start);
6329
6330     if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
6331         p+=ipv6_to_str(&cmd->m_ip_via,p);
6332     } else {
6333         p+=ip_to_str(cmd->m_ip_via.v4,p);
6334     }
6335     p+=sprintf(p,":%u",cmd->m_port_via);
6336
6337     /* up to the IP */
6338     p+=cp_pkt_to_from(p,original_l7_ptr,
6339                       cmd->m_ip_via_stop,
6340                       cmd->m_ip0_start);
6341     /*IP */
6342     if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
6343         p[-2] = '6';
6344         p+=ipv6_to_str(&cmd->m_ip,p);
6345     } else {
6346         p+=ip_to_str(cmd->m_ip.v4,p);
6347     }
6348     /* up to IP 2 */
6349     p+=cp_pkt_to_from(p, original_l7_ptr ,
6350                    cmd->m_ip0_stop,
6351                    cmd->m_ip1_start);
6352     /* IP2 */
6353     if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
6354         p[-2] = '6';
6355         p+=ipv6_to_str(&cmd->m_ip,p);
6356     } else {
6357         p+=ip_to_str(cmd->m_ip.v4,p);
6358     }
6359
6360     /* up to port */
6361     p+=cp_pkt_to_from(p, original_l7_ptr ,
6362                    cmd->m_ip1_stop,
6363                    cmd->m_port_start);
6364     /* port */
6365     p+=sprintf(p,"%u",cmd->m_port);
6366
6367     /* up to end */
6368     p+=cp_pkt_to_from(p, original_l7_ptr ,
6369                    cmd->m_port_stop,
6370                    len);
6371
6372     // Determine new packet size
6373     m_new_pkt_size= ((p+l7_offset) - m_pyload_mbuf_ptr);
6374
6375     return (0);
6376 }
6377
6378
6379 int CMiniVM::mini_vm_replace_ip_ip_ports(CMiniVMReplaceIP_IP_Port * cmd){
6380     uint16_t l7_offset = m_pkt_info->m_pkt_indication.getFastPayloadOffset();
6381     uint16_t len = m_pkt_info->m_packet->pkt_len - l7_offset;
6382     char * original_l7_ptr=m_pkt_info->m_packet->raw+l7_offset;
6383     char * p=m_pyload_mbuf_ptr;
6384
6385     /* up to the IP */
6386     p+=cp_pkt_to_from(p,original_l7_ptr,
6387                       0,
6388                       cmd->m_ip0_start);
6389     /*IP */
6390     if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
6391         p+=ipv6_to_str(&cmd->m_ip,p);
6392     } else {
6393         p+=ip_to_str(cmd->m_ip.v4,p);
6394     }
6395     /* up to IP 2 */
6396     p+=cp_pkt_to_from(p, original_l7_ptr ,
6397                    cmd->m_ip0_stop,
6398                    cmd->m_ip1_start);
6399     /* IP2 */
6400     if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
6401         p+=ipv6_to_str(&cmd->m_ip,p);
6402     } else {
6403         p+=ip_to_str(cmd->m_ip.v4,p);
6404     }
6405
6406     /* up to port */
6407     p+=cp_pkt_to_from(p, original_l7_ptr ,
6408                    cmd->m_ip1_stop,
6409                    cmd->m_port_start);
6410     /* port */
6411     p+=sprintf(p,"%u",cmd->m_port);
6412
6413     /* up to end */
6414     p+=cp_pkt_to_from(p, original_l7_ptr ,
6415                    cmd->m_port_stop,
6416                    len);
6417
6418     // Determine new packet size
6419     m_new_pkt_size= ((p+l7_offset) - m_pyload_mbuf_ptr);
6420
6421     return (0);
6422 }
6423
6424 int CMiniVM::mini_vm_replace_ports(CMiniVMReplaceIPWithPort * cmd){
6425     uint16_t l7_offset = m_pkt_info->m_pkt_indication.getFastPayloadOffset();
6426     uint16_t len = m_pkt_info->m_packet->pkt_len - l7_offset;
6427     char * original_l7_ptr=m_pkt_info->m_packet->raw+l7_offset;
6428
6429     memcpy(m_pyload_mbuf_ptr, original_l7_ptr,cmd->m_start_port);
6430     char * p=m_pyload_mbuf_ptr+cmd->m_start_port;
6431     p+=sprintf(p,"%u-%u;server_port=%u-%u",cmd->m_client_port,cmd->m_client_port+1,cmd->m_server_port,cmd->m_server_port+1);
6432     memcpy(p, original_l7_ptr+cmd->m_stop_port,len-cmd->m_stop_port);
6433     p+=(len-cmd->m_stop_port);
6434
6435     // Determine new packet size
6436     m_new_pkt_size= ((p+l7_offset) - m_pyload_mbuf_ptr);
6437
6438     return (0);
6439 }
6440
6441
6442 int CMiniVM::mini_vm_replace_port_ip(CMiniVMReplaceIPWithPort * cmd){
6443     uint16_t l7_offset=m_pkt_info->m_pkt_indication.getFastPayloadOffset();
6444     uint16_t len = m_pkt_info->m_packet->pkt_len - l7_offset - 0;
6445     char * original_l7_ptr=m_pkt_info->m_packet->raw+l7_offset;
6446
6447     memcpy(m_pyload_mbuf_ptr, original_l7_ptr,cmd->m_start_0);
6448     char *p=m_pyload_mbuf_ptr+cmd->m_start_0;
6449     if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
6450         p+=ipv6_to_str(&cmd->m_server_ip,p);
6451     } else {
6452         p+=ip_to_str(cmd->m_server_ip.v4,p);
6453     }
6454     /* copy until the port start offset */
6455     int len1=cmd->m_start_port-cmd->m_stop_1 ;
6456     memcpy(p, original_l7_ptr+cmd->m_stop_1,len1);
6457     p+=len1;
6458     p+=sprintf(p,"%u-%u",cmd->m_client_port,cmd->m_client_port+1);
6459     memcpy(p, original_l7_ptr+cmd->m_stop_port,len-cmd->m_stop_port);
6460     p+=len-cmd->m_stop_port;
6461
6462     // Determine new packet size
6463     m_new_pkt_size= ((p+l7_offset) - m_pyload_mbuf_ptr);
6464
6465     return (0);
6466 }
6467
6468 int CMiniVM::mini_vm_replace_ip(CMiniVMReplaceIP * cmd){
6469     uint16_t l7_offset = m_pkt_info->m_pkt_indication.getFastPayloadOffset();
6470     uint16_t len       = m_pkt_info->m_packet->pkt_len - l7_offset;
6471     char * original_l7_ptr = m_pkt_info->m_packet->raw+l7_offset;
6472
6473     memcpy(m_pyload_mbuf_ptr, original_l7_ptr,cmd->m_start_0);
6474     char *p=m_pyload_mbuf_ptr+cmd->m_start_0;
6475
6476     int n_size=0;
6477     if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
6478         n_size=ipv6_to_str(&cmd->m_server_ip,p);
6479     } else {
6480         n_size=ip_to_str(cmd->m_server_ip.v4,p);
6481     }
6482     p+=n_size;
6483     memcpy(p, original_l7_ptr+cmd->m_stop_1,len-cmd->m_stop_1);
6484
6485     // Determine new packet size
6486     m_new_pkt_size= ((p+l7_offset+(len-cmd->m_stop_1)) - m_pyload_mbuf_ptr);
6487
6488     return (0);
6489 }
6490
6491
6492 void CFlowYamlDpPkt::Dump(FILE *fd){
6493     fprintf(fd," pkt_id   : %d \n",(int)m_pkt_id);
6494     fprintf(fd," offset   : %d \n",(int)m_pyld_offset);
6495     fprintf(fd," offset   : %d \n",(int)m_type);
6496     fprintf(fd," len      : %d \n",(int)m_len);
6497     fprintf(fd," mask     : 0x%x \n",(int)m_pkt_mask);
6498 }
6499
6500
6501 void CFlowYamlDynamicPyloadPlugin::Add(CFlowYamlDpPkt & fd){
6502     if (m_num ==MAX_PYLOAD_PKT_CHANGE) {
6503         fprintf (stderr,"ERROR can set only %d rules \n",MAX_PYLOAD_PKT_CHANGE);
6504         exit(-1);
6505     }
6506     m_pkt_ids[m_num]=fd.m_pkt_id;
6507     m_program[m_num]=fd;
6508     m_num+=1;
6509 }
6510
6511 void CFlowYamlDynamicPyloadPlugin::Dump(FILE *fd){
6512     int i;
6513     fprintf(fd," pkts :");
6514     for (i=0; i<m_num; i++) {
6515         fprintf(fd," %d ",m_pkt_ids[i]);
6516     }
6517     fprintf(fd,"\n");
6518     for (i=0; i<m_num; i++) {
6519         fprintf(fd," program : %d \n",i);
6520         fprintf(fd,"---------------- \n");
6521         m_program[i].Dump(fd);
6522     }
6523 }
6524
6525 /* free the right object.
6526    it is classic to use virtual function but we can't do it here and we don't even want to use callback function
6527    as we want to save space and in most cases there is nothing to free.
6528    this might be changed in the future
6529  */
6530 void CGenNodeBase::free_base(){
6531     if ( m_type == FLOW_PKT ) {
6532          CGenNode* p=(CGenNode*)this;
6533          p->free_gen_node();
6534         return;
6535     }
6536     if (m_type==STATELESS_PKT) {
6537          CGenNodeStateless* p=(CGenNodeStateless*)this;
6538          p->free_stl_node();
6539         return;
6540     }
6541
6542     if (m_type == PCAP_PKT) {
6543         CGenNodePCAP *p = (CGenNodePCAP *)this;
6544         p->destroy();
6545         return;
6546     }
6547
6548     if ( m_type == COMMAND ) {
6549          CGenNodeCommand* p=(CGenNodeCommand*)this;
6550          p->free_command();
6551     }
6552
6553 }