@classmethod
def setUpClass(cls):
super(TestNAT44, cls).setUpClass()
+ cls.vapi.cli("set log class nat level debug")
try:
cls.tcp_port_in = 6303
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture(0)
+ self.pg1.assert_nothing_captured()
+ sleep(1)
self.vapi.cli("ipfix flush") # FIXME this should be an API call
capture = self.pg3.get_capture(9)
ipfix = IPFIXDecoder()
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- self.pg1.get_capture(0)
+ self.pg1.assert_nothing_captured()
+ sleep(1)
self.vapi.cli("ipfix flush") # FIXME this should be an API call
capture = self.pg3.get_capture(9)
ipfix = IPFIXDecoder()
self.pg2.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture(0)
+ self.pg1.assert_nothing_captured()
# remove addresses and verify
self.nat44_add_address(self.nat_addr, is_add=0)
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture(0)
+ self.pg1.assert_nothing_captured()
p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
ARP(op=ARP.who_has, pdst=static_addr,
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture(0)
+ self.pg1.assert_nothing_captured()
def test_vrf_mode(self):
""" NAT44 tenant VRF aware address pool mode """
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture(1)
+ self.pg0.get_capture(1)
p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
IP(src=host.ip4, dst=server_nat_ip) /
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture(len(pkts))
+ self.pg1.get_capture(len(pkts))
sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
nsessions = len(sessions)
self.pg0.add_stream(pkts[-1])
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- frags = self.pg1.get_capture(0)
+ self.pg1.assert_nothing_captured()
+ sleep(1)
self.vapi.cli("ipfix flush") # FIXME this should be an API call
capture = self.pg3.get_capture(9)
ipfix = IPFIXDecoder()
def test_tcp_session_close_in(self):
""" Close TCP session from inside network """
+ self.tcp_port_out = 10505
self.nat44_add_address(self.nat_addr)
+ self.nat44_add_static_mapping(self.pg0.remote_ip4,
+ self.nat_addr,
+ self.tcp_port_in,
+ self.tcp_port_out,
+ proto=IP_PROTOS.tcp,
+ twice_nat=1)
self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
is_inside=0)
self.initiate_tcp_session(self.pg0, self.pg1)
- # close the session from inside
- try:
- # FIN packet in -> out
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
- flags="FA", seq=100, ack=300))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.get_capture(1)
+ # FIN packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="FA", seq=100, ack=300))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(1)
- pkts = []
+ pkts = []
- # ACK packet out -> in
- p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
- TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
- flags="A", seq=300, ack=101))
- pkts.append(p)
+ # ACK packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="A", seq=300, ack=101))
+ pkts.append(p)
- # FIN packet out -> in
- p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
- TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
- flags="FA", seq=300, ack=101))
- pkts.append(p)
+ # FIN packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="FA", seq=300, ack=101))
+ pkts.append(p)
- self.pg1.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg0.get_capture(2)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(2)
- # ACK packet in -> out
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
- flags="A", seq=101, ack=301))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.get_capture(1)
+ # ACK packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="A", seq=101, ack=301))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(1)
- self.initiate_tcp_session(self.pg0, self.pg1)
- sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
- 0)
- self.assertEqual(len(sessions) - start_sessnum, 1)
- except:
- self.logger.error("TCP session termination failed")
- raise
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
+ 0)
+ self.assertEqual(len(sessions) - start_sessnum, 0)
def test_tcp_session_close_out(self):
""" Close TCP session from outside network """
+ self.tcp_port_out = 10505
self.nat44_add_address(self.nat_addr)
+ self.nat44_add_static_mapping(self.pg0.remote_ip4,
+ self.nat_addr,
+ self.tcp_port_in,
+ self.tcp_port_out,
+ proto=IP_PROTOS.tcp,
+ twice_nat=1)
self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
is_inside=0)
self.initiate_tcp_session(self.pg0, self.pg1)
- # close the session from outside
- try:
- # FIN packet out -> in
- p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
- TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
- flags="FA", seq=100, ack=300))
- self.pg1.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg0.get_capture(1)
+ # FIN packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="FA", seq=100, ack=300))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(1)
- # FIN+ACK packet in -> out
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
- flags="FA", seq=300, ack=101))
+ # FIN+ACK packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="FA", seq=300, ack=101))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.get_capture(1)
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(1)
- # ACK packet out -> in
- p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
- TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
- flags="A", seq=101, ack=301))
- self.pg1.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg0.get_capture(1)
+ # ACK packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="A", seq=101, ack=301))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(1)
- self.initiate_tcp_session(self.pg0, self.pg1)
- sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
- 0)
- self.assertEqual(len(sessions) - start_sessnum, 1)
- except:
- self.logger.error("TCP session termination failed")
- raise
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
+ 0)
+ self.assertEqual(len(sessions) - start_sessnum, 0)
def test_tcp_session_close_simultaneous(self):
""" Close TCP session from inside network """
+ self.tcp_port_out = 10505
self.nat44_add_address(self.nat_addr)
+ self.nat44_add_static_mapping(self.pg0.remote_ip4,
+ self.nat_addr,
+ self.tcp_port_in,
+ self.tcp_port_out,
+ proto=IP_PROTOS.tcp,
+ twice_nat=1)
self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
is_inside=0)
self.initiate_tcp_session(self.pg0, self.pg1)
- # close the session from inside
- try:
- # FIN packet in -> out
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
- flags="FA", seq=100, ack=300))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.get_capture(1)
+ # FIN packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="FA", seq=100, ack=300))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(1)
- # FIN packet out -> in
- p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
- TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
- flags="FA", seq=300, ack=100))
- self.pg1.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg0.get_capture(1)
+ # FIN packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="FA", seq=300, ack=100))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(1)
- # ACK packet in -> out
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
- flags="A", seq=101, ack=301))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.get_capture(1)
+ # ACK packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="A", seq=101, ack=301))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(1)
- # ACK packet out -> in
- p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
- TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
- flags="A", seq=301, ack=101))
- self.pg1.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg0.get_capture(1)
+ # ACK packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="A", seq=301, ack=101))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(1)
- self.initiate_tcp_session(self.pg0, self.pg1)
- sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
- 0)
- self.assertEqual(len(sessions) - start_sessnum, 1)
- except:
- self.logger.error("TCP session termination failed")
- raise
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
+ 0)
+ self.assertEqual(len(sessions) - start_sessnum, 0)
def tearDown(self):
super(TestNAT44, self).tearDown()
self.logger.info(self.vapi.cli("show nat44 interface address"))
self.logger.info(self.vapi.cli("show nat44 sessions detail"))
self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
+ self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
self.vapi.cli("nat addr-port-assignment-alg default")
self.clear_nat44()
+ self.vapi.cli("clear logging")
class TestNAT44Out2InDPO(MethodHolder):
@classmethod
def setUpClass(cls):
super(TestNAT44Out2InDPO, cls).setUpClass()
+ cls.vapi.cli("set log class nat level debug")
try:
cls.tcp_port_in = 6303
@classmethod
def setUpClass(cls):
super(TestDeterministicNAT, cls).setUpClass()
+ cls.vapi.cli("set log class nat level debug")
try:
cls.tcp_port_in = 6303
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- self.pg1.get_capture(0)
+ self.pg1.assert_nothing_captured()
+ sleep(1)
self.vapi.cli("ipfix flush") # FIXME this should be an API call
capture = self.pg3.get_capture(9)
ipfix = IPFIXDecoder()
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- self.pg1.get_capture(0)
+ self.pg1.assert_nothing_captured()
+ sleep(1)
self.vapi.cli("ipfix flush") # FIXME this should be an API call
capture = self.pg3.get_capture(1)
# verify events in data set
self.pg0.add_stream(pkts[-1])
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- self.pg1.get_capture(0)
+ self.pg1.assert_nothing_captured()
+ sleep(1)
self.vapi.cli("ipfix flush") # FIXME this should be an API call
capture = self.pg3.get_capture(9)
ipfix = IPFIXDecoder()