tests: cdp plugin. Replace cdp enable cli command with API call.
[vpp.git] / test / test_cdp.py
1 #!/usr/bin/env python
2 """ CDP tests """
3
4 from scapy.packet import Packet
5 from scapy.all import ShortField, StrField
6 from scapy.layers.l2 import Dot3, LLC, SNAP
7 from scapy.contrib.cdp import CDPMsgDeviceID, CDPMsgSoftwareVersion, \
8         CDPMsgPlatform, CDPMsgPortID, CDPv2_HDR
9
10 from framework import VppTestCase
11 from scapy.all import raw
12 from re import compile
13 from time import sleep
14 from util import ppp
15 import platform
16
17
18 """ TestCDP is a subclass of  VPPTestCase classes.
19
20 CDP test.
21
22 """
23
24
class CustomTLV(Packet):
    """Hand-rolled type/length/value layer for scapy.

    All three fields are independent: ``length`` is an ordinary 16-bit
    field defaulting to 4 and is not computed from ``value``, so a caller
    can set a length that disagrees with the bytes that actually follow.
    """

    fields_desc = [
        ShortField("type", 0),    # 16-bit TLV type, default 0
        ShortField("length", 4),  # 16-bit TLV length, default 4
        StrField("value", ""),    # payload bytes, default empty
    ]
