tests: Use errno value rather than a specific int
[vpp.git] / test / test_map_br.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase
6 from asfframework import VppTestRunner
7 from vpp_ip_route import VppIpRoute, VppRoutePath
8
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, UDP, ICMP, TCP, IPerror, UDPerror
11 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, ICMPv6PacketTooBig
12 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply, IPerror6
13
14
15 class TestMAPBR(VppTestCase):
16     """MAP-T Test Cases"""
17
18     @classmethod
19     def setUpClass(cls):
20         super(TestMAPBR, cls).setUpClass()
21
22     @classmethod
23     def tearDownClass(cls):
24         super(TestMAPBR, cls).tearDownClass()
25
26     def setUp(self):
27         super(TestMAPBR, self).setUp()
28
29         #
30         # Create 2 pg interfaces.
31         # pg0 is IPv4
32         # pg1 is IPv6
33         #
34         self.create_pg_interfaces(range(2))
35
36         self.pg0.admin_up()
37         self.pg0.config_ip4()
38         self.pg1.generate_remote_hosts(20)
39         self.pg1.configure_ipv4_neighbors()
40         self.pg0.resolve_arp()
41
42         self.pg1.admin_up()
43         self.pg1.config_ip6()
44         self.pg1.generate_remote_hosts(20)
45         self.pg1.configure_ipv6_neighbors()
46
47         #
48         # BR configuration parameters used for all test.
49         #
50         self.ip4_prefix = "198.18.0.0/24"
51         self.ip6_prefix = "2001:db8:f0::/48"
52         self.ip6_src = "2001:db8:ffff:ff00::/64"
53         self.ea_bits_len = 12
54         self.psid_offset = 6
55         self.psid_length = 4
56         self.mtu = 1500
57         self.tag = "MAP-T BR"
58
59         self.ipv4_internet_address = self.pg0.remote_ip4
60         self.ipv4_map_address = "198.18.0.12"
61         self.ipv4_udp_or_tcp_internet_port = 65000
62         self.ipv4_udp_or_tcp_map_port = 16606
63
64         self.ipv6_cpe_address = "2001:db8:f0:c30:0:c612:c:3"  # 198.18.0.12
65         self.ipv6_spoof_address = "2001:db8:f0:c30:0:c612:1c:3"  # 198.18.0.28
66         self.ipv6_spoof_prefix = "2001:db8:f0:c30:0:a00:c:3"  # 10.0.0.12
67         self.ipv6_spoof_psid = "2001:db8:f0:c30:0:c612:c:4"  # 4
68         self.ipv6_spoof_subnet = "2001:db8:f1:c30:0:c612:c:3"  # f1
69
70         self.ipv6_udp_or_tcp_internet_port = 65000
71         self.ipv6_udp_or_tcp_map_port = 16606
72         self.ipv6_udp_or_tcp_spoof_port = 16862
73
74         self.ipv6_map_address = "2001:db8:ffff:ff00:ac:1001:200:0"  # 176.16.1.2
75         self.ipv6_map_same_rule_diff_addr = (
76             "2001:db8:ffff:ff00:c6:1200:1000:0"  # 198.18.0.16
77         )
78         self.ipv6_map_same_rule_same_addr = (
79             "2001:db8:ffff:ff00:c6:1200:c00:0"  # 198.18.0.12
80         )
81
82         self.map_br_prefix = "2001:db8:f0::"
83         self.map_br_prefix_len = 48
84         self.psid_number = 3
85
86         #
87         # Add an IPv6 route to the MAP-BR.
88         #
89         map_route = VppIpRoute(
90             self,
91             self.map_br_prefix,
92             self.map_br_prefix_len,
93             [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
94         )
95         map_route.add_vpp_config()
96
97         #
98         # Add a MAP BR domain that maps from pg0 to pg1.
99         #
100         self.vapi.map_add_domain(
101             ip4_prefix=self.ip4_prefix,
102             ip6_prefix=self.ip6_prefix,
103             ip6_src=self.ip6_src,
104             ea_bits_len=self.ea_bits_len,
105             psid_offset=self.psid_offset,
106             psid_length=self.psid_length,
107             mtu=self.mtu,
108             tag=self.tag,
109         )
110
111         #
112         # Set BR parameters.
113         #
114         self.vapi.map_param_set_fragmentation(inner=1, ignore_df=0)
115         self.vapi.map_param_set_fragmentation(inner=0, ignore_df=0)
116         self.vapi.map_param_set_icmp(ip4_err_relay_src=self.pg0.local_ip4)
117         self.vapi.map_param_set_traffic_class(copy=1)
118
119         #
120         # Enable MAP-T on interfaces.
121         #
122         self.vapi.map_if_enable_disable(
123             is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=1
124         )
125
126         self.vapi.map_if_enable_disable(
127             is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=1
128         )
129
130         self.vapi.map_if_enable_disable(
131             is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=1
132         )
133
134     def tearDown(self):
135         super(TestMAPBR, self).tearDown()
136         for i in self.pg_interfaces:
137             i.unconfig_ip4()
138             i.unconfig_ip6()
139             i.admin_down()
140
141     def v4_address_check(self, pkt):
142         self.assertEqual(pkt[IP].src, self.ipv4_map_address)
143         self.assertEqual(pkt[IP].dst, self.ipv4_internet_address)
144
145     def v4_port_check(self, pkt, proto):
146         self.assertEqual(pkt[proto].sport, self.ipv4_udp_or_tcp_map_port)
147         self.assertEqual(pkt[proto].dport, self.ipv4_udp_or_tcp_internet_port)
148
149     def v6_address_check(self, pkt):
150         self.assertEqual(pkt[IPv6].src, self.ipv6_map_address)
151         self.assertEqual(pkt[IPv6].dst, self.ipv6_cpe_address)
152
153     def v6_port_check(self, pkt, proto):
154         self.assertEqual(pkt[proto].sport, self.ipv6_udp_or_tcp_internet_port)
155         self.assertEqual(pkt[proto].dport, self.ipv6_udp_or_tcp_map_port)
156
157     #
158     # Normal translation of UDP packets v4 -> v6 direction
159     # Send 128 frame size packet for IPv4/UDP.
160     # Received packet should be translated into IPv6 packet with no
161     # fragment header.
162     #
163
164     def test_map_t_udp_ip4_to_ip6(self):
165         """MAP-T UDP IPv4 -> IPv6"""
166
167         eth = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
168         ip = IP(src=self.pg0.remote_ip4, dst=self.ipv4_map_address, tos=0)
169         udp = UDP(
170             sport=self.ipv4_udp_or_tcp_internet_port,
171             dport=self.ipv4_udp_or_tcp_map_port,
172         )
173         payload = "a" * 82
174         tx_pkt = eth / ip / udp / payload
175
176         self.pg_send(self.pg0, tx_pkt * 1)
177
178         rx_pkts = self.pg1.get_capture(1)
179         rx_pkt = rx_pkts[0]
180
181         self.v6_address_check(rx_pkt)
182         self.v6_port_check(rx_pkt, UDP)
183         self.assertEqual(rx_pkt[IPv6].tc, 0)  # IPv4 ToS passed to v6 TC
184         self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="UDP").nh)
185
186     #
187     # Normal translation of TCP packets v4 -> v6 direction.
188     # Send 128 frame size packet for IPv4/TCP.
189     # Received packet should be translated into IPv6 packet with no
190     # fragment header.
191     #
192
193     def test_map_t_tcp_ip4_to_ip6(self):
194         """MAP-T TCP IPv4 -> IPv6"""
195
196         eth = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
197         ip = IP(src=self.pg0.remote_ip4, dst=self.ipv4_map_address, tos=0)
198         tcp = TCP(
199             sport=self.ipv4_udp_or_tcp_internet_port,
200             dport=self.ipv4_udp_or_tcp_map_port,
201         )
202         payload = "a" * 82
203         tx_pkt = eth / ip / tcp / payload
204
205         self.pg_send(self.pg0, tx_pkt * 1)
206
207         rx_pkts = self.pg1.get_capture(1)
208         rx_pkt = rx_pkts[0]
209
210         self.v6_address_check(rx_pkt)
211         self.v6_port_check(rx_pkt, TCP)
212         self.assertEqual(rx_pkt[IPv6].tc, 0)  # IPv4 ToS passed to v6 TC
213         self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="TCP").nh)
214
215     #
216     # Normal translation of UDP packets v6 -> v4 direction
217     # Send 128 frame size packet for IPv6/UDP.
218     # Received packet should be translated into an IPv4 packet with DF=1.
219     #
220
221     def test_map_t_udp_ip6_to_ip4(self):
222         """MAP-T UDP IPv6 -> IPv4"""
223
224         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
225         ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
226         udp = UDP(
227             sport=self.ipv6_udp_or_tcp_map_port,
228             dport=self.ipv6_udp_or_tcp_internet_port,
229         )
230         payload = "a" * 82
231         tx_pkt = eth / ip / udp / payload
232
233         self.pg_send(self.pg1, tx_pkt * 1)
234
235         rx_pkts = self.pg0.get_capture(1)
236         rx_pkt = rx_pkts[0]
237
238         self.v4_address_check(rx_pkt)
239         self.v4_port_check(rx_pkt, UDP)
240         self.assertEqual(rx_pkt[IP].proto, IP(proto="udp").proto)
241         self.assertEqual(rx_pkt[IP].tos, 0)  # IPv6 TC passed to v4 ToS
242         df_bit = IP(flags="DF").flags
243         self.assertNotEqual(rx_pkt[IP].flags & df_bit, df_bit)
244
245     #
246     # Normal translation of TCP packets v6 -> v4 direction
247     # Send 128 frame size packet for IPv6/TCP.
248     # Received packet should be translated into an IPv4 packet with DF=1
249     #
250
251     def test_map_t_tcp_ip6_to_ip4(self):
252         """MAP-T TCP IPv6 -> IPv4"""
253
254         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
255         ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
256         tcp = TCP(
257             sport=self.ipv6_udp_or_tcp_map_port,
258             dport=self.ipv6_udp_or_tcp_internet_port,
259         )
260         payload = "a" * 82
261         tx_pkt = eth / ip / tcp / payload
262
263         self.pg_send(self.pg1, tx_pkt * 1)
264
265         rx_pkts = self.pg0.get_capture(1)
266         rx_pkt = rx_pkts[0]
267
268         self.v4_address_check(rx_pkt)
269         self.v4_port_check(rx_pkt, TCP)
270         self.assertEqual(rx_pkt[IP].proto, IP(proto="tcp").proto)
271         self.assertEqual(rx_pkt[IP].tos, 0)  # IPv6 TC passed to v4 ToS
272         df_bit = IP(flags="DF").flags
273         self.assertNotEqual(rx_pkt[IP].flags & df_bit, df_bit)
274
275     #
276     # Translation of ICMP Echo Request v4 -> v6 direction
277     # Received packet should be translated into an IPv6 Echo Request.
278     #
279
280     def test_map_t_echo_request_ip4_to_ip6(self):
281         """MAP-T echo request IPv4 -> IPv6"""
282
283         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
284         ip = IP(src=self.pg0.remote_ip4, dst=self.ipv4_map_address)
285         icmp = ICMP(type="echo-request", id=self.ipv6_udp_or_tcp_map_port)
286         payload = "H" * 10
287         tx_pkt = eth / ip / icmp / payload
288
289         self.pg_send(self.pg0, tx_pkt * 1)
290
291         rx_pkts = self.pg1.get_capture(1)
292         rx_pkt = rx_pkts[0]
293
294         self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
295         self.assertEqual(
296             rx_pkt[ICMPv6EchoRequest].type, ICMPv6EchoRequest(type="Echo Request").type
297         )
298         self.assertEqual(rx_pkt[ICMPv6EchoRequest].code, 0)
299         self.assertEqual(rx_pkt[ICMPv6EchoRequest].id, self.ipv6_udp_or_tcp_map_port)
300
301     #
302     # Translation of ICMP Echo Reply v4 -> v6 direction
303     # Received packet should be translated into an IPv6 Echo Reply.
304     #
305
306     def test_map_t_echo_reply_ip4_to_ip6(self):
307         """MAP-T echo reply IPv4 -> IPv6"""
308
309         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
310         ip = IP(src=self.pg0.remote_ip4, dst=self.ipv4_map_address)
311         icmp = ICMP(type="echo-reply", id=self.ipv6_udp_or_tcp_map_port)
312         payload = "H" * 10
313         tx_pkt = eth / ip / icmp / payload
314
315         self.pg_send(self.pg0, tx_pkt * 1)
316
317         rx_pkts = self.pg1.get_capture(1)
318         rx_pkt = rx_pkts[0]
319
320         self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
321         self.assertEqual(
322             rx_pkt[ICMPv6EchoReply].type, ICMPv6EchoReply(type="Echo Reply").type
323         )
324         self.assertEqual(rx_pkt[ICMPv6EchoReply].code, 0)
325         self.assertEqual(rx_pkt[ICMPv6EchoReply].id, self.ipv6_udp_or_tcp_map_port)
326
327     #
328     # Translation of ICMP Time Exceeded v4 -> v6 direction
329     # Received packet should be translated into an IPv6 Time Exceeded.
330     #
331
332     def test_map_t_time_exceeded_ip4_to_ip6(self):
333         """MAP-T time exceeded IPv4 -> IPv6"""
334
335         eth = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
336         ip = IP(src=self.pg0.remote_ip4, dst=self.ipv4_map_address)
337         icmp = ICMP(type="time-exceeded", code="ttl-zero-during-transit")
338         ip_inner = IP(dst=self.pg0.remote_ip4, src=self.ipv4_map_address, ttl=1)
339         udp_inner = UDP(
340             sport=self.ipv4_udp_or_tcp_map_port,
341             dport=self.ipv4_udp_or_tcp_internet_port,
342         )
343         payload = "H" * 10
344         tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload
345
346         self.pg_send(self.pg0, tx_pkt * 1)
347
348         rx_pkts = self.pg1.get_capture(1)
349         rx_pkt = rx_pkts[0]
350
351         self.v6_address_check(rx_pkt)
352         self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
353         self.assertEqual(rx_pkt[ICMPv6TimeExceeded].type, ICMPv6TimeExceeded().type)
354         self.assertEqual(
355             rx_pkt[ICMPv6TimeExceeded].code,
356             ICMPv6TimeExceeded(code="hop limit exceeded in transit").code,
357         )
358         self.assertEqual(rx_pkt[ICMPv6TimeExceeded].hlim, tx_pkt[IP][1].ttl)
359         self.assertTrue(rx_pkt.haslayer(IPerror6))
360         self.assertTrue(rx_pkt.haslayer(UDPerror))
361         self.assertEqual(rx_pkt[IPv6].src, rx_pkt[IPerror6].dst)
362         self.assertEqual(rx_pkt[IPv6].dst, rx_pkt[IPerror6].src)
363         self.assertEqual(rx_pkt[UDPerror].sport, self.ipv6_udp_or_tcp_map_port)
364         self.assertEqual(rx_pkt[UDPerror].dport, self.ipv6_udp_or_tcp_internet_port)
365
366     #
367     # Translation of ICMP Echo Request v6 -> v4 direction
368     # Received packet should be translated into an IPv4 Echo Request.
369     #
370
371     def test_map_t_echo_request_ip6_to_ip4(self):
372         """MAP-T echo request IPv6 -> IPv4"""
373
374         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
375         ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
376         icmp = ICMPv6EchoRequest()
377         icmp.id = self.ipv6_udp_or_tcp_map_port
378         payload = "H" * 10
379         tx_pkt = eth / ip / icmp / payload
380
381         self.pg_send(self.pg1, tx_pkt * 1)
382
383         rx_pkts = self.pg0.get_capture(1)
384         rx_pkt = rx_pkts[0]
385
386         self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
387         self.assertEqual(rx_pkt[ICMP].type, ICMP(type="echo-request").type)
388         self.assertEqual(rx_pkt[ICMP].code, 0)
389         self.assertEqual(rx_pkt[ICMP].id, self.ipv6_udp_or_tcp_map_port)
390
391     #
392     # Translation of ICMP Echo Reply v6 -> v4 direction
393     # Received packet should be translated into an IPv4 Echo Reply.
394     #
395
396     def test_map_t_echo_reply_ip6_to_ip4(self):
397         """MAP-T echo reply IPv6 -> IPv4"""
398
399         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
400         ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
401         icmp = ICMPv6EchoReply(id=self.ipv6_udp_or_tcp_map_port)
402         payload = "H" * 10
403         tx_pkt = eth / ip / icmp / payload
404
405         self.pg_send(self.pg1, tx_pkt * 1)
406
407         rx_pkts = self.pg0.get_capture(1)
408         rx_pkt = rx_pkts[0]
409
410         self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
411         self.assertEqual(rx_pkt[ICMP].type, ICMP(type="echo-reply").type)
412         self.assertEqual(rx_pkt[ICMP].code, 0)
413         self.assertEqual(rx_pkt[ICMP].id, self.ipv6_udp_or_tcp_map_port)
414
415     #
416     # Translation of ICMP Packet Too Big v6 -> v4 direction
417     # Received packet should be translated into an IPv4 Dest Unreachable.
418     #
419
420     def test_map_t_packet_too_big_ip6_to_ip4(self):
421         """MAP-T packet too big IPv6 -> IPv4"""
422
423         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
424         ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
425         icmp = ICMPv6PacketTooBig(mtu=1280)
426         ip_inner = IPv6(src=self.ipv6_map_address, dst=self.ipv6_cpe_address)
427         udp_inner = UDP(
428             sport=self.ipv6_udp_or_tcp_internet_port,
429             dport=self.ipv6_udp_or_tcp_map_port,
430         )
431         payload = "H" * 10
432         tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload
433
434         self.pg_send(self.pg1, tx_pkt * 1)
435
436         rx_pkts = self.pg0.get_capture(1)
437         rx_pkt = rx_pkts[0]
438
439         self.v4_address_check(rx_pkt)
440         self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
441         self.assertEqual(rx_pkt[ICMP].type, ICMP(type="dest-unreach").type)
442         self.assertEqual(rx_pkt[ICMP].code, ICMP(code="fragmentation-needed").code)
443         self.assertEqual(rx_pkt[ICMP].nexthopmtu, tx_pkt[ICMPv6PacketTooBig].mtu - 20)
444         self.assertTrue(rx_pkt.haslayer(IPerror))
445         self.assertTrue(rx_pkt.haslayer(UDPerror))
446         self.assertEqual(rx_pkt[IP].src, rx_pkt[IPerror].dst)
447         self.assertEqual(rx_pkt[IP].dst, rx_pkt[IPerror].src)
448         self.assertEqual(rx_pkt[UDPerror].sport, self.ipv4_udp_or_tcp_internet_port)
449         self.assertEqual(rx_pkt[UDPerror].dport, self.ipv4_udp_or_tcp_map_port)
450
451     #
452     # Translation of ICMP Time Exceeded v6 -> v4 direction
453     # Received packet should be translated into an IPv4 Time Exceeded.
454     #
455
456     def test_map_t_time_exceeded_ip6_to_ip4(self):
457         """MAP-T time exceeded IPv6 -> IPv4"""
458
459         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
460         ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
461         icmp = ICMPv6TimeExceeded()
462         ip_inner = IPv6(src=self.ipv6_map_address, dst=self.ipv6_cpe_address, hlim=1)
463         udp_inner = UDP(
464             sport=self.ipv6_udp_or_tcp_internet_port,
465             dport=self.ipv6_udp_or_tcp_map_port,
466         )
467         payload = "H" * 10
468         tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload
469
470         self.pg_send(self.pg1, tx_pkt * 1)
471
472         rx_pkts = self.pg0.get_capture(1)
473         rx_pkt = rx_pkts[0]
474
475         self.v4_address_check(rx_pkt)
476         self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
477         self.assertEqual(rx_pkt[ICMP].type, ICMP(type="time-exceeded").type)
478         self.assertEqual(rx_pkt[ICMP].code, ICMP(code="ttl-zero-during-transit").code)
479         self.assertEqual(rx_pkt[ICMP].ttl, tx_pkt[IPv6][1].hlim)
480         self.assertTrue(rx_pkt.haslayer(IPerror))
481         self.assertTrue(rx_pkt.haslayer(UDPerror))
482         self.assertEqual(rx_pkt[IP].src, rx_pkt[IPerror].dst)
483         self.assertEqual(rx_pkt[IP].dst, rx_pkt[IPerror].src)
484         self.assertEqual(rx_pkt[UDPerror].sport, self.ipv4_udp_or_tcp_internet_port)
485         self.assertEqual(rx_pkt[UDPerror].dport, self.ipv4_udp_or_tcp_map_port)
486
487     #
488     # Spoofed IPv4 Source Address v6 -> v4 direction
489     # Send a packet with a wrong IPv4 address embedded in bits 72-103.
490     # The BR should either drop the packet, or rewrite the spoofed
491     # source IPv4 as the actual source IPv4 address.
492     # The BR really should drop the packet.
493     #
494
495     def test_map_t_spoof_ipv4_src_addr_ip6_to_ip4(self):
496         """MAP-T spoof ipv4 src addr IPv6 -> IPv4"""
497
498         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
499         ip = IPv6(src=self.ipv6_spoof_address, dst=self.ipv6_map_address)
500         udp = UDP(
501             sport=self.ipv6_udp_or_tcp_map_port,
502             dport=self.ipv6_udp_or_tcp_internet_port,
503         )
504         payload = "a" * 82
505         tx_pkt = eth / ip / udp / payload
506
507         self.pg_send(self.pg1, tx_pkt * 1)
508
509         self.pg0.get_capture(0, timeout=1)
510         self.pg0.assert_nothing_captured(remark="Should drop IPv4 spoof address")
511
512     #
513     # Spoofed IPv4 Source Prefix v6 -> v4 direction
514     # Send a packet with a wrong IPv4 prefix embedded in bits 72-103.
515     # The BR should either drop the packet, or rewrite the source IPv4
516     # to the prefix that matches the source IPv4 address.
517     #
518
519     def test_map_t_spoof_ipv4_src_prefix_ip6_to_ip4(self):
520         """MAP-T spoof ipv4 src prefix IPv6 -> IPv4"""
521
522         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
523         ip = IPv6(src=self.ipv6_spoof_prefix, dst=self.ipv6_map_address)
524         udp = UDP(
525             sport=self.ipv6_udp_or_tcp_map_port,
526             dport=self.ipv6_udp_or_tcp_internet_port,
527         )
528         payload = "a" * 82
529         tx_pkt = eth / ip / udp / payload
530
531         self.pg_send(self.pg1, tx_pkt * 1)
532
533         self.pg0.get_capture(0, timeout=1)
534         self.pg0.assert_nothing_captured(remark="Should drop IPv4 spoof prefix")
535
536     #
537     # Spoofed IPv6 PSID v6 -> v4 direction
538     # Send a packet with a wrong IPv6 port PSID
539     # The BR should drop the packet.
540     #
541
542     def test_map_t_spoof_psid_ip6_to_ip4(self):
543         """MAP-T spoof psid IPv6 -> IPv4"""
544
545         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
546         ip = IPv6(src=self.ipv6_spoof_psid, dst=self.ipv6_map_address)
547         udp = UDP(
548             sport=self.ipv6_udp_or_tcp_map_port,
549             dport=self.ipv6_udp_or_tcp_internet_port,
550         )
551         payload = "a" * 82
552         tx_pkt = eth / ip / udp / payload
553
554         self.pg_send(self.pg1, tx_pkt * 1)
555
556         self.pg0.get_capture(0, timeout=1)
557         self.pg0.assert_nothing_captured(remark="Should drop IPv6 spoof PSID")
558
559     #
560     # Spoofed IPv6 subnet field v6 -> v4 direction
561     # Send a packet with a wrong IPv6 subnet as "2001:db8:f1"
562     # The BR should drop the packet.
563     #
564
565     def test_map_t_spoof_subnet_ip6_to_ip4(self):
566         """MAP-T spoof subnet IPv6 -> IPv4"""
567
568         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
569         ip = IPv6(src=self.ipv6_spoof_subnet, dst=self.ipv6_map_address)
570         udp = UDP(
571             sport=self.ipv6_udp_or_tcp_map_port,
572             dport=self.ipv6_udp_or_tcp_internet_port,
573         )
574         payload = "a" * 82
575         tx_pkt = eth / ip / udp / payload
576
577         self.pg_send(self.pg1, tx_pkt * 1)
578
579         self.pg0.get_capture(0, timeout=1)
580         self.pg0.assert_nothing_captured(remark="Should drop IPv6 spoof subnet")
581
582     #
583     # Spoofed IPv6 port PSID v6 -> v4 direction
584     # Send a packet with a wrong IPv6 port PSID
585     # The BR should drop the packet.
586     #
587
588     def test_map_t_spoof_port_psid_ip6_to_ip4(self):
589         """MAP-T spoof port psid IPv6 -> IPv4"""
590
591         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
592         ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
593         udp = UDP(
594             sport=self.ipv6_udp_or_tcp_spoof_port,
595             dport=self.ipv6_udp_or_tcp_internet_port,
596         )
597         payload = "a" * 82
598         tx_pkt = eth / ip / udp / payload
599
600         self.pg_send(self.pg1, tx_pkt * 1)
601
602         self.pg0.get_capture(0, timeout=1)
603         self.pg0.assert_nothing_captured(remark="Should drop IPv6 spoof port PSID")
604
605     #
606     # Spoofed IPv6 ICMP ID PSID v6 -> v4 direction
607     # Send a packet with a wrong IPv6 IMCP ID PSID
608     # The BR should drop the packet.
609     #
610
611     def test_map_t_spoof_icmp_id_psid_ip6_to_ip4(self):
612         """MAP-T spoof ICMP id psid IPv6 -> IPv4"""
613
614         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
615         ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
616         icmp = ICMPv6EchoRequest()
617         icmp.id = self.ipv6_udp_or_tcp_spoof_port
618         payload = "H" * 10
619         tx_pkt = eth / ip / icmp / payload
620
621         self.pg_send(self.pg1, tx_pkt * 1)
622
623         self.pg0.get_capture(0, timeout=1)
624         self.pg0.assert_nothing_captured(remark="Should drop IPv6 spoof port PSID")
625
626     #
627     # Map to Map - same rule, different address
628     #
629
630     @unittest.skip("Fixme: correct behavior needs clarification")
631     def test_map_t_same_rule_diff_addr_ip6_to_ip4(self):
632         """MAP-T same rule, diff addr IPv6 -> IPv6"""
633
634         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
635         ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_same_rule_diff_addr)
636         udp = UDP(sport=self.ipv6_udp_or_tcp_map_port, dport=1025)
637         payload = "a" * 82
638         tx_pkt = eth / ip / udp / payload
639
640         self.pg_send(self.pg1, tx_pkt * 1)
641
642         rx_pkts = self.pg1.get_capture(1)
643         rx_pkt = rx_pkts[0]
644
645     #
646     # Map to Map - same rule, same address
647     #
648
649     @unittest.skip("Fixme: correct behavior needs clarification")
650     def test_map_t_same_rule_same_addr_ip6_to_ip4(self):
651         """MAP-T same rule, same addr IPv6 -> IPv6"""
652
653         eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
654         ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_same_rule_same_addr)
655         udp = UDP(sport=self.ipv6_udp_or_tcp_map_port, dport=1025)
656         payload = "a" * 82
657         tx_pkt = eth / ip / udp / payload
658
659         self.pg_send(self.pg1, tx_pkt * 1)
660
661         rx_pkts = self.pg1.get_capture(1)
662         rx_pkt = rx_pkts[0]
663
664
665 if __name__ == "__main__":
666     unittest.main(testRunner=VppTestRunner)