3eec4a73244443295fcc387ddcc15f7c1a15abc0
[vpp.git] / test / test_cdp.py
1 #!/usr/bin/env python
2 """ CDP tests """
3
4 from scapy.packet import Packet
5 from scapy.all import ShortField, StrField
6 from scapy.layers.l2 import Dot3, LLC, SNAP
7 from scapy.contrib.cdp import CDPMsgDeviceID, CDPMsgSoftwareVersion, \
8         CDPMsgPlatform, CDPMsgPortID, CDPv2_HDR
9
10 from framework import VppTestCase
11 from scapy.all import raw
12 from re import compile
13 from time import sleep
14 from util import ppp
15 import platform
16
17
18 """ TestCDP is a subclass of the VppTestCase class.
19
20 CDP test.
21
22 """
23
24
class CustomTLV(Packet):
    """Hand-rolled type/length/value layer for scapy.

    The ``length`` field is an ordinary short that is never derived from
    ``value``, so a caller can declare any length regardless of the bytes
    actually carried.  ``create_bad_packet`` in this file uses that to
    build a TLV whose declared length and payload size are chosen
    independently.
    """

    fields_desc = [
        # TLV type code; defaults to 0.
        ShortField("type", 0),
        # Declared TLV length; defaults to 4.
        # NOTE(review): 4 presumably covers just the type + length header
        # with an empty value -- confirm against the CDP TLV layout.
        ShortField("length", 4),
        # Raw value bytes; empty unless the caller supplies some.
        StrField("value", ""),
    ]
