map: fix translation of icmp6 error messages
[vpp.git] / src / plugins / map / test / test_map_br.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import unittest
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath
9 from util import fragment_rfc791, fragment_rfc8200
10
11 import scapy.compat
12 from scapy.layers.l2 import Ether
13 from scapy.packet import Raw
14 from scapy.layers.inet import IP, UDP, ICMP, TCP, IPerror, UDPerror
15 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, ICMPv6PacketTooBig
16 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply, IPerror6
17
18
19 class TestMAPBR(VppTestCase):
20     """ MAP-T Test Cases """
21
22     @classmethod
23     def setUpClass(cls):
24         super(TestMAPBR, cls).setUpClass()
25
26     @classmethod
27     def tearDownClass(cls):
28         super(TestMAPBR, cls).tearDownClass()
29
30     def setUp(self):
31         super(TestMAPBR, self).setUp()
32
33         #
34         # Create 2 pg interfaces.
35         # pg0 is IPv4
36         # pg1 is IPv6
37         #
38         self.create_pg_interfaces(range(2))
39
40         self.pg0.admin_up()
41         self.pg0.config_ip4()
42         self.pg1.generate_remote_hosts(20)
43         self.pg1.configure_ipv4_neighbors()
44         self.pg0.resolve_arp()
45
46         self.pg1.admin_up()
47         self.pg1.config_ip6()
48         self.pg1.generate_remote_hosts(20)
49         self.pg1.configure_ipv6_neighbors()
50
51         #
52         # BR configuration parameters used for all test.
53         #
54         self.ip4_prefix = '198.18.0.0/24'
55         self.ip6_prefix = '2001:db8:f0::/48'
56         self.ip6_src = '2001:db8:ffff:ff00::/64'
57         self.ea_bits_len = 12
58         self.psid_offset = 6
59         self.psid_length = 4
60         self.mtu = 1500
61         self.tag = 'MAP-T BR'
62
63         self.ipv4_internet_address = self.pg0.remote_ip4
64         self.ipv4_map_address = "198.18.0.12"
65         self.ipv4_udp_or_tcp_internet_port = 65000
66         self.ipv4_udp_or_tcp_map_port = 16606
67
68         self.ipv6_cpe_address = "2001:db8:f0:c30:0:c612:c:3"      # 198.18.0.12
69         self.ipv6_spoof_address = "2001:db8:f0:c30:0:c612:1c:3"   # 198.18.0.28
70         self.ipv6_spoof_prefix = "2001:db8:f0:c30:0:a00:c:3"      # 10.0.0.12
71         self.ipv6_spoof_psid = "2001:db8:f0:c30:0:c612:c:4"       # 4
72         self.ipv6_spoof_subnet = "2001:db8:f1:c30:0:c612:c:3"     # f1
73
74         self.ipv6_udp_or_tcp_internet_port = 65000
75         self.ipv6_udp_or_tcp_map_port = 16606
76         self.ipv6_udp_or_tcp_spoof_port = 16862
77
78         self.ipv6_map_address = (
79             "2001:db8:ffff:ff00:ac:1001:200:0")         # 176.16.1.2
80         self.ipv6_map_same_rule_diff_addr = (
81             "2001:db8:ffff:ff00:c6:1200:1000:0")        # 198.18.0.16
82         self.ipv6_map_same_rule_same_addr = (
83             "2001:db8:ffff:ff00:c6:1200:c00:0")         # 198.18.0.12
84
85         self.map_br_prefix = "2001:db8:f0::"
86         self.map_br_prefix_len = 48
87         self.psid_number = 3
88
89         #
90         # Add an IPv6 route to the MAP-BR.
91         #
92         map_route = VppIpRoute(self,
93                                self.map_br_prefix,
94                                self.map_br_prefix_len,
95                                [VppRoutePath(self.pg1.remote_ip6,
96                                              self.pg1.sw_if_index)])
97         map_route.add_vpp_config()
98
99         #
100         # Add a MAP BR domain that maps from pg0 to pg1.
101         #
102         self.vapi.map_add_domain(ip4_prefix=self.ip4_prefix,
103                                  ip6_prefix=self.ip6_prefix,
104                                  ip6_src=self.ip6_src,
105                                  ea_bits_len=self.ea_bits_len,
106                                  psid_offset=self.psid_offset,
107                                  psid_length=self.psid_length,
108                                  mtu=self.mtu,
109                                  tag=self.tag)
110
111         #
112         # Set BR parameters.
113         #
114         self.vapi.map_param_set_fragmentation(inner=1, ignore_df=0)
115         self.vapi.map_param_set_fragmentation(inner=0, ignore_df=0)
116         self.vapi.map_param_set_icmp(ip4_err_relay_src=self.pg0.local_ip4)
117         self.vapi.map_param_set_traffic_class(copy=1)
118
119         #
120         # Enable MAP-T on interfaces.
121         #
122         self.vapi.map_if_enable_disable(is_enable=1,
123                                         sw_if_index=self.pg0.sw_if_index,
124                                         is_translation=1)
125
126         self.vapi.map_if_enable_disable(is_enable=1,
127                                         sw_if_index=self.pg1.sw_if_index,
128                                         is_translation=1)
129
130         self.vapi.map_if_enable_disable(is_enable=1,
131                                         sw_if_index=self.pg1.sw_if_index,
132                                         is_translation=1)
133
134     def tearDown(self):
135         super(TestMAPBR, self).tearDown()
136         for i in self.pg_interfaces:
137             i.unconfig_ip4()
138             i.unconfig_ip6()
139             i.admin_down()
140
141     def v4_address_check(self, pkt):
142         self.assertEqual(pkt[IP].src, self.ipv4_map_address)
143         self.assertEqual(pkt[IP].dst, self.ipv4_internet_address)
144
145     def v4_port_check(self, pkt, proto):
146         self.assertEqual(pkt[proto].sport, self.ipv4_udp_or_tcp_map_port)
147         self.assertEqual(pkt[proto].dport, self.ipv4_udp_or_tcp_internet_port)
148
149     def v6_address_check(self, pkt):
150         self.assertEqual(pkt[IPv6].src, self.ipv6_map_address)
151         self.assertEqual(pkt[IPv6].dst, self.ipv6_cpe_address)
152
153     def v6_port_check(self, pkt, proto):
154         self.assertEqual(pkt[proto].sport, self.ipv6_udp_or_tcp_internet_port)
155         self.assertEqual(pkt[proto].dport, self.ipv6_udp_or_tcp_map_port)
156
157     #
158     # Normal translation of UDP packets v4 -> v6 direction
159     # Send 128 frame size packet for IPv4/UDP.
160     # Received packet should be translated into IPv6 packet with no
161     # fragment header.
162     #
163
164     def test_map_t_udp_ip4_to_ip6(self):
165         """ MAP-T UDP IPv4 -> IPv6 """
166
167         eth = Ether(src=self.pg0.remote_mac,
168                     dst=self.pg0.local_mac)
169         ip = IP(src=self.pg0.remote_ip4,
170                 dst=self.ipv4_map_address,
171                 tos=0)
172         udp = UDP(sport=self.ipv4_udp_or_tcp_internet_port,
173                   dport=self.ipv4_udp_or_tcp_map_port)
174         payload = "a" * 82
175         tx_pkt = eth / ip / udp / payload
176
177         self.pg_send(self.pg0, tx_pkt * 1)
178
179         rx_pkts = self.pg1.get_capture(1)
180         rx_pkt = rx_pkts[0]
181
182         self.v6_address_check(rx_pkt)
183         self.v6_port_check(rx_pkt, UDP)
184         self.assertEqual(rx_pkt[IPv6].tc, 0)    # IPv4 ToS passed to v6 TC
185         self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="UDP").nh)
186
187     #
188     # Normal translation of TCP packets v4 -> v6 direction.
189     # Send 128 frame size packet for IPv4/TCP.
190     # Received packet should be translated into IPv6 packet with no
191     # fragment header.
192     #
193
194     def test_map_t_tcp_ip4_to_ip6(self):
195         """ MAP-T TCP IPv4 -> IPv6 """
196
197         eth = Ether(src=self.pg0.remote_mac,
198                     dst=self.pg0.local_mac)
199         ip = IP(src=self.pg0.remote_ip4,
200                 dst=self.ipv4_map_address,
201                 tos=0)
202         tcp = TCP(sport=self.ipv4_udp_or_tcp_internet_port,
203                   dport=self.ipv4_udp_or_tcp_map_port)
204         payload = "a" * 82
205         tx_pkt = eth / ip / tcp / payload
206
207         self.pg_send(self.pg0, tx_pkt * 1)
208
209         rx_pkts = self.pg1.get_capture(1)
210         rx_pkt = rx_pkts[0]
211
212         self.v6_address_check(rx_pkt)
213         self.v6_port_check(rx_pkt, TCP)
214         self.assertEqual(rx_pkt[IPv6].tc, 0)    # IPv4 ToS passed to v6 TC
215         self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="TCP").nh)
216
217     #
218     # Normal translation of UDP packets v6 -> v4 direction
219     # Send 128 frame size packet for IPv6/UDP.
    # Received packet should be translated into an IPv4 packet with DF=0
    # (the test asserts that the DF bit is not set).
