MAP: Convert from DPO to input feature.
[vpp.git] / test / test_cdp.py
1 #!/usr/bin/env python
2 """ CDP tests """
3
4 from scapy.packet import Packet
5 from scapy.all import ShortField, StrField
6 from scapy.layers.l2 import Dot3, LLC, SNAP
7 from scapy.contrib.cdp import CDPMsgDeviceID, CDPMsgSoftwareVersion, \
8         CDPMsgPlatform, CDPMsgPortID, CDPv2_HDR
9
10 from framework import VppTestCase
11 from scapy.all import raw
12 from re import compile
13 from time import sleep
14 from util import ppp
15 import platform
16
17
""" TestCDP is a subclass of the VppTestCase class.
19
20 CDP test.
21
22 """
23
24
class CustomTLV(Packet):
    """Hand-rolled type/length/value layer for scapy.

    ``length`` is an ordinary field with a fixed default of 4, so a caller
    can set it independently of ``value``; create_bad_packet() relies on
    that to build TLVs whose declared length does not fit the payload.
    """

    fields_desc = [
        ShortField("type", 0),
        ShortField("length", 4),
        StrField("value", ""),
    ]
34
35
36 class TestCDP(VppTestCase):
37     """ CDP Test Case """
38
    # Literal marker that test_enable_cdp searches for in "show cdp" output;
    # finding it fails the test.
    nen_ptr = compile(r"not enabled")
    # One row of "show cdp" output: exactly four whitespace-separated columns
    # built from word characters, dots and hyphens. show_cdp() keeps the first
    # two columns (port, system) and ignores the other two.
    cdp_ptr = compile(r"^([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)$")
    # One row of "show errors" output: a numeric count, a node name, and a
    # free-text reason that may contain spaces and parentheses.
    # NOTE(review): inside the last character class, " -\." is parsed as a
    # range (space through '.'), so it also admits characters such as quotes,
    # '#', '*' and ',' -- confirm whether that breadth is intended.
    err_ptr = compile(r"^([\d]+)\s+([-\w]+)\s+([ -\.\w)(]+)$")
42
43     @property
44     def device_id(self):
45         return platform.node()
46
47     @property
48     def version(self):
49         return platform.release()
50
51     @property
52     def port_id(self):
53         return self.interface.name
54
55     @property
56     def platform(self):
57         return platform.system()
58
59     @classmethod
60     def setUpClass(cls):
61         super(TestCDP, cls).setUpClass()
62         try:
63             cls.create_pg_interfaces(range(1))
64             cls.interface = cls.pg_interfaces[0]
65
66             cls.interface.admin_up()
67             cls.interface.config_ip4()
68             cls.interface.resolve_arp()
69
70         except Exception:
71             super(TestCDP, cls).tearDownClass()
72             raise
73
74     def test_enable_cdp(self):
75         self.logger.info(self.vapi.cli("cdp enable"))
76         ret = self.vapi.cli("show cdp")
77         self.logger.info(ret)
78         not_enabled = self.nen_ptr.search(ret)
79         self.assertFalse(not_enabled, "CDP isn't enabled")
80
81     def test_send_cdp_packet(self):
82         self.logger.info(self.vapi.cli("cdp enable"))
83         self.send_packet(self.create_packet())
84
85         neighbors = list(self.show_cdp())
86         self.assertTrue(neighbors, "CDP didn't register neighbor")
87
88         port, system = neighbors[0]
89         length = min(len(system), len(self.device_id))
90
91         self.assert_equal(port, self.port_id, "CDP received invalid port id")
92         self.assert_equal(system[:length], self.device_id[:length],
93                           "CDP received invalid device id")
94
95     def test_cdp_underflow_tlv(self):
96         self.send_bad_packet(3, ".")
97
98     def test_cdp_overflow_tlv(self):
99         self.send_bad_packet(8, ".")
100
101     def send_bad_packet(self, l, v):
102         self.logger.info(self.vapi.cli("cdp enable"))
103         self.send_packet(self.create_bad_packet(l, v))
104
105         errors = list(self.show_errors())
106         self.assertTrue(errors)
107
108         expected_errors = False
109         for count, node, reason in errors:
110             if (node == u'cdp-input' and
111                     reason == u'cdp packets with bad TLVs' and
112                     int(count) >= 1):
113
114                 expected_errors = True
115                 break
116         self.assertTrue(expected_errors, "CDP didn't drop bad packet")
117
118     def send_packet(self, packet):
119         self.logger.debug(ppp("Sending packet:", packet))
120         self.interface.add_stream(packet)
121         self.pg_start()
122
123     def create_base_packet(self):
124         packet = (Dot3(src=self.interface.remote_mac,
125                        dst="01:00:0c:cc:cc:cc") /
126                   LLC(dsap=0xaa, ssap=0xaa, ctrl=0x03) /
127                   SNAP()/CDPv2_HDR())
128         return packet
129
130     def create_packet(self):
131         packet = (self.create_base_packet() /
132                   CDPMsgDeviceID(val=self.device_id) /
133                   CDPMsgSoftwareVersion(val=self.version) /
134                   CDPMsgPortID(iface=self.port_id) /
135                   CDPMsgPlatform(val=self.platform))
136         return packet
137
138     def create_bad_packet(self, tl=4, tv=""):
139         packet = (self.create_base_packet() /
140                   CustomTLV(type=1,
141                             length=tl,
142                             value=tv))
143         return packet
144
145     def process_cli(self, exp, ptr):
146         for line in self.vapi.cli(exp).split('\n')[1:]:
147             m = ptr.match(line.strip())
148             if m:
149                 yield m.groups()
150
151     def show_cdp(self):
152         for pack in self.process_cli("show cdp", self.cdp_ptr):
153             try:
154                 port, system, _, _ = pack
155             except ValueError:
156                 pass
157             else:
158                 yield port, system
159
160     def show_errors(self):
161         for pack in self.process_cli("show errors", self.err_ptr):
162             try:
163                 count, node, reason = pack
164             except ValueError:
165                 pass
166             else:
167                 yield count, node, reason