from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q, GRE
from scapy.layers.inet import IP, UDP
-from scapy.layers.inet6 import ICMPv6ND_RA, IPv6
+from scapy.layers.inet6 import IPv6
from scapy.volatile import RandMAC, RandIP
from util import ppp, ppc
def create_stream_ip4(self, src_if, src_ip, dst_ip):
pkts = []
for i in range(0, 257):
- info = self.create_packet_info(src_if.sw_if_index,
- src_if.sw_if_index)
+ info = self.create_packet_info(src_if, src_if)
payload = self.info_to_payload(info)
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
IP(src=src_ip, dst=dst_ip) /
src_ip, dst_ip):
pkts = []
for i in range(0, 257):
- info = self.create_packet_info(src_if.sw_if_index,
- src_if.sw_if_index)
+ info = self.create_packet_info(src_if, src_if)
payload = self.info_to_payload(info)
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
IP(src=tunnel_src, dst=tunnel_dst) /
src_ip, dst_ip):
pkts = []
for i in range(0, 257):
- info = self.create_packet_info(src_if.sw_if_index,
- src_if.sw_if_index)
+ info = self.create_packet_info(src_if, src_if)
payload = self.info_to_payload(info)
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
IP(src=tunnel_src, dst=tunnel_dst) /
tunnel_src, tunnel_dst):
pkts = []
for i in range(0, 257):
- info = self.create_packet_info(src_if.sw_if_index,
- src_if.sw_if_index)
+ info = self.create_packet_info(src_if, src_if)
payload = self.info_to_payload(info)
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
IP(src=tunnel_src, dst=tunnel_dst) /
tunnel_src, tunnel_dst, vlan):
pkts = []
for i in range(0, 257):
- info = self.create_packet_info(src_if.sw_if_index,
- src_if.sw_if_index)
+ info = self.create_packet_info(src_if, src_if)
payload = self.info_to_payload(info)
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
IP(src=tunnel_src, dst=tunnel_dst) /
pkts.append(p)
return pkts
- def verify_filter(self, capture, sent):
- if not len(capture) == len(sent):
- # filter out any IPv6 RAs from the capture
- for p in capture:
- if (p.haslayer(ICMPv6ND_RA)):
- capture.remove(p)
- return capture
-
def verify_tunneled_4o4(self, src_if, capture, sent,
tunnel_src, tunnel_dst):
- capture = self.verify_filter(capture, sent)
self.assertEqual(len(capture), len(sent))
for i in range(len(capture)):
def verify_tunneled_l2o4(self, src_if, capture, sent,
tunnel_src, tunnel_dst):
- capture = self.verify_filter(capture, sent)
self.assertEqual(len(capture), len(sent))
for i in range(len(capture)):
def verify_tunneled_vlano4(self, src_if, capture, sent,
tunnel_src, tunnel_dst, vlan):
try:
- capture = self.verify_filter(capture, sent)
self.assertEqual(len(capture), len(sent))
except:
ppc("Unexpected packets captured:", capture)
raise
def verify_decapped_4o4(self, src_if, capture, sent):
- capture = self.verify_filter(capture, sent)
self.assertEqual(len(capture), len(sent))
for i in range(len(capture)):
raise
def verify_decapped_6o4(self, src_if, capture, sent):
- capture = self.verify_filter(capture, sent)
self.assertEqual(len(capture), len(sent))
for i in range(len(capture)):
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
+ rx = self.pg0.get_capture(len(tx))
self.verify_tunneled_4o4(self.pg0, rx, tx,
self.pg0.local_ip4, "1.1.1.2")
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
+ rx = self.pg0.get_capture(len(tx))
self.verify_decapped_4o4(self.pg0, rx, tx)
#
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
+ rx = self.pg0.get_capture(len(tx))
self.verify_decapped_6o4(self.pg0, rx, tx)
#
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg1.get_capture()
+ rx = self.pg1.get_capture(len(tx))
self.verify_tunneled_4o4(self.pg1, rx, tx,
self.pg1.local_ip4, "2.2.2.2")
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
+ rx = self.pg0.get_capture(len(tx))
self.verify_decapped_4o4(self.pg0, rx, tx)
#
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
+ rx = self.pg0.get_capture(len(tx))
self.verify_tunneled_l2o4(self.pg0, rx, tx,
self.pg0.local_ip4,
"2.2.2.3")
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
+ rx = self.pg0.get_capture(len(tx))
self.verify_tunneled_l2o4(self.pg0, rx, tx,
self.pg0.local_ip4,
"2.2.2.2")
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
+ rx = self.pg0.get_capture(len(tx))
self.verify_tunneled_vlano4(self.pg0, rx, tx,
self.pg0.local_ip4,
"2.2.2.3",
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
+ rx = self.pg0.get_capture(len(tx))
self.verify_tunneled_vlano4(self.pg0, rx, tx,
self.pg0.local_ip4,
"2.2.2.2",