import socket
import unittest
+import struct
from framework import VppTestCase, VppTestRunner
-
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.l2 import Ether
+from scapy.data import IP_PROTOS
from util import ppp
+from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
class TestSNAT(VppTestCase):
cls.icmp_id_out = 6305
cls.snat_addr = '10.0.0.3'
- cls.create_pg_interfaces(range(7))
+ cls.create_pg_interfaces(range(8))
cls.interfaces = list(cls.pg_interfaces[0:4])
for i in cls.interfaces:
i.admin_up()
i.resolve_arp()
+ cls.pg7.admin_up()
+
except Exception:
super(TestSNAT, cls).tearDownClass()
raise
if same_port:
self.assertEqual(packet[TCP].sport, self.tcp_port_in)
else:
- self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
+ self.assertNotEqual(
+ packet[TCP].sport, self.tcp_port_in)
self.tcp_port_out = packet[TCP].sport
elif packet.haslayer(UDP):
if same_port:
self.assertEqual(packet[UDP].sport, self.udp_port_in)
else:
- self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
+ self.assertNotEqual(
+ packet[UDP].sport, self.udp_port_in)
self.udp_port_out = packet[UDP].sport
else:
if same_port:
"(inside network):", packet))
raise
+ def verify_ipfix_nat44_ses(self, data):
+ """
+ Verify IPFIX NAT44 session create/delete event
+
+ :param data: Decoded IPFIX data records
+ """
+ nat44_ses_create_num = 0
+ nat44_ses_delete_num = 0
+ self.assertEqual(6, len(data))
+ for record in data:
+ # natEvent
+ self.assertIn(ord(record[230]), [4, 5])
+ if ord(record[230]) == 4:
+ nat44_ses_create_num += 1
+ else:
+ nat44_ses_delete_num += 1
+ # sourceIPv4Address
+ self.assertEqual(self.pg0.remote_ip4n, record[8])
+ # postNATSourceIPv4Address
+ self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
+ record[225])
+ # ingressVRFID
+ self.assertEqual(struct.pack("!I", 0), record[234])
+ # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
+ if IP_PROTOS.icmp == ord(record[4]):
+ self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
+ self.assertEqual(struct.pack("!H", self.icmp_id_out),
+ record[227])
+ elif IP_PROTOS.tcp == ord(record[4]):
+ self.assertEqual(struct.pack("!H", self.tcp_port_in),
+ record[7])
+ self.assertEqual(struct.pack("!H", self.tcp_port_out),
+ record[227])
+ elif IP_PROTOS.udp == ord(record[4]):
+ self.assertEqual(struct.pack("!H", self.udp_port_in),
+ record[7])
+ self.assertEqual(struct.pack("!H", self.udp_port_out),
+ record[227])
+ else:
+ self.fail("Invalid protocol")
+ self.assertEqual(3, nat44_ses_create_num)
+ self.assertEqual(3, nat44_ses_delete_num)
+
+ def verify_ipfix_addr_exhausted(self, data):
+ """
+ Verify IPFIX NAT addresses event
+
+ :param data: Decoded IPFIX data records
+ """
+ self.assertEqual(1, len(data))
+ record = data[0]
+ # natEvent
+ self.assertEqual(ord(record[230]), 3)
+ # natPoolID
+ self.assertEqual(struct.pack("!I", 0), record[283])
+
def clear_snat(self):
"""
Clear SNAT configuration.
"""
+ interfaces = self.vapi.snat_interface_addr_dump()
+ for intf in interfaces:
+ self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
+
+ self.vapi.snat_ipfix(enable=0)
+
interfaces = self.vapi.snat_interface_dump()
for intf in interfaces:
self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
addr_only = 0
l_ip = socket.inet_pton(socket.AF_INET, local_ip)
e_ip = socket.inet_pton(socket.AF_INET, external_ip)
- self.vapi.snat_add_static_mapping(l_ip, e_ip, local_port, external_port,
- addr_only, vrf_id, is_add)
+ self.vapi.snat_add_static_mapping(
+ l_ip,
+ e_ip,
+ local_port,
+ external_port,
+ addr_only,
+ vrf_id,
+ is_add)
def snat_add_address(self, ip, is_add=1):
"""
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture()
+ capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture()
+ capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
def test_static_in(self):
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture()
+ capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture, nat_ip, True)
# out2in
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture()
+ capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
def test_static_out(self):
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture()
+ capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
# in2out
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture()
+ capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture, nat_ip, True)
def test_static_with_port_in(self):
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture()
+ capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture()
+ capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
def test_static_with_port_out(self):
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture()
+ capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
# in2out
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture()
+ capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture)
def test_static_vrf_aware(self):
self.pg4.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
+ capture = self.pg3.get_capture(len(pkts))
self.verify_capture_out(capture, nat_ip1, True)
# inside interface VRF don't match SNAT static mapping VRF (packets
self.pg3.assert_nothing_captured()
def test_multiple_inside_interfaces(self):
- """SNAT multiple inside interfaces with non-overlapping address space"""
+ """
+ SNAT multiple inside interfaces with non-overlapping address space
+ """
self.snat_add_address(self.snat_addr)
self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
+ capture = self.pg3.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in 1st interface
self.pg3.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture()
+ capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
# in2out 2nd interface
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
+ capture = self.pg3.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in 2nd interface
self.pg3.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture()
+ capture = self.pg1.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg1)
# in2out 3rd interface
self.pg2.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
+ capture = self.pg3.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in 3rd interface
self.pg3.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg2.get_capture()
+ capture = self.pg2.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg2)
def test_inside_overlapping_interfaces(self):
self.pg4.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
+ capture = self.pg3.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in 1st interface
self.pg3.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg4.get_capture()
+ capture = self.pg4.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg4)
# in2out 2nd interface
self.pg5.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
+ capture = self.pg3.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in 2nd interface
self.pg3.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg5.get_capture()
+ capture = self.pg5.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg5)
# in2out 3rd interface
self.pg6.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
+ capture = self.pg3.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in 3rd interface
self.pg3.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg6.get_capture()
+ capture = self.pg6.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg6)
def test_hairpinning(self):
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture()
- self.assertEqual(1, len(capture))
+ capture = self.pg0.get_capture(1)
p = capture[0]
try:
ip = p[IP]
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture()
- self.assertEqual(1, len(capture))
+ capture = self.pg0.get_capture(1)
p = capture[0]
try:
ip = p[IP]
self.pg_start()
# verify number of translated packet
- capture = self.pg1.get_capture()
- self.assertEqual(pkts_num, len(capture))
+ self.pg1.get_capture(pkts_num)
+
+ def test_interface_addr(self):
+ """ Acquire SNAT addresses from interface """
+ self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
+
+ # no address in NAT pool
+ adresses = self.vapi.snat_address_dump()
+ self.assertEqual(0, len(adresses))
+
+ # configure interface address and check NAT address pool
+ self.pg7.config_ip4()
+ adresses = self.vapi.snat_address_dump()
+ self.assertEqual(1, len(adresses))
+
+ # remove interface address and check NAT address pool
+ self.pg7.unconfig_ip4()
+ adresses = self.vapi.snat_address_dump()
+ self.assertEqual(0, len(adresses))
+
+ def test_ipfix_nat44_sess(self):
+ """ S-NAT IPFIX logging NAT44 session created/delted """
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
+ src_address=self.pg3.local_ip4n,
+ path_mtu=512,
+ template_interval=10)
+ self.vapi.snat_ipfix()
+
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture)
+ self.snat_add_address(self.snat_addr, is_add=0)
+ self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ capture = self.pg3.get_capture(3)
+ ipfix = IPFIXDecoder()
+ # first load template
+ for p in capture:
+ self.assertTrue(p.haslayer(IPFIX))
+ if p.haslayer(Template):
+ ipfix.add_template(p.getlayer(Template))
+ # verify events in data set
+ for p in capture:
+ if p.haslayer(Data):
+ data = ipfix.decode_data_set(p.getlayer(Set))
+ self.verify_ipfix_nat44_ses(data)
+
+ def test_ipfix_addr_exhausted(self):
+ """ S-NAT IPFIX logging NAT addresses exhausted """
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
+ src_address=self.pg3.local_ip4n,
+ path_mtu=512,
+ template_interval=10)
+ self.vapi.snat_ipfix()
+
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=3025))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(0)
+ self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ capture = self.pg3.get_capture(3)
+ ipfix = IPFIXDecoder()
+ # first load template
+ for p in capture:
+ self.assertTrue(p.haslayer(IPFIX))
+ if p.haslayer(Template):
+ ipfix.add_template(p.getlayer(Template))
+ # verify events in data set
+ for p in capture:
+ if p.haslayer(Data):
+ data = ipfix.decode_data_set(p.getlayer(Set))
+ self.verify_ipfix_addr_exhausted(data)
def tearDown(self):
super(TestSNAT, self).tearDown()
self.logger.info(self.vapi.cli("show snat verbose"))
self.clear_snat()
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)