Implement DHCPv6 PD client (VPP-718, VPP-1050)
[vpp.git] / test / test_dhcp6.py
1 from scapy.layers.dhcp6 import DHCP6_Advertise, DHCP6OptClientId, \
2     DHCP6OptStatusCode, DHCP6OptPref, DHCP6OptIA_PD, DHCP6OptIAPrefix, \
3     DHCP6OptServerId, DHCP6_Solicit, DHCP6_Reply, DHCP6_Request, DHCP6_Renew, \
4     DHCP6_Rebind, DUID_LLT, DHCP6_Release, DHCP6OptElapsedTime
5 from scapy.layers.inet6 import IPv6, Ether, UDP
6 from scapy.utils6 import in6_mactoifaceid
7 from scapy.utils import inet_ntop, inet_pton
8 from socket import AF_INET6
9 from framework import VppTestCase
10 from time import time
11
12
def ip6_normalize(ip6):
    """Return *ip6* re-rendered by a text -> packed -> text round trip.

    Two spellings of the same IPv6 address yield the same string, which
    makes the result suitable for equality comparison.
    """
    packed = inet_pton(AF_INET6, ip6)
    return inet_ntop(AF_INET6, packed)
15
16
def mk_ll_addr(mac):
    """Return the text "fe80::" followed by in6_mactoifaceid(mac).

    Used by the tests as the source address of packets sent on behalf of
    the peer whose MAC address is *mac*.
    """
    return "fe80::" + in6_mactoifaceid(mac)
