import scapy.compat
from config import config
-from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, is_distro_ubuntu2204
-from framework import VppTestCase, VppTestRunner
+from framework import VppTestCase
+from asfframework import (
+ tag_fixme_vpp_workers,
+ tag_fixme_ubuntu2204,
+ is_distro_ubuntu2204,
+ VppTestRunner,
+)
from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
from scapy.data import IP_PROTOS
from scapy.layers.inet import IP, TCP, UDP, ICMP
# natEvent
self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
- self.assertEqual(struct.pack("I", 2), record[466])
+ self.assertEqual(struct.pack("!I", 2), record[466])
# maxBIBEntries
- self.assertEqual(struct.pack("I", limit), record[472])
+ self.assertEqual(struct.pack("!I", limit), record[472])
+ return len(data)
def verify_ipfix_bib(self, data, is_create, src_addr):
"""
# natEvent
self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
- self.assertEqual(struct.pack("I", 1), record[466])
+ self.assertEqual(struct.pack("!I", 1), record[466])
# maxSessionEntries
- self.assertEqual(struct.pack("I", limit), record[471])
+ self.assertEqual(struct.pack("!I", limit), record[471])
+ return len(data)
def test_nat64_inside_interface_handles_neighbor_advertisement(self):
"""NAT64 inside interface handles Neighbor Advertisement"""
capture = self.pg5.get_capture(len(pkts))
packet = capture[0]
try:
- self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
+ self.assertEqual(packet[IPv6].src, self.pg5.local_ip6_ll)
self.assertEqual(packet.haslayer(ICMPv6ND_NS), 1)
tgt = packet[ICMPv6ND_NS].tgt
except:
Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
/ IPv6(src=src, dst=remote_host_ip6)
/ TCP(sport=12345, dport=25)
- )
+ ) * 3
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
if p.haslayer(Template):
ipfix.add_template(p.getlayer(Template))
# verify events in data set
+ event_count = 0
for p in capture:
if p.haslayer(Data):
data = ipfix.decode_data_set(p.getlayer(Set))
- self.verify_ipfix_max_sessions(data, max_sessions)
+ event_count += self.verify_ipfix_max_sessions(data, max_sessions)
+ self.assertEqual(event_count, 1)
p = (
Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
/ IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6)
/ TCP(sport=12345, dport=80)
- )
+ ) * 3
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.vapi.ipfix_flush()
capture = self.pg3.get_capture(1)
# verify events in data set
+ event_count = 0
for p in capture:
self.assertTrue(p.haslayer(IPFIX))
self.assertEqual(p[IP].src, self.pg3.local_ip4)
self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
if p.haslayer(Data):
data = ipfix.decode_data_set(p.getlayer(Set))
- self.verify_ipfix_max_bibs(data, max_bibs)
+ event_count += self.verify_ipfix_max_bibs(data, max_bibs)
+ self.assertEqual(event_count, 1)
def test_ipfix_bib_ses(self):
"""IPFIX logging NAT64 BIB/session create and delete events"""