from time import sleep
import scapy.compat
-from framework import tag_fixme_vpp_workers
from framework import VppTestCase, VppTestRunner
from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
# natEvent
self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
- self.assertEqual(struct.pack("I", 1), record[466])
+ self.assertEqual(struct.pack("!I", 1), record[466])
# maxSessionEntries
- self.assertEqual(struct.pack("I", limit), record[471])
+ self.assertEqual(struct.pack("!I", limit), record[471])
def verify_no_nat44_user(self):
""" Verify that there is no NAT44EI user """
return 1 + h % vpp_worker_count
-@tag_fixme_vpp_workers
class TestNAT44EI(MethodHolder):
""" NAT44EI Test Cases """
pkts = []
for i in range(x):
+ info = self.create_packet_info(self.pg0, self.pg1)
+ payload = self.info_to_payload(info)
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
IP(src=self.pg0.remote_hosts[i].ip4,
dst=self.pg1.remote_ip4) /
- UDP(sport=7000+i, dport=80+i))
+ UDP(sport=7000+i, dport=8000+i) /
+ Raw(payload))
+ info.data = p
pkts.append(p)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
recvd = self.pg1.get_capture(len(pkts))
- for (p_sent, p_recvd) in zip(pkts, recvd):
+ for p_recvd in recvd:
+ payload_info = self.payload_to_info(p_recvd[Raw])
+ packet_index = payload_info.index
+ info = self._packet_infos[packet_index]
+ self.assertTrue(info is not None)
+ self.assertEqual(packet_index, info.index)
+ p_sent = info.data
packed = socket.inet_aton(p_sent[IP].src)
numeric = struct.unpack("!L", packed)[0]
numeric = socket.htonl(numeric)
a = nat_addresses[(numeric-1) % len(nat_addresses)]
- self.assertEqual(a, p_recvd[IP].src, "Packet not translated")
+ self.assertEqual(
+ a, p_recvd[IP].src,
+ "Invalid packet (src IP %s translated to %s, but expected %s)"
+ % (p_sent[IP].src, p_recvd[IP].src, a))
class TestNAT44Out2InDPO(MethodHolder):