src_ip=self.pg1.local_ip4)
def test_dynamic_icmp_errors_in2out_ttl_2(self):
- """ SNAT handling of error respones to client packets with TTL=2 """
+ """ SNAT handling of error responses to client packets with TTL=2 """
self.snat_add_address(self.snat_addr)
self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
self.verify_capture_in_with_icmp_errors(capture, self.pg0)
def test_dynamic_icmp_errors_out2in_ttl_2(self):
- """ SNAT handling of error respones to server packets with TTL=2 """
+ """ SNAT handling of error responses to server packets with TTL=2 """
self.snat_add_address(self.snat_addr)
self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out_with_icmp_errors(capture)
+ def test_ping_out_interface_from_outside(self):
+ """ Ping SNAT out interface from outside """
+
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
+ ICMP(id=self.icmp_id_out, type='echo-request'))
+ pkts = [p]
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.assertEqual(1, len(capture))
+ packet = capture[0]
+ try:
+ self.assertEqual(packet[IP].src, self.pg1.local_ip4)
+ self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
+ self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+ self.assertEqual(packet[ICMP].type, 0) # echo reply
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet "
+ "(outside network):", packet))
+ raise
+
def test_static_in(self):
""" SNAT 1:1 NAT initialized from inside network """
capture = self.pg5.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg5)
+ # pg5 session dump
+ addresses = self.vapi.snat_address_dump()
+ self.assertEqual(len(addresses), 1)
+ sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
+ self.assertEqual(len(sessions), 3)
+ for session in sessions:
+ self.assertFalse(session.is_static)
+ self.assertEqual(session.inside_ip_address[0:4],
+ self.pg5.remote_ip4n)
+ self.assertEqual(session.outside_ip_address,
+ addresses[0].ip_address)
+ self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
+ self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
+ self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
+ self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
+ self.assertEqual(sessions[1].inside_port, self.udp_port_in)
+ self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
+ self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
+ self.assertEqual(sessions[1].outside_port, self.udp_port_out)
+ self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
+
# in2out 3rd interface
pkts = self.create_stream_in(self.pg6, self.pg3)
self.pg6.add_stream(pkts)
capture = self.pg6.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg6)
+ # general user and session dump verifications
+ users = self.vapi.snat_user_dump()
+ self.assertTrue(len(users) >= 3)
+ addresses = self.vapi.snat_address_dump()
+ self.assertEqual(len(addresses), 1)
+ for user in users:
+ sessions = self.vapi.snat_user_session_dump(user.ip_address,
+ user.vrf_id)
+ for session in sessions:
+ self.assertEqual(user.ip_address, session.inside_ip_address)
+ self.assertTrue(session.total_bytes > session.total_pkts > 0)
+ self.assertTrue(session.protocol in
+ [IP_PROTOS.tcp, IP_PROTOS.udp,
+ IP_PROTOS.icmp])
+
+ # pg4 session dump
+ sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
+ self.assertTrue(len(sessions) >= 4)
+ for session in sessions:
+ self.assertFalse(session.is_static)
+ self.assertEqual(session.inside_ip_address[0:4],
+ self.pg4.remote_ip4n)
+ self.assertEqual(session.outside_ip_address,
+ addresses[0].ip_address)
+
+ # pg6 session dump
+ sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
+ self.assertTrue(len(sessions) >= 3)
+ for session in sessions:
+ self.assertTrue(session.is_static)
+ self.assertEqual(session.inside_ip_address[0:4],
+ self.pg6.remote_ip4n)
+ self.assertEqual(map(ord, session.outside_ip_address[0:4]),
+ map(int, static_nat_ip.split('.')))
+ self.assertTrue(session.inside_port in
+ [self.tcp_port_in, self.udp_port_in,
+ self.icmp_id_in])
+
def test_hairpinning(self):
""" SNAT hairpinning """