X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_ip4.py;h=ab733ac1bb662b26b07b9c11daed1bb202128bc5;hb=22ab6f7cbb0f6139302aa6ca9f0c96dba17a37a7;hp=e28a896b815a4b8ceb697928ec94b215fea3846d;hpb=4eed7474f4fba7193ba342fb29c6ec1d73aef909;p=vpp.git diff --git a/test/test_ip4.py b/test/test_ip4.py index e28a896b815..ab733ac1bb6 100644 --- a/test/test_ip4.py +++ b/test/test_ip4.py @@ -4,6 +4,7 @@ import random import socket import unittest +import scapy.compat from scapy.contrib.mpls import MPLS from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes from scapy.layers.l2 import Ether, Dot1Q, ARP @@ -168,7 +169,7 @@ class TestIPv4(VppTestCase): try: ip = packet[IP] udp = packet[UDP] - payload_info = self.payload_to_info(str(packet[Raw])) + payload_info = self.payload_to_info(packet[Raw]) packet_index = payload_info.index self.assertEqual(payload_info.dst, dst_sw_if_index) self.logger.debug( @@ -310,7 +311,7 @@ class TestIPv4FibCrud(VppTestCase): dest_addr_len = 32 n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr) for _ in range(count): - n_dest_addr = '{:08x}'.format(dest_addr).decode('hex') + n_dest_addr = binascii.unhexlify('{:08x}'.format(dest_addr)) self.vapi.ip_add_del_route(n_dest_addr, dest_addr_len, n_next_hop_addr) added_ips.append(socket.inet_ntoa(n_dest_addr)) @@ -325,7 +326,7 @@ class TestIPv4FibCrud(VppTestCase): dest_addr_len = 32 n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr) for _ in range(count): - n_dest_addr = '{:08x}'.format(dest_addr).decode('hex') + n_dest_addr = binascii.unhexlify('{:08x}'.format(dest_addr)) self.vapi.ip_add_del_route(n_dest_addr, dest_addr_len, n_next_hop_addr, is_add=0) removed_ips.append(socket.inet_ntoa(n_dest_addr)) @@ -351,8 +352,8 @@ class TestIPv4FibCrud(VppTestCase): def _find_ip_match(self, find_in, pkt): for p in find_in: - if self.payload_to_info(str(p[Raw])) == \ - self.payload_to_info(str(pkt[Raw])): + if self.payload_to_info(p[Raw]) == \ + self.payload_to_info(pkt[Raw]): if p[IP].src != pkt[IP].src: break if p[IP].dst != pkt[IP].dst: @@ -395,7 +396,7 @@ class TestIPv4FibCrud(VppTestCase): for ip in ips: self.assertTrue(_ip_in_route_dump(ip, fib_dump), - 'IP {} is not in fib dump.'.format(ip)) + 'IP {!s} is not in fib dump.'.format(ip)) def verify_not_in_route_dump(self, fib_dump, ips): @@ -406,7 +407,7 @@ class TestIPv4FibCrud(VppTestCase): for ip in ips: self.assertFalse(_ip_in_route_dump(ip, fib_dump), - 'IP {} is in fib dump.'.format(ip)) + 'IP {!s} is in fib dump.'.format(ip)) @classmethod def setUpClass(cls): @@ -804,9 +805,9 @@ class TestIPSubNets(VppTestCase): # ip_addr_n = socket.inet_pton(socket.AF_INET, "10.10.10.10") - self.vapi.sw_interface_add_del_address(self.pg0.sw_if_index, - ip_addr_n, - 16) + self.vapi.sw_interface_add_del_address( + sw_if_index=self.pg0.sw_if_index, address=ip_addr_n, + address_length=16) pn = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / @@ -823,10 +824,9 @@ class TestIPSubNets(VppTestCase): self.send_and_assert_no_replies(self.pg1, pb, "IP Broadcast address") # remove the sub-net and we are forwarding via the cover again - self.vapi.sw_interface_add_del_address(self.pg0.sw_if_index, - ip_addr_n, - 16, - is_add=0) + self.vapi.sw_interface_add_del_address( + sw_if_index=self.pg0.sw_if_index, address=ip_addr_n, + address_length=16, is_add=0) self.pg1.add_stream(pn) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -842,9 +842,9 @@ class TestIPSubNets(VppTestCase): # ip_addr_n = socket.inet_pton(socket.AF_INET, "10.10.10.10") - self.vapi.sw_interface_add_del_address(self.pg0.sw_if_index, - ip_addr_n, - 31) + self.vapi.sw_interface_add_del_address( + sw_if_index=self.pg0.sw_if_index, address=ip_addr_n, + address_length=31) pn = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / @@ -859,10 +859,9 @@ class TestIPSubNets(VppTestCase): rx[ARP] # remove the sub-net and we are forwarding via the cover again - self.vapi.sw_interface_add_del_address(self.pg0.sw_if_index, - ip_addr_n, - 31, - is_add=0) + self.vapi.sw_interface_add_del_address( + sw_if_index=self.pg0.sw_if_index, address=ip_addr_n, + address_length=31, is_add=0) self.pg1.add_stream(pn) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1147,7 +1146,7 @@ class TestIPPunt(VppTestCase): # # add a policer # - policer = self.vapi.policer_add_del("ip4-punt", 400, 0, 10, 0, + policer = self.vapi.policer_add_del(b"ip4-punt", 400, 0, 10, 0, rate_type=1) self.vapi.ip_punt_police(policer.policer_index) @@ -1168,7 +1167,7 @@ class TestIPPunt(VppTestCase): # remove the poilcer. back to full rx # self.vapi.ip_punt_police(policer.policer_index, is_add=0) - self.vapi.policer_add_del("ip4-punt", 400, 0, 10, 0, + self.vapi.policer_add_del(b"ip4-punt", 400, 0, 10, 0, rate_type=1, is_add=0) self.send_and_expect(self.pg0, pkts, self.pg1)