1 from scapy.layers.dhcp6 import DHCP6_Advertise, DHCP6OptClientId, \
2 DHCP6OptStatusCode, DHCP6OptPref, DHCP6OptIA_PD, DHCP6OptIAPrefix, \
3 DHCP6OptServerId, DHCP6_Solicit, DHCP6_Reply, DHCP6_Request, DHCP6_Renew, \
4 DHCP6_Rebind, DUID_LLT, DHCP6_Release, DHCP6OptElapsedTime
5 from scapy.layers.inet6 import IPv6, Ether, UDP
6 from scapy.utils6 import in6_mactoifaceid
7 from scapy.utils import inet_ntop, inet_pton
8 from socket import AF_INET6
9 from framework import VppTestCase
13 def ip6_normalize(ip6):
14 return inet_ntop(AF_INET6, inet_pton(AF_INET6, ip6))
18 euid = in6_mactoifaceid(mac)
19 addr = "fe80::" + euid
23 class TestDHCPv6PD(VppTestCase):
24 """ DHCPv6 PD Data Plane Test Case """
28 super(TestDHCPv6PD, cls).setUpClass()
31 super(TestDHCPv6PD, self).setUp()
33 self.create_pg_interfaces(range(1))
34 self.interfaces = list(self.pg_interfaces)
35 for i in self.interfaces:
39 time_since_2000 = int(time()) - 946684800
40 self.server_duid = DUID_LLT(timeval=time_since_2000,
41 lladdr=self.pg0.remote_mac)
44 for i in self.interfaces:
47 super(TestDHCPv6PD, self).tearDown()
49 def test_dhcp_send_solicit_receive_advertise(self):
50 """ Verify DHCPv6 PD Solicit packet and received Advertise envent """
52 self.pg_enable_capture(self.pg_interfaces)
54 prefix_bin = '\00\01\00\02\00\03' + '\00' * 10
55 prefix = {'prefix': prefix_bin,
59 self.vapi.dhcp6_pd_send_client_message(1, self.pg0.sw_if_index,
60 T1=20, T2=40, prefixes=[prefix])
61 rx_list = self.pg0.get_capture(1)
62 self.assertEqual(len(rx_list), 1)
65 self.assertTrue(packet.haslayer(IPv6))
66 self.assertTrue(packet[IPv6].haslayer(DHCP6_Solicit))
68 client_duid = packet[DHCP6OptClientId].duid
69 trid = packet[DHCP6_Solicit].trid
71 dst = ip6_normalize(packet[IPv6].dst)
72 dst2 = ip6_normalize("ff02::1:2")
73 self.assert_equal(dst, dst2)
74 src = ip6_normalize(packet[IPv6].src)
75 src2 = ip6_normalize(self.pg0.local_ip6_ll)
76 self.assert_equal(src, src2)
77 ia_pd = packet[DHCP6OptIA_PD]
78 self.assert_equal(ia_pd.T1, 20)
79 self.assert_equal(ia_pd.T2, 40)
80 self.assert_equal(len(ia_pd.iapdopt), 1)
81 prefix = ia_pd.iapdopt[0]
82 self.assert_equal(prefix.prefix, '1:2:3::')
83 self.assert_equal(prefix.plen, 50)
84 self.assert_equal(prefix.preflft, 60)
85 self.assert_equal(prefix.validlft, 120)
87 self.vapi.want_dhcp6_pd_reply_events()
88 self.vapi.dhcp6_clients_enable_disable()
90 ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=60,
92 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
93 IPv6(src=mk_ll_addr(self.pg0.remote_mac),
94 dst=self.pg0.local_ip6_ll) /
95 UDP(sport=547, dport=546) /
96 DHCP6_Advertise(trid=trid) /
97 DHCP6OptServerId(duid=self.server_duid) /
98 DHCP6OptClientId(duid=client_duid) /
99 DHCP6OptPref(prefval=7) /
100 DHCP6OptStatusCode(statuscode=1) /
101 DHCP6OptIA_PD(iaid=1, T1=20, T2=40, iapdopt=ia_pd_opts)
103 self.pg0.add_stream([p])
106 ev = self.vapi.wait_for_event(10, "dhcp6_pd_reply_event")
108 self.assert_equal(ev.preference, 7)
109 self.assert_equal(ev.status_code, 1)
110 self.assert_equal(ev.T1, 20)
111 self.assert_equal(ev.T2, 40)
113 reported_prefix = ev.prefixes[0]
114 prefix = inet_pton(AF_INET6, ia_pd_opts.getfieldval("prefix"))
115 self.assert_equal(reported_prefix.prefix, prefix)
116 self.assert_equal(reported_prefix.prefix_length,
117 ia_pd_opts.getfieldval("plen"))
118 self.assert_equal(reported_prefix.preferred_time,
119 ia_pd_opts.getfieldval("preflft"))
120 self.assert_equal(reported_prefix.valid_time,
121 ia_pd_opts.getfieldval("validlft"))
124 class TestDHCPv6PDControlPlane(VppTestCase):
125 """ DHCPv6 PD Control Plane Test Case """
129 super(TestDHCPv6PDControlPlane, cls).setUpClass()
132 super(TestDHCPv6PDControlPlane, self).setUp()
134 self.create_pg_interfaces(range(2))
135 self.interfaces = list(self.pg_interfaces)
136 for i in self.interfaces:
139 time_since_2000 = int(time()) - 946684800
140 self.server_duid = DUID_LLT(timeval=time_since_2000,
141 lladdr=self.pg0.remote_mac)
142 self.client_duid = None
146 fib = self.vapi.ip6_fib_dump()
147 self.initial_addresses = set(self.get_interface_addresses(fib,
150 self.pg_enable_capture(self.pg_interfaces)
153 self.prefix_group = 'my-pd-prefix-group'
155 self.vapi.dhcp6_pd_client_enable_disable(
156 self.pg0.sw_if_index,
157 prefix_group=self.prefix_group)
160 self.vapi.dhcp6_pd_client_enable_disable(self.pg0.sw_if_index,
163 for i in self.interfaces:
166 super(TestDHCPv6PDControlPlane, self).tearDown()
169 def get_interface_addresses(fib, pg):
172 if entry.address_length == 128:
174 if path.sw_if_index == pg.sw_if_index:
175 lst.append(entry.address)
178 def get_addresses(self):
179 fib = self.vapi.ip6_fib_dump()
180 addresses = set(self.get_interface_addresses(fib, self.pg1))
181 return addresses.difference(self.initial_addresses)
183 def validate_duid_llt(self, duid):
186 def validate_packet(self, packet, msg_type, is_resend=False):
188 self.assertTrue(packet.haslayer(msg_type))
189 client_duid = packet[DHCP6OptClientId].duid
190 if self.client_duid is None:
191 self.client_duid = client_duid
192 self.validate_duid_llt(client_duid)
194 self.assertEqual(self.client_duid, client_duid)
195 if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind:
196 server_duid = packet[DHCP6OptServerId].duid
197 self.assertEqual(server_duid, self.server_duid)
199 self.assertEqual(self.trid, packet[msg_type].trid)
201 self.trid = packet[msg_type].trid
204 self.assertEqual(ip.dst, 'ff02::1:2')
205 self.assertEqual(udp.sport, 546)
206 self.assertEqual(udp.dport, 547)
207 dhcpv6 = packet[msg_type]
208 elapsed_time = dhcpv6[DHCP6OptElapsedTime]
210 self.assertNotEqual(elapsed_time.elapsedtime, 0)
212 self.assertEqual(elapsed_time.elapsedtime, 0)
217 def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
220 rx_list = self.pg0.get_capture(1, timeout=timeout)
222 self.validate_packet(packet, msg_type, is_resend=is_resend)
224 def wait_for_solicit(self, timeout=None, is_resend=False):
225 self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)
227 def wait_for_request(self, timeout=None, is_resend=False):
228 self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)
230 def wait_for_renew(self, timeout=None, is_resend=False):
231 self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)
233 def wait_for_rebind(self, timeout=None, is_resend=False):
234 self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)
236 def wait_for_release(self, timeout=None, is_resend=False):
237 self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)
239 def send_packet(self, msg_type, t1=None, t2=None, iapdopt=None):
245 opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2)
247 opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2, iapdopt=iapdopt)
248 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
249 IPv6(src=mk_ll_addr(self.pg0.remote_mac),
250 dst=self.pg0.local_ip6_ll) /
251 UDP(sport=547, dport=546) /
252 msg_type(trid=self.trid) /
253 DHCP6OptServerId(duid=self.server_duid) /
254 DHCP6OptClientId(duid=self.client_duid) /
257 self.pg0.add_stream([p])
258 self.pg_enable_capture(self.pg_interfaces)
261 def send_advertise(self, t1=None, t2=None, iapdopt=None):
262 self.send_packet(DHCP6_Advertise, t1, t2, iapdopt)
264 def send_reply(self, t1=None, t2=None, iapdopt=None):
265 self.send_packet(DHCP6_Reply, t1, t2, iapdopt)
267 def test_T1_and_T2_timeouts(self):
268 """ Test T1 and T2 timeouts """
270 self.wait_for_solicit()
271 self.send_advertise()
272 self.wait_for_request()
277 self.wait_for_renew()
279 self.pg_enable_capture(self.pg_interfaces)
283 self.wait_for_rebind()
285 def test_prefixes(self):
286 """ Test handling of prefixes """
290 address_bin_1 = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
291 address_prefix_length_1 = 60
292 self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
294 address_prefix_length_1,
297 ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=2,
300 self.wait_for_solicit()
301 self.send_advertise(t1=20, t2=40, iapdopt=ia_pd_opts)
302 self.wait_for_request()
303 self.send_reply(t1=20, t2=40, iapdopt=ia_pd_opts)
306 # check FIB for new address
307 new_addresses = self.get_addresses()
308 self.assertEqual(len(new_addresses), 1)
309 addr = list(new_addresses)[0]
310 self.assertEqual(inet_ntop(AF_INET6, addr), '7:8:0:2::405')
314 address_bin_2 = '\x00' * 6 + '\x00\x76' + '\x00' * 6 + '\x04\x06'
315 address_prefix_length_2 = 62
316 self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
318 address_prefix_length_2,
323 # check FIB contains 2 addresses
324 fib = self.vapi.ip6_fib_dump()
325 addresses = set(self.get_interface_addresses(fib, self.pg1))
326 new_addresses = addresses.difference(self.initial_addresses)
327 self.assertEqual(len(new_addresses), 2)
328 addr1 = list(new_addresses)[0]
329 addr2 = list(new_addresses)[1]
330 if inet_ntop(AF_INET6, addr1) == '7:8:0:76::406':
331 addr1, addr2 = addr2, addr1
332 self.assertEqual(inet_ntop(AF_INET6, addr1), '7:8:0:2::405')
333 self.assertEqual(inet_ntop(AF_INET6, addr2), '7:8:0:76::406')
337 # check that the addresses are deleted
338 fib = self.vapi.ip6_fib_dump()
339 addresses = set(self.get_interface_addresses(fib, self.pg1))
340 new_addresses = addresses.difference(self.initial_addresses)
341 self.assertEqual(len(new_addresses), 0)
344 if address_bin_1 is not None:
345 self.vapi.ip6_add_del_address_using_prefix(
346 self.pg1.sw_if_index, address_bin_1,
347 address_prefix_length_1, self.prefix_group, is_add=0)
348 if address_bin_2 is not None:
349 self.vapi.ip6_add_del_address_using_prefix(
350 self.pg1.sw_if_index, address_bin_2,
351 address_prefix_length_2, self.prefix_group, is_add=0)
353 def test_sending_client_messages_solicit(self):
354 """ VPP receives messages from DHCPv6 client """
356 self.wait_for_solicit()
357 self.send_packet(DHCP6_Solicit)
358 self.send_packet(DHCP6_Request)
359 self.send_packet(DHCP6_Renew)
360 self.send_packet(DHCP6_Rebind)
362 self.wait_for_solicit(is_resend=True)
364 def test_sending_inapropriate_packets(self):
365 """ Server sends messages with inapropriate message types """
367 self.wait_for_solicit()
369 self.wait_for_solicit(is_resend=True)
370 self.send_advertise()
371 self.wait_for_request()
372 self.send_advertise()
373 self.wait_for_request(is_resend=True)
375 self.wait_for_renew()
377 def test_no_prefix_available_in_advertise(self):
378 """ Advertise message contains NoPrefixAvail status code """
380 self.wait_for_solicit()
381 noavail = DHCP6OptStatusCode(statuscode=6) # NoPrefixAvail
382 self.send_advertise(iapdopt=noavail)
383 self.wait_for_solicit(is_resend=True)
385 def test_preferred_greater_than_valit_lifetime(self):
386 """ Preferred lifetime is greater than valid lifetime """
389 address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
390 address_prefix_length = 60
391 self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
393 address_prefix_length,
396 self.wait_for_solicit()
397 self.send_advertise()
398 self.wait_for_request()
399 ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
401 self.send_reply(iapdopt=ia_pd_opts)
405 # check FIB contains no addresses
406 fib = self.vapi.ip6_fib_dump()
407 addresses = set(self.get_interface_addresses(fib, self.pg1))
408 new_addresses = addresses.difference(self.initial_addresses)
409 self.assertEqual(len(new_addresses), 0)
412 self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
414 address_prefix_length,
418 def test_T1_greater_than_T2(self):
419 """ T1 is greater than T2 """
422 address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
423 address_prefix_length = 60
424 self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
426 address_prefix_length,
429 self.wait_for_solicit()
430 self.send_advertise()
431 self.wait_for_request()
432 ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
434 self.send_reply(t1=80, t2=40, iapdopt=ia_pd_opts)
438 # check FIB contains no addresses
439 fib = self.vapi.ip6_fib_dump()
440 addresses = set(self.get_interface_addresses(fib, self.pg1))
441 new_addresses = addresses.difference(self.initial_addresses)
442 self.assertEqual(len(new_addresses), 0)
445 self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
447 address_prefix_length,