make test: FIB add/update/delete - ip4 routes
[vpp.git] / test / test_snat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 from logging import *
6
7 from framework import VppTestCase, VppTestRunner
8
9 from scapy.layers.inet import IP, TCP, UDP, ICMP
10 from scapy.layers.l2 import Ether
11
12
13 class TestSNAT(VppTestCase):
14     """ SNAT Test Cases """
15
16     @classmethod
17     def setUpClass(cls):
18         super(TestSNAT, cls).setUpClass()
19
20         try:
21             cls.tcp_port_in = 6303
22             cls.tcp_port_out = 6303
23             cls.udp_port_in = 6304
24             cls.udp_port_out = 6304
25             cls.icmp_id_in = 6305
26             cls.icmp_id_out = 6305
27             cls.snat_addr = '10.0.0.3'
28
29             cls.create_pg_interfaces(range(7))
30             cls.interfaces = list(cls.pg_interfaces[0:4])
31
32             for i in cls.interfaces:
33                 i.admin_up()
34                 i.config_ip4()
35                 i.resolve_arp()
36
37             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
38
39             for i in cls.overlapping_interfaces:
40                 i._local_ip4 = "172.16.255.1"
41                 i._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
42                 i._remote_hosts[0]._ip4 = "172.16.255.2"
43                 i.set_table_ip4(i.sw_if_index)
44                 i.config_ip4()
45                 i.admin_up()
46                 i.resolve_arp()
47
48         except Exception:
49             super(TestSNAT, cls).tearDownClass()
50             raise
51
52     def create_stream_in(self, in_if, out_if):
53         """
54         Create packet stream for inside network
55
56         :param in_if: Inside interface
57         :param out_if: Outside interface
58         """
59         pkts = []
60         # TCP
61         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
62              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
63              TCP(sport=self.tcp_port_in))
64         pkts.append(p)
65
66         # UDP
67         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
68              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
69              UDP(sport=self.udp_port_in))
70         pkts.append(p)
71
72         # ICMP
73         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
74              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
75              ICMP(id=self.icmp_id_in, type='echo-request'))
76         pkts.append(p)
77
78         return pkts
79
80     def create_stream_out(self, out_if, dst_ip=None):
81         """
82         Create packet stream for outside network
83
84         :param out_if: Outside interface
85         :param dst_ip: Destination IP address (Default use global SNAT address)
86         """
87         if dst_ip is None:
88              dst_ip=self.snat_addr
89         pkts = []
90         # TCP
91         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
92              IP(src=out_if.remote_ip4, dst=dst_ip) /
93              TCP(dport=self.tcp_port_out))
94         pkts.append(p)
95
96         # UDP
97         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
98              IP(src=out_if.remote_ip4, dst=dst_ip) /
99              UDP(dport=self.udp_port_out))
100         pkts.append(p)
101
102         # ICMP
103         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
104              IP(src=out_if.remote_ip4, dst=dst_ip) /
105              ICMP(id=self.icmp_id_out, type='echo-reply'))
106         pkts.append(p)
107
108         return pkts
109
110     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
111                            packet_num=3):
112         """
113         Verify captured packets on outside network
114
115         :param capture: Captured packets
116         :param nat_ip: Translated IP address (Default use global SNAT address)
117         :param same_port: Sorce port number is not translated (Default False)
118         :param packet_num: Expected number of packets (Default 3)
119         """
120         if nat_ip is None:
121             nat_ip = self.snat_addr
122         self.assertEqual(packet_num, len(capture))
123         for packet in capture:
124             try:
125                 self.assertEqual(packet[IP].src, nat_ip)
126                 if packet.haslayer(TCP):
127                     if same_port:
128                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
129                     else:
130                         self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
131                     self.tcp_port_out = packet[TCP].sport
132                 elif packet.haslayer(UDP):
133                     if same_port:
134                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
135                     else:
136                         self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
137                     self.udp_port_out = packet[UDP].sport
138                 else:
139                     if same_port:
140                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
141                     else:
142                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
143                     self.icmp_id_out = packet[ICMP].id
144             except:
145                 error("Unexpected or invalid packet (outside network):")
146                 error(packet.show())
147                 raise
148
149     def verify_capture_in(self, capture, in_if, packet_num=3):
150         """
151         Verify captured packets on inside network
152
153         :param capture: Captured packets
154         :param in_if: Inside interface
155         :param packet_num: Expected number of packets (Default 3)
156         """
157         self.assertEqual(packet_num, len(capture))
158         for packet in capture:
159             try:
160                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
161                 if packet.haslayer(TCP):
162                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
163                 elif packet.haslayer(UDP):
164                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
165                 else:
166                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
167             except:
168                 error("Unexpected or invalid packet (inside network):")
169                 error(packet.show())
170                 raise
171
172     def clear_snat(self):
173         """
174         Clear SNAT configuration.
175         """
176         interfaces = self.vapi.snat_interface_dump()
177         for intf in interfaces:
178             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
179                                                      intf.is_inside,
180                                                      is_add=0)
181
182         static_mappings = self.vapi.snat_static_mapping_dump()
183         for sm in static_mappings:
184             self.vapi.snat_add_static_mapping(sm.local_ip_address,
185                                               sm.external_ip_address,
186                                               local_port=sm.local_port,
187                                               external_port=sm.external_port,
188                                               addr_only=sm.addr_only,
189                                               vrf_id=sm.vrf_id,
190                                               is_add=0)
191
192         adresses = self.vapi.snat_address_dump()
193         for addr in adresses:
194             self.vapi.snat_add_address_range(addr.ip_address,
195                                              addr.ip_address,
196                                              is_add=0)
197
198     def snat_add_static_mapping(self, local_ip, external_ip, local_port=0,
199                                 external_port=0, vrf_id=0, is_add=1):
200         """
201         Add/delete S-NAT static mapping
202
203         :param local_ip: Local IP address
204         :param external_ip: External IP address
205         :param local_port: Local port number (Optional)
206         :param external_port: External port number (Optional)
207         :param vrf_id: VRF ID (Default 0)
208         :param is_add: 1 if add, 0 if delete (Default add)
209         """
210         addr_only = 1
211         if local_port and external_port:
212             addr_only = 0
213         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
214         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
215         self.vapi.snat_add_static_mapping(l_ip, e_ip, local_port, external_port,
216                                           addr_only, vrf_id, is_add)
217
218     def snat_add_address(self, ip, is_add=1):
219         """
220         Add/delete S-NAT address
221
222         :param ip: IP address
223         :param is_add: 1 if add, 0 if delete (Default add)
224         """
225         snat_addr = socket.inet_pton(socket.AF_INET, ip)
226         self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add)
227
228     def test_dynamic(self):
229         """ SNAT dynamic translation test """
230
231         self.snat_add_address(self.snat_addr)
232         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
233         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
234                                                  is_inside=0)
235
236         # in2out
237         pkts = self.create_stream_in(self.pg0, self.pg1)
238         self.pg0.add_stream(pkts)
239         self.pg_enable_capture(self.pg_interfaces)
240         self.pg_start()
241         capture = self.pg1.get_capture()
242         self.verify_capture_out(capture)
243
244         # out2in
245         pkts = self.create_stream_out(self.pg1)
246         self.pg1.add_stream(pkts)
247         self.pg_enable_capture(self.pg_interfaces)
248         self.pg_start()
249         capture = self.pg0.get_capture()
250         self.verify_capture_in(capture, self.pg0)
251
252     def test_static_in(self):
253         """ SNAT 1:1 NAT initialized from inside network """
254
255         nat_ip = "10.0.0.10"
256         self.tcp_port_out = 6303
257         self.udp_port_out = 6304
258         self.icmp_id_out = 6305
259
260         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
261         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
262         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
263                                                  is_inside=0)
264
265         # in2out
266         pkts = self.create_stream_in(self.pg0, self.pg1)
267         self.pg0.add_stream(pkts)
268         self.pg_enable_capture(self.pg_interfaces)
269         self.pg_start()
270         capture = self.pg1.get_capture()
271         self.verify_capture_out(capture, nat_ip, True)
272
273         # out2in
274         pkts = self.create_stream_out(self.pg1, nat_ip)
275         self.pg1.add_stream(pkts)
276         self.pg_enable_capture(self.pg_interfaces)
277         self.pg_start()
278         capture = self.pg0.get_capture()
279         self.verify_capture_in(capture, self.pg0)
280
281     def test_static_out(self):
282         """ SNAT 1:1 NAT initialized from outside network """
283
284         nat_ip = "10.0.0.20"
285         self.tcp_port_out = 6303
286         self.udp_port_out = 6304
287         self.icmp_id_out = 6305
288
289         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
290         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
291         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
292                                                  is_inside=0)
293
294         # out2in
295         pkts = self.create_stream_out(self.pg1, nat_ip)
296         self.pg1.add_stream(pkts)
297         self.pg_enable_capture(self.pg_interfaces)
298         self.pg_start()
299         capture = self.pg0.get_capture()
300         self.verify_capture_in(capture, self.pg0)
301
302         # in2out
303         pkts = self.create_stream_in(self.pg0, self.pg1)
304         self.pg0.add_stream(pkts)
305         self.pg_enable_capture(self.pg_interfaces)
306         self.pg_start()
307         capture = self.pg1.get_capture()
308         self.verify_capture_out(capture, nat_ip, True)
309
310     def test_static_with_port_in(self):
311         """ SNAT 1:1 NAT with port initialized from inside network """
312
313         self.tcp_port_out = 3606
314         self.udp_port_out = 3607
315         self.icmp_id_out = 3608
316
317         self.snat_add_address(self.snat_addr)
318         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
319                                      self.tcp_port_in, self.tcp_port_out)
320         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
321                                      self.udp_port_in, self.udp_port_out)
322         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
323                                      self.icmp_id_in, self.icmp_id_out)
324         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
325         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
326                                                  is_inside=0)
327
328         # in2out
329         pkts = self.create_stream_in(self.pg0, self.pg1)
330         self.pg0.add_stream(pkts)
331         self.pg_enable_capture(self.pg_interfaces)
332         self.pg_start()
333         capture = self.pg1.get_capture()
334         self.verify_capture_out(capture)
335
336         # out2in
337         pkts = self.create_stream_out(self.pg1)
338         self.pg1.add_stream(pkts)
339         self.pg_enable_capture(self.pg_interfaces)
340         self.pg_start()
341         capture = self.pg0.get_capture()
342         self.verify_capture_in(capture, self.pg0)
343
344     def test_static_with_port_out(self):
345         """ SNAT 1:1 NAT with port initialized from outside network """
346
347         self.tcp_port_out = 30606
348         self.udp_port_out = 30607
349         self.icmp_id_out = 30608
350
351         self.snat_add_address(self.snat_addr)
352         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
353                                      self.tcp_port_in, self.tcp_port_out)
354         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
355                                      self.udp_port_in, self.udp_port_out)
356         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
357                                      self.icmp_id_in, self.icmp_id_out)
358         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
359         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
360                                                  is_inside=0)
361
362         # out2in
363         pkts = self.create_stream_out(self.pg1)
364         self.pg1.add_stream(pkts)
365         self.pg_enable_capture(self.pg_interfaces)
366         self.pg_start()
367         capture = self.pg0.get_capture()
368         self.verify_capture_in(capture, self.pg0)
369
370         # in2out
371         pkts = self.create_stream_in(self.pg0, self.pg1)
372         self.pg0.add_stream(pkts)
373         self.pg_enable_capture(self.pg_interfaces)
374         self.pg_start()
375         capture = self.pg1.get_capture()
376         self.verify_capture_out(capture)
377
378     def test_static_vrf_aware(self):
379         """ SNAT 1:1 NAT VRF awareness """
380
381         nat_ip1 = "10.0.0.30"
382         nat_ip2 = "10.0.0.40"
383         self.tcp_port_out = 6303
384         self.udp_port_out = 6304
385         self.icmp_id_out = 6305
386
387         self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
388                                      vrf_id=self.pg4.sw_if_index)
389         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
390                                      vrf_id=self.pg4.sw_if_index)
391         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
392                                                  is_inside=0)
393         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
394         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
395
396         # inside interface VRF match SNAT static mapping VRF
397         pkts = self.create_stream_in(self.pg4, self.pg3)
398         self.pg4.add_stream(pkts)
399         self.pg_enable_capture(self.pg_interfaces)
400         self.pg_start()
401         capture = self.pg3.get_capture()
402         self.verify_capture_out(capture, nat_ip1, True)
403
404         # inside interface VRF don't match SNAT static mapping VRF (packets
405         # are dropped)
406         pkts = self.create_stream_in(self.pg0, self.pg3)
407         self.pg0.add_stream(pkts)
408         self.pg_enable_capture(self.pg_interfaces)
409         self.pg_start()
410         capture = self.pg3.get_capture()
411         self.verify_capture_out(capture, packet_num=0)
412
413     def test_multiple_inside_interfaces(self):
414         """ SNAT multiple inside interfaces with non-overlapping address space """
415
416         self.snat_add_address(self.snat_addr)
417         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
418         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
419         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index)
420         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
421                                                  is_inside=0)
422
423         # in2out 1st interface
424         pkts = self.create_stream_in(self.pg0, self.pg3)
425         self.pg0.add_stream(pkts)
426         self.pg_enable_capture(self.pg_interfaces)
427         self.pg_start()
428         capture = self.pg3.get_capture()
429         self.verify_capture_out(capture)
430
431         # out2in 1st interface
432         pkts = self.create_stream_out(self.pg3)
433         self.pg3.add_stream(pkts)
434         self.pg_enable_capture(self.pg_interfaces)
435         self.pg_start()
436         capture = self.pg0.get_capture()
437         self.verify_capture_in(capture, self.pg0)
438
439         # in2out 2nd interface
440         pkts = self.create_stream_in(self.pg1, self.pg3)
441         self.pg1.add_stream(pkts)
442         self.pg_enable_capture(self.pg_interfaces)
443         self.pg_start()
444         capture = self.pg3.get_capture()
445         self.verify_capture_out(capture)
446
447         # out2in 2nd interface
448         pkts = self.create_stream_out(self.pg3)
449         self.pg3.add_stream(pkts)
450         self.pg_enable_capture(self.pg_interfaces)
451         self.pg_start()
452         capture = self.pg1.get_capture()
453         self.verify_capture_in(capture, self.pg1)
454
455         # in2out 3rd interface
456         pkts = self.create_stream_in(self.pg2, self.pg3)
457         self.pg2.add_stream(pkts)
458         self.pg_enable_capture(self.pg_interfaces)
459         self.pg_start()
460         capture = self.pg3.get_capture()
461         self.verify_capture_out(capture)
462
463         # out2in 3rd interface
464         pkts = self.create_stream_out(self.pg3)
465         self.pg3.add_stream(pkts)
466         self.pg_enable_capture(self.pg_interfaces)
467         self.pg_start()
468         capture = self.pg2.get_capture()
469         self.verify_capture_in(capture, self.pg2)
470
471     def test_inside_overlapping_interfaces(self):
472         """ SNAT multiple inside interfaces with overlapping address space """
473
474         self.snat_add_address(self.snat_addr)
475         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
476                                                  is_inside=0)
477         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
478         self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
479         self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
480
481         # in2out 1st interface
482         pkts = self.create_stream_in(self.pg4, self.pg3)
483         self.pg4.add_stream(pkts)
484         self.pg_enable_capture(self.pg_interfaces)
485         self.pg_start()
486         capture = self.pg3.get_capture()
487         self.verify_capture_out(capture)
488
489         # out2in 1st interface
490         pkts = self.create_stream_out(self.pg3)
491         self.pg3.add_stream(pkts)
492         self.pg_enable_capture(self.pg_interfaces)
493         self.pg_start()
494         capture = self.pg4.get_capture()
495         self.verify_capture_in(capture, self.pg4)
496
497         # in2out 2nd interface
498         pkts = self.create_stream_in(self.pg5, self.pg3)
499         self.pg5.add_stream(pkts)
500         self.pg_enable_capture(self.pg_interfaces)
501         self.pg_start()
502         capture = self.pg3.get_capture()
503         self.verify_capture_out(capture)
504
505         # out2in 2nd interface
506         pkts = self.create_stream_out(self.pg3)
507         self.pg3.add_stream(pkts)
508         self.pg_enable_capture(self.pg_interfaces)
509         self.pg_start()
510         capture = self.pg5.get_capture()
511         self.verify_capture_in(capture, self.pg5)
512
513         # in2out 3rd interface
514         pkts = self.create_stream_in(self.pg6, self.pg3)
515         self.pg6.add_stream(pkts)
516         self.pg_enable_capture(self.pg_interfaces)
517         self.pg_start()
518         capture = self.pg3.get_capture()
519         self.verify_capture_out(capture)
520
521         # out2in 3rd interface
522         pkts = self.create_stream_out(self.pg3)
523         self.pg3.add_stream(pkts)
524         self.pg_enable_capture(self.pg_interfaces)
525         self.pg_start()
526         capture = self.pg6.get_capture()
527         self.verify_capture_in(capture, self.pg6)
528
529     def tearDown(self):
530         super(TestSNAT, self).tearDown()
531         if not self.vpp_dead:
532             self.logger.info(self.vapi.cli("show snat verbose"))
533             self.clear_snat()
534
535 if __name__ == '__main__':
536     unittest.main(testRunner=VppTestRunner)