#!/usr/bin/env python3 """GRO functional tests""" # # Add tests for: # - GRO # - Verify that sending 1500 Bytes frame without GRO enabled correctly # - Verify that sending 1500 Bytes frame with GRO enabled correctly # import unittest from scapy.packet import Raw from scapy.layers.inet6 import IPv6, Ether, IP, UDP, ICMPv6PacketTooBig from scapy.layers.inet6 import ipv6nh, IPerror6 from scapy.layers.inet import TCP, ICMP from scapy.data import ETH_P_IP, ETH_P_IPV6, ETH_P_ARP from framework import VppTestCase, VppTestRunner from vpp_object import VppObject from vpp_interface import VppInterface """ Test_gro is a subclass of VPPTestCase classes. GRO tests. """ class TestGRO(VppTestCase): """ GRO Test Case """ @classmethod def setUpClass(self): super(TestGRO, self).setUpClass() res = self.create_pg_interfaces(range(2)) res_gro = self.create_pg_interfaces(range(2, 3), 1, 1460) self.create_pg_interfaces(range(3, 4), 1, 8940) self.pg_interfaces.append(res[0]) self.pg_interfaces.append(res[1]) self.pg_interfaces.append(res_gro[0]) self.pg2.coalesce_enable() self.pg3.coalesce_enable() @classmethod def tearDownClass(self): super(TestGRO, self).tearDownClass() def setUp(self): super(TestGRO, self).setUp() for i in self.pg_interfaces: i.admin_up() i.config_ip4() i.config_ip6() i.disable_ipv6_ra() i.resolve_arp() i.resolve_ndp() def tearDown(self): super(TestGRO, self).tearDown() if not self.vpp_dead: for i in self.pg_interfaces: i.unconfig_ip4() i.unconfig_ip6() i.admin_down() def test_gro(self): """ GRO test """ n_packets = 124 # # Send 1500 bytes frame with gro disabled # p4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, flags='DF') / TCP(sport=1234, dport=4321) / Raw(b'\xa5' * 1460)) rxs = self.send_and_expect(self.pg0, n_packets * p4, self.pg1) for rx in rxs: self.assertEqual(rx[Ether].src, self.pg1.local_mac) self.assertEqual(rx[Ether].dst, self.pg1.remote_mac) self.assertEqual(rx[IP].src, self.pg0.remote_ip4) self.assertEqual(rx[IP].dst, self.pg1.remote_ip4) self.assertEqual(rx[TCP].sport, 1234) self.assertEqual(rx[TCP].dport, 4321) # # Send 1500 bytes frame with gro enabled on # output interfaces support GRO # p = [] s = 0 for n in range(0, n_packets): p.append((Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg2.remote_ip4, flags='DF') / TCP(sport=1234, dport=4321, seq=s, ack=n, flags='A') / Raw(b'\xa5' * 1460))) s += 1460 rxs = self.send_and_expect(self.pg0, p, self.pg2, n_rx=2) i = 0 for rx in rxs: i += 1 self.assertEqual(rx[Ether].src, self.pg2.local_mac) self.assertEqual(rx[Ether].dst, self.pg2.remote_mac) self.assertEqual(rx[IP].src, self.pg0.remote_ip4) self.assertEqual(rx[IP].dst, self.pg2.remote_ip4) self.assertEqual(rx[IP].len, 64280) # 1460 * 44 + 40 < 65536 self.assertEqual(rx[TCP].sport, 1234) self.assertEqual(rx[TCP].dport, 4321) self.assertEqual(rx[TCP].ack, (44*i - 1)) p4_temp = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) / IP(src=self.pg2.remote_ip4, dst=self.pg0.remote_ip4, flags='DF') / TCP(sport=1234, dport=4321, flags='F')) rxs = self.send_and_expect(self.pg2, 100*[p4_temp], self.pg0, n_rx=100) rx_coalesce = self.pg2.get_capture(1, timeout=1) rx0 = rx_coalesce[0] self.assertEqual(rx0[Ether].src, self.pg2.local_mac) self.assertEqual(rx0[Ether].dst, self.pg2.remote_mac) self.assertEqual(rx0[IP].src, self.pg0.remote_ip4) self.assertEqual(rx0[IP].dst, self.pg2.remote_ip4) self.assertEqual(rx0[IP].len, 52600) # 1460 * 36 + 40 self.assertEqual(rx0[TCP].sport, 1234) self.assertEqual(rx0[TCP].dport, 4321) for rx in rxs: self.assertEqual(rx[Ether].src, self.pg0.local_mac) self.assertEqual(rx[Ether].dst, self.pg0.remote_mac) self.assertEqual(rx[IP].src, self.pg2.remote_ip4) self.assertEqual(rx[IP].dst, self.pg0.remote_ip4) self.assertEqual(rx[IP].len, 40) self.assertEqual(rx[TCP].sport, 1234) self.assertEqual(rx[TCP].dport, 4321) if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)