631517e5d4e8dee0245d98b202c68abd07aa0723
[vpp.git] / src / plugins / map / test / test_map_br.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import unittest
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath
9 from util import fragment_rfc791, fragment_rfc8200
10
11 import scapy.compat
12 from scapy.layers.l2 import Ether
13 from scapy.packet import Raw
14 from scapy.layers.inet import IP, UDP, ICMP, TCP, IPerror, UDPerror
15 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, IPv6ExtHdrFragment
16 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply, IPerror6
17
18
19 class TestMAPBR(VppTestCase):
20     """ MAP-T Test Cases """
21
22     @classmethod
23     def setUpClass(cls):
24         super(TestMAPBR, cls).setUpClass()
25
26     @classmethod
27     def tearDownClass(cls):
28         super(TestMAPBR, cls).tearDownClass()
29
30     def setUp(self):
31         super(TestMAPBR, self).setUp()
32
33         #
34         # Create 2 pg interfaces.
35         # pg0 is IPv4
36         # pg1 is IPv6
37         #
38         self.create_pg_interfaces(range(2))
39
40         self.pg0.admin_up()
41         self.pg0.config_ip4()
42         self.pg1.generate_remote_hosts(20)
43         self.pg1.configure_ipv4_neighbors()
44         self.pg0.resolve_arp()
45
46         self.pg1.admin_up()
47         self.pg1.config_ip6()
48         self.pg1.generate_remote_hosts(20)
49         self.pg1.configure_ipv6_neighbors()
50
51         #
52         # BR configuration parameters used for all test.
53         #
54         self.ip4_prefix = '198.18.0.0/24'
55         self.ip6_prefix = '2001:db8:f0::/48'
56         self.ip6_src = '2001:db8:ffff:ff00::/64'
57         self.ea_bits_len = 12
58         self.psid_offset = 6
59         self.psid_length = 4
60         self.mtu = 1500
61         self.tag = 'MAP-T BR'
62
63         self.ipv4_internet_address = self.pg0.remote_ip4
64         self.ipv4_map_address = "198.18.0.12"
65         self.ipv4_udp_or_tcp_internet_port = 65000
66         self.ipv4_udp_or_tcp_map_port = 16606
67
68         self.ipv6_cpe_address = "2001:db8:f0:c30:0:c612:c:3"      # 198.18.0.12
69         self.ipv6_spoof_address = "2001:db8:f0:c30:0:c612:1c:3"   # 198.18.0.28
70         self.ipv6_spoof_prefix = "2001:db8:f0:c30:0:a00:c:3"      # 10.0.0.12
71         self.ipv6_spoof_psid = "2001:db8:f0:c30:0:c612:c:4"       # 4
72         self.ipv6_spoof_subnet = "2001:db8:f1:c30:0:c612:c:3"     # f1
73
74         self.ipv6_udp_or_tcp_internet_port = 65000
75         self.ipv6_udp_or_tcp_map_port = 16606
76         self.ipv6_udp_or_tcp_spoof_port = 16862
77
78         self.ipv6_map_address = (
79             "2001:db8:ffff:ff00:ac:1001:200:0")         # 176.16.1.2
80         self.ipv6_map_same_rule_diff_addr = (
81             "2001:db8:ffff:ff00:c6:1200:1000:0")        # 198.18.0.16
82         self.ipv6_map_same_rule_same_addr = (
83             "2001:db8:ffff:ff00:c6:1200:c00:0")         # 198.18.0.12
84
85         self.map_br_prefix = "2001:db8:f0::"
86         self.map_br_prefix_len = 48
87         self.psid_number = 3
88
89         #
90         # Add an IPv6 route to the MAP-BR.
91         #
92         map_route = VppIpRoute(self,
93                                self.map_br_prefix,
94                                self.map_br_prefix_len,
95                                [VppRoutePath(self.pg1.remote_ip6,
96                                              self.pg1.sw_if_index)])
97         map_route.add_vpp_config()
98
99         #
100         # Add a MAP BR domain that maps from pg0 to pg1.
101         #
102         self.vapi.map_add_domain(ip4_prefix=self.ip4_prefix,
103                                  ip6_prefix=self.ip6_prefix,
104                                  ip6_src=self.ip6_src,
105                                  ea_bits_len=self.ea_bits_len,
106                                  psid_offset=self.psid_offset,
107                                  psid_length=self.psid_length,
108                                  mtu=self.mtu,
109                                  tag=self.tag)
110
111         #
112         # Set BR parameters.
113         #
114         self.vapi.map_param_set_fragmentation(inner=1, ignore_df=0)
115         self.vapi.map_param_set_fragmentation(inner=0, ignore_df=0)
116         self.vapi.map_param_set_icmp(ip4_err_relay_src=self.pg0.local_ip4)
117         self.vapi.map_param_set_traffic_class(copy=1)
118
119         #
120         # Enable MAP-T on interfaces.
121         #
122         self.vapi.map_if_enable_disable(is_enable=1,
123                                         sw_if_index=self.pg0.sw_if_index,
124                                         is_translation=1)
125
126         self.vapi.map_if_enable_disable(is_enable=1,
127                                         sw_if_index=self.pg1.sw_if_index,
128                                         is_translation=1)
129
130         self.vapi.map_if_enable_disable(is_enable=1,
131                                         sw_if_index=self.pg1.sw_if_index,
132                                         is_translation=1)
133
134     def tearDown(self):
135         super(TestMAPBR, self).tearDown()
136         for i in self.pg_interfaces:
137             i.unconfig_ip4()
138             i.unconfig_ip6()
139             i.admin_down()
140
141     def v4_address_check(self, pkt):
142         self.assertEqual(pkt[IP].src, self.ipv4_map_address)
143         self.assertEqual(pkt[IP].dst, self.ipv4_internet_address)
144
145     def v4_port_check(self, pkt, proto):
146         self.assertEqual(pkt[proto].sport, self.ipv4_udp_or_tcp_map_port)
147         self.assertEqual(pkt[proto].dport, self.ipv4_udp_or_tcp_internet_port)
148
149     def v6_address_check(self, pkt):
150         self.assertEqual(pkt[IPv6].src, self.ipv6_map_address)
151         self.assertEqual(pkt[IPv6].dst, self.ipv6_cpe_address)
152
153     def v6_port_check(self, pkt, proto):
154         self.assertEqual(pkt[proto].sport, self.ipv6_udp_or_tcp_internet_port)
155         self.assertEqual(pkt[proto].dport, self.ipv6_udp_or_tcp_map_port)
156
157     #
158     # Normal translation of UDP packets v4 -> v6 direction
159     # Send 128 frame size packet for IPv4/UDP.
160     # Received packet should be translated into IPv6 packet with no
161     # fragment header.
162     #
163
164     def test_map_t_udp_ip4_to_ip6(self):
165         """ MAP-T UDP IPv4 -> IPv6 """
166
167         eth = Ether(src=self.pg0.remote_mac,
168                     dst=self.pg0.local_mac)
169         ip = IP(src=self.pg0.remote_ip4,
170                 dst=self.ipv4_map_address,
171                 tos=0)
172         udp = UDP(sport=self.ipv4_udp_or_tcp_internet_port,
173                   dport=self.ipv4_udp_or_tcp_map_port)
174         payload = "a" * 82
175         tx_pkt = eth / ip / udp / payload
176
177         self.pg_send(self.pg0, tx_pkt * 1)
178
179         rx_pkts = self.pg1.get_capture(1)
180         rx_pkt = rx_pkts[0]
181
182         self.v6_address_check(rx_pkt)
183         self.v6_port_check(rx_pkt, UDP)
184         self.assertEqual(rx_pkt[IPv6].tc, 0)    # IPv4 ToS passed to v6 TC
185         self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="UDP").nh)
186
187     #
188     # Normal translation of TCP packets v4 -> v6 direction.
189     # Send 128 frame size packet for IPv4/TCP.
190     # Received packet should be translated into IPv6 packet with no
191     # fragment header.
192     #
193
194     def test_map_t_tcp_ip4_to_ip6(self):
195         """ MAP-T TCP IPv4 -> IPv6 """
196
197         eth = Ether(src=self.pg0.remote_mac,
198                     dst=self.pg0.local_mac)
199         ip = IP(src=self.pg0.remote_ip4,
200                 dst=self.ipv4_map_address,
201                 tos=0)
202         tcp = TCP(sport=self.ipv4_udp_or_tcp_internet_port,
203                   dport=self.ipv4_udp_or_tcp_map_port)
204         payload = "a" * 82
205         tx_pkt = eth / ip / tcp / payload
206
207         self.pg_send(self.pg0, tx_pkt * 1)
208
209         rx_pkts = self.pg1.get_capture(1)
210         rx_pkt = rx_pkts[0]
211
212         self.v6_address_check(rx_pkt)
213         self.v6_port_check(rx_pkt, TCP)
214         self.assertEqual(rx_pkt[IPv6].tc, 0)    # IPv4 ToS passed to v6 TC
215         self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="TCP").nh)
216
217     #
218     # Normal translation of UDP packets v6 -> v4 direction
219     # Send 128 frame size packet for IPv6/UDP.
220     # Received packet should be translated into an IPv4 packet with DF=1.
221     #
222
223     def test_map_t_udp_ip6_to_ip4(self):
224         """ MAP-T UDP IPv6 -> IPv4 """
225
226         eth = Ether(src=self.pg1.remote_mac,
227                     dst=self.pg1.local_mac)
228         ip = IPv6(src=self.ipv6_cpe_address,
229                   dst=self.ipv6_map_address)
230         udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
231                   dport=self.ipv6_udp_or_tcp_internet_port)
232         payload = "a" * 82
233         tx_pkt = eth / ip / udp / payload
234
235         self.pg_send(self.pg1, tx_pkt * 1)
236
237         rx_pkts = self.pg0.get_capture(1)
238         rx_pkt = rx_pkts[0]
239
240         self.v4_address_check(rx_pkt)
241         self.v4_port_check(rx_pkt, UDP)
242         self.assertEqual(rx_pkt[IP].proto, IP(proto="udp").proto)
243         self.assertEqual(rx_pkt[IP].tos, 0)    # IPv6 TC passed to v4 ToS
244         df_bit = IP(flags="DF").flags
245         self.assertNotEqual(rx_pkt[IP].flags & df_bit, df_bit)
246
247     #
248     # Normal translation of TCP packets v6 -> v4 direction
249     # Send 128 frame size packet for IPv6/TCP.
250     # Received packet should be translated into an IPv4 packet with DF=1
251     #
252
253     def test_map_t_tcp_ip6_to_ip4(self):
254         """ MAP-T TCP IPv6 -> IPv4 """
255
256         eth = Ether(src=self.pg1.remote_mac,
257                     dst=self.pg1.local_mac)
258         ip = IPv6(src=self.ipv6_cpe_address,
259                   dst=self.ipv6_map_address)
260         tcp = TCP(sport=self.ipv6_udp_or_tcp_map_port,
261                   dport=self.ipv6_udp_or_tcp_internet_port)
262         payload = "a" * 82
263         tx_pkt = eth / ip / tcp / payload
264
265         self.pg_send(self.pg1, tx_pkt * 1)
266
267         rx_pkts = self.pg0.get_capture(1)
268         rx_pkt = rx_pkts[0]
269
270         self.v4_address_check(rx_pkt)
271         self.v4_port_check(rx_pkt, TCP)
272         self.assertEqual(rx_pkt[IP].proto, IP(proto="tcp").proto)
273         self.assertEqual(rx_pkt[IP].tos, 0)    # IPv6 TC passed to v4 ToS
274         df_bit = IP(flags="DF").flags
275         self.assertNotEqual(rx_pkt[IP].flags & df_bit, df_bit)
276
277     #
278     # Translation of ICMP Echo Request v4 -> v6 direction
279     # Received packet should be translated into an IPv6 Echo Request.
280     #
281
282     def test_map_t_echo_request_ip4_to_ip6(self):
283         """ MAP-T echo request IPv4 -> IPv6 """
284
285         eth = Ether(src=self.pg1.remote_mac,
286                     dst=self.pg1.local_mac)
287         ip = IP(src=self.pg0.remote_ip4,
288                 dst=self.ipv4_map_address)
289         icmp = ICMP(type="echo-request",
290                     id=self.ipv6_udp_or_tcp_map_port)
291         payload = "H" * 10
292         tx_pkt = eth / ip / icmp / payload
293
294         self.pg_send(self.pg0, tx_pkt * 1)
295
296         rx_pkts = self.pg1.get_capture(1)
297         rx_pkt = rx_pkts[0]
298
299         self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
300         self.assertEqual(rx_pkt[ICMPv6EchoRequest].type,
301                          ICMPv6EchoRequest(type="Echo Request").type)
302         self.assertEqual(rx_pkt[ICMPv6EchoRequest].code, 0)
303         self.assertEqual(rx_pkt[ICMPv6EchoRequest].id,
304                          self.ipv6_udp_or_tcp_map_port)
305
306     #
307     # Translation of ICMP Echo Reply v4 -> v6 direction
308     # Received packet should be translated into an IPv6 Echo Reply.
309     #
310
311     def test_map_t_echo_reply_ip4_to_ip6(self):
312         """ MAP-T echo reply IPv4 -> IPv6 """
313
314         eth = Ether(src=self.pg1.remote_mac,
315                     dst=self.pg1.local_mac)
316         ip = IP(src=self.pg0.remote_ip4,
317                 dst=self.ipv4_map_address)
318         icmp = ICMP(type="echo-reply",
319                     id=self.ipv6_udp_or_tcp_map_port)
320         payload = "H" * 10
321         tx_pkt = eth / ip / icmp / payload
322
323         self.pg_send(self.pg0, tx_pkt * 1)
324
325         rx_pkts = self.pg1.get_capture(1)
326         rx_pkt = rx_pkts[0]
327
328         self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
329         self.assertEqual(rx_pkt[ICMPv6EchoReply].type,
330                          ICMPv6EchoReply(type="Echo Reply").type)
331         self.assertEqual(rx_pkt[ICMPv6EchoReply].code, 0)
332         self.assertEqual(rx_pkt[ICMPv6EchoReply].id,
333                          self.ipv6_udp_or_tcp_map_port)
334
335     #
336     # Translation of ICMP Time Exceeded v4 -> v6 direction
337     # Received packet should be translated into an IPv6 Time Exceeded.
338     #
339
340     def test_map_t_time_exceeded_ip4_to_ip6(self):
341         """ MAP-T time exceeded IPv4 -> IPv6 """
342
343         eth = Ether(src=self.pg0.remote_mac,
344                     dst=self.pg0.local_mac)
345         ip = IP(src=self.pg0.remote_ip4,
346                 dst=self.ipv4_map_address)
347         icmp = ICMP(type="time-exceeded", code="ttl-zero-during-transit")
348         ip_inner = IP(dst=self.pg0.remote_ip4,
349                       src=self.ipv4_map_address, ttl=1)
350         udp_inner = UDP(sport=self.ipv4_udp_or_tcp_map_port,
351                         dport=self.ipv4_udp_or_tcp_internet_port)
352         payload = "H" * 10
353         tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload
354
355         self.pg_send(self.pg0, tx_pkt * 1)
356
357         rx_pkts = self.pg1.get_capture(1)
358         rx_pkt = rx_pkts[0]
359
360         self.v6_address_check(rx_pkt)
361         self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
362         self.assertEqual(rx_pkt[ICMPv6TimeExceeded].type,
363                          ICMPv6TimeExceeded().type)
364         self.assertEqual(rx_pkt[ICMPv6TimeExceeded].code,
365                          ICMPv6TimeExceeded(
366                             code="hop limit exceeded in transit").code)
367         self.assertEqual(rx_pkt[ICMPv6TimeExceeded].hlim, tx_pkt[IP][1].ttl)
368         self.assertTrue(rx_pkt.haslayer(IPerror6))
369         self.assertTrue(rx_pkt.haslayer(UDPerror))
370         self.assertEqual(rx_pkt[IPv6].src, rx_pkt[IPerror6].dst)
371         self.assertEqual(rx_pkt[IPv6].dst, rx_pkt[IPerror6].src)
372         self.assertEqual(rx_pkt[UDPerror].sport, self.ipv6_udp_or_tcp_map_port)
373         self.assertEqual(rx_pkt[UDPerror].dport,
374                          self.ipv6_udp_or_tcp_internet_port)
375
376     #
377     # Translation of ICMP Echo Request v6 -> v4 direction
378     # Received packet should be translated into an IPv4 Echo Request.
379     #
380
381     def test_map_t_echo_request_ip6_to_ip4(self):
382         """ MAP-T echo request IPv6 -> IPv4 """
383
384         eth = Ether(src=self.pg1.remote_mac,
385                     dst=self.pg1.local_mac)
386         ip = IPv6(src=self.ipv6_cpe_address,
387                   dst=self.ipv6_map_address)
388         icmp = ICMPv6EchoRequest()
389         icmp.id = self.ipv6_udp_or_tcp_map_port
390         payload = "H" * 10
391         tx_pkt = eth / ip / icmp / payload
392
393         self.pg_send(self.pg1, tx_pkt * 1)
394
395         rx_pkts = self.pg0.get_capture(1)
396         rx_pkt = rx_pkts[0]
397
398         self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
399         self.assertEqual(rx_pkt[ICMP].type, ICMP(type="echo-request").type)
400         self.assertEqual(rx_pkt[ICMP].code, 0)
401         self.assertEqual(rx_pkt[ICMP].id, self.ipv6_udp_or_tcp_map_port)
402
403     #
404     # Translation of ICMP Echo Reply v6 -> v4 direction
405     # Received packet should be translated into an IPv4 Echo Reply.
406     #
407
408     def test_map_t_echo_reply_ip6_to_ip4(self):
409         """ MAP-T echo reply IPv6 -> IPv4 """
410
411         eth = Ether(src=self.pg1.remote_mac,
412                     dst=self.pg1.local_mac)
413         ip = IPv6(src=self.ipv6_cpe_address,
414                   dst=self.ipv6_map_address)
415         icmp = ICMPv6EchoReply(id=self.ipv6_udp_or_tcp_map_port)
416         payload = "H" * 10
417         tx_pkt = eth / ip / icmp / payload
418
419         self.pg_send(self.pg1, tx_pkt * 1)
420
421         rx_pkts = self.pg0.get_capture(1)
422         rx_pkt = rx_pkts[0]
423
424         self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
425         self.assertEqual(rx_pkt[ICMP].type, ICMP(type="echo-reply").type)
426         self.assertEqual(rx_pkt[ICMP].code, 0)
427         self.assertEqual(rx_pkt[ICMP].id, self.ipv6_udp_or_tcp_map_port)
428
429     #
430     # Spoofed IPv4 Source Address v6 -> v4 direction
431     # Send a packet with a wrong IPv4 address embedded in bits 72-103.
432     # The BR should either drop the packet, or rewrite the spoofed
433     # source IPv4 as the actual source IPv4 address.
434     # The BR really should drop the packet.
435     #
436
437     def test_map_t_spoof_ipv4_src_addr_ip6_to_ip4(self):
438         """ MAP-T spoof ipv4 src addr IPv6 -> IPv4 """
439
440         eth = Ether(src=self.pg1.remote_mac,
441                     dst=self.pg1.local_mac)
442         ip = IPv6(src=self.ipv6_spoof_address,
443                   dst=self.ipv6_map_address)
444         udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
445                   dport=self.ipv6_udp_or_tcp_internet_port)
446         payload = "a" * 82
447         tx_pkt = eth / ip / udp / payload
448
449         self.pg_send(self.pg1, tx_pkt * 1)
450
451         self.pg0.get_capture(0, timeout=1)
452         self.pg0.assert_nothing_captured("Should drop IPv4 spoof address")
453
454     #
455     # Spoofed IPv4 Source Prefix v6 -> v4 direction
456     # Send a packet with a wrong IPv4 prefix embedded in bits 72-103.
457     # The BR should either drop the packet, or rewrite the source IPv4
458     # to the prefix that matches the source IPv4 address.
459     #
460
461     def test_map_t_spoof_ipv4_src_prefix_ip6_to_ip4(self):
462         """ MAP-T spoof ipv4 src prefix IPv6 -> IPv4 """
463
464         eth = Ether(src=self.pg1.remote_mac,
465                     dst=self.pg1.local_mac)
466         ip = IPv6(src=self.ipv6_spoof_prefix,
467                   dst=self.ipv6_map_address)
468         udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
469                   dport=self.ipv6_udp_or_tcp_internet_port)
470         payload = "a" * 82
471         tx_pkt = eth / ip / udp / payload
472
473         self.pg_send(self.pg1, tx_pkt * 1)
474
475         self.pg0.get_capture(0, timeout=1)
476         self.pg0.assert_nothing_captured("Should drop IPv4 spoof prefix")
477
478     #
479     # Spoofed IPv6 PSID v6 -> v4 direction
480     # Send a packet with a wrong IPv6 port PSID
481     # The BR should drop the packet.
482     #
483
484     def test_map_t_spoof_psid_ip6_to_ip4(self):
485         """ MAP-T spoof psid IPv6 -> IPv4 """
486
487         eth = Ether(src=self.pg1.remote_mac,
488                     dst=self.pg1.local_mac)
489         ip = IPv6(src=self.ipv6_spoof_psid,
490                   dst=self.ipv6_map_address)
491         udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
492                   dport=self.ipv6_udp_or_tcp_internet_port)
493         payload = "a" * 82
494         tx_pkt = eth / ip / udp / payload
495
496         self.pg_send(self.pg1, tx_pkt * 1)
497
498         self.pg0.get_capture(0, timeout=1)
499         self.pg0.assert_nothing_captured("Should drop IPv6 spoof PSID")
500
501     #
502     # Spoofed IPv6 subnet field v6 -> v4 direction
503     # Send a packet with a wrong IPv6 subnet as "2001:db8:f1"
504     # The BR should drop the packet.
505     #
506
507     def test_map_t_spoof_subnet_ip6_to_ip4(self):
508         """ MAP-T spoof subnet IPv6 -> IPv4 """
509
510         eth = Ether(src=self.pg1.remote_mac,
511                     dst=self.pg1.local_mac)
512         ip = IPv6(src=self.ipv6_spoof_subnet,
513                   dst=self.ipv6_map_address)
514         udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
515                   dport=self.ipv6_udp_or_tcp_internet_port)
516         payload = "a" * 82
517         tx_pkt = eth / ip / udp / payload
518
519         self.pg_send(self.pg1, tx_pkt * 1)
520
521         self.pg0.get_capture(0, timeout=1)
522         self.pg0.assert_nothing_captured("Should drop IPv6 spoof subnet")
523
524     #
525     # Spoofed IPv6 port PSID v6 -> v4 direction
526     # Send a packet with a wrong IPv6 port PSID
527     # The BR should drop the packet.
528     #
529
530     def test_map_t_spoof_port_psid_ip6_to_ip4(self):
531         """ MAP-T spoof port psid IPv6 -> IPv4 """
532
533         eth = Ether(src=self.pg1.remote_mac,
534                     dst=self.pg1.local_mac)
535         ip = IPv6(src=self.ipv6_cpe_address,
536                   dst=self.ipv6_map_address)
537         udp = UDP(sport=self.ipv6_udp_or_tcp_spoof_port,
538                   dport=self.ipv6_udp_or_tcp_internet_port)
539         payload = "a" * 82
540         tx_pkt = eth / ip / udp / payload
541
542         self.pg_send(self.pg1, tx_pkt * 1)
543
544         self.pg0.get_capture(0, timeout=1)
545         self.pg0.assert_nothing_captured("Should drop IPv6 spoof port PSID")
546
547     #
548     # Spoofed IPv6 ICMP ID PSID v6 -> v4 direction
549     # Send a packet with a wrong IPv6 IMCP ID PSID
550     # The BR should drop the packet.
551     #
552
553     def test_map_t_spoof_icmp_id_psid_ip6_to_ip4(self):
554         """ MAP-T spoof ICMP id psid IPv6 -> IPv4 """
555
556         eth = Ether(src=self.pg1.remote_mac,
557                     dst=self.pg1.local_mac)
558         ip = IPv6(src=self.ipv6_cpe_address,
559                   dst=self.ipv6_map_address)
560         icmp = ICMPv6EchoRequest()
561         icmp.id = self.ipv6_udp_or_tcp_spoof_port
562         payload = "H" * 10
563         tx_pkt = eth / ip / icmp / payload
564
565         self.pg_send(self.pg1, tx_pkt * 1)
566
567         self.pg0.get_capture(0, timeout=1)
568         self.pg0.assert_nothing_captured("Should drop IPv6 spoof port PSID")
569
570     #
571     # Map to Map - same rule, different address
572     #
573
574     @unittest.skip("Fixme: correct behavior needs clarification")
575     def test_map_t_same_rule_diff_addr_ip6_to_ip4(self):
576         """ MAP-T same rule, diff addr IPv6 -> IPv6 """
577
578         eth = Ether(src=self.pg1.remote_mac,
579                     dst=self.pg1.local_mac)
580         ip = IPv6(src=self.ipv6_cpe_address,
581                   dst=self.ipv6_map_same_rule_diff_addr)
582         udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
583                   dport=1025)
584         payload = "a" * 82
585         tx_pkt = eth / ip / udp / payload
586
587         self.pg_send(self.pg1, tx_pkt * 1)
588
589         rx_pkts = self.pg1.get_capture(1)
590         rx_pkt = rx_pkts[0]
591
592     #
593     # Map to Map - same rule, same address
594     #
595
596     @unittest.skip("Fixme: correct behavior needs clarification")
597     def test_map_t_same_rule_same_addr_ip6_to_ip4(self):
598         """ MAP-T same rule, same addr IPv6 -> IPv6 """
599
600         eth = Ether(src=self.pg1.remote_mac,
601                     dst=self.pg1.local_mac)
602         ip = IPv6(src=self.ipv6_cpe_address,
603                   dst=self.ipv6_map_same_rule_same_addr)
604         udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
605                   dport=1025)
606         payload = "a" * 82
607         tx_pkt = eth / ip / udp / payload
608
609         self.pg_send(self.pg1, tx_pkt * 1)
610
611         rx_pkts = self.pg1.get_capture(1)
612         rx_pkt = rx_pkts[0]
613
614 if __name__ == '__main__':
615     unittest.main(testRunner=VppTestRunner)