import socket
import unittest
import struct
-import StringIO
import random
from framework import VppTestCase, VppTestRunner, running_extended_tests
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
- ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
+ ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
from scapy.layers.l2 import Ether, ARP, GRE
from scapy.data import IP_PROTOS
from scapy.packet import bind_layers, Raw
-from scapy.all import fragment6
from util import ppp
from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
from time import sleep
from util import ip4_range
-from util import mactobinary
+from vpp_mac import mactobinary
+from syslog_rfc5424_parser import SyslogMessage, ParseError
+from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
+from vpp_papi_provider import SYSLOG_SEVERITY
+from io import BytesIO
class MethodHolder(VppTestCase):
self.ipfix_src_port = 4739
self.ipfix_domain_id = 1
+ self.vapi.syslog_set_filter(SYSLOG_SEVERITY.EMERG)
+
interfaces = self.vapi.nat44_interface_dump()
for intf in interfaces:
if intf.is_inside > 1:
for packet in capture:
try:
self.assertEqual(packet[IP].src, src_ip)
- self.assertTrue(packet.haslayer(ICMP))
+ self.assertEqual(packet.haslayer(ICMP), 1)
icmp = packet[ICMP]
self.assertEqual(icmp.type, icmp_type)
self.assertTrue(icmp.haslayer(IPerror))
for packet in capture:
try:
self.assertEqual(packet[IP].dst, in_if.remote_ip4)
- self.assertTrue(packet.haslayer(ICMP))
+ self.assertEqual(packet.haslayer(ICMP), 1)
icmp = packet[ICMP]
self.assertEqual(icmp.type, icmp_type)
self.assertTrue(icmp.haslayer(IPerror))
:returns: Reassembled IPv4 packet
"""
- buffer = StringIO.StringIO()
+ buffer = BytesIO()
for p in frags:
self.assertEqual(p[IP].src, src)
self.assertEqual(p[IP].dst, dst)
self.assert_ip_checksum_valid(p)
buffer.seek(p[IP].frag * 8)
- buffer.write(p[IP].payload)
+ buffer.write(bytes(p[IP].payload))
ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
proto=frags[0][IP].proto)
if ip.proto == IP_PROTOS.tcp:
:returns: Reassembled IPv6 packet
"""
- buffer = StringIO.StringIO()
+ buffer = BytesIO()
for p in frags:
self.assertEqual(p[IPv6].src, src)
self.assertEqual(p[IPv6].dst, dst)
buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
- buffer.write(p[IPv6ExtHdrFragment].payload)
+ buffer.write(bytes(p[IPv6ExtHdrFragment].payload))
ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
nh=frags[0][IPv6ExtHdrFragment].nh)
if ip.nh == IP_PROTOS.tcp:
# sourceIPv4Address
self.assertEqual(src_addr, record[8])
+ def verify_syslog_apmap(self, data, is_add=True):
+ message = data.decode('utf-8')
+ try:
+ message = SyslogMessage.parse(message)
+ self.assertEqual(message.severity, SyslogSeverity.info)
+ self.assertEqual(message.appname, 'NAT')
+ self.assertEqual(message.msgid, 'APMADD' if is_add else 'APMDEL')
+ sd_params = message.sd.get('napmap')
+ self.assertTrue(sd_params is not None)
+ self.assertEqual(sd_params.get('IATYP'), 'IPv4')
+ self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
+ self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
+ self.assertEqual(sd_params.get('XATYP'), 'IPv4')
+ self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
+ self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
+ self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
+ self.assertTrue(sd_params.get('SSUBIX') is not None)
+ self.assertEqual(sd_params.get('SVLAN'), '0')
+ except ParseError as e:
+ self.logger.error(e)
+
+ def verify_syslog_sess(self, data, is_add=True, is_ip6=False):
+ message = data.decode('utf-8')
+ try:
+ message = SyslogMessage.parse(message)
+ self.assertEqual(message.severity, SyslogSeverity.info)
+ self.assertEqual(message.appname, 'NAT')
+ self.assertEqual(message.msgid, 'SADD' if is_add else 'SDEL')
+ sd_params = message.sd.get('nsess')
+ self.assertTrue(sd_params is not None)
+ if is_ip6:
+ self.assertEqual(sd_params.get('IATYP'), 'IPv6')
+ self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip6)
+ else:
+ self.assertEqual(sd_params.get('IATYP'), 'IPv4')
+ self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
+ self.assertTrue(sd_params.get('SSUBIX') is not None)
+ self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
+ self.assertEqual(sd_params.get('XATYP'), 'IPv4')
+ self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
+ self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
+ self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
+ self.assertEqual(sd_params.get('SVLAN'), '0')
+ self.assertEqual(sd_params.get('XDADDR'), self.pg1.remote_ip4)
+ self.assertEqual(sd_params.get('XDPORT'),
+ "%d" % self.tcp_external_port)
+ except ParseError as e:
+ self.logger.error(e)
+
def verify_mss_value(self, pkt, mss):
"""
Verify TCP MSS value
# general user and session dump verifications
users = self.vapi.nat44_user_dump()
- self.assertTrue(len(users) >= 3)
+ self.assertGreaterEqual(len(users), 3)
addresses = self.vapi.nat44_address_dump()
self.assertEqual(len(addresses), 1)
for user in users:
# pg4 session dump
sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
- self.assertTrue(len(sessions) >= 4)
+ self.assertGreaterEqual(len(sessions), 4)
for session in sessions:
self.assertFalse(session.is_static)
self.assertEqual(session.inside_ip_address[0:4],
# pg6 session dump
sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
- self.assertTrue(len(sessions) >= 3)
+ self.assertGreaterEqual(len(sessions), 3)
for session in sessions:
self.assertTrue(session.is_static)
self.assertEqual(session.inside_ip_address[0:4],
data = ipfix.decode_data_set(p.getlayer(Set))
self.verify_ipfix_max_sessions(data, max_sessions)
+ def test_syslog_apmap(self):
+ """ Test syslog address and port mapping creation and deletion """
+ self.vapi.syslog_set_filter(SYSLOG_SEVERITY.INFO)
+ self.vapi.syslog_set_sender(self.pg3.remote_ip4n, self.pg3.local_ip4n)
+ self.nat44_add_address(self.nat_addr)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=20))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ self.tcp_port_out = capture[0][TCP].sport
+ capture = self.pg3.get_capture(1)
+ self.verify_syslog_apmap(capture[0][Raw].load)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.nat44_add_address(self.nat_addr, is_add=0)
+ capture = self.pg3.get_capture(1)
+ self.verify_syslog_apmap(capture[0][Raw].load, False)
+
def test_pool_addr_fib(self):
""" NAT44 add pool addresses to FIB """
static_addr = '10.0.0.10'
try:
self.assertEqual(packet[IP].src, nat_ip)
self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
- self.assertTrue(packet.haslayer(GRE))
+ self.assertEqual(packet.haslayer(GRE), 1)
self.assert_packet_checksums_valid(packet)
except:
self.logger.error(ppp("Unexpected or invalid packet:", packet))
try:
self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
- self.assertTrue(packet.haslayer(GRE))
+ self.assertEqual(packet.haslayer(GRE), 1)
self.assert_packet_checksums_valid(packet)
except:
self.logger.error(ppp("Unexpected or invalid packet:", packet))
try:
self.assertEqual(packet[IP].src, host_nat_ip)
self.assertEqual(packet[IP].dst, server.ip4)
- self.assertTrue(packet.haslayer(GRE))
+ self.assertEqual(packet.haslayer(GRE), 1)
self.assert_packet_checksums_valid(packet)
except:
self.logger.error(ppp("Unexpected or invalid packet:", packet))
try:
self.assertEqual(packet[IP].src, server_nat_ip)
self.assertEqual(packet[IP].dst, host.ip4)
- self.assertTrue(packet.haslayer(GRE))
+ self.assertEqual(packet.haslayer(GRE), 1)
self.assert_packet_checksums_valid(packet)
except:
self.logger.error(ppp("Unexpected or invalid packet:", packet))
server1_n += 1
else:
server2_n += 1
- self.assertTrue(server1_n > server2_n)
+ self.assertGreater(server1_n, server2_n)
def test_static_lb_2(self):
""" NAT44 local service load balancing (asymmetrical rule) """
try:
self.assertEqual(packet[IP].src, self.nat_addr)
self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
- self.assertTrue(packet.haslayer(GRE))
+ self.assertEqual(packet.haslayer(GRE), 1)
self.assert_packet_checksums_valid(packet)
except:
self.logger.error(ppp("Unexpected or invalid packet:", packet))
try:
self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
- self.assertTrue(packet.haslayer(GRE))
+ self.assertEqual(packet.haslayer(GRE), 1)
self.assert_packet_checksums_valid(packet)
except:
self.logger.error(ppp("Unexpected or invalid packet:", packet))
try:
self.assertEqual(packet[IP].src, self.nat_addr)
self.assertEqual(packet[IP].dst, server.ip4)
- self.assertTrue(packet.haslayer(GRE))
+ self.assertEqual(packet.haslayer(GRE), 1)
self.assert_packet_checksums_valid(packet)
except:
self.logger.error(ppp("Unexpected or invalid packet:", packet))
try:
self.assertEqual(packet[IP].src, server_nat_ip)
self.assertEqual(packet[IP].dst, host.ip4)
- self.assertTrue(packet.haslayer(GRE))
+ self.assertEqual(packet.haslayer(GRE), 1)
self.assert_packet_checksums_valid(packet)
except:
self.logger.error(ppp("Unexpected or invalid packet:", packet))
pkts = []
for i in range(0, max_sessions):
- src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
+ src = "10.11.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
IP(src=src, dst=self.pg1.remote_ip4) /
ICMP(id=1026, type='echo-request'))
is_inside=0)
self.vapi.nat_set_timeouts(tcp_transitory=5)
- nat44_config = self.vapi.nat_show_config()
-
self.initiate_tcp_session(self.pg0, self.pg1)
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
self.pg_start()
self.pg1.get_capture(1)
- pkts_num = nat44_config.max_translations_per_user - 1
- pkts = []
- for i in range(0, pkts_num):
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- UDP(sport=1025 + i, dport=53))
- pkts.append(p)
- self.pg0.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.get_capture(pkts_num)
-
sleep(6)
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
users = self.vapi.nat44_user_dump()
self.assertEqual(len(users), 1)
self.assertEqual(users[0].ip_address, self.pg0.remote_ip4n)
- self.assertEqual(users[0].nsessions,
- nat44_config.max_translations_per_user)
+ self.assertEqual(users[0].nsessions, 1)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_session_limit_per_user(self):
self.pg_start()
self.pg1.get_capture(1)
+ def test_syslog_sess(self):
+ """ Test syslog session creation and deletion """
+ self.vapi.syslog_set_filter(SYSLOG_SEVERITY.INFO)
+ self.vapi.syslog_set_sender(self.pg2.remote_ip4n, self.pg2.local_ip4n)
+ self.nat44_add_address(self.nat_addr)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ self.tcp_port_out = capture[0][TCP].sport
+ capture = self.pg2.get_capture(1)
+ self.verify_syslog_sess(capture[0][Raw].load)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.nat44_add_address(self.nat_addr, is_add=0)
+ capture = self.pg2.get_capture(1)
+ self.verify_syslog_sess(capture[0][Raw].load, False)
+
def tearDown(self):
super(TestNAT44EndpointDependent, self).tearDown()
if not self.vpp_dead:
cls.udp_port_out = 6304
cls.icmp_id_in = 6305
cls.icmp_id_out = 6305
+ cls.tcp_external_port = 80
cls.nat_addr = '10.0.0.3'
cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
cls.vrf1_id = 10
packet = capture[0]
try:
self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
- self.assertTrue(packet.haslayer(ICMPv6ND_NS))
+ self.assertEqual(packet.haslayer(ICMPv6ND_NS), 1)
tgt = packet[ICMPv6ND_NS].tgt
except:
self.logger.error(ppp("Unexpected or invalid packet:", packet))
try:
self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
- self.assertTrue(packet.haslayer(ICMPv6EchoReply))
+ self.assertEqual(packet.haslayer(ICMPv6EchoReply), 1)
except:
self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
try:
self.assertEqual(packet[IP].src, self.nat_addr)
self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
- self.assertTrue(packet.haslayer(GRE))
+ self.assertEqual(packet.haslayer(GRE), 1)
self.assert_packet_checksums_valid(packet)
except:
self.logger.error(ppp("Unexpected or invalid packet:", packet))
else:
self.logger.error(ppp("Unexpected or invalid packet: ", p))
+ def test_syslog_sess(self):
+ """ Test syslog session creation and deletion """
+ self.tcp_port_in = random.randint(1025, 65535)
+ remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
+ '64:ff9b::',
+ 96)
+
+ self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+ self.nat_addr_n)
+ self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+ self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+ self.vapi.syslog_set_filter(SYSLOG_SEVERITY.INFO)
+ self.vapi.syslog_set_sender(self.pg3.remote_ip4n, self.pg3.local_ip4n)
+
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ p = self.pg1.get_capture(1)
+ self.tcp_port_out = p[0][TCP].sport
+ capture = self.pg3.get_capture(1)
+ self.verify_syslog_sess(capture[0][Raw].load, is_ip6=True)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+ self.nat_addr_n,
+ is_add=0)
+ capture = self.pg3.get_capture(1)
+ self.verify_syslog_sess(capture[0][Raw].load, False, True)
+
def nat64_get_ses_num(self):
"""
Return number of active NAT64 sessions.
self.ipfix_src_port = 4739
self.ipfix_domain_id = 1
+ self.vapi.syslog_set_filter(SYSLOG_SEVERITY.EMERG)
+
self.vapi.nat_set_timeouts()
interfaces = self.vapi.nat64_interface_dump()
cls.nat_addr = '10.0.0.3'
cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
- cls.create_pg_interfaces(range(2))
+ cls.create_pg_interfaces(range(3))
cls.pg0.admin_up()
cls.pg0.config_ip4()
cls.pg0.resolve_arp()
cls.pg1.config_ip6()
cls.pg1.generate_remote_hosts(2)
cls.pg1.configure_ipv6_neighbors()
+ cls.pg2.admin_up()
+ cls.pg2.config_ip4()
+ cls.pg2.resolve_arp()
except Exception:
super(TestDSlite, cls).tearDownClass()
raise
+ def verify_syslog_apmadd(self, data, isaddr, isport, xsaddr, xsport,
+ sv6enc, proto):
+ message = data.decode('utf-8')
+ try:
+ message = SyslogMessage.parse(message)
+ self.assertEqual(message.severity, SyslogSeverity.info)
+ self.assertEqual(message.appname, 'NAT')
+ self.assertEqual(message.msgid, 'APMADD')
+ sd_params = message.sd.get('napmap')
+ self.assertTrue(sd_params is not None)
+ self.assertEqual(sd_params.get('IATYP'), 'IPv4')
+ self.assertEqual(sd_params.get('ISADDR'), isaddr)
+ self.assertEqual(sd_params.get('ISPORT'), "%d" % isport)
+ self.assertEqual(sd_params.get('XATYP'), 'IPv4')
+ self.assertEqual(sd_params.get('XSADDR'), xsaddr)
+ self.assertEqual(sd_params.get('XSPORT'), "%d" % xsport)
+ self.assertEqual(sd_params.get('PROTO'), "%d" % proto)
+ self.assertTrue(sd_params.get('SSUBIX') is not None)
+ self.assertEqual(sd_params.get('SV6ENC'), sv6enc)
+ except ParseError as e:
+ self.logger.error(e)
+
def test_dslite(self):
""" Test DS-Lite """
nat_config = self.vapi.nat_show_config()
aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
+ self.vapi.syslog_set_sender(self.pg2.remote_ip4n, self.pg2.local_ip4n)
# UDP
p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
self.assertEqual(capture[UDP].dport, 10000)
self.assert_packet_checksums_valid(capture)
out_port = capture[UDP].sport
+ capture = self.pg2.get_capture(1)
+ self.verify_syslog_apmadd(capture[0][Raw].load, '192.168.1.1',
+ 20000, self.nat_addr, out_port,
+ self.pg1.remote_hosts[0].ip6, IP_PROTOS.udp)
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /