ip6-nd: add ip6-nd proxy
[vpp.git] / test / test_ip6_nd_mirror_proxy.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import os
5 from socket import AF_INET6, inet_pton, inet_ntop
6
7 from framework import tag_fixme_vpp_workers
8 from framework import VppTestCase, VppTestRunner
9 from vpp_neighbor import VppNeighbor, find_nbr
10 from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, \
11     VppIpTable, DpoProto, FibPathType, VppIpInterfaceAddress
12 from vpp_papi import VppEnum
13 from vpp_ip import VppIpPuntRedirect
14
15 import scapy.compat
16 from scapy.packet import Raw
17 from scapy.layers.l2 import Ether, ARP, Dot1Q
18 from scapy.layers.inet import IP, UDP, TCP
19 from scapy.layers.inet6 import IPv6, ipv6nh, ICMPv6ND_NS, ICMPv6ND_NA, \
20     ICMPv6NDOptSrcLLAddr, ICMPv6NDOptDstLLAddr, ICMPv6EchoRequest, \
21     ICMPv6EchoReply
22 from scapy.utils6 import in6_ptop, in6_getnsma, in6_getnsmac, in6_ismaddr
23
24
class TestNDPROXY(VppTestCase):
    """ IP6 ND (mirror) Proxy Test Case """

    @classmethod
    def setUpClass(cls):
        """Run the base-class setup, then create the pg0 and pg1 interfaces."""
        super(TestNDPROXY, cls).setUpClass()
        cls.create_pg_interfaces(range(2))

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the base class."""
        super(TestNDPROXY, cls).tearDownClass()

    def setUp(self):
        """Bring every pg interface up with IPv6 configured and RA disabled."""
        super(TestNDPROXY, self).setUp()
        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.disable_ipv6_ra()

    def tearDown(self):
        """Undo setUp() on every pg interface, unless VPP has died."""
        super(TestNDPROXY, self).tearDown()
        if not self.vpp_dead:
            for i in self.pg_interfaces:
                i.unconfig_ip6()
                i.admin_down()

    def test_nd_mirror_proxy(self):
        """ Interface (Mirror) Proxy ND """

        #
        # When VPP has an interface whose address is also applied to a TAP
        # interface on the host, then VPP's TAP interface will be unnumbered
        # to the 'real' interface and do proxy ND from the host.
        # The curious aspect of this setup is that ND requests from the host
        # will come from VPP's own address.
        #
        # Here pg0 plays the 'real' (uplink) interface and pg1 the TAP.
        #
        addr = self.pg0.remote_ip6
        # Solicited-node multicast address for the target, in packed form
        # (nsma) and in printable form (d).
        # Only AF_INET6/inet_pton/inet_ntop are imported from the socket
        # module, so the bare name is used; 'socket.AF_INET6' would raise
        # NameError because the module itself is never bound.
        nsma = in6_getnsma(inet_pton(AF_INET6, addr))
        d = inet_ntop(AF_INET6, nsma)

        #
        # Make pg1 un-numbered to pg0
        #
        self.pg1.unconfig_ip6()
        self.pg1.set_unnumbered(self.pg0.sw_if_index)

        #
        # Enable ND proxy on pg1
        #
        self.vapi.ip6nd_proxy_enable_disable(sw_if_index=self.pg1.sw_if_index,
                                             is_enable=1)
        #
        # Send the ND request with an originating address that
        # is VPP's own address
        #
        nd_req_from_host = (Ether(src=self.pg1.remote_mac,
                                  dst=in6_getnsmac(nsma)) /
                            IPv6(dst=d, src=self.pg0.local_ip6) /
                            ICMPv6ND_NS(tgt=addr) /
                            ICMPv6NDOptSrcLLAddr(lladdr=self.pg1.remote_mac))

        # Expect a Neighbor Advertisement back on pg1 that carries pg1's
        # local MAC as the target link-layer address.
        rx = self.send_and_expect(self.pg1, [nd_req_from_host], self.pg1)
        self.assertEqual(rx[0][Ether].src, self.pg1.local_mac)
        self.assertEqual(rx[0][Ether].dst, self.pg1.remote_mac)
        self.assertEqual(rx[0][IPv6].src, self.pg0.remote_ip6)
        self.assertEqual(rx[0][IPv6].dst, self.pg0.local_ip6)
        self.assertEqual(ipv6nh[rx[0][IPv6].nh], "ICMPv6")
        self.assertEqual(rx[0][ICMPv6ND_NA].tgt, self.pg0.remote_ip6)
        self.assertTrue(rx[0].haslayer(ICMPv6NDOptDstLLAddr))
        self.assertEqual(rx[0][ICMPv6NDOptDstLLAddr].lladdr,
                         self.pg1.local_mac)

        #
        # Send the unicast ND request
        #
        unicast_nd_req_from_host = (Ether(src=self.pg1.remote_mac,
                                          dst=self.pg1.local_mac) /
                                    IPv6(dst=self.pg0.remote_ip6,
                                         src=self.pg1.remote_ip6_ll) /
                                    ICMPv6ND_NS(tgt=self.pg0.remote_ip6) /
                                    ICMPv6NDOptSrcLLAddr(
                                         lladdr=self.pg1.remote_mac))

        # This time the expected packet appears on pg0: a multicast
        # Neighbor Solicitation for the target, sourced from pg0's own
        # address and MAC.
        rx = self.send_and_expect(self.pg1, [unicast_nd_req_from_host],
                                  self.pg0)
        self.assertEqual(rx[0][Ether].src, self.pg0.local_mac)
        self.assertEqual(rx[0][Ether].dst, in6_getnsmac(nsma))
        self.assertEqual(rx[0][IPv6].src, self.pg0.local_ip6)
        self.assertEqual(rx[0][IPv6].dst, d)
        self.assertEqual(ipv6nh[rx[0][IPv6].nh], "ICMPv6")
        self.assertEqual(rx[0][ICMPv6ND_NS].tgt, self.pg0.remote_ip6)
        self.assertTrue(rx[0].haslayer(ICMPv6NDOptSrcLLAddr))
        self.assertEqual(rx[0][ICMPv6NDOptSrcLLAddr].lladdr,
                         self.pg0.local_mac)

        # Resolve the NDs on the uplink
        self.pg0.resolve_ndp()

        #
        # Again send the unicast ND request, this time dst address should be
        # in local cache
        #
        rx = self.send_and_expect(self.pg1, [unicast_nd_req_from_host],
                                  self.pg1)
        self.assertEqual(rx[0][Ether].src, self.pg1.local_mac)
        self.assertEqual(rx[0][Ether].dst, self.pg1.remote_mac)
        self.assertEqual(rx[0][IPv6].src, self.pg0.remote_ip6)
        # Normalise both sides so differing textual forms of the same
        # link-local address still compare equal.
        self.assertEqual(in6_ptop(rx[0][IPv6].dst),
                         in6_ptop(self.pg1.remote_ip6_ll))
        self.assertEqual(ipv6nh[rx[0][IPv6].nh], "ICMPv6")
        self.assertEqual(rx[0][ICMPv6ND_NA].tgt, self.pg0.remote_ip6)
        self.assertTrue(rx[0].haslayer(ICMPv6NDOptDstLLAddr))
        self.assertEqual(rx[0][ICMPv6NDOptDstLLAddr].lladdr,
                         self.pg1.local_mac)

        #
        # Send the Echo Request from host to remote (of uplink)
        #
        # 'echo_id' rather than 'id' so the builtin is not shadowed.
        echo_id = self.pg1.sw_if_index
        seq = 0x1
        echo_request = (Ether(dst=self.pg1.local_mac,
                              src=self.pg1.remote_mac) /
                        IPv6(dst=self.pg0.remote_ip6, src=self.pg0.local_ip6) /
                        ICMPv6EchoRequest(seq=seq, id=echo_id))

        rx = self.send_and_expect(self.pg1, [echo_request], self.pg0)
        self.assertEqual(rx[0][Ether].src, self.pg0.local_mac)
        self.assertEqual(rx[0][Ether].dst, self.pg0.remote_mac)
        self.assertEqual(rx[0][IPv6].src, self.pg0.local_ip6)
        self.assertEqual(rx[0][IPv6].dst, self.pg0.remote_ip6)
        self.assertEqual(ipv6nh[rx[0][IPv6].nh], "ICMPv6")
        self.assertTrue(rx[0].haslayer(ICMPv6EchoRequest))
        self.assertEqual(rx[0][ICMPv6EchoRequest].id, echo_id)
        self.assertEqual(rx[0][ICMPv6EchoRequest].seq, seq)

        #
        # setup a punt redirect so packets from the uplink go to the tap
        #
        redirect = VppIpPuntRedirect(self, self.pg0.sw_if_index,
                                     self.pg1.sw_if_index, self.pg0.local_ip6)
        redirect.add_vpp_config()

        # The reply reuses the request's id and seq so the assertions below
        # compare against the same values that were sent.
        echo_reply = (Ether(dst=self.pg0.remote_mac, src=self.pg0.local_mac) /
                      IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
                      ICMPv6EchoReply(seq=seq, id=echo_id))

        rx = self.send_and_expect(self.pg0, [echo_reply], self.pg1)
        self.assertEqual(rx[0][Ether].src, self.pg1.local_mac)
        self.assertEqual(rx[0][Ether].dst, self.pg1.remote_mac)
        self.assertEqual(rx[0][IPv6].src, self.pg0.remote_ip6)
        self.assertEqual(rx[0][IPv6].dst, self.pg0.local_ip6)
        self.assertEqual(ipv6nh[rx[0][IPv6].nh], "ICMPv6")
        self.assertTrue(rx[0].haslayer(ICMPv6EchoReply))
        self.assertEqual(rx[0][ICMPv6EchoReply].id, echo_id)
        self.assertEqual(rx[0][ICMPv6EchoReply].seq, seq)

        #
        # cleanup
        #
        self.vapi.ip6nd_proxy_enable_disable(sw_if_index=self.pg1.sw_if_index,
                                             is_enable=0)
        redirect.remove_vpp_config()
186
# Allow the file to be run directly as a script; the tests are then driven
# through VppTestRunner instead of unittest's default runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)