from random import randint
import scapy.compat
-from framework import tag_fixme_vpp_workers
from framework import VppTestCase, VppTestRunner
from scapy.data import IP_PROTOS
from scapy.layers.inet import IP, TCP, UDP, ICMP, GRE
cls.create_and_add_ip4_table(i, table_id)
i.admin_up()
- i.unconfig_ip4()
i.config_ip4()
i.resolve_arp()
def get_err_counter(self, path):
return self.statistics.get_err_counter(path)
- def get_stats_counter(self, path, worker=0):
- return self.statistics.get_counter(path)[worker]
-
def reass_hairpinning(self, server_addr, server_in_port, server_out_port,
host_in_port, proto=IP_PROTOS.tcp,
ignore_port=False):
class TestNAT44EDMW(TestNAT44ED):
""" NAT44ED MW Test Case """
- worker_config = "workers 1"
-
- def get_stats_counter(self, path, worker=1):
- return super(TestNAT44EDMW, self).get_stats_counter(path, worker)
+ vpp_worker_count = 1
@unittest.skip('MW fix required')
def test_users_dump(self):
self.nat_add_outside_interface(self.pg1)
# in2out
- tc1 = self.get_stats_counter('/nat44-ed/in2out/slowpath/tcp')
- uc1 = self.get_stats_counter('/nat44-ed/in2out/slowpath/udp')
- ic1 = self.get_stats_counter('/nat44-ed/in2out/slowpath/icmp')
- dc1 = self.get_stats_counter('/nat44-ed/in2out/slowpath/drops')
+ tc1 = self.statistics['/nat44-ed/in2out/slowpath/tcp']
+ uc1 = self.statistics['/nat44-ed/in2out/slowpath/udp']
+ ic1 = self.statistics['/nat44-ed/in2out/slowpath/icmp']
+ dc1 = self.statistics['/nat44-ed/in2out/slowpath/drops']
pkts = self.create_stream_in(self.pg0, self.pg1)
# TODO: specify worker=idx, also stats have to
self.verify_capture_out(capture, ignore_port=True)
if_idx = self.pg0.sw_if_index
- tc2 = self.get_stats_counter('/nat44-ed/in2out/slowpath/tcp')
- uc2 = self.get_stats_counter('/nat44-ed/in2out/slowpath/udp')
- ic2 = self.get_stats_counter('/nat44-ed/in2out/slowpath/icmp')
- dc2 = self.get_stats_counter('/nat44-ed/in2out/slowpath/drops')
+ tc2 = self.statistics['/nat44-ed/in2out/slowpath/tcp']
+ uc2 = self.statistics['/nat44-ed/in2out/slowpath/udp']
+ ic2 = self.statistics['/nat44-ed/in2out/slowpath/icmp']
+ dc2 = self.statistics['/nat44-ed/in2out/slowpath/drops']
- self.assertEqual(tc2[if_idx] - tc1[if_idx], 2)
- self.assertEqual(uc2[if_idx] - uc1[if_idx], 1)
- self.assertEqual(ic2[if_idx] - ic1[if_idx], 1)
- self.assertEqual(dc2[if_idx] - dc1[if_idx], 0)
+ self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), 2)
+ self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), 1)
+ self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), 1)
+ self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
# out2in
- tc1 = self.get_stats_counter('/nat44-ed/out2in/fastpath/tcp')
- uc1 = self.get_stats_counter('/nat44-ed/out2in/fastpath/udp')
- ic1 = self.get_stats_counter('/nat44-ed/out2in/fastpath/icmp')
- dc1 = self.get_stats_counter('/nat44-ed/out2in/fastpath/drops')
+ tc1 = self.statistics['/nat44-ed/out2in/fastpath/tcp']
+ uc1 = self.statistics['/nat44-ed/out2in/fastpath/udp']
+ ic1 = self.statistics['/nat44-ed/out2in/fastpath/icmp']
+ dc1 = self.statistics['/nat44-ed/out2in/fastpath/drops']
pkts = self.create_stream_out(self.pg1)
self.pg1.add_stream(pkts)
self.verify_capture_in(capture, self.pg0)
if_idx = self.pg1.sw_if_index
- tc2 = self.get_stats_counter('/nat44-ed/out2in/fastpath/tcp')
- uc2 = self.get_stats_counter('/nat44-ed/out2in/fastpath/udp')
- ic2 = self.get_stats_counter('/nat44-ed/out2in/fastpath/icmp')
- dc2 = self.get_stats_counter('/nat44-ed/out2in/fastpath/drops')
+ tc2 = self.statistics['/nat44-ed/out2in/fastpath/tcp']
+ uc2 = self.statistics['/nat44-ed/out2in/fastpath/udp']
+ ic2 = self.statistics['/nat44-ed/out2in/fastpath/icmp']
+ dc2 = self.statistics['/nat44-ed/out2in/fastpath/drops']
- self.assertEqual(tc2[if_idx] - tc1[if_idx], 2)
- self.assertEqual(uc2[if_idx] - uc1[if_idx], 1)
- self.assertEqual(ic2[if_idx] - ic1[if_idx], 1)
- self.assertEqual(dc2[if_idx] - dc1[if_idx], 0)
+ self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), 2)
+ self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), 1)
+ self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), 1)
+ self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
- sc = self.get_stats_counter('/nat44-ed/total-sessions')
- self.assertEqual(sc[0], 3)
+ sc = self.statistics['/nat44-ed/total-sessions']
+ self.assertEqual(sc[:, 0].sum(), 3)
def test_frag_in_order(self):
""" NAT44ED translate fragments arriving in order """
sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
self.assertEqual(len(sessions) - session_n, 0)
+ def test_dynamic_vrf(self):
+ """ NAT44ED dynamic translation test: different VRF"""
+
+ vrf_id_in = 33
+ vrf_id_out = 34
+
+ self.nat_add_address(self.nat_addr, vrf_id=vrf_id_in)
+
+ try:
+ self.configure_ip4_interface(self.pg7, table_id=vrf_id_in)
+ self.configure_ip4_interface(self.pg8, table_id=vrf_id_out)
+
+ self.nat_add_inside_interface(self.pg7)
+ self.nat_add_outside_interface(self.pg8)
+
+ # just basic stuff nothing special
+ pkts = self.create_stream_in(self.pg7, self.pg8)
+ self.pg7.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg8.get_capture(len(pkts))
+ self.verify_capture_out(capture, ignore_port=True)
+
+ pkts = self.create_stream_out(self.pg8)
+ self.pg8.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg7.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg7)
+
+ finally:
+ self.pg7.unconfig()
+ self.pg8.unconfig()
+
+ self.vapi.ip_table_add_del(is_add=0,
+ table={'table_id': vrf_id_in})
+ self.vapi.ip_table_add_del(is_add=0,
+ table={'table_id': vrf_id_out})
+
def test_dynamic_output_feature_vrf(self):
""" NAT44ED dynamic translation test: output-feature, VRF"""
self.configure_ip4_interface(self.pg8, table_id=new_vrf_id)
# in2out
- tcpn = self.get_stats_counter(
- '/nat44-ed/in2out/slowpath/tcp')
- udpn = self.get_stats_counter(
- '/nat44-ed/in2out/slowpath/udp')
- icmpn = self.get_stats_counter(
- '/nat44-ed/in2out/slowpath/icmp')
- drops = self.get_stats_counter(
- '/nat44-ed/in2out/slowpath/drops')
+ tcpn = self.statistics['/nat44-ed/in2out/slowpath/tcp']
+ udpn = self.statistics['/nat44-ed/in2out/slowpath/udp']
+ icmpn = self.statistics['/nat44-ed/in2out/slowpath/icmp']
+ drops = self.statistics['/nat44-ed/in2out/slowpath/drops']
pkts = self.create_stream_in(self.pg7, self.pg8)
self.pg7.add_stream(pkts)
self.verify_capture_out(capture, ignore_port=True)
if_idx = self.pg7.sw_if_index
- cnt = self.get_stats_counter(
- '/nat44-ed/in2out/slowpath/tcp')
- self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
- cnt = self.get_stats_counter(
- '/nat44-ed/in2out/slowpath/udp')
- self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
- cnt = self.get_stats_counter(
- '/nat44-ed/in2out/slowpath/icmp')
- self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
- cnt = self.get_stats_counter(
- '/nat44-ed/in2out/slowpath/drops')
- self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
+ cnt = self.statistics['/nat44-ed/in2out/slowpath/tcp']
+ self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
+ cnt = self.statistics['/nat44-ed/in2out/slowpath/udp']
+ self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
+ cnt = self.statistics['/nat44-ed/in2out/slowpath/icmp']
+ self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
+ cnt = self.statistics['/nat44-ed/in2out/slowpath/drops']
+ self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
# out2in
- tcpn = self.get_stats_counter(
- '/nat44-ed/out2in/fastpath/tcp')
- udpn = self.get_stats_counter(
- '/nat44-ed/out2in/fastpath/udp')
- icmpn = self.get_stats_counter(
- '/nat44-ed/out2in/fastpath/icmp')
- drops = self.get_stats_counter(
- '/nat44-ed/out2in/fastpath/drops')
+ tcpn = self.statistics['/nat44-ed/out2in/fastpath/tcp']
+ udpn = self.statistics['/nat44-ed/out2in/fastpath/udp']
+ icmpn = self.statistics['/nat44-ed/out2in/fastpath/icmp']
+ drops = self.statistics['/nat44-ed/out2in/fastpath/drops']
pkts = self.create_stream_out(self.pg8)
self.pg8.add_stream(pkts)
self.verify_capture_in(capture, self.pg7)
if_idx = self.pg8.sw_if_index
- cnt = self.get_stats_counter(
- '/nat44-ed/out2in/fastpath/tcp')
- self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
- cnt = self.get_stats_counter(
- '/nat44-ed/out2in/fastpath/udp')
- self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
- cnt = self.get_stats_counter(
- '/nat44-ed/out2in/fastpath/icmp')
- self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
- cnt = self.get_stats_counter(
- '/nat44-ed/out2in/fastpath/drops')
- self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
-
- sessions = self.get_stats_counter('/nat44-ed/total-sessions')
- self.assertEqual(sessions[0], 3)
+ cnt = self.statistics['/nat44-ed/out2in/fastpath/tcp']
+ self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
+ cnt = self.statistics['/nat44-ed/out2in/fastpath/udp']
+ self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
+ cnt = self.statistics['/nat44-ed/out2in/fastpath/icmp']
+ self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
+ cnt = self.statistics['/nat44-ed/out2in/fastpath/drops']
+ self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
+
+ sessions = self.statistics['/nat44-ed/total-sessions']
+ self.assertEqual(sessions[:, 0].sum(), 3)
finally:
- self.configure_ip4_interface(self.pg7, table_id=0)
- self.configure_ip4_interface(self.pg8, table_id=0)
+ self.pg7.unconfig()
+ self.pg8.unconfig()
self.vapi.ip_table_add_del(is_add=0,
table={'table_id': new_vrf_id})