- addr_len = 24
- self.vapi.ip_add_del_route(intf.local_ip4n,
- addr_len,
- intf.remote_ip4n,
- table_id=self.pbr_vrfid,
- is_add=is_add)
-
- def create_stream(self, src_if, dst_if, packet_sizes):
- """Create input packet stream for defined interfaces.
-
- :param VppInterface src_if: Source Interface for packet stream.
- :param VppInterface dst_if: Destination Interface for packet stream.
- :param list packet_sizes: packet size to test.
- """
- pkts = []
- for size in packet_sizes:
- info = self.create_packet_info(src_if, dst_if)
- payload = self.info_to_payload(info)
- p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
- IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
- UDP(sport=1234, dport=5678) /
- Raw(payload))
- info.data = p.copy()
- self.extend_packet(p, size)
- pkts.append(p)
- return pkts
-
- def verify_capture(self, dst_if, capture):
- """Verify captured input packet stream for defined interface.
-
- :param VppInterface dst_if: Interface to verify captured packet stream.
- :param list capture: Captured packet stream.
- """
- self.logger.info("Verifying capture on interface %s" % dst_if.name)
- last_info = dict()
- for i in self.interfaces:
- last_info[i.sw_if_index] = None
- dst_sw_if_index = dst_if.sw_if_index
- for packet in capture:
- try:
- ip = packet[IP]
- udp = packet[UDP]
- payload_info = self.payload_to_info(str(packet[Raw]))
- packet_index = payload_info.index
- self.assertEqual(payload_info.dst, dst_sw_if_index)
- self.logger.debug(
- "Got packet on port %s: src=%u (id=%u)" %
- (dst_if.name, payload_info.src, packet_index))
- next_info = self.get_next_packet_info_for_interface2(
- payload_info.src, dst_sw_if_index,
- last_info[payload_info.src])
- last_info[payload_info.src] = next_info
- self.assertTrue(next_info is not None)
- self.assertEqual(packet_index, next_info.index)
- saved_packet = next_info.data
- # Check standard fields
- self.assertEqual(ip.src, saved_packet[IP].src)
- self.assertEqual(ip.dst, saved_packet[IP].dst)
- self.assertEqual(udp.sport, saved_packet[UDP].sport)
- self.assertEqual(udp.dport, saved_packet[UDP].dport)
- except:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- for i in self.interfaces:
- remaining_packet = self.get_next_packet_info_for_interface2(
- i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
- self.assertTrue(remaining_packet is None,
- "Interface %s: Packet expected from interface %s "
- "didn't arrive" % (dst_if.name, i.name))
-
- def verify_vrf(self, vrf_id):
- """
- Check if the FIB table / VRF ID is configured.