34
35
class TestCDP(VppTestCase):
    """ CDP Test Case """

    # NOTE: the test_* methods below are described with comments rather
    # than docstrings so that unittest's per-test short description (taken
    # from the first docstring line) stays exactly as it was.

    # Matches the "not enabled" notice anywhere in "show cdp" output.
    nen_ptr = compile(r"not enabled")
    # One "show cdp" row: four whitespace-separated columns.
    cdp_ptr = compile(r"^([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)$")
    # One "show errors" row: count, node name, free-text reason.
    err_ptr = compile(r"^([\d]+)\s+([-\w]+)\s+([ -\.\w)(]+)$")

    @property
    def device_id(self):
        """Device ID advertised in the CDP packet: ``platform.node()``."""
        return platform.node()

    @property
    def version(self):
        """Software version advertised: ``platform.release()``."""
        return platform.release()

    @property
    def port_id(self):
        """Port ID advertised: the name of the test interface."""
        return self.interface.name

    @property
    def platform(self):
        """Platform string advertised: ``platform.system()``."""
        return platform.system()

    @classmethod
    def setUpClass(cls):
        """Create one pg interface, bring it up and configure IPv4/ARP.

        If any step fails, the parent tearDownClass is run before the
        exception is re-raised.
        """
        super(TestCDP, cls).setUpClass()
        try:
            cls.create_pg_interfaces(range(1))
            pg_if = cls.pg_interfaces[0]
            cls.interface = pg_if

            pg_if.admin_up()
            pg_if.config_ip4()
            pg_if.resolve_arp()
        except Exception:
            super(TestCDP, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the parent class."""
        super(TestCDP, cls).tearDownClass()

    def _enable_cdp(self):
        """Enable CDP through the API and log the reply."""
        reply = self.vapi.cdp_enable_disable(enable_disable=1)
        self.logger.info(reply)

    def test_enable_cdp(self):
        # After enabling CDP, "show cdp" must not contain "not enabled".
        self._enable_cdp()
        output = self.vapi.cli("show cdp")
        self.logger.info(output)
        self.assertFalse(self.nen_ptr.search(output), "CDP isn't enabled")

    def test_send_cdp_packet(self):
        # Send one well-formed CDP packet and check the first neighbor
        # row reported by "show cdp".
        self._enable_cdp()
        self.send_packet(self.create_packet())

        neighbors = list(self.show_cdp())
        self.assertTrue(neighbors, "CDP didn't register neighbor")

        seen_port, seen_system = neighbors[0]
        expected_id = self.device_id
        # Only the common prefix of the two device ids is compared
        # (presumably because the CLI may show a shortened name).
        common = min(len(seen_system), len(expected_id))

        self.assert_equal(seen_port, self.port_id,
                          "CDP received invalid port id")
        self.assert_equal(seen_system[:common], expected_id[:common],
                          "CDP received invalid device id")

    def test_cdp_underflow_tlv(self):
        # Declared length 3 is smaller than the 4 bytes taken by the
        # TLV's own type and length fields.
        self.send_bad_packet(3, ".")

    def test_cdp_overflow_tlv(self):
        # Declared length 8 is larger than the 5 bytes actually sent
        # (4 bytes of type/length plus a 1-byte value).
        self.send_bad_packet(8, ".")

    def send_bad_packet(self, l, v):
        """Send a TLV with length ``l`` and value ``v``; expect a drop.

        Passes when "show errors" has a 'cdp-input' row with reason
        'cdp packets with bad TLVs' and a count of at least 1.
        """
        self._enable_cdp()
        self.send_packet(self.create_bad_packet(l, v))

        errors = list(self.show_errors())
        self.assertTrue(errors)

        dropped = any(
            node == u'cdp-input' and
            reason == u'cdp packets with bad TLVs' and
            int(count) >= 1
            for count, node, reason in errors)
        self.assertTrue(dropped, "CDP didn't drop bad packet")

    def send_packet(self, packet):
        """Log ``packet``, queue it on the test interface and start pg."""
        dump = ppp("Sending packet:", packet)
        self.logger.debug(dump)
        self.interface.add_stream(packet)
        self.pg_start()

    def create_base_packet(self):
        """Return Dot3 / LLC / SNAP / CDPv2 header without any TLVs.

        The source MAC is the interface's remote MAC; the destination is
        01:00:0c:cc:cc:cc.
        """
        ethernet = Dot3(src=self.interface.remote_mac,
                        dst="01:00:0c:cc:cc:cc")
        llc = LLC(dsap=0xaa, ssap=0xaa, ctrl=0x03)
        return ethernet / llc / SNAP() / CDPv2_HDR()

    def create_packet(self):
        """Return a CDP packet carrying the four TLVs used by the tests.

        TLVs are appended in order: device id, software version, port id
        and platform.
        """
        packet = self.create_base_packet()
        tlvs = (
            CDPMsgDeviceID(val=self.device_id),
            CDPMsgSoftwareVersion(val=self.version),
            CDPMsgPortID(iface=self.port_id),
            CDPMsgPlatform(val=self.platform),
        )
        for tlv in tlvs:
            packet = packet / tlv
        return packet

    def create_bad_packet(self, tl=4, tv=""):
        """Return a CDP packet with one type-1 CustomTLV.

        ``tl`` is written verbatim as the TLV length and ``tv`` as its
        value, so the two need not agree.
        """
        base = self.create_base_packet()
        return base / CustomTLV(type=1, length=tl, value=tv)

    def process_cli(self, exp, ptr):
        """Run CLI command ``exp`` and yield regex groups per output row.

        The first output line is skipped. Every later line is stripped
        and matched against ``ptr``; non-matching lines are ignored.
        """
        output_lines = self.vapi.cli(exp).split('\n')
        for text in output_lines[1:]:
            found = ptr.match(text.strip())
            if found is None:
                continue
            yield found.groups()

    def show_cdp(self):
        """Yield ``(port, system)`` for each neighbor row of "show cdp"."""
        for columns in self.process_cli("show cdp", self.cdp_ptr):
            # Rows that do not have exactly four columns are ignored.
            if len(columns) != 4:
                continue
            yield columns[0], columns[1]

    def show_errors(self):
        """Yield ``(count, node, reason)`` per row of "show errors"."""
        for columns in self.process_cli("show errors", self.err_ptr):
            # Rows that do not have exactly three columns are ignored.
            if len(columns) != 3:
                continue
            yield columns[0], columns[1], columns[2]