MAP: Use explicit address/prefix types in API
[vpp.git] / test / test_cdp.py
1 #!/usr/bin/env python
2 """ CDP tests """
3
4 from scapy.packet import Packet
5 from scapy.all import ShortField, StrField
6 from scapy.layers.l2 import Dot3, LLC, SNAP
7 from scapy.contrib.cdp import CDPMsgDeviceID, CDPMsgSoftwareVersion, \
8         CDPMsgPlatform, CDPMsgPortID, CDPv2_HDR
9
10 from framework import VppTestCase
11 from scapy.all import raw
12 from re import compile
13 from time import sleep
14 from util import ppp
15 import platform
16
17
18 """ TestCDP is a subclass of the VppTestCase class.
19
20 CDP test.
21
22 """
23
24
class CustomTLV(Packet):
    """Hand-rolled type/length/value layer for scapy.

    The layer is two ShortFields (``type``, ``length``) followed by a
    StrField (``value``).  ``length`` is an ordinary field defaulting to
    4; nothing here derives it from ``value``, so a caller can set a
    length that disagrees with the bytes that actually follow.
    """

    fields_desc = [
        ShortField("type", 0),
        ShortField("length", 4),
        StrField("value", ""),
    ]
34
35
class TestCDP(VppTestCase):
    """ CDP Test Case """

    # Matches the "show cdp" output produced while CDP is not enabled.
    nen_ptr = compile(r"not enabled")
    # One "show cdp" row: exactly four whitespace-separated columns.
    cdp_ptr = compile(r"^([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)$")
    # One "show errors" row: numeric count, node name, free-text reason.
    # NOTE(review): inside the last character class " -\." is a *range*
    # (space through dot), not the three literals space, '-' and '.'.
    # It is left untouched so the set of accepted rows does not change.
    err_ptr = compile(r"^([\d]+)\s+([-\w]+)\s+([ -\.\w)(]+)$")

    # Node/reason pair of the error counter bumped for malformed TLVs.
    _bad_tlv_node = u'cdp-input'
    _bad_tlv_reason = u'cdp packets with bad TLVs'

    @property
    def device_id(self):
        """Device id advertised in generated packets: the local hostname."""
        return platform.node()

    @property
    def version(self):
        """Software version advertised in generated packets."""
        return platform.release()

    @property
    def port_id(self):
        """Port id advertised in generated packets: the pg interface name."""
        return self.interface.name

    @property
    def platform(self):
        """Platform string advertised in generated packets.

        Inside this method ``platform`` is the module imported at file
        level, not this property: class attributes are not visible as
        bare names from within method bodies.
        """
        return platform.system()

    @classmethod
    def setUpClass(cls):
        """Create one pg interface, bring it up and configure IPv4 on it.

        If any step fails the base-class teardown is run before the
        exception is re-raised.
        """
        super(TestCDP, cls).setUpClass()
        try:
            cls.create_pg_interfaces(range(1))
            cls.interface = cls.pg_interfaces[0]

            cls.interface.admin_up()
            cls.interface.config_ip4()
            cls.interface.resolve_arp()

        except Exception:
            super(TestCDP, cls).tearDownClass()
            raise

    # Enabling CDP must make "show cdp" stop reporting "not enabled".
    def test_enable_cdp(self):
        self.logger.info(self.vapi.cli("cdp enable"))
        ret = self.vapi.cli("show cdp")
        self.logger.info(ret)
        not_enabled = self.nen_ptr.search(ret)
        self.assertFalse(not_enabled, "CDP isn't enabled")

    # A well-formed CDP packet must show up as a neighbor in "show cdp".
    def test_send_cdp_packet(self):
        self.logger.info(self.vapi.cli("cdp enable"))
        self.send_packet(self.create_packet())

        neighbors = list(self.show_cdp())
        self.assertTrue(neighbors, "CDP didn't register neighbor")

        port, system = neighbors[0]

        self.assert_equal(port, self.port_id, "CDP received invalid port id")
        self.assert_equal(system, self.device_id,
                          "CDP received invalid device id")

    # Declared length 3 is smaller than the 4 bytes of the two ShortFields.
    def test_cdp_underflow_tlv(self):
        self.send_bad_packet(3, ".")

    # Declared length 8 is larger than the 5 bytes actually sent.
    def test_cdp_overflow_tlv(self):
        self.send_bad_packet(8, ".")

    def send_bad_packet(self, l, v):
        """Send a TLV with a bogus length and assert that it was counted.

        The bad-TLV counter is sampled before and after the send and
        must increase.  A plain ">= 1" check would be satisfied by a
        count left behind by an earlier test in this class, so it could
        not tell whether *this* packet was rejected.

        :param l: value placed in the TLV length field
        :param v: TLV payload
        """
        self.logger.info(self.vapi.cli("cdp enable"))

        before = self._bad_tlv_count()
        self.send_packet(self.create_bad_packet(l, v))
        after = self._bad_tlv_count()

        self.assertTrue(after > before, "CDP didn't drop bad packet")

    def _bad_tlv_count(self):
        """Return the current total of the bad-TLV error counter.

        Sums every matching "show errors" row; the result is 0 when no
        such row is present.
        """
        return sum(int(count)
                   for count, node, reason in self.show_errors()
                   if node == self._bad_tlv_node and
                   reason == self._bad_tlv_reason)

    def send_packet(self, packet):
        """Log *packet*, queue it on the pg interface and start the pg."""
        self.logger.debug(ppp("Sending packet:", packet))
        self.interface.add_stream(packet)
        self.pg_start()

    def create_base_packet(self):
        """Return the Dot3/LLC/SNAP/CDPv2 header stack without any TLV.

        The source MAC is the interface's remote MAC; the destination is
        the fixed address 01:00:0c:cc:cc:cc.
        """
        return (Dot3(src=self.interface.remote_mac,
                     dst="01:00:0c:cc:cc:cc") /
                LLC(dsap=0xaa, ssap=0xaa, ctrl=0x03) /
                SNAP() / CDPv2_HDR())

    def create_packet(self):
        """Return a CDP packet carrying device id, version, port id and
        platform messages built from this test's properties."""
        return (self.create_base_packet() /
                CDPMsgDeviceID(val=self.device_id) /
                CDPMsgSoftwareVersion(val=self.version) /
                CDPMsgPortID(iface=self.port_id) /
                CDPMsgPlatform(val=self.platform))

    def create_bad_packet(self, tl=4, tv=""):
        """Return a CDP packet whose only TLV is a CustomTLV of type 1.

        :param tl: value written to the TLV length field, used as given
        :param tv: TLV payload
        """
        return (self.create_base_packet() /
                CustomTLV(type=1,
                          length=tl,
                          value=tv))

    def process_cli(self, exp, ptr):
        """Run CLI command *exp* and yield ``groups()`` of matching rows.

        The first output line is skipped; every remaining line is
        stripped and matched against the compiled pattern *ptr*.
        Non-matching lines are ignored.
        """
        for line in self.vapi.cli(exp).split('\n')[1:]:
            m = ptr.match(line.strip())
            if m:
                yield m.groups()

    def show_cdp(self):
        """Yield ``(port, system)`` for every parsed "show cdp" row.

        ``cdp_ptr`` has exactly four groups, so unpacking four values
        cannot fail for a matched row; the last two columns are unused.
        """
        for port, system, _, _ in self.process_cli("show cdp", self.cdp_ptr):
            yield port, system

    def show_errors(self):
        """Yield ``(count, node, reason)`` for every "show errors" row.

        ``err_ptr`` has exactly three groups, so the unpacking cannot
        fail for a matched row.  ``count`` is yielded as a string.
        """
        for count, node, reason in self.process_cli("show errors",
                                                    self.err_ptr):
            yield count, node, reason