27f35c2937d0e32a2db32d7eb5f16d9574bf5145
[vpp.git] / src / plugins / cdp / test / test_cdp.py
1 #!/usr/bin/env python
2 """ CDP tests """
3
4 from scapy.packet import Packet
5 from scapy.all import ShortField, StrField
6 from scapy.layers.l2 import Dot3, LLC, SNAP
7 from scapy.contrib.cdp import CDPMsgDeviceID, CDPMsgSoftwareVersion, \
8         CDPMsgPlatform, CDPMsgPortID, CDPv2_HDR
9
10 from framework import VppTestCase
11 from scapy.all import raw
12 from re import compile
13 from time import sleep
14 from util import ppp
15 import platform
16 import sys
17 import unittest
18
19
20 """ TestCDP is a subclass of  VPPTestCase classes.
21
22 CDP test.
23
24 """
25
26
27 class CustomTLV(Packet):
28     """ Custom TLV protocol layer for scapy """
29
30     fields_desc = [
31         ShortField("type", 0),
32         ShortField("length", 4),
33         StrField("value", "")
34
35     ]
36
37
38 class TestCDP(VppTestCase):
39     """ CDP Test Case """
40
41     nen_ptr = compile(r"not enabled")
42     cdp_ptr = compile(r"^([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)$")
43     err_ptr = compile(r"^([\d]+)\s+([-\w]+)\s+([ -\.\w)(]+)$")
44
45     @property
46     def device_id(self):
47         return platform.node()
48
49     @property
50     def version(self):
51         return platform.release()
52
53     @property
54     def port_id(self):
55         return self.interface.name
56
57     @property
58     def platform(self):
59         return platform.system()
60
61     @classmethod
62     def setUpClass(cls):
63         super(TestCDP, cls).setUpClass()
64         try:
65             cls.create_pg_interfaces(range(1))
66             cls.interface = cls.pg_interfaces[0]
67
68             cls.interface.admin_up()
69             cls.interface.config_ip4()
70             cls.interface.resolve_arp()
71
72         except Exception:
73             super(TestCDP, cls).tearDownClass()
74             raise
75
76     @classmethod
77     def tearDownClass(cls):
78         super(TestCDP, cls).tearDownClass()
79
80     def test_enable_cdp(self):
81         self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1))
82         ret = self.vapi.cli("show cdp")
83         self.logger.info(ret)
84         not_enabled = self.nen_ptr.search(ret)
85         self.assertFalse(not_enabled, "CDP isn't enabled")
86
87     def test_send_cdp_packet(self):
88         self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1))
89         self.send_packet(self.create_packet())
90
91         neighbors = list(self.show_cdp())
92         self.assertTrue(neighbors, "CDP didn't register neighbor")
93
94         port, system = neighbors[0]
95         length = min(len(system), len(self.device_id))
96
97         self.assert_equal(port, self.port_id, "CDP received invalid port id")
98         self.assert_equal(system[:length], self.device_id[:length],
99                           "CDP received invalid device id")
100
101     def test_cdp_underflow_tlv(self):
102         self.send_bad_packet(3, ".")
103
104     def test_cdp_overflow_tlv(self):
105         self.send_bad_packet(8, ".")
106
107     def send_bad_packet(self, l, v):
108         self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1))
109         self.send_packet(self.create_bad_packet(l, v))
110
111         errors = list(self.show_errors())
112         self.assertTrue(errors)
113
114         expected_errors = False
115         for count, node, reason in errors:
116             if (node == u'cdp-input' and
117                     reason == u'cdp packets with bad TLVs' and
118                     int(count) >= 1):
119
120                 expected_errors = True
121                 break
122         self.assertTrue(expected_errors, "CDP didn't drop bad packet")
123
124     def send_packet(self, packet):
125         self.logger.debug(ppp("Sending packet:", packet))
126         self.interface.add_stream(packet)
127         self.pg_start()
128
129     def create_base_packet(self):
130         packet = (Dot3(src=self.interface.remote_mac,
131                        dst="01:00:0c:cc:cc:cc") /
132                   LLC(dsap=0xaa, ssap=0xaa, ctrl=0x03) /
133                   SNAP()/CDPv2_HDR())
134         return packet
135
136     def create_packet(self):
137         packet = (self.create_base_packet() /
138                   CDPMsgDeviceID(val=self.device_id) /
139                   CDPMsgSoftwareVersion(val=self.version) /
140                   CDPMsgPortID(iface=self.port_id) /
141                   CDPMsgPlatform(val=self.platform))
142         return packet
143
144     def create_bad_packet(self, tl=4, tv=""):
145         packet = (self.create_base_packet() /
146                   CustomTLV(type=1,
147                             length=tl,
148                             value=tv))
149         return packet
150
151     def process_cli(self, exp, ptr):
152         for line in self.vapi.cli(exp).split('\n')[1:]:
153             m = ptr.match(line.strip())
154             if m:
155                 yield m.groups()
156
157     def show_cdp(self):
158         for pack in self.process_cli("show cdp", self.cdp_ptr):
159             try:
160                 port, system, _, _ = pack
161             except ValueError:
162                 pass
163             else:
164                 yield port, system
165
166     def show_errors(self):
167         for pack in self.process_cli("show errors", self.err_ptr):
168             try:
169                 count, node, reason = pack
170             except ValueError:
171                 pass
172             else:
173                 yield count, node, reason