5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
7 from ipaddress import *
10 from scapy.contrib.mpls import MPLS
11 from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes
12 from scapy.layers.l2 import Ether
13 from scapy.packet import Raw
14 from scapy.layers.dns import DNSRR, DNS, DNSQR
class TestDns(VppTestCase):
    """ Dns Test Cases

    Exercises the DNS name resolver three ways: through the binary API,
    through the debug CLI, and with DNS requests injected on a
    packet-generator (pg) interface.
    """

    @classmethod
    def setUpClass(cls):
        """Run the class-level setup inherited from VppTestCase."""
        super(TestDns, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Run the class-level teardown inherited from VppTestCase."""
        super(TestDns, cls).tearDownClass()

    def setUp(self):
        """Create a single pg interface and bring it up for IPv4."""
        super(TestDns, self).setUp()

        self.create_pg_interfaces(range(1))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

    def tearDown(self):
        """Run the per-test teardown inherited from VppTestCase."""
        super(TestDns, self).tearDown()

    def create_stream(self, src_if):
        """Create input packet stream for defined interface.

        Two DNS queries are built: one for ``bozo.clown.org`` and one for
        ``no.clown.org``. Both are UDP from port 1234 to port 53 with the
        recursion-desired bit set.

        :param VppInterface src_if: Interface to create packet stream for.
        :returns: List holding the two request packets, in the order
            ``bozo.clown.org`` then ``no.clown.org``.
        """
        # No IP destination is given here. NOTE(review): presumably the
        # scapy default destination is what the 127.0.0.1/8 address
        # configured by the test is meant to match - confirm.
        return [
            (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
             IP(src=src_if.remote_ip4) /
             UDP(sport=1234, dport=53) /
             DNS(rd=1, qd=DNSQR(qname=qname)))
            for qname in ("bozo.clown.org", "no.clown.org")
        ]

    def verify_capture(self, dst_if, capture):
        """Verify captured input packet stream for defined interface.

        Every captured packet must carry a DNS layer whose first answer
        record has rdata ``1.2.3.4``.

        :param VppInterface dst_if: Interface to verify captured packet stream
            for.
        :param list capture: Captured packet stream.
        """
        self.logger.info("Verifying capture on interface %s" % dst_if.name)
        for packet in capture:
            dns = packet[DNS]
            self.assertEqual(dns.an[0].rdata, '1.2.3.4')

    def test_dns_unittest(self):
        """ DNS Name Resolver Basic Functional Test

        Adds a static cache entry, resolves it through the binary API, then
        sends two requests through pg0 and checks both the captured reply
        and the resulting cache contents.
        """

        # Set up an upstream name resolver. We won't actually go there
        self.vapi.dns_name_server_add_del(
            is_ip6=0, is_add=1, server_address=IPv4Address(u'8.8.8.8').packed)

        # Enable name resolution
        self.vapi.dns_enable_disable(enable=1)

        # Manually add a static dns cache entry
        self.logger.info(self.vapi.cli("dns cache add bozo.clown.org 1.2.3.4"))

        # Resolve the static entry through the binary API
        rv = self.vapi.dns_resolve_name(name=b'bozo.clown.org')
        self.assertEqual(rv.ip4_address, IPv4Address(u'1.2.3.4').packed)

        # Configure 127.0.0.1/8 on the pg interface
        self.vapi.sw_interface_add_del_address(
            sw_if_index=self.pg0.sw_if_index,
            prefix="127.0.0.1/8")

        # Send a couple of DNS request packets, one for bozo.clown.org
        # and one for no.clown.org which won't resolve
        pkts = self.create_stream(self.pg0)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)

        # Only the bozo.clown.org request is expected to be answered, so
        # exactly one packet should come back on pg0.
        self.pg_start()
        pkts = self.pg0.get_capture(1)
        self.verify_capture(self.pg0, pkts)

        # Make sure that the cache contents are correct
        # NOTE(review): '[P]' presumably marks a still-pending entry -
        # confirm against the CLI output format.
        cache_dump = self.vapi.cli("show dns cache verbose")
        self.assertIn('1.2.3.4', cache_dump)
        self.assertIn('[P] no.clown.org:', cache_dump)
# Allow running this module directly: hand the tests to the VPP runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)