api: add new stream message convention
[vpp.git] / src / plugins / map / test / test_map.py
index fd8b168..93ea3f0 100644 (file)
@@ -100,6 +100,48 @@ class TestMAP(VppTestCase):
         self.assertEqual(rv[0].tag, tag,
                          "output produced incorrect tag value.")
 
+    def create_domains(self, ip4_pfx_str, ip6_pfx_str, ip6_src_str):
+        ip4_pfx = ipaddress.ip_network(ip4_pfx_str)
+        ip6_dst = ipaddress.ip_network(ip6_pfx_str)
+        mod = ip4_pfx.num_addresses / 1024
+        indicies = []
+        for i in range(ip4_pfx.num_addresses):
+            rv = self.vapi.map_add_domain(ip6_prefix=ip6_pfx_str,
+                                          ip4_prefix=str(ip4_pfx[i]) + "/32",
+                                          ip6_src=ip6_src_str)
+            indicies.append(rv.index)
+        return indicies
+
+    def test_api_map_domains_get(self):
+        # Create a bunch of domains
+        domains = self.create_domains('130.67.0.0/24', '2001::/32',
+                                      '2001::1/128')
+        self.assertEqual(len(domains), 256)
+
+        d = []
+        cursor = 0
+
+        # Invalid cursor
+        rv, details = self.vapi.map_domains_get(cursor=1234)
+        self.assertEqual(rv.retval, -7)
+
+        # Delete a domain in the middle of walk
+        rv, details = self.vapi.map_domains_get(cursor=0)
+        self.assertEqual(rv.retval, -165)
+        self.vapi.map_del_domain(index=rv.cursor)
+        domains.remove(rv.cursor)
+
+        # Continue at point of deleted cursor
+        rv, details = self.vapi.map_domains_get(cursor=rv.cursor)
+        self.assertEqual(rv.retval, -165)
+
+        d = list(self.vapi.vpp.details_iter(self.vapi.map_domains_get))
+        self.assertEqual(len(d), 255)
+
+        # Clean up
+        for i in domains:
+            self.vapi.map_del_domain(index=i)
+
     def test_map_e_udp(self):
         """ MAP-E UDP"""
 
@@ -438,14 +480,14 @@ class TestMAP(VppTestCase):
     def validate(self, rx, expected):
         self.assertEqual(rx, expected.__class__(scapy.compat.raw(expected)))
 
-    def validate_frag(self, p6_frag, p_ip6_expected):
+    def validate_frag6(self, p6_frag, p_ip6_expected):
         self.assertFalse(p6_frag.haslayer(IP))
         self.assertTrue(p6_frag.haslayer(IPv6))
         self.assertTrue(p6_frag.haslayer(IPv6ExtHdrFragment))
         self.assertEqual(p6_frag[IPv6].src, p_ip6_expected.src)
         self.assertEqual(p6_frag[IPv6].dst, p_ip6_expected.dst)
 
-    def validate_frag_payload_len(self, rx, proto, payload_len_expected):
+    def validate_frag_payload_len6(self, rx, proto, payload_len_expected):
         payload_total = 0
         for p in rx:
             payload_total += p[IPv6].plen
@@ -458,6 +500,23 @@ class TestMAP(VppTestCase):
 
         self.assertEqual(payload_total, payload_len_expected)
 
+    def validate_frag4(self, p4_frag, p_ip4_expected):
+        self.assertFalse(p4_frag.haslayer(IPv6))
+        self.assertTrue(p4_frag.haslayer(IP))
+        self.assertTrue(p4_frag[IP].frag != 0 or p4_frag[IP].flags.MF)
+        self.assertEqual(p4_frag[IP].src, p_ip4_expected.src)
+        self.assertEqual(p4_frag[IP].dst, p_ip4_expected.dst)
+
+    def validate_frag_payload_len4(self, rx, proto, payload_len_expected):
+        payload_total = 0
+        for p in rx:
+            payload_total += len(p[IP].payload)
+
+        # First fragment has proto
+        payload_total -= len(proto())
+
+        self.assertEqual(payload_total, payload_len_expected)
+
     def payload(self, len):
         return 'x' * len
 
@@ -574,7 +633,22 @@ class TestMAP(VppTestCase):
         for p in rx:
             self.validate(p[1], icmp4_reply)
 
-        # IPv6 Hop limit
+        # IPv6 Hop limit at BR
+        ip6_hlim_expired = IPv6(hlim=1, src='2001:db8:1ab::c0a8:1:ab',
+                                dst='1234:5678:90ab:cdef:ac:1001:200:0')
+        p6 = (p_ether6 / ip6_hlim_expired / payload)
+
+        icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
+                            dst="2001:db8:1ab::c0a8:1:ab") /
+                       ICMPv6TimeExceeded(code=0) /
+                       IPv6(src="2001:db8:1ab::c0a8:1:ab",
+                            dst='1234:5678:90ab:cdef:ac:1001:200:0',
+                            hlim=1) / payload)
+        rx = self.send_and_expect(self.pg1, p6*1, self.pg1)
+        for p in rx:
+            self.validate(p[1], icmp6_reply)
+
+        # IPv6 Hop limit beyond BR
         ip6_hlim_expired = IPv6(hlim=0, src='2001:db8:1ab::c0a8:1:ab',
                                 dst='1234:5678:90ab:cdef:ac:1001:200:0')
         p6 = (p_ether6 / ip6_hlim_expired / payload)
@@ -612,11 +686,12 @@ class TestMAP(VppTestCase):
         p_ip6_translated = IPv6(src='1234:5678:90ab:cdef:ac:1001:200:0',
                                 dst='2001:db8:1e0::c0a8:1:e')
         for p in rx:
-            self.validate_frag(p, p_ip6_translated)
+            self.validate_frag6(p, p_ip6_translated)
 
-        self.validate_frag_payload_len(rx, UDP, payload_len)
+        self.validate_frag_payload_len6(rx, UDP, payload_len)
 
         # UDP packet fragmentation send fragments
+        payload_len = 1453
         payload = UDP(sport=40000, dport=4000) / self.payload(payload_len)
         p4 = (p_ether / p_ip4 / payload)
         frags = fragment_rfc791(p4, fragsize=1000)
@@ -626,9 +701,32 @@ class TestMAP(VppTestCase):
         rx = self.pg1.get_capture(2)
 
         for p in rx:
-            self.validate_frag(p, p_ip6_translated)
+            self.validate_frag6(p, p_ip6_translated)
+
+        self.validate_frag_payload_len6(rx, UDP, payload_len)
+
+        # Send back an fragmented IPv6 UDP packet that will be "untranslated"
+        payload = UDP(sport=4000, dport=40000) / self.payload(payload_len)
+        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
+        p_ip6 = IPv6(src='2001:db8:1e0::c0a8:1:e',
+                     dst='1234:5678:90ab:cdef:ac:1001:200:0')
+        p6 = (p_ether6 / p_ip6 / payload)
+        frags6 = fragment_rfc8200(p6, identification=0xdcba, fragsize=1000)
+
+        p_ip4_translated = IP(src='192.168.0.1', dst=self.pg0.remote_ip4)
+        p4_translated = (p_ip4_translated / payload)
+        p4_translated.id = 0
+        p4_translated.ttl -= 1
+
+        self.pg_enable_capture()
+        self.pg1.add_stream(frags6)
+        self.pg_start()
+        rx = self.pg0.get_capture(2)
+
+        for p in rx:
+            self.validate_frag4(p, p4_translated)
 
-        self.validate_frag_payload_len(rx, UDP, payload_len)
+        self.validate_frag_payload_len4(rx, UDP, payload_len)
 
         # ICMP packet fragmentation
         payload = ICMP(id=6529) / self.payload(payload_len)
@@ -641,9 +739,9 @@ class TestMAP(VppTestCase):
         p_ip6_translated = IPv6(src='1234:5678:90ab:cdef:ac:1001:200:0',
                                 dst='2001:db8:160::c0a8:1:6')
         for p in rx:
-            self.validate_frag(p, p_ip6_translated)
+            self.validate_frag6(p, p_ip6_translated)
 
-        self.validate_frag_payload_len(rx, ICMPv6EchoRequest, payload_len)
+        self.validate_frag_payload_len6(rx, ICMPv6EchoRequest, payload_len)
 
         # ICMP packet fragmentation send fragments
         payload = ICMP(id=6529) / self.payload(payload_len)
@@ -655,9 +753,9 @@ class TestMAP(VppTestCase):
         rx = self.pg1.get_capture(2)
 
         for p in rx:
-            self.validate_frag(p, p_ip6_translated)
+            self.validate_frag6(p, p_ip6_translated)
 
-        self.validate_frag_payload_len(rx, ICMPv6EchoRequest, payload_len)
+        self.validate_frag_payload_len6(rx, ICMPv6EchoRequest, payload_len)
 
         # TCP MSS clamping
         self.vapi.map_param_set_tcp(1300)
@@ -860,5 +958,6 @@ class TestMAP(VppTestCase):
                                                 ip6_nh_address="4001::1",
                                                 is_add=0)
 
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)