X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fplugins%2Fmap%2Ftest%2Ftest_map.py;h=93ea3f0697681239bb3cfab48f562e658163d869;hb=f5db3711b;hp=fd8b1685f71ace5062efff229d64e1c9cc40692a;hpb=f145c15631ba62e798395499f83a2f8a91ae83c7;p=vpp.git diff --git a/src/plugins/map/test/test_map.py b/src/plugins/map/test/test_map.py index fd8b1685f71..93ea3f06976 100644 --- a/src/plugins/map/test/test_map.py +++ b/src/plugins/map/test/test_map.py @@ -100,6 +100,48 @@ class TestMAP(VppTestCase): self.assertEqual(rv[0].tag, tag, "output produced incorrect tag value.") + def create_domains(self, ip4_pfx_str, ip6_pfx_str, ip6_src_str): + ip4_pfx = ipaddress.ip_network(ip4_pfx_str) + ip6_dst = ipaddress.ip_network(ip6_pfx_str) + mod = ip4_pfx.num_addresses / 1024 + indicies = [] + for i in range(ip4_pfx.num_addresses): + rv = self.vapi.map_add_domain(ip6_prefix=ip6_pfx_str, + ip4_prefix=str(ip4_pfx[i]) + "/32", + ip6_src=ip6_src_str) + indicies.append(rv.index) + return indicies + + def test_api_map_domains_get(self): + # Create a bunch of domains + domains = self.create_domains('130.67.0.0/24', '2001::/32', + '2001::1/128') + self.assertEqual(len(domains), 256) + + d = [] + cursor = 0 + + # Invalid cursor + rv, details = self.vapi.map_domains_get(cursor=1234) + self.assertEqual(rv.retval, -7) + + # Delete a domain in the middle of walk + rv, details = self.vapi.map_domains_get(cursor=0) + self.assertEqual(rv.retval, -165) + self.vapi.map_del_domain(index=rv.cursor) + domains.remove(rv.cursor) + + # Continue at point of deleted cursor + rv, details = self.vapi.map_domains_get(cursor=rv.cursor) + self.assertEqual(rv.retval, -165) + + d = list(self.vapi.vpp.details_iter(self.vapi.map_domains_get)) + self.assertEqual(len(d), 255) + + # Clean up + for i in domains: + self.vapi.map_del_domain(index=i) + def test_map_e_udp(self): """ MAP-E UDP""" @@ -438,14 +480,14 @@ class TestMAP(VppTestCase): def validate(self, rx, expected): self.assertEqual(rx, expected.__class__(scapy.compat.raw(expected))) - def validate_frag(self, p6_frag, p_ip6_expected): + def validate_frag6(self, p6_frag, p_ip6_expected): self.assertFalse(p6_frag.haslayer(IP)) self.assertTrue(p6_frag.haslayer(IPv6)) self.assertTrue(p6_frag.haslayer(IPv6ExtHdrFragment)) self.assertEqual(p6_frag[IPv6].src, p_ip6_expected.src) self.assertEqual(p6_frag[IPv6].dst, p_ip6_expected.dst) - def validate_frag_payload_len(self, rx, proto, payload_len_expected): + def validate_frag_payload_len6(self, rx, proto, payload_len_expected): payload_total = 0 for p in rx: payload_total += p[IPv6].plen @@ -458,6 +500,23 @@ class TestMAP(VppTestCase): self.assertEqual(payload_total, payload_len_expected) + def validate_frag4(self, p4_frag, p_ip4_expected): + self.assertFalse(p4_frag.haslayer(IPv6)) + self.assertTrue(p4_frag.haslayer(IP)) + self.assertTrue(p4_frag[IP].frag != 0 or p4_frag[IP].flags.MF) + self.assertEqual(p4_frag[IP].src, p_ip4_expected.src) + self.assertEqual(p4_frag[IP].dst, p_ip4_expected.dst) + + def validate_frag_payload_len4(self, rx, proto, payload_len_expected): + payload_total = 0 + for p in rx: + payload_total += len(p[IP].payload) + + # First fragment has proto + payload_total -= len(proto()) + + self.assertEqual(payload_total, payload_len_expected) + def payload(self, len): return 'x' * len @@ -574,7 +633,22 @@ class TestMAP(VppTestCase): for p in rx: self.validate(p[1], icmp4_reply) - # IPv6 Hop limit + # IPv6 Hop limit at BR + ip6_hlim_expired = IPv6(hlim=1, src='2001:db8:1ab::c0a8:1:ab', + dst='1234:5678:90ab:cdef:ac:1001:200:0') + p6 = (p_ether6 / ip6_hlim_expired / payload) + + icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6, + dst="2001:db8:1ab::c0a8:1:ab") / + ICMPv6TimeExceeded(code=0) / + IPv6(src="2001:db8:1ab::c0a8:1:ab", + dst='1234:5678:90ab:cdef:ac:1001:200:0', + hlim=1) / payload) + rx = self.send_and_expect(self.pg1, p6*1, self.pg1) + for p in rx: + self.validate(p[1], icmp6_reply) + + # IPv6 Hop limit beyond BR ip6_hlim_expired = IPv6(hlim=0, src='2001:db8:1ab::c0a8:1:ab', dst='1234:5678:90ab:cdef:ac:1001:200:0') p6 = (p_ether6 / ip6_hlim_expired / payload) @@ -612,11 +686,12 @@ class TestMAP(VppTestCase): p_ip6_translated = IPv6(src='1234:5678:90ab:cdef:ac:1001:200:0', dst='2001:db8:1e0::c0a8:1:e') for p in rx: - self.validate_frag(p, p_ip6_translated) + self.validate_frag6(p, p_ip6_translated) - self.validate_frag_payload_len(rx, UDP, payload_len) + self.validate_frag_payload_len6(rx, UDP, payload_len) # UDP packet fragmentation send fragments + payload_len = 1453 payload = UDP(sport=40000, dport=4000) / self.payload(payload_len) p4 = (p_ether / p_ip4 / payload) frags = fragment_rfc791(p4, fragsize=1000) @@ -626,9 +701,32 @@ class TestMAP(VppTestCase): rx = self.pg1.get_capture(2) for p in rx: - self.validate_frag(p, p_ip6_translated) + self.validate_frag6(p, p_ip6_translated) + + self.validate_frag_payload_len6(rx, UDP, payload_len) + + # Send back an fragmented IPv6 UDP packet that will be "untranslated" + payload = UDP(sport=4000, dport=40000) / self.payload(payload_len) + p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) + p_ip6 = IPv6(src='2001:db8:1e0::c0a8:1:e', + dst='1234:5678:90ab:cdef:ac:1001:200:0') + p6 = (p_ether6 / p_ip6 / payload) + frags6 = fragment_rfc8200(p6, identification=0xdcba, fragsize=1000) + + p_ip4_translated = IP(src='192.168.0.1', dst=self.pg0.remote_ip4) + p4_translated = (p_ip4_translated / payload) + p4_translated.id = 0 + p4_translated.ttl -= 1 + + self.pg_enable_capture() + self.pg1.add_stream(frags6) + self.pg_start() + rx = self.pg0.get_capture(2) + + for p in rx: + self.validate_frag4(p, p4_translated) - self.validate_frag_payload_len(rx, UDP, payload_len) + self.validate_frag_payload_len4(rx, UDP, payload_len) # ICMP packet fragmentation payload = ICMP(id=6529) / self.payload(payload_len) @@ -641,9 +739,9 @@ class TestMAP(VppTestCase): p_ip6_translated = IPv6(src='1234:5678:90ab:cdef:ac:1001:200:0', dst='2001:db8:160::c0a8:1:6') for p in rx: - self.validate_frag(p, p_ip6_translated) + self.validate_frag6(p, p_ip6_translated) - self.validate_frag_payload_len(rx, ICMPv6EchoRequest, payload_len) + self.validate_frag_payload_len6(rx, ICMPv6EchoRequest, payload_len) # ICMP packet fragmentation send fragments payload = ICMP(id=6529) / self.payload(payload_len) @@ -655,9 +753,9 @@ class TestMAP(VppTestCase): rx = self.pg1.get_capture(2) for p in rx: - self.validate_frag(p, p_ip6_translated) + self.validate_frag6(p, p_ip6_translated) - self.validate_frag_payload_len(rx, ICMPv6EchoRequest, payload_len) + self.validate_frag_payload_len6(rx, ICMPv6EchoRequest, payload_len) # TCP MSS clamping self.vapi.map_param_set_tcp(1300) @@ -860,5 +958,6 @@ class TestMAP(VppTestCase): ip6_nh_address="4001::1", is_add=0) + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)