X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_cdp.py;h=da378dbebe3b7fb4a416523c5263340617bb085f;hb=39d7699c205bb243af12a2d32913e9a9af51b173;hp=3eec4a73244443295fcc387ddcc15f7c1a15abc0;hpb=eb601240981a02f1baf0d52808503293382f8450;p=vpp.git diff --git a/test/test_cdp.py b/test/test_cdp.py index 3eec4a73244..da378dbebe3 100644 --- a/test/test_cdp.py +++ b/test/test_cdp.py @@ -1,11 +1,16 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 """ CDP tests """ from scapy.packet import Packet from scapy.all import ShortField, StrField from scapy.layers.l2 import Dot3, LLC, SNAP -from scapy.contrib.cdp import CDPMsgDeviceID, CDPMsgSoftwareVersion, \ - CDPMsgPlatform, CDPMsgPortID, CDPv2_HDR +from scapy.contrib.cdp import ( + CDPMsgDeviceID, + CDPMsgSoftwareVersion, + CDPMsgPlatform, + CDPMsgPortID, + CDPv2_HDR, +) from framework import VppTestCase from scapy.all import raw @@ -13,6 +18,8 @@ from re import compile from time import sleep from util import ppp import platform +import sys +import unittest """ TestCDP is a subclass of VPPTestCase classes. @@ -23,18 +30,17 @@ CDP test. class CustomTLV(Packet): - """ Custom TLV protocol layer for scapy """ + """Custom TLV protocol layer for scapy""" fields_desc = [ ShortField("type", 0), ShortField("length", 4), - StrField("value", "") - + StrField("value", ""), ] class TestCDP(VppTestCase): - """ CDP Test Case """ + """CDP Test Case""" nen_ptr = compile(r"not enabled") cdp_ptr = compile(r"^([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)$") @@ -71,42 +77,46 @@ class TestCDP(VppTestCase): super(TestCDP, cls).tearDownClass() raise + @classmethod + def tearDownClass(cls): + super(TestCDP, cls).tearDownClass() + def test_enable_cdp(self): - self.logger.info(self.vapi.cli("cdp enable")) + self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1)) ret = self.vapi.cli("show cdp") self.logger.info(ret) not_enabled = self.nen_ptr.search(ret) self.assertFalse(not_enabled, "CDP isn't enabled") def test_send_cdp_packet(self): - self.logger.info(self.vapi.cli("cdp enable")) + self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1)) self.send_packet(self.create_packet()) neighbors = list(self.show_cdp()) self.assertTrue(neighbors, "CDP didn't register neighbor") port, system = neighbors[0] + length = min(len(system), len(self.device_id)) self.assert_equal(port, self.port_id, "CDP received invalid port id") - self.assert_equal(system, self.device_id, - "CDP received invalid device id") + self.assert_equal( + system[:length], self.device_id[:length], "CDP received invalid device id" + ) - def test_send_cdp_bad_packet(self): - self.logger.info(self.vapi.cli("cdp enable")) - self.send_packet(self.create_bad_packet(8, ".")) + def test_cdp_underflow_tlv(self): + self.send_bad_packet(3, ".") - errors = list(self.show_errors()) - self.assertTrue(errors) + def test_cdp_overflow_tlv(self): + self.send_bad_packet(8, ".") - expected_errors = False - for count, node, reason in errors: - if (node == u'cdp-input' and - reason == u'cdp packets with bad TLVs' and - int(count) == 1): + def send_bad_packet(self, l, v): + self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1)) + self.send_packet(self.create_bad_packet(l, v)) - expected_errors = True - break - self.assertTrue(expected_errors, "CDP didn't drop bad packet") + err = self.statistics.get_err_counter( + "/err/cdp-input/cdp packets with bad TLVs" + ) + self.assertTrue(err >= 1, "CDP didn't drop bad packet") def send_packet(self, packet): self.logger.debug(ppp("Sending packet:", packet)) @@ -114,29 +124,30 @@ class TestCDP(VppTestCase): self.pg_start() def create_base_packet(self): - packet = (Dot3(src=self.interface.remote_mac, - dst="01:00:0c:cc:cc:cc") / - LLC(dsap=0xaa, ssap=0xaa, ctrl=0x03) / - SNAP()/CDPv2_HDR()) + packet = ( + Dot3(src=self.interface.remote_mac, dst="01:00:0c:cc:cc:cc") + / LLC(dsap=0xAA, ssap=0xAA, ctrl=0x03) + / SNAP() + / CDPv2_HDR() + ) return packet def create_packet(self): - packet = (self.create_base_packet() / - CDPMsgDeviceID(val=self.device_id) / - CDPMsgSoftwareVersion(val=self.version) / - CDPMsgPortID(iface=self.port_id) / - CDPMsgPlatform(val=self.platform)) + packet = ( + self.create_base_packet() + / CDPMsgDeviceID(val=self.device_id) + / CDPMsgSoftwareVersion(val=self.version) + / CDPMsgPortID(iface=self.port_id) + / CDPMsgPlatform(val=self.platform) + ) return packet def create_bad_packet(self, tl=4, tv=""): - packet = (self.create_base_packet() / - CustomTLV(type=1, - length=tl, - value=tv)) + packet = self.create_base_packet() / CustomTLV(type=1, length=tl, value=tv) return packet def process_cli(self, exp, ptr): - for line in self.vapi.cli(exp).split('\n')[1:]: + for line in self.vapi.cli(exp).split("\n")[1:]: m = ptr.match(line.strip()) if m: yield m.groups() @@ -149,12 +160,3 @@ class TestCDP(VppTestCase): pass else: yield port, system - - def show_errors(self): - for pack in self.process_cli("show errors", self.err_ptr): - try: - count, node, reason = pack - except ValueError: - pass - else: - yield count, node, reason