nat: 1:1 policy NAT
[vpp.git] / src / plugins / nat / test / test_pnat.py
1 #!/usr/bin/env python3
2 """Policy 1:1 NAT functional tests"""
3
4 import unittest
5 from scapy.layers.inet import Ether, IP, UDP, ICMP
6 from framework import VppTestCase, VppTestRunner
7 from vpp_papi import VppEnum
8
9
class TestPNAT(VppTestCase):
    """ PNAT Test Case """
    # Functional tests for policy 1:1 NAT (PNAT) bindings.  Each test adds
    # match/rewrite bindings through the binary API, attaches them to one of
    # two packet-generator interfaces (pg0, pg1) and checks either the
    # rewritten traffic or the API dumps.
    #
    # NOTE(review): the 'mask' values used in the fixtures appear to be a
    # bitmask of the fields that are present: 0x1 = src, 0x2 = dst,
    # 0x8 = dport (inferred from the cases below; 'proto' is given without a
    # mask bit) -- confirm against the pnat API definition.

    # Show full diffs when a packet comparison fails.
    maxDiff = None

    @classmethod
    def setUpClass(cls):
        """ Create the two pg interfaces shared by all tests. """
        super(TestPNAT, cls).setUpClass()
        cls.create_pg_interfaces(range(2))
        cls.interfaces = list(cls.pg_interfaces)

    @classmethod
    def tearDownClass(cls):
        """ Delegate class-level teardown to the framework. """
        super(TestPNAT, cls).tearDownClass()

    def setUp(self):
        """ Bring every interface up, give it an IPv4 address, resolve ARP. """
        super(TestPNAT, self).setUp()
        for i in self.interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

    def tearDown(self):
        """ Undo setUp() on every interface unless VPP has died. """
        super(TestPNAT, self).tearDown()
        if not self.vpp_dead:
            # Iterate the same list setUp() configured so that the two
            # methods are exact mirrors of each other.
            for i in self.interfaces:
                i.unconfig_ip4()
                i.admin_down()

    def validate(self, rx, expected):
        """ Assert that received packet *rx* equals *expected*.

        *expected* is re-instantiated through its own class before the
        comparison (presumably so that fields scapy computes on build,
        such as lengths and checksums, are filled in -- confirm).
        """
        self.assertEqual(rx, expected.__class__(expected))

    def validate_bytes(self, rx, expected):
        """ Assert that *rx* equals *expected* without rebuilding it. """
        self.assertEqual(rx, expected)

    def ping_check(self):
        """ Verify non matching traffic works.

        Sends one ICMP echo request from pg0's peer to pg0's local address
        and expects an echo reply back on pg0.
        """
        p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)

        icmpecho = (IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
                    ICMP())
        reply = (IP(src=self.pg0.local_ip4, dst=self.pg0.remote_ip4) /
                 ICMP(type='echo-reply'))
        rx = self.send_and_expect(self.pg0, p_ether/icmpecho * 1, self.pg0)
        for p in rx:
            # The IP id of the reply is taken from the received packet, so
            # it is excluded from the comparison.
            reply[IP].id = p[IP].id
            self.validate(p[1], reply)

    def _attach_binding(self, t):
        """ Add the binding described by case *t* and attach it.

        :param t: dict with 'match', 'rewrite', 'sw_if_index' and 'input'
                  (the attachment point) keys.
        :returns: the binding index reported by pnat_binding_add.
        """
        rv = self.vapi.pnat_binding_add(match=t['match'],
                                        rewrite=t['rewrite'])
        self.vapi.pnat_binding_attach(sw_if_index=t['sw_if_index'],
                                      attachment=t['input'],
                                      binding_index=rv.binding_index)
        return rv.binding_index

    def _release_binding(self, t, binding_index):
        """ Detach and delete a binding created by _attach_binding().

        Does nothing when VPP has died: the API calls could only fail and
        would hide the failure that is already being reported.
        """
        if self.vpp_dead:
            return
        self.vapi.pnat_binding_detach(sw_if_index=t['sw_if_index'],
                                      attachment=t['input'],
                                      binding_index=binding_index)
        self.vapi.pnat_binding_del(binding_index=binding_index)

    def test_pnat(self):
        """ PNAT test """

        PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
        PNAT_IP4_OUTPUT = \
            VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT

        # Every case is: where to attach, what to match, what to rewrite,
        # the packet to send into pg0 and the packet expected out of pg1.
        tests = [
            # Input on pg0: UDP to 10.10.10.10:6871, rewrite destination
            # address only.
            {
                'input': PNAT_IP4_INPUT,
                'sw_if_index': self.pg0.sw_if_index,
                'match': {'mask': 0xa, 'dst': '10.10.10.10', 'proto': 17,
                          'dport': 6871},
                'rewrite': {'mask': 0x2, 'dst': self.pg1.remote_ip4},
                'send': (IP(src=self.pg0.remote_ip4, dst='10.10.10.10') /
                         UDP(dport=6871)),
                'reply': (IP(src=self.pg0.remote_ip4,
                             dst=self.pg1.remote_ip4) /
                          UDP(dport=6871))
            },
            # Output on pg1: match on source address and port, rewrite
            # source address only.
            {
                'input': PNAT_IP4_OUTPUT,
                'sw_if_index': self.pg1.sw_if_index,
                'match': {'mask': 0x9, 'src': self.pg0.remote_ip4, 'proto': 17,
                          'dport': 6871},
                'rewrite': {'mask': 0x1, 'src': '11.11.11.11'},
                'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                         UDP(dport=6871)),
                'reply': (IP(src='11.11.11.11', dst=self.pg1.remote_ip4) /
                          UDP(dport=6871))
            },
            # Input on pg0: rewrite both destination address and port.
            {
                'input': PNAT_IP4_INPUT,
                'sw_if_index': self.pg0.sw_if_index,
                'match': {'mask': 0xa, 'dst': '10.10.10.10', 'proto': 17,
                          'dport': 6871},
                'rewrite': {'mask': 0xa, 'dst': self.pg1.remote_ip4,
                            'dport': 5555},
                'send': (IP(src=self.pg0.remote_ip4, dst='10.10.10.10') /
                         UDP(dport=6871)),
                'reply': (IP(src=self.pg0.remote_ip4,
                             dst=self.pg1.remote_ip4) /
                          UDP(dport=5555))
            },
            # Input on pg0: rewrite destination port only; the packet is
            # sent with a zero UDP checksum and the checksum is expected
            # to still be zero after the rewrite.
            {
                'input': PNAT_IP4_INPUT,
                'sw_if_index': self.pg0.sw_if_index,
                'match': {'mask': 0xa, 'dst': self.pg1.remote_ip4, 'proto': 17,
                          'dport': 6871},
                'rewrite': {'mask': 0x8, 'dport': 5555},
                'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                         UDP(dport=6871, chksum=0)),
                'reply': (IP(src=self.pg0.remote_ip4,
                             dst=self.pg1.remote_ip4) /
                          UDP(dport=5555, chksum=0))
            },
            # Input on pg0: ICMP (proto 1) matched on destination address,
            # rewrite source address.
            {
                'input': PNAT_IP4_INPUT,
                'sw_if_index': self.pg0.sw_if_index,
                'match': {'mask': 0x2, 'dst': self.pg1.remote_ip4, 'proto': 1},
                'rewrite': {'mask': 0x1, 'src': '8.8.8.8'},
                'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                         ICMP()),
                'reply': IP(src='8.8.8.8', dst=self.pg1.remote_ip4)/ICMP(),
            },
        ]

        p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        for t in tests:
            binding_index = self._attach_binding(t)
            # Always remove the binding again, even when a check fails;
            # otherwise it would stay attached and disturb the following
            # cases and tests.
            try:
                reply = t['reply']
                # The expected packet carries a TTL one lower than the
                # packet that was sent.
                reply[IP].ttl -= 1
                rx = self.send_and_expect(self.pg0, p_ether/t['send']*1,
                                          self.pg1)
                for p in rx:
                    self.validate(p[1], reply)

                # Traffic that does not match the binding must be unaffected.
                self.ping_check()
            finally:
                self._release_binding(t, binding_index)

    def test_pnat_show(self):
        """ PNAT show tests """

        PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
        PNAT_IP4_OUTPUT = \
            VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT

        # One binding per interface; no traffic is sent in this test, so the
        # cases only describe the attachment, match and rewrite.
        tests = [
            {
                'input': PNAT_IP4_INPUT,
                'sw_if_index': self.pg0.sw_if_index,
                'match': {'mask': 0xa, 'dst': '10.10.10.10', 'proto': 17,
                          'dport': 6871},
                'rewrite': {'mask': 0x2, 'dst': self.pg1.remote_ip4},
            },
            {
                'input': PNAT_IP4_OUTPUT,
                'sw_if_index': self.pg1.sw_if_index,
                'match': {'mask': 0x9, 'src': self.pg0.remote_ip4, 'proto': 17,
                          'dport': 6871},
                'rewrite': {'mask': 0x1, 'src': '11.11.11.11'},
            },
        ]

        # (case, binding index) for everything attached so far, so that the
        # cleanup releases exactly what was created.
        attached = []
        try:
            for t in tests:
                attached.append((t, self._attach_binding(t)))

            _, bindings = self.vapi.pnat_bindings_get()
            self.assertEqual(len(bindings), len(tests))

            _, interfaces = self.vapi.pnat_interfaces_get()
            self.assertEqual(len(interfaces), 2)

            self.logger.info(self.vapi.cli("show pnat translations"))
            self.logger.info(self.vapi.cli("show pnat interfaces"))
        finally:
            for t, binding_index in attached:
                self._release_binding(t, binding_index)
201
# Script entry point: run this module's tests through the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)