221     #
222
223     def test_map_t_udp_ip6_to_ip4(self):
224         """ MAP-T UDP IPv6 -> IPv4 """
225
226         eth = Ether(src=self.pg1.remote_mac,
227                     dst=self.pg1.local_mac)
228         ip = IPv6(src=self.ipv6_cpe_address,
229                   dst=self.ipv6_map_address)
230         udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
231                   dport=self.ipv6_udp_or_tcp_internet_port)
232         payload = "a" * 82
233         tx_pkt = eth / ip / udp / payload
234
235         self.pg_send(self.pg1, tx_pkt * 1)
236
237         rx_pkts = self.pg0.get_capture(1)
238         rx_pkt = rx_pkts[0]
239
240         self.v4_address_check(rx_pkt)
241         self.v4_port_check(rx_pkt, UDP)
242         self.assertEqual(rx_pkt[IP].proto, IP(proto="udp").proto)
243         self.assertEqual(rx_pkt[IP].tos, 0)    # IPv6 TC passed to v4 ToS
244         df_bit = IP(flags="DF").flags
245         self.assertNotEqual(rx_pkt[IP].flags & df_bit, df_bit)
246
247     #
248     # Normal translation of TCP packets v6 -> v4 direction
249     # Send 128 frame size packet for IPv6/TCP.
    # Received packet should be translated into an IPv4 packet with DF=0
    # (the test asserts that the DF bit is not set).