21
22
class TestDHCPv6PD(VppTestCase):
    """ DHCPv6 PD Data Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6PD, cls).setUpClass()

    def setUp(self):
        """Create pg0, bring it up with IPv6 and prepare the server DUID."""
        super(TestDHCPv6PD, self).setUp()

        self.create_pg_interfaces(range(1))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip6()

        # 946684800 is 2000-01-01 00:00:00 UTC expressed as a Unix timestamp,
        # so this is the number of seconds elapsed since that instant.
        seconds_since_2000 = int(time()) - 946684800
        self.server_duid = DUID_LLT(timeval=seconds_since_2000,
                                    lladdr=self.pg0.remote_mac)

    def tearDown(self):
        """Undo the interface configuration made in setUp."""
        for intf in self.interfaces:
            intf.unconfig_ip6()
            intf.admin_down()
        super(TestDHCPv6PD, self).tearDown()

    def test_dhcp_send_solicit_receive_advertise(self):
        """ Verify DHCPv6 PD Solicit packet and received Advertise event """

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Part 1: ask VPP to send a client message and inspect the Solicit
        # that shows up on pg0.
        requested = {'prefix': '\00\01\00\02\00\03' + '\00' * 10,
                     'prefix_length': 50,
                     'preferred_time': 60,
                     'valid_time': 120}
        self.vapi.dhcp6_pd_send_client_message(1, self.pg0.sw_if_index,
                                               T1=20, T2=40,
                                               prefixes=[requested])
        captured = self.pg0.get_capture(1)
        self.assertEqual(len(captured), 1)
        solicit = captured[0]

        self.assertTrue(solicit.haslayer(IPv6))
        self.assertTrue(solicit[IPv6].haslayer(DHCP6_Solicit))

        # Remembered so that the Advertise below can echo them back.
        client_duid = solicit[DHCP6OptClientId].duid
        trid = solicit[DHCP6_Solicit].trid

        self.assert_equal(ip6_normalize(solicit[IPv6].dst),
                          ip6_normalize("ff02::1:2"))
        self.assert_equal(ip6_normalize(solicit[IPv6].src),
                          ip6_normalize(self.pg0.local_ip6_ll))

        ia_pd = solicit[DHCP6OptIA_PD]
        self.assert_equal(ia_pd.T1, 20)
        self.assert_equal(ia_pd.T2, 40)
        self.assert_equal(len(ia_pd.iapdopt), 1)
        solicited = ia_pd.iapdopt[0]
        self.assert_equal(solicited.prefix, '1:2:3::')
        self.assert_equal(solicited.plen, 50)
        self.assert_equal(solicited.preflft, 60)
        self.assert_equal(solicited.validlft, 120)

        # Part 2: inject an Advertise and check the event VPP reports.
        self.vapi.want_dhcp6_pd_reply_events()
        self.vapi.dhcp6_clients_enable_disable()

        advertised = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=60,
                                      validlft=120)
        layers = [
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac),
            IPv6(src=mk_ll_addr(self.pg0.remote_mac),
                 dst=self.pg0.local_ip6_ll),
            UDP(sport=547, dport=546),
            DHCP6_Advertise(trid=trid),
            DHCP6OptServerId(duid=self.server_duid),
            DHCP6OptClientId(duid=client_duid),
            DHCP6OptPref(prefval=7),
            DHCP6OptStatusCode(statuscode=1),
            DHCP6OptIA_PD(iaid=1, T1=20, T2=40, iapdopt=advertised),
        ]
        # Stack the layers left to right, outermost first.
        advertise = layers[0]
        for layer in layers[1:]:
            advertise = advertise / layer
        self.pg0.add_stream([advertise])
        self.pg_start()

        reply_event = self.vapi.wait_for_event(10, "dhcp6_pd_reply_event")

        self.assert_equal(reply_event.preference, 7)
        self.assert_equal(reply_event.status_code, 1)
        self.assert_equal(reply_event.T1, 20)
        self.assert_equal(reply_event.T2, 40)

        reported = reply_event.prefixes[0]
        # The event carries the prefix in packed form, the scapy option in
        # text form, hence the conversion before comparing.
        self.assert_equal(
            reported.prefix,
            inet_pton(AF_INET6, advertised.getfieldval("prefix")))
        for event_attr, option_field in (("prefix_length", "plen"),
                                         ("preferred_time", "preflft"),
                                         ("valid_time", "validlft")):
            self.assert_equal(getattr(reported, event_attr),
                              advertised.getfieldval(option_field))
122
123
class TestDHCPv6PDControlPlane(VppTestCase):
    """ DHCPv6 PD Control Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6PDControlPlane, cls).setUpClass()

    def setUp(self):
        """Bring up pg0/pg1 and enable the DHCPv6 PD client on pg0.

        Also records the /128 FIB entries already present on pg1 so that
        tests can tell which addresses appeared afterwards.
        """
        super(TestDHCPv6PDControlPlane, self).setUp()

        self.create_pg_interfaces(range(2))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()

        # 946684800 is 2000-01-01 00:00:00 UTC as a Unix timestamp.
        time_since_2000 = int(time()) - 946684800
        self.server_duid = DUID_LLT(timeval=time_since_2000,
                                    lladdr=self.pg0.remote_mac)
        # Learned from the first client message seen by validate_packet().
        self.client_duid = None
        # Transaction id of the latest non-resend client message; set by
        # validate_packet().  Initialised here so that an early resend check
        # fails with an assertion instead of an AttributeError.
        self.trid = None
        # Default T1/T2 values used by send_packet().
        self.T1 = 1
        self.T2 = 2

        fib = self.vapi.ip6_fib_dump()
        self.initial_addresses = set(self.get_interface_addresses(fib,
                                                                  self.pg1))

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.prefix_group = 'my-pd-prefix-group'

        self.vapi.dhcp6_pd_client_enable_disable(
            self.pg0.sw_if_index,
            prefix_group=self.prefix_group)

    def tearDown(self):
        """Disable the PD client on pg0 and bring the interfaces down."""
        self.vapi.dhcp6_pd_client_enable_disable(self.pg0.sw_if_index,
                                                 enable=0)

        for i in self.interfaces:
            i.admin_down()

        super(TestDHCPv6PDControlPlane, self).tearDown()

    @staticmethod
    def get_interface_addresses(fib, pg):
        """Return the addresses of /128 *fib* entries that point at *pg*.

        Only the first path of each entry is examined.
        """
        return [entry.address for entry in fib
                if entry.address_length == 128 and
                entry.path[0].sw_if_index == pg.sw_if_index]

    def get_addresses(self):
        """Return the set of pg1 addresses that were not there in setUp."""
        fib = self.vapi.ip6_fib_dump()
        addresses = set(self.get_interface_addresses(fib, self.pg1))
        return addresses.difference(self.initial_addresses)

    def validate_duid_llt(self, duid):
        """Pass *duid* to the DUID_LLT constructor; the result is unused."""
        DUID_LLT(duid)

    def validate_packet(self, packet, msg_type, is_resend=False):
        """Check a client message captured on pg0.

        Verified: the packet contains *msg_type*; the client DUID matches
        the one seen first; the server DUID matches ours (skipped for
        Solicit and Rebind); the destination is ff02::1:2 with UDP ports
        546 -> 547; the transaction id and elapsed time agree with
        *is_resend*.  On a non-resend the transaction id is remembered in
        self.trid.  The packet is dumped before any failure propagates.
        """
        try:
            self.assertTrue(packet.haslayer(msg_type))
            client_duid = packet[DHCP6OptClientId].duid
            if self.client_duid is None:
                self.client_duid = client_duid
                self.validate_duid_llt(client_duid)
            else:
                self.assertEqual(self.client_duid, client_duid)
            if msg_type not in (DHCP6_Solicit, DHCP6_Rebind):
                server_duid = packet[DHCP6OptServerId].duid
                self.assertEqual(server_duid, self.server_duid)
            if is_resend:
                # A retransmission must reuse the previous transaction id.
                self.assertEqual(self.trid, packet[msg_type].trid)
            else:
                self.trid = packet[msg_type].trid
            ip = packet[IPv6]
            udp = packet[UDP]
            self.assertEqual(ip.dst, 'ff02::1:2')
            self.assertEqual(udp.sport, 546)
            self.assertEqual(udp.dport, 547)
            dhcpv6 = packet[msg_type]
            elapsed_time = dhcpv6[DHCP6OptElapsedTime]
            if is_resend:
                self.assertNotEqual(elapsed_time.elapsedtime, 0)
            else:
                self.assertEqual(elapsed_time.elapsedtime, 0)
        except Exception:
            # Show the offending packet, then let the failure propagate.
            packet.show()
            raise

    def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
        """Capture one packet on pg0 and validate it as *msg_type*.

        *timeout* is handed to get_capture(); None means 3.
        """
        if timeout is None:
            timeout = 3
        rx_list = self.pg0.get_capture(1, timeout=timeout)
        packet = rx_list[0]
        self.validate_packet(packet, msg_type, is_resend=is_resend)

    def wait_for_solicit(self, timeout=None, is_resend=False):
        """Expect a Solicit on pg0; see wait_for_packet()."""
        self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)

    def wait_for_request(self, timeout=None, is_resend=False):
        """Expect a Request on pg0; see wait_for_packet()."""
        self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)

    def wait_for_renew(self, timeout=None, is_resend=False):
        """Expect a Renew on pg0; see wait_for_packet()."""
        self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)

    def wait_for_rebind(self, timeout=None, is_resend=False):
        """Expect a Rebind on pg0; see wait_for_packet()."""
        self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)

    def wait_for_release(self, timeout=None, is_resend=False):
        """Expect a Release on pg0; see wait_for_packet()."""
        self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)

    def send_packet(self, msg_type, t1=None, t2=None, iapdopt=None):
        """Send a DHCPv6 message of *msg_type* to VPP through pg0.

        The message reuses self.trid, carries our server DUID and the
        learned client DUID, and ends with an IA_PD option (iaid 1).
        *t1*/*t2* default to self.T1/self.T2; *iapdopt*, when given, is
        placed inside the IA_PD option.  Capture is re-armed before the
        packet generator is started.
        """
        if t1 is None:
            t1 = self.T1
        if t2 is None:
            t2 = self.T2
        if iapdopt is None:
            opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2)
        else:
            opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2, iapdopt=iapdopt)
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=mk_ll_addr(self.pg0.remote_mac),
                  dst=self.pg0.local_ip6_ll) /
             UDP(sport=547, dport=546) /
             msg_type(trid=self.trid) /
             DHCP6OptServerId(duid=self.server_duid) /
             DHCP6OptClientId(duid=self.client_duid) /
             opt_ia_pd
             )
        self.pg0.add_stream([p])
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_advertise(self, t1=None, t2=None, iapdopt=None):
        """Send an Advertise; see send_packet()."""
        self.send_packet(DHCP6_Advertise, t1, t2, iapdopt)

    def send_reply(self, t1=None, t2=None, iapdopt=None):
        """Send a Reply; see send_packet()."""
        self.send_packet(DHCP6_Reply, t1, t2, iapdopt)

    def _add_prefix_address(self, address_bin, prefix_length):
        """Configure *address_bin*/*prefix_length* on pg1 via prefix_group."""
        self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
                                                   address_bin,
                                                   prefix_length,
                                                   self.prefix_group)

    def _del_prefix_address(self, address_bin, prefix_length):
        """Remove what _add_prefix_address() configured on pg1."""
        self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
                                                   address_bin,
                                                   prefix_length,
                                                   self.prefix_group,
                                                   is_add=0)

    def test_T1_and_T2_timeouts(self):
        """ Test T1 and T2 timeouts

        The Reply carries the default T1=1 and T2=2, so a Renew is expected
        after the first sleep and a Rebind after the second.
        """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        self.send_reply()

        self.sleep(1)

        self.wait_for_renew()

        self.pg_enable_capture(self.pg_interfaces)

        self.sleep(1)

        self.wait_for_rebind()

    def test_prefixes(self):
        """ Test handling of prefixes

        Two addresses are bound to the prefix group on pg1; once prefix
        7:8::/56 is delegated they must appear in the FIB, and later they
        must disappear again.
        """
        # (address, prefix length) pairs successfully configured so far.
        # Only these are removed during cleanup, so a failing add is not
        # followed by a delete of something that was never added.
        configured = []
        try:
            address_bin_1 = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
            address_prefix_length_1 = 60
            self._add_prefix_address(address_bin_1, address_prefix_length_1)
            configured.append((address_bin_1, address_prefix_length_1))

            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=2,
                                          validlft=3)

            self.wait_for_solicit()
            self.send_advertise(t1=20, t2=40, iapdopt=ia_pd_opts)
            self.wait_for_request()
            self.send_reply(t1=20, t2=40, iapdopt=ia_pd_opts)
            self.sleep(0.1)

            # check FIB for new address
            new_addresses = self.get_addresses()
            self.assertEqual(len(new_addresses), 1)
            addr = list(new_addresses)[0]
            self.assertEqual(inet_ntop(AF_INET6, addr), '7:8:0:2::405')

            self.sleep(1)

            address_bin_2 = '\x00' * 6 + '\x00\x76' + '\x00' * 6 + '\x04\x06'
            address_prefix_length_2 = 62
            self._add_prefix_address(address_bin_2, address_prefix_length_2)
            configured.append((address_bin_2, address_prefix_length_2))

            self.sleep(1)

            # check FIB contains 2 addresses
            new_addresses = self.get_addresses()
            self.assertEqual(len(new_addresses), 2)
            self.assertEqual(
                set(inet_ntop(AF_INET6, a) for a in new_addresses),
                set(['7:8:0:2::405', '7:8:0:76::406']))

            self.sleep(1)

            # check that the addresses are deleted
            # (presumably because validlft=3 has elapsed by now)
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            for address_bin, prefix_length in configured:
                self._del_prefix_address(address_bin, prefix_length)

    def test_sending_client_messages_solicit(self):
        """ VPP receives messages from DHCPv6 client

        Client-type messages are sent to VPP; afterwards the Solicit must
        still be retransmitted.
        """

        self.wait_for_solicit()
        self.send_packet(DHCP6_Solicit)
        self.send_packet(DHCP6_Request)
        self.send_packet(DHCP6_Renew)
        self.send_packet(DHCP6_Rebind)
        self.sleep(1)
        self.wait_for_solicit(is_resend=True)

    def test_sending_inapropriate_packets(self):
        """ Server sends messages with inappropriate message types

        A Reply while soliciting and an Advertise while requesting must
        each be followed by a retransmission of the pending client message.
        """

        self.wait_for_solicit()
        self.send_reply()
        self.wait_for_solicit(is_resend=True)
        self.send_advertise()
        self.wait_for_request()
        self.send_advertise()
        self.wait_for_request(is_resend=True)
        self.send_reply()
        self.wait_for_renew()

    def test_no_prefix_available_in_advertise(self):
        """ Advertise message contains NoPrefixAvail status code """

        self.wait_for_solicit()
        noavail = DHCP6OptStatusCode(statuscode=6)  # NoPrefixAvail
        self.send_advertise(iapdopt=noavail)
        self.wait_for_solicit(is_resend=True)

    def test_preferred_greater_than_valit_lifetime(self):
        """ Preferred lifetime is greater than valid lifetime

        Such a Reply must not result in any address on pg1.
        """

        address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
        address_prefix_length = 60
        # Added outside the try block: cleanup only makes sense once the
        # address has actually been configured.
        self._add_prefix_address(address_bin, address_prefix_length)
        try:
            self.wait_for_solicit()
            self.send_advertise()
            self.wait_for_request()
            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
                                          validlft=3)
            self.send_reply(iapdopt=ia_pd_opts)

            self.sleep(0.5)

            # check FIB contains no addresses
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            self._del_prefix_address(address_bin, address_prefix_length)

    def test_T1_greater_than_T2(self):
        """ T1 is greater than T2

        Such a Reply must not result in any address on pg1.
        """

        address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
        address_prefix_length = 60
        # Added outside the try block: cleanup only makes sense once the
        # address has actually been configured.
        self._add_prefix_address(address_bin, address_prefix_length)
        try:
            self.wait_for_solicit()
            self.send_advertise()
            self.wait_for_request()
            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
                                          validlft=8)
            self.send_reply(t1=80, t2=40, iapdopt=ia_pd_opts)

            self.sleep(0.5)

            # check FIB contains no addresses
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            self._del_prefix_address(address_bin, address_prefix_length)