from framework import VppTestCase, VppTestRunner
from vpp_neighbor import VppNeighbor
-from vpp_ip_route import find_route
+from vpp_ip_route import find_route, VppIpTable
from util import mk_ll_addr
-from scapy.layers.l2 import Ether, getmacbyip
+from scapy.layers.l2 import Ether, getmacbyip, ARP
from scapy.layers.inet import IP, UDP, ICMP
from scapy.layers.inet6 import IPv6, in6_getnsmac, in6_mactoifaceid
from scapy.layers.dhcp import DHCP, BOOTP, DHCPTypes
from socket import AF_INET, AF_INET6
from scapy.utils import inet_pton, inet_ntop
from scapy.utils6 import in6_ptop
+from util import mactobinary
DHCP4_CLIENT_PORT = 68
DHCP4_SERVER_PORT = 67
# create 3 pg interfaces
self.create_pg_interfaces(range(4))
+ self.tables = []
# pg0 and 1 are IP configured in VRF 0 and 1.
# pg2 and 3 are non IP-configured in VRF 0 and 1
+ table_id = 0
+ for table_id in range(1, 4):
+ tbl4 = VppIpTable(self, table_id)
+ tbl4.add_vpp_config()
+ self.tables.append(tbl4)
+ tbl6 = VppIpTable(self, table_id, is_ip6=1)
+ tbl6.add_vpp_config()
+ self.tables.append(tbl6)
+
table_id = 0
for i in self.pg_interfaces[:2]:
i.admin_up()
table_id += 1
def tearDown(self):
- super(TestDHCP, self).tearDown()
- for i in self.pg_interfaces:
+ for i in self.pg_interfaces[:2]:
i.unconfig_ip4()
i.unconfig_ip6()
+
+ for i in self.pg_interfaces:
+ i.set_table_ip4(0)
+ i.set_table_ip6(0)
i.admin_down()
+ super(TestDHCP, self).tearDown()
def send_and_assert_no_replies(self, intf, pkts, remark):
intf.add_stream(pkts)
self.assertEqual(udp.dport, DHCP4_SERVER_PORT)
self.assertEqual(udp.sport, DHCP4_CLIENT_PORT)
- def verify_orig_dhcp_discover(self, pkt, intf, hostname):
+ def verify_orig_dhcp_discover(self, pkt, intf, hostname, client_id=None):
self.verify_orig_dhcp_pkt(pkt, intf)
self.verify_dhcp_msg_type(pkt, "discover")
self.verify_dhcp_has_option(pkt, "hostname", hostname)
+ if client_id:
+ self.verify_dhcp_has_option(pkt, "client_id", client_id)
+ bootp = pkt[BOOTP]
+ self.assertEqual(bootp.ciaddr, "0.0.0.0")
+ self.assertEqual(bootp.giaddr, "0.0.0.0")
+ self.assertEqual(bootp.flags, 0x8000)
def verify_orig_dhcp_request(self, pkt, intf, hostname, ip):
self.verify_orig_dhcp_pkt(pkt, intf)
self.verify_dhcp_msg_type(pkt, "request")
self.verify_dhcp_has_option(pkt, "hostname", hostname)
self.verify_dhcp_has_option(pkt, "requested_addr", ip)
+ bootp = pkt[BOOTP]
+ self.assertEqual(bootp.ciaddr, "0.0.0.0")
+ self.assertEqual(bootp.giaddr, "0.0.0.0")
+ self.assertEqual(bootp.flags, 0x8000)
def verify_relayed_dhcp_discover(self, pkt, intf, src_intf=None,
fib_id=0, oui=0,
"DHCP cleanup VRF 0")
self.send_and_assert_no_replies(self.pg3, pkts_disc_vrf1,
"DHCP cleanup VRF 1")
+ self.pg2.unconfig_ip4()
+ self.pg3.unconfig_ip4()
def test_dhcp6_proxy(self):
""" DHCPv6 Proxy"""
server_table_id=0,
is_ipv6=1,
is_add=0)
+ self.pg2.unconfig_ip6()
+ self.pg3.unconfig_ip6()
def test_dhcp_client(self):
""" DHCP Client"""
#
# Sned back on offer, expect the request
#
- p = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
- IP(src=self.pg2.remote_ip4, dst="255.255.255.255") /
- UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT) /
- BOOTP(op=1,
- yiaddr=self.pg2.local_ip4) /
- DHCP(options=[('message-type', 'offer'), ('end')]))
+ p_offer = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
+ IP(src=self.pg2.remote_ip4, dst="255.255.255.255") /
+ UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT) /
+ BOOTP(op=1, yiaddr=self.pg2.local_ip4) /
+ DHCP(options=[('message-type', 'offer'),
+ ('server_id', self.pg2.remote_ip4),
+ ('end')]))
- self.pg2.add_stream(p)
+ self.pg2.add_stream(p_offer)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
#
# Send an acknowloedgement
#
- p = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
- IP(src=self.pg2.remote_ip4, dst="255.255.255.255") /
- UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT) /
- BOOTP(op=1,
- yiaddr=self.pg2.local_ip4) /
- DHCP(options=[('message-type', 'ack'),
- ('subnet_mask', "255.255.255.0"),
- ('router', self.pg2.remote_ip4),
- ('server_id', self.pg2.remote_ip4),
- ('lease_time', 43200),
- ('end')]))
-
- self.pg2.add_stream(p)
+ p_ack = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
+ IP(src=self.pg2.remote_ip4, dst="255.255.255.255") /
+ UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT) /
+ BOOTP(op=1, yiaddr=self.pg2.local_ip4) /
+ DHCP(options=[('message-type', 'ack'),
+ ('subnet_mask', "255.255.255.0"),
+ ('router', self.pg2.remote_ip4),
+ ('server_id', self.pg2.remote_ip4),
+ ('lease_time', 43200),
+ ('end')]))
+
+ self.pg2.add_stream(p_ack)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
+ #
+ # We'll get an ARP request for the router address
+ #
+ rx = self.pg2.get_capture(1)
+
+ self.assertEqual(rx[0][ARP].pdst, self.pg2.remote_ip4)
+ self.pg_enable_capture(self.pg_interfaces)
+
#
# At the end of this procedure there should be a connected route
# in the FIB
#
+ self.assertTrue(find_route(self, self.pg2.local_ip4, 24))
self.assertTrue(find_route(self, self.pg2.local_ip4, 32))
+ # remove the left over ARP entry
+ self.vapi.ip_neighbor_add_del(self.pg2.sw_if_index,
+ mactobinary(self.pg2.remote_mac),
+ self.pg2.remote_ip4,
+ is_add=0)
#
# remove the DHCP config
#
# and now the route should be gone
#
self.assertFalse(find_route(self, self.pg2.local_ip4, 32))
+ self.assertFalse(find_route(self, self.pg2.local_ip4, 24))
+
+ #
+ # Start the procedure again. this time have VPP send the client-ID
+ #
+ self.pg2.admin_down()
+ self.sleep(1)
+ self.pg2.admin_up()
+ self.vapi.dhcp_client(self.pg2.sw_if_index, hostname,
+ client_id=self.pg2.local_mac)
+
+ rx = self.pg2.get_capture(1)
+
+ self.verify_orig_dhcp_discover(rx[0], self.pg2, hostname,
+ self.pg2.local_mac)
+
+ self.pg2.add_stream(p_offer)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg2.get_capture(1)
+ self.verify_orig_dhcp_request(rx[0], self.pg2, hostname,
+ self.pg2.local_ip4)
+
+ #
+ # unicast the ack to the offered address
+ #
+ p_ack = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
+ IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
+ UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT) /
+ BOOTP(op=1, yiaddr=self.pg2.local_ip4) /
+ DHCP(options=[('message-type', 'ack'),
+ ('subnet_mask', "255.255.255.0"),
+ ('router', self.pg2.remote_ip4),
+ ('server_id', self.pg2.remote_ip4),
+ ('lease_time', 43200),
+ ('end')]))
+
+ self.pg2.add_stream(p_ack)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ #
+ # At the end of this procedure there should be a connected route
+ # in the FIB
+ #
+ self.assertTrue(find_route(self, self.pg2.local_ip4, 32))
+ self.assertTrue(find_route(self, self.pg2.local_ip4, 24))
+
+ #
+ # remove the DHCP config
+ #
+ self.vapi.dhcp_client(self.pg2.sw_if_index, hostname, is_add=0)
+
+ self.assertFalse(find_route(self, self.pg2.local_ip4, 32))
+ self.assertFalse(find_route(self, self.pg2.local_ip4, 24))
if __name__ == '__main__':