251     #
252
253     def test_map_t_tcp_ip6_to_ip4(self):
254         """ MAP-T TCP IPv6 -> IPv4 """
255
256         eth = Ether(src=self.pg1.remote_mac,
257                     dst=self.pg1.local_mac)
258         ip = IPv6(src=self.ipv6_cpe_address,
259                   dst=self.ipv6_map_address)
260         tcp = TCP(sport=self.ipv6_udp_or_tcp_map_port,
261                   dport=self.ipv6_udp_or_tcp_internet_port)
262         payload = "a" * 82
263         tx_pkt = eth / ip / tcp / payload
264
265         self.pg_send(self.pg1, tx_pkt * 1)
266
267         rx_pkts = self.pg0.get_capture(1)
268         rx_pkt = rx_pkts[0]
269
270         self.v4_address_check(rx_pkt)
271         self.v4_port_check(rx_pkt, TCP)
272         self.assertEqual(rx_pkt[IP].proto, IP(proto="tcp").proto)
273         self.assertEqual(rx_pkt[IP].tos, 0)    # IPv6 TC passed to v4 ToS
274         df_bit = IP(flags="DF").flags
275         self.assertNotEqual(rx_pkt[IP].flags & df_bit, df_bit)
276
277     #
278     # Translation of ICMP Echo Request v4 -> v6 direction
279     # Received packet should be translated into an IPv6 Echo Request.
280     #
281
282     def test_map_t_echo_request_ip4_to_ip6(self):
283         """ MAP-T echo request IPv4 -> IPv6 """
284
285         eth = Ether(src=self.pg1.remote_mac,
286                     dst=self.pg1.local_mac)
287         ip = IP(src=self.pg0.remote_ip4,
288                 dst=self.ipv4_map_address)
289         icmp = ICMP(type="echo-request",
290                     id=self.ipv6_udp_or_tcp_map_port)
291         payload = "H" * 10
292         tx_pkt = eth / ip / icmp / payload
293
294         self.pg_send(self.pg0, tx_pkt * 1)
295
296         rx_pkts = self.pg1.get_capture(1)
297         rx_pkt = rx_pkts[0]
298
299         self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
300         self.assertEqual(rx_pkt[ICMPv6EchoRequest].type,
301                          ICMPv6EchoRequest(type="Echo Request").type)
302         self.assertEqual(rx_pkt[ICMPv6EchoRequest].code, 0)
303         self.assertEqual(rx_pkt[ICMPv6EchoRequest].id,
304                          self.ipv6_udp_or_tcp_map_port)
305
306     #
307     # Translation of ICMP Echo Reply v4 -> v6 direction
308     # Received packet should be translated into an IPv6 Echo Reply.
309     #
310
311     def test_map_t_echo_reply_ip4_to_ip6(self):
312         """ MAP-T echo reply IPv4 -> IPv6 """
313
314         eth = Ether(src=self.pg1.remote_mac,
315                     dst=self.pg1.local_mac)
316         ip = IP(src=self.pg0.remote_ip4,
317                 dst=self.ipv4_map_address)
318         icmp = ICMP(type="echo-reply",
319                     id=self.ipv6_udp_or_tcp_map_port)
320         payload = "H" * 10
321         tx_pkt = eth / ip / icmp / payload
322
323         self.pg_send(self.pg0, tx_pkt * 1)
324
325         rx_pkts = self.pg1.get_capture(1)
326         rx_pkt = rx_pkts[0]
327
328         self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
329         self.assertEqual(rx_pkt[ICMPv6EchoReply].type,
330                          ICMPv6EchoReply(type="Echo Reply").type)
331         self.assertEqual(rx_pkt[ICMPv6EchoReply].code, 0)
332         self.assertEqual(rx_pkt[ICMPv6EchoReply].id,
333                          self.ipv6_udp_or_tcp_map_port)
334
335     #
336     # Translation of ICMP Time Exceeded v4 -> v6 direction
337     # Received packet should be translated into an IPv6 Time Exceeded.
338     #
339
340     def test_map_t_time_exceeded_ip4_to_ip6(self):
341         """ MAP-T time exceeded IPv4 -> IPv6 """
342
343         eth = Ether(src=self.pg0.remote_mac,
344                     dst=self.pg0.local_mac)
345         ip = IP(src=self.pg0.remote_ip4,
346                 dst=self.ipv4_map_address)
347         icmp = ICMP(type="time-exceeded", code="ttl-zero-during-transit")
348         ip_inner = IP(dst=self.pg0.remote_ip4,
349                       src=self.ipv4_map_address, ttl=1)
350         udp_inner = UDP(sport=self.ipv4_udp_or_tcp_map_port,
351                         dport=self.ipv4_udp_or_tcp_internet_port)
352         payload = "H" * 10
353         tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload
354
355         self.pg_send(self.pg0, tx_pkt * 1)
356
357         rx_pkts = self.pg1.get_capture(1)
358         rx_pkt = rx_pkts[0]
359
360         self.v6_address_check(rx_pkt)
361         self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
362         self.assertEqual(rx_pkt[ICMPv6TimeExceeded].type,
363                          ICMPv6TimeExceeded().type)
364         self.assertEqual(rx_pkt[ICMPv6TimeExceeded].code,
365                          ICMPv6TimeExceeded(
366                             code="hop limit exceeded in transit").code)
367         self.assertEqual(rx_pkt[ICMPv6TimeExceeded].hlim, tx_pkt[IP][1].ttl)
368         self.assertTrue(rx_pkt.haslayer(IPerror6))
369         self.assertTrue(rx_pkt.haslayer(UDPerror))
370         self.assertEqual(rx_pkt[IPv6].src, rx_pkt[IPerror6].dst)
371         self.assertEqual(rx_pkt[IPv6].dst, rx_pkt[IPerror6].src)
372         self.assertEqual(rx_pkt[UDPerror].sport, self.ipv6_udp_or_tcp_map_port)
373         self.assertEqual(rx_pkt[UDPerror].dport,
374                          self.ipv6_udp_or_tcp_internet_port)
375
376     #
377     # Translation of ICMP Echo Request v6 -> v4 direction
378     # Received packet should be translated into an IPv4 Echo Request.
379     #
380
381     def test_map_t_echo_request_ip6_to_ip4(self):
382         """ MAP-T echo request IPv6 -> IPv4 """
383
384         eth = Ether(src=self.pg1.remote_mac,
385                     dst=self.pg1.local_mac)
386         ip = IPv6(src=self.ipv6_cpe_address,
387                   dst=self.ipv6_map_address)
388         icmp = ICMPv6EchoRequest()
389         icmp.id = self.ipv6_udp_or_tcp_map_port
390         payload = "H" * 10
391         tx_pkt = eth / ip / icmp / payload
392
393         self.pg_send(self.pg1, tx_pkt * 1)
394
395         rx_pkts = self.pg0.get_capture(1)
396         rx_pkt = rx_pkts[0]
397
398         self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
399         self.assertEqual(rx_pkt[ICMP].type, ICMP(type="echo-request").type)
400         self.assertEqual(rx_pkt[ICMP].code, 0)
401         self.assertEqual(rx_pkt[ICMP].id, self.ipv6_udp_or_tcp_map_port)
402
403     #
404     # Translation of ICMP Echo Reply v6 -> v4 direction
405     # Received packet should be translated into an IPv4 Echo Reply.
406     #
407
408     def test_map_t_echo_reply_ip6_to_ip4(self):
409         """ MAP-T echo reply IPv6 -> IPv4 """
410
411         eth = Ether(src=self.pg1.remote_mac,
412                     dst=self.pg1.local_mac)
413         ip = IPv6(src=self.ipv6_cpe_address,
414                   dst=self.ipv6_map_address)
415         icmp = ICMPv6EchoReply(id=self.ipv6_udp_or_tcp_map_port)
416         payload = "H" * 10
417         tx_pkt = eth / ip / icmp / payload
418
419         self.pg_send(self.pg1, tx_pkt * 1)
420
421         rx_pkts = self.pg0.get_capture(1)
422         rx_pkt = rx_pkts[0]
423
424         self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
425         self.assertEqual(rx_pkt[ICMP].type, ICMP(type="echo-reply").type)
426         self.assertEqual(rx_pkt[ICMP].code, 0)
427         self.assertEqual(rx_pkt[ICMP].id, self.ipv6_udp_or_tcp_map_port)
428
429     #
430     # Translation of ICMP Packet Too Big v6 -> v4 direction
431     # Received packet should be translated into an IPv4 Dest Unreachable.
432     #
433
434     def test_map_t_packet_too_big_ip6_to_ip4(self):
435         """ MAP-T packet too big IPv6 -> IPv4 """
436
437         eth = Ether(src=self.pg1.remote_mac,
438                     dst=self.pg1.local_mac)
439         ip = IPv6(src=self.ipv6_cpe_address,
440                   dst=self.ipv6_map_address)
441         icmp = ICMPv6PacketTooBig(mtu=1280)
442         ip_inner = IPv6(src=self.ipv6_map_address,
443                         dst=self.ipv6_cpe_address)
444         udp_inner = UDP(sport=self.ipv6_udp_or_tcp_internet_port,
445                         dport=self.ipv6_udp_or_tcp_map_port)
446         payload = "H" * 10
447         tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload
448
449         self.pg_send(self.pg1, tx_pkt * 1)
450
451         rx_pkts = self.pg0.get_capture(1)
452         rx_pkt = rx_pkts[0]
453
454         self.v4_address_check(rx_pkt)
455         self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
456         self.assertEqual(rx_pkt[ICMP].type, ICMP(type="dest-unreach").type)
457         self.assertEqual(rx_pkt[ICMP].code,
458                          ICMP(code="fragmentation-needed").code)
459         self.assertEqual(rx_pkt[ICMP].nexthopmtu,
460                          tx_pkt[ICMPv6PacketTooBig].mtu - 20)
461         self.assertTrue(rx_pkt.haslayer(IPerror))
462         self.assertTrue(rx_pkt.haslayer(UDPerror))
463         self.assertEqual(rx_pkt[IP].src, rx_pkt[IPerror].dst)
464         self.assertEqual(rx_pkt[IP].dst, rx_pkt[IPerror].src)
465         self.assertEqual(rx_pkt[UDPerror].sport,
466                          self.ipv4_udp_or_tcp_internet_port)
467         self.assertEqual(rx_pkt[UDPerror].dport, self.ipv4_udp_or_tcp_map_port)
468
469     #
470     # Translation of ICMP Time Exceeded v6 -> v4 direction
471     # Received packet should be translated into an IPv4 Time Exceeded.
472     #
473
474     def test_map_t_time_exceeded_ip6_to_ip4(self):
475         """ MAP-T time exceeded IPv6 -> IPv4 """
476
477         eth = Ether(src=self.pg1.remote_mac,
478                     dst=self.pg1.local_mac)
479         ip = IPv6(src=self.ipv6_cpe_address,
480                   dst=self.ipv6_map_address)
481         icmp = ICMPv6TimeExceeded()
482         ip_inner = IPv6(src=self.ipv6_map_address,
483                         dst=self.ipv6_cpe_address, hlim=1)
484         udp_inner = UDP(sport=self.ipv6_udp_or_tcp_internet_port,
485                         dport=self.ipv6_udp_or_tcp_map_port)
486         payload = "H" * 10
487         tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload
488
489         self.pg_send(self.pg1, tx_pkt * 1)
490
491         rx_pkts = self.pg0.get_capture(1)
492         rx_pkt = rx_pkts[0]
493
494         self.v4_address_check(rx_pkt)
495         self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
496         self.assertEqual(rx_pkt[ICMP].type, ICMP(type="time-exceeded").type)
497         self.assertEqual(rx_pkt[ICMP].code,
498                          ICMP(code="ttl-zero-during-transit").code)
499         self.assertEqual(rx_pkt[ICMP].ttl, tx_pkt[IPv6][1].hlim)
500         self.assertTrue(rx_pkt.haslayer(IPerror))
501         self.assertTrue(rx_pkt.haslayer(UDPerror))
502         self.assertEqual(rx_pkt[IP].src, rx_pkt[IPerror].dst)
503         self.assertEqual(rx_pkt[IP].dst, rx_pkt[IPerror].src)
504         self.assertEqual(rx_pkt[UDPerror].sport,
505                          self.ipv4_udp_or_tcp_internet_port)
506         self.assertEqual(rx_pkt[UDPerror].dport, self.ipv4_udp_or_tcp_map_port)
507
508     #
509     # Spoofed IPv4 Source Address v6 -> v4 direction
510     # Send a packet with a wrong IPv4 address embedded in bits 72-103.
511     # The BR should either drop the packet, or rewrite the spoofed
512     # source IPv4 as the actual source IPv4 address.
513     # The BR really should drop the packet.
514     #
515
516     def test_map_t_spoof_ipv4_src_addr_ip6_to_ip4(self):
517         """ MAP-T spoof ipv4 src addr IPv6 -> IPv4 """
518
519         eth = Ether(src=self.pg1.remote_mac,
520                     dst=self.pg1.local_mac)
521         ip = IPv6(src=self.ipv6_spoof_address,
522                   dst=self.ipv6_map_address)
523         udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
524                   dport=self.ipv6_udp_or_tcp_internet_port)
525         payload = "a" * 82
526         tx_pkt = eth / ip / udp / payload
527
528         self.pg_send(self.pg1, tx_pkt * 1)
529
530         self.pg0.get_capture(0, timeout=1)
531         self.pg0.assert_nothing_captured("Should drop IPv4 spoof address")
532
533     #
534     # Spoofed IPv4 Source Prefix v6 -> v4 direction
535     # Send a packet with a wrong IPv4 prefix embedded in bits 72-103.
536     # The BR should either drop the packet, or rewrite the source IPv4
537     # to the prefix that matches the source IPv4 address.
538     #
539
540     def test_map_t_spoof_ipv4_src_prefix_ip6_to_ip4(self):
541         """ MAP-T spoof ipv4 src prefix IPv6 -> IPv4 """
542
543         eth = Ether(src=self.pg1.remote_mac,
544                     dst=self.pg1.local_mac)
545         ip = IPv6(src=self.ipv6_spoof_prefix,
546                   dst=self.ipv6_map_address)
547         udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
548                   dport=self.ipv6_udp_or_tcp_internet_port)
549         payload = "a" * 82
550         tx_pkt = eth / ip / udp / payload
551
552         self.pg_send(self.pg1, tx_pkt * 1)
553
554         self.pg0.get_capture(0, timeout=1)
555         self.pg0.assert_nothing_captured("Should drop IPv4 spoof prefix")
556
557     #
558     # Spoofed IPv6 PSID v6 -> v4 direction
    # Send a packet with a wrong PSID in the IPv6 source address.
