tests: replace pycodestyle with black
[vpp.git] / test / test_dns.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
7 from ipaddress import *
8
9 import scapy.compat
10 from scapy.contrib.mpls import MPLS
11 from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes
12 from scapy.layers.l2 import Ether
13 from scapy.packet import Raw
14 from scapy.layers.dns import DNSRR, DNS, DNSQR
15
16
class TestDns(VppTestCase):
    """Dns Test Cases"""

    # NOTE(review): the class docstring and the test-method docstring are kept
    # verbatim on purpose -- they are presumably shown by the test runner as
    # display names; confirm before rewording them.

    @classmethod
    def setUpClass(cls):
        # No class-level fixtures of our own; defer to the base class.
        super(TestDns, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        # Nothing class-level to release beyond what the base class handles.
        super(TestDns, cls).tearDownClass()

    def setUp(self):
        """Create a single packet-generator interface and bring it up.

        Every created interface is set admin-up, given an IPv4
        configuration and has ARP resolved before the test body runs.
        """
        super(TestDns, self).setUp()

        # range(1) -> exactly one interface, reachable afterwards as self.pg0.
        self.create_pg_interfaces(range(1))

        for pg_if in self.pg_interfaces:
            pg_if.admin_up()
            pg_if.config_ip4()
            pg_if.resolve_arp()

    def tearDown(self):
        # No per-test cleanup of our own; defer to the base class.
        super(TestDns, self).tearDown()

    def create_stream(self, src_if):
        """Create input packet stream for defined interface.

        Two DNS queries (recursion desired, UDP 1234 -> 53) are built, in
        this order:

        1. ``bozo.clown.org`` -- the name the test adds to the DNS cache.
        2. ``no.clown.org`` -- a name the test never adds to the cache.

        :param VppInterface src_if: Interface to create packet stream for.
        :returns: List of the two request packets, good request first.
        """
        # NOTE(review): the IP destination is intentionally left unset, as in
        # the original; presumably scapy's default destination is relied upon
        # together with the 127.0.0.1/8 address the test configures -- confirm.
        query_names = ["bozo.clown.org", "no.clown.org"]
        return [
            Ether(dst=src_if.local_mac, src=src_if.remote_mac)
            / IP(src=src_if.remote_ip4)
            / UDP(sport=1234, dport=53)
            / DNS(rd=1, qd=DNSQR(qname=qname))
            for qname in query_names
        ]

    def verify_capture(self, dst_if, capture):
        """Verify captured input packet stream for defined interface.

        Every captured packet must carry a DNS layer whose first answer
        record resolves to ``1.2.3.4``.

        :param VppInterface dst_if: Interface to verify captured packet stream
            for.
        :param list capture: Captured packet stream.
        """
        self.logger.info("Verifying capture on interface %s" % dst_if.name)
        for packet in capture:
            dns = packet[DNS]
            self.assertEqual(dns.an[0].rdata, "1.2.3.4")

    def test_dns_unittest(self):
        """DNS Name Resolver Basic Functional Test"""

        # Set up an upstream name resolver. We won't actually go there
        self.vapi.dns_name_server_add_del(
            is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
        )

        # Enable name resolution
        self.vapi.dns_enable_disable(enable=1)

        # Manually add a static dns cache entry
        self.logger.info(self.vapi.cli("dns cache add bozo.clown.org 1.2.3.4"))

        # Test the binary API
        reply = self.vapi.dns_resolve_name(name=b"bozo.clown.org")
        self.assertEqual(reply.ip4_address, IPv4Address("1.2.3.4").packed)

        # Configure 127.0.0.1/8 on the pg interface
        self.vapi.sw_interface_add_del_address(
            sw_if_index=self.pg0.sw_if_index, prefix="127.0.0.1/8"
        )

        # Send a couple of DNS request packets, one for bozo.clown.org
        # and one for no.clown.org which won't resolve
        requests = self.create_stream(self.pg0)
        self.pg0.add_stream(requests)
        self.pg_enable_capture(self.pg_interfaces)

        self.pg_start()

        # Two requests go in but only one packet is expected back: the
        # unresolvable name produces no reply on the interface.
        capture = self.pg0.get_capture(1)
        self.verify_capture(self.pg0, capture)

        # Make sure that the cache contents are correct
        # (named cache_dump rather than `str` to avoid shadowing the builtin)
        cache_dump = self.vapi.cli("show dns cache verbose")
        self.assertIn("1.2.3.4", cache_dump)
        # NOTE(review): "[P]" presumably marks a pending (unresolved) entry --
        # confirm against the DNS plugin's CLI output format.
        self.assertIn("[P] no.clown.org:", cache_dump)
112
113
# Allow running this file directly as a script: hand the tests in this module
# to unittest, using the project's VppTestRunner as the runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)