misc: Fix python scripts shebang line
[vpp.git] / src / plugins / dns / test / test_dns.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
7 from vpp_ip import VppIpPrefix
8 from ipaddress import *
9
10 import scapy.compat
11 from scapy.contrib.mpls import MPLS
12 from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes
13 from scapy.layers.l2 import Ether
14 from scapy.packet import Raw
15 from scapy.layers.dns import DNSRR, DNS, DNSQR
16
17
class TestDns(VppTestCase):
    """ Dns Test Cases """

    @classmethod
    def setUpClass(cls):
        """Delegate class-level set-up to the base test case."""
        super(TestDns, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level tear-down to the base test case."""
        super(TestDns, cls).tearDownClass()

    def setUp(self):
        """Create one packet-generator interface and bring it up with IPv4.

        Every created interface is set admin-up, given its IPv4
        configuration and has ARP resolved before the test body runs.
        """
        super(TestDns, self).setUp()

        self.create_pg_interfaces(range(1))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

    def tearDown(self):
        """Delegate per-test tear-down to the base test case."""
        super(TestDns, self).tearDown()

    def create_stream(self, src_if):
        """Create input packet stream for defined interface.

        Two DNS queries (UDP, source port 1234, destination port 53,
        recursion desired) are built: the first asks for
        ``bozo.clown.org`` and the second for ``no.clown.org``.

        :param VppInterface src_if: Interface to create packet stream for.
        :returns: List holding the two request packets, in that order.
        """
        # NOTE(review): no IP destination is set on the requests; this
        # presumably relies on scapy's default destination, which would be
        # why the test configures 127.0.0.1/8 on the interface -- confirm.
        return [
            (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
             IP(src=src_if.remote_ip4) /
             UDP(sport=1234, dport=53) /
             DNS(rd=1, qd=DNSQR(qname=qname)))
            for qname in ("bozo.clown.org", "no.clown.org")
        ]

    def verify_capture(self, dst_if, capture):
        """Verify captured input packet stream for defined interface.

        Each captured packet must carry a DNS layer whose first answer
        record resolves to ``1.2.3.4``.

        :param VppInterface dst_if: Interface to verify captured packet stream
            for.
        :param list capture: Captured packet stream.
        """
        self.logger.info("Verifying capture on interface %s" % dst_if.name)
        for packet in capture:
            dns = packet[DNS]
            self.assertEqual(dns.an[0].rdata, '1.2.3.4')

    def test_dns_unittest(self):
        """ DNS Name Resolver Basic Functional Test """

        # Set up an upstream name resolver. We won't actually go there
        self.vapi.dns_name_server_add_del(
            is_ip6=0, is_add=1, server_address=IPv4Address(u'8.8.8.8').packed)

        # Enable name resolution
        self.vapi.dns_enable_disable(enable=1)

        # Manually add a static dns cache entry
        self.logger.info(self.vapi.cli("dns cache add bozo.clown.org 1.2.3.4"))

        # Test the binary API
        rv = self.vapi.dns_resolve_name(name=b'bozo.clown.org')
        self.assertEqual(rv.ip4_address, IPv4Address(u'1.2.3.4').packed)

        # Configure 127.0.0.1/8 on the pg interface
        self.vapi.sw_interface_add_del_address(
            sw_if_index=self.pg0.sw_if_index,
            prefix=VppIpPrefix("127.0.0.1", 8).encode())

        # Send a couple of DNS request packets, one for bozo.clown.org
        # and one for no.clown.org which won't resolve

        pkts = self.create_stream(self.pg0)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)

        self.pg_start()
        # Only a single packet is requested from the capture: the
        # no.clown.org query is not expected to produce a reply.
        pkts = self.pg0.get_capture(1)
        self.verify_capture(self.pg0, pkts)

        # Make sure that the cache contents are correct
        # (kept in a descriptive local rather than shadowing builtin ``str``)
        cache_dump = self.vapi.cli("show dns cache verbose")
        self.assertIn('1.2.3.4', cache_dump)
        # NOTE(review): "[P]" presumably marks a still-pending cache
        # entry -- confirm against the DNS plugin's CLI output format.
        self.assertIn('[P] no.clown.org:', cache_dump)
108
# Script entry point: hand this module's tests to the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)