X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_snat.py;h=4cb5116135329dda49f6b07d6da4a6e3b07bf46f;hb=refs%2Fchanges%2F02%2F5602%2F3;hp=500d629dfe54b19e06db0a3db025ede2a9f55d06;hpb=b33f413af46ec8dff7f222dbd5bc3bcec1502d3d;p=vpp.git diff --git a/test/test_snat.py b/test/test_snat.py index 500d629dfe5..4cb51161353 100644 --- a/test/test_snat.py +++ b/test/test_snat.py @@ -485,7 +485,7 @@ class TestSNAT(VppTestCase): src_ip=self.pg1.local_ip4) def test_dynamic_icmp_errors_in2out_ttl_2(self): - """ SNAT handling of error respones to client packets with TTL=2 """ + """ SNAT handling of error responses to client packets with TTL=2 """ self.snat_add_address(self.snat_addr) self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) @@ -512,7 +512,7 @@ class TestSNAT(VppTestCase): self.verify_capture_in_with_icmp_errors(capture, self.pg0) def test_dynamic_icmp_errors_out2in_ttl_2(self): - """ SNAT handling of error respones to server packets with TTL=2 """ + """ SNAT handling of error responses to server packets with TTL=2 """ self.snat_add_address(self.snat_addr) self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) @@ -546,6 +546,34 @@ class TestSNAT(VppTestCase): capture = self.pg1.get_capture(len(pkts)) self.verify_capture_out_with_icmp_errors(capture) + def test_ping_out_interface_from_outside(self): + """ Ping SNAT out interface from outside """ + + self.snat_add_address(self.snat_addr) + self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) + + p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / + IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) / + ICMP(id=self.icmp_id_out, type='echo-request')) + pkts = [p] + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.assertEqual(1, len(capture)) + packet = capture[0] + try: + self.assertEqual(packet[IP].src, self.pg1.local_ip4) + self.assertEqual(packet[IP].dst, self.pg1.remote_ip4) + self.assertEqual(packet[ICMP].id, self.icmp_id_in) + self.assertEqual(packet[ICMP].type, 0) # echo reply + except: + self.logger.error(ppp("Unexpected or invalid packet " + "(outside network):", packet)) + raise + def test_static_in(self): """ SNAT 1:1 NAT initialized from inside network """ @@ -842,6 +870,27 @@ class TestSNAT(VppTestCase): capture = self.pg5.get_capture(len(pkts)) self.verify_capture_in(capture, self.pg5) + # pg5 session dump + addresses = self.vapi.snat_address_dump() + self.assertEqual(len(addresses), 1) + sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10) + self.assertEqual(len(sessions), 3) + for session in sessions: + self.assertFalse(session.is_static) + self.assertEqual(session.inside_ip_address[0:4], + self.pg5.remote_ip4n) + self.assertEqual(session.outside_ip_address, + addresses[0].ip_address) + self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp) + self.assertEqual(sessions[1].protocol, IP_PROTOS.udp) + self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp) + self.assertEqual(sessions[0].inside_port, self.tcp_port_in) + self.assertEqual(sessions[1].inside_port, self.udp_port_in) + self.assertEqual(sessions[2].inside_port, self.icmp_id_in) + self.assertEqual(sessions[0].outside_port, self.tcp_port_out) + self.assertEqual(sessions[1].outside_port, self.udp_port_out) + self.assertEqual(sessions[2].outside_port, self.icmp_id_out) + # in2out 3rd interface pkts = self.create_stream_in(self.pg6, self.pg3) self.pg6.add_stream(pkts) @@ -858,6 +907,44 @@ class TestSNAT(VppTestCase): capture = self.pg6.get_capture(len(pkts)) self.verify_capture_in(capture, self.pg6) + # general user and session dump verifications + users = self.vapi.snat_user_dump() + self.assertTrue(len(users) >= 3) + addresses = self.vapi.snat_address_dump() + self.assertEqual(len(addresses), 1) + for user in users: + sessions = self.vapi.snat_user_session_dump(user.ip_address, + user.vrf_id) + for session in sessions: + self.assertEqual(user.ip_address, session.inside_ip_address) + self.assertTrue(session.total_bytes > session.total_pkts > 0) + self.assertTrue(session.protocol in + [IP_PROTOS.tcp, IP_PROTOS.udp, + IP_PROTOS.icmp]) + + # pg4 session dump + sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10) + self.assertTrue(len(sessions) >= 4) + for session in sessions: + self.assertFalse(session.is_static) + self.assertEqual(session.inside_ip_address[0:4], + self.pg4.remote_ip4n) + self.assertEqual(session.outside_ip_address, + addresses[0].ip_address) + + # pg6 session dump + sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20) + self.assertTrue(len(sessions) >= 3) + for session in sessions: + self.assertTrue(session.is_static) + self.assertEqual(session.inside_ip_address[0:4], + self.pg6.remote_ip4n) + self.assertEqual(map(ord, session.outside_ip_address[0:4]), + map(int, static_nat_ip.split('.'))) + self.assertTrue(session.inside_port in + [self.tcp_port_in, self.udp_port_in, + self.icmp_id_in]) + def test_hairpinning(self): """ SNAT hairpinning """