tests: replace pycodestyle with black
[vpp.git] / test / test_map_br.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import unittest
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath
9 from util import fragment_rfc791, fragment_rfc8200
10
11 import scapy.compat
12 from scapy.layers.l2 import Ether
13 from scapy.packet import Raw
14 from scapy.layers.inet import IP, UDP, ICMP, TCP, IPerror, UDPerror
15 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, ICMPv6PacketTooBig
16 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply, IPerror6
17
18
19 class TestMAPBR(VppTestCase):
20     """MAP-T Test Cases"""
21
22     @classmethod
23     def setUpClass(cls):
24         super(TestMAPBR, cls).setUpClass()
25
26     @classmethod
27     def tearDownClass(cls):
28         super(TestMAPBR, cls).tearDownClass()
29
30     def setUp(self):
31         super(TestMAPBR, self).setUp()
32
33         #
34         # Create 2 pg interfaces.
35         # pg0 is IPv4
36         # pg1 is IPv6
37         #
38         self.create_pg_interfaces(range(2))
39
40         self.pg0.admin_up()
41         self.pg0.config_ip4()
42         self.pg1.generate_remote_hosts(20)
43         self.pg1.configure_ipv4_neighbors()
44         self.pg0.resolve_arp()
45
46         self.pg1.admin_up()
47         self.pg1.config_ip6()
48         self.pg1.generate_remote_hosts(20)
49         self.pg1.configure_ipv6_neighbors()
50
51         #
52         # BR configuration parameters used for all test.
53         #
54         self.ip4_prefix = "198.18.0.0/24"
55         self.ip6_prefix = "2001:db8:f0::/48"
56         self.ip6_src = "2001:db8:ffff:ff00::/64"
57         self.ea_bits_len = 12
58         self.psid_offset = 6
59         self.psid_length = 4
60         self.mtu = 1500
61         self.tag = "MAP-T BR"
62
63         self.ipv4_internet_address = self.pg0.remote_ip4
64         self.ipv4_map_address = "198.18.0.12"
65         self.ipv4_udp_or_tcp_internet_port = 65000
66         self.ipv4_udp_or_tcp_map_port = 16606
67
68         self.ipv6_cpe_address = "2001:db8:f0:c30:0:c612:c:3"  # 198.18.0.12
69         self.ipv6_spoof_address = "2001:db8:f0:c30:0:c612:1c:3"  # 198.18.0.28
70         self.ipv6_spoof_prefix = "2001:db8:f0:c30:0:a00:c:3"  # 10.0.0.12
71         self.ipv6_spoof_psid = "2001:db8:f0:c30:0:c612:c:4"  # 4
72         self.ipv6_spoof_subnet = "2001:db8:f1:c30:0:c612:c:3"  # f1
73
74         self.ipv6_udp_or_tcp_internet_port = 65000
75         self.ipv6_udp_or_tcp_map_port = 16606
76         self.ipv6_udp_or_tcp_spoof_port = 16862
77
78         self.ipv6_map_address = "2001:db8:ffff:ff00:ac:1001:200:0"  # 176.16.1.2
79         self.ipv6_map_same_rule_diff_addr = (
80             "2001:db8:ffff:ff00:c6:1200:1000:0"  # 198.18.0.16
81         )
82         self.ipv6_map_same_rule_same_addr = (
83             "2001:db8:ffff:ff00:c6:1200:c00:0"  # 198.18.0.12
84         )
85
86         self.map_br_prefix = "2001:db8:f0::"
87         self.map_br_prefix_len = 48
88         self.psid_number = 3
89
90         #
91         # Add an IPv6 route to the MAP-BR.
92         #
93         map_route = VppIpRoute(
94             self,
95             self.map_br_prefix,
96             self.map_br_prefix_len,
97             [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
98         )
99         map_route.add_vpp_config()
100
101         #
102         # Add a MAP BR domain that maps from pg0 to pg1.
103         #
104         self.vapi.map_add_domain(
105             ip4_prefix=self.ip4_prefix,
106             ip6_prefix=self.ip6_prefix,
107             ip6_src=self.ip6_src,
108             ea_bits_len=self.ea_bits_len,
109             psid_offset=self.psid_offset,
110             psid_length=self.psid_length,
111             mtu=self.mtu,
112             tag=self.tag,
113         )
114
115         #
116         # Set BR parameters.
117         #
118         self.vapi.map_param_set_fragmentation(inner=1, ignore_df=0)
119         self.vapi.map_param_set_fragmentation(inner=0, ignore_df=0)
120         self.vapi.map_param_set_icmp(ip4_err_relay_src=self.pg0.local_ip4)
121         self.vapi.map_param_set_traffic_class(copy=1)
122
123         #
124         # Enable MAP-T on interfaces.
125         #
126         self.vapi.map_if_enable_disable(
127             is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=1
128         )
129
130         self.vapi.map_if_enable_disable(
131             is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=1
132         )
133
134         self.vapi.map_if_enable_disable(
135             is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=1
136         )
137
138     def tearDown(self):
139         super(TestMAPBR, self).tearDown()
140         for i in self.pg_interfaces:
141             i.unconfig_ip4()
142             i.unconfig_ip6()
143             i.admin_down()
144
145     def v4_address_check(self, pkt):
146         self.assertEqual(pkt[IP].src, self.ipv4_map_address)
147         self.assertEqual(pkt[IP].dst, self.ipv4_internet_address)
148
149     def v4_port_check(self, pkt, proto):
150         self.assertEqual(pkt[proto].sport, self.ipv4_udp_or_tcp_map_port)
151         self.assertEqual(pkt[proto].dport, self.ipv4_udp_or_tcp_internet_port)
152
153     def v6_address_check(self, pkt):
154         self.assertEqual(pkt[IPv6].src, self.ipv6_map_address)
155         self.assertEqual(pkt[IPv6].dst, self.ipv6_cpe_address)
156
157     def v6_port_check(self, pkt, proto):
158         self.assertEqual(pkt[proto].sport, self.ipv6_udp_or_tcp_internet_port)
159         self.assertEqual(pkt[proto].dport, self.ipv6_udp_or_tcp_map_port)
160
161     #
162     # Normal translation of UDP packets v4 -> v6 direction
163     # Send 128 frame size packet for IPv4/UDP.
164     # Received packet should be translated into IPv6 packet with no
165     # fragment header.
166     #
167
168     def test_map_t_udp_ip4_to_ip6(self):
169         """MAP-T UDP IPv4 -> IPv6"""
170
171         eth = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
172         ip = IP(src=self.pg0.remote_ip4, dst=self.ipv4_map_address, tos=0)
173         udp = UDP(
174             sport=self.ipv4_udp_or_tcp_internet_port,
175             dport=self.ipv4_udp_or_tcp_map_port,
176         )
177         payload = "a" * 82
178         tx_pkt = eth / ip / udp / payload
179
180         self.pg_send(self.pg0, tx_pkt * 1)
181
182         rx_pkts = self.pg1.get_capture(1)
183         rx_pkt = rx_pkts[0]
184
185         self.v6_address_check(rx_pkt)
186         self.v6_port_check(rx_pkt, UDP)
187         self.assertEqual(rx_pkt[IPv6].tc, 0)  # IPv4 ToS passed to v6 TC
188         self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="UDP").nh)
189
190     #
191     # Normal translation of TCP packets v4 -> v6 direction.
192     # Send 128 frame size packet for IPv4/TCP.
193     # Received packet should be translated into IPv6 packet with no
194     # fragment header.
195     #
196
197     def test_map_t_tcp_ip4_to_ip6(self):
198         """MAP-T TCP IPv4 -> IPv6"""
199
200         eth = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
201         ip = IP(src=self.pg0.remote_ip4, dst=self.ipv4_map_address, tos=0)
202         tcp = TCP(
203             sport=self.ipv4_udp_or_tcp_internet_port,
204             dport=self.ipv4_udp_or_tcp_map_port,
205         )
206         payload = "a" * 82
207         tx_pkt = eth / ip / tcp / payload
208
209         self.pg_send(self.pg0, tx_pkt * 1)
210
211         rx_pkts = self.pg1.get_capture(1)
212         rx_pkt = rx_pkts[0]
213
214         self.v6_address_check(rx_pkt)
215         self.v6_port_check(rx_pkt, TCP)
216         self.assertEqual(rx_pkt[IPv6].tc, 0)  # IPv4 ToS passed to v6 TC
217         self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="TCP").nh)
218
219     #
220     # Normal translation of UDP packets v6 -> v4 direction
221     # Send 128 frame size packet for IPv6/UDP.
222     # Received packet should be translated into an IPv4 packet with DF=1.
223     #
224
225     def test_map_t_udp_ip6_to_ip4(self):
226         """MAP-T UDP IPv6 -> IPv4"""
227
228         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
229         ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
230         udp = UDP(
231             sport=self.ipv6_udp_or_tcp_map_port,
232             dport=self.ipv6_udp_or_tcp_internet_port,
233         )
234         payload = "a" * 82
235         tx_pkt = eth / ip / udp / payload
236
237         self.pg_send(self.pg1, tx_pkt * 1)
238
239         rx_pkts = self.pg0.get_capture(1)
240         rx_pkt = rx_pkts[0]
241
242         self.v4_address_check(rx_pkt)
243         self.v4_port_check(rx_pkt, UDP)
244         self.assertEqual(rx_pkt[IP].proto, IP(proto="udp").proto)
245         self.assertEqual(rx_pkt[IP].tos, 0)  # IPv6 TC passed to v4 ToS
246         df_bit = IP(flags="DF").flags
247         self.assertNotEqual(rx_pkt[IP].flags & df_bit, df_bit)
248
249     #
250     # Normal translation of TCP packets v6 -> v4 direction
251     # Send 128 frame size packet for IPv6/TCP.
252     # Received packet should be translated into an IPv4 packet with DF=1
253     #
254
255     def test_map_t_tcp_ip6_to_ip4(self):
256         """MAP-T TCP IPv6 -> IPv4"""
257
258         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
259         ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
260         tcp = TCP(
261             sport=self.ipv6_udp_or_tcp_map_port,
262             dport=self.ipv6_udp_or_tcp_internet_port,
263         )
264         payload = "a" * 82
265         tx_pkt = eth / ip / tcp / payload
266
267         self.pg_send(self.pg1, tx_pkt * 1)
268
269         rx_pkts = self.pg0.get_capture(1)
270         rx_pkt = rx_pkts[0]
271
272         self.v4_address_check(rx_pkt)
273         self.v4_port_check(rx_pkt, TCP)
274         self.assertEqual(rx_pkt[IP].proto, IP(proto="tcp").proto)
275         self.assertEqual(rx_pkt[IP].tos, 0)  # IPv6 TC passed to v4 ToS
276         df_bit = IP(flags="DF").flags
277         self.assertNotEqual(rx_pkt[IP].flags & df_bit, df_bit)
278
279     #
280     # Translation of ICMP Echo Request v4 -> v6 direction
281     # Received packet should be translated into an IPv6 Echo Request.
282     #
283
284     def test_map_t_echo_request_ip4_to_ip6(self):
285         """MAP-T echo request IPv4 -> IPv6"""
286
287         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
288         ip = IP(src=self.pg0.remote_ip4, dst=self.ipv4_map_address)
289         icmp = ICMP(type="echo-request", id=self.ipv6_udp_or_tcp_map_port)
290         payload = "H" * 10
291         tx_pkt = eth / ip / icmp / payload
292
293         self.pg_send(self.pg0, tx_pkt * 1)
294
295         rx_pkts = self.pg1.get_capture(1)
296         rx_pkt = rx_pkts[0]
297
298         self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
299         self.assertEqual(
300             rx_pkt[ICMPv6EchoRequest].type, ICMPv6EchoRequest(type="Echo Request").type
301         )
302         self.assertEqual(rx_pkt[ICMPv6EchoRequest].code, 0)
303         self.assertEqual(rx_pkt[ICMPv6EchoRequest].id, self.ipv6_udp_or_tcp_map_port)
304
305     #
306     # Translation of ICMP Echo Reply v4 -> v6 direction
307     # Received packet should be translated into an IPv6 Echo Reply.
308     #
309
310     def test_map_t_echo_reply_ip4_to_ip6(self):
311         """MAP-T echo reply IPv4 -> IPv6"""
312
313         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
314         ip = IP(src=self.pg0.remote_ip4, dst=self.ipv4_map_address)
315         icmp = ICMP(type="echo-reply", id=self.ipv6_udp_or_tcp_map_port)
316         payload = "H" * 10
317         tx_pkt = eth / ip / icmp / payload
318
319         self.pg_send(self.pg0, tx_pkt * 1)
320
321         rx_pkts = self.pg1.get_capture(1)
322         rx_pkt = rx_pkts[0]
323
324         self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
325         self.assertEqual(
326             rx_pkt[ICMPv6EchoReply].type, ICMPv6EchoReply(type="Echo Reply").type
327         )
328         self.assertEqual(rx_pkt[ICMPv6EchoReply].code, 0)
329         self.assertEqual(rx_pkt[ICMPv6EchoReply].id, self.ipv6_udp_or_tcp_map_port)
330
331     #
332     # Translation of ICMP Time Exceeded v4 -> v6 direction
333     # Received packet should be translated into an IPv6 Time Exceeded.
334     #
335
336     def test_map_t_time_exceeded_ip4_to_ip6(self):
337         """MAP-T time exceeded IPv4 -> IPv6"""
338
339         eth = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
340         ip = IP(src=self.pg0.remote_ip4, dst=self.ipv4_map_address)
341         icmp = ICMP(type="time-exceeded", code="ttl-zero-during-transit")
342         ip_inner = IP(dst=self.pg0.remote_ip4, src=self.ipv4_map_address, ttl=1)
343         udp_inner = UDP(
344             sport=self.ipv4_udp_or_tcp_map_port,
345             dport=self.ipv4_udp_or_tcp_internet_port,
346         )
347         payload = "H" * 10
348         tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload
349
350         self.pg_send(self.pg0, tx_pkt * 1)
351
352         rx_pkts = self.pg1.get_capture(1)
353         rx_pkt = rx_pkts[0]
354
355         self.v6_address_check(rx_pkt)
356         self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
357         self.assertEqual(rx_pkt[ICMPv6TimeExceeded].type, ICMPv6TimeExceeded().type)
358         self.assertEqual(
359             rx_pkt[ICMPv6TimeExceeded].code,
360             ICMPv6TimeExceeded(code="hop limit exceeded in transit").code,
361         )
362         self.assertEqual(rx_pkt[ICMPv6TimeExceeded].hlim, tx_pkt[IP][1].ttl)
363         self.assertTrue(rx_pkt.haslayer(IPerror6))
364         self.assertTrue(rx_pkt.haslayer(UDPerror))
365         self.assertEqual(rx_pkt[IPv6].src, rx_pkt[IPerror6].dst)
366         self.assertEqual(rx_pkt[IPv6].dst, rx_pkt[IPerror6].src)
367         self.assertEqual(rx_pkt[UDPerror].sport, self.ipv6_udp_or_tcp_map_port)
368         self.assertEqual(rx_pkt[UDPerror].dport, self.ipv6_udp_or_tcp_internet_port)
369
370     #
371     # Translation of ICMP Echo Request v6 -> v4 direction
372     # Received packet should be translated into an IPv4 Echo Request.
373     #
374
375     def test_map_t_echo_request_ip6_to_ip4(self):
376         """MAP-T echo request IPv6 -> IPv4"""
377
378         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
379         ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
380         icmp = ICMPv6EchoRequest()
381         icmp.id = self.ipv6_udp_or_tcp_map_port
382         payload = "H" * 10
383         tx_pkt = eth / ip / icmp / payload
384
385         self.pg_send(self.pg1, tx_pkt * 1)
386
387         rx_pkts = self.pg0.get_capture(1)
388         rx_pkt = rx_pkts[0]
389
390         self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
391         self.assertEqual(rx_pkt[ICMP].type, ICMP(type="echo-request").type)
392         self.assertEqual(rx_pkt[ICMP].code, 0)
393         self.assertEqual(rx_pkt[ICMP].id, self.ipv6_udp_or_tcp_map_port)
394
395     #
396     # Translation of ICMP Echo Reply v6 -> v4 direction
397     # Received packet should be translated into an IPv4 Echo Reply.
398     #
399
400     def test_map_t_echo_reply_ip6_to_ip4(self):
401         """MAP-T echo reply IPv6 -> IPv4"""
402
403         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
404         ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
405         icmp = ICMPv6EchoReply(id=self.ipv6_udp_or_tcp_map_port)
406         payload = "H" * 10
407         tx_pkt = eth / ip / icmp / payload
408
409         self.pg_send(self.pg1, tx_pkt * 1)
410
411         rx_pkts = self.pg0.get_capture(1)
412         rx_pkt = rx_pkts[0]
413
414         self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
415         self.assertEqual(rx_pkt[ICMP].type, ICMP(type="echo-reply").type)
416         self.assertEqual(rx_pkt[ICMP].code, 0)
417         self.assertEqual(rx_pkt[ICMP].id, self.ipv6_udp_or_tcp_map_port)
418
419     #
420     # Translation of ICMP Packet Too Big v6 -> v4 direction
421     # Received packet should be translated into an IPv4 Dest Unreachable.
422     #
423
424     def test_map_t_packet_too_big_ip6_to_ip4(self):
425         """MAP-T packet too big IPv6 -> IPv4"""
426
427         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
428         ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
429         icmp = ICMPv6PacketTooBig(mtu=1280)
430         ip_inner = IPv6(src=self.ipv6_map_address, dst=self.ipv6_cpe_address)
431         udp_inner = UDP(
432             sport=self.ipv6_udp_or_tcp_internet_port,
433             dport=self.ipv6_udp_or_tcp_map_port,
434         )
435         payload = "H" * 10
436         tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload
437
438         self.pg_send(self.pg1, tx_pkt * 1)
439
440         rx_pkts = self.pg0.get_capture(1)
441         rx_pkt = rx_pkts[0]
442
443         self.v4_address_check(rx_pkt)
444         self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
445         self.assertEqual(rx_pkt[ICMP].type, ICMP(type="dest-unreach").type)
446         self.assertEqual(rx_pkt[ICMP].code, ICMP(code="fragmentation-needed").code)
447         self.assertEqual(rx_pkt[ICMP].nexthopmtu, tx_pkt[ICMPv6PacketTooBig].mtu - 20)
448         self.assertTrue(rx_pkt.haslayer(IPerror))
449         self.assertTrue(rx_pkt.haslayer(UDPerror))
450         self.assertEqual(rx_pkt[IP].src, rx_pkt[IPerror].dst)
451         self.assertEqual(rx_pkt[IP].dst, rx_pkt[IPerror].src)
452         self.assertEqual(rx_pkt[UDPerror].sport, self.ipv4_udp_or_tcp_internet_port)
453         self.assertEqual(rx_pkt[UDPerror].dport, self.ipv4_udp_or_tcp_map_port)
454
455     #
456     # Translation of ICMP Time Exceeded v6 -> v4 direction
457     # Received packet should be translated into an IPv4 Time Exceeded.
458     #
459
460     def test_map_t_time_exceeded_ip6_to_ip4(self):
461         """MAP-T time exceeded IPv6 -> IPv4"""
462
463         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
464         ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
465         icmp = ICMPv6TimeExceeded()
466         ip_inner = IPv6(src=self.ipv6_map_address, dst=self.ipv6_cpe_address, hlim=1)
467         udp_inner = UDP(
468             sport=self.ipv6_udp_or_tcp_internet_port,
469             dport=self.ipv6_udp_or_tcp_map_port,
470         )
471         payload = "H" * 10
472         tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload
473
474         self.pg_send(self.pg1, tx_pkt * 1)
475
476         rx_pkts = self.pg0.get_capture(1)
477         rx_pkt = rx_pkts[0]
478
479         self.v4_address_check(rx_pkt)
480         self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
481         self.assertEqual(rx_pkt[ICMP].type, ICMP(type="time-exceeded").type)
482         self.assertEqual(rx_pkt[ICMP].code, ICMP(code="ttl-zero-during-transit").code)
483         self.assertEqual(rx_pkt[ICMP].ttl, tx_pkt[IPv6][1].hlim)
484         self.assertTrue(rx_pkt.haslayer(IPerror))
485         self.assertTrue(rx_pkt.haslayer(UDPerror))
486         self.assertEqual(rx_pkt[IP].src, rx_pkt[IPerror].dst)
487         self.assertEqual(rx_pkt[IP].dst, rx_pkt[IPerror].src)
488         self.assertEqual(rx_pkt[UDPerror].sport, self.ipv4_udp_or_tcp_internet_port)
489         self.assertEqual(rx_pkt[UDPerror].dport, self.ipv4_udp_or_tcp_map_port)
490
491     #
492     # Spoofed IPv4 Source Address v6 -> v4 direction
493     # Send a packet with a wrong IPv4 address embedded in bits 72-103.
494     # The BR should either drop the packet, or rewrite the spoofed
495     # source IPv4 as the actual source IPv4 address.
496     # The BR really should drop the packet.
497     #
498
499     def test_map_t_spoof_ipv4_src_addr_ip6_to_ip4(self):
500         """MAP-T spoof ipv4 src addr IPv6 -> IPv4"""
501
502         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
503         ip = IPv6(src=self.ipv6_spoof_address, dst=self.ipv6_map_address)
504         udp = UDP(
505             sport=self.ipv6_udp_or_tcp_map_port,
506             dport=self.ipv6_udp_or_tcp_internet_port,
507         )
508         payload = "a" * 82
509         tx_pkt = eth / ip / udp / payload
510
511         self.pg_send(self.pg1, tx_pkt * 1)
512
513         self.pg0.get_capture(0, timeout=1)
514         self.pg0.assert_nothing_captured(remark="Should drop IPv4 spoof address")
515
516     #
517     # Spoofed IPv4 Source Prefix v6 -> v4 direction
518     # Send a packet with a wrong IPv4 prefix embedded in bits 72-103.
519     # The BR should either drop the packet, or rewrite the source IPv4
520     # to the prefix that matches the source IPv4 address.
521     #
522
523     def test_map_t_spoof_ipv4_src_prefix_ip6_to_ip4(self):
524         """MAP-T spoof ipv4 src prefix IPv6 -> IPv4"""
525
526         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
527         ip = IPv6(src=self.ipv6_spoof_prefix, dst=self.ipv6_map_address)
528         udp = UDP(
529             sport=self.ipv6_udp_or_tcp_map_port,
530             dport=self.ipv6_udp_or_tcp_internet_port,
531         )
532         payload = "a" * 82
533         tx_pkt = eth / ip / udp / payload
534
535         self.pg_send(self.pg1, tx_pkt * 1)
536
537         self.pg0.get_capture(0, timeout=1)
538         self.pg0.assert_nothing_captured(remark="Should drop IPv4 spoof prefix")
539
540     #
541     # Spoofed IPv6 PSID v6 -> v4 direction
542     # Send a packet with a wrong IPv6 port PSID
543     # The BR should drop the packet.
544     #
545
546     def test_map_t_spoof_psid_ip6_to_ip4(self):
547         """MAP-T spoof psid IPv6 -> IPv4"""
548
549         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
550         ip = IPv6(src=self.ipv6_spoof_psid, dst=self.ipv6_map_address)
551         udp = UDP(
552             sport=self.ipv6_udp_or_tcp_map_port,
553             dport=self.ipv6_udp_or_tcp_internet_port,
554         )
555         payload = "a" * 82
556         tx_pkt = eth / ip / udp / payload
557
558         self.pg_send(self.pg1, tx_pkt * 1)
559
560         self.pg0.get_capture(0, timeout=1)
561         self.pg0.assert_nothing_captured(remark="Should drop IPv6 spoof PSID")
562
563     #
564     # Spoofed IPv6 subnet field v6 -> v4 direction
565     # Send a packet with a wrong IPv6 subnet as "2001:db8:f1"
566     # The BR should drop the packet.
567     #
568
569     def test_map_t_spoof_subnet_ip6_to_ip4(self):
570         """MAP-T spoof subnet IPv6 -> IPv4"""
571
572         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
573         ip = IPv6(src=self.ipv6_spoof_subnet, dst=self.ipv6_map_address)
574         udp = UDP(
575             sport=self.ipv6_udp_or_tcp_map_port,
576             dport=self.ipv6_udp_or_tcp_internet_port,
577         )
578         payload = "a" * 82
579         tx_pkt = eth / ip / udp / payload
580
581         self.pg_send(self.pg1, tx_pkt * 1)
582
583         self.pg0.get_capture(0, timeout=1)
584         self.pg0.assert_nothing_captured(remark="Should drop IPv6 spoof subnet")
585
586     #
587     # Spoofed IPv6 port PSID v6 -> v4 direction
588     # Send a packet with a wrong IPv6 port PSID
589     # The BR should drop the packet.
590     #
591
592     def test_map_t_spoof_port_psid_ip6_to_ip4(self):
593         """MAP-T spoof port psid IPv6 -> IPv4"""
594
595         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
596         ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
597         udp = UDP(
598             sport=self.ipv6_udp_or_tcp_spoof_port,
599             dport=self.ipv6_udp_or_tcp_internet_port,
600         )
601         payload = "a" * 82
602         tx_pkt = eth / ip / udp / payload
603
604         self.pg_send(self.pg1, tx_pkt * 1)
605
606         self.pg0.get_capture(0, timeout=1)
607         self.pg0.assert_nothing_captured(remark="Should drop IPv6 spoof port PSID")
608
609     #
610     # Spoofed IPv6 ICMP ID PSID v6 -> v4 direction
611     # Send a packet with a wrong IPv6 IMCP ID PSID
612     # The BR should drop the packet.
613     #
614
615     def test_map_t_spoof_icmp_id_psid_ip6_to_ip4(self):
616         """MAP-T spoof ICMP id psid IPv6 -> IPv4"""
617
618         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
619         ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
620         icmp = ICMPv6EchoRequest()
621         icmp.id = self.ipv6_udp_or_tcp_spoof_port
622         payload = "H" * 10
623         tx_pkt = eth / ip / icmp / payload
624
625         self.pg_send(self.pg1, tx_pkt * 1)
626
627         self.pg0.get_capture(0, timeout=1)
628         self.pg0.assert_nothing_captured(remark="Should drop IPv6 spoof port PSID")
629
630     #
631     # Map to Map - same rule, different address
632     #
633
634     @unittest.skip("Fixme: correct behavior needs clarification")
635     def test_map_t_same_rule_diff_addr_ip6_to_ip4(self):
636         """MAP-T same rule, diff addr IPv6 -> IPv6"""
637
638         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
639         ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_same_rule_diff_addr)
640         udp = UDP(sport=self.ipv6_udp_or_tcp_map_port, dport=1025)
641         payload = "a" * 82
642         tx_pkt = eth / ip / udp / payload
643
644         self.pg_send(self.pg1, tx_pkt * 1)
645
646         rx_pkts = self.pg1.get_capture(1)
647         rx_pkt = rx_pkts[0]
648
649     #
650     # Map to Map - same rule, same address
651     #
652
653     @unittest.skip("Fixme: correct behavior needs clarification")
654     def test_map_t_same_rule_same_addr_ip6_to_ip4(self):
655         """MAP-T same rule, same addr IPv6 -> IPv6"""
656
657         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
658         ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_same_rule_same_addr)
659         udp = UDP(sport=self.ipv6_udp_or_tcp_map_port, dport=1025)
660         payload = "a" * 82
661         tx_pkt = eth / ip / udp / payload
662
663         self.pg_send(self.pg1, tx_pkt * 1)
664
665         rx_pkts = self.pg1.get_capture(1)
666         rx_pkt = rx_pkts[0]
667
668
669 if __name__ == "__main__":
670     unittest.main(testRunner=VppTestRunner)