http: fix client parse error handling
[vpp.git] / test / test_dhcp.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import socket
5 import six
6
7 from framework import VppTestCase
8 from asfframework import VppTestRunner, tag_run_solo
9 from vpp_neighbor import VppNeighbor
10 from vpp_ip_route import find_route, VppIpTable
11 from util import mk_ll_addr
12 import scapy.compat
13 from scapy.layers.l2 import Ether, ARP, Dot1Q
14 from scapy.layers.inet import IP, UDP
15 from scapy.layers.inet6 import IPv6, in6_getnsmac
16 from scapy.layers.dhcp import DHCP, BOOTP, DHCPTypes
17 from scapy.layers.dhcp6 import (
18     DHCP6_Solicit,
19     DHCP6_RelayForward,
20     DHCP6_RelayReply,
21     DHCP6_Advertise,
22     DHCP6OptRelayMsg,
23     DHCP6OptIfaceId,
24     DHCP6OptStatusCode,
25     DHCP6OptVSS,
26     DHCP6OptClientLinkLayerAddr,
27     DHCP6_Request,
28 )
29 from socket import AF_INET, AF_INET6, inet_pton
30 from scapy.utils6 import in6_ptop
31 from vpp_papi import mac_pton, VppEnum
32 from vpp_sub_interface import VppDot1QSubint
33 from vpp_qos import VppQosEgressMap, VppQosMark
34 from vpp_dhcp import VppDHCPClient, VppDHCPProxy
35
36
# UDP port numbers used when building and checking the DHCP packets below.
DHCP4_CLIENT_PORT = 68
DHCP4_SERVER_PORT = 67
# NOTE(review): RFC 8415 assigns UDP 546 to DHCPv6 clients and 547 to
# servers/relay agents, so the two names below look swapped. The values are
# used consistently in this file (client Solicits are built with
# sport=DHCP6_SERVER_PORT and dport=DHCP6_CLIENT_PORT), so a rename would have
# to touch every user at the same time - confirm before changing.
DHCP6_CLIENT_PORT = 547
DHCP6_SERVER_PORT = 546
41
42
43 @tag_run_solo
44 class TestDHCP(VppTestCase):
45     """DHCP Test Case"""
46
47     @classmethod
48     def setUpClass(cls):
49         super(TestDHCP, cls).setUpClass()
50
51     @classmethod
52     def tearDownClass(cls):
53         super(TestDHCP, cls).tearDownClass()
54
55     def setUp(self):
56         super(TestDHCP, self).setUp()
57
58         # create 6 pg interfaces for pg0 to pg5
59         self.create_pg_interfaces(range(6))
60         self.tables = []
61
62         # pg0 to 2 are IP configured in VRF 0, 1 and 2.
63         # pg3 to 5 are non IP-configured in VRF 0, 1 and 2.
64         table_id = 0
65         for table_id in range(1, 4):
66             tbl4 = VppIpTable(self, table_id)
67             tbl4.add_vpp_config()
68             self.tables.append(tbl4)
69             tbl6 = VppIpTable(self, table_id, is_ip6=1)
70             tbl6.add_vpp_config()
71             self.tables.append(tbl6)
72
73         table_id = 0
74         for i in self.pg_interfaces[:3]:
75             i.admin_up()
76             i.set_table_ip4(table_id)
77             i.set_table_ip6(table_id)
78             i.config_ip4()
79             i.resolve_arp()
80             i.config_ip6()
81             i.resolve_ndp()
82             table_id += 1
83
84         table_id = 0
85         for i in self.pg_interfaces[3:]:
86             i.admin_up()
87             i.set_table_ip4(table_id)
88             i.set_table_ip6(table_id)
89             table_id += 1
90
91     def tearDown(self):
92         for i in self.pg_interfaces[:3]:
93             i.unconfig_ip4()
94             i.unconfig_ip6()
95
96         for i in self.pg_interfaces:
97             i.set_table_ip4(0)
98             i.set_table_ip6(0)
99             i.admin_down()
100         super(TestDHCP, self).tearDown()
101
102     def verify_dhcp_has_option(self, pkt, option, value):
103         dhcp = pkt[DHCP]
104         found = False
105
106         for i in dhcp.options:
107             if isinstance(i, tuple):
108                 if i[0] == option:
109                     self.assertEqual(i[1], value)
110                     found = True
111
112         self.assertTrue(found)
113
114     def validate_relay_options(self, pkt, intf, ip_addr, vpn_id, fib_id, oui):
115         dhcp = pkt[DHCP]
116         found = 0
117         data = []
118         id_len = len(vpn_id)
119
120         for i in dhcp.options:
121             if isinstance(i, tuple):
122                 if i[0] == "relay_agent_Information":
123                     #
124                     # There are two sb-options present - each of length 6.
125                     #
126                     data = i[1]
127                     if oui != 0:
128                         self.assertEqual(len(data), 24)
129                     elif len(vpn_id) > 0:
130                         self.assertEqual(len(data), len(vpn_id) + 17)
131                     else:
132                         self.assertEqual(len(data), 12)
133
134                     #
135                     # First sub-option is ID 1, len 4, then encoded
136                     #  sw_if_index. This test uses low valued indicies
137                     # so [2:4] are 0.
138                     # The ID space is VPP internal - so no matching value
139                     # scapy
140                     #
141                     self.assertEqual(six.byte2int(data[0:1]), 1)
142                     self.assertEqual(six.byte2int(data[1:2]), 4)
143                     self.assertEqual(six.byte2int(data[2:3]), 0)
144                     self.assertEqual(six.byte2int(data[3:4]), 0)
145                     self.assertEqual(six.byte2int(data[4:5]), 0)
146                     self.assertEqual(six.byte2int(data[5:6]), intf._sw_if_index)
147
148                     #
149                     # next sub-option is the IP address of the client side
150                     # interface.
151                     # sub-option ID=5, length (of a v4 address)=4
152                     #
153                     claddr = socket.inet_pton(AF_INET, ip_addr)
154
155                     self.assertEqual(six.byte2int(data[6:7]), 5)
156                     self.assertEqual(six.byte2int(data[7:8]), 4)
157                     self.assertEqual(data[8], claddr[0])
158                     self.assertEqual(data[9], claddr[1])
159                     self.assertEqual(data[10], claddr[2])
160                     self.assertEqual(data[11], claddr[3])
161
162                     if oui != 0:
163                         # sub-option 151 encodes vss_type 1,
164                         # the 3 byte oui and the 4 byte fib_id
165                         self.assertEqual(id_len, 0)
166                         self.assertEqual(six.byte2int(data[12:13]), 151)
167                         self.assertEqual(six.byte2int(data[13:14]), 8)
168                         self.assertEqual(six.byte2int(data[14:15]), 1)
169                         self.assertEqual(six.byte2int(data[15:16]), 0)
170                         self.assertEqual(six.byte2int(data[16:17]), 0)
171                         self.assertEqual(six.byte2int(data[17:18]), oui)
172                         self.assertEqual(six.byte2int(data[18:19]), 0)
173                         self.assertEqual(six.byte2int(data[19:20]), 0)
174                         self.assertEqual(six.byte2int(data[20:21]), 0)
175                         self.assertEqual(six.byte2int(data[21:22]), fib_id)
176
177                         # VSS control sub-option
178                         self.assertEqual(six.byte2int(data[22:23]), 152)
179                         self.assertEqual(six.byte2int(data[23:24]), 0)
180
181                     if id_len > 0:
182                         # sub-option 151 encode vss_type of 0
183                         # followerd by vpn_id in ascii
184                         self.assertEqual(oui, 0)
185                         self.assertEqual(six.byte2int(data[12:13]), 151)
186                         self.assertEqual(six.byte2int(data[13:14]), id_len + 1)
187                         self.assertEqual(six.byte2int(data[14:15]), 0)
188                         self.assertEqual(data[15 : 15 + id_len].decode("ascii"), vpn_id)
189
190                         # VSS control sub-option
191                         self.assertEqual(
192                             six.byte2int(data[15 + len(vpn_id) : 16 + len(vpn_id)]), 152
193                         )
194                         self.assertEqual(
195                             six.byte2int(data[16 + len(vpn_id) : 17 + len(vpn_id)]), 0
196                         )
197
198                     found = 1
199         self.assertTrue(found)
200
201         return data
202
203     def verify_dhcp_msg_type(self, pkt, name):
204         dhcp = pkt[DHCP]
205         found = False
206         for o in dhcp.options:
207             if isinstance(o, tuple):
208                 if o[0] == "message-type" and DHCPTypes[o[1]] == name:
209                     found = True
210         self.assertTrue(found)
211
212     def verify_dhcp_offer(self, pkt, intf, vpn_id="", fib_id=0, oui=0):
213         ether = pkt[Ether]
214         self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff")
215         self.assertEqual(ether.src, intf.local_mac)
216
217         ip = pkt[IP]
218         self.assertEqual(ip.dst, "255.255.255.255")
219         self.assertEqual(ip.src, intf.local_ip4)
220
221         udp = pkt[UDP]
222         self.assertEqual(udp.dport, DHCP4_CLIENT_PORT)
223         self.assertEqual(udp.sport, DHCP4_SERVER_PORT)
224
225         self.verify_dhcp_msg_type(pkt, "offer")
226         data = self.validate_relay_options(
227             pkt, intf, intf.local_ip4, vpn_id, fib_id, oui
228         )
229
230     def verify_orig_dhcp_pkt(self, pkt, intf, dscp, l2_bc=True):
231         ether = pkt[Ether]
232         if l2_bc:
233             self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff")
234         else:
235             self.assertEqual(ether.dst, intf.remote_mac)
236         self.assertEqual(ether.src, intf.local_mac)
237
238         ip = pkt[IP]
239
240         if l2_bc:
241             self.assertEqual(ip.dst, "255.255.255.255")
242             self.assertEqual(ip.src, "0.0.0.0")
243         else:
244             self.assertEqual(ip.dst, intf.remote_ip4)
245             self.assertEqual(ip.src, intf.local_ip4)
246         self.assertEqual(ip.tos, dscp)
247
248         udp = pkt[UDP]
249         self.assertEqual(udp.dport, DHCP4_SERVER_PORT)
250         self.assertEqual(udp.sport, DHCP4_CLIENT_PORT)
251
252     def verify_orig_dhcp_discover(
253         self, pkt, intf, hostname, client_id=None, broadcast=True, dscp=0
254     ):
255         self.verify_orig_dhcp_pkt(pkt, intf, dscp)
256
257         self.verify_dhcp_msg_type(pkt, "discover")
258         self.verify_dhcp_has_option(pkt, "hostname", hostname.encode("ascii"))
259         if client_id:
260             client_id = "\x00" + client_id
261             self.verify_dhcp_has_option(pkt, "client_id", client_id.encode("ascii"))
262         bootp = pkt[BOOTP]
263         self.assertEqual(bootp.ciaddr, "0.0.0.0")
264         self.assertEqual(bootp.giaddr, "0.0.0.0")
265         if broadcast:
266             self.assertEqual(bootp.flags, 0x8000)
267         else:
268             self.assertEqual(bootp.flags, 0x0000)
269
270     def verify_orig_dhcp_request(
271         self, pkt, intf, hostname, ip, broadcast=True, l2_bc=True, dscp=0
272     ):
273         self.verify_orig_dhcp_pkt(pkt, intf, dscp, l2_bc=l2_bc)
274
275         self.verify_dhcp_msg_type(pkt, "request")
276         self.verify_dhcp_has_option(pkt, "hostname", hostname.encode("ascii"))
277         self.verify_dhcp_has_option(pkt, "requested_addr", ip)
278         bootp = pkt[BOOTP]
279
280         if l2_bc:
281             self.assertEqual(bootp.ciaddr, "0.0.0.0")
282         else:
283             self.assertEqual(bootp.ciaddr, intf.local_ip4)
284         self.assertEqual(bootp.giaddr, "0.0.0.0")
285
286         if broadcast:
287             self.assertEqual(bootp.flags, 0x8000)
288         else:
289             self.assertEqual(bootp.flags, 0x0000)
290
291     def verify_relayed_dhcp_discover(
292         self,
293         pkt,
294         intf,
295         src_intf=None,
296         fib_id=0,
297         oui=0,
298         vpn_id="",
299         dst_mac=None,
300         dst_ip=None,
301     ):
302         if not dst_mac:
303             dst_mac = intf.remote_mac
304         if not dst_ip:
305             dst_ip = intf.remote_ip4
306
307         ether = pkt[Ether]
308         self.assertEqual(ether.dst, dst_mac)
309         self.assertEqual(ether.src, intf.local_mac)
310
311         ip = pkt[IP]
312         self.assertEqual(ip.dst, dst_ip)
313         self.assertEqual(ip.src, intf.local_ip4)
314
315         udp = pkt[UDP]
316         self.assertEqual(udp.dport, DHCP4_SERVER_PORT)
317         self.assertEqual(udp.sport, DHCP4_CLIENT_PORT)
318
319         dhcp = pkt[DHCP]
320
321         is_discover = False
322         for o in dhcp.options:
323             if isinstance(o, tuple):
324                 if o[0] == "message-type" and DHCPTypes[o[1]] == "discover":
325                     is_discover = True
326         self.assertTrue(is_discover)
327
328         data = self.validate_relay_options(
329             pkt, src_intf, src_intf.local_ip4, vpn_id, fib_id, oui
330         )
331         return data
332
333     def verify_dhcp6_solicit(
334         self,
335         pkt,
336         intf,
337         peer_ip,
338         peer_mac,
339         vpn_id="",
340         fib_id=0,
341         oui=0,
342         dst_mac=None,
343         dst_ip=None,
344     ):
345         if not dst_mac:
346             dst_mac = intf.remote_mac
347         if not dst_ip:
348             dst_ip = in6_ptop(intf.remote_ip6)
349
350         ether = pkt[Ether]
351         self.assertEqual(ether.dst, dst_mac)
352         self.assertEqual(ether.src, intf.local_mac)
353
354         ip = pkt[IPv6]
355         self.assertEqual(in6_ptop(ip.dst), dst_ip)
356         self.assertEqual(in6_ptop(ip.src), in6_ptop(intf.local_ip6))
357
358         udp = pkt[UDP]
359         self.assertEqual(udp.dport, DHCP6_CLIENT_PORT)
360         self.assertEqual(udp.sport, DHCP6_SERVER_PORT)
361
362         relay = pkt[DHCP6_RelayForward]
363         self.assertEqual(in6_ptop(relay.peeraddr), in6_ptop(peer_ip))
364         oid = pkt[DHCP6OptIfaceId]
365         cll = pkt[DHCP6OptClientLinkLayerAddr]
366         self.assertEqual(cll.optlen, 8)
367         self.assertEqual(cll.lltype, 1)
368         self.assertEqual(cll.clladdr, peer_mac)
369
370         id_len = len(vpn_id)
371
372         if fib_id != 0:
373             self.assertEqual(id_len, 0)
374             vss = pkt[DHCP6OptVSS]
375             self.assertEqual(vss.optlen, 8)
376             self.assertEqual(vss.type, 1)
377             # the OUI and FIB-id are really 3 and 4 bytes resp.
378             # but the tested range is small
379             self.assertEqual(six.byte2int(vss.data[0:1]), 0)
380             self.assertEqual(six.byte2int(vss.data[1:2]), 0)
381             self.assertEqual(six.byte2int(vss.data[2:3]), oui)
382             self.assertEqual(six.byte2int(vss.data[3:4]), 0)
383             self.assertEqual(six.byte2int(vss.data[4:5]), 0)
384             self.assertEqual(six.byte2int(vss.data[5:6]), 0)
385             self.assertEqual(six.byte2int(vss.data[6:7]), fib_id)
386
387         if id_len > 0:
388             self.assertEqual(oui, 0)
389             vss = pkt[DHCP6OptVSS]
390             self.assertEqual(vss.optlen, id_len + 1)
391             self.assertEqual(vss.type, 0)
392             self.assertEqual(vss.data[0:id_len].decode("ascii"), vpn_id)
393
394         # the relay message should be an encoded Solicit
395         msg = pkt[DHCP6OptRelayMsg]
396         sol = DHCP6_Solicit()
397         self.assertEqual(msg.optlen, len(sol))
398         self.assertEqual(sol, msg[1])
399
400     def verify_dhcp6_advert(self, pkt, intf, peer):
401         ether = pkt[Ether]
402         self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff")
403         self.assertEqual(ether.src, intf.local_mac)
404
405         ip = pkt[IPv6]
406         self.assertEqual(in6_ptop(ip.dst), in6_ptop(peer))
407         self.assertEqual(in6_ptop(ip.src), in6_ptop(intf.local_ip6))
408
409         udp = pkt[UDP]
410         self.assertEqual(udp.dport, DHCP6_SERVER_PORT)
411         self.assertEqual(udp.sport, DHCP6_CLIENT_PORT)
412
413         # not sure why this is not decoding
414         # adv = pkt[DHCP6_Advertise]
415
416     def wait_for_no_route(self, address, length, n_tries=50, s_time=1):
417         while n_tries:
418             if not find_route(self, address, length):
419                 return True
420             n_tries = n_tries - 1
421             self.sleep(s_time)
422
423         return False
424
    def test_dhcp_proxy(self):
        """DHCPv4 Proxy"""

        #
        # Verify no response to DHCP request without DHCP config.
        # One broadcast discover is built per client-facing interface
        # (pg3, pg4, pg5).
        #
        p_disc_vrf0 = (
            Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg3.remote_mac)
            / IP(src="0.0.0.0", dst="255.255.255.255")
            / UDP(sport=DHCP4_CLIENT_PORT, dport=DHCP4_SERVER_PORT)
            / BOOTP(op=1)
            / DHCP(options=[("message-type", "discover"), ("end")])
        )
        pkts_disc_vrf0 = [p_disc_vrf0]
        p_disc_vrf1 = (
            Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg4.remote_mac)
            / IP(src="0.0.0.0", dst="255.255.255.255")
            / UDP(sport=DHCP4_CLIENT_PORT, dport=DHCP4_SERVER_PORT)
            / BOOTP(op=1)
            / DHCP(options=[("message-type", "discover"), ("end")])
        )
        pkts_disc_vrf1 = [p_disc_vrf1]
        p_disc_vrf2 = (
            Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg5.remote_mac)
            / IP(src="0.0.0.0", dst="255.255.255.255")
            / UDP(sport=DHCP4_CLIENT_PORT, dport=DHCP4_SERVER_PORT)
            / BOOTP(op=1)
            / DHCP(options=[("message-type", "discover"), ("end")])
        )
        pkts_disc_vrf2 = [p_disc_vrf2]

        self.send_and_assert_no_replies(
            self.pg3, pkts_disc_vrf0, "DHCP with no configuration"
        )
        self.send_and_assert_no_replies(
            self.pg4, pkts_disc_vrf1, "DHCP with no configuration"
        )
        self.send_and_assert_no_replies(
            self.pg5, pkts_disc_vrf2, "DHCP with no configuration"
        )

        #
        # Enable DHCP proxy in VRF 0
        #
        server_addr = self.pg0.remote_ip4
        src_addr = self.pg0.local_ip4

        Proxy = VppDHCPProxy(self, server_addr, src_addr, rx_vrf_id=0)
        Proxy.add_vpp_config()

        #
        # Discover packets from the client are dropped because there is no
        # IP address configured on the client facing interface
        #
        self.send_and_assert_no_replies(
            self.pg3, pkts_disc_vrf0, "Discover DHCP no relay address"
        )

        #
        # Inject a response from the server
        #  dropped, because there is no IP address on the
        #  client interface to fill in the option.
        #
        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
            / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_SERVER_PORT)
            / BOOTP(op=1)
            / DHCP(options=[("message-type", "offer"), ("end")])
        )
        pkts = [p]

        # NOTE(review): the offer is addressed to pg0's MAC/IP but is injected
        # on pg3; the later offers in this test are sent on the server-facing
        # interface - confirm pg3 is intended here.
        self.send_and_assert_no_replies(self.pg3, pkts, "Offer DHCP no relay address")

        #
        # configure an IP address on the client facing interface
        #
        self.pg3.config_ip4()

        #
        # Try again with a discover packet
        # Rx'd packet should be to the server address and from the configured
        # source address
        # UDP source ports are unchanged
        # NOTE(review): the original comment said option 82 should be absent
        # as there is no option 82 config, yet verify_relayed_dhcp_discover
        # asserts that the relay agent information option is present and
        # returns its payload - confirm which is intended.
        #
        self.pg3.add_stream(pkts_disc_vrf0)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg0.get_capture(1)
        rx = rx[0]

        option_82 = self.verify_relayed_dhcp_discover(rx, self.pg0, src_intf=self.pg3)

        #
        # Create a DHCP offer reply from the server with a correctly formatted
        # option 82. i.e. send back what we just captured
        # The offer, broadcast to the client, still has option 82.
        #
        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
            / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_SERVER_PORT)
            / BOOTP(op=1)
            / DHCP(
                options=[
                    ("message-type", "offer"),
                    ("relay_agent_Information", option_82),
                    ("end"),
                ]
            )
        )
        pkts = [p]

        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg3.get_capture(1)
        rx = rx[0]

        self.verify_dhcp_offer(rx, self.pg3)

        #
        # Bogus Option 82:
        #
        # 1. not our IP address: byte 8, the first octet of the client-side
        #    address in the option, is overwritten with 33.
        #    NOTE(review): the original comment suggested VPP does not check
        #    this and the offer is replayed to the client, but the assertion
        #    below expects no packet at all - confirm.
        bad_ip = option_82[0:8] + scapy.compat.chb(33) + option_82[9:]

        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
            / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_SERVER_PORT)
            / BOOTP(op=1)
            / DHCP(
                options=[
                    ("message-type", "offer"),
                    ("relay_agent_Information", bad_ip),
                    ("end"),
                ]
            )
        )
        pkts = [p]
        self.send_and_assert_no_replies(
            self.pg0, pkts, "DHCP offer option 82 bad address"
        )

        # 2. Not a sw_if_index VPP knows: byte 2, the first byte of the
        #    4-byte interface index field, is overwritten with 33.
        bad_if_index = option_82[0:2] + scapy.compat.chb(33) + option_82[3:]

        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
            / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_SERVER_PORT)
            / BOOTP(op=1)
            / DHCP(
                options=[
                    ("message-type", "offer"),
                    ("relay_agent_Information", bad_if_index),
                    ("end"),
                ]
            )
        )
        pkts = [p]
        self.send_and_assert_no_replies(
            self.pg0, pkts, "DHCP offer option 82 bad if index"
        )

        #
        # Send a DHCP request in VRF 1. should be dropped.
        #
        self.send_and_assert_no_replies(
            self.pg4, pkts_disc_vrf1, "DHCP with no configuration VRF 1"
        )

        #
        # Delete the DHCP config in VRF 0
        # Should now drop requests.
        #
        Proxy.remove_vpp_config()

        self.send_and_assert_no_replies(
            self.pg3, pkts_disc_vrf0, "DHCP config removed VRF 0"
        )
        self.send_and_assert_no_replies(
            self.pg4, pkts_disc_vrf1, "DHCP config removed VRF 1"
        )

        #
        # Add DHCP config for VRF 1 & 2
        #
        server_addr1 = self.pg1.remote_ip4
        src_addr1 = self.pg1.local_ip4
        Proxy1 = VppDHCPProxy(
            self, server_addr1, src_addr1, rx_vrf_id=1, server_vrf_id=1
        )
        Proxy1.add_vpp_config()

        server_addr2 = self.pg2.remote_ip4
        src_addr2 = self.pg2.local_ip4
        Proxy2 = VppDHCPProxy(
            self, server_addr2, src_addr2, rx_vrf_id=2, server_vrf_id=2
        )
        Proxy2.add_vpp_config()

        #
        # DHCP requests in VRF 1 & 2 are still dropped at this point:
        #  - there is no IP config on the client interfaces yet
        # (the assertion messages below are reused from the earlier
        #  "config removed" checks)
        #
        self.send_and_assert_no_replies(
            self.pg4, pkts_disc_vrf1, "DHCP config removed VRF 1"
        )
        self.send_and_assert_no_replies(
            self.pg5, pkts_disc_vrf2, "DHCP config removed VRF 2"
        )

        #
        # configure an IP address on the client facing interface
        # and expect the discover to be relayed to the server in that VRF
        #
        self.pg4.config_ip4()
        self.pg4.add_stream(pkts_disc_vrf1)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg1.get_capture(1)
        rx = rx[0]
        self.verify_relayed_dhcp_discover(rx, self.pg1, src_intf=self.pg4)

        self.pg5.config_ip4()
        self.pg5.add_stream(pkts_disc_vrf2)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg2.get_capture(1)
        rx = rx[0]
        self.verify_relayed_dhcp_discover(rx, self.pg2, src_intf=self.pg5)

        #
        # Add VSS config
        #  table=1, vss_type=1, vpn_index=1, oui=4
        #  table=2, vss_type=0, vpn_id = "ip4-table-2"
        #
        self.vapi.dhcp_proxy_set_vss(tbl_id=1, vss_type=1, vpn_index=1, oui=4, is_add=1)
        self.vapi.dhcp_proxy_set_vss(
            tbl_id=2, vss_type=0, vpn_ascii_id="ip4-table-2", is_add=1
        )

        self.pg4.add_stream(pkts_disc_vrf1)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg1.get_capture(1)
        rx = rx[0]
        self.verify_relayed_dhcp_discover(
            rx, self.pg1, src_intf=self.pg4, fib_id=1, oui=4
        )

        self.pg5.add_stream(pkts_disc_vrf2)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg2.get_capture(1)
        rx = rx[0]
        self.verify_relayed_dhcp_discover(
            rx, self.pg2, src_intf=self.pg5, vpn_id="ip4-table-2"
        )

        #
        # Add a second DHCP server in VRF 1
        #  expect client messages to be relayed to both configured servers
        #
        self.pg1.generate_remote_hosts(2)
        server_addr12 = self.pg1.remote_hosts[1].ip4

        # NOTE(review): src_addr is pg0's local address (the VRF 0 source),
        # whereas the first VRF 1 proxy uses src_addr1 - confirm intended.
        Proxy12 = VppDHCPProxy(
            self, server_addr12, src_addr, rx_vrf_id=1, server_vrf_id=1
        )
        Proxy12.add_vpp_config()

        #
        # We'll need an ARP entry for the server to send it packets
        #
        arp_entry = VppNeighbor(
            self,
            self.pg1.sw_if_index,
            self.pg1.remote_hosts[1].mac,
            self.pg1.remote_hosts[1].ip4,
        )
        arp_entry.add_vpp_config()

        #
        # Send a discover from the client. expect two relayed messages
        # The first packet is sent to the second server
        # We're not enforcing that here, it's just the way it is.
        #
        self.pg4.add_stream(pkts_disc_vrf1)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg1.get_capture(2)

        option_82 = self.verify_relayed_dhcp_discover(
            rx[0],
            self.pg1,
            src_intf=self.pg4,
            dst_mac=self.pg1.remote_hosts[1].mac,
            dst_ip=self.pg1.remote_hosts[1].ip4,
            fib_id=1,
            oui=4,
        )
        self.verify_relayed_dhcp_discover(
            rx[1], self.pg1, src_intf=self.pg4, fib_id=1, oui=4
        )

        #
        # Send an offer back from each server address. Client gets both.
        #
        p1 = (
            Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
            / IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4)
            / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_SERVER_PORT)
            / BOOTP(op=1)
            / DHCP(
                options=[
                    ("message-type", "offer"),
                    ("relay_agent_Information", option_82),
                    ("end"),
                ]
            )
        )
        p2 = (
            Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
            / IP(src=self.pg1.remote_hosts[1].ip4, dst=self.pg1.local_ip4)
            / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_SERVER_PORT)
            / BOOTP(op=1)
            / DHCP(
                options=[
                    ("message-type", "offer"),
                    ("relay_agent_Information", option_82),
                    ("end"),
                ]
            )
        )
        pkts = [p1, p2]

        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg4.get_capture(2)

        self.verify_dhcp_offer(rx[0], self.pg4, fib_id=1, oui=4)
        self.verify_dhcp_offer(rx[1], self.pg4, fib_id=1, oui=4)

        #
        # Ensure offers from non-servers are dropped
        # (same offer, but sourced from an address that is not a configured
        #  server)
        #
        p2 = (
            Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
            / IP(src="8.8.8.8", dst=self.pg1.local_ip4)
            / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_SERVER_PORT)
            / BOOTP(op=1)
            / DHCP(
                options=[
                    ("message-type", "offer"),
                    ("relay_agent_Information", option_82),
                    ("end"),
                ]
            )
        )
        self.send_and_assert_no_replies(self.pg1, p2, "DHCP offer from non-server")

        #
        # Ensure only the discover is sent to multiple servers:
        # a request from the client must produce exactly one relayed packet
        #
        p_req_vrf1 = (
            Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg4.remote_mac)
            / IP(src="0.0.0.0", dst="255.255.255.255")
            / UDP(sport=DHCP4_CLIENT_PORT, dport=DHCP4_SERVER_PORT)
            / BOOTP(op=1)
            / DHCP(options=[("message-type", "request"), ("end")])
        )

        self.pg4.add_stream(p_req_vrf1)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # only the capture count is checked; the packet is not inspected
        rx = self.pg1.get_capture(1)

        #
        # Remove the second DHCP server
        #
        Proxy12.remove_vpp_config()

        #
        # Test we can still relay with the first
        #
        self.pg4.add_stream(pkts_disc_vrf1)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg1.get_capture(1)
        rx = rx[0]
        self.verify_relayed_dhcp_discover(
            rx, self.pg1, src_intf=self.pg4, fib_id=1, oui=4
        )

        #
        # Remove the VSS config
        #  relayed DHCP has default values in the option.
        #
        self.vapi.dhcp_proxy_set_vss(tbl_id=1, is_add=0)
        self.vapi.dhcp_proxy_set_vss(tbl_id=2, is_add=0)

        self.pg4.add_stream(pkts_disc_vrf1)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg1.get_capture(1)
        rx = rx[0]
        self.verify_relayed_dhcp_discover(rx, self.pg1, src_intf=self.pg4)

        #
        # remove DHCP config to cleanup
        #
        Proxy1.remove_vpp_config()
        Proxy2.remove_vpp_config()

        self.send_and_assert_no_replies(self.pg3, pkts_disc_vrf0, "DHCP cleanup VRF 0")
        self.send_and_assert_no_replies(self.pg4, pkts_disc_vrf1, "DHCP cleanup VRF 1")
        self.send_and_assert_no_replies(self.pg5, pkts_disc_vrf2, "DHCP cleanup VRF 2")

        # remove the addresses this test added to the client-facing interfaces
        self.pg3.unconfig_ip4()
        self.pg4.unconfig_ip4()
        self.pg5.unconfig_ip4()
