from random import randint
import scapy.compat
+from framework import tag_fixme_vpp_workers
from framework import VppTestCase, VppTestRunner
from scapy.data import IP_PROTOS
from scapy.layers.inet import IP, TCP, UDP, ICMP, GRE
self.plugin_disable()
def plugin_enable(self):
- self.vapi.nat44_plugin_enable_disable(
- flags=self.nat44_config_flags.NAT44_IS_ENDPOINT_DEPENDENT,
+ self.vapi.nat44_ed_plugin_enable_disable(
sessions=self.max_sessions, enable=1)
def plugin_disable(self):
- self.vapi.nat44_plugin_enable_disable(enable=0)
+ self.vapi.nat44_ed_plugin_enable_disable(enable=0)
@property
def config_flags(self):
@classmethod
def setUpClass(cls):
super(NAT44EDTestCase, cls).setUpClass()
- cls.vapi.cli("set log class nat level debug")
cls.create_pg_interfaces(range(12))
cls.interfaces = list(cls.pg_interfaces[:4])
self.config_flags.NAT_IS_EXT_HOST_VALID)
self.assertTrue(sessions[0].flags &
self.config_flags.NAT_IS_TWICE_NAT)
- self.logger.info(self.vapi.cli("show nat44 sessions detail"))
+ self.logger.info(self.vapi.cli("show nat44 sessions"))
self.vapi.nat44_del_session(
address=sessions[0].inside_ip_address,
port=sessions[0].inside_port,
"%d" % self.tcp_external_port)
+@tag_fixme_vpp_workers
class TestNAT44ED(NAT44EDTestCase):
""" NAT44ED Test Case """
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
+ def test_outside_address_distribution(self):
+ """ Outside address distribution based on source address """
+
+ x = 100
+ nat_addresses = []
+
+ for i in range(1, x):
+ a = "10.0.0.%d" % i
+ nat_addresses.append(a)
+
+ self.nat_add_inside_interface(self.pg0)
+ self.nat_add_outside_interface(self.pg1)
+
+ self.vapi.nat44_add_del_address_range(
+ first_ip_address=nat_addresses[0],
+ last_ip_address=nat_addresses[-1],
+ vrf_id=0xFFFFFFFF, is_add=1, flags=0)
+
+ self.pg0.generate_remote_hosts(x)
+
+ pkts = []
+ for i in range(x):
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_hosts[i].ip4,
+ dst=self.pg1.remote_ip4) /
+ UDP(sport=7000+i, dport=80+i))
+ pkts.append(p)
+
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ recvd = self.pg1.get_capture(len(pkts))
+ for (p_sent, p_recvd) in zip(pkts, recvd):
+ packed = socket.inet_aton(p_sent[IP].src)
+ numeric = struct.unpack("!L", packed)[0]
+ numeric = socket.htonl(numeric)
+ a = nat_addresses[(numeric-1) % len(nat_addresses)]
+ self.assertEqual(a, p_recvd[IP].src, "Packet not translated")
+
class TestNAT44EDMW(TestNAT44ED):
""" NAT44ED MW Test Case """
self.nat_add_outside_interface(self.pg1)
# in2out
- tc1 = self.get_stats_counter('/nat44/ed/in2out/slowpath/tcp')
- uc1 = self.get_stats_counter('/nat44/ed/in2out/slowpath/udp')
- ic1 = self.get_stats_counter('/nat44/ed/in2out/slowpath/icmp')
- dc1 = self.get_stats_counter('/nat44/ed/in2out/slowpath/drops')
+ tc1 = self.get_stats_counter('/nat44-ed/in2out/slowpath/tcp')
+ uc1 = self.get_stats_counter('/nat44-ed/in2out/slowpath/udp')
+ ic1 = self.get_stats_counter('/nat44-ed/in2out/slowpath/icmp')
+ dc1 = self.get_stats_counter('/nat44-ed/in2out/slowpath/drops')
pkts = self.create_stream_in(self.pg0, self.pg1)
# TODO: specify worker=idx, also stats have to
self.verify_capture_out(capture, ignore_port=True)
if_idx = self.pg0.sw_if_index
- tc2 = self.get_stats_counter('/nat44/ed/in2out/slowpath/tcp')
- uc2 = self.get_stats_counter('/nat44/ed/in2out/slowpath/udp')
- ic2 = self.get_stats_counter('/nat44/ed/in2out/slowpath/icmp')
- dc2 = self.get_stats_counter('/nat44/ed/in2out/slowpath/drops')
+ tc2 = self.get_stats_counter('/nat44-ed/in2out/slowpath/tcp')
+ uc2 = self.get_stats_counter('/nat44-ed/in2out/slowpath/udp')
+ ic2 = self.get_stats_counter('/nat44-ed/in2out/slowpath/icmp')
+ dc2 = self.get_stats_counter('/nat44-ed/in2out/slowpath/drops')
self.assertEqual(tc2[if_idx] - tc1[if_idx], 2)
self.assertEqual(uc2[if_idx] - uc1[if_idx], 1)
self.assertEqual(dc2[if_idx] - dc1[if_idx], 0)
# out2in
- tc1 = self.get_stats_counter('/nat44/ed/out2in/fastpath/tcp')
- uc1 = self.get_stats_counter('/nat44/ed/out2in/fastpath/udp')
- ic1 = self.get_stats_counter('/nat44/ed/out2in/slowpath/icmp')
- dc1 = self.get_stats_counter('/nat44/ed/out2in/fastpath/drops')
+ tc1 = self.get_stats_counter('/nat44-ed/out2in/fastpath/tcp')
+ uc1 = self.get_stats_counter('/nat44-ed/out2in/fastpath/udp')
+ ic1 = self.get_stats_counter('/nat44-ed/out2in/fastpath/icmp')
+ dc1 = self.get_stats_counter('/nat44-ed/out2in/fastpath/drops')
pkts = self.create_stream_out(self.pg1)
self.pg1.add_stream(pkts)
self.verify_capture_in(capture, self.pg0)
if_idx = self.pg1.sw_if_index
- tc2 = self.get_stats_counter('/nat44/ed/out2in/fastpath/tcp')
- uc2 = self.get_stats_counter('/nat44/ed/out2in/fastpath/udp')
- ic2 = self.get_stats_counter('/nat44/ed/out2in/slowpath/icmp')
- dc2 = self.get_stats_counter('/nat44/ed/out2in/fastpath/drops')
+ tc2 = self.get_stats_counter('/nat44-ed/out2in/fastpath/tcp')
+ uc2 = self.get_stats_counter('/nat44-ed/out2in/fastpath/udp')
+ ic2 = self.get_stats_counter('/nat44-ed/out2in/fastpath/icmp')
+ dc2 = self.get_stats_counter('/nat44-ed/out2in/fastpath/drops')
self.assertEqual(tc2[if_idx] - tc1[if_idx], 2)
self.assertEqual(uc2[if_idx] - uc1[if_idx], 1)
self.assertEqual(ic2[if_idx] - ic1[if_idx], 1)
self.assertEqual(dc2[if_idx] - dc1[if_idx], 0)
- sc = self.get_stats_counter('/nat44/total-sessions')
+ sc = self.get_stats_counter('/nat44-ed/total-sessions')
self.assertEqual(sc[0], 3)
def test_frag_in_order(self):
capture = outside.get_capture(len(stream))
- def test_clear_sessions(self):
- """ NAT44ED session clearing test """
-
- self.nat_add_address(self.nat_addr)
- self.nat_add_inside_interface(self.pg0)
- self.nat_add_outside_interface(self.pg1)
-
- pkts = self.create_stream_in(self.pg0, self.pg1)
- self.pg0.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- capture = self.pg1.get_capture(len(pkts))
- self.verify_capture_out(capture, ignore_port=True)
-
- sessions = self.get_stats_counter('/nat44/total-sessions')
- self.assertTrue(sessions[0] > 0)
-
- self.vapi.cli("clear nat44 sessions")
-
- sessions = self.get_stats_counter('/nat44/total-sessions')
- self.assertEqual(sessions[0], 0)
-
def test_show_max_translations(self):
""" NAT44ED API test - max translations per thread """
nat_config = self.vapi.nat_show_config_2()
# in2out
tcpn = self.get_stats_counter(
- '/nat44/ed/in2out/slowpath/tcp')
+ '/nat44-ed/in2out/slowpath/tcp')
udpn = self.get_stats_counter(
- '/nat44/ed/in2out/slowpath/udp')
+ '/nat44-ed/in2out/slowpath/udp')
icmpn = self.get_stats_counter(
- '/nat44/ed/in2out/slowpath/icmp')
+ '/nat44-ed/in2out/slowpath/icmp')
drops = self.get_stats_counter(
- '/nat44/ed/in2out/slowpath/drops')
+ '/nat44-ed/in2out/slowpath/drops')
pkts = self.create_stream_in(self.pg7, self.pg8)
self.pg7.add_stream(pkts)
if_idx = self.pg7.sw_if_index
cnt = self.get_stats_counter(
- '/nat44/ed/in2out/slowpath/tcp')
+ '/nat44-ed/in2out/slowpath/tcp')
self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
cnt = self.get_stats_counter(
- '/nat44/ed/in2out/slowpath/udp')
+ '/nat44-ed/in2out/slowpath/udp')
self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
cnt = self.get_stats_counter(
- '/nat44/ed/in2out/slowpath/icmp')
+ '/nat44-ed/in2out/slowpath/icmp')
self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
cnt = self.get_stats_counter(
- '/nat44/ed/in2out/slowpath/drops')
+ '/nat44-ed/in2out/slowpath/drops')
self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
# out2in
tcpn = self.get_stats_counter(
- '/nat44/ed/out2in/fastpath/tcp')
+ '/nat44-ed/out2in/fastpath/tcp')
udpn = self.get_stats_counter(
- '/nat44/ed/out2in/fastpath/udp')
+ '/nat44-ed/out2in/fastpath/udp')
icmpn = self.get_stats_counter(
- '/nat44/ed/out2in/slowpath/icmp')
+ '/nat44-ed/out2in/fastpath/icmp')
drops = self.get_stats_counter(
- '/nat44/ed/out2in/fastpath/drops')
+ '/nat44-ed/out2in/fastpath/drops')
pkts = self.create_stream_out(self.pg8)
self.pg8.add_stream(pkts)
if_idx = self.pg8.sw_if_index
cnt = self.get_stats_counter(
- '/nat44/ed/out2in/fastpath/tcp')
+ '/nat44-ed/out2in/fastpath/tcp')
self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
cnt = self.get_stats_counter(
- '/nat44/ed/out2in/fastpath/udp')
+ '/nat44-ed/out2in/fastpath/udp')
self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
cnt = self.get_stats_counter(
- '/nat44/ed/out2in/slowpath/icmp')
+ '/nat44-ed/out2in/fastpath/icmp')
self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
cnt = self.get_stats_counter(
- '/nat44/ed/out2in/fastpath/drops')
+ '/nat44-ed/out2in/fastpath/drops')
self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
- sessions = self.get_stats_counter('/nat44/total-sessions')
+ sessions = self.get_stats_counter('/nat44-ed/total-sessions')
self.assertEqual(sessions[0], 3)
finally: