stats: counters data model
[vpp.git] / src / plugins / cdp / test / test_cdp.py
1 #!/usr/bin/env python3
2 """ CDP tests """
3
4 from scapy.packet import Packet
5 from scapy.all import ShortField, StrField
6 from scapy.layers.l2 import Dot3, LLC, SNAP
7 from scapy.contrib.cdp import CDPMsgDeviceID, CDPMsgSoftwareVersion, \
8         CDPMsgPlatform, CDPMsgPortID, CDPv2_HDR
9
10 from framework import VppTestCase
11 from scapy.all import raw
12 from re import compile
13 from time import sleep
14 from util import ppp
15 import platform
16 import sys
17 import unittest
18
19
20 """ TestCDP is a subclass of  VPPTestCase classes.
21
22 CDP test.
23
24 """
25
26
27 class CustomTLV(Packet):
28     """ Custom TLV protocol layer for scapy """
29
30     fields_desc = [
31         ShortField("type", 0),
32         ShortField("length", 4),
33         StrField("value", "")
34
35     ]
36
37
38 class TestCDP(VppTestCase):
39     """ CDP Test Case """
40
41     nen_ptr = compile(r"not enabled")
42     cdp_ptr = compile(r"^([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)$")
43     err_ptr = compile(r"^([\d]+)\s+([-\w]+)\s+([ -\.\w)(]+)$")
44
45     @property
46     def device_id(self):
47         return platform.node()
48
49     @property
50     def version(self):
51         return platform.release()
52
53     @property
54     def port_id(self):
55         return self.interface.name
56
57     @property
58     def platform(self):
59         return platform.system()
60
61     @classmethod
62     def setUpClass(cls):
63         super(TestCDP, cls).setUpClass()
64         try:
65             cls.create_pg_interfaces(range(1))
66             cls.interface = cls.pg_interfaces[0]
67
68             cls.interface.admin_up()
69             cls.interface.config_ip4()
70             cls.interface.resolve_arp()
71
72         except Exception:
73             super(TestCDP, cls).tearDownClass()
74             raise
75
76     @classmethod
77     def tearDownClass(cls):
78         super(TestCDP, cls).tearDownClass()
79
80     def test_enable_cdp(self):
81         self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1))
82         ret = self.vapi.cli("show cdp")
83         self.logger.info(ret)
84         not_enabled = self.nen_ptr.search(ret)
85         self.assertFalse(not_enabled, "CDP isn't enabled")
86
87     def test_send_cdp_packet(self):
88         self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1))
89         self.send_packet(self.create_packet())
90
91         neighbors = list(self.show_cdp())
92         self.assertTrue(neighbors, "CDP didn't register neighbor")
93
94         port, system = neighbors[0]
95         length = min(len(system), len(self.device_id))
96
97         self.assert_equal(port, self.port_id, "CDP received invalid port id")
98         self.assert_equal(system[:length], self.device_id[:length],
99                           "CDP received invalid device id")
100
101     def test_cdp_underflow_tlv(self):
102         self.send_bad_packet(3, ".")
103
104     def test_cdp_overflow_tlv(self):
105         self.send_bad_packet(8, ".")
106
107     def send_bad_packet(self, l, v):
108         self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1))
109         self.send_packet(self.create_bad_packet(l, v))
110
111         err = self.statistics.get_err_counter(
112             '/err/cdp-input/cdp packets with bad TLVs')
113         self.assertTrue(err >= 1, "CDP didn't drop bad packet")
114
115     def send_packet(self, packet):
116         self.logger.debug(ppp("Sending packet:", packet))
117         self.interface.add_stream(packet)
118         self.pg_start()
119
120     def create_base_packet(self):
121         packet = (Dot3(src=self.interface.remote_mac,
122                        dst="01:00:0c:cc:cc:cc") /
123                   LLC(dsap=0xaa, ssap=0xaa, ctrl=0x03) /
124                   SNAP()/CDPv2_HDR())
125         return packet
126
127     def create_packet(self):
128         packet = (self.create_base_packet() /
129                   CDPMsgDeviceID(val=self.device_id) /
130                   CDPMsgSoftwareVersion(val=self.version) /
131                   CDPMsgPortID(iface=self.port_id) /
132                   CDPMsgPlatform(val=self.platform))
133         return packet
134
135     def create_bad_packet(self, tl=4, tv=""):
136         packet = (self.create_base_packet() /
137                   CustomTLV(type=1,
138                             length=tl,
139                             value=tv))
140         return packet
141
142     def process_cli(self, exp, ptr):
143         for line in self.vapi.cli(exp).split('\n')[1:]:
144             m = ptr.match(line.strip())
145             if m:
146                 yield m.groups()
147
148     def show_cdp(self):
149         for pack in self.process_cli("show cdp", self.cdp_ptr):
150             try:
151                 port, system, _, _ = pack
152             except ValueError:
153                 pass
154             else:
155                 yield port, system