import socket
import unittest
+import scapy.compat
from scapy.contrib.mpls import MPLS
from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes
from scapy.layers.l2 import Ether, Dot1Q, ARP
for i in self.interfaces:
next_hop_address = i.local_ip4n
for j in range(count / n_int):
- self.vapi.ip_add_del_route(
- dest_addr, dest_addr_len, next_hop_address)
+ self.vapi.ip_add_del_route(dst_address=dest_addr,
+ dst_address_length=dest_addr_len,
+ next_hop_address=next_hop_address)
counter += 1
if counter / count * 100 > percent:
self.logger.info("Configure %d FIB entries .. %d%% done" %
try:
ip = packet[IP]
udp = packet[UDP]
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
self.logger.debug(
dest_addr_len = 32
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
for _ in range(count):
- n_dest_addr = '{:08x}'.format(dest_addr).decode('hex')
- self.vapi.ip_add_del_route(n_dest_addr, dest_addr_len,
- n_next_hop_addr)
+ n_dest_addr = binascii.unhexlify('{:08x}'.format(dest_addr))
+ self.vapi.ip_add_del_route(dst_address=n_dest_addr,
+ dst_address_length=dest_addr_len,
+ next_hop_address=n_next_hop_addr)
added_ips.append(socket.inet_ntoa(n_dest_addr))
dest_addr += 1
return added_ips
dest_addr_len = 32
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
for _ in range(count):
- n_dest_addr = '{:08x}'.format(dest_addr).decode('hex')
- self.vapi.ip_add_del_route(n_dest_addr, dest_addr_len,
- n_next_hop_addr, is_add=0)
+ n_dest_addr = binascii.unhexlify('{:08x}'.format(dest_addr))
+ self.vapi.ip_add_del_route(dst_address=n_dest_addr,
+ dst_address_length=dest_addr_len,
+ next_hop_address=n_next_hop_addr,
+ is_add=0)
removed_ips.append(socket.inet_ntoa(n_dest_addr))
dest_addr += 1
return removed_ips
def _find_ip_match(self, find_in, pkt):
for p in find_in:
- if self.payload_to_info(str(p[Raw])) == \
- self.payload_to_info(str(pkt[Raw])):
+ if self.payload_to_info(p[Raw]) == \
+ self.payload_to_info(pkt[Raw]):
if p[IP].src != pkt[IP].src:
break
if p[IP].dst != pkt[IP].dst:
# - now only the stream with differing source address will
# load-balance
#
- self.vapi.set_ip_flow_hash(0, src=1, dst=1, sport=0, dport=0)
+ self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, sport=0, dport=0)
self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
[self.pg1, self.pg2])
#
# change the flow hash config back to defaults
#
- self.vapi.set_ip_flow_hash(0, src=1, dst=1, sport=1, dport=1)
+ self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, sport=1, dport=1)
#
# Recursive prefixes
#
# add a policer
#
- policer = self.vapi.policer_add_del("ip4-punt", 400, 0, 10, 0,
+ policer = self.vapi.policer_add_del(b"ip4-punt", 400, 0, 10, 0,
rate_type=1)
self.vapi.ip_punt_police(policer.policer_index)
# remove the poilcer. back to full rx
#
self.vapi.ip_punt_police(policer.policer_index, is_add=0)
- self.vapi.policer_add_del("ip4-punt", 400, 0, 10, 0,
+ self.vapi.policer_add_del(b"ip4-punt", 400, 0, 10, 0,
rate_type=1, is_add=0)
self.send_and_expect(self.pg0, pkts, self.pg1)