import scapy.compat
from framework import VppTestCase, VppTestRunner
from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
-from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
- IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
- PacketListField
+from scapy.all import (
+ bind_layers,
+ Packet,
+ ByteEnumField,
+ ShortField,
+ IPField,
+ IntField,
+ LongField,
+ XByteField,
+ FlagsField,
+ FieldLenField,
+ PacketListField,
+)
from scapy.data import IP_PROTOS
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
- ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
+from scapy.layers.inet6 import (
+ IPv6,
+ ICMPv6EchoRequest,
+ ICMPv6EchoReply,
+ ICMPv6ND_NS,
+ ICMPv6ND_NA,
+ ICMPv6NDOptDstLLAddr,
+ fragment6,
+)
from scapy.layers.l2 import Ether, ARP, GRE
from scapy.packet import Raw
from syslog_rfc5424_parser import SyslogMessage, ParseError
class TestNAT66(VppTestCase):
- """ NAT66 Test Cases """
+ """NAT66 Test Cases"""
@classmethod
def setUpClass(cls):
super(TestNAT66, cls).setUpClass()
- cls.nat_addr = 'fd01:ff::2'
+ cls.nat_addr = "fd01:ff::2"
cls.create_pg_interfaces(range(2))
cls.interfaces = list(cls.pg_interfaces)
self.plugin_disable()
def test_static(self):
- """ 1:1 NAT66 test """
+ """1:1 NAT66 test"""
flags = self.config_flags.NAT_IS_INSIDE
- self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
- sw_if_index=self.pg0.sw_if_index)
- self.vapi.nat66_add_del_interface(is_add=1,
- sw_if_index=self.pg1.sw_if_index)
+ self.vapi.nat66_add_del_interface(
+ is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
+ )
+ self.vapi.nat66_add_del_interface(is_add=1, sw_if_index=self.pg1.sw_if_index)
self.vapi.nat66_add_del_static_mapping(
local_ip_address=self.pg0.remote_ip6,
external_ip_address=self.nat_addr,
- is_add=1)
+ is_add=1,
+ )
# in2out
pkts = []
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
- TCP())
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)
+ / TCP()
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
- UDP())
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)
+ / UDP()
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
- ICMPv6EchoRequest())
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)
+ / ICMPv6EchoRequest()
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
- GRE() / IP() / TCP())
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)
+ / GRE()
+ / IP()
+ / TCP()
+ )
pkts.append(p)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
# out2in
pkts = []
- p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
- IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
- TCP())
+ p = (
+ Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
+ / IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr)
+ / TCP()
+ )
pkts.append(p)
- p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
- IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
- UDP())
+ p = (
+ Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
+ / IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr)
+ / UDP()
+ )
pkts.append(p)
- p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
- IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
- ICMPv6EchoReply())
+ p = (
+ Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
+ / IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr)
+ / ICMPv6EchoReply()
+ )
pkts.append(p)
- p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
- IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
- GRE() / IP() / TCP())
+ p = (
+ Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
+ / IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr)
+ / GRE()
+ / IP()
+ / TCP()
+ )
pkts.append(p)
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.assertEqual(sm[0].total_pkts, 8)
def test_check_no_translate(self):
- """ NAT66 translate only when egress interface is outside interface """
+ """NAT66 translate only when egress interface is outside interface"""
flags = self.config_flags.NAT_IS_INSIDE
- self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
- sw_if_index=self.pg0.sw_if_index)
- self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
- sw_if_index=self.pg1.sw_if_index)
+ self.vapi.nat66_add_del_interface(
+ is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
+ )
+ self.vapi.nat66_add_del_interface(
+ is_add=1, flags=flags, sw_if_index=self.pg1.sw_if_index
+ )
self.vapi.nat66_add_del_static_mapping(
local_ip_address=self.pg0.remote_ip6,
external_ip_address=self.nat_addr,
- is_add=1)
+ is_add=1,
+ )
# in2out
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
- UDP())
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)
+ / UDP()
+ )
self.pg0.add_stream([p])
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
raise
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)