from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
getLogger, colorize
from vpp_object import VppObjectRegistry
+from util import ppp
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
from scapy.layers.inet6 import ICMPv6EchoReply
return result
@classmethod
- def create_loopback_interfaces(cls, interfaces):
+ def create_loopback_interfaces(cls, count):
"""
Create loopback interfaces.
- :param interfaces: iterable indexes of the interfaces.
+ :param count: number of interfaces created.
:returns: List of created interfaces.
"""
- result = []
- for i in interfaces:
- intf = VppLoInterface(cls, i)
+ result = [VppLoInterface(cls) for i in range(count)]
+ for intf in result:
setattr(cls, intf.name, intf)
- result.append(intf)
cls.lo_interfaces = result
return result
def assert_packet_checksums_valid(self, packet,
ignore_zero_udp_checksums=True):
+ received = packet.__class__(str(packet))
+ self.logger.debug(
+ ppp("Verifying packet checksums for packet:", received))
udp_layers = ['UDP', 'UDPerror']
checksum_fields = ['cksum', 'chksum']
checksums = []
counter = 0
- temp = packet.__class__(str(packet))
+ temp = received.__class__(str(received))
while True:
layer = temp.getlayer(counter)
if layer:
else:
break
counter = counter + 1
+ if 0 == len(checksums):
+ return
temp = temp.__class__(str(temp))
for layer, cf in checksums:
- self.assert_equal(getattr(packet[layer], cf),
- getattr(temp[layer], cf),
- "packet checksum on layer #%d: %s" % (
- layer, temp[layer].name))
+ calc_sum = getattr(temp[layer], cf)
+ self.assert_equal(
+ getattr(received[layer], cf), calc_sum,
+ "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
+ self.logger.debug(
+ "Checksum field `%s` on `%s` layer has correct value `%s`" %
+ (cf, temp[layer].name, calc_sum))
def assert_checksum_valid(self, received_packet, layer,
field_name='chksum',
if hasattr(self, 'test_framework_failed_pipe'):
pipe = self.test_framework_failed_pipe
if pipe:
- pipe.send(test.__class__)
+ if test.__class__.__name__ == "_ErrorHolder":
+ x = str(test)
+ if x.startswith("setUpClass"):
+ # x looks like setUpClass (test_function.test_class)
+ cls = x.split(".")[1].split(")")[0]
+ for t in self.test_suite:
+ if t.__class__.__name__ == cls:
+ pipe.send(t.__class__)
+ break
+ else:
+ raise Exception("Can't find class name `%s' "
+ "(from ErrorHolder) in test suite "
+ "`%s'" % (cls, self.test_suite))
+ else:
+ raise Exception("FIXME: unexpected special case - "
+ "ErrorHolder description is `%s'" %
+ str(test))
+ else:
+ pipe.send(test.__class__)
def addFailure(self, test, err):
"""
filtered.countTestCases(), test.countTestCases()))
if not running_extended_tests():
print("Not running extended tests (some tests will be skipped)")
+ # super-ugly hack #2
+ VppTestResult.test_suite = filtered
return super(VppTestRunner, self).run(filtered)