859
860     def test_dhcp6_proxy(self):
861         """DHCPv6 Proxy"""
862         #
863         # Verify no response to DHCP request without DHCP config
864         #
865         dhcp_solicit_dst = "ff02::1:2"
866         dhcp_solicit_src_vrf0 = mk_ll_addr(self.pg3.remote_mac)
867         dhcp_solicit_src_vrf1 = mk_ll_addr(self.pg4.remote_mac)
868         dhcp_solicit_src_vrf2 = mk_ll_addr(self.pg5.remote_mac)
869         server_addr_vrf0 = self.pg0.remote_ip6
870         src_addr_vrf0 = self.pg0.local_ip6
871         server_addr_vrf1 = self.pg1.remote_ip6
872         src_addr_vrf1 = self.pg1.local_ip6
873         server_addr_vrf2 = self.pg2.remote_ip6
874         src_addr_vrf2 = self.pg2.local_ip6
875
876         dmac = in6_getnsmac(inet_pton(socket.AF_INET6, dhcp_solicit_dst))
877         p_solicit_vrf0 = (
878             Ether(dst=dmac, src=self.pg3.remote_mac)
879             / IPv6(src=dhcp_solicit_src_vrf0, dst=dhcp_solicit_dst)
880             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_CLIENT_PORT)
881             / DHCP6_Solicit()
882         )
883         p_solicit_vrf1 = (
884             Ether(dst=dmac, src=self.pg4.remote_mac)
885             / IPv6(src=dhcp_solicit_src_vrf1, dst=dhcp_solicit_dst)
886             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_CLIENT_PORT)
887             / DHCP6_Solicit()
888         )
889         p_solicit_vrf2 = (
890             Ether(dst=dmac, src=self.pg5.remote_mac)
891             / IPv6(src=dhcp_solicit_src_vrf2, dst=dhcp_solicit_dst)
892             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_CLIENT_PORT)
893             / DHCP6_Solicit()
894         )
895
896         self.send_and_assert_no_replies(
897             self.pg3, p_solicit_vrf0, "DHCP with no configuration"
898         )
899         self.send_and_assert_no_replies(
900             self.pg4, p_solicit_vrf1, "DHCP with no configuration"
901         )
902         self.send_and_assert_no_replies(
903             self.pg5, p_solicit_vrf2, "DHCP with no configuration"
904         )
905
906         #
907         # DHCPv6 config in VRF 0.
908         # Packets still dropped because the client facing interface has no
909         # IPv6 config
910         #
911         Proxy = VppDHCPProxy(
912             self, server_addr_vrf0, src_addr_vrf0, rx_vrf_id=0, server_vrf_id=0
913         )
914         Proxy.add_vpp_config()
915
916         self.send_and_assert_no_replies(
917             self.pg3, p_solicit_vrf0, "DHCP with no configuration"
918         )
919         self.send_and_assert_no_replies(
920             self.pg4, p_solicit_vrf1, "DHCP with no configuration"
921         )
922
923         #
924         # configure an IP address on the client facing interface
925         #
926         self.pg3.config_ip6()
927
928         #
929         # Now the DHCP requests are relayed to the server
930         #
931         self.pg3.add_stream(p_solicit_vrf0)
932         self.pg_enable_capture(self.pg_interfaces)
933         self.pg_start()
934
935         rx = self.pg0.get_capture(1)
936
937         self.verify_dhcp6_solicit(
938             rx[0], self.pg0, dhcp_solicit_src_vrf0, self.pg3.remote_mac
939         )
940
941         #
942         # Exception cases for rejected relay responses
943         #
944
945         # 1 - not a relay reply
946         p_adv_vrf0 = (
947             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
948             / IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6)
949             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_SERVER_PORT)
950             / DHCP6_Advertise()
951         )
952         self.send_and_assert_no_replies(self.pg3, p_adv_vrf0, "DHCP6 not a relay reply")
953
954         # 2 - no relay message option
955         p_adv_vrf0 = (
956             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
957             / IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6)
958             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_SERVER_PORT)
959             / DHCP6_RelayReply()
960             / DHCP6_Advertise()
961         )
962         self.send_and_assert_no_replies(
963             self.pg3, p_adv_vrf0, "DHCP not a relay message"
964         )
965
966         # 3 - no circuit ID
967         p_adv_vrf0 = (
968             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
969             / IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6)
970             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_SERVER_PORT)
971             / DHCP6_RelayReply()
972             / DHCP6OptRelayMsg(optlen=0)
973             / DHCP6_Advertise()
974         )
975         self.send_and_assert_no_replies(self.pg3, p_adv_vrf0, "DHCP6 no circuit ID")
976         # 4 - wrong circuit ID
977         p_adv_vrf0 = (
978             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
979             / IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6)
980             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_SERVER_PORT)
981             / DHCP6_RelayReply()
982             / DHCP6OptIfaceId(optlen=4, ifaceid="\x00\x00\x00\x05")
983             / DHCP6OptRelayMsg(optlen=0)
984             / DHCP6_Advertise()
985         )
986         self.send_and_assert_no_replies(self.pg3, p_adv_vrf0, "DHCP6 wrong circuit ID")
987
988         #
989         # Send the relay response (the advertisement)
990         #   - no peer address
991         p_adv_vrf0 = (
992             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
993             / IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6)
994             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_SERVER_PORT)
995             / DHCP6_RelayReply()
996             / DHCP6OptIfaceId(optlen=4, ifaceid="\x00\x00\x00\x04")
997             / DHCP6OptRelayMsg(optlen=0)
998             / DHCP6_Advertise(trid=1)
999             / DHCP6OptStatusCode(statuscode=0)
1000         )
1001         pkts_adv_vrf0 = [p_adv_vrf0]
1002
1003         self.pg0.add_stream(pkts_adv_vrf0)
1004         self.pg_enable_capture(self.pg_interfaces)
1005         self.pg_start()
1006
1007         rx = self.pg3.get_capture(1)
1008
1009         self.verify_dhcp6_advert(rx[0], self.pg3, "::")
1010
1011         #
1012         # Send the relay response (the advertisement)
1013         #   - with peer address
1014         p_adv_vrf0 = (
1015             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1016             / IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6)
1017             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_SERVER_PORT)
1018             / DHCP6_RelayReply(peeraddr=dhcp_solicit_src_vrf0)
1019             / DHCP6OptIfaceId(optlen=4, ifaceid="\x00\x00\x00\x04")
1020             / DHCP6OptRelayMsg(optlen=0)
1021             / DHCP6_Advertise(trid=1)
1022             / DHCP6OptStatusCode(statuscode=0)
1023         )
1024         pkts_adv_vrf0 = [p_adv_vrf0]
1025
1026         self.pg0.add_stream(pkts_adv_vrf0)
1027         self.pg_enable_capture(self.pg_interfaces)
1028         self.pg_start()
1029
1030         rx = self.pg3.get_capture(1)
1031
1032         self.verify_dhcp6_advert(rx[0], self.pg3, dhcp_solicit_src_vrf0)
1033
1034         #
1035         # Add all the config for VRF 1 & 2
1036         #
1037         Proxy1 = VppDHCPProxy(
1038             self, server_addr_vrf1, src_addr_vrf1, rx_vrf_id=1, server_vrf_id=1
1039         )
1040         Proxy1.add_vpp_config()
1041         self.pg4.config_ip6()
1042
1043         Proxy2 = VppDHCPProxy(
1044             self, server_addr_vrf2, src_addr_vrf2, rx_vrf_id=2, server_vrf_id=2
1045         )
1046         Proxy2.add_vpp_config()
1047         self.pg5.config_ip6()
1048
1049         #
1050         # VRF 1 solicit
1051         #
1052         self.pg4.add_stream(p_solicit_vrf1)
1053         self.pg_enable_capture(self.pg_interfaces)
1054         self.pg_start()
1055
1056         rx = self.pg1.get_capture(1)
1057
1058         self.verify_dhcp6_solicit(
1059             rx[0], self.pg1, dhcp_solicit_src_vrf1, self.pg4.remote_mac
1060         )
1061
1062         #
1063         # VRF 2 solicit
1064         #
1065         self.pg5.add_stream(p_solicit_vrf2)
1066         self.pg_enable_capture(self.pg_interfaces)
1067         self.pg_start()
1068
1069         rx = self.pg2.get_capture(1)
1070
1071         self.verify_dhcp6_solicit(
1072             rx[0], self.pg2, dhcp_solicit_src_vrf2, self.pg5.remote_mac
1073         )
1074
1075         #
1076         # VRF 1 Advert
1077         #
1078         p_adv_vrf1 = (
1079             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1080             / IPv6(dst=self.pg1.local_ip6, src=self.pg1.remote_ip6)
1081             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_SERVER_PORT)
1082             / DHCP6_RelayReply(peeraddr=dhcp_solicit_src_vrf1)
1083             / DHCP6OptIfaceId(optlen=4, ifaceid="\x00\x00\x00\x05")
1084             / DHCP6OptRelayMsg(optlen=0)
1085             / DHCP6_Advertise(trid=1)
1086             / DHCP6OptStatusCode(statuscode=0)
1087         )
1088         pkts_adv_vrf1 = [p_adv_vrf1]
1089
1090         self.pg1.add_stream(pkts_adv_vrf1)
1091         self.pg_enable_capture(self.pg_interfaces)
1092         self.pg_start()
1093
1094         rx = self.pg4.get_capture(1)
1095
1096         self.verify_dhcp6_advert(rx[0], self.pg4, dhcp_solicit_src_vrf1)
1097
1098         #
1099         # Add VSS config
1100         #
1101         self.vapi.dhcp_proxy_set_vss(
1102             tbl_id=1, vss_type=1, oui=4, vpn_index=1, is_ipv6=1, is_add=1
1103         )
1104         self.vapi.dhcp_proxy_set_vss(
1105             tbl_id=2, vss_type=0, vpn_ascii_id="IPv6-table-2", is_ipv6=1, is_add=1
1106         )
1107
1108         self.pg4.add_stream(p_solicit_vrf1)
1109         self.pg_enable_capture(self.pg_interfaces)
1110         self.pg_start()
1111
1112         rx = self.pg1.get_capture(1)
1113
1114         self.verify_dhcp6_solicit(
1115             rx[0], self.pg1, dhcp_solicit_src_vrf1, self.pg4.remote_mac, fib_id=1, oui=4
1116         )
1117
1118         self.pg5.add_stream(p_solicit_vrf2)
1119         self.pg_enable_capture(self.pg_interfaces)
1120         self.pg_start()
1121
1122         rx = self.pg2.get_capture(1)
1123
1124         self.verify_dhcp6_solicit(
1125             rx[0],
1126             self.pg2,
1127             dhcp_solicit_src_vrf2,
1128             self.pg5.remote_mac,
1129             vpn_id="IPv6-table-2",
1130         )
1131
1132         #
1133         # Remove the VSS config
1134         #  relayed DHCP has default vlaues in the option.
1135         #
1136         self.vapi.dhcp_proxy_set_vss(tbl_id=1, is_ipv6=1, is_add=0)
1137
1138         self.pg4.add_stream(p_solicit_vrf1)
1139         self.pg_enable_capture(self.pg_interfaces)
1140         self.pg_start()
1141
1142         rx = self.pg1.get_capture(1)
1143
1144         self.verify_dhcp6_solicit(
1145             rx[0], self.pg1, dhcp_solicit_src_vrf1, self.pg4.remote_mac
1146         )
1147
1148         #
1149         # Add a second DHCP server in VRF 1
1150         #  expect clients messages to be relay to both configured servers
1151         #
1152         self.pg1.generate_remote_hosts(2)
1153         server_addr12 = self.pg1.remote_hosts[1].ip6
1154
1155         Proxy12 = VppDHCPProxy(
1156             self, server_addr12, src_addr_vrf1, rx_vrf_id=1, server_vrf_id=1
1157         )
1158         Proxy12.add_vpp_config()
1159
1160         #
1161         # We'll need an ND entry for the server to send it packets
1162         #
1163         nd_entry = VppNeighbor(
1164             self,
1165             self.pg1.sw_if_index,
1166             self.pg1.remote_hosts[1].mac,
1167             self.pg1.remote_hosts[1].ip6,
1168         )
1169         nd_entry.add_vpp_config()
1170
1171         #
1172         # Send a discover from the client. expect two relayed messages
1173         # The frist packet is sent to the second server
1174         # We're not enforcing that here, it's just the way it is.
1175         #
1176         self.pg4.add_stream(p_solicit_vrf1)
1177         self.pg_enable_capture(self.pg_interfaces)
1178         self.pg_start()
1179
1180         rx = self.pg1.get_capture(2)
1181
1182         self.verify_dhcp6_solicit(
1183             rx[0], self.pg1, dhcp_solicit_src_vrf1, self.pg4.remote_mac
1184         )
1185         self.verify_dhcp6_solicit(
1186             rx[1],
1187             self.pg1,
1188             dhcp_solicit_src_vrf1,
1189             self.pg4.remote_mac,
1190             dst_mac=self.pg1.remote_hosts[1].mac,
1191             dst_ip=self.pg1.remote_hosts[1].ip6,
1192         )
1193
1194         #
1195         # Send both packets back. Client gets both.
1196         #
1197         p1 = (
1198             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1199             / IPv6(dst=self.pg1.local_ip6, src=self.pg1.remote_ip6)
1200             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_SERVER_PORT)
1201             / DHCP6_RelayReply(peeraddr=dhcp_solicit_src_vrf1)
1202             / DHCP6OptIfaceId(optlen=4, ifaceid="\x00\x00\x00\x05")
1203             / DHCP6OptRelayMsg(optlen=0)
1204             / DHCP6_Advertise(trid=1)
1205             / DHCP6OptStatusCode(statuscode=0)
1206         )
1207         p2 = (
1208             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_hosts[1].mac)
1209             / IPv6(dst=self.pg1.local_ip6, src=self.pg1._remote_hosts[1].ip6)
1210             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_SERVER_PORT)
1211             / DHCP6_RelayReply(peeraddr=dhcp_solicit_src_vrf1)
1212             / DHCP6OptIfaceId(optlen=4, ifaceid="\x00\x00\x00\x05")
1213             / DHCP6OptRelayMsg(optlen=0)
1214             / DHCP6_Advertise(trid=1)
1215             / DHCP6OptStatusCode(statuscode=0)
1216         )
1217
1218         pkts = [p1, p2]
1219
1220         self.pg1.add_stream(pkts)
1221         self.pg_enable_capture(self.pg_interfaces)
1222         self.pg_start()
1223
1224         rx = self.pg4.get_capture(2)
1225
1226         self.verify_dhcp6_advert(rx[0], self.pg4, dhcp_solicit_src_vrf1)
1227         self.verify_dhcp6_advert(rx[1], self.pg4, dhcp_solicit_src_vrf1)
1228
1229         #
1230         # Ensure only solicit messages are duplicated
1231         #
1232         p_request_vrf1 = (
1233             Ether(dst=dmac, src=self.pg4.remote_mac)
1234             / IPv6(src=dhcp_solicit_src_vrf1, dst=dhcp_solicit_dst)
1235             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_CLIENT_PORT)
1236             / DHCP6_Request()
1237         )
1238
1239         self.pg4.add_stream(p_request_vrf1)
1240         self.pg_enable_capture(self.pg_interfaces)
1241         self.pg_start()
1242
1243         rx = self.pg1.get_capture(1)
1244
1245         #
1246         # Test we drop DHCP packets from addresses that are not configured as
1247         # DHCP servers
1248         #
1249         p2 = (
1250             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_hosts[1].mac)
1251             / IPv6(dst=self.pg1.local_ip6, src="3001::1")
1252             / UDP(sport=DHCP6_SERVER_PORT, dport=DHCP6_SERVER_PORT)
1253             / DHCP6_RelayReply(peeraddr=dhcp_solicit_src_vrf1)
1254             / DHCP6OptIfaceId(optlen=4, ifaceid="\x00\x00\x00\x05")
1255             / DHCP6OptRelayMsg(optlen=0)
1256             / DHCP6_Advertise(trid=1)
1257             / DHCP6OptStatusCode(statuscode=0)
1258         )
1259         self.send_and_assert_no_replies(self.pg1, p2, "DHCP6 not from server")
1260
1261         #
1262         # Remove the second DHCP server
1263         #
1264         Proxy12.remove_vpp_config()
1265
1266         #
1267         # Test we can still relay with the first
1268         #
1269         self.pg4.add_stream(p_solicit_vrf1)
1270         self.pg_enable_capture(self.pg_interfaces)
1271         self.pg_start()
1272
1273         rx = self.pg1.get_capture(1)
1274
1275         self.verify_dhcp6_solicit(
1276             rx[0], self.pg1, dhcp_solicit_src_vrf1, self.pg4.remote_mac
1277         )
1278
1279         #
1280         # Cleanup
1281         #
1282         Proxy.remove_vpp_config()
1283         Proxy1.remove_vpp_config()
1284         Proxy2.remove_vpp_config()
1285
1286         self.pg3.unconfig_ip6()
1287         self.pg4.unconfig_ip6()
1288         self.pg5.unconfig_ip6()
1289
1290     def test_dhcp_client(self):
1291         """DHCP Client"""
1292
1293         vdscp = VppEnum.vl_api_ip_dscp_t
1294         hostname = "universal-dp"
1295
1296         self.pg_enable_capture(self.pg_interfaces)
1297
1298         #
1299         # Configure DHCP client on PG3 and capture the discover sent
1300         #
1301         Client = VppDHCPClient(self, self.pg3.sw_if_index, hostname)
1302         Client.add_vpp_config()
1303         self.assertTrue(Client.query_vpp_config())
1304
1305         rx = self.pg3.get_capture(1)
1306
1307         self.verify_orig_dhcp_discover(rx[0], self.pg3, hostname)
1308
1309         #
1310         # Send back on offer, expect the request
1311         #
1312         p_offer = (
1313             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
1314             / IP(src=self.pg3.remote_ip4, dst="255.255.255.255")
1315             / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT)
1316             / BOOTP(
1317                 op=1, yiaddr=self.pg3.local_ip4, chaddr=mac_pton(self.pg3.local_mac)
1318             )
1319             / DHCP(
1320                 options=[
1321                     ("message-type", "offer"),
1322                     ("server_id", self.pg3.remote_ip4),
1323                     "end",
1324                 ]
1325             )
1326         )
1327
1328         self.pg3.add_stream(p_offer)
1329         self.pg_enable_capture(self.pg_interfaces)
1330         self.pg_start()
1331
1332         rx = self.pg3.get_capture(1)
1333         self.verify_orig_dhcp_request(rx[0], self.pg3, hostname, self.pg3.local_ip4)
1334
1335         #
1336         # Send an acknowledgment
1337         #
1338         p_ack = (
1339             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
1340             / IP(src=self.pg3.remote_ip4, dst="255.255.255.255")
1341             / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT)
1342             / BOOTP(
1343                 op=1, yiaddr=self.pg3.local_ip4, chaddr=mac_pton(self.pg3.local_mac)
1344             )
1345             / DHCP(
1346                 options=[
1347                     ("message-type", "ack"),
1348                     ("subnet_mask", "255.255.255.0"),
1349                     ("router", self.pg3.remote_ip4),
1350                     ("server_id", self.pg3.remote_ip4),
1351                     ("lease_time", 43200),
1352                     "end",
1353                 ]
1354             )
1355         )
1356
1357         self.pg3.add_stream(p_ack)
1358         self.pg_enable_capture(self.pg_interfaces)
1359         self.pg_start()
1360
1361         #
1362         # We'll get an ARP request for the router address
1363         #
1364         rx = self.pg3.get_capture(1)
1365
1366         self.assertEqual(rx[0][ARP].pdst, self.pg3.remote_ip4)
1367         self.pg_enable_capture(self.pg_interfaces)
1368
1369         #
1370         # At the end of this procedure there should be a connected route
1371         # in the FIB
1372         #
1373         self.assertTrue(find_route(self, self.pg3.local_ip4, 24))
1374         self.assertTrue(find_route(self, self.pg3.local_ip4, 32))
1375
1376         #
1377         # remove the DHCP config
1378         #
1379         Client.remove_vpp_config()
1380
1381         #
1382         # and now the route should be gone
1383         #
1384         self.assertFalse(find_route(self, self.pg3.local_ip4, 32))
1385         self.assertFalse(find_route(self, self.pg3.local_ip4, 24))
1386
1387         #
1388         # Start the procedure again. this time have VPP send the client-ID
1389         # and set the DSCP value
1390         #
1391         self.pg3.admin_down()
1392         self.sleep(1)
1393         self.pg3.admin_up()
1394         Client.set_client(
1395             self.pg3.sw_if_index,
1396             hostname,
1397             id=self.pg3.local_mac,
1398             dscp=vdscp.IP_API_DSCP_EF,
1399         )
1400         Client.add_vpp_config()
1401
1402         rx = self.pg3.get_capture(1)
1403
1404         self.verify_orig_dhcp_discover(
1405             rx[0], self.pg3, hostname, self.pg3.local_mac, dscp=vdscp.IP_API_DSCP_EF
1406         )
1407
1408         # TODO: VPP DHCP client should not accept DHCP OFFER message with
1409         # the XID (Transaction ID) not matching the XID of the most recent
1410         # DHCP DISCOVERY message.
1411         # Such DHCP OFFER message must be silently discarded - RFC2131.
1412         # Reported in Jira ticket: VPP-99
1413         self.pg3.add_stream(p_offer)
1414         self.pg_enable_capture(self.pg_interfaces)
1415         self.pg_start()
1416
1417         rx = self.pg3.get_capture(1)
1418         self.verify_orig_dhcp_request(
1419             rx[0], self.pg3, hostname, self.pg3.local_ip4, dscp=vdscp.IP_API_DSCP_EF
1420         )
1421
1422         #
1423         # unicast the ack to the offered address
1424         #
1425         p_ack = (
1426             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
1427             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
1428             / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT)
1429             / BOOTP(
1430                 op=1, yiaddr=self.pg3.local_ip4, chaddr=mac_pton(self.pg3.local_mac)
1431             )
1432             / DHCP(
1433                 options=[
1434                     ("message-type", "ack"),
1435                     ("subnet_mask", "255.255.255.0"),
1436                     ("router", self.pg3.remote_ip4),
1437                     ("server_id", self.pg3.remote_ip4),
1438                     ("lease_time", 43200),
1439                     "end",
1440                 ]
1441             )
1442         )
1443
1444         self.pg3.add_stream(p_ack)
1445         self.pg_enable_capture(self.pg_interfaces)
1446         self.pg_start()
1447
1448         #
1449         # We'll get an ARP request for the router address
1450         #
1451         rx = self.pg3.get_capture(1)
1452
1453         self.assertEqual(rx[0][ARP].pdst, self.pg3.remote_ip4)
1454         self.pg_enable_capture(self.pg_interfaces)
1455
1456         #
1457         # At the end of this procedure there should be a connected route
1458         # in the FIB
1459         #
1460         self.assertTrue(find_route(self, self.pg3.local_ip4, 32))
1461         self.assertTrue(find_route(self, self.pg3.local_ip4, 24))
1462
1463         #
1464         # remove the DHCP config
1465         #
1466         Client.remove_vpp_config()
1467
1468         self.assertFalse(find_route(self, self.pg3.local_ip4, 32))
1469         self.assertFalse(find_route(self, self.pg3.local_ip4, 24))
1470
1471         #
1472         # Rinse and repeat, this time with VPP configured not to set
1473         # the broadcast flag in the discover and request messages,
1474         # and for the server to unicast the responses.
1475         #
1476         # Configure DHCP client on PG3 and capture the discover sent
1477         #
1478         Client.set_client(self.pg3.sw_if_index, hostname, set_broadcast_flag=False)
1479         Client.add_vpp_config()
1480
1481         rx = self.pg3.get_capture(1)
1482
1483         self.verify_orig_dhcp_discover(rx[0], self.pg3, hostname, broadcast=False)
1484
1485         #
1486         # Send back on offer, unicasted to the offered address.
1487         # Expect the request.
1488         #
1489         p_offer = (
1490             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
1491             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
1492             / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT)
1493             / BOOTP(
1494                 op=1, yiaddr=self.pg3.local_ip4, chaddr=mac_pton(self.pg3.local_mac)
1495             )
1496             / DHCP(
1497                 options=[
1498                     ("message-type", "offer"),
1499                     ("server_id", self.pg3.remote_ip4),
1500                     "end",
1501                 ]
1502             )
1503         )
1504
1505         self.pg3.add_stream(p_offer)
1506         self.pg_enable_capture(self.pg_interfaces)
1507         self.pg_start()
1508
1509         rx = self.pg3.get_capture(1)
1510         self.verify_orig_dhcp_request(
1511             rx[0], self.pg3, hostname, self.pg3.local_ip4, broadcast=False
1512         )
1513
1514         #
1515         # Send an acknowledgment, the lease renewal time is 2 seconds
1516         # so we should expect the renew straight after
1517         #
1518         p_ack = (
1519             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
1520             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
1521             / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT)
1522             / BOOTP(
1523                 op=1, yiaddr=self.pg3.local_ip4, chaddr=mac_pton(self.pg3.local_mac)
1524             )
1525             / DHCP(
1526                 options=[
1527                     ("message-type", "ack"),
1528                     ("subnet_mask", "255.255.255.0"),
1529                     ("router", self.pg3.remote_ip4),
1530                     ("server_id", self.pg3.remote_ip4),
1531                     ("lease_time", 43200),
1532                     ("renewal_time", 2),
1533                     "end",
1534                 ]
1535             )
1536         )
1537
1538         self.pg3.add_stream(p_ack)
1539         self.pg_enable_capture(self.pg_interfaces)
1540         self.pg_start()
1541
1542         #
1543         # We'll get an ARP request for the router address
1544         #
1545         rx = self.pg3.get_capture(1)
1546
1547         self.assertEqual(rx[0][ARP].pdst, self.pg3.remote_ip4)
1548         self.pg_enable_capture(self.pg_interfaces)
1549
1550         #
1551         # At the end of this procedure there should be a connected route
1552         # in the FIB
1553         #
1554         self.assertTrue(find_route(self, self.pg3.local_ip4, 24))
1555         self.assertTrue(find_route(self, self.pg3.local_ip4, 32))
1556
1557         #
1558         # read the DHCP client details from a dump
1559         #
1560         clients = self.vapi.dhcp_client_dump()
1561
1562         self.assertEqual(clients[0].client.sw_if_index, self.pg3.sw_if_index)
1563         self.assertEqual(clients[0].lease.sw_if_index, self.pg3.sw_if_index)
1564         self.assertEqual(clients[0].client.hostname, hostname)
1565         self.assertEqual(clients[0].lease.hostname, hostname)
1566         # 0 = DISCOVER, 1 = REQUEST, 2 = BOUND
1567         self.assertEqual(clients[0].lease.state, 2)
1568         self.assertEqual(clients[0].lease.mask_width, 24)
1569         self.assertEqual(str(clients[0].lease.router_address), self.pg3.remote_ip4)
1570         self.assertEqual(str(clients[0].lease.host_address), self.pg3.local_ip4)
1571
1572         #
1573         # wait for the unicasted renewal
1574         #  the first attempt will be an ARP packet, since we have not yet
1575         #  responded to VPP's request
1576         #
1577         self.logger.info(self.vapi.cli("sh dhcp client intfc pg3 verbose"))
1578         rx = self.pg3.get_capture(1, timeout=10)
1579
1580         self.assertEqual(rx[0][ARP].pdst, self.pg3.remote_ip4)
1581
1582         # respond to the arp
1583         p_arp = Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) / ARP(
1584             op="is-at",
1585             hwdst=self.pg3.local_mac,
1586             hwsrc=self.pg3.remote_mac,
1587             pdst=self.pg3.local_ip4,
1588             psrc=self.pg3.remote_ip4,
1589         )
1590         self.pg3.add_stream(p_arp)
1591         self.pg_enable_capture(self.pg_interfaces)
1592         self.pg_start()
1593
1594         # the next packet is the unicasted renewal
1595         rx = self.pg3.get_capture(1, timeout=10)
1596         self.verify_orig_dhcp_request(
1597             rx[0], self.pg3, hostname, self.pg3.local_ip4, l2_bc=False, broadcast=False
1598         )
1599
1600         # send an ACK with different data from the original offer *
1601         self.pg3.generate_remote_hosts(4)
1602         p_ack = (
1603             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
1604             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
1605             / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT)
1606             / BOOTP(
1607                 op=1,
1608                 yiaddr=self.pg3.remote_hosts[3].ip4,
1609                 chaddr=mac_pton(self.pg3.local_mac),
1610             )
1611             / DHCP(
1612                 options=[
1613                     ("message-type", "ack"),
1614                     ("subnet_mask", "255.255.255.0"),
1615                     ("router", self.pg3.remote_hosts[1].ip4),
1616                     ("server_id", self.pg3.remote_hosts[2].ip4),
1617                     ("lease_time", 43200),
1618                     ("renewal_time", 2),
1619                     "end",
1620                 ]
1621             )
1622         )
1623
1624         self.pg3.add_stream(p_ack)
1625         self.pg_enable_capture(self.pg_interfaces)
1626         self.pg_start()
1627
1628         #
1629         # read the DHCP client details from a dump
1630         #
1631         clients = self.vapi.dhcp_client_dump()
1632
1633         self.assertEqual(clients[0].client.sw_if_index, self.pg3.sw_if_index)
1634         self.assertEqual(clients[0].lease.sw_if_index, self.pg3.sw_if_index)
1635         self.assertEqual(clients[0].client.hostname, hostname)
1636         self.assertEqual(clients[0].lease.hostname, hostname)
1637         # 0 = DISCOVER, 1 = REQUEST, 2 = BOUND
1638         self.assertEqual(clients[0].lease.state, 2)
1639         self.assertEqual(clients[0].lease.mask_width, 24)
1640         self.assertEqual(
1641             str(clients[0].lease.router_address), self.pg3.remote_hosts[1].ip4
1642         )
1643         self.assertEqual(
1644             str(clients[0].lease.host_address), self.pg3.remote_hosts[3].ip4
1645         )
1646
1647         #
1648         # remove the DHCP config
1649         #
1650         Client.remove_vpp_config()
1651
1652         #
1653         # and now the route should be gone
1654         #
1655         self.assertFalse(find_route(self, self.pg3.local_ip4, 32))
1656         self.assertFalse(find_route(self, self.pg3.local_ip4, 24))
1657
1658         #
1659         # Start the procedure again. Use requested lease time option.
1660         # this time wait for the lease to expire and the client to
1661         # self-destruct
1662         #
1663         hostname += "-2"
1664         self.pg3.admin_down()
1665         self.sleep(1)
1666         self.pg3.admin_up()
1667         self.pg_enable_capture(self.pg_interfaces)
1668         Client.set_client(self.pg3.sw_if_index, hostname)
1669         Client.add_vpp_config()
1670
1671         rx = self.pg3.get_capture(1)
1672
1673         self.verify_orig_dhcp_discover(rx[0], self.pg3, hostname)
1674
1675         #
1676         # Send back on offer with requested lease time, expect the request
1677         #
1678         lease_time = 1
1679         p_offer = (
1680             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
1681             / IP(src=self.pg3.remote_ip4, dst="255.255.255.255")
1682             / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT)
1683             / BOOTP(
1684                 op=1, yiaddr=self.pg3.local_ip4, chaddr=mac_pton(self.pg3.local_mac)
1685             )
1686             / DHCP(
1687                 options=[
1688                     ("message-type", "offer"),
1689                     ("server_id", self.pg3.remote_ip4),
1690                     ("lease_time", lease_time),
1691                     "end",
1692                 ]
1693             )
1694         )
1695
1696         self.pg3.add_stream(p_offer)
1697         self.pg_enable_capture(self.pg_interfaces)
1698         self.pg_start()
1699
1700         rx = self.pg3.get_capture(1)
1701         self.verify_orig_dhcp_request(rx[0], self.pg3, hostname, self.pg3.local_ip4)
1702
1703         #
1704         # Send an acknowledgment
1705         #
1706         p_ack = (
1707             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
1708             / IP(src=self.pg3.remote_ip4, dst="255.255.255.255")
1709             / UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT)
1710             / BOOTP(
1711                 op=1, yiaddr=self.pg3.local_ip4, chaddr=mac_pton(self.pg3.local_mac)
1712             )
1713             / DHCP(
1714                 options=[
1715                     ("message-type", "ack"),
1716                     ("subnet_mask", "255.255.255.0"),
1717                     ("router", self.pg3.remote_ip4),
1718                     ("server_id", self.pg3.remote_ip4),
1719                     ("lease_time", lease_time),
1720                     "end",
1721                 ]
1722             )
1723         )
1724
1725         self.pg3.add_stream(p_ack)
1726         self.pg_enable_capture(self.pg_interfaces)
1727         self.pg_start()
1728
1729         #
1730         # We'll get an ARP request for the router address
1731         #
1732         rx = self.pg3.get_capture(1)
1733
1734         self.assertEqual(rx[0][ARP].pdst, self.pg3.remote_ip4)
1735
1736         #
1737         # At the end of this procedure there should be a connected route
1738         # in the FIB
1739         #
1740         self.assertTrue(find_route(self, self.pg3.local_ip4, 32))
1741         self.assertTrue(find_route(self, self.pg3.local_ip4, 24))
1742
1743         #
1744         # the route should be gone after the lease expires
1745         #
1746         self.assertTrue(self.wait_for_no_route(self.pg3.local_ip4, 32))
1747         self.assertTrue(self.wait_for_no_route(self.pg3.local_ip4, 24))
1748
1749         #
1750         # remove the DHCP config
1751         #
1752         Client.remove_vpp_config()
1753
1754     def test_dhcp_client_vlan(self):
1755         """DHCP Client w/ VLAN"""
1756
1757         vdscp = VppEnum.vl_api_ip_dscp_t
1758         vqos = VppEnum.vl_api_qos_source_t
1759         hostname = "universal-dp"
1760
1761         self.pg_enable_capture(self.pg_interfaces)
1762
1763         vlan_100 = VppDot1QSubint(self, self.pg3, 100)
1764         vlan_100.admin_up()
1765
1766         output = [scapy.compat.chb(4)] * 256
1767         os = b"".join(output)
1768         rows = [{"outputs": os}, {"outputs": os}, {"outputs": os}, {"outputs": os}]
1769
1770         qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
1771         qm1 = VppQosMark(
1772             self, vlan_100, qem1, vqos.QOS_API_SOURCE_VLAN
1773         ).add_vpp_config()
1774
1775         #
1776         # Configure DHCP client on PG3 and capture the discover sent
1777         #
1778         Client = VppDHCPClient(
1779             self, vlan_100.sw_if_index, hostname, dscp=vdscp.IP_API_DSCP_EF
1780         )
1781         Client.add_vpp_config()
1782
1783         rx = self.pg3.get_capture(1)
1784
1785         self.assertEqual(rx[0][Dot1Q].vlan, 100)
1786         self.assertEqual(rx[0][Dot1Q].prio, 2)
1787
1788         self.verify_orig_dhcp_discover(
1789             rx[0], self.pg3, hostname, dscp=vdscp.IP_API_DSCP_EF
1790         )
1791
1792
# When this file is executed directly (rather than imported), run its tests
# through unittest using VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)