560     # The BR should drop the packet.
561     #
562
563     def test_map_t_spoof_psid_ip6_to_ip4(self):
564         """ MAP-T spoof psid IPv6 -> IPv4 """
565
566         eth = Ether(src=self.pg1.remote_mac,
567                     dst=self.pg1.local_mac)
568         ip = IPv6(src=self.ipv6_spoof_psid,
569                   dst=self.ipv6_map_address)
570         udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
571                   dport=self.ipv6_udp_or_tcp_internet_port)
572         payload = "a" * 82
573         tx_pkt = eth / ip / udp / payload
574
575         self.pg_send(self.pg1, tx_pkt * 1)
576
577         self.pg0.get_capture(0, timeout=1)
578         self.pg0.assert_nothing_captured("Should drop IPv6 spoof PSID")
579
580     #
581     # Spoofed IPv6 subnet field v6 -> v4 direction
582     # Send a packet with a wrong IPv6 subnet as "2001:db8:f1"
583     # The BR should drop the packet.
584     #
585
586     def test_map_t_spoof_subnet_ip6_to_ip4(self):
587         """ MAP-T spoof subnet IPv6 -> IPv4 """
588
589         eth = Ether(src=self.pg1.remote_mac,
590                     dst=self.pg1.local_mac)
591         ip = IPv6(src=self.ipv6_spoof_subnet,
592                   dst=self.ipv6_map_address)
593         udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
594                   dport=self.ipv6_udp_or_tcp_internet_port)
595         payload = "a" * 82
596         tx_pkt = eth / ip / udp / payload
597
598         self.pg_send(self.pg1, tx_pkt * 1)
599
600         self.pg0.get_capture(0, timeout=1)
601         self.pg0.assert_nothing_captured("Should drop IPv6 spoof subnet")
602
603     #
604     # Spoofed IPv6 port PSID v6 -> v4 direction
605     # Send a packet with a wrong IPv6 port PSID
606     # The BR should drop the packet.
607     #
608
609     def test_map_t_spoof_port_psid_ip6_to_ip4(self):
610         """ MAP-T spoof port psid IPv6 -> IPv4 """
611
612         eth = Ether(src=self.pg1.remote_mac,
613                     dst=self.pg1.local_mac)
614         ip = IPv6(src=self.ipv6_cpe_address,
615                   dst=self.ipv6_map_address)
616         udp = UDP(sport=self.ipv6_udp_or_tcp_spoof_port,
617                   dport=self.ipv6_udp_or_tcp_internet_port)
618         payload = "a" * 82
619         tx_pkt = eth / ip / udp / payload
620
621         self.pg_send(self.pg1, tx_pkt * 1)
622
623         self.pg0.get_capture(0, timeout=1)
624         self.pg0.assert_nothing_captured("Should drop IPv6 spoof port PSID")
625
626     #
627     # Spoofed IPv6 ICMP ID PSID v6 -> v4 direction
    # Send a packet with a wrong IPv6 ICMP ID PSID
