- self.logger.info("Verifying capture on interface %s" % dst_if.name)
- last_info = dict()
- for i in self.interfaces:
- last_info[i.sw_if_index] = None
- dst_sw_if_index = dst_if.sw_if_index
- for packet in capture:
- try:
- ip = packet[IP]
- udp = packet[UDP]
- payload_info = self.payload_to_info(str(packet[Raw]))
- packet_index = payload_info.index
- self.assertEqual(payload_info.dst, dst_sw_if_index)
- self.logger.debug(
- "Got packet on port %s: src=%u (id=%u)" %
- (dst_if.name, payload_info.src, packet_index))
- next_info = self.get_next_packet_info_for_interface2(
- payload_info.src, dst_sw_if_index,
- last_info[payload_info.src])
- last_info[payload_info.src] = next_info
- self.assertTrue(next_info is not None)
- self.assertEqual(packet_index, next_info.index)
- saved_packet = next_info.data
- # Check standard fields
- self.assertEqual(ip.src, saved_packet[IP].src)
- self.assertEqual(ip.dst, saved_packet[IP].dst)
- self.assertEqual(udp.sport, saved_packet[UDP].sport)
- self.assertEqual(udp.dport, saved_packet[UDP].dport)
- except:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- for i in self.interfaces:
- remaining_packet = self.get_next_packet_info_for_interface2(
- i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
- self.assertTrue(remaining_packet is None,
- "Interface %s: Packet expected from interface %s "
- "didn't arrive" % (dst_if.name, i.name))
-
- @staticmethod
- def build_ip_mask(proto='', src_ip='', dst_ip='',
- src_port='', dst_port=''):
- """Build IP ACL mask data with hexstring format
-
- :param str proto: protocol number <0-ff>
- :param str src_ip: source ip address <0-ffffffff>
- :param str dst_ip: destination ip address <0-ffffffff>
- :param str src_port: source port number <0-ffff>
- :param str dst_port: destination port number <0-ffff>
+
+ # Basic iACL testing with source and destination IP
+ pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
+ self.pg0.add_stream(pkts)
+
+ key = 'ip'
+ self.create_classify_table(
+ key, self.build_ip_mask(src_ip='ffffffff', dst_ip='ffffffff'))
+ self.create_classify_session(
+ self.acl_tbl_idx.get(key),
+ self.build_ip_match(src_ip=self.pg0.remote_ip4,
+ dst_ip=self.pg1.remote_ip4))
+ self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
+ self.acl_active_table = key
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ pkts = self.pg1.get_capture(len(pkts))
+ self.verify_capture(self.pg1, pkts)
+ self.pg0.assert_nothing_captured(remark="packets forwarded")
+ self.pg2.assert_nothing_captured(remark="packets forwarded")
+ self.pg3.assert_nothing_captured(remark="packets forwarded")
+
+
+class TestClassifierUDP(TestClassifier):
+ """ Classifier UDP proto Test Case """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestClassifierUDP, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestClassifierUDP, cls).tearDownClass()
+
+ def test_iacl_proto_udp(self):
+ """ UDP protocol iACL test
+
+ Test scenario for basic protocol ACL with UDP protocol
+ - Create IPv4 stream for pg0 -> pg1 interface.
+ - Create iACL with UDP IP protocol.
+ - Send and verify received packets on pg1 interface.