tests: make tests less make dependent
[vpp.git] / test / test_nat66.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import random
5 import socket
6 import struct
7 import unittest
8 from io import BytesIO
9
10 import scapy.compat
11 from framework import VppTestCase, VppTestRunner
12 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
13 from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
14     IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
15     PacketListField
16 from scapy.data import IP_PROTOS
17 from scapy.layers.inet import IP, TCP, UDP, ICMP
18 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
19 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
20 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
21     ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
22 from scapy.layers.l2 import Ether, ARP, GRE
23 from scapy.packet import Raw
24 from syslog_rfc5424_parser import SyslogMessage, ParseError
25 from syslog_rfc5424_parser.constants import SyslogSeverity
26 from util import ip4_range
27 from util import ppc, ppp
28 from vpp_acl import AclRule, VppAcl, VppAclInterface
29 from vpp_ip_route import VppIpRoute, VppRoutePath
30 from vpp_neighbor import VppNeighbor
31 from vpp_papi import VppEnum
32
33
class TestNAT66(VppTestCase):
    """ NAT66 Test Cases """

    # NOTE(review): the class and test-method docstrings are kept verbatim;
    # presumably the test runner prints them as display names -- confirm
    # before rewording them.

    @classmethod
    def setUpClass(cls):
        """Create two pg interfaces and bring them up with IPv6 configured.

        Also sets ``cls.nat_addr``, the external address used by the static
        mappings in the tests below.
        """
        super(TestNAT66, cls).setUpClass()

        cls.nat_addr = 'fd01:ff::2'
        cls.create_pg_interfaces(range(2))
        cls.interfaces = list(cls.pg_interfaces)

        for i in cls.interfaces:
            i.admin_up()
            i.config_ip6()
            i.configure_ipv6_neighbors()

    @property
    def config_flags(self):
        """Return the ``vl_api_nat_config_flags_t`` enum from VppEnum."""
        return VppEnum.vl_api_nat_config_flags_t

    def plugin_enable(self):
        """Enable the NAT66 plugin through the binary API."""
        self.vapi.nat66_plugin_enable_disable(enable=1)

    def plugin_disable(self):
        """Disable the NAT66 plugin through the binary API."""
        self.vapi.nat66_plugin_enable_disable(enable=0)

    def setUp(self):
        """Run the base-class setup, then enable NAT66 for this test."""
        super(TestNAT66, self).setUp()
        self.plugin_enable()

    def tearDown(self):
        """Run the base-class teardown, then disable NAT66 if VPP is alive."""
        super(TestNAT66, self).tearDown()
        # The API cannot be called once VPP has died.
        if not self.vpp_dead:
            self.plugin_disable()

    def _build_stream(self, src_if, dst_addr, payloads):
        """Build one Ether/IPv6 packet per payload.

        :param src_if: pg interface the packets are injected on; its remote
            MAC/IPv6 are used as source and its local MAC as destination.
        :param dst_addr: IPv6 destination address of every packet.
        :param payloads: scapy layers to stack on top of the IPv6 header.
        :returns: list of packets, in the order of ``payloads``.
        """
        return [
            Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
            IPv6(src=src_if.remote_ip6, dst=dst_addr) /
            payload
            for payload in payloads
        ]

    def _send_and_verify(self, tx_if, rx_if, pkts, expected_src,
                         expected_dst, verify_checksums=True):
        """Send ``pkts`` on ``tx_if`` and check what arrives on ``rx_if``.

        Exactly ``len(pkts)`` packets are expected on ``rx_if``; each must
        carry the expected IPv6 source and destination addresses.

        :param tx_if: pg interface the stream is added to.
        :param rx_if: pg interface the capture is read from.
        :param pkts: packets to send.
        :param expected_src: IPv6 source expected on captured packets.
        :param expected_dst: IPv6 destination expected on captured packets.
        :param verify_checksums: also validate the packet checksums.
        :raises AssertionError: when a captured packet does not match; the
            offending packet is logged before the error propagates.
        """
        tx_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = rx_if.get_capture(len(pkts))

        for packet in capture:
            try:
                self.assertEqual(packet[IPv6].src, expected_src)
                self.assertEqual(packet[IPv6].dst, expected_dst)
                if verify_checksums:
                    self.assert_packet_checksums_valid(packet)
            except Exception:
                # Log the packet for diagnosis, then let the failure surface.
                # Not a bare ``except`` so KeyboardInterrupt/SystemExit are
                # not reported as a bad packet.
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

    def test_static(self):
        """ 1:1 NAT66 test """
        # pg0 is registered with the inside flag, pg1 without it.
        flags = self.config_flags.NAT_IS_INSIDE
        self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
                                          sw_if_index=self.pg0.sw_if_index)
        self.vapi.nat66_add_del_interface(is_add=1,
                                          sw_if_index=self.pg1.sw_if_index)
        self.vapi.nat66_add_del_static_mapping(
            local_ip_address=self.pg0.remote_ip6,
            external_ip_address=self.nat_addr,
            is_add=1)

        # in2out: the source address must be rewritten to the NAT address.
        in2out = self._build_stream(
            self.pg0, self.pg1.remote_ip6,
            [TCP(), UDP(), ICMPv6EchoRequest(), GRE() / IP() / TCP()])
        self._send_and_verify(self.pg0, self.pg1, in2out,
                              expected_src=self.nat_addr,
                              expected_dst=self.pg1.remote_ip6)

        # out2in: the NAT address must be rewritten back to the pg0 host.
        out2in = self._build_stream(
            self.pg1, self.nat_addr,
            [TCP(), UDP(), ICMPv6EchoReply(), GRE() / IP() / TCP()])
        self._send_and_verify(self.pg1, self.pg0, out2in,
                              expected_src=self.pg1.remote_ip6,
                              expected_dst=self.pg0.remote_ip6)

        # The single mapping must have counted every packet of both streams.
        sm = self.vapi.nat66_static_mapping_dump()
        self.assertEqual(len(sm), 1)
        self.assertEqual(sm[0].total_pkts, len(in2out) + len(out2in))

    def test_check_no_translate(self):
        """ NAT66 translate only when egress interface is outside interface """
        # Both interfaces carry the inside flag, so the static mapping must
        # not be applied to traffic forwarded from pg0 to pg1.
        flags = self.config_flags.NAT_IS_INSIDE
        self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
                                          sw_if_index=self.pg0.sw_if_index)
        self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
                                          sw_if_index=self.pg1.sw_if_index)
        self.vapi.nat66_add_del_static_mapping(
            local_ip_address=self.pg0.remote_ip6,
            external_ip_address=self.nat_addr,
            is_add=1)

        # in2out: addresses must arrive unchanged.
        pkts = self._build_stream(self.pg0, self.pg1.remote_ip6, [UDP()])
        self._send_and_verify(self.pg0, self.pg1, pkts,
                              expected_src=self.pg0.remote_ip6,
                              expected_dst=self.pg1.remote_ip6,
                              verify_checksums=False)
# Allow running this module directly: hand its tests to unittest using the
# VppTestRunner imported from the project's `framework` module.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)