34
35
class TestCDP(VppTestCase):
    """ CDP Test Case """

    # NOTE: the class docstring above is kept verbatim and the test_*
    # methods below carry '#' comments instead of docstrings, because test
    # runners derive the displayed test names from docstrings.

    # Phrase looked for in "show cdp" output; test_enable_cdp asserts that
    # it is absent once CDP has been switched on.
    nen_ptr = compile(r"not enabled")
    # One "show cdp" row: four whitespace-separated columns.  show_cdp()
    # keeps the first two (port, system) and ignores the other two.
    cdp_ptr = compile(r"^([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)$")
    # One "show errors" row: numeric count, node name, free-text reason.
    err_ptr = compile(r"^([\d]+)\s+([-\w]+)\s+([ -\.\w)(]+)$")

    @property
    def device_id(self):
        """Device ID advertised in the test packet: this host's node name."""
        return platform.node()

    @property
    def version(self):
        """Software version advertised in the test packet: OS release."""
        return platform.release()

    @property
    def port_id(self):
        """Port ID advertised in the test packet: the test interface name."""
        return self.interface.name

    @property
    def platform(self):
        """Platform advertised in the test packet: the OS/system name."""
        # Inside a method body the bare name ``platform`` resolves to the
        # imported module, not to this property.
        return platform.system()

    @classmethod
    def setUpClass(cls):
        """Create a single pg interface and bring it up with IPv4 and ARP.

        If any of the setup steps raises, the parent class teardown is
        invoked and the exception is re-raised.
        """
        super(TestCDP, cls).setUpClass()
        try:
            cls.create_pg_interfaces(range(1))
            pg_if = cls.pg_interfaces[0]
            cls.interface = pg_if

            pg_if.admin_up()
            pg_if.config_ip4()
            pg_if.resolve_arp()
        except Exception:
            super(TestCDP, cls).tearDownClass()
            raise

    def test_enable_cdp(self):
        # Enable CDP through the CLI, then check that "show cdp" no longer
        # contains the "not enabled" phrase.
        self.logger.info(self.vapi.cli("cdp enable"))
        output = self.vapi.cli("show cdp")
        self.logger.info(output)
        self.assertFalse(self.nen_ptr.search(output), "CDP isn't enabled")

    def test_send_cdp_packet(self):
        # Send one well-formed CDP packet and check that the first
        # neighbor row reports our port id and device id.
        self.logger.info(self.vapi.cli("cdp enable"))
        cdp_packet = self.create_packet()
        self.send_packet(cdp_packet)

        neighbors = list(self.show_cdp())
        self.assertTrue(neighbors, "CDP didn't register neighbor")

        seen_port, seen_system = neighbors[0]

        self.assert_equal(seen_port, self.port_id,
                          "CDP received invalid port id")
        self.assert_equal(seen_system, self.device_id,
                          "CDP received invalid device id")

    def test_send_cdp_bad_packet(self):
        # Send a TLV that declares length 8 but carries a one-byte value,
        # then expect exactly one "bad TLVs" error from cdp-input.
        # NOTE(review): the length/payload mismatch is presumably what
        # makes VPP reject the TLV -- confirm against the cdp-input node.
        self.logger.info(self.vapi.cli("cdp enable"))
        malformed = self.create_bad_packet(8, ".")
        self.send_packet(malformed)

        errors = list(self.show_errors())
        self.assertTrue(errors)

        # Same left-to-right short-circuit as a chained ``and`` in a loop:
        # the count is only converted for rows whose node/reason match.
        dropped = any(
            node == u'cdp-input' and
            reason == u'cdp packets with bad TLVs' and
            int(count) == 1
            for count, node, reason in errors)
        self.assertTrue(dropped, "CDP didn't drop bad packet")

    def send_packet(self, packet):
        """Log *packet*, queue it on the test interface and start the pg."""
        dump = ppp("Sending packet:", packet)
        self.logger.debug(dump)
        self.interface.add_stream(packet)
        self.pg_start()

    def create_base_packet(self):
        """Return the 802.3 / LLC / SNAP / CDPv2 header stack.

        The source MAC is the interface's remote MAC; the destination is
        the fixed address 01:00:0c:cc:cc:cc.
        """
        ethernet = Dot3(src=self.interface.remote_mac,
                        dst="01:00:0c:cc:cc:cc")
        llc = LLC(dsap=0xaa, ssap=0xaa, ctrl=0x03)
        return ethernet / llc / SNAP() / CDPv2_HDR()

    def create_packet(self):
        """Return a CDP packet with device, version, port, platform TLVs."""
        tlvs = (
            CDPMsgDeviceID(val=self.device_id),
            CDPMsgSoftwareVersion(val=self.version),
            CDPMsgPortID(iface=self.port_id),
            CDPMsgPlatform(val=self.platform),
        )
        packet = self.create_base_packet()
        for tlv in tlvs:
            packet = packet / tlv
        return packet

    def create_bad_packet(self, tl=4, tv=""):
        """Return a CDP packet carrying one CustomTLV of type 1.

        :param tl: value placed in the TLV's length field.
        :param tv: value placed in the TLV's value field.
        """
        bad_tlv = CustomTLV(type=1, length=tl, value=tv)
        return self.create_base_packet() / bad_tlv

    def process_cli(self, exp, ptr):
        """Run CLI command *exp* and yield the groups of matching lines.

        The first output line is skipped; every remaining line is stripped
        and matched against the compiled pattern *ptr*.  Lines that do not
        match are ignored.
        """
        output_lines = self.vapi.cli(exp).split('\n')
        for text_line in output_lines[1:]:
            found = ptr.match(text_line.strip())
            if not found:
                continue
            yield found.groups()

    def show_cdp(self):
        """Yield (port, system) for each row parsed from "show cdp"."""
        for columns in self.process_cli("show cdp", self.cdp_ptr):
            # Rows that do not have exactly four columns are skipped.
            if len(columns) != 4:
                continue
            yield columns[0], columns[1]

    def show_errors(self):
        """Yield (count, node, reason) for each row of "show errors"."""
        for columns in self.process_cli("show errors", self.err_ptr):
            # Rows that do not have exactly three columns are skipped.
            if len(columns) != 3:
                continue
            yield columns[0], columns[1], columns[2]