tests: Remove the unrequired VPP IP address/prefix class wrappers
[vpp.git] / src / plugins / dns / test / test_dns.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
7 from ipaddress import *
8
9 import scapy.compat
10 from scapy.contrib.mpls import MPLS
11 from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes
12 from scapy.layers.l2 import Ether
13 from scapy.packet import Raw
14 from scapy.layers.dns import DNSRR, DNS, DNSQR
15
16
class TestDns(VppTestCase):
    """ Dns Test Cases """

    # NOTE(review): the class docstring and the test method docstrings are
    # kept as one-liners; unittest reports the first docstring line of a
    # test method as its description, and presumably the VPP runner shows
    # these too -- confirm before rewording them.

    @classmethod
    def setUpClass(cls):
        """Class-level setup; defers entirely to the base class."""
        super(TestDns, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Class-level teardown; defers entirely to the base class."""
        super(TestDns, cls).tearDownClass()

    def setUp(self):
        """Create a single pg interface, bring it up and configure IPv4."""
        super(TestDns, self).setUp()

        self.create_pg_interfaces(range(1))

        for pg_if in self.pg_interfaces:
            pg_if.admin_up()
            pg_if.config_ip4()
            pg_if.resolve_arp()

    def tearDown(self):
        """Per-test teardown; defers entirely to the base class."""
        super(TestDns, self).tearDown()

    def create_stream(self, src_if):
        """Create input packet stream for defined interface.

        Two DNS queries (UDP source port 1234, destination port 53, with the
        recursion-desired bit set) are built from the interface's remote
        MAC/IPv4 address: one for "bozo.clown.org" and one for
        "no.clown.org".

        :param VppInterface src_if: Interface to create packet stream for.
        :returns: List holding the "bozo.clown.org" request followed by the
            "no.clown.org" request.
        """
        def dns_query(qname):
            # The IP destination is not set here, so scapy's default applies
            # -- presumably 127.0.0.1, which would match the address the
            # test configures on the interface; confirm against scapy.
            return (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                    IP(src=src_if.remote_ip4) /
                    UDP(sport=1234, dport=53) /
                    DNS(rd=1, qd=DNSQR(qname=qname)))

        return [dns_query("bozo.clown.org"), dns_query("no.clown.org")]

    def verify_capture(self, dst_if, capture):
        """Verify captured input packet stream for defined interface.

        Each captured packet must contain a DNS layer whose first answer
        record carries the address 1.2.3.4.

        :param VppInterface dst_if: Interface to verify captured packet stream
            for.
        :param list capture: Captured packet stream.
        """
        self.logger.info("Verifying capture on interface %s" % dst_if.name)
        for packet in capture:
            dns = packet[DNS]
            self.assertEqual(dns.an[0].rdata, '1.2.3.4')

    def test_dns_unittest(self):
        """ DNS Name Resolver Basic Functional Test """

        # Set up an upstream name resolver. We won't actually go there
        self.vapi.dns_name_server_add_del(
            is_ip6=0, is_add=1, server_address=IPv4Address(u'8.8.8.8').packed)

        # Enable name resolution
        self.vapi.dns_enable_disable(enable=1)

        # Manually add a static dns cache entry
        self.logger.info(self.vapi.cli("dns cache add bozo.clown.org 1.2.3.4"))

        # Test the binary API
        rv = self.vapi.dns_resolve_name(name=b'bozo.clown.org')
        self.assertEqual(rv.ip4_address, IPv4Address(u'1.2.3.4').packed)

        # Configure 127.0.0.1/8 on the pg interface
        self.vapi.sw_interface_add_del_address(
            sw_if_index=self.pg0.sw_if_index,
            prefix="127.0.0.1/8")

        # Send a couple of DNS request packets, one for bozo.clown.org
        # and one for no.clown.org which won't resolve
        requests = self.create_stream(self.pg0)
        self.pg0.add_stream(requests)
        self.pg_enable_capture(self.pg_interfaces)

        self.pg_start()
        # Two requests were injected, but a capture of 1 is requested:
        # presumably only the bozo.clown.org query is answered -- confirm
        # against the framework's get_capture() semantics.
        replies = self.pg0.get_capture(1)
        self.verify_capture(self.pg0, replies)

        # Make sure that the cache contents are correct
        cache_dump = self.vapi.cli("show dns cache verbose")
        self.assertIn('1.2.3.4', cache_dump)
        self.assertIn('[P] no.clown.org:', cache_dump)
107
if __name__ == "__main__":
    # Run directly as a script: let unittest discover the tests in this
    # module and execute them with the VPP-provided test runner.
    unittest.main(testRunner=VppTestRunner)