tests: replace pycodestyle with black
[vpp.git] / test / test_dhcp.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import socket
5 import struct
6 import six
7
8 from framework import VppTestCase, VppTestRunner
9 from framework import tag_run_solo
10 from vpp_neighbor import VppNeighbor
11 from vpp_ip_route import find_route, VppIpTable
12 from util import mk_ll_addr
13 import scapy.compat
14 from scapy.layers.l2 import Ether, getmacbyip, ARP, Dot1Q
15 from scapy.layers.inet import IP, UDP, ICMP
16 from scapy.layers.inet6 import IPv6, in6_getnsmac
17 from scapy.utils6 import in6_mactoifaceid
18 from scapy.layers.dhcp import DHCP, BOOTP, DHCPTypes
19 from scapy.layers.dhcp6 import (
20     DHCP6,
21     DHCP6_Solicit,
22     DHCP6_RelayForward,
23     DHCP6_RelayReply,
24     DHCP6_Advertise,
25     DHCP6OptRelayMsg,
26     DHCP6OptIfaceId,
27     DHCP6OptStatusCode,
28     DHCP6OptVSS,
29     DHCP6OptClientLinkLayerAddr,
30     DHCP6_Request,
31 )
32 from socket import AF_INET, AF_INET6, inet_pton, inet_ntop
33 from scapy.utils6 import in6_ptop
34 from vpp_papi import mac_pton, VppEnum
35 from vpp_sub_interface import VppDot1QSubint
36 from vpp_qos import VppQosEgressMap, VppQosMark
37 from vpp_dhcp import VppDHCPClient, VppDHCPProxy
38
39
40 DHCP4_CLIENT_PORT = 68
41 DHCP4_SERVER_PORT = 67
42 DHCP6_CLIENT_PORT = 547
43 DHCP6_SERVER_PORT = 546
44
45
46 @tag_run_solo
47 class TestDHCP(VppTestCase):
48     """DHCP Test Case"""
49
50     @classmethod
51     def setUpClass(cls):
52         super(TestDHCP, cls).setUpClass()
53
54     @classmethod
55     def tearDownClass(cls):
56         super(TestDHCP, cls).tearDownClass()
57
58     def setUp(self):
59         super(TestDHCP, self).setUp()
60
61         # create 6 pg interfaces for pg0 to pg5
62         self.create_pg_interfaces(range(6))
63         self.tables = []
64
65         # pg0 to 2 are IP configured in VRF 0, 1 and 2.
66         # pg3 to 5 are non IP-configured in VRF 0, 1 and 2.
67         table_id = 0
68         for table_id in range(1, 4):
69             tbl4 = VppIpTable(self, table_id)
70             tbl4.add_vpp_config()
71             self.tables.append(tbl4)
72             tbl6 = VppIpTable(self, table_id, is_ip6=1)
73             tbl6.add_vpp_config()
74             self.tables.append(tbl6)
75
76         table_id = 0
77         for i in self.pg_interfaces[:3]:
78             i.admin_up()
79             i.set_table_ip4(table_id)
80             i.set_table_ip6(table_id)
81             i.config_ip4()
82             i.resolve_arp()
83             i.config_ip6()
84             i.resolve_ndp()
85             table_id += 1
86
87         table_id = 0
88         for i in self.pg_interfaces[3:]:
89             i.admin_up()
90             i.set_table_ip4(table_id)
91             i.set_table_ip6(table_id)
92             table_id += 1
93
94     def tearDown(self):
95         for i in self.pg_interfaces[:3]:
96             i.unconfig_ip4()
97             i.unconfig_ip6()
98
99         for i in self.pg_interfaces:
100             i.set_table_ip4(0)
101             i.set_table_ip6(0)
102             i.admin_down()
103         super(TestDHCP, self).tearDown()
104
105     def verify_dhcp_has_option(self, pkt, option, value):
106         dhcp = pkt[DHCP]
107         found = False
108
109         for i in dhcp.options:
110             if isinstance(i, tuple):
111                 if i[0] == option:
112                     self.assertEqual(i[1], value)
113                     found = True
114
115         self.assertTrue(found)
116
117     def validate_relay_options(self, pkt, intf, ip_addr, vpn_id, fib_id, oui):
118         dhcp = pkt[DHCP]
119         found = 0
120         data = []
121         id_len = len(vpn_id)
122
123         for i in dhcp.options:
124             if isinstance(i, tuple):
125                 if i[0] == "relay_agent_Information":
126                     #
127                     # There are two sb-options present - each of length 6.
128                     #
129                     data = i[1]
130                     if oui != 0:
131                         self.assertEqual(len(data), 24)
132                     elif len(vpn_id) > 0:
133                         self.assertEqual(len(data), len(vpn_id) + 17)
134                     else:
135                         self.assertEqual(len(data), 12)
136
137                     #
138                     # First sub-option is ID 1, len 4, then encoded
139                     #  sw_if_index. This test uses low valued indicies
140                     # so [2:4] are 0.
141                     # The ID space is VPP internal - so no matching value
142                     # scapy
143                     #
144                     self.assertEqual(six.byte2int(data[0:1]), 1)
145                     self.assertEqual(six.byte2int(data[1:2]), 4)
146                     self.assertEqual(six.byte2int(data[2:3]), 0)
147                     self.assertEqual(six.byte2int(data[3:4]), 0)
148                     self.assertEqual(six.byte2int(data[4:5]), 0)
149                     self.assertEqual(six.byte2int(data[5:6]), intf._sw_if_index)
150
151                     #
152                     # next sub-option is the IP address of the client side
153                     # interface.
154                     # sub-option ID=5, length (of a v4 address)=4
155                     #
156                     claddr = socket.inet_pton(AF_INET, ip_addr)
157
158                     self.assertEqual(six.byte2int(data[6:7]), 5)
159                     self.assertEqual(six.byte2int(data[7:8]), 4)
160                     self.assertEqual(data[8], claddr[0])
161                     self.assertEqual(data[9], claddr[1])
162                     self.assertEqual(data[10], claddr[2])
163                     self.assertEqual(data[11], claddr[3])
164
165                     if oui != 0:
166                         # sub-option 151 encodes vss_type 1,
167                         # the 3 byte oui and the 4 byte fib_id
168                         self.assertEqual(id_len, 0)
169                         self.assertEqual(six.byte2int(data[12:13]), 151)
170                         self.assertEqual(six.byte2int(data[13:14]), 8)
171                         self.assertEqual(six.byte2int(data[14:15]), 1)
172                         self.assertEqual(six.byte2int(data[15:16]), 0)
173                         self.assertEqual(six.byte2int(data[16:17]), 0)
174                         self.assertEqual(six.byte2int(data[17:18]), oui)
175                         self.assertEqual(six.byte2int(data[18:19]), 0)
176                         self.assertEqual(six.byte2int(data[19:20]), 0)
177                         self.assertEqual(six.byte2int(data[20:21]), 0)
178                         self.assertEqual(six.byte2int(data[21:22]), fib_id)
179
180                         # VSS control sub-option
181                         self.assertEqual(six.byte2int(data[22:23]), 152)
182                         self.assertEqual(six.byte2int(data[23:24]), 0)
183
184                     if id_len > 0:
185                         # sub-option 151 encode vss_type of 0
186                         # followerd by vpn_id in ascii
187                         self.assertEqual(oui, 0)
188                         self.assertEqual(six.byte2int(data[12:13]), 151)
189                         self.assertEqual(six.byte2int(data[13:14]), id_len + 1)
190                         self.assertEqual(six.byte2int(data[14:15]), 0)
191                         self.assertEqual(data[15 : 15 + id_len].decode("ascii"), vpn_id)
192
193                         # VSS control sub-option
194                         self.assertEqual(
195                             six.byte2int(data[15 + len(vpn_id) : 16 + len(vpn_id)]), 152
196                         )
197                         self.assertEqual(
198                             six.byte2int(data[16 + len(vpn_id) : 17 + len(vpn_id)]), 0
199                         )
200
201                     found = 1
202         self.assertTrue(found)
203
204         return data
205
206     def verify_dhcp_msg_type(self, pkt, name):
207         dhcp = pkt[DHCP]
208         found = False
209         for o in dhcp.options:
210             if isinstance(o, tuple):
211                 if o[0] == "message-type" and DHCPTypes[o[1]] == name:
212                     found = True
213         self.assertTrue(found)
214
215     def verify_dhcp_offer(self, pkt, intf, vpn_id="", fib_id=0, oui=0):
216         ether = pkt[Ether]
217         self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff")
218         self.assertEqual(ether.src, intf.local_mac)
219
220         ip = pkt[IP]
221         self.assertEqual(ip.dst, "255.255.255.255")
222         self.assertEqual(ip.src, intf.local_ip4)
223
224         udp = pkt[UDP]
225         self.assertEqual(udp.dport, DHCP4_CLIENT_PORT)
226         self.assertEqual(udp.sport, DHCP4_SERVER_PORT)
227
228         self.verify_dhcp_msg_type(pkt, "offer")
229         data = self.validate_relay_options(
230             pkt, intf, intf.local_ip4, vpn_id, fib_id, oui
231         )
232
233     def verify_orig_dhcp_pkt(self, pkt, intf, dscp, l2_bc=True):
234         ether = pkt[Ether]
235         if l2_bc:
236             self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff")
237         else:
238             self.assertEqual(ether.dst, intf.remote_mac)
239         self.assertEqual(ether.src, intf.local_mac)
240
241         ip = pkt[IP]
242
243         if l2_bc:
244             self.assertEqual(ip.dst, "255.255.255.255")
245             self.assertEqual(ip.src, "0.0.0.0")
246         else:
247             self.assertEqual(ip.dst, intf.remote_ip4)
248             self.assertEqual(ip.src, intf.local_ip4)
249         self.assertEqual(ip.tos, dscp)
250
251         udp = pkt[UDP]
252         self.assertEqual(udp.dport, DHCP4_SERVER_PORT)
253         self.assertEqual(udp.sport, DHCP4_CLIENT_PORT)
254
255     def verify_orig_dhcp_discover(
256         self, pkt, intf, hostname, client_id=None, broadcast=True, dscp=0
257     ):
258         self.verify_orig_dhcp_pkt(pkt, intf, dscp)
259
260         self.verify_dhcp_msg_type(pkt, "discover")
261         self.verify_dhcp_has_option(pkt, "hostname", hostname.encode("ascii"))
262         if client_id:
263             client_id = "\x00" + client_id
264             self.verify_dhcp_has_option(pkt, "client_id", client_id.encode("ascii"))
265         bootp = pkt[BOOTP]
266         self.assertEqual(bootp.ciaddr, "0.0.0.0")
267         self.assertEqual(bootp.giaddr, "0.0.0.0")
268         if broadcast:
269             self.assertEqual(bootp.flags, 0x8000)
270         else:
271             self.assertEqual(bootp.flags, 0x0000)
272
273     def verify_orig_dhcp_request(
274         self, pkt, intf, hostname, ip, broadcast=True, l2_bc=True, dscp=0
275     ):
276         self.verify_orig_dhcp_pkt(pkt, intf, dscp, l2_bc=l2_bc)
277
278         self.verify_dhcp_msg_type(pkt, "request")
279         self.verify_dhcp_has_option(pkt, "hostname", hostname.encode("ascii"))
280         self.verify_dhcp_has_option(pkt, "requested_addr", ip)
281         bootp = pkt[BOOTP]
282
283         if l2_bc:
284             self.assertEqual(bootp.ciaddr, "0.0.0.0")
285         else:
286             self.assertEqual(bootp.ciaddr, intf.local_ip4)
287         self.assertEqual(bootp.giaddr, "0.0.0.0")
288
289         if broadcast:
290             self.assertEqual(bootp.flags, 0x8000)
291         else:
292             self.assertEqual(bootp.flags, 0x0000)
293
294     def verify_relayed_dhcp_discover(
295         self,
296         pkt,
297         intf,
298         src_intf=None,
299         fib_id=0,
300         oui=0,
301         vpn_id="",
302         dst_mac=None,
303         dst_ip=None,
304     ):
305         if not dst_mac:
306             dst_mac = intf.remote_mac
307         if not dst_ip:
308             dst_ip = intf.remote_ip4
309
310         ether = pkt[Ether]
311         self.assertEqual(ether.dst, dst_mac)
312         self.assertEqual(ether.src, intf.local_mac)
313
314         ip = pkt[IP]
315         self.assertEqual(ip.dst, dst_ip)
316         self.assertEqual(ip.src, intf.local_ip4)
317
318         udp = pkt[UDP]
319         self.assertEqual(udp.dport, DHCP4_SERVER_PORT)
320         self.assertEqual(udp.sport, DHCP4_CLIENT_PORT)
321
322         dhcp = pkt[DHCP]
323
324         is_discover = False
325         for o in dhcp.options:
326             if isinstance(o, tuple):
327                 if o[0] == "message-type" and DHCPTypes[o[1]] == "discover":
328                     is_discover = True
329         self.assertTrue(is_discover)
330
331         data = self.validate_relay_options(
332             pkt, src_intf, src_intf.local_ip4, vpn_id, fib_id, oui
333         )
334         return data
335
336     def verify_dhcp6_solicit(
337         self,
338         pkt,
339         intf,
340         peer_ip,
341         peer_mac,
342         vpn_id="",
343         fib_id=0,
344         oui=0,
345         dst_mac=None,
346         dst_ip=None,
347     ):
348         if not dst_mac:
349             dst_mac = intf.remote_mac
350         if not dst_ip:
351             dst_ip = in6_ptop(intf.remote_ip6)
352
353         ether = pkt[Ether]
354         self.assertEqual(ether.dst, dst_mac)
355         self.assertEqual(ether.src, intf.local_mac)
356
357         ip = pkt[IPv6]
358         self.assertEqual(in6_ptop(ip.dst), dst_ip)
359         self.assertEqual(in6_ptop(ip.src), in6_ptop(intf.local_ip6))
360
361         udp = pkt[UDP]
362         self.assertEqual(udp.dport, DHCP6_CLIENT_PORT)
363         self.assertEqual(udp.sport, DHCP6_SERVER_PORT)
364
365         relay = pkt[DHCP6_RelayForward]
366         self.assertEqual(in6_ptop(relay.peeraddr), in6_ptop(peer_ip))
367         oid = pkt[DHCP6OptIfaceId]
368         cll = pkt[DHCP6OptClientLinkLayerAddr]
369         self.assertEqual(cll.optlen, 8)
370         self.assertEqual(cll.lltype, 1)
371         self.assertEqual(cll.clladdr, peer_mac)
372
373         id_len = len(vpn_id)
374
375         if fib_id != 0:
376             self.assertEqual(id_len, 0)
377             vss = pkt[DHCP6OptVSS]
378             self.assertEqual(vss.optlen, 8)
379             self.assertEqual(vss.type, 1)
380             # the OUI and FIB-id are really 3 and 4 bytes resp.
381             # but the tested range is small
382             self.assertEqual(six.byte2int(vss.data[0:1]), 0)
383             self.assertEqual(six.byte2int(vss.data[1:2]), 0)
384             self.assertEqual(six.byte2int(vss.data[2:3]), oui)
385             self.assertEqual(six.byte2int(vss.data[3:4]), 0)
386             self.assertEqual(six.byte2int(vss.data[4:5]), 0)
387             self.assertEqual(six.byte2int(vss.data[5:6]), 0)
388             self.assertEqual(six.byte2int(vss.data[6:7]), fib_id)
389
390         if id_len > 0:
391             self.assertEqual(oui, 0)
392             vss = pkt[DHCP6OptVSS]
393             self.assertEqual(vss.optlen, id_len + 1)
394             self.assertEqual(vss.type, 0)
395             self.assertEqual(vss.data[0:id_len].decode("ascii"), vpn_id)
396
397         # the relay message should be an encoded Solicit
398         msg = pkt[DHCP6OptRelayMsg]
399         sol = DHCP6_Solicit()
400         self.assertEqual(msg.optlen, len(sol))
401         self.assertEqual(sol, msg[1])
402
403     def verify_dhcp6_advert(self, pkt, intf, peer):
404         ether = pkt[Ether]
405         self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff")
406         self.assertEqual(ether.src, intf.local_mac)
407
408         ip = pkt[IPv6]
409         self.assertEqual(in6_ptop(ip.dst), in6_ptop(peer))
410         self.assertEqual(in6_ptop(ip.src), in6_ptop(intf.local_ip6))
411
412         udp = pkt[UDP]
413         self.assertEqual(udp.dport, DHCP6_SERVER_PORT)
414         self.assertEqual(udp.sport, DHCP6_CLIENT_PORT)
415
416         # not sure why this is not decoding
417         # adv = pkt[DHCP6_Advertise]
418
419     def wait_for_no_route(self, address, length, n_tries=50, s_time=1):
420         while n_tries:
421             if not find_route(self, address, length):
422                 return True
423             n_tries = n_tries - 1
424             self.sleep(s_time)
425
426         return False
427
428     def test_dhcp_proxy(self):
429         """DHCPv4 Proxy"""
430
431         #
432         # Verify no response to DHCP request without DHCP config
433         #
434         p_disc_vrf0 = (
435             Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg3.remote_mac)
436             / IP(src="0.0.0.0", dst="255.255.255.255")
437             / UDP(sport=DHCP4_CLIENT_PORT, dport=DHCP4_SERVER_PORT)
438             / BOOTP(op=1)
439             / DHCP(options=[("message-type", "discover"), ("end")])
440         )
441         pkts_disc_vrf0 = [p_disc_vrf0]
442         p_disc_vrf1 = (
443             Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg4.remote_mac)
444             / IP(src="0.0.0.0", dst="255.255.255.255")
445             / UDP(sport=DHCP4_CLIENT_PORT, dport=DHCP4_SERVER_PORT)
446             / BOOTP(op=1)
447             / DHCP(options=[("message-type", "discover"), ("end")])
448         )
449         pkts_disc_vrf1 = [p_disc_vrf1]
450         p_disc_vrf2 = (
451             Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg5.remote_mac)
452             / IP(src="0.0.0.0", dst="255.255.255.255")
453             / UDP(sport=DHCP4_CLIENT_PORT, dport=DHCP4_SERVER_PORT)
454             / BOOTP(op=1)
455             / DHCP(options=[("message-type", "discover"), ("end")])
456         )
457         pkts_disc_vrf2 = [p_disc_vrf2]
458
459         self.send_and_assert_no_replies(
460             self.pg3, pkts_disc_vrf0, "DHCP with no configuration"
461         )
462         self.send_and_assert_no_replies(
463             self.pg4, pkts_disc_vrf1, "DHCP with no configuration"
464         )
465         self.send_and_assert_no_replies(
466             self.pg5, pkts_disc_vrf2, "DHCP with no configuration"
467         )
468
469         #
470         # Enable DHCP proxy in VRF 0
471         #
472         server_addr = self.pg0.remote_ip4
473         src_addr = self.pg0.local_ip4
474
475         Proxy = VppDHCPProxy(self, server_addr, src_addr, rx_vrf_id=0)
476         Proxy.add_vpp_config()
477
478         #
479         # Discover packets from the client are dropped because there is no
480         # IP address configured on the client facing interface
481         #
482         self.send_and_assert_no_replies(
483             self.pg3, pkts_disc_vrf0, "Discover DHCP no relay address"
484         )
485
486         #
487         # Inject a response from the server
488         #  dropped, because there is no IP addrees on the
489         #  client interfce to fill in the option.
490         #
491         p = (
492             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
493             / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
494             / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_SERVER_PORT)
495             / BOOTP(op=1)
496             / DHCP(options=[("message-type", "offer"), ("end")])
497         )
498         pkts = [p]
499
500         self.send_and_assert_no_replies(self.pg3, pkts, "Offer DHCP no relay address")
501
502         #
503         # configure an IP address on the client facing interface
504         #
505         self.pg3.config_ip4()
506
507         #
508         # Try again with a discover packet
509         # Rx'd packet should be to the server address and from the configured
510         # source address
511         # UDP source ports are unchanged
512         # we've no option 82 config so that should be absent
513         #
514         self.pg3.add_stream(pkts_disc_vrf0)
515         self.pg_enable_capture(self.pg_interfaces)
516         self.pg_start()
517
518         rx = self.pg0.get_capture(1)
519         rx = rx[0]
520
521         option_82 = self.verify_relayed_dhcp_discover(rx, self.pg0, src_intf=self.pg3)
522
523         #
524         # Create an DHCP offer reply from the server with a correctly formatted
525         # option 82. i.e. send back what we just captured
526         # The offer, sent mcast to the client, still has option 82.
527         #
528         p = (
529             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
530             / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
531             / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_SERVER_PORT)
532             / BOOTP(op=1)
533             / DHCP(
534                 options=[
535                     ("message-type", "offer"),
536                     ("relay_agent_Information", option_82),
537                     ("end"),
538                 ]
539             )
540         )
541         pkts = [p]
542
543         self.pg0.add_stream(pkts)
544         self.pg_enable_capture(self.pg_interfaces)
545         self.pg_start()
546
547         rx = self.pg3.get_capture(1)
548         rx = rx[0]
549
550         self.verify_dhcp_offer(rx, self.pg3)
551
552         #
553         # Bogus Option 82:
554         #
555         # 1. not our IP address = not checked by VPP? so offer is replayed
556         #    to client
557         bad_ip = option_82[0:8] + scapy.compat.chb(33) + option_82[9:]
558
559         p = (
560             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
561             / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
562             / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_SERVER_PORT)
563             / BOOTP(op=1)
564             / DHCP(
565                 options=[
566                     ("message-type", "offer"),
567                     ("relay_agent_Information", bad_ip),
568                     ("end"),
569                 ]
570             )
571         )
572         pkts = [p]
573         self.send_and_assert_no_replies(
574             self.pg0, pkts, "DHCP offer option 82 bad address"
575         )
576
577         # 2. Not a sw_if_index VPP knows
578         bad_if_index = option_82[0:2] + scapy.compat.chb(33) + option_82[3:]
579
580         p = (
581             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
582             / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
583             / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_SERVER_PORT)
584             / BOOTP(op=1)
585             / DHCP(
586                 options=[
587                     ("message-type", "offer"),
588                     ("relay_agent_Information", bad_if_index),
589                     ("end"),
590                 ]
591             )
592         )
593         pkts = [p]
594         self.send_and_assert_no_replies(
595             self.pg0, pkts, "DHCP offer option 82 bad if index"
596         )
597
598         #
599         # Send a DHCP request in VRF 1. should be dropped.
600         #
601         self.send_and_assert_no_replies(
602             self.pg4, pkts_disc_vrf1, "DHCP with no configuration VRF 1"
603         )
604
605         #
606         # Delete the DHCP config in VRF 0
607         # Should now drop requests.
608         #
609         Proxy.remove_vpp_config()
610
611         self.send_and_assert_no_replies(
612             self.pg3, pkts_disc_vrf0, "DHCP config removed VRF 0"
613         )
614         self.send_and_assert_no_replies(
615             self.pg4, pkts_disc_vrf1, "DHCP config removed VRF 1"
616         )
617
618         #
619         # Add DHCP config for VRF 1 & 2
620         #
621         server_addr1 = self.pg1.remote_ip4
622         src_addr1 = self.pg1.local_ip4
623         Proxy1 = VppDHCPProxy(
624             self, server_addr1, src_addr1, rx_vrf_id=1, server_vrf_id=1
625         )
626         Proxy1.add_vpp_config()
627
628         server_addr2 = self.pg2.remote_ip4
629         src_addr2 = self.pg2.local_ip4
630         Proxy2 = VppDHCPProxy(
631             self, server_addr2, src_addr2, rx_vrf_id=2, server_vrf_id=2
632         )
633         Proxy2.add_vpp_config()
634
635         #
636         # Confim DHCP requests ok in VRF 1 & 2.
637         #  - dropped on IP config on client interface
638         #
639         self.send_and_assert_no_replies(
640             self.pg4, pkts_disc_vrf1, "DHCP config removed VRF 1"
641         )
642         self.send_and_assert_no_replies(
643             self.pg5, pkts_disc_vrf2, "DHCP config removed VRF 2"
644         )
645
646         #
647         # configure an IP address on the client facing interface
648         #
649         self.pg4.config_ip4()
650         self.pg4.add_stream(pkts_disc_vrf1)
651         self.pg_enable_capture(self.pg_interfaces)
652         self.pg_start()
653         rx = self.pg1.get_capture(1)
654         rx = rx[0]
655         self.verify_relayed_dhcp_discover(rx, self.pg1, src_intf=self.pg4)
656
657         self.pg5.config_ip4()
658         self.pg5.add_stream(pkts_disc_vrf2)
659         self.pg_enable_capture(self.pg_interfaces)
660         self.pg_start()
661         rx = self.pg2.get_capture(1)
662         rx = rx[0]
663         self.verify_relayed_dhcp_discover(rx, self.pg2, src_intf=self.pg5)
664
665         #
666         # Add VSS config
667         #  table=1, vss_type=1, vpn_index=1, oui=4
668         #  table=2, vss_type=0, vpn_id = "ip4-table-2"
669         self.vapi.dhcp_proxy_set_vss(tbl_id=1, vss_type=1, vpn_index=1, oui=4, is_add=1)
670         self.vapi.dhcp_proxy_set_vss(
671             tbl_id=2, vss_type=0, vpn_ascii_id="ip4-table-2", is_add=1
672         )
673
674         self.pg4.add_stream(pkts_disc_vrf1)
675         self.pg_enable_capture(self.pg_interfaces)
676         self.pg_start()
677
678         rx = self.pg1.get_capture(1)
679         rx = rx[0]
680         self.verify_relayed_dhcp_discover(
681             rx, self.pg1, src_intf=self.pg4, fib_id=1, oui=4
682         )
683
684         self.pg5.add_stream(pkts_disc_vrf2)
685         self.pg_enable_capture(self.pg_interfaces)
686         self.pg_start()
687
688         rx = self.pg2.get_capture(1)
689         rx = rx[0]
690         self.verify_relayed_dhcp_discover(
691             rx, self.pg2, src_intf=self.pg5, vpn_id="ip4-table-2"
692         )
693
694         #
695         # Add a second DHCP server in VRF 1
696         #  expect clients messages to be relay to both configured servers
697         #
698         self.pg1.generate_remote_hosts(2)
699         server_addr12 = self.pg1.remote_hosts[1].ip4
700
701         Proxy12 = VppDHCPProxy(
702             self, server_addr12, src_addr, rx_vrf_id=1, server_vrf_id=1
703         )
704         Proxy12.add_vpp_config()
705
706         #
707         # We'll need an ARP entry for the server to send it packets
708         #
709         arp_entry = VppNeighbor(
710             self,
711             self.pg1.sw_if_index,
712             self.pg1.remote_hosts[1].mac,
713             self.pg1.remote_hosts[1].ip4,
714         )
715         arp_entry.add_vpp_config()
716
717         #
718         # Send a discover from the client. expect two relayed messages
719         # The frist packet is sent to the second server
720         # We're not enforcing that here, it's just the way it is.
721         #
722         self.pg4.add_stream(pkts_disc_vrf1)
723         self.pg_enable_capture(self.pg_interfaces)
724         self.pg_start()
725
726         rx = self.pg1.get_capture(2)
727
728         option_82 = self.verify_relayed_dhcp_discover(
729             rx[0],
730             self.pg1,
731             src_intf=self.pg4,
732             dst_mac=self.pg1.remote_hosts[1].mac,
733             dst_ip=self.pg1.remote_hosts[1].ip4,
734             fib_id=1,
735             oui=4,
736         )
737         self.verify_relayed_dhcp_discover(
738             rx[1], self.pg1, src_intf=self.pg4, fib_id=1, oui=4
739         )
740
741         #
742         # Send both packets back. Client gets both.
743         #
744         p1 = (
745             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
746             / IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4)
747             / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_SERVER_PORT)
748             / BOOTP(op=1)
749             / DHCP(
750                 options=[
751                     ("message-type", "offer"),
752                     ("relay_agent_Information", option_82),
753                     ("end"),
754                 ]
755             )
756         )
757         p2 = (
758             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
759             / IP(src=self.pg1.remote_hosts[1].ip4, dst=self.pg1.local_ip4)
760             / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_SERVER_PORT)
761             / BOOTP(op=1)
762             / DHCP(
763                 options=[
764                     ("message-type", "offer"),
765                     ("relay_agent_Information", option_82),
766                     ("end"),
767                 ]
768             )
769         )
770         pkts = [p1, p2]
771
772         self.pg1.add_stream(pkts)
773         self.pg_enable_capture(self.pg_interfaces)
774         self.pg_start()
775
776         rx = self.pg4.get_capture(2)
777
778         self.verify_dhcp_offer(rx[0], self.pg4, fib_id=1, oui=4)
779         self.verify_dhcp_offer(rx[1], self.pg4, fib_id=1, oui=4)
780
781         #
782         # Ensure offers from non-servers are dropeed
783         #
784         p2 = (
785             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
786             / IP(src="8.8.8.8", dst=self.pg1.local_ip4)
787             / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_SERVER_PORT)
788             / BOOTP(op=1)
789             / DHCP(
790                 options=[
791                     ("message-type", "offer"),
792                     ("relay_agent_Information", option_82),
793                     ("end"),
794                 ]
795             )
796         )
797         self.send_and_assert_no_replies(self.pg1, p2, "DHCP offer from non-server")
798
799         #
800         # Ensure only the discover is sent to multiple servers
801         #
802         p_req_vrf1 = (
803             Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg4.remote_mac)
804             / IP(src="0.0.0.0", dst="255.255.255.255")
805             / UDP(sport=DHCP4_CLIENT_PORT, dport=DHCP4_SERVER_PORT)
806             / BOOTP(op=1)
807             / DHCP(options=[("message-type", "request"), ("end")])
808         )
809
810         self.pg4.add_stream(p_req_vrf1)
811         self.pg_enable_capture(self.pg_interfaces)
812         self.pg_start()
813
814         rx = self.pg1.get_capture(1)
815
816         #
817         # Remove the second DHCP server
818         #
819         Proxy12.remove_vpp_config()
820
821         #
822         # Test we can still relay with the first
823         #
824         self.pg4.add_stream(pkts_disc_vrf1)
825         self.pg_enable_capture(self.pg_interfaces)
826         self.pg_start()
827
828         rx = self.pg1.get_capture(1)
829         rx = rx[0]
830         self.verify_relayed_dhcp_discover(
831             rx, self.pg1, src_intf=self.pg4, fib_id=1, oui=4
832         )
833
834         #
835         # Remove the VSS config
836         #  relayed DHCP has default vlaues in the option.
837         #
838         self.vapi.dhcp_proxy_set_vss(tbl_id=1, is_add=0)
839         self.vapi.dhcp_proxy_set_vss(tbl_id=2, is_add=0)
840
841         self.pg4.add_stream(pkts_disc_vrf1)
842         self.pg_enable_capture(self.pg_interfaces)
843         self.pg_start()
844
845         rx = self.pg1.get_capture(1)
846         rx = rx[0]
847         self.verify_relayed_dhcp_discover(rx, self.pg1, src_intf=self.pg4)
848
849         #
850         # remove DHCP config to cleanup
851         #
852         Proxy1.remove_vpp_config()
853         Proxy2.remove_vpp_config()
854
855         self.send_and_assert_no_replies(self.pg3, pkts_disc_vrf0, "DHCP cleanup VRF 0")
856         self.send_and_assert_no_replies(self.pg4, pkts_disc_vrf1, "DHCP cleanup VRF 1")
857         self.send_and_assert_no_replies(self.pg5, pkts_disc_vrf2, "DHCP cleanup VRF 2")
858
859         self.pg3.unconfig_ip4()
860         self.pg4.unconfig_ip4()
861         self.pg5.unconfig_ip4()
862
863     def test_dhcp6_proxy(self):
864         """DHCPv6 Proxy"""
865         #
866         # Verify no response to DHCP request without DHCP config
867         #
868         dhcp_solicit_dst = "ff02::1:2"
869         dhcp_solicit_src_vrf0 = mk_ll_addr(self.pg3.remote_mac)
870         dhcp_solicit_src_vrf1 = mk_ll_addr(self.pg4.remote_mac)
871         dhcp_solicit_src_vrf2 = mk_ll_addr(self.pg5.remote_mac)
872         server_addr_vrf0 = self.pg0.remote_ip6
873         src_addr_vrf0 = self.pg0.local_ip6
874         server_addr_vrf1 = self.pg1.remote_ip6
875         src_addr_vrf1 = self.pg1.local_ip6
876         server_addr_vrf2 = self.pg2.remote_ip6
877         src_addr_vrf2 = self.pg2.local_ip6
878
879         dmac = in6_getnsmac(inet_pton(socket.AF_INET6, dhcp_solicit_dst))
880         p_solicit_vrf0 = (
881             Ether(dst=dmac, src=self.pg3.remote_mac)
882             / IPv6(src=dhcp_solicit_src_vrf0, dst=dhcp_solicit_dst)
883             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_CLIENT_PORT)
884             / DHCP6_Solicit()
885         )
886         p_solicit_vrf1 = (
887             Ether(dst=dmac, src=self.pg4.remote_mac)
888             / IPv6(src=dhcp_solicit_src_vrf1, dst=dhcp_solicit_dst)
889             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_CLIENT_PORT)
890             / DHCP6_Solicit()
891         )
892         p_solicit_vrf2 = (
893             Ether(dst=dmac, src=self.pg5.remote_mac)
894             / IPv6(src=dhcp_solicit_src_vrf2, dst=dhcp_solicit_dst)
895             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_CLIENT_PORT)
896             / DHCP6_Solicit()
897         )
898
899         self.send_and_assert_no_replies(
900             self.pg3, p_solicit_vrf0, "DHCP with no configuration"
901         )
902         self.send_and_assert_no_replies(
903             self.pg4, p_solicit_vrf1, "DHCP with no configuration"
904         )
905         self.send_and_assert_no_replies(
906             self.pg5, p_solicit_vrf2, "DHCP with no configuration"
907         )
908
909         #
910         # DHCPv6 config in VRF 0.
911         # Packets still dropped because the client facing interface has no
912         # IPv6 config
913         #
914         Proxy = VppDHCPProxy(
915             self, server_addr_vrf0, src_addr_vrf0, rx_vrf_id=0, server_vrf_id=0
916         )
917         Proxy.add_vpp_config()
918
919         self.send_and_assert_no_replies(
920             self.pg3, p_solicit_vrf0, "DHCP with no configuration"
921         )
922         self.send_and_assert_no_replies(
923             self.pg4, p_solicit_vrf1, "DHCP with no configuration"
924         )
925
926         #
927         # configure an IP address on the client facing interface
928         #
929         self.pg3.config_ip6()
930
931         #
932         # Now the DHCP requests are relayed to the server
933         #
934         self.pg3.add_stream(p_solicit_vrf0)
935         self.pg_enable_capture(self.pg_interfaces)
936         self.pg_start()
937
938         rx = self.pg0.get_capture(1)
939
940         self.verify_dhcp6_solicit(
941             rx[0], self.pg0, dhcp_solicit_src_vrf0, self.pg3.remote_mac
942         )
943
944         #
945         # Exception cases for rejected relay responses
946         #
947
948         # 1 - not a relay reply
949         p_adv_vrf0 = (
950             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
951             / IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6)
952             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_SERVER_PORT)
953             / DHCP6_Advertise()
954         )
955         self.send_and_assert_no_replies(self.pg3, p_adv_vrf0, "DHCP6 not a relay reply")
956
957         # 2 - no relay message option
958         p_adv_vrf0 = (
959             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
960             / IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6)
961             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_SERVER_PORT)
962             / DHCP6_RelayReply()
963             / DHCP6_Advertise()
964         )
965         self.send_and_assert_no_replies(
966             self.pg3, p_adv_vrf0, "DHCP not a relay message"
967         )
968
969         # 3 - no circuit ID
970         p_adv_vrf0 = (
971             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
972             / IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6)
973             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_SERVER_PORT)
974             / DHCP6_RelayReply()
975             / DHCP6OptRelayMsg(optlen=0)
976             / DHCP6_Advertise()
977         )
978         self.send_and_assert_no_replies(self.pg3, p_adv_vrf0, "DHCP6 no circuit ID")
979         # 4 - wrong circuit ID
980         p_adv_vrf0 = (
981             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
982             / IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6)
983             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_SERVER_PORT)
984             / DHCP6_RelayReply()
985             / DHCP6OptIfaceId(optlen=4, ifaceid="\x00\x00\x00\x05")
986             / DHCP6OptRelayMsg(optlen=0)
987             / DHCP6_Advertise()
988         )
989         self.send_and_assert_no_replies(self.pg3, p_adv_vrf0, "DHCP6 wrong circuit ID")
990
991         #
992         # Send the relay response (the advertisement)
993         #   - no peer address
994         p_adv_vrf0 = (
995             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
996             / IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6)
997             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_SERVER_PORT)
998             / DHCP6_RelayReply()
999             / DHCP6OptIfaceId(optlen=4, ifaceid="\x00\x00\x00\x04")
1000             / DHCP6OptRelayMsg(optlen=0)
1001             / DHCP6_Advertise(trid=1)
1002             / DHCP6OptStatusCode(statuscode=0)
1003         )
1004         pkts_adv_vrf0 = [p_adv_vrf0]
1005
1006         self.pg0.add_stream(pkts_adv_vrf0)
1007         self.pg_enable_capture(self.pg_interfaces)
1008         self.pg_start()
1009
1010         rx = self.pg3.get_capture(1)
1011
1012         self.verify_dhcp6_advert(rx[0], self.pg3, "::")
1013
1014         #
1015         # Send the relay response (the advertisement)
1016         #   - with peer address
1017         p_adv_vrf0 = (
1018             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1019             / IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6)
1020             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_SERVER_PORT)
1021             / DHCP6_RelayReply(peeraddr=dhcp_solicit_src_vrf0)
1022             / DHCP6OptIfaceId(optlen=4, ifaceid="\x00\x00\x00\x04")
1023             / DHCP6OptRelayMsg(optlen=0)
1024             / DHCP6_Advertise(trid=1)
1025             / DHCP6OptStatusCode(statuscode=0)
1026         )
1027         pkts_adv_vrf0 = [p_adv_vrf0]
1028
1029         self.pg0.add_stream(pkts_adv_vrf0)
1030         self.pg_enable_capture(self.pg_interfaces)
1031         self.pg_start()
1032
1033         rx = self.pg3.get_capture(1)
1034
1035         self.verify_dhcp6_advert(rx[0], self.pg3, dhcp_solicit_src_vrf0)
1036
1037         #
1038         # Add all the config for VRF 1 & 2
1039         #
1040         Proxy1 = VppDHCPProxy(
1041             self, server_addr_vrf1, src_addr_vrf1, rx_vrf_id=1, server_vrf_id=1
1042         )
1043         Proxy1.add_vpp_config()
1044         self.pg4.config_ip6()
1045
1046         Proxy2 = VppDHCPProxy(
1047             self, server_addr_vrf2, src_addr_vrf2, rx_vrf_id=2, server_vrf_id=2
1048         )
1049         Proxy2.add_vpp_config()
1050         self.pg5.config_ip6()
1051
1052         #
1053         # VRF 1 solicit
1054         #
1055         self.pg4.add_stream(p_solicit_vrf1)
1056         self.pg_enable_capture(self.pg_interfaces)
1057         self.pg_start()
1058
1059         rx = self.pg1.get_capture(1)
1060
1061         self.verify_dhcp6_solicit(
1062             rx[0], self.pg1, dhcp_solicit_src_vrf1, self.pg4.remote_mac
1063         )
1064
1065         #
1066         # VRF 2 solicit
1067         #
1068         self.pg5.add_stream(p_solicit_vrf2)
1069         self.pg_enable_capture(self.pg_interfaces)
1070         self.pg_start()
1071
1072         rx = self.pg2.get_capture(1)
1073
1074         self.verify_dhcp6_solicit(
1075             rx[0], self.pg2, dhcp_solicit_src_vrf2, self.pg5.remote_mac
1076         )
1077
1078         #
1079         # VRF 1 Advert
1080         #
1081         p_adv_vrf1 = (
1082             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1083             / IPv6(dst=self.pg1.local_ip6, src=self.pg1.remote_ip6)
1084             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_SERVER_PORT)
1085             / DHCP6_RelayReply(peeraddr=dhcp_solicit_src_vrf1)
1086             / DHCP6OptIfaceId(optlen=4, ifaceid="\x00\x00\x00\x05")
1087             / DHCP6OptRelayMsg(optlen=0)
1088             / DHCP6_Advertise(trid=1)
1089             / DHCP6OptStatusCode(statuscode=0)
1090         )
1091         pkts_adv_vrf1 = [p_adv_vrf1]
1092
1093         self.pg1.add_stream(pkts_adv_vrf1)
1094         self.pg_enable_capture(self.pg_interfaces)
1095         self.pg_start()
1096
1097         rx = self.pg4.get_capture(1)
1098
1099         self.verify_dhcp6_advert(rx[0], self.pg4, dhcp_solicit_src_vrf1)
1100
1101         #
1102         # Add VSS config
1103         #
1104         self.vapi.dhcp_proxy_set_vss(
1105             tbl_id=1, vss_type=1, oui=4, vpn_index=1, is_ipv6=1, is_add=1
1106         )
1107         self.vapi.dhcp_proxy_set_vss(
1108             tbl_id=2, vss_type=0, vpn_ascii_id="IPv6-table-2", is_ipv6=1, is_add=1
1109         )
1110
1111         self.pg4.add_stream(p_solicit_vrf1)
1112         self.pg_enable_capture(self.pg_interfaces)
1113         self.pg_start()
1114
1115         rx = self.pg1.get_capture(1)
1116
1117         self.verify_dhcp6_solicit(
1118             rx[0], self.pg1, dhcp_solicit_src_vrf1, self.pg4.remote_mac, fib_id=1, oui=4
1119         )
1120
1121         self.pg5.add_stream(p_solicit_vrf2)
1122         self.pg_enable_capture(self.pg_interfaces)
1123         self.pg_start()
1124
1125         rx = self.pg2.get_capture(1)
1126
1127         self.verify_dhcp6_solicit(
1128             rx[0],
1129             self.pg2,
1130             dhcp_solicit_src_vrf2,
1131             self.pg5.remote_mac,
1132             vpn_id="IPv6-table-2",
1133         )
1134
1135         #
1136         # Remove the VSS config
1137         #  relayed DHCP has default vlaues in the option.
1138         #
1139         self.vapi.dhcp_proxy_set_vss(tbl_id=1, is_ipv6=1, is_add=0)
1140
1141         self.pg4.add_stream(p_solicit_vrf1)
1142         self.pg_enable_capture(self.pg_interfaces)
1143         self.pg_start()
1144
1145         rx = self.pg1.get_capture(1)
1146
1147         self.verify_dhcp6_solicit(
1148             rx[0], self.pg1, dhcp_solicit_src_vrf1, self.pg4.remote_mac
1149         )
1150
1151         #
1152         # Add a second DHCP server in VRF 1
1153         #  expect clients messages to be relay to both configured servers
1154         #
1155         self.pg1.generate_remote_hosts(2)
1156         server_addr12 = self.pg1.remote_hosts[1].ip6
1157
1158         Proxy12 = VppDHCPProxy(
1159             self, server_addr12, src_addr_vrf1, rx_vrf_id=1, server_vrf_id=1
1160         )
1161         Proxy12.add_vpp_config()
1162
1163         #
1164         # We'll need an ND entry for the server to send it packets
1165         #
1166         nd_entry = VppNeighbor(
1167             self,
1168             self.pg1.sw_if_index,
1169             self.pg1.remote_hosts[1].mac,
1170             self.pg1.remote_hosts[1].ip6,
1171         )
1172         nd_entry.add_vpp_config()
1173
1174         #
1175         # Send a discover from the client. expect two relayed messages
1176         # The frist packet is sent to the second server
1177         # We're not enforcing that here, it's just the way it is.
1178         #
1179         self.pg4.add_stream(p_solicit_vrf1)
1180         self.pg_enable_capture(self.pg_interfaces)
1181         self.pg_start()
1182
1183         rx = self.pg1.get_capture(2)
1184
1185         self.verify_dhcp6_solicit(
1186             rx[0], self.pg1, dhcp_solicit_src_vrf1, self.pg4.remote_mac
1187         )
1188         self.verify_dhcp6_solicit(
1189             rx[1],
1190             self.pg1,
1191             dhcp_solicit_src_vrf1,
1192             self.pg4.remote_mac,
1193             dst_mac=self.pg1.remote_hosts[1].mac,
1194             dst_ip=self.pg1.remote_hosts[1].ip6,
1195         )
1196
1197         #
1198         # Send both packets back. Client gets both.
1199         #
1200         p1 = (
1201             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1202             / IPv6(dst=self.pg1.local_ip6, src=self.pg1.remote_ip6)
1203             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_SERVER_PORT)
1204             / DHCP6_RelayReply(peeraddr=dhcp_solicit_src_vrf1)
1205             / DHCP6OptIfaceId(optlen=4, ifaceid="\x00\x00\x00\x05")
1206             / DHCP6OptRelayMsg(optlen=0)
1207             / DHCP6_Advertise(trid=1)
1208             / DHCP6OptStatusCode(statuscode=0)
1209         )
1210         p2 = (
1211             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_hosts[1].mac)
1212             / IPv6(dst=self.pg1.local_ip6, src=self.pg1._remote_hosts[1].ip6)
1213             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_SERVER_PORT)
1214             / DHCP6_RelayReply(peeraddr=dhcp_solicit_src_vrf1)
1215             / DHCP6OptIfaceId(optlen=4, ifaceid="\x00\x00\x00\x05")
1216             / DHCP6OptRelayMsg(optlen=0)
1217             / DHCP6_Advertise(trid=1)
1218             / DHCP6OptStatusCode(statuscode=0)
1219         )
1220
1221         pkts = [p1, p2]
1222
1223         self.pg1.add_stream(pkts)
1224         self.pg_enable_capture(self.pg_interfaces)
1225         self.pg_start()
1226
1227         rx = self.pg4.get_capture(2)
1228
1229         self.verify_dhcp6_advert(rx[0], self.pg4, dhcp_solicit_src_vrf1)
1230         self.verify_dhcp6_advert(rx[1], self.pg4, dhcp_solicit_src_vrf1)
1231
1232         #
1233         # Ensure only solicit messages are duplicated
1234         #
1235         p_request_vrf1 = (
1236             Ether(dst=dmac, src=self.pg4.remote_mac)
1237             / IPv6(src=dhcp_solicit_src_vrf1, dst=dhcp_solicit_dst)
1238             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_CLIENT_PORT)
1239             / DHCP6_Request()
1240         )
1241
1242         self.pg4.add_stream(p_request_vrf1)
1243         self.pg_enable_capture(self.pg_interfaces)
1244         self.pg_start()
1245
1246         rx = self.pg1.get_capture(1)
1247
1248         #
1249         # Test we drop DHCP packets from addresses that are not configured as
1250         # DHCP servers
1251         #
1252         p2 = (
1253             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_hosts[1].mac)
1254             / IPv6(dst=self.pg1.local_ip6, src="3001::1")
1255             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_SERVER_PORT)
1256             / DHCP6_RelayReply(peeraddr=dhcp_solicit_src_vrf1)
1257             / DHCP6OptIfaceId(optlen=4, ifaceid="\x00\x00\x00\x05")
1258             / DHCP6OptRelayMsg(optlen=0)
1259             / DHCP6_Advertise(trid=1)
1260             / DHCP6OptStatusCode(statuscode=0)
1261         )
1262         self.send_and_assert_no_replies(self.pg1, p2, "DHCP6 not from server")
1263
1264         #
1265         # Remove the second DHCP server
1266         #
1267         Proxy12.remove_vpp_config()
1268
1269         #
1270         # Test we can still relay with the first
1271         #
1272         self.pg4.add_stream(p_solicit_vrf1)
1273         self.pg_enable_capture(self.pg_interfaces)
1274         self.pg_start()
1275
1276         rx = self.pg1.get_capture(1)
1277
1278         self.verify_dhcp6_solicit(
1279             rx[0], self.pg1, dhcp_solicit_src_vrf1, self.pg4.remote_mac
1280         )
1281
1282         #
1283         # Cleanup
1284         #
1285         Proxy.remove_vpp_config()
1286         Proxy1.remove_vpp_config()
1287         Proxy2.remove_vpp_config()
1288
1289         self.pg3.unconfig_ip6()
1290         self.pg4.unconfig_ip6()
1291         self.pg5.unconfig_ip6()
1292
1293     def test_dhcp_client(self):
1294         """DHCP Client"""
1295
1296         vdscp = VppEnum.vl_api_ip_dscp_t
1297         hostname = "universal-dp"
1298
1299         self.pg_enable_capture(self.pg_interfaces)
1300
1301         #
1302         # Configure DHCP client on PG3 and capture the discover sent
1303         #
1304         Client = VppDHCPClient(self, self.pg3.sw_if_index, hostname)
1305         Client.add_vpp_config()
1306         self.assertTrue(Client.query_vpp_config())
1307
1308         rx = self.pg3.get_capture(1)
1309
1310         self.verify_orig_dhcp_discover(rx[0], self.pg3, hostname)
1311
1312         #
1313         # Send back on offer, expect the request
1314         #
1315         p_offer = (
1316             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
1317             / IP(src=self.pg3.remote_ip4, dst="255.255.255.255")
1318             / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT)
1319             / BOOTP(
1320                 op=1, yiaddr=self.pg3.local_ip4, chaddr=mac_pton(self.pg3.local_mac)
1321             )
1322             / DHCP(
1323                 options=[
1324                     ("message-type", "offer"),
1325                     ("server_id", self.pg3.remote_ip4),
1326                     "end",
1327                 ]
1328             )
1329         )
1330
1331         self.pg3.add_stream(p_offer)
1332         self.pg_enable_capture(self.pg_interfaces)
1333         self.pg_start()
1334
1335         rx = self.pg3.get_capture(1)
1336         self.verify_orig_dhcp_request(rx[0], self.pg3, hostname, self.pg3.local_ip4)
1337
1338         #
1339         # Send an acknowledgment
1340         #
1341         p_ack = (
1342             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
1343             / IP(src=self.pg3.remote_ip4, dst="255.255.255.255")
1344             / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT)
1345             / BOOTP(
1346                 op=1, yiaddr=self.pg3.local_ip4, chaddr=mac_pton(self.pg3.local_mac)
1347             )
1348             / DHCP(
1349                 options=[
1350                     ("message-type", "ack"),
1351                     ("subnet_mask", "255.255.255.0"),
1352                     ("router", self.pg3.remote_ip4),
1353                     ("server_id", self.pg3.remote_ip4),
1354                     ("lease_time", 43200),
1355                     "end",
1356                 ]
1357             )
1358         )
1359
1360         self.pg3.add_stream(p_ack)
1361         self.pg_enable_capture(self.pg_interfaces)
1362         self.pg_start()
1363
1364         #
1365         # We'll get an ARP request for the router address
1366         #
1367         rx = self.pg3.get_capture(1)
1368
1369         self.assertEqual(rx[0][ARP].pdst, self.pg3.remote_ip4)
1370         self.pg_enable_capture(self.pg_interfaces)
1371
1372         #
1373         # At the end of this procedure there should be a connected route
1374         # in the FIB
1375         #
1376         self.assertTrue(find_route(self, self.pg3.local_ip4, 24))
1377         self.assertTrue(find_route(self, self.pg3.local_ip4, 32))
1378
1379         #
1380         # remove the DHCP config
1381         #
1382         Client.remove_vpp_config()
1383
1384         #
1385         # and now the route should be gone
1386         #
1387         self.assertFalse(find_route(self, self.pg3.local_ip4, 32))
1388         self.assertFalse(find_route(self, self.pg3.local_ip4, 24))
1389
1390         #
1391         # Start the procedure again. this time have VPP send the client-ID
1392         # and set the DSCP value
1393         #
1394         self.pg3.admin_down()
1395         self.sleep(1)
1396         self.pg3.admin_up()
1397         Client.set_client(
1398             self.pg3.sw_if_index,
1399             hostname,
1400             id=self.pg3.local_mac,
1401             dscp=vdscp.IP_API_DSCP_EF,
1402         )
1403         Client.add_vpp_config()
1404
1405         rx = self.pg3.get_capture(1)
1406
1407         self.verify_orig_dhcp_discover(
1408             rx[0], self.pg3, hostname, self.pg3.local_mac, dscp=vdscp.IP_API_DSCP_EF
1409         )
1410
1411         # TODO: VPP DHCP client should not accept DHCP OFFER message with
1412         # the XID (Transaction ID) not matching the XID of the most recent
1413         # DHCP DISCOVERY message.
1414         # Such DHCP OFFER message must be silently discarded - RFC2131.
1415         # Reported in Jira ticket: VPP-99
1416         self.pg3.add_stream(p_offer)
1417         self.pg_enable_capture(self.pg_interfaces)
1418         self.pg_start()
1419
1420         rx = self.pg3.get_capture(1)
1421         self.verify_orig_dhcp_request(
1422             rx[0], self.pg3, hostname, self.pg3.local_ip4, dscp=vdscp.IP_API_DSCP_EF
1423         )
1424
1425         #
1426         # unicast the ack to the offered address
1427         #
1428         p_ack = (
1429             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
1430             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
1431             / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT)
1432             / BOOTP(
1433                 op=1, yiaddr=self.pg3.local_ip4, chaddr=mac_pton(self.pg3.local_mac)
1434             )
1435             / DHCP(
1436                 options=[
1437                     ("message-type", "ack"),
1438                     ("subnet_mask", "255.255.255.0"),
1439                     ("router", self.pg3.remote_ip4),
1440                     ("server_id", self.pg3.remote_ip4),
1441                     ("lease_time", 43200),
1442                     "end",
1443                 ]
1444             )
1445         )
1446
1447         self.pg3.add_stream(p_ack)
1448         self.pg_enable_capture(self.pg_interfaces)
1449         self.pg_start()
1450
1451         #
1452         # We'll get an ARP request for the router address
1453         #
1454         rx = self.pg3.get_capture(1)
1455
1456         self.assertEqual(rx[0][ARP].pdst, self.pg3.remote_ip4)
1457         self.pg_enable_capture(self.pg_interfaces)
1458
1459         #
1460         # At the end of this procedure there should be a connected route
1461         # in the FIB
1462         #
1463         self.assertTrue(find_route(self, self.pg3.local_ip4, 32))
1464         self.assertTrue(find_route(self, self.pg3.local_ip4, 24))
1465
1466         #
1467         # remove the DHCP config
1468         #
1469         Client.remove_vpp_config()
1470
1471         self.assertFalse(find_route(self, self.pg3.local_ip4, 32))
1472         self.assertFalse(find_route(self, self.pg3.local_ip4, 24))
1473
1474         #
1475         # Rince and repeat, this time with VPP configured not to set
1476         # the braodcast flag in the discover and request messages,
1477         # and for the server to unicast the responses.
1478         #
1479         # Configure DHCP client on PG3 and capture the discover sent
1480         #
1481         Client.set_client(self.pg3.sw_if_index, hostname, set_broadcast_flag=False)
1482         Client.add_vpp_config()
1483
1484         rx = self.pg3.get_capture(1)
1485
1486         self.verify_orig_dhcp_discover(rx[0], self.pg3, hostname, broadcast=False)
1487
1488         #
1489         # Send back on offer, unicasted to the offered address.
1490         # Expect the request.
1491         #
1492         p_offer = (
1493             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
1494             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
1495             / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT)
1496             / BOOTP(
1497                 op=1, yiaddr=self.pg3.local_ip4, chaddr=mac_pton(self.pg3.local_mac)
1498             )
1499             / DHCP(
1500                 options=[
1501                     ("message-type", "offer"),
1502                     ("server_id", self.pg3.remote_ip4),
1503                     "end",
1504                 ]
1505             )
1506         )
1507
1508         self.pg3.add_stream(p_offer)
1509         self.pg_enable_capture(self.pg_interfaces)
1510         self.pg_start()
1511
1512         rx = self.pg3.get_capture(1)
1513         self.verify_orig_dhcp_request(
1514             rx[0], self.pg3, hostname, self.pg3.local_ip4, broadcast=False
1515         )
1516
1517         #
1518         # Send an acknowledgment, the lease renewal time is 2 seconds
1519         # so we should expect the renew straight after
1520         #
1521         p_ack = (
1522             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
1523             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
1524             / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT)
1525             / BOOTP(
1526                 op=1, yiaddr=self.pg3.local_ip4, chaddr=mac_pton(self.pg3.local_mac)
1527             )
1528             / DHCP(
1529                 options=[
1530                     ("message-type", "ack"),
1531                     ("subnet_mask", "255.255.255.0"),
1532                     ("router", self.pg3.remote_ip4),
1533                     ("server_id", self.pg3.remote_ip4),
1534                     ("lease_time", 43200),
1535                     ("renewal_time", 2),
1536                     "end",
1537                 ]
1538             )
1539         )
1540
1541         self.pg3.add_stream(p_ack)
1542         self.pg_enable_capture(self.pg_interfaces)
1543         self.pg_start()
1544
1545         #
1546         # We'll get an ARP request for the router address
1547         #
1548         rx = self.pg3.get_capture(1)
1549
1550         self.assertEqual(rx[0][ARP].pdst, self.pg3.remote_ip4)
1551         self.pg_enable_capture(self.pg_interfaces)
1552
1553         #
1554         # At the end of this procedure there should be a connected route
1555         # in the FIB
1556         #
1557         self.assertTrue(find_route(self, self.pg3.local_ip4, 24))
1558         self.assertTrue(find_route(self, self.pg3.local_ip4, 32))
1559
1560         #
1561         # read the DHCP client details from a dump
1562         #
1563         clients = self.vapi.dhcp_client_dump()
1564
1565         self.assertEqual(clients[0].client.sw_if_index, self.pg3.sw_if_index)
1566         self.assertEqual(clients[0].lease.sw_if_index, self.pg3.sw_if_index)
1567         self.assertEqual(clients[0].client.hostname, hostname)
1568         self.assertEqual(clients[0].lease.hostname, hostname)
1569         # 0 = DISCOVER, 1 = REQUEST, 2 = BOUND
1570         self.assertEqual(clients[0].lease.state, 2)
1571         self.assertEqual(clients[0].lease.mask_width, 24)
1572         self.assertEqual(str(clients[0].lease.router_address), self.pg3.remote_ip4)
1573         self.assertEqual(str(clients[0].lease.host_address), self.pg3.local_ip4)
1574
1575         #
1576         # wait for the unicasted renewal
1577         #  the first attempt will be an ARP packet, since we have not yet
1578         #  responded to VPP's request
1579         #
1580         self.logger.info(self.vapi.cli("sh dhcp client intfc pg3 verbose"))
1581         rx = self.pg3.get_capture(1, timeout=10)
1582
1583         self.assertEqual(rx[0][ARP].pdst, self.pg3.remote_ip4)
1584
1585         # respond to the arp
1586         p_arp = Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) / ARP(
1587             op="is-at",
1588             hwdst=self.pg3.local_mac,
1589             hwsrc=self.pg3.remote_mac,
1590             pdst=self.pg3.local_ip4,
1591             psrc=self.pg3.remote_ip4,
1592         )
1593         self.pg3.add_stream(p_arp)
1594         self.pg_enable_capture(self.pg_interfaces)
1595         self.pg_start()
1596
1597         # the next packet is the unicasted renewal
1598         rx = self.pg3.get_capture(1, timeout=10)
1599         self.verify_orig_dhcp_request(
1600             rx[0], self.pg3, hostname, self.pg3.local_ip4, l2_bc=False, broadcast=False
1601         )
1602
1603         # send an ACK with different data from the original offer *
1604         self.pg3.generate_remote_hosts(4)
1605         p_ack = (
1606             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
1607             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
1608             / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT)
1609             / BOOTP(
1610                 op=1,
1611                 yiaddr=self.pg3.remote_hosts[3].ip4,
1612                 chaddr=mac_pton(self.pg3.local_mac),
1613             )
1614             / DHCP(
1615                 options=[
1616                     ("message-type", "ack"),
1617                     ("subnet_mask", "255.255.255.0"),
1618                     ("router", self.pg3.remote_hosts[1].ip4),
1619                     ("server_id", self.pg3.remote_hosts[2].ip4),
1620                     ("lease_time", 43200),
1621                     ("renewal_time", 2),
1622                     "end",
1623                 ]
1624             )
1625         )
1626
1627         self.pg3.add_stream(p_ack)
1628         self.pg_enable_capture(self.pg_interfaces)
1629         self.pg_start()
1630
1631         #
1632         # read the DHCP client details from a dump
1633         #
1634         clients = self.vapi.dhcp_client_dump()
1635
1636         self.assertEqual(clients[0].client.sw_if_index, self.pg3.sw_if_index)
1637         self.assertEqual(clients[0].lease.sw_if_index, self.pg3.sw_if_index)
1638         self.assertEqual(clients[0].client.hostname, hostname)
1639         self.assertEqual(clients[0].lease.hostname, hostname)
1640         # 0 = DISCOVER, 1 = REQUEST, 2 = BOUND
1641         self.assertEqual(clients[0].lease.state, 2)
1642         self.assertEqual(clients[0].lease.mask_width, 24)
1643         self.assertEqual(
1644             str(clients[0].lease.router_address), self.pg3.remote_hosts[1].ip4
1645         )
1646         self.assertEqual(
1647             str(clients[0].lease.host_address), self.pg3.remote_hosts[3].ip4
1648         )
1649
1650         #
1651         # remove the DHCP config
1652         #
1653         Client.remove_vpp_config()
1654
1655         #
1656         # and now the route should be gone
1657         #
1658         self.assertFalse(find_route(self, self.pg3.local_ip4, 32))
1659         self.assertFalse(find_route(self, self.pg3.local_ip4, 24))
1660
1661         #
1662         # Start the procedure again. Use requested lease time option.
1663         # this time wait for the lease to expire and the client to
1664         # self-destruct
1665         #
1666         hostname += "-2"
1667         self.pg3.admin_down()
1668         self.sleep(1)
1669         self.pg3.admin_up()
1670         self.pg_enable_capture(self.pg_interfaces)
1671         Client.set_client(self.pg3.sw_if_index, hostname)
1672         Client.add_vpp_config()
1673
1674         rx = self.pg3.get_capture(1)
1675
1676         self.verify_orig_dhcp_discover(rx[0], self.pg3, hostname)
1677
1678         #
1679         # Send back on offer with requested lease time, expect the request
1680         #
1681         lease_time = 1
1682         p_offer = (
1683             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
1684             / IP(src=self.pg3.remote_ip4, dst="255.255.255.255")
1685             / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT)
1686             / BOOTP(
1687                 op=1, yiaddr=self.pg3.local_ip4, chaddr=mac_pton(self.pg3.local_mac)
1688             )
1689             / DHCP(
1690                 options=[
1691                     ("message-type", "offer"),
1692                     ("server_id", self.pg3.remote_ip4),
1693                     ("lease_time", lease_time),
1694                     "end",
1695                 ]
1696             )
1697         )
1698
1699         self.pg3.add_stream(p_offer)
1700         self.pg_enable_capture(self.pg_interfaces)
1701         self.pg_start()
1702
1703         rx = self.pg3.get_capture(1)
1704         self.verify_orig_dhcp_request(rx[0], self.pg3, hostname, self.pg3.local_ip4)
1705
1706         #
1707         # Send an acknowledgment
1708         #
1709         p_ack = (
1710             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
1711             / IP(src=self.pg3.remote_ip4, dst="255.255.255.255")
1712             / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT)
1713             / BOOTP(
1714                 op=1, yiaddr=self.pg3.local_ip4, chaddr=mac_pton(self.pg3.local_mac)
1715             )
1716             / DHCP(
1717                 options=[
1718                     ("message-type", "ack"),
1719                     ("subnet_mask", "255.255.255.0"),
1720                     ("router", self.pg3.remote_ip4),
1721                     ("server_id", self.pg3.remote_ip4),
1722                     ("lease_time", lease_time),
1723                     "end",
1724                 ]
1725             )
1726         )
1727
1728         self.pg3.add_stream(p_ack)
1729         self.pg_enable_capture(self.pg_interfaces)
1730         self.pg_start()
1731
1732         #
1733         # We'll get an ARP request for the router address
1734         #
1735         rx = self.pg3.get_capture(1)
1736
1737         self.assertEqual(rx[0][ARP].pdst, self.pg3.remote_ip4)
1738
1739         #
1740         # At the end of this procedure there should be a connected route
1741         # in the FIB
1742         #
1743         self.assertTrue(find_route(self, self.pg3.local_ip4, 32))
1744         self.assertTrue(find_route(self, self.pg3.local_ip4, 24))
1745
1746         #
1747         # the route should be gone after the lease expires
1748         #
1749         self.assertTrue(self.wait_for_no_route(self.pg3.local_ip4, 32))
1750         self.assertTrue(self.wait_for_no_route(self.pg3.local_ip4, 24))
1751
1752         #
1753         # remove the DHCP config
1754         #
1755         Client.remove_vpp_config()
1756
1757     def test_dhcp_client_vlan(self):
1758         """DHCP Client w/ VLAN"""
1759
1760         vdscp = VppEnum.vl_api_ip_dscp_t
1761         vqos = VppEnum.vl_api_qos_source_t
1762         hostname = "universal-dp"
1763
1764         self.pg_enable_capture(self.pg_interfaces)
1765
1766         vlan_100 = VppDot1QSubint(self, self.pg3, 100)
1767         vlan_100.admin_up()
1768
1769         output = [scapy.compat.chb(4)] * 256
1770         os = b"".join(output)
1771         rows = [{"outputs": os}, {"outputs": os}, {"outputs": os}, {"outputs": os}]
1772
1773         qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
1774         qm1 = VppQosMark(
1775             self, vlan_100, qem1, vqos.QOS_API_SOURCE_VLAN
1776         ).add_vpp_config()
1777
1778         #
1779         # Configure DHCP client on PG3 and capture the discover sent
1780         #
1781         Client = VppDHCPClient(
1782             self, vlan_100.sw_if_index, hostname, dscp=vdscp.IP_API_DSCP_EF
1783         )
1784         Client.add_vpp_config()
1785
1786         rx = self.pg3.get_capture(1)
1787
1788         self.assertEqual(rx[0][Dot1Q].vlan, 100)
1789         self.assertEqual(rx[0][Dot1Q].prio, 2)
1790
1791         self.verify_orig_dhcp_discover(
1792             rx[0], self.pg3, hostname, dscp=vdscp.IP_API_DSCP_EF
1793         )
1794
1795
1796 if __name__ == "__main__":
1797     unittest.main(testRunner=VppTestRunner)