5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
7 from vpp_ip import VppIpPrefix
8 from ipaddress import *
11 from scapy.contrib.mpls import MPLS
12 from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes
13 from scapy.layers.l2 import Ether
14 from scapy.packet import Raw
15 from scapy.layers.dns import DNSRR, DNS, DNSQR
18 class TestDns(VppTestCase):
19 """ Dns Test Cases """
23 super(TestDns, cls).setUpClass()
26 def tearDownClass(cls):
27 super(TestDns, cls).tearDownClass()
30 super(TestDns, self).setUp()
32 self.create_pg_interfaces(range(1))
34 for i in self.pg_interfaces:
40 super(TestDns, self).tearDown()
42 def create_stream(self, src_if):
43 """Create input packet stream for defined interface.
45 :param VppInterface src_if: Interface to create packet stream for.
47 good_request = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
48 IP(src=src_if.remote_ip4) /
49 UDP(sport=1234, dport=53) /
50 DNS(rd=1, qd=DNSQR(qname="bozo.clown.org")))
52 bad_request = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
53 IP(src=src_if.remote_ip4) /
54 UDP(sport=1234, dport=53) /
55 DNS(rd=1, qd=DNSQR(qname="no.clown.org")))
56 pkts = [good_request, bad_request]
59 def verify_capture(self, dst_if, capture):
60 """Verify captured input packet stream for defined interface.
62 :param VppInterface dst_if: Interface to verify captured packet stream
64 :param list capture: Captured packet stream.
66 self.logger.info("Verifying capture on interface %s" % dst_if.name)
67 for packet in capture:
69 self.assertEqual(dns.an[0].rdata, '1.2.3.4')
71 def test_dns_unittest(self):
72 """ DNS Name Resolver Basic Functional Test """
74 # Set up an upstream name resolver. We won't actually go there
75 self.vapi.dns_name_server_add_del(
76 is_ip6=0, is_add=1, server_address=IPv4Address(u'8.8.8.8').packed)
78 # Enable name resolution
79 self.vapi.dns_enable_disable(enable=1)
81 # Manually add a static dns cache entry
82 self.logger.info(self.vapi.cli("dns cache add bozo.clown.org 1.2.3.4"))
85 rv = self.vapi.dns_resolve_name(name='bozo.clown.org')
86 self.assertEqual(rv.ip4_address, IPv4Address(u'1.2.3.4').packed)
88 # Configure 127.0.0.1/8 on the pg interface
89 self.vapi.sw_interface_add_del_address(
90 sw_if_index=self.pg0.sw_if_index,
91 prefix=VppIpPrefix("127.0.0.1", 8).encode())
93 # Send a couple of DNS request packets, one for bozo.clown.org
94 # and one for no.clown.org which won't resolve
96 pkts = self.create_stream(self.pg0)
97 self.pg0.add_stream(pkts)
98 self.pg_enable_capture(self.pg_interfaces)
101 pkts = self.pg0.get_capture(1)
102 self.verify_capture(self.pg0, pkts)
104 # Make sure that the cache contents are correct
105 str = self.vapi.cli("show dns cache verbose")
106 self.assertIn('1.2.3.4', str)
107 self.assertIn('[P] no.clown.org:', str)
109 if __name__ == '__main__':
110 unittest.main(testRunner=VppTestRunner)