tests: replace pycodestyle with black
[vpp.git] / test / test_ip6_nd_mirror_proxy.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import os
5 from socket import AF_INET6, inet_pton, inet_ntop
6
7 from framework import tag_fixme_vpp_workers
8 from framework import VppTestCase, VppTestRunner
9 from vpp_neighbor import VppNeighbor, find_nbr
10 from vpp_ip_route import (
11     VppIpRoute,
12     VppRoutePath,
13     find_route,
14     VppIpTable,
15     DpoProto,
16     FibPathType,
17     VppIpInterfaceAddress,
18 )
19 from vpp_papi import VppEnum
20 from vpp_ip import VppIpPuntRedirect
21
22 import scapy.compat
23 from scapy.packet import Raw
24 from scapy.layers.l2 import Ether, ARP, Dot1Q
25 from scapy.layers.inet import IP, UDP, TCP
26 from scapy.layers.inet6 import (
27     IPv6,
28     ipv6nh,
29     ICMPv6ND_NS,
30     ICMPv6ND_NA,
31     ICMPv6NDOptSrcLLAddr,
32     ICMPv6NDOptDstLLAddr,
33     ICMPv6EchoRequest,
34     ICMPv6EchoReply,
35 )
36 from scapy.utils6 import in6_ptop, in6_getnsma, in6_getnsmac, in6_ismaddr
37
38
39 class TestNDPROXY(VppTestCase):
40     """IP6 ND (mirror) Proxy Test Case"""
41
42     @classmethod
43     def setUpClass(self):
44         super(TestNDPROXY, self).setUpClass()
45         self.create_pg_interfaces(range(2))
46
47     @classmethod
48     def tearDownClass(self):
49         super(TestNDPROXY, self).tearDownClass()
50
51     def setUp(self):
52         super(TestNDPROXY, self).setUp()
53         for i in self.pg_interfaces:
54             i.admin_up()
55             i.config_ip6()
56             i.disable_ipv6_ra()
57
58     def tearDown(self):
59         super(TestNDPROXY, self).tearDown()
60         if not self.vpp_dead:
61             for i in self.pg_interfaces:
62                 i.unconfig_ip6()
63                 i.admin_down()
64
65     def test_nd_mirror_proxy(self):
66         """Interface (Mirror) Proxy ND"""
67
68         #
69         # When VPP has an interface whose address is also applied to a TAP
70         # interface on the host, then VPP's TAP interface will be unnumbered
71         # to the 'real' interface and do proxy ND from the host.
72         # the curious aspect of this setup is that ND requests from the host
73         # will come from the VPP's own address.
74         #
75         addr = self.pg0.remote_ip6
76         nsma = in6_getnsma(inet_pton(socket.AF_INET6, addr))
77         d = inet_ntop(socket.AF_INET6, nsma)
78
79         # Make pg1 un-numbered to pg0
80         #
81         self.pg1.unconfig_ip6()
82         self.pg1.set_unnumbered(self.pg0.sw_if_index)
83
84         #
85         # Enable ND proxy on pg1
86         #
87         self.vapi.ip6nd_proxy_enable_disable(
88             sw_if_index=self.pg1.sw_if_index, is_enable=1
89         )
90         #
91         # Send the ND request with an originating address that
92         # is VPP's own address
93         #
94         nd_req_from_host = (
95             Ether(src=self.pg1.remote_mac, dst=in6_getnsmac(nsma))
96             / IPv6(dst=d, src=self.pg0.local_ip6)
97             / ICMPv6ND_NS(tgt=addr)
98             / ICMPv6NDOptSrcLLAddr(lladdr=self.pg1.remote_mac)
99         )
100
101         rx = self.send_and_expect(self.pg1, [nd_req_from_host], self.pg1)
102         self.assertEqual(rx[0][Ether].src, self.pg1.local_mac)
103         self.assertEqual(rx[0][Ether].dst, self.pg1.remote_mac)
104         self.assertEqual(rx[0][IPv6].src, self.pg0.remote_ip6)
105         self.assertEqual(rx[0][IPv6].dst, self.pg0.local_ip6)
106         self.assertEqual(ipv6nh[rx[0][IPv6].nh], "ICMPv6")
107         self.assertEqual(rx[0][ICMPv6ND_NA].tgt, self.pg0.remote_ip6)
108         self.assertTrue(rx[0].haslayer(ICMPv6NDOptDstLLAddr))
109         self.assertEqual(rx[0][ICMPv6NDOptDstLLAddr].lladdr, self.pg1.local_mac)
110
111         #
112         # Send the unicast ND request
113         #
114         unicast_nd_req_from_host = (
115             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
116             / IPv6(dst=self.pg0.remote_ip6, src=self.pg1.remote_ip6_ll)
117             / ICMPv6ND_NS(tgt=self.pg0.remote_ip6)
118             / ICMPv6NDOptSrcLLAddr(lladdr=self.pg1.remote_mac)
119         )
120
121         rx = self.send_and_expect(self.pg1, [unicast_nd_req_from_host], self.pg0)
122         self.assertEqual(rx[0][Ether].src, self.pg0.local_mac)
123         self.assertEqual(rx[0][Ether].dst, in6_getnsmac(nsma))
124         self.assertEqual(rx[0][IPv6].src, self.pg0.local_ip6)
125         self.assertEqual(rx[0][IPv6].dst, d)
126         self.assertEqual(ipv6nh[rx[0][IPv6].nh], "ICMPv6")
127         self.assertEqual(rx[0][ICMPv6ND_NS].tgt, self.pg0.remote_ip6)
128         self.assertTrue(rx[0].haslayer(ICMPv6NDOptSrcLLAddr))
129         self.assertEqual(rx[0][ICMPv6NDOptSrcLLAddr].lladdr, self.pg0.local_mac)
130
131         # Resolve the NDs on the uplink
132         self.pg0.resolve_ndp()
133
134         #
135         # Again send the unicast ND request, this time dst address should be
136         # in local cache
137         #
138         rx = self.send_and_expect(self.pg1, [unicast_nd_req_from_host], self.pg1)
139         self.assertEqual(rx[0][Ether].src, self.pg1.local_mac)
140         self.assertEqual(rx[0][Ether].dst, self.pg1.remote_mac)
141         self.assertEqual(rx[0][IPv6].src, self.pg0.remote_ip6)
142         self.assertEqual(in6_ptop(rx[0][IPv6].dst), in6_ptop(self.pg1.remote_ip6_ll))
143         self.assertEqual(ipv6nh[rx[0][IPv6].nh], "ICMPv6")
144         self.assertEqual(rx[0][ICMPv6ND_NA].tgt, self.pg0.remote_ip6)
145         self.assertTrue(rx[0].haslayer(ICMPv6NDOptDstLLAddr))
146         self.assertEqual(rx[0][ICMPv6NDOptDstLLAddr].lladdr, self.pg1.local_mac)
147
148         #
149         # Send the Echo Request from host to remote (of uplink)
150         #
151         id = self.pg1.sw_if_index
152         seq = 0x1
153         echo_request = (
154             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
155             / IPv6(dst=self.pg0.remote_ip6, src=self.pg0.local_ip6)
156             / ICMPv6EchoRequest(seq=seq, id=id)
157         )
158
159         rx = self.send_and_expect(self.pg1, [echo_request], self.pg0)
160         self.assertEqual(rx[0][Ether].src, self.pg0.local_mac)
161         self.assertEqual(rx[0][Ether].dst, self.pg0.remote_mac)
162         self.assertEqual(rx[0][IPv6].src, self.pg0.local_ip6)
163         self.assertEqual(rx[0][IPv6].dst, self.pg0.remote_ip6)
164         self.assertEqual(ipv6nh[rx[0][IPv6].nh], "ICMPv6")
165         self.assertTrue(rx[0].haslayer(ICMPv6EchoRequest))
166         self.assertEqual(rx[0][ICMPv6EchoRequest].id, id)
167         self.assertEqual(rx[0][ICMPv6EchoRequest].seq, seq)
168
169         #
170         # setup a punt redirect so packets from the uplink go to the tap
171         #
172         redirect = VppIpPuntRedirect(
173             self, self.pg0.sw_if_index, self.pg1.sw_if_index, self.pg0.local_ip6
174         )
175         redirect.add_vpp_config()
176
177         echo_reply = (
178             Ether(dst=self.pg0.remote_mac, src=self.pg0.local_mac)
179             / IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6)
180             / ICMPv6EchoReply(seq=1, id=id)
181         )
182
183         rx = self.send_and_expect(self.pg0, [echo_reply], self.pg1)
184         self.assertEqual(rx[0][Ether].src, self.pg1.local_mac)
185         self.assertEqual(rx[0][Ether].dst, self.pg1.remote_mac)
186         self.assertEqual(rx[0][IPv6].src, self.pg0.remote_ip6)
187         self.assertEqual(rx[0][IPv6].dst, self.pg0.local_ip6)
188         self.assertEqual(ipv6nh[rx[0][IPv6].nh], "ICMPv6")
189         self.assertTrue(rx[0].haslayer(ICMPv6EchoReply))
190         self.assertEqual(rx[0][ICMPv6EchoReply].id, id)
191         self.assertEqual(rx[0][ICMPv6EchoReply].seq, seq)
192
193         #
194         # cleanup
195         #
196         self.vapi.ip6nd_proxy_enable_disable(
197             sw_if_index=self.pg1.sw_if_index, is_enable=0
198         )
199         redirect.remove_vpp_config()
200
201
202 if __name__ == "__main__":
203     unittest.main(testRunner=VppTestRunner)