api: API trace improvements
[vpp.git] / test / test_nat66.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import random
5 import socket
6 import struct
7 import unittest
8 from io import BytesIO
9 from time import sleep
10
11 import scapy.compat
12 from framework import VppTestCase, VppTestRunner, running_extended_tests
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
15     IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
16     PacketListField
17 from scapy.data import IP_PROTOS
18 from scapy.layers.inet import IP, TCP, UDP, ICMP
19 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
20 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
21 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
22     ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
23 from scapy.layers.l2 import Ether, ARP, GRE
24 from scapy.packet import Raw
25 from syslog_rfc5424_parser import SyslogMessage, ParseError
26 from syslog_rfc5424_parser.constants import SyslogSeverity
27 from util import ip4_range
28 from util import ppc, ppp
29 from vpp_acl import AclRule, VppAcl, VppAclInterface
30 from vpp_ip_route import VppIpRoute, VppRoutePath
31 from vpp_neighbor import VppNeighbor
32 from vpp_papi import VppEnum
33
34
35 class TestNAT66(VppTestCase):
36     """ NAT66 Test Cases """
37
38     @classmethod
39     def setUpClass(cls):
40         super(TestNAT66, cls).setUpClass()
41
42         cls.nat_addr = 'fd01:ff::2'
43         cls.create_pg_interfaces(range(2))
44         cls.interfaces = list(cls.pg_interfaces)
45
46         for i in cls.interfaces:
47             i.admin_up()
48             i.config_ip6()
49             i.configure_ipv6_neighbors()
50
51     @property
52     def config_flags(self):
53         return VppEnum.vl_api_nat_config_flags_t
54
55     def plugin_enable(self):
56         self.vapi.nat66_plugin_enable_disable(enable=1)
57
58     def plugin_disable(self):
59         self.vapi.nat66_plugin_enable_disable(enable=0)
60
61     def setUp(self):
62         super(TestNAT66, self).setUp()
63         self.plugin_enable()
64
65     def tearDown(self):
66         super(TestNAT66, self).tearDown()
67         if not self.vpp_dead:
68             self.plugin_disable()
69
70     def test_static(self):
71         """ 1:1 NAT66 test """
72         flags = self.config_flags.NAT_IS_INSIDE
73         self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
74                                           sw_if_index=self.pg0.sw_if_index)
75         self.vapi.nat66_add_del_interface(is_add=1,
76                                           sw_if_index=self.pg1.sw_if_index)
77         self.vapi.nat66_add_del_static_mapping(
78             local_ip_address=self.pg0.remote_ip6,
79             external_ip_address=self.nat_addr,
80             is_add=1)
81
82         # in2out
83         pkts = []
84         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
85              IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
86              TCP())
87         pkts.append(p)
88         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
89              IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
90              UDP())
91         pkts.append(p)
92         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
93              IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
94              ICMPv6EchoRequest())
95         pkts.append(p)
96         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
97              IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
98              GRE() / IP() / TCP())
99         pkts.append(p)
100         self.pg0.add_stream(pkts)
101         self.pg_enable_capture(self.pg_interfaces)
102         self.pg_start()
103         capture = self.pg1.get_capture(len(pkts))
104
105         for packet in capture:
106             try:
107                 self.assertEqual(packet[IPv6].src, self.nat_addr)
108                 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
109                 self.assert_packet_checksums_valid(packet)
110             except:
111                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
112                 raise
113
114         # out2in
115         pkts = []
116         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
117              IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
118              TCP())
119         pkts.append(p)
120         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
121              IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
122              UDP())
123         pkts.append(p)
124         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
125              IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
126              ICMPv6EchoReply())
127         pkts.append(p)
128         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
129              IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
130              GRE() / IP() / TCP())
131         pkts.append(p)
132         self.pg1.add_stream(pkts)
133         self.pg_enable_capture(self.pg_interfaces)
134         self.pg_start()
135         capture = self.pg0.get_capture(len(pkts))
136         for packet in capture:
137             try:
138                 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
139                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
140                 self.assert_packet_checksums_valid(packet)
141             except:
142                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
143                 raise
144
145         sm = self.vapi.nat66_static_mapping_dump()
146         self.assertEqual(len(sm), 1)
147         self.assertEqual(sm[0].total_pkts, 8)
148
149     def test_check_no_translate(self):
150         """ NAT66 translate only when egress interface is outside interface """
151         flags = self.config_flags.NAT_IS_INSIDE
152         self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
153                                           sw_if_index=self.pg0.sw_if_index)
154         self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
155                                           sw_if_index=self.pg1.sw_if_index)
156         self.vapi.nat66_add_del_static_mapping(
157             local_ip_address=self.pg0.remote_ip6,
158             external_ip_address=self.nat_addr,
159             is_add=1)
160
161         # in2out
162         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
163              IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
164              UDP())
165         self.pg0.add_stream([p])
166         self.pg_enable_capture(self.pg_interfaces)
167         self.pg_start()
168         capture = self.pg1.get_capture(1)
169         packet = capture[0]
170         try:
171             self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
172             self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
173         except:
174             self.logger.error(ppp("Unexpected or invalid packet:", packet))
175             raise
176
177
178 if __name__ == '__main__':
179     unittest.main(testRunner=VppTestRunner)