udp: fix csum computation when offload disabled
[vpp.git] / test / test_cdp.py
1 #!/usr/bin/env python3
2 """ CDP tests """
3
4 from scapy.packet import Packet
5 from scapy.all import ShortField, StrField
6 from scapy.layers.l2 import Dot3, LLC, SNAP
7 from scapy.contrib.cdp import (
8     CDPMsgDeviceID,
9     CDPMsgSoftwareVersion,
10     CDPMsgPlatform,
11     CDPMsgPortID,
12     CDPv2_HDR,
13 )
14
15 from framework import VppTestCase
16 from scapy.all import raw
17 from re import compile
18 from time import sleep
19 from util import ppp
20 import platform
21 import sys
22 import unittest
23
24
25 """ TestCDP is a subclass of  VPPTestCase classes.
26
27 CDP test.
28
29 """
30
31
32 class CustomTLV(Packet):
33     """Custom TLV protocol layer for scapy"""
34
35     fields_desc = [
36         ShortField("type", 0),
37         ShortField("length", 4),
38         StrField("value", ""),
39     ]
40
41
42 class TestCDP(VppTestCase):
43     """CDP Test Case"""
44
45     nen_ptr = compile(r"not enabled")
46     cdp_ptr = compile(r"^([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)$")
47     err_ptr = compile(r"^([\d]+)\s+([-\w]+)\s+([ -\.\w)(]+)$")
48
49     @property
50     def device_id(self):
51         return platform.node()
52
53     @property
54     def version(self):
55         return platform.release()
56
57     @property
58     def port_id(self):
59         return self.interface.name
60
61     @property
62     def platform(self):
63         return platform.system()
64
65     @classmethod
66     def setUpClass(cls):
67         super(TestCDP, cls).setUpClass()
68         try:
69             cls.create_pg_interfaces(range(1))
70             cls.interface = cls.pg_interfaces[0]
71
72             cls.interface.admin_up()
73             cls.interface.config_ip4()
74             cls.interface.resolve_arp()
75
76         except Exception:
77             super(TestCDP, cls).tearDownClass()
78             raise
79
80     @classmethod
81     def tearDownClass(cls):
82         super(TestCDP, cls).tearDownClass()
83
84     def test_enable_cdp(self):
85         self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1))
86         ret = self.vapi.cli("show cdp")
87         self.logger.info(ret)
88         not_enabled = self.nen_ptr.search(ret)
89         self.assertFalse(not_enabled, "CDP isn't enabled")
90
91     def test_send_cdp_packet(self):
92         self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1))
93         self.send_packet(self.create_packet())
94
95         neighbors = list(self.show_cdp())
96         self.assertTrue(neighbors, "CDP didn't register neighbor")
97
98         port, system = neighbors[0]
99         length = min(len(system), len(self.device_id))
100
101         self.assert_equal(port, self.port_id, "CDP received invalid port id")
102         self.assert_equal(
103             system[:length], self.device_id[:length], "CDP received invalid device id"
104         )
105
106     def test_cdp_underflow_tlv(self):
107         self.send_bad_packet(3, ".")
108
109     def test_cdp_overflow_tlv(self):
110         self.send_bad_packet(8, ".")
111
112     def send_bad_packet(self, l, v):
113         self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1))
114         self.send_packet(self.create_bad_packet(l, v))
115
116         err = self.statistics.get_err_counter(
117             "/err/cdp-input/cdp packets with bad TLVs"
118         )
119         self.assertTrue(err >= 1, "CDP didn't drop bad packet")
120
121     def send_packet(self, packet):
122         self.logger.debug(ppp("Sending packet:", packet))
123         self.interface.add_stream(packet)
124         self.pg_start()
125
126     def create_base_packet(self):
127         packet = (
128             Dot3(src=self.interface.remote_mac, dst="01:00:0c:cc:cc:cc")
129             / LLC(dsap=0xAA, ssap=0xAA, ctrl=0x03)
130             / SNAP()
131             / CDPv2_HDR()
132         )
133         return packet
134
135     def create_packet(self):
136         packet = (
137             self.create_base_packet()
138             / CDPMsgDeviceID(val=self.device_id)
139             / CDPMsgSoftwareVersion(val=self.version)
140             / CDPMsgPortID(iface=self.port_id)
141             / CDPMsgPlatform(val=self.platform)
142         )
143         return packet
144
145     def create_bad_packet(self, tl=4, tv=""):
146         packet = self.create_base_packet() / CustomTLV(type=1, length=tl, value=tv)
147         return packet
148
149     def process_cli(self, exp, ptr):
150         for line in self.vapi.cli(exp).split("\n")[1:]:
151             m = ptr.match(line.strip())
152             if m:
153                 yield m.groups()
154
155     def show_cdp(self):
156         for pack in self.process_cli("show cdp", self.cdp_ptr):
157             try:
158                 port, system, _, _ = pack
159             except ValueError:
160                 pass
161             else:
162                 yield port, system