629     # The BR should drop the packet.
630     #
631
632     def test_map_t_spoof_icmp_id_psid_ip6_to_ip4(self):
633         """ MAP-T spoof ICMP id psid IPv6 -> IPv4 """
634
635         eth = Ether(src=self.pg1.remote_mac,
636                     dst=self.pg1.local_mac)
637         ip = IPv6(src=self.ipv6_cpe_address,
638                   dst=self.ipv6_map_address)
639         icmp = ICMPv6EchoRequest()
640         icmp.id = self.ipv6_udp_or_tcp_spoof_port
641         payload = "H" * 10
642         tx_pkt = eth / ip / icmp / payload
643
644         self.pg_send(self.pg1, tx_pkt * 1)
645
646         self.pg0.get_capture(0, timeout=1)
647         self.pg0.assert_nothing_captured("Should drop IPv6 spoof port PSID")
648
649     #
650     # Map to Map - same rule, different address
651     #
652
653     @unittest.skip("Fixme: correct behavior needs clarification")
654     def test_map_t_same_rule_diff_addr_ip6_to_ip4(self):
655         """ MAP-T same rule, diff addr IPv6 -> IPv6 """
656
657         eth = Ether(src=self.pg1.remote_mac,
658                     dst=self.pg1.local_mac)
659         ip = IPv6(src=self.ipv6_cpe_address,
660                   dst=self.ipv6_map_same_rule_diff_addr)
661         udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
662                   dport=1025)
663         payload = "a" * 82
664         tx_pkt = eth / ip / udp / payload
665
666         self.pg_send(self.pg1, tx_pkt * 1)
667
668         rx_pkts = self.pg1.get_capture(1)
669         rx_pkt = rx_pkts[0]
670
671     #
672     # Map to Map - same rule, same address
673     #
674
675     @unittest.skip("Fixme: correct behavior needs clarification")
676     def test_map_t_same_rule_same_addr_ip6_to_ip4(self):
677         """ MAP-T same rule, same addr IPv6 -> IPv6 """
678
679         eth = Ether(src=self.pg1.remote_mac,
680                     dst=self.pg1.local_mac)
681         ip = IPv6(src=self.ipv6_cpe_address,
682                   dst=self.ipv6_map_same_rule_same_addr)
683         udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
684                   dport=1025)
685         payload = "a" * 82
686         tx_pkt = eth / ip / udp / payload
687
688         self.pg_send(self.pg1, tx_pkt * 1)
689
690         rx_pkts = self.pg1.get_capture(1)
691         rx_pkt = rx_pkts[0]
692
# Run the test cases with the VPP test runner when executed as a script.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)