5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
7 from ipaddress import *
10 from scapy.contrib.mpls import MPLS
11 from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes
12 from scapy.layers.l2 import Ether
13 from scapy.packet import Raw
14 from scapy.layers.dns import DNSRR, DNS, DNSQR
17 class TestDns(VppTestCase):
22 super(TestDns, cls).setUpClass()
25 def tearDownClass(cls):
26 super(TestDns, cls).tearDownClass()
29 super(TestDns, self).setUp()
31 self.create_pg_interfaces(range(1))
33 for i in self.pg_interfaces:
39 super(TestDns, self).tearDown()
41 def create_stream(self, src_if):
42 """Create input packet stream for defined interface.
44 :param VppInterface src_if: Interface to create packet stream for.
47 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
48 / IP(src=src_if.remote_ip4)
49 / UDP(sport=1234, dport=53)
50 / DNS(rd=1, qd=DNSQR(qname="bozo.clown.org"))
54 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
55 / IP(src=src_if.remote_ip4)
56 / UDP(sport=1234, dport=53)
57 / DNS(rd=1, qd=DNSQR(qname="no.clown.org"))
59 pkts = [good_request, bad_request]
62 def verify_capture(self, dst_if, capture):
63 """Verify captured input packet stream for defined interface.
65 :param VppInterface dst_if: Interface to verify captured packet stream
67 :param list capture: Captured packet stream.
69 self.logger.info("Verifying capture on interface %s" % dst_if.name)
70 for packet in capture:
72 self.assertEqual(dns.an[0].rdata, "1.2.3.4")
74 def test_dns_unittest(self):
75 """DNS Name Resolver Basic Functional Test"""
77 # Set up an upstream name resolver. We won't actually go there
78 self.vapi.dns_name_server_add_del(
79 is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
82 # Enable name resolution
83 self.vapi.dns_enable_disable(enable=1)
85 # Manually add a static dns cache entry
86 self.logger.info(self.vapi.cli("dns cache add bozo.clown.org 1.2.3.4"))
89 rv = self.vapi.dns_resolve_name(name=b"bozo.clown.org")
90 self.assertEqual(rv.ip4_address, IPv4Address("1.2.3.4").packed)
92 # Configure 127.0.0.1/8 on the pg interface
93 self.vapi.sw_interface_add_del_address(
94 sw_if_index=self.pg0.sw_if_index, prefix="127.0.0.1/8"
97 # Send a couple of DNS request packets, one for bozo.clown.org
98 # and one for no.clown.org which won't resolve
100 pkts = self.create_stream(self.pg0)
101 self.pg0.add_stream(pkts)
102 self.pg_enable_capture(self.pg_interfaces)
105 pkts = self.pg0.get_capture(1)
106 self.verify_capture(self.pg0, pkts)
108 # Make sure that the cache contents are correct
109 str = self.vapi.cli("show dns cache verbose")
110 self.assertIn("1.2.3.4", str)
111 self.assertIn("[P] no.clown.org:", str)
114 if __name__ == "__main__":
115 unittest.main(testRunner=VppTestRunner)