#!/usr/bin/env python3
+import ipaddress
+import random
import socket
-import unittest
import struct
-import random
-
-from framework import VppTestCase, VppTestRunner, running_extended_tests
+import unittest
+from io import BytesIO
+from time import sleep
import scapy.compat
+from framework import VppTestCase, VppTestRunner, running_extended_tests
+from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
+from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
+ IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
+ PacketListField
+from scapy.data import IP_PROTOS
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
+from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
-from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
from scapy.layers.l2 import Ether, ARP, GRE
-from scapy.data import IP_PROTOS
-from scapy.packet import bind_layers, Raw
-from util import ppp
-from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
-from time import sleep
-from util import ip4_range
-from vpp_papi import mac_pton
+from scapy.packet import Raw
from syslog_rfc5424_parser import SyslogMessage, ParseError
-from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
-from io import BytesIO
-from vpp_papi import VppEnum
-from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType
-from vpp_neighbor import VppNeighbor
-from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
- IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
- PacketListField
-from ipaddress import IPv6Network
+from syslog_rfc5424_parser.constants import SyslogSeverity
+from util import ip4_range
from util import ppc, ppp
-from socket import inet_pton, AF_INET
from vpp_acl import AclRule, VppAcl, VppAclInterface
+from vpp_ip_route import VppIpRoute, VppRoutePath
+from vpp_neighbor import VppNeighbor
+from vpp_papi import VppEnum
# NAT HA protocol event data
else:
nat44_ses_delete_num += 1
# sourceIPv4Address
- self.assertEqual(self.pg0.remote_ip4n, record[8])
+ self.assertEqual(self.pg0.remote_ip4,
+ str(ipaddress.IPv4Address(record[8])))
# postNATSourceIPv4Address
self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
record[225])
# maxBIBEntries
self.assertEqual(struct.pack("I", limit), record[472])
- def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
- """
- Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
-
- :param data: Decoded IPFIX data records
- :param limit: Number of maximum fragments pending reassembly
- :param src_addr: IPv6 source address
- """
- self.assertEqual(1, len(data))
- record = data[0]
- # natEvent
- self.assertEqual(scapy.compat.orb(record[230]), 13)
- # natQuotaExceededEvent
- self.assertEqual(struct.pack("I", 5), record[466])
- # maxFragmentsPendingReassembly
- self.assertEqual(struct.pack("I", limit), record[475])
- # sourceIPv6Address
- self.assertEqual(src_addr, record[27])
-
- def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
- """
- Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
-
- :param data: Decoded IPFIX data records
- :param limit: Number of maximum fragments pending reassembly
- :param src_addr: IPv4 source address
- """
- self.assertEqual(1, len(data))
- record = data[0]
- # natEvent
- self.assertEqual(scapy.compat.orb(record[230]), 13)
- # natQuotaExceededEvent
- self.assertEqual(struct.pack("I", 5), record[466])
- # maxFragmentsPendingReassembly
- self.assertEqual(struct.pack("I", limit), record[475])
- # sourceIPv4Address
- self.assertEqual(src_addr, record[8])
-
def verify_ipfix_bib(self, data, is_create, src_addr):
"""
Verify IPFIX NAT64 BIB create and delete events
else:
self.assertEqual(scapy.compat.orb(record[230]), 11)
# sourceIPv6Address
- self.assertEqual(src_addr, record[27])
+ self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
# postNATSourceIPv4Address
self.assertEqual(self.nat_addr_n, record[225])
# protocolIdentifier
else:
self.assertEqual(scapy.compat.orb(record[230]), 7)
# sourceIPv6Address
- self.assertEqual(src_addr, record[27])
+ self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
# destinationIPv6Address
self.assertEqual(socket.inet_pton(socket.AF_INET6,
self.compose_ip6(dst_addr,
def tearDownClass(cls):
super(TestNAT44, cls).tearDownClass()
+ def test_clear_sessions(self):
+ """ NAT44 session clearing test """
+
+ self.nat44_add_address(self.nat_addr)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+
+ nat_config = self.vapi.nat_show_config()
+ self.assertEqual(0, nat_config.endpoint_dependent)
+
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture)
+
+ sessions = self.statistics.get_counter('/nat44/total-sessions')
+ self.assertTrue(sessions[0][0] > 0)
+ self.logger.info("sessions before clearing: %s" % sessions[0][0])
+
+ self.vapi.cli("clear nat44 sessions")
+
+ sessions = self.statistics.get_counter('/nat44/total-sessions')
+ self.assertEqual(sessions[0][0], 0)
+ self.logger.info("sessions after clearing: %s" % sessions[0][0])
+
def test_dynamic(self):
""" NAT44 dynamic translation test """
self.nat44_add_address(self.nat_addr)
self.verify_capture_out(capture)
self.nat44_add_address(self.nat_addr, is_add=0)
self.vapi.ipfix_flush()
- capture = self.pg3.get_capture(9)
+ capture = self.pg3.get_capture(7)
ipfix = IPFIXDecoder()
# first load template
for p in capture:
self.pg1.assert_nothing_captured()
sleep(1)
self.vapi.ipfix_flush()
- capture = self.pg3.get_capture(9)
+ capture = self.pg3.get_capture(7)
ipfix = IPFIXDecoder()
# first load template
for p in capture:
self.pg1.assert_nothing_captured()
sleep(1)
self.vapi.ipfix_flush()
- capture = self.pg3.get_capture(9)
+ capture = self.pg3.get_capture(7)
ipfix = IPFIXDecoder()
# first load template
for p in capture:
self.logger.info(self.vapi.cli("show nat ha"))
+class TestNAT44EndpointDependent2(MethodHolder):
+ """ Endpoint-Dependent mapping and filtering test cases """
+
+ @classmethod
+ def setUpConstants(cls):
+ super(TestNAT44EndpointDependent2, cls).setUpConstants()
+ cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent", "}"])
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestNAT44EndpointDependent2, cls).tearDownClass()
+
+ def tearDown(self):
+ super(TestNAT44EndpointDependent2, self).tearDown()
+
+ @classmethod
+ def create_and_add_ip4_table(cls, i, table_id):
+ cls.vapi.ip_table_add_del(is_add=1, table={'table_id': table_id})
+ i.set_table_ip4(table_id)
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestNAT44EndpointDependent2, cls).setUpClass()
+
+ cls.create_pg_interfaces(range(3))
+ cls.interfaces = list(cls.pg_interfaces)
+
+ cls.create_and_add_ip4_table(cls.pg1, 10)
+
+ for i in cls.interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+
+ i.generate_remote_hosts(1)
+ i.configure_ipv4_neighbors()
+
+ def setUp(self):
+ super(TestNAT44EndpointDependent2, self).setUp()
+
+ nat_config = self.vapi.nat_show_config()
+ self.assertEqual(1, nat_config.endpoint_dependent)
+
+ def nat_add_inside_interface(self, i):
+ self.vapi.nat44_interface_add_del_feature(
+ flags=self.config_flags.NAT_IS_INSIDE,
+ sw_if_index=i.sw_if_index, is_add=1)
+
+ def nat_add_outside_interface(self, i):
+ self.vapi.nat44_interface_add_del_feature(
+ flags=self.config_flags.NAT_IS_OUTSIDE,
+ sw_if_index=i.sw_if_index, is_add=1)
+
+ def nat_add_interface_address(self, i):
+ self.nat_addr = i.local_ip4
+ self.vapi.nat44_add_del_interface_addr(
+ sw_if_index=i.sw_if_index, is_add=1)
+
+ def nat_add_address(self, address, vrf_id=0xFFFFFFFF):
+ self.nat_addr = address
+ self.nat44_add_address(address, vrf_id=vrf_id)
+
+ def cli(self, command):
+ result = self.vapi.cli(command)
+ self.logger.info(result)
+ # print(result)
+
+ def show_configuration(self):
+ self.cli("show interface")
+ self.cli("show interface address")
+ self.cli("show nat44 addresses")
+ self.cli("show nat44 interfaces")
+
+ def create_tcp_stream(self, in_if, out_if, count):
+ """
+ Create tcp packet stream
+
+ :param in_if: Inside interface
+ :param out_if: Outside interface
+ :param count: count of packets to generate
+ """
+ pkts = []
+ port = 6303
+
+ for i in range(count):
+ p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
+ IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64) /
+ TCP(sport=port + i, dport=20))
+ pkts.append(p)
+
+ return pkts
+
+ def test_session_limit_per_vrf(self):
+
+ inside = self.pg0
+ inside_vrf10 = self.pg1
+ outside = self.pg2
+
+ limit = 5
+
+ # 2 interfaces pg0, pg1 (vrf10, limit 1 tcp session)
+ # non existing vrf_id makes process core dump
+ self.vapi.nat44_set_session_limit(session_limit=limit, vrf_id=10)
+
+ self.nat_add_inside_interface(inside)
+ self.nat_add_inside_interface(inside_vrf10)
+ self.nat_add_outside_interface(outside)
+
+ # vrf independent
+ self.nat_add_interface_address(outside)
+
+ # BUG: causing core dump - when bad vrf_id is specified
+ # self.nat44_add_address(outside.local_ip4, vrf_id=20)
+
+ self.show_configuration()
+
+ stream = self.create_tcp_stream(inside_vrf10, outside, limit * 2)
+ inside_vrf10.add_stream(stream)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ capture = outside.get_capture(limit)
+
+ stream = self.create_tcp_stream(inside, outside, limit * 2)
+ inside.add_stream(stream)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ capture = outside.get_capture(len(stream))
+
+
class TestNAT44EndpointDependent(MethodHolder):
""" Endpoint-Dependent mapping and filtering test cases """
self.reass_hairpinning(proto=IP_PROTOS.udp)
self.reass_hairpinning(proto=IP_PROTOS.icmp)
+ def test_clear_sessions(self):
+ """ NAT44 ED session clearing test """
+
+ self.nat44_add_address(self.nat_addr)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+
+ nat_config = self.vapi.nat_show_config()
+ self.assertEqual(1, nat_config.endpoint_dependent)
+
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture)
+
+ sessions = self.statistics.get_counter('/nat44/total-sessions')
+ self.assertTrue(sessions[0][0] > 0)
+ self.logger.info("sessions before clearing: %s" % sessions[0][0])
+
+ # just for testing purposes
+ self.logger.info(self.vapi.cli("show nat44 summary"))
+
+ self.vapi.cli("clear nat44 sessions")
+
+ self.logger.info(self.vapi.cli("show nat44 summary"))
+
+ sessions = self.statistics.get_counter('/nat44/total-sessions')
+ self.assertEqual(sessions[0][0], 0)
+ self.logger.info("sessions after clearing: %s" % sessions[0][0])
+
def test_dynamic(self):
""" NAT44 dynamic translation test """
'/err/nat44-ed-out2in/good out2in packets processed')
self.assertEqual(err - totaln, 3)
- users = self.statistics.get_counter('/nat44/total-users')
- self.assertEqual(users[0][0], 1)
sessions = self.statistics.get_counter('/nat44/total-sessions')
self.assertEqual(sessions[0][0], 3)
'/err/nat44-ed-out2in/good out2in packets processed')
self.assertEqual(err - totaln, 3)
- users = self.statistics.get_counter('/nat44/total-users')
- self.assertEqual(users[0][0], 1)
sessions = self.statistics.get_counter('/nat44/total-sessions')
self.assertEqual(sessions[0][0], 3)
self.pg_start()
self.pg1.get_capture(1)
- nsessions = 0
- users = self.vapi.nat44_user_dump()
- self.assertEqual(len(users), 1)
- self.assertEqual(str(users[0].ip_address),
- self.pg0.remote_ip4)
- self.assertEqual(users[0].nsessions, 1)
-
def test_syslog_sess(self):
""" Test syslog session creation and deletion """
self.vapi.syslog_set_filter(
self.vapi.nat64_add_del_interface(is_add=1, flags=0,
sw_if_index=self.pg1.sw_if_index)
- self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6n,
+ self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
o_addr=self.nat_addr,
i_port=self.tcp_port_in,
o_port=self.tcp_port_out,
proto=IP_PROTOS.tcp, vrf_id=0,
is_add=1)
- self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6n,
+ self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
o_addr=self.nat_addr,
i_port=self.udp_port_in,
o_port=self.udp_port_out,
proto=IP_PROTOS.udp, vrf_id=0,
is_add=1)
- self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6n,
+ self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
o_addr=self.nat_addr,
i_port=self.icmp_id_in,
o_port=self.icmp_id_out,
self.pg1.assert_nothing_captured()
sleep(1)
self.vapi.ipfix_flush()
- capture = self.pg3.get_capture(9)
+ capture = self.pg3.get_capture(7)
ipfix = IPFIXDecoder()
# first load template
for p in capture:
p = self.pg1.get_capture(1)
self.tcp_port_out = p[0][TCP].sport
self.vapi.ipfix_flush()
- capture = self.pg3.get_capture(10)
+ capture = self.pg3.get_capture(8)
ipfix = IPFIXDecoder()
# first load template
for p in capture:
if p.haslayer(Data):
data = ipfix.decode_data_set(p.getlayer(Set))
if scapy.compat.orb(data[0][230]) == 10:
- self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
+ self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6)
elif scapy.compat.orb(data[0][230]) == 6:
self.verify_ipfix_nat64_ses(data,
1,
- self.pg0.remote_ip6n,
+ self.pg0.remote_ip6,
self.pg1.remote_ip4,
25)
else:
if p.haslayer(Data):
data = ipfix.decode_data_set(p.getlayer(Set))
if scapy.compat.orb(data[0][230]) == 11:
- self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
+ self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6)
elif scapy.compat.orb(data[0][230]) == 7:
self.verify_ipfix_nat64_ses(data,
0,
- self.pg0.remote_ip6n,
+ self.pg0.remote_ip6,
self.pg1.remote_ip4,
25)
else:
self.vapi.nat66_add_del_interface(is_add=1,
sw_if_index=self.pg1.sw_if_index)
self.vapi.nat66_add_del_static_mapping(
- local_ip_address=self.pg0.remote_ip6n,
+ local_ip_address=self.pg0.remote_ip6,
external_ip_address=self.nat_addr,
is_add=1)
self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
sw_if_index=self.pg1.sw_if_index)
self.vapi.nat66_add_del_static_mapping(
- local_ip_address=self.pg0.remote_ip6n,
+ local_ip_address=self.pg0.remote_ip6,
external_ip_address=self.nat_addr,
is_add=1)