tests: changes for scapy 2.4.3 migration
[vpp.git] / src / plugins / map / test / test_map.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import unittest
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath
9 from util import fragment_rfc791, fragment_rfc8200
10
11 import scapy.compat
12 from scapy.layers.l2 import Ether
13 from scapy.packet import Raw
14 from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment
15 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
16
17
18 class TestMAP(VppTestCase):
19     """ MAP Test Case """
20
21     @classmethod
22     def setUpClass(cls):
23         super(TestMAP, cls).setUpClass()
24
25     @classmethod
26     def tearDownClass(cls):
27         super(TestMAP, cls).tearDownClass()
28
29     def setUp(self):
30         super(TestMAP, self).setUp()
31
32         # create 2 pg interfaces
33         self.create_pg_interfaces(range(4))
34
35         # pg0 is 'inside' IPv4
36         self.pg0.admin_up()
37         self.pg0.config_ip4()
38         self.pg0.resolve_arp()
39
40         # pg1 is 'outside' IPv6
41         self.pg1.admin_up()
42         self.pg1.config_ip6()
43         self.pg1.generate_remote_hosts(4)
44         self.pg1.configure_ipv6_neighbors()
45
46     def tearDown(self):
47         super(TestMAP, self).tearDown()
48         for i in self.pg_interfaces:
49             i.unconfig_ip4()
50             i.unconfig_ip6()
51             i.admin_down()
52
53     def send_and_assert_encapped(self, packets, ip6_src, ip6_dst, dmac=None):
54         if not dmac:
55             dmac = self.pg1.remote_mac
56
57         self.pg0.add_stream(packets)
58
59         self.pg_enable_capture(self.pg_interfaces)
60         self.pg_start()
61
62         capture = self.pg1.get_capture(len(packets))
63         for rx, tx in zip(capture, packets):
64             self.assertEqual(rx[Ether].dst, dmac)
65             self.assertEqual(rx[IP].src, tx[IP].src)
66             self.assertEqual(rx[IPv6].src, ip6_src)
67             self.assertEqual(rx[IPv6].dst, ip6_dst)
68
69     def send_and_assert_encapped_one(self, packet, ip6_src, ip6_dst,
70                                      dmac=None):
71         return self.send_and_assert_encapped([packet], ip6_src, ip6_dst, dmac)
72
73     def test_api_map_domain_dump(self):
74         map_dst = '2001::/64'
75         map_src = '3000::1/128'
76         client_pfx = '192.168.0.0/16'
77         tag = 'MAP-E tag.'
78         index = self.vapi.map_add_domain(ip4_prefix=client_pfx,
79                                          ip6_prefix=map_dst,
80                                          ip6_src=map_src,
81                                          tag=tag).index
82         rv = self.vapi.map_domain_dump()
83
84         # restore the state early so as to not impact subsequent tests.
85         # If an assert fails, we will not get the chance to do it at the end.
86         self.vapi.map_del_domain(index=index)
87
88         self.assertGreater(len(rv), 0,
89                            "Expected output from 'map_domain_dump'")
90
91         # typedefs are returned as ipaddress objects.
92         # wrap results in str() ugh! to avoid the need to call unicode.
93         self.assertEqual(str(rv[0].ip4_prefix), client_pfx)
94         self.assertEqual(str(rv[0].ip6_prefix), map_dst)
95         self.assertEqual(str(rv[0].ip6_src), map_src)
96
97         self.assertEqual(rv[0].tag, tag,
98                          "output produced incorrect tag value.")
99
100     def test_map_e_udp(self):
101         """ MAP-E UDP"""
102
103         #
104         # Add a route to the MAP-BR
105         #
106         map_br_pfx = "2001::"
107         map_br_pfx_len = 32
108         map_route = VppIpRoute(self,
109                                map_br_pfx,
110                                map_br_pfx_len,
111                                [VppRoutePath(self.pg1.remote_ip6,
112                                              self.pg1.sw_if_index)])
113         map_route.add_vpp_config()
114
115         #
116         # Add a domain that maps from pg0 to pg1
117         #
118         map_dst = '2001::/32'
119         map_src = '3000::1/128'
120         client_pfx = '192.168.0.0/16'
121         map_translated_addr = '2001:0:101:7000:0:c0a8:101:7'
122         tag = 'MAP-E tag.'
123         self.vapi.map_add_domain(ip4_prefix=client_pfx,
124                                  ip6_prefix=map_dst,
125                                  ip6_src=map_src,
126                                  ea_bits_len=20,
127                                  psid_offset=4,
128                                  psid_length=4,
129                                  tag=tag)
130
131         self.vapi.map_param_set_security_check(enable=1, fragments=1)
132
133         # Enable MAP on interface.
134         self.vapi.map_if_enable_disable(is_enable=1,
135                                         sw_if_index=self.pg0.sw_if_index,
136                                         is_translation=0)
137
138         # Ensure MAP doesn't steal all packets!
139         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
140               IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
141               UDP(sport=20000, dport=10000) /
142               Raw(b'\xa5' * 100))
143         rx = self.send_and_expect(self.pg0, v4 * 4, self.pg0)
144         v4_reply = v4[1]
145         v4_reply.ttl -= 1
146         for p in rx:
147             self.validate(p[1], v4_reply)
148
149         #
150         # Fire in a v4 packet that will be encapped to the BR
151         #
152         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
153               IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
154               UDP(sport=20000, dport=10000) /
155               Raw(b'\xa5' * 100))
156
157         self.send_and_assert_encapped(v4 * 4, "3000::1", map_translated_addr)
158
159         #
160         # Verify reordered fragments are able to pass as well
161         #
162         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
163               IP(id=1, src=self.pg0.remote_ip4, dst='192.168.1.1') /
164               UDP(sport=20000, dport=10000) /
165               Raw(b'\xa5' * 1000))
166
167         frags = fragment_rfc791(v4, 400)
168         frags.reverse()
169
170         self.send_and_assert_encapped(frags, "3000::1", map_translated_addr)
171
172         # Enable MAP on interface.
173         self.vapi.map_if_enable_disable(is_enable=1,
174                                         sw_if_index=self.pg1.sw_if_index,
175                                         is_translation=0)
176
177         # Ensure MAP doesn't steal all packets
178         v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
179               IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
180               UDP(sport=20000, dport=10000) /
181               Raw(b'\xa5' * 100))
182         rx = self.send_and_expect(self.pg1, v6*1, self.pg1)
183         v6_reply = v6[1]
184         v6_reply.hlim -= 1
185         for p in rx:
186             self.validate(p[1], v6_reply)
187
188         #
189         # Fire in a V6 encapped packet.
190         # expect a decapped packet on the inside ip4 link
191         #
192         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
193              IPv6(dst='3000::1', src=map_translated_addr) /
194              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
195              UDP(sport=10000, dport=20000) /
196              Raw(b'\xa5' * 100))
197
198         self.pg1.add_stream(p)
199
200         self.pg_enable_capture(self.pg_interfaces)
201         self.pg_start()
202
203         rx = self.pg0.get_capture(1)
204         rx = rx[0]
205
206         self.assertFalse(rx.haslayer(IPv6))
207         self.assertEqual(rx[IP].src, p[IP].src)
208         self.assertEqual(rx[IP].dst, p[IP].dst)
209
210         #
211         # Verify encapped reordered fragments pass as well
212         #
213         p = (IP(id=1, dst=self.pg0.remote_ip4, src='192.168.1.1') /
214              UDP(sport=10000, dport=20000) /
215              Raw(b'\xa5' * 1500))
216         frags = fragment_rfc791(p, 400)
217         frags.reverse()
218
219         stream = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
220                   IPv6(dst='3000::1', src=map_translated_addr) /
221                   x for x in frags)
222
223         self.pg1.add_stream(stream)
224
225         self.pg_enable_capture(self.pg_interfaces)
226         self.pg_start()
227
228         rx = self.pg0.get_capture(len(frags))
229
230         for r in rx:
231             self.assertFalse(r.haslayer(IPv6))
232             self.assertEqual(r[IP].src, p[IP].src)
233             self.assertEqual(r[IP].dst, p[IP].dst)
234
235         # Verify that fragments pass even if ipv6 layer is fragmented
236         stream = (IPv6(dst='3000::1', src=map_translated_addr) / x
237                   for x in frags)
238
239         v6_stream = [
240             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / x
241             for i in range(len(frags))
242             for x in fragment_rfc8200(
243                 IPv6(dst='3000::1', src=map_translated_addr) / frags[i],
244                 i, 200)]
245
246         self.pg1.add_stream(v6_stream)
247
248         self.pg_enable_capture(self.pg_interfaces)
249         self.pg_start()
250
251         rx = self.pg0.get_capture(len(frags))
252
253         for r in rx:
254             self.assertFalse(r.haslayer(IPv6))
255             self.assertEqual(r[IP].src, p[IP].src)
256             self.assertEqual(r[IP].dst, p[IP].dst)
257
258         #
259         # Pre-resolve. No API for this!!
260         #
261         self.vapi.ppcli("map params pre-resolve ip6-nh 4001::1")
262
263         self.send_and_assert_no_replies(self.pg0, v4,
264                                         "resolved via default route")
265
266         #
267         # Add a route to 4001::1. Expect the encapped traffic to be
268         # sent via that routes next-hop
269         #
270         pre_res_route = VppIpRoute(self, "4001::1", 128,
271                                    [VppRoutePath(self.pg1.remote_hosts[2].ip6,
272                                                  self.pg1.sw_if_index)])
273         pre_res_route.add_vpp_config()
274
275         self.send_and_assert_encapped_one(v4, "3000::1",
276                                           map_translated_addr,
277                                           dmac=self.pg1.remote_hosts[2].mac)
278
279         #
280         # change the route to the pre-solved next-hop
281         #
282         pre_res_route.modify([VppRoutePath(self.pg1.remote_hosts[3].ip6,
283                                            self.pg1.sw_if_index)])
284         pre_res_route.add_vpp_config()
285
286         self.send_and_assert_encapped_one(v4, "3000::1",
287                                           map_translated_addr,
288                                           dmac=self.pg1.remote_hosts[3].mac)
289
290         #
291         # cleanup. The test infra's object registry will ensure
292         # the route is really gone and thus that the unresolve worked.
293         #
294         pre_res_route.remove_vpp_config()
295         self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")
296
297     def test_map_e_inner_frag(self):
298         """ MAP-E Inner fragmentation """
299
300         #
301         # Add a route to the MAP-BR
302         #
303         map_br_pfx = "2001::"
304         map_br_pfx_len = 32
305         map_route = VppIpRoute(self,
306                                map_br_pfx,
307                                map_br_pfx_len,
308                                [VppRoutePath(self.pg1.remote_ip6,
309                                              self.pg1.sw_if_index)])
310         map_route.add_vpp_config()
311
312         #
313         # Add a domain that maps from pg0 to pg1
314         #
315         map_dst = '2001::/32'
316         map_src = '3000::1/128'
317         client_pfx = '192.168.0.0/16'
318         map_translated_addr = '2001:0:101:7000:0:c0a8:101:7'
319         tag = 'MAP-E tag.'
320         self.vapi.map_add_domain(ip4_prefix=client_pfx,
321                                  ip6_prefix=map_dst,
322                                  ip6_src=map_src,
323                                  ea_bits_len=20,
324                                  psid_offset=4,
325                                  psid_length=4,
326                                  mtu=1000,
327                                  tag=tag)
328
329         # Enable MAP on interface.
330         self.vapi.map_if_enable_disable(is_enable=1,
331                                         sw_if_index=self.pg0.sw_if_index,
332                                         is_translation=0)
333
334         # Enable inner fragmentation
335         self.vapi.map_param_set_fragmentation(inner=1)
336
337         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
338               IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
339               UDP(sport=20000, dport=10000) /
340               Raw(b'\xa5' * 1300))
341
342         self.pg_send(self.pg0, v4*1)
343         rx = self.pg1.get_capture(2)
344
345         frags = fragment_rfc791(v4[1], 1000)
346         frags[0].id = 0
347         frags[1].id = 0
348         frags[0].ttl -= 1
349         frags[1].ttl -= 1
350         frags[0].chksum = 0
351         frags[1].chksum = 0
352
353         v6_reply1 = (IPv6(src='3000::1', dst=map_translated_addr, hlim=63) /
354                      frags[0])
355         v6_reply2 = (IPv6(src='3000::1', dst=map_translated_addr, hlim=63) /
356                      frags[1])
357         rx[0][1].fl = 0
358         rx[1][1].fl = 0
359         rx[0][1][IP].id = 0
360         rx[1][1][IP].id = 0
361         rx[0][1][IP].chksum = 0
362         rx[1][1][IP].chksum = 0
363
364         self.validate(rx[0][1], v6_reply1)
365         self.validate(rx[1][1], v6_reply2)
366
367     def test_map_e_tcp_mss(self):
368         """ MAP-E TCP MSS"""
369
370         #
371         # Add a route to the MAP-BR
372         #
373         map_br_pfx = "2001::"
374         map_br_pfx_len = 32
375         map_route = VppIpRoute(self,
376                                map_br_pfx,
377                                map_br_pfx_len,
378                                [VppRoutePath(self.pg1.remote_ip6,
379                                              self.pg1.sw_if_index)])
380         map_route.add_vpp_config()
381
382         #
383         # Add a domain that maps from pg0 to pg1
384         #
385         map_dst = '2001::/32'
386         map_src = '3000::1/128'
387         client_pfx = '192.168.0.0/16'
388         map_translated_addr = '2001:0:101:5000:0:c0a8:101:5'
389         tag = 'MAP-E TCP tag.'
390         self.vapi.map_add_domain(ip4_prefix=client_pfx,
391                                  ip6_prefix=map_dst,
392                                  ip6_src=map_src,
393                                  ea_bits_len=20,
394                                  psid_offset=4,
395                                  psid_length=4,
396                                  tag=tag)
397
398         # Enable MAP on pg0 interface.
399         self.vapi.map_if_enable_disable(is_enable=1,
400                                         sw_if_index=self.pg0.sw_if_index,
401                                         is_translation=0)
402
403         # Enable MAP on pg1 interface.
404         self.vapi.map_if_enable_disable(is_enable=1,
405                                         sw_if_index=self.pg1.sw_if_index,
406                                         is_translation=0)
407
408         # TCP MSS clamping
409         mss_clamp = 1300
410         self.vapi.map_param_set_tcp(mss_clamp)
411
412         #
413         # Send a v4 packet that will be encapped.
414         #
415         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
416         p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.1.1')
417         p_tcp = TCP(sport=20000, dport=30000, flags="S",
418                     options=[("MSS", 1455)])
419         p4 = p_ether / p_ip4 / p_tcp
420
421         self.pg1.add_stream(p4)
422         self.pg_enable_capture(self.pg_interfaces)
423         self.pg_start()
424
425         rx = self.pg1.get_capture(1)
426         rx = rx[0]
427
428         self.assertTrue(rx.haslayer(IPv6))
429         self.assertEqual(rx[IP].src, p4[IP].src)
430         self.assertEqual(rx[IP].dst, p4[IP].dst)
431         self.assertEqual(rx[IPv6].src, "3000::1")
432         self.assertEqual(rx[TCP].options,
433                          TCP(options=[('MSS', mss_clamp)]).options)
434
435     def validate(self, rx, expected):
436         self.assertEqual(rx, expected.__class__(scapy.compat.raw(expected)))
437
438     def payload(self, len):
439         return 'x' * len
440
441     def test_map_t(self):
442         """ MAP-T """
443
444         #
445         # Add a domain that maps from pg0 to pg1
446         #
447         map_dst = '2001:db8::/32'
448         map_src = '1234:5678:90ab:cdef::/64'
449         ip4_pfx = '192.168.0.0/24'
450         tag = 'MAP-T Tag.'
451
452         self.vapi.map_add_domain(ip6_prefix=map_dst,
453                                  ip4_prefix=ip4_pfx,
454                                  ip6_src=map_src,
455                                  ea_bits_len=16,
456                                  psid_offset=6,
457                                  psid_length=4,
458                                  mtu=1500,
459                                  tag=tag)
460
461         # Enable MAP-T on interfaces.
462         self.vapi.map_if_enable_disable(is_enable=1,
463                                         sw_if_index=self.pg0.sw_if_index,
464                                         is_translation=1)
465         self.vapi.map_if_enable_disable(is_enable=1,
466                                         sw_if_index=self.pg1.sw_if_index,
467                                         is_translation=1)
468
469         # Ensure MAP doesn't steal all packets!
470         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
471               IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
472               UDP(sport=20000, dport=10000) /
473               Raw(b'\xa5' * 100))
474         rx = self.send_and_expect(self.pg0, v4*1, self.pg0)
475         v4_reply = v4[1]
476         v4_reply.ttl -= 1
477         for p in rx:
478             self.validate(p[1], v4_reply)
479         # Ensure MAP doesn't steal all packets
480         v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
481               IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
482               UDP(sport=20000, dport=10000) /
483               Raw(b'\xa5' * 100))
484         rx = self.send_and_expect(self.pg1, v6*1, self.pg1)
485         v6_reply = v6[1]
486         v6_reply.hlim -= 1
487         for p in rx:
488             self.validate(p[1], v6_reply)
489
490         map_route = VppIpRoute(self,
491                                "2001:db8::",
492                                32,
493                                [VppRoutePath(self.pg1.remote_ip6,
494                                              self.pg1.sw_if_index,
495                                              proto=DpoProto.DPO_PROTO_IP6)])
496         map_route.add_vpp_config()
497
498         #
499         # Send a v4 packet that will be translated
500         #
501         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
502         p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
503         payload = TCP(sport=0xabcd, dport=0xabcd)
504
505         p4 = (p_ether / p_ip4 / payload)
506         p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
507                               dst="2001:db8:1f0::c0a8:1:f") / payload)
508         p6_translated.hlim -= 1
509         rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
510         for p in rx:
511             self.validate(p[1], p6_translated)
512
513         # Send back an IPv6 packet that will be "untranslated"
514         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
515         p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
516                      dst='1234:5678:90ab:cdef:ac:1001:200:0')
517         p6 = (p_ether6 / p_ip6 / payload)
518         p4_translated = (IP(src='192.168.0.1',
519                             dst=self.pg0.remote_ip4) / payload)
520         p4_translated.id = 0
521         p4_translated.ttl -= 1
522         rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
523         for p in rx:
524             self.validate(p[1], p4_translated)
525
526         # IPv4 TTL
527         ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0)
528         p4 = (p_ether / ip4_ttl_expired / payload)
529
530         icmp4_reply = (IP(id=0, ttl=254, src=self.pg0.local_ip4,
531                           dst=self.pg0.remote_ip4) /
532                        ICMP(type='time-exceeded',
533                             code='ttl-zero-during-transit') /
534                        IP(src=self.pg0.remote_ip4,
535                           dst='192.168.0.1', ttl=0) / payload)
536         rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
537         for p in rx:
538             self.validate(p[1], icmp4_reply)
539
540         '''
541         This one is broken, cause it would require hairpinning...
542         # IPv4 TTL TTL1
543         ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=1)
544         p4 = (p_ether / ip4_ttl_expired / payload)
545
546         icmp4_reply = IP(id=0, ttl=254, src=self.pg0.local_ip4,
547         dst=self.pg0.remote_ip4) / \
548         ICMP(type='time-exceeded', code='ttl-zero-during-transit' ) / \
549         IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0) / payload
550         rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
551         for p in rx:
552             self.validate(p[1], icmp4_reply)
553         '''
554
555         # IPv6 Hop limit
556         ip6_hlim_expired = IPv6(hlim=0, src='2001:db8:1ab::c0a8:1:ab',
557                                 dst='1234:5678:90ab:cdef:ac:1001:200:0')
558         p6 = (p_ether6 / ip6_hlim_expired / payload)
559
560         icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
561                             dst="2001:db8:1ab::c0a8:1:ab") /
562                        ICMPv6TimeExceeded(code=0) /
563                        IPv6(src="2001:db8:1ab::c0a8:1:ab",
564                             dst='1234:5678:90ab:cdef:ac:1001:200:0',
565                             hlim=0) / payload)
566         rx = self.send_and_expect(self.pg1, p6*1, self.pg1)
567         for p in rx:
568             self.validate(p[1], icmp6_reply)
569
570         # IPv4 Well-known port
571         p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
572         payload = UDP(sport=200, dport=200)
573         p4 = (p_ether / p_ip4 / payload)
574         self.send_and_assert_no_replies(self.pg0, p4*1)
575
576         # IPv6 Well-known port
577         payload = UDP(sport=200, dport=200)
578         p6 = (p_ether6 / p_ip6 / payload)
579         self.send_and_assert_no_replies(self.pg1, p6*1)
580
581         # Packet fragmentation
582         payload = UDP(sport=40000, dport=4000) / self.payload(1453)
583         p4 = (p_ether / p_ip4 / payload)
584         self.pg_enable_capture()
585         self.pg0.add_stream(p4)
586         self.pg_start()
587         rx = self.pg1.get_capture(2)
588         for p in rx:
589             pass
590             # TODO: Manual validation
591             # self.validate(p[1], icmp4_reply)
592
593         # Packet fragmentation send fragments
594         payload = UDP(sport=40000, dport=4000) / self.payload(1453)
595         p4 = (p_ether / p_ip4 / payload)
596         frags = fragment(p4, fragsize=1000)
597         self.pg_enable_capture()
598         self.pg0.add_stream(frags)
599         self.pg_start()
600         rx = self.pg1.get_capture(2)
601         for p in rx:
602             pass
603             # p.show2()
604
605         # reass_pkt = reassemble(rx)
606         # p4_reply.ttl -= 1
607         # p4_reply.id = 256
608         # self.validate(reass_pkt, p4_reply)
609
610         # TCP MSS clamping
611         self.vapi.map_param_set_tcp(1300)
612
613         #
614         # Send a v4 TCP SYN packet that will be translated and MSS clamped
615         #
616         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
617         p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
618         payload = TCP(sport=0xabcd, dport=0xabcd, flags="S",
619                       options=[('MSS', 1460)])
620
621         p4 = (p_ether / p_ip4 / payload)
622         p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
623                               dst="2001:db8:1f0::c0a8:1:f") / payload)
624         p6_translated.hlim -= 1
625         p6_translated[TCP].options = [('MSS', 1300)]
626         rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
627         for p in rx:
628             self.validate(p[1], p6_translated)
629
630         # Send back an IPv6 packet that will be "untranslated"
631         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
632         p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
633                      dst='1234:5678:90ab:cdef:ac:1001:200:0')
634         p6 = (p_ether6 / p_ip6 / payload)
635         p4_translated = (IP(src='192.168.0.1',
636                             dst=self.pg0.remote_ip4) / payload)
637         p4_translated.id = 0
638         p4_translated.ttl -= 1
639         p4_translated[TCP].options = [('MSS', 1300)]
640         rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
641         for p in rx:
642             self.validate(p[1], p4_translated)
643
644     def test_map_t_ip6_psid(self):
645         """ MAP-T v6->v4 PSID validation"""
646
647         #
648         # Add a domain that maps from pg0 to pg1
649         #
650         map_dst = '2001:db8::/32'
651         map_src = '1234:5678:90ab:cdef::/64'
652         ip4_pfx = '192.168.0.0/24'
653         tag = 'MAP-T Test Domain'
654
655         self.vapi.map_add_domain(ip6_prefix=map_dst,
656                                  ip4_prefix=ip4_pfx,
657                                  ip6_src=map_src,
658                                  ea_bits_len=16,
659                                  psid_offset=6,
660                                  psid_length=4,
661                                  mtu=1500,
662                                  tag=tag)
663
664         # Enable MAP-T on interfaces.
665         self.vapi.map_if_enable_disable(is_enable=1,
666                                         sw_if_index=self.pg0.sw_if_index,
667                                         is_translation=1)
668         self.vapi.map_if_enable_disable(is_enable=1,
669                                         sw_if_index=self.pg1.sw_if_index,
670                                         is_translation=1)
671
672         map_route = VppIpRoute(self,
673                                "2001:db8::",
674                                32,
675                                [VppRoutePath(self.pg1.remote_ip6,
676                                              self.pg1.sw_if_index,
677                                              proto=DpoProto.DPO_PROTO_IP6)])
678         map_route.add_vpp_config()
679
680         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
681         p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
682                      dst='1234:5678:90ab:cdef:ac:1001:200:0')
683
684         # Send good IPv6 source port, ensure translated IPv4 received
685         payload = TCP(sport=0xabcd, dport=80)
686         p6 = (p_ether6 / p_ip6 / payload)
687         p4_translated = (IP(src='192.168.0.1',
688                             dst=self.pg0.remote_ip4) / payload)
689         p4_translated.id = 0
690         p4_translated.ttl -= 1
691         rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
692         for p in rx:
693             self.validate(p[1], p4_translated)
694
695         # Send bad IPv6 source port, ensure translated IPv4 not received
696         payload = TCP(sport=0xdcba, dport=80)
697         p6 = (p_ether6 / p_ip6 / payload)
698         self.send_and_assert_no_replies(self.pg1, p6*1)
699
700 if __name__ == '__main__':
701     unittest.main(testRunner=VppTestRunner)