tests: fix assert_nothing_captured
[vpp.git] / test / test_map_br.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import unittest
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath
9 from util import fragment_rfc791, fragment_rfc8200
10
11 import scapy.compat
12 from scapy.layers.l2 import Ether
13 from scapy.packet import Raw
14 from scapy.layers.inet import IP, UDP, ICMP, TCP, IPerror, UDPerror
15 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, ICMPv6PacketTooBig
16 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply, IPerror6
17
18
class TestMAPBR(VppTestCase):
    """ MAP-T Test Cases """

    @classmethod
    def setUpClass(cls):
        super(TestMAPBR, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestMAPBR, cls).tearDownClass()

    def setUp(self):
        """Create pg0 (IPv4) / pg1 (IPv6) and configure one MAP-T domain.

        Every test in this class relies on the addresses, ports and MAP
        rule parameters stored on ``self`` here.
        """
        super(TestMAPBR, self).setUp()

        #
        # Create 2 pg interfaces.
        # pg0 is IPv4
        # pg1 is IPv6
        #
        self.create_pg_interfaces(range(2))

        self.pg0.admin_up()
        self.pg0.config_ip4()
        # NOTE(review): the next two calls target pg1 although this stanza
        # configures the IPv4 side (pg0); this looks like a copy/paste
        # slip. Left unchanged -- confirm the intended interface.
        self.pg1.generate_remote_hosts(20)
        self.pg1.configure_ipv4_neighbors()
        self.pg0.resolve_arp()

        self.pg1.admin_up()
        self.pg1.config_ip6()
        self.pg1.generate_remote_hosts(20)
        self.pg1.configure_ipv6_neighbors()

        #
        # BR configuration parameters used for all test.
        #
        self.ip4_prefix = '198.18.0.0/24'
        self.ip6_prefix = '2001:db8:f0::/48'
        self.ip6_src = '2001:db8:ffff:ff00::/64'
        # 12 EA bits = 8 IPv4 suffix bits (32 - 24) + 4 PSID bits.
        self.ea_bits_len = 12
        self.psid_offset = 6
        self.psid_length = 4
        self.mtu = 1500
        self.tag = 'MAP-T BR'

        self.ipv4_internet_address = self.pg0.remote_ip4
        self.ipv4_map_address = "198.18.0.12"
        self.ipv4_udp_or_tcp_internet_port = 65000
        # 16606 == 0x40de: with psid_offset=6 / psid_length=4 the PSID
        # bits are 0b0011 == 3.
        self.ipv4_udp_or_tcp_map_port = 16606

        self.ipv6_cpe_address = "2001:db8:f0:c30:0:c612:c:3"      # 198.18.0.12
        self.ipv6_spoof_address = "2001:db8:f0:c30:0:c612:1c:3"   # 198.18.0.28
        self.ipv6_spoof_prefix = "2001:db8:f0:c30:0:a00:c:3"      # 10.0.0.12
        self.ipv6_spoof_psid = "2001:db8:f0:c30:0:c612:c:4"       # 4
        self.ipv6_spoof_subnet = "2001:db8:f1:c30:0:c612:c:3"     # f1

        self.ipv6_udp_or_tcp_internet_port = 65000
        self.ipv6_udp_or_tcp_map_port = 16606
        # 16862 == 0x41de: the PSID bits are 0b0111 == 7, i.e. not 3.
        self.ipv6_udp_or_tcp_spoof_port = 16862

        self.ipv6_map_address = (
            "2001:db8:ffff:ff00:ac:1001:200:0")         # 172.16.1.2
        self.ipv6_map_same_rule_diff_addr = (
            "2001:db8:ffff:ff00:c6:1200:1000:0")        # 198.18.0.16
        self.ipv6_map_same_rule_same_addr = (
            "2001:db8:ffff:ff00:c6:1200:c00:0")         # 198.18.0.12

        self.map_br_prefix = "2001:db8:f0::"
        self.map_br_prefix_len = 48
        self.psid_number = 3

        #
        # Add an IPv6 route to the MAP-BR.
        #
        map_route = VppIpRoute(self,
                               self.map_br_prefix,
                               self.map_br_prefix_len,
                               [VppRoutePath(self.pg1.remote_ip6,
                                             self.pg1.sw_if_index)])
        map_route.add_vpp_config()

        #
        # Add a MAP BR domain that maps from pg0 to pg1.
        #
        self.vapi.map_add_domain(ip4_prefix=self.ip4_prefix,
                                 ip6_prefix=self.ip6_prefix,
                                 ip6_src=self.ip6_src,
                                 ea_bits_len=self.ea_bits_len,
                                 psid_offset=self.psid_offset,
                                 psid_length=self.psid_length,
                                 mtu=self.mtu,
                                 tag=self.tag)

        #
        # Set BR parameters.
        #
        # NOTE(review): two back-to-back fragmentation calls that differ
        # only in `inner`; presumably the second one wins -- confirm
        # whether both are intended.
        self.vapi.map_param_set_fragmentation(inner=1, ignore_df=0)
        self.vapi.map_param_set_fragmentation(inner=0, ignore_df=0)
        self.vapi.map_param_set_icmp(ip4_err_relay_src=self.pg0.local_ip4)
        self.vapi.map_param_set_traffic_class(copy=1)

        #
        # Enable MAP-T on interfaces.
        #
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg0.sw_if_index,
                                        is_translation=1)

        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg1.sw_if_index,
                                        is_translation=1)

        # NOTE(review): pg1 is enabled a second time with identical
        # arguments; kept as-is -- confirm whether this is intentional.
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg1.sw_if_index,
                                        is_translation=1)

    def tearDown(self):
        """Run the base tearDown, then unconfigure and shut both pg ifs."""
        super(TestMAPBR, self).tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()

    def v4_address_check(self, pkt):
        """Assert pkt's IPv4 src/dst are the MAP and internet addresses."""
        self.assertEqual(pkt[IP].src, self.ipv4_map_address)
        self.assertEqual(pkt[IP].dst, self.ipv4_internet_address)

    def v4_port_check(self, pkt, proto):
        """Assert pkt[proto] goes from the MAP port to the internet port."""
        self.assertEqual(pkt[proto].sport, self.ipv4_udp_or_tcp_map_port)
        self.assertEqual(pkt[proto].dport, self.ipv4_udp_or_tcp_internet_port)

    def v6_address_check(self, pkt):
        """Assert pkt's IPv6 src/dst are the MAP and CPE addresses."""
        self.assertEqual(pkt[IPv6].src, self.ipv6_map_address)
        self.assertEqual(pkt[IPv6].dst, self.ipv6_cpe_address)

    def v6_port_check(self, pkt, proto):
        """Assert pkt[proto] goes from the internet port to the MAP port."""
        self.assertEqual(pkt[proto].sport, self.ipv6_udp_or_tcp_internet_port)
        self.assertEqual(pkt[proto].dport, self.ipv6_udp_or_tcp_map_port)

    #
    # Normal translation of UDP packets v4 -> v6 direction
    # Send 128 frame size packet for IPv4/UDP.
    # Received packet should be translated into IPv6 packet with no
    # fragment header.
    #

    def test_map_t_udp_ip4_to_ip6(self):
        """ MAP-T UDP IPv4 -> IPv6 """

        eth = Ether(src=self.pg0.remote_mac,
                    dst=self.pg0.local_mac)
        ip = IP(src=self.pg0.remote_ip4,
                dst=self.ipv4_map_address,
                tos=0)
        udp = UDP(sport=self.ipv4_udp_or_tcp_internet_port,
                  dport=self.ipv4_udp_or_tcp_map_port)
        payload = "a" * 82
        tx_pkt = eth / ip / udp / payload

        self.pg_send(self.pg0, tx_pkt * 1)

        rx_pkts = self.pg1.get_capture(1)
        rx_pkt = rx_pkts[0]

        self.v6_address_check(rx_pkt)
        self.v6_port_check(rx_pkt, UDP)
        self.assertEqual(rx_pkt[IPv6].tc, 0)    # IPv4 ToS passed to v6 TC
        self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="UDP").nh)

    #
    # Normal translation of TCP packets v4 -> v6 direction.
    # Send 128 frame size packet for IPv4/TCP.
    # Received packet should be translated into IPv6 packet with no
    # fragment header.
    #

    def test_map_t_tcp_ip4_to_ip6(self):
        """ MAP-T TCP IPv4 -> IPv6 """

        eth = Ether(src=self.pg0.remote_mac,
                    dst=self.pg0.local_mac)
        ip = IP(src=self.pg0.remote_ip4,
                dst=self.ipv4_map_address,
                tos=0)
        tcp = TCP(sport=self.ipv4_udp_or_tcp_internet_port,
                  dport=self.ipv4_udp_or_tcp_map_port)
        payload = "a" * 82
        tx_pkt = eth / ip / tcp / payload

        self.pg_send(self.pg0, tx_pkt * 1)

        rx_pkts = self.pg1.get_capture(1)
        rx_pkt = rx_pkts[0]

        self.v6_address_check(rx_pkt)
        self.v6_port_check(rx_pkt, TCP)
        self.assertEqual(rx_pkt[IPv6].tc, 0)    # IPv4 ToS passed to v6 TC
        self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="TCP").nh)

    #
    # Normal translation of UDP packets v6 -> v4 direction
    # Send 128 frame size packet for IPv6/UDP.
    # Received packet should be translated into an IPv4 packet with DF
    # clear (RFC 7915: translated packets of <= 1260 bytes get DF=0).
    #

    def test_map_t_udp_ip6_to_ip4(self):
        """ MAP-T UDP IPv6 -> IPv4 """

        eth = Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac)
        ip = IPv6(src=self.ipv6_cpe_address,
                  dst=self.ipv6_map_address)
        udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
                  dport=self.ipv6_udp_or_tcp_internet_port)
        payload = "a" * 82
        tx_pkt = eth / ip / udp / payload

        self.pg_send(self.pg1, tx_pkt * 1)

        rx_pkts = self.pg0.get_capture(1)
        rx_pkt = rx_pkts[0]

        self.v4_address_check(rx_pkt)
        self.v4_port_check(rx_pkt, UDP)
        self.assertEqual(rx_pkt[IP].proto, IP(proto="udp").proto)
        self.assertEqual(rx_pkt[IP].tos, 0)    # IPv6 TC passed to v4 ToS
        # The DF flag must NOT be set on the translated packet.
        df_bit = IP(flags="DF").flags
        self.assertNotEqual(rx_pkt[IP].flags & df_bit, df_bit)

    #
    # Normal translation of TCP packets v6 -> v4 direction
    # Send 128 frame size packet for IPv6/TCP.
    # Received packet should be translated into an IPv4 packet with DF
    # clear (RFC 7915: translated packets of <= 1260 bytes get DF=0).
    #

    def test_map_t_tcp_ip6_to_ip4(self):
        """ MAP-T TCP IPv6 -> IPv4 """

        eth = Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac)
        ip = IPv6(src=self.ipv6_cpe_address,
                  dst=self.ipv6_map_address)
        tcp = TCP(sport=self.ipv6_udp_or_tcp_map_port,
                  dport=self.ipv6_udp_or_tcp_internet_port)
        payload = "a" * 82
        tx_pkt = eth / ip / tcp / payload

        self.pg_send(self.pg1, tx_pkt * 1)

        rx_pkts = self.pg0.get_capture(1)
        rx_pkt = rx_pkts[0]

        self.v4_address_check(rx_pkt)
        self.v4_port_check(rx_pkt, TCP)
        self.assertEqual(rx_pkt[IP].proto, IP(proto="tcp").proto)
        self.assertEqual(rx_pkt[IP].tos, 0)    # IPv6 TC passed to v4 ToS
        # The DF flag must NOT be set on the translated packet.
        df_bit = IP(flags="DF").flags
        self.assertNotEqual(rx_pkt[IP].flags & df_bit, df_bit)

    #
    # Translation of ICMP Echo Request v4 -> v6 direction
    # Received packet should be translated into an IPv6 Echo Request.
    #

    def test_map_t_echo_request_ip4_to_ip6(self):
        """ MAP-T echo request IPv4 -> IPv6 """

        # The frame is injected on pg0, so it is addressed with pg0's
        # MACs, like the other v4 -> v6 tests.
        eth = Ether(src=self.pg0.remote_mac,
                    dst=self.pg0.local_mac)
        ip = IP(src=self.pg0.remote_ip4,
                dst=self.ipv4_map_address)
        icmp = ICMP(type="echo-request",
                    id=self.ipv6_udp_or_tcp_map_port)
        payload = "H" * 10
        tx_pkt = eth / ip / icmp / payload

        self.pg_send(self.pg0, tx_pkt * 1)

        rx_pkts = self.pg1.get_capture(1)
        rx_pkt = rx_pkts[0]

        self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
        self.assertEqual(rx_pkt[ICMPv6EchoRequest].type,
                         ICMPv6EchoRequest(type="Echo Request").type)
        self.assertEqual(rx_pkt[ICMPv6EchoRequest].code, 0)
        self.assertEqual(rx_pkt[ICMPv6EchoRequest].id,
                         self.ipv6_udp_or_tcp_map_port)

    #
    # Translation of ICMP Echo Reply v4 -> v6 direction
    # Received packet should be translated into an IPv6 Echo Reply.
    #

    def test_map_t_echo_reply_ip4_to_ip6(self):
        """ MAP-T echo reply IPv4 -> IPv6 """

        # The frame is injected on pg0, so it is addressed with pg0's
        # MACs, like the other v4 -> v6 tests.
        eth = Ether(src=self.pg0.remote_mac,
                    dst=self.pg0.local_mac)
        ip = IP(src=self.pg0.remote_ip4,
                dst=self.ipv4_map_address)
        icmp = ICMP(type="echo-reply",
                    id=self.ipv6_udp_or_tcp_map_port)
        payload = "H" * 10
        tx_pkt = eth / ip / icmp / payload

        self.pg_send(self.pg0, tx_pkt * 1)

        rx_pkts = self.pg1.get_capture(1)
        rx_pkt = rx_pkts[0]

        self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
        self.assertEqual(rx_pkt[ICMPv6EchoReply].type,
                         ICMPv6EchoReply(type="Echo Reply").type)
        self.assertEqual(rx_pkt[ICMPv6EchoReply].code, 0)
        self.assertEqual(rx_pkt[ICMPv6EchoReply].id,
                         self.ipv6_udp_or_tcp_map_port)

    #
    # Translation of ICMP Time Exceeded v4 -> v6 direction
    # Received packet should be translated into an IPv6 Time Exceeded.
    #

    def test_map_t_time_exceeded_ip4_to_ip6(self):
        """ MAP-T time exceeded IPv4 -> IPv6 """

        eth = Ether(src=self.pg0.remote_mac,
                    dst=self.pg0.local_mac)
        ip = IP(src=self.pg0.remote_ip4,
                dst=self.ipv4_map_address)
        icmp = ICMP(type="time-exceeded", code="ttl-zero-during-transit")
        # Embedded (offending) datagram: MAP address -> internet host.
        ip_inner = IP(dst=self.pg0.remote_ip4,
                      src=self.ipv4_map_address, ttl=1)
        udp_inner = UDP(sport=self.ipv4_udp_or_tcp_map_port,
                        dport=self.ipv4_udp_or_tcp_internet_port)
        payload = "H" * 10
        tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload

        self.pg_send(self.pg0, tx_pkt * 1)

        rx_pkts = self.pg1.get_capture(1)
        rx_pkt = rx_pkts[0]

        self.v6_address_check(rx_pkt)
        self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
        self.assertEqual(rx_pkt[ICMPv6TimeExceeded].type,
                         ICMPv6TimeExceeded().type)
        expected_code = ICMPv6TimeExceeded(
            code="hop limit exceeded in transit").code
        self.assertEqual(rx_pkt[ICMPv6TimeExceeded].code, expected_code)
        # NOTE(review): presumably this compares the embedded header's
        # hop limit with the embedded IPv4 TTL -- verify scapy indexing.
        self.assertEqual(rx_pkt[ICMPv6TimeExceeded].hlim, tx_pkt[IP][1].ttl)
        self.assertTrue(rx_pkt.haslayer(IPerror6))
        self.assertTrue(rx_pkt.haslayer(UDPerror))
        # The embedded datagram's addresses mirror the outer header's.
        self.assertEqual(rx_pkt[IPv6].src, rx_pkt[IPerror6].dst)
        self.assertEqual(rx_pkt[IPv6].dst, rx_pkt[IPerror6].src)
        self.assertEqual(rx_pkt[UDPerror].sport, self.ipv6_udp_or_tcp_map_port)
        self.assertEqual(rx_pkt[UDPerror].dport,
                         self.ipv6_udp_or_tcp_internet_port)

    #
    # Translation of ICMP Echo Request v6 -> v4 direction
    # Received packet should be translated into an IPv4 Echo Request.
    #

    def test_map_t_echo_request_ip6_to_ip4(self):
        """ MAP-T echo request IPv6 -> IPv4 """

        eth = Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac)
        ip = IPv6(src=self.ipv6_cpe_address,
                  dst=self.ipv6_map_address)
        icmp = ICMPv6EchoRequest()
        icmp.id = self.ipv6_udp_or_tcp_map_port
        payload = "H" * 10
        tx_pkt = eth / ip / icmp / payload

        self.pg_send(self.pg1, tx_pkt * 1)

        rx_pkts = self.pg0.get_capture(1)
        rx_pkt = rx_pkts[0]

        self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
        self.assertEqual(rx_pkt[ICMP].type, ICMP(type="echo-request").type)
        self.assertEqual(rx_pkt[ICMP].code, 0)
        self.assertEqual(rx_pkt[ICMP].id, self.ipv6_udp_or_tcp_map_port)

    #
    # Translation of ICMP Echo Reply v6 -> v4 direction
    # Received packet should be translated into an IPv4 Echo Reply.
    #

    def test_map_t_echo_reply_ip6_to_ip4(self):
        """ MAP-T echo reply IPv6 -> IPv4 """

        eth = Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac)
        ip = IPv6(src=self.ipv6_cpe_address,
                  dst=self.ipv6_map_address)
        icmp = ICMPv6EchoReply(id=self.ipv6_udp_or_tcp_map_port)
        payload = "H" * 10
        tx_pkt = eth / ip / icmp / payload

        self.pg_send(self.pg1, tx_pkt * 1)

        rx_pkts = self.pg0.get_capture(1)
        rx_pkt = rx_pkts[0]

        self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
        self.assertEqual(rx_pkt[ICMP].type, ICMP(type="echo-reply").type)
        self.assertEqual(rx_pkt[ICMP].code, 0)
        self.assertEqual(rx_pkt[ICMP].id, self.ipv6_udp_or_tcp_map_port)

    #
    # Translation of ICMP Packet Too Big v6 -> v4 direction
    # Received packet should be translated into an IPv4 Dest Unreachable.
    #

    def test_map_t_packet_too_big_ip6_to_ip4(self):
        """ MAP-T packet too big IPv6 -> IPv4 """

        eth = Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac)
        ip = IPv6(src=self.ipv6_cpe_address,
                  dst=self.ipv6_map_address)
        icmp = ICMPv6PacketTooBig(mtu=1280)
        # Embedded (offending) datagram: MAP address -> CPE.
        ip_inner = IPv6(src=self.ipv6_map_address,
                        dst=self.ipv6_cpe_address)
        udp_inner = UDP(sport=self.ipv6_udp_or_tcp_internet_port,
                        dport=self.ipv6_udp_or_tcp_map_port)
        payload = "H" * 10
        tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload

        self.pg_send(self.pg1, tx_pkt * 1)

        rx_pkts = self.pg0.get_capture(1)
        rx_pkt = rx_pkts[0]

        self.v4_address_check(rx_pkt)
        self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
        self.assertEqual(rx_pkt[ICMP].type, ICMP(type="dest-unreach").type)
        self.assertEqual(rx_pkt[ICMP].code,
                         ICMP(code="fragmentation-needed").code)
        # The reported next-hop MTU is expected to be 20 less than the
        # MTU carried in the IPv6 Packet Too Big message.
        self.assertEqual(rx_pkt[ICMP].nexthopmtu,
                         tx_pkt[ICMPv6PacketTooBig].mtu - 20)
        self.assertTrue(rx_pkt.haslayer(IPerror))
        self.assertTrue(rx_pkt.haslayer(UDPerror))
        # The embedded datagram's addresses mirror the outer header's.
        self.assertEqual(rx_pkt[IP].src, rx_pkt[IPerror].dst)
        self.assertEqual(rx_pkt[IP].dst, rx_pkt[IPerror].src)
        self.assertEqual(rx_pkt[UDPerror].sport,
                         self.ipv4_udp_or_tcp_internet_port)
        self.assertEqual(rx_pkt[UDPerror].dport, self.ipv4_udp_or_tcp_map_port)

    #
    # Translation of ICMP Time Exceeded v6 -> v4 direction
    # Received packet should be translated into an IPv4 Time Exceeded.
    #

    def test_map_t_time_exceeded_ip6_to_ip4(self):
        """ MAP-T time exceeded IPv6 -> IPv4 """

        eth = Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac)
        ip = IPv6(src=self.ipv6_cpe_address,
                  dst=self.ipv6_map_address)
        icmp = ICMPv6TimeExceeded()
        # Embedded (offending) datagram: MAP address -> CPE.
        ip_inner = IPv6(src=self.ipv6_map_address,
                        dst=self.ipv6_cpe_address, hlim=1)
        udp_inner = UDP(sport=self.ipv6_udp_or_tcp_internet_port,
                        dport=self.ipv6_udp_or_tcp_map_port)
        payload = "H" * 10
        tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload

        self.pg_send(self.pg1, tx_pkt * 1)

        rx_pkts = self.pg0.get_capture(1)
        rx_pkt = rx_pkts[0]

        self.v4_address_check(rx_pkt)
        self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
        self.assertEqual(rx_pkt[ICMP].type, ICMP(type="time-exceeded").type)
        self.assertEqual(rx_pkt[ICMP].code,
                         ICMP(code="ttl-zero-during-transit").code)
        # NOTE(review): presumably this compares the embedded header's
        # TTL with the embedded IPv6 hop limit -- verify scapy indexing.
        self.assertEqual(rx_pkt[ICMP].ttl, tx_pkt[IPv6][1].hlim)
        self.assertTrue(rx_pkt.haslayer(IPerror))
        self.assertTrue(rx_pkt.haslayer(UDPerror))
        # The embedded datagram's addresses mirror the outer header's.
        self.assertEqual(rx_pkt[IP].src, rx_pkt[IPerror].dst)
        self.assertEqual(rx_pkt[IP].dst, rx_pkt[IPerror].src)
        self.assertEqual(rx_pkt[UDPerror].sport,
                         self.ipv4_udp_or_tcp_internet_port)
        self.assertEqual(rx_pkt[UDPerror].dport, self.ipv4_udp_or_tcp_map_port)

    #
    # Spoofed IPv4 Source Address v6 -> v4 direction
    # Send a packet with a wrong IPv4 address embedded in bits 72-103.
    # The BR should either drop the packet, or rewrite the spoofed
    # source IPv4 as the actual source IPv4 address.
    # The BR really should drop the packet.
    #

    def test_map_t_spoof_ipv4_src_addr_ip6_to_ip4(self):
        """ MAP-T spoof ipv4 src addr IPv6 -> IPv4 """

        eth = Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac)
        ip = IPv6(src=self.ipv6_spoof_address,
                  dst=self.ipv6_map_address)
        udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
                  dport=self.ipv6_udp_or_tcp_internet_port)
        payload = "a" * 82
        tx_pkt = eth / ip / udp / payload

        self.pg_send(self.pg1, tx_pkt * 1)

        self.pg0.get_capture(0, timeout=1)
        self.pg0.assert_nothing_captured(
            remark="Should drop IPv4 spoof address")

    #
    # Spoofed IPv4 Source Prefix v6 -> v4 direction
    # Send a packet with a wrong IPv4 prefix embedded in bits 72-103.
    # The BR should either drop the packet, or rewrite the source IPv4
    # to the prefix that matches the source IPv4 address.
    #

    def test_map_t_spoof_ipv4_src_prefix_ip6_to_ip4(self):
        """ MAP-T spoof ipv4 src prefix IPv6 -> IPv4 """

        eth = Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac)
        ip = IPv6(src=self.ipv6_spoof_prefix,
                  dst=self.ipv6_map_address)
        udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
                  dport=self.ipv6_udp_or_tcp_internet_port)
        payload = "a" * 82
        tx_pkt = eth / ip / udp / payload

        self.pg_send(self.pg1, tx_pkt * 1)

        self.pg0.get_capture(0, timeout=1)
        self.pg0.assert_nothing_captured(
            remark="Should drop IPv4 spoof prefix")

    #
    # Spoofed IPv6 PSID v6 -> v4 direction
    # Send a packet with a wrong IPv6 port PSID
    # The BR should drop the packet.
    #

    def test_map_t_spoof_psid_ip6_to_ip4(self):
        """ MAP-T spoof psid IPv6 -> IPv4 """

        eth = Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac)
        ip = IPv6(src=self.ipv6_spoof_psid,
                  dst=self.ipv6_map_address)
        udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
                  dport=self.ipv6_udp_or_tcp_internet_port)
        payload = "a" * 82
        tx_pkt = eth / ip / udp / payload

        self.pg_send(self.pg1, tx_pkt * 1)

        self.pg0.get_capture(0, timeout=1)
        self.pg0.assert_nothing_captured(
            remark="Should drop IPv6 spoof PSID")

    #
    # Spoofed IPv6 subnet field v6 -> v4 direction
    # Send a packet with a wrong IPv6 subnet as "2001:db8:f1"
    # The BR should drop the packet.
    #

    def test_map_t_spoof_subnet_ip6_to_ip4(self):
        """ MAP-T spoof subnet IPv6 -> IPv4 """

        eth = Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac)
        ip = IPv6(src=self.ipv6_spoof_subnet,
                  dst=self.ipv6_map_address)
        udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
                  dport=self.ipv6_udp_or_tcp_internet_port)
        payload = "a" * 82
        tx_pkt = eth / ip / udp / payload

        self.pg_send(self.pg1, tx_pkt * 1)

        self.pg0.get_capture(0, timeout=1)
        self.pg0.assert_nothing_captured(
            remark="Should drop IPv6 spoof subnet")

    #
    # Spoofed IPv6 port PSID v6 -> v4 direction
    # Send a packet with a wrong IPv6 port PSID
    # The BR should drop the packet.
    #

    def test_map_t_spoof_port_psid_ip6_to_ip4(self):
        """ MAP-T spoof port psid IPv6 -> IPv4 """

        eth = Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac)
        ip = IPv6(src=self.ipv6_cpe_address,
                  dst=self.ipv6_map_address)
        udp = UDP(sport=self.ipv6_udp_or_tcp_spoof_port,
                  dport=self.ipv6_udp_or_tcp_internet_port)
        payload = "a" * 82
        tx_pkt = eth / ip / udp / payload

        self.pg_send(self.pg1, tx_pkt * 1)

        self.pg0.get_capture(0, timeout=1)
        self.pg0.assert_nothing_captured(
            remark="Should drop IPv6 spoof port PSID")

    #
    # Spoofed IPv6 ICMP ID PSID v6 -> v4 direction
    # Send a packet with a wrong IPv6 ICMP ID PSID
    # The BR should drop the packet.
    #

    def test_map_t_spoof_icmp_id_psid_ip6_to_ip4(self):
        """ MAP-T spoof ICMP id psid IPv6 -> IPv4 """

        eth = Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac)
        ip = IPv6(src=self.ipv6_cpe_address,
                  dst=self.ipv6_map_address)
        icmp = ICMPv6EchoRequest()
        icmp.id = self.ipv6_udp_or_tcp_spoof_port
        payload = "H" * 10
        tx_pkt = eth / ip / icmp / payload

        self.pg_send(self.pg1, tx_pkt * 1)

        self.pg0.get_capture(0, timeout=1)
        self.pg0.assert_nothing_captured(
            remark="Should drop IPv6 spoof ICMP ID PSID")

    #
    # Map to Map - same rule, different address
    #

    @unittest.skip("Fixme: correct behavior needs clarification")
    def test_map_t_same_rule_diff_addr_ip6_to_ip4(self):
        """ MAP-T same rule, diff addr IPv6 -> IPv6 """

        eth = Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac)
        ip = IPv6(src=self.ipv6_cpe_address,
                  dst=self.ipv6_map_same_rule_diff_addr)
        udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
                  dport=1025)
        payload = "a" * 82
        tx_pkt = eth / ip / udp / payload

        self.pg_send(self.pg1, tx_pkt * 1)

        # No assertions on the packet contents yet (test is skipped).
        rx_pkts = self.pg1.get_capture(1)
        rx_pkt = rx_pkts[0]

    #
    # Map to Map - same rule, same address
    #

    @unittest.skip("Fixme: correct behavior needs clarification")
    def test_map_t_same_rule_same_addr_ip6_to_ip4(self):
        """ MAP-T same rule, same addr IPv6 -> IPv6 """

        eth = Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac)
        ip = IPv6(src=self.ipv6_cpe_address,
                  dst=self.ipv6_map_same_rule_same_addr)
        udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
                  dport=1025)
        payload = "a" * 82
        tx_pkt = eth / ip / udp / payload

        self.pg_send(self.pg1, tx_pkt * 1)

        # No assertions on the packet contents yet (test is skipped).
        rx_pkts = self.pg1.get_capture(1)
        rx_pkt = rx_pkts[0]
698
699
# Script entry point: run this module's tests through unittest, using
# VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)