7 from framework import VppTestCase, VppTestRunner
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.l2 import Ether, ARP
10 from scapy.data import IP_PROTOS
12 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
15 class TestSNAT(VppTestCase):
16 """ SNAT Test Cases """
20 super(TestSNAT, cls).setUpClass()
23 cls.tcp_port_in = 6303
24 cls.tcp_port_out = 6303
25 cls.udp_port_in = 6304
26 cls.udp_port_out = 6304
28 cls.icmp_id_out = 6305
29 cls.snat_addr = '10.0.0.3'
31 cls.create_pg_interfaces(range(8))
32 cls.interfaces = list(cls.pg_interfaces[0:4])
34 for i in cls.interfaces:
39 cls.pg0.generate_remote_hosts(2)
40 cls.pg0.configure_ipv4_neighbors()
42 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
44 cls.pg4._local_ip4 = "172.16.255.1"
45 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
46 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
47 cls.pg4.set_table_ip4(10)
48 cls.pg5._local_ip4 = "172.16.255.3"
49 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
50 cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
51 cls.pg5.set_table_ip4(10)
52 cls.pg6._local_ip4 = "172.16.255.1"
53 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
54 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
55 cls.pg6.set_table_ip4(20)
56 for i in cls.overlapping_interfaces:
64 super(TestSNAT, cls).tearDownClass()
67 def create_stream_in(self, in_if, out_if):
69 Create packet stream for inside network
71 :param in_if: Inside interface
72 :param out_if: Outside interface
76 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
77 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
78 TCP(sport=self.tcp_port_in))
82 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
83 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
84 UDP(sport=self.udp_port_in))
88 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
89 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
90 ICMP(id=self.icmp_id_in, type='echo-request'))
95 def create_stream_out(self, out_if, dst_ip=None):
97 Create packet stream for outside network
99 :param out_if: Outside interface
100 :param dst_ip: Destination IP address (Default use global SNAT address)
103 dst_ip = self.snat_addr
106 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
107 IP(src=out_if.remote_ip4, dst=dst_ip) /
108 TCP(dport=self.tcp_port_out))
112 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
113 IP(src=out_if.remote_ip4, dst=dst_ip) /
114 UDP(dport=self.udp_port_out))
118 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
119 IP(src=out_if.remote_ip4, dst=dst_ip) /
120 ICMP(id=self.icmp_id_out, type='echo-reply'))
125 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
128 Verify captured packets on outside network
130 :param capture: Captured packets
131 :param nat_ip: Translated IP address (Default use global SNAT address)
132 :param same_port: Sorce port number is not translated (Default False)
133 :param packet_num: Expected number of packets (Default 3)
136 nat_ip = self.snat_addr
137 self.assertEqual(packet_num, len(capture))
138 for packet in capture:
140 self.assertEqual(packet[IP].src, nat_ip)
141 if packet.haslayer(TCP):
143 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
146 packet[TCP].sport, self.tcp_port_in)
147 self.tcp_port_out = packet[TCP].sport
148 elif packet.haslayer(UDP):
150 self.assertEqual(packet[UDP].sport, self.udp_port_in)
153 packet[UDP].sport, self.udp_port_in)
154 self.udp_port_out = packet[UDP].sport
157 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
159 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
160 self.icmp_id_out = packet[ICMP].id
162 self.logger.error(ppp("Unexpected or invalid packet "
163 "(outside network):", packet))
166 def verify_capture_in(self, capture, in_if, packet_num=3):
168 Verify captured packets on inside network
170 :param capture: Captured packets
171 :param in_if: Inside interface
172 :param packet_num: Expected number of packets (Default 3)
174 self.assertEqual(packet_num, len(capture))
175 for packet in capture:
177 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
178 if packet.haslayer(TCP):
179 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
180 elif packet.haslayer(UDP):
181 self.assertEqual(packet[UDP].dport, self.udp_port_in)
183 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
185 self.logger.error(ppp("Unexpected or invalid packet "
186 "(inside network):", packet))
189 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
191 Verify captured packet that don't have to be translated
193 :param capture: Captured packets
194 :param ingress_if: Ingress interface
195 :param egress_if: Egress interface
197 for packet in capture:
199 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
200 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
201 if packet.haslayer(TCP):
202 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
203 elif packet.haslayer(UDP):
204 self.assertEqual(packet[UDP].sport, self.udp_port_in)
206 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
208 self.logger.error(ppp("Unexpected or invalid packet "
209 "(inside network):", packet))
212 def verify_ipfix_nat44_ses(self, data):
214 Verify IPFIX NAT44 session create/delete event
216 :param data: Decoded IPFIX data records
218 nat44_ses_create_num = 0
219 nat44_ses_delete_num = 0
220 self.assertEqual(6, len(data))
223 self.assertIn(ord(record[230]), [4, 5])
224 if ord(record[230]) == 4:
225 nat44_ses_create_num += 1
227 nat44_ses_delete_num += 1
229 self.assertEqual(self.pg0.remote_ip4n, record[8])
230 # postNATSourceIPv4Address
231 self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
234 self.assertEqual(struct.pack("!I", 0), record[234])
235 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
236 if IP_PROTOS.icmp == ord(record[4]):
237 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
238 self.assertEqual(struct.pack("!H", self.icmp_id_out),
240 elif IP_PROTOS.tcp == ord(record[4]):
241 self.assertEqual(struct.pack("!H", self.tcp_port_in),
243 self.assertEqual(struct.pack("!H", self.tcp_port_out),
245 elif IP_PROTOS.udp == ord(record[4]):
246 self.assertEqual(struct.pack("!H", self.udp_port_in),
248 self.assertEqual(struct.pack("!H", self.udp_port_out),
251 self.fail("Invalid protocol")
252 self.assertEqual(3, nat44_ses_create_num)
253 self.assertEqual(3, nat44_ses_delete_num)
255 def verify_ipfix_addr_exhausted(self, data):
257 Verify IPFIX NAT addresses event
259 :param data: Decoded IPFIX data records
261 self.assertEqual(1, len(data))
264 self.assertEqual(ord(record[230]), 3)
266 self.assertEqual(struct.pack("!I", 0), record[283])
268 def clear_snat(self):
270 Clear SNAT configuration.
272 if self.pg7.has_ip4_config:
273 self.pg7.unconfig_ip4()
275 interfaces = self.vapi.snat_interface_addr_dump()
276 for intf in interfaces:
277 self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
279 self.vapi.snat_ipfix(enable=0)
281 interfaces = self.vapi.snat_interface_dump()
282 for intf in interfaces:
283 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
287 static_mappings = self.vapi.snat_static_mapping_dump()
288 for sm in static_mappings:
289 self.vapi.snat_add_static_mapping(sm.local_ip_address,
290 sm.external_ip_address,
291 local_port=sm.local_port,
292 external_port=sm.external_port,
293 addr_only=sm.addr_only,
297 adresses = self.vapi.snat_address_dump()
298 for addr in adresses:
299 self.vapi.snat_add_address_range(addr.ip_address,
303 def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
304 local_port=0, external_port=0, vrf_id=0,
305 is_add=1, external_sw_if_index=0xFFFFFFFF):
307 Add/delete S-NAT static mapping
309 :param local_ip: Local IP address
310 :param external_ip: External IP address
311 :param local_port: Local port number (Optional)
312 :param external_port: External port number (Optional)
313 :param vrf_id: VRF ID (Default 0)
314 :param is_add: 1 if add, 0 if delete (Default add)
315 :param external_sw_if_index: External interface instead of IP address
318 if local_port and external_port:
320 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
321 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
322 self.vapi.snat_add_static_mapping(
325 external_sw_if_index,
332 def snat_add_address(self, ip, is_add=1):
334 Add/delete S-NAT address
336 :param ip: IP address
337 :param is_add: 1 if add, 0 if delete (Default add)
339 snat_addr = socket.inet_pton(socket.AF_INET, ip)
340 self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add)
342 def test_dynamic(self):
343 """ SNAT dynamic translation test """
345 self.snat_add_address(self.snat_addr)
346 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
347 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
351 pkts = self.create_stream_in(self.pg0, self.pg1)
352 self.pg0.add_stream(pkts)
353 self.pg_enable_capture(self.pg_interfaces)
355 capture = self.pg1.get_capture(len(pkts))
356 self.verify_capture_out(capture)
359 pkts = self.create_stream_out(self.pg1)
360 self.pg1.add_stream(pkts)
361 self.pg_enable_capture(self.pg_interfaces)
363 capture = self.pg0.get_capture(len(pkts))
364 self.verify_capture_in(capture, self.pg0)
366 def test_static_in(self):
367 """ SNAT 1:1 NAT initialized from inside network """
370 self.tcp_port_out = 6303
371 self.udp_port_out = 6304
372 self.icmp_id_out = 6305
374 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
375 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
376 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
380 pkts = self.create_stream_in(self.pg0, self.pg1)
381 self.pg0.add_stream(pkts)
382 self.pg_enable_capture(self.pg_interfaces)
384 capture = self.pg1.get_capture(len(pkts))
385 self.verify_capture_out(capture, nat_ip, True)
388 pkts = self.create_stream_out(self.pg1, nat_ip)
389 self.pg1.add_stream(pkts)
390 self.pg_enable_capture(self.pg_interfaces)
392 capture = self.pg0.get_capture(len(pkts))
393 self.verify_capture_in(capture, self.pg0)
395 def test_static_out(self):
396 """ SNAT 1:1 NAT initialized from outside network """
399 self.tcp_port_out = 6303
400 self.udp_port_out = 6304
401 self.icmp_id_out = 6305
403 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
404 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
405 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
409 pkts = self.create_stream_out(self.pg1, nat_ip)
410 self.pg1.add_stream(pkts)
411 self.pg_enable_capture(self.pg_interfaces)
413 capture = self.pg0.get_capture(len(pkts))
414 self.verify_capture_in(capture, self.pg0)
417 pkts = self.create_stream_in(self.pg0, self.pg1)
418 self.pg0.add_stream(pkts)
419 self.pg_enable_capture(self.pg_interfaces)
421 capture = self.pg1.get_capture(len(pkts))
422 self.verify_capture_out(capture, nat_ip, True)
424 def test_static_with_port_in(self):
425 """ SNAT 1:1 NAT with port initialized from inside network """
427 self.tcp_port_out = 3606
428 self.udp_port_out = 3607
429 self.icmp_id_out = 3608
431 self.snat_add_address(self.snat_addr)
432 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
433 self.tcp_port_in, self.tcp_port_out)
434 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
435 self.udp_port_in, self.udp_port_out)
436 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
437 self.icmp_id_in, self.icmp_id_out)
438 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
439 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
443 pkts = self.create_stream_in(self.pg0, self.pg1)
444 self.pg0.add_stream(pkts)
445 self.pg_enable_capture(self.pg_interfaces)
447 capture = self.pg1.get_capture(len(pkts))
448 self.verify_capture_out(capture)
451 pkts = self.create_stream_out(self.pg1)
452 self.pg1.add_stream(pkts)
453 self.pg_enable_capture(self.pg_interfaces)
455 capture = self.pg0.get_capture(len(pkts))
456 self.verify_capture_in(capture, self.pg0)
458 def test_static_with_port_out(self):
459 """ SNAT 1:1 NAT with port initialized from outside network """
461 self.tcp_port_out = 30606
462 self.udp_port_out = 30607
463 self.icmp_id_out = 30608
465 self.snat_add_address(self.snat_addr)
466 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
467 self.tcp_port_in, self.tcp_port_out)
468 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
469 self.udp_port_in, self.udp_port_out)
470 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
471 self.icmp_id_in, self.icmp_id_out)
472 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
473 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
477 pkts = self.create_stream_out(self.pg1)
478 self.pg1.add_stream(pkts)
479 self.pg_enable_capture(self.pg_interfaces)
481 capture = self.pg0.get_capture(len(pkts))
482 self.verify_capture_in(capture, self.pg0)
485 pkts = self.create_stream_in(self.pg0, self.pg1)
486 self.pg0.add_stream(pkts)
487 self.pg_enable_capture(self.pg_interfaces)
489 capture = self.pg1.get_capture(len(pkts))
490 self.verify_capture_out(capture)
492 def test_static_vrf_aware(self):
493 """ SNAT 1:1 NAT VRF awareness """
495 nat_ip1 = "10.0.0.30"
496 nat_ip2 = "10.0.0.40"
497 self.tcp_port_out = 6303
498 self.udp_port_out = 6304
499 self.icmp_id_out = 6305
501 self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
503 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
505 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
507 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
508 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
510 # inside interface VRF match SNAT static mapping VRF
511 pkts = self.create_stream_in(self.pg4, self.pg3)
512 self.pg4.add_stream(pkts)
513 self.pg_enable_capture(self.pg_interfaces)
515 capture = self.pg3.get_capture(len(pkts))
516 self.verify_capture_out(capture, nat_ip1, True)
518 # inside interface VRF don't match SNAT static mapping VRF (packets
520 pkts = self.create_stream_in(self.pg0, self.pg3)
521 self.pg0.add_stream(pkts)
522 self.pg_enable_capture(self.pg_interfaces)
524 self.pg3.assert_nothing_captured()
526 def test_multiple_inside_interfaces(self):
527 """ SNAT multiple inside interfaces (non-overlapping address space) """
529 self.snat_add_address(self.snat_addr)
530 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
531 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
532 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
535 # between two S-NAT inside interfaces (no translation)
536 pkts = self.create_stream_in(self.pg0, self.pg1)
537 self.pg0.add_stream(pkts)
538 self.pg_enable_capture(self.pg_interfaces)
540 capture = self.pg1.get_capture(len(pkts))
541 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
543 # from S-NAT inside to interface without S-NAT feature (no translation)
544 pkts = self.create_stream_in(self.pg0, self.pg2)
545 self.pg0.add_stream(pkts)
546 self.pg_enable_capture(self.pg_interfaces)
548 capture = self.pg2.get_capture(len(pkts))
549 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
551 # in2out 1st interface
552 pkts = self.create_stream_in(self.pg0, self.pg3)
553 self.pg0.add_stream(pkts)
554 self.pg_enable_capture(self.pg_interfaces)
556 capture = self.pg3.get_capture(len(pkts))
557 self.verify_capture_out(capture)
559 # out2in 1st interface
560 pkts = self.create_stream_out(self.pg3)
561 self.pg3.add_stream(pkts)
562 self.pg_enable_capture(self.pg_interfaces)
564 capture = self.pg0.get_capture(len(pkts))
565 self.verify_capture_in(capture, self.pg0)
567 # in2out 2nd interface
568 pkts = self.create_stream_in(self.pg1, self.pg3)
569 self.pg1.add_stream(pkts)
570 self.pg_enable_capture(self.pg_interfaces)
572 capture = self.pg3.get_capture(len(pkts))
573 self.verify_capture_out(capture)
575 # out2in 2nd interface
576 pkts = self.create_stream_out(self.pg3)
577 self.pg3.add_stream(pkts)
578 self.pg_enable_capture(self.pg_interfaces)
580 capture = self.pg1.get_capture(len(pkts))
581 self.verify_capture_in(capture, self.pg1)
583 def test_inside_overlapping_interfaces(self):
584 """ SNAT multiple inside interfaces with overlapping address space """
586 static_nat_ip = "10.0.0.10"
587 self.snat_add_address(self.snat_addr)
588 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
590 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
591 self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
592 self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
593 self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
596 # between S-NAT inside interfaces with same VRF (no translation)
597 pkts = self.create_stream_in(self.pg4, self.pg5)
598 self.pg4.add_stream(pkts)
599 self.pg_enable_capture(self.pg_interfaces)
601 capture = self.pg5.get_capture(len(pkts))
602 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
604 # between S-NAT inside interfaces with different VRF (hairpinning)
605 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
606 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
607 TCP(sport=1234, dport=5678))
608 self.pg4.add_stream(p)
609 self.pg_enable_capture(self.pg_interfaces)
611 capture = self.pg6.get_capture(1)
616 self.assertEqual(ip.src, self.snat_addr)
617 self.assertEqual(ip.dst, self.pg6.remote_ip4)
618 self.assertNotEqual(tcp.sport, 1234)
619 self.assertEqual(tcp.dport, 5678)
621 self.logger.error(ppp("Unexpected or invalid packet:", p))
624 # in2out 1st interface
625 pkts = self.create_stream_in(self.pg4, self.pg3)
626 self.pg4.add_stream(pkts)
627 self.pg_enable_capture(self.pg_interfaces)
629 capture = self.pg3.get_capture(len(pkts))
630 self.verify_capture_out(capture)
632 # out2in 1st interface
633 pkts = self.create_stream_out(self.pg3)
634 self.pg3.add_stream(pkts)
635 self.pg_enable_capture(self.pg_interfaces)
637 capture = self.pg4.get_capture(len(pkts))
638 self.verify_capture_in(capture, self.pg4)
640 # in2out 2nd interface
641 pkts = self.create_stream_in(self.pg5, self.pg3)
642 self.pg5.add_stream(pkts)
643 self.pg_enable_capture(self.pg_interfaces)
645 capture = self.pg3.get_capture(len(pkts))
646 self.verify_capture_out(capture)
648 # out2in 2nd interface
649 pkts = self.create_stream_out(self.pg3)
650 self.pg3.add_stream(pkts)
651 self.pg_enable_capture(self.pg_interfaces)
653 capture = self.pg5.get_capture(len(pkts))
654 self.verify_capture_in(capture, self.pg5)
656 # in2out 3rd interface
657 pkts = self.create_stream_in(self.pg6, self.pg3)
658 self.pg6.add_stream(pkts)
659 self.pg_enable_capture(self.pg_interfaces)
661 capture = self.pg3.get_capture(len(pkts))
662 self.verify_capture_out(capture, static_nat_ip, True)
664 # out2in 3rd interface
665 pkts = self.create_stream_out(self.pg3, static_nat_ip)
666 self.pg3.add_stream(pkts)
667 self.pg_enable_capture(self.pg_interfaces)
669 capture = self.pg6.get_capture(len(pkts))
670 self.verify_capture_in(capture, self.pg6)
672 def test_hairpinning(self):
673 """ SNAT hairpinning """
675 host = self.pg0.remote_hosts[0]
676 server = self.pg0.remote_hosts[1]
679 server_in_port = 5678
680 server_out_port = 8765
682 self.snat_add_address(self.snat_addr)
683 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
684 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
686 # add static mapping for server
687 self.snat_add_static_mapping(server.ip4, self.snat_addr,
688 server_in_port, server_out_port)
690 # send packet from host to server
691 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
692 IP(src=host.ip4, dst=self.snat_addr) /
693 TCP(sport=host_in_port, dport=server_out_port))
694 self.pg0.add_stream(p)
695 self.pg_enable_capture(self.pg_interfaces)
697 capture = self.pg0.get_capture(1)
702 self.assertEqual(ip.src, self.snat_addr)
703 self.assertEqual(ip.dst, server.ip4)
704 self.assertNotEqual(tcp.sport, host_in_port)
705 self.assertEqual(tcp.dport, server_in_port)
706 host_out_port = tcp.sport
708 self.logger.error(ppp("Unexpected or invalid packet:", p))
711 # send reply from server to host
712 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
713 IP(src=server.ip4, dst=self.snat_addr) /
714 TCP(sport=server_in_port, dport=host_out_port))
715 self.pg0.add_stream(p)
716 self.pg_enable_capture(self.pg_interfaces)
718 capture = self.pg0.get_capture(1)
723 self.assertEqual(ip.src, self.snat_addr)
724 self.assertEqual(ip.dst, host.ip4)
725 self.assertEqual(tcp.sport, server_out_port)
726 self.assertEqual(tcp.dport, host_in_port)
728 self.logger.error(ppp("Unexpected or invalid packet:"), p)
731 def test_max_translations_per_user(self):
732 """ MAX translations per user - recycle the least recently used """
734 self.snat_add_address(self.snat_addr)
735 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
736 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
739 # get maximum number of translations per user
740 snat_config = self.vapi.snat_show_config()
742 # send more than maximum number of translations per user packets
743 pkts_num = snat_config.max_translations_per_user + 5
745 for port in range(0, pkts_num):
746 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
747 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
748 TCP(sport=1025 + port))
750 self.pg0.add_stream(pkts)
751 self.pg_enable_capture(self.pg_interfaces)
754 # verify number of translated packet
755 self.pg1.get_capture(pkts_num)
757 def test_interface_addr(self):
758 """ Acquire SNAT addresses from interface """
759 self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
761 # no address in NAT pool
762 adresses = self.vapi.snat_address_dump()
763 self.assertEqual(0, len(adresses))
765 # configure interface address and check NAT address pool
766 self.pg7.config_ip4()
767 adresses = self.vapi.snat_address_dump()
768 self.assertEqual(1, len(adresses))
769 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
771 # remove interface address and check NAT address pool
772 self.pg7.unconfig_ip4()
773 adresses = self.vapi.snat_address_dump()
774 self.assertEqual(0, len(adresses))
776 def test_interface_addr_static_mapping(self):
777 """ Static mapping with addresses from interface """
778 self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
779 self.snat_add_static_mapping('1.2.3.4',
780 external_sw_if_index=self.pg7.sw_if_index)
783 static_mappings = self.vapi.snat_static_mapping_dump()
784 self.assertEqual(0, len(static_mappings))
786 # configure interface address and check static mappings
787 self.pg7.config_ip4()
788 static_mappings = self.vapi.snat_static_mapping_dump()
789 self.assertEqual(1, len(static_mappings))
790 self.assertEqual(static_mappings[0].external_ip_address[0:4],
793 # remove interface address and check static mappings
794 self.pg7.unconfig_ip4()
795 static_mappings = self.vapi.snat_static_mapping_dump()
796 self.assertEqual(0, len(static_mappings))
798 def test_ipfix_nat44_sess(self):
799 """ S-NAT IPFIX logging NAT44 session created/delted """
800 self.snat_add_address(self.snat_addr)
801 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
802 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
804 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
805 src_address=self.pg3.local_ip4n,
807 template_interval=10)
808 self.vapi.snat_ipfix()
810 pkts = self.create_stream_in(self.pg0, self.pg1)
811 self.pg0.add_stream(pkts)
812 self.pg_enable_capture(self.pg_interfaces)
814 capture = self.pg1.get_capture(len(pkts))
815 self.verify_capture_out(capture)
816 self.snat_add_address(self.snat_addr, is_add=0)
817 self.vapi.cli("ipfix flush") # FIXME this should be an API call
818 capture = self.pg3.get_capture(3)
819 ipfix = IPFIXDecoder()
820 # first load template
822 self.assertTrue(p.haslayer(IPFIX))
823 if p.haslayer(Template):
824 ipfix.add_template(p.getlayer(Template))
825 # verify events in data set
828 data = ipfix.decode_data_set(p.getlayer(Set))
829 self.verify_ipfix_nat44_ses(data)
831 def test_ipfix_addr_exhausted(self):
832 """ S-NAT IPFIX logging NAT addresses exhausted """
833 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
834 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
836 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
837 src_address=self.pg3.local_ip4n,
839 template_interval=10)
840 self.vapi.snat_ipfix()
842 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
843 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
845 self.pg0.add_stream(p)
846 self.pg_enable_capture(self.pg_interfaces)
848 capture = self.pg1.get_capture(0)
849 self.vapi.cli("ipfix flush") # FIXME this should be an API call
850 capture = self.pg3.get_capture(3)
851 ipfix = IPFIXDecoder()
852 # first load template
854 self.assertTrue(p.haslayer(IPFIX))
855 if p.haslayer(Template):
856 ipfix.add_template(p.getlayer(Template))
857 # verify events in data set
860 data = ipfix.decode_data_set(p.getlayer(Set))
861 self.verify_ipfix_addr_exhausted(data)
863 def test_pool_addr_fib(self):
864 """ S-NAT add pool addresses to FIB """
865 static_addr = '10.0.0.10'
866 self.snat_add_address(self.snat_addr)
867 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
868 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
870 self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)
873 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
874 ARP(op=ARP.who_has, pdst=self.snat_addr,
875 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
876 self.pg1.add_stream(p)
877 self.pg_enable_capture(self.pg_interfaces)
879 capture = self.pg1.get_capture(1)
880 self.assertTrue(capture[0].haslayer(ARP))
881 self.assertTrue(capture[0][ARP].op, ARP.is_at)
884 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
885 ARP(op=ARP.who_has, pdst=static_addr,
886 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
887 self.pg1.add_stream(p)
888 self.pg_enable_capture(self.pg_interfaces)
890 capture = self.pg1.get_capture(1)
891 self.assertTrue(capture[0].haslayer(ARP))
892 self.assertTrue(capture[0][ARP].op, ARP.is_at)
894 # send ARP to non-SNAT interface
895 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
896 ARP(op=ARP.who_has, pdst=self.snat_addr,
897 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
898 self.pg2.add_stream(p)
899 self.pg_enable_capture(self.pg_interfaces)
901 capture = self.pg1.get_capture(0)
903 # remove addresses and verify
904 self.snat_add_address(self.snat_addr, is_add=0)
905 self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
908 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
909 ARP(op=ARP.who_has, pdst=self.snat_addr,
910 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
911 self.pg1.add_stream(p)
912 self.pg_enable_capture(self.pg_interfaces)
914 capture = self.pg1.get_capture(0)
916 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
917 ARP(op=ARP.who_has, pdst=static_addr,
918 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
919 self.pg1.add_stream(p)
920 self.pg_enable_capture(self.pg_interfaces)
922 capture = self.pg1.get_capture(0)
925 super(TestSNAT, self).tearDown()
926 if not self.vpp_dead:
927 self.logger.info(self.vapi.cli("show snat verbose"))
931 if __name__ == '__main__':
932 unittest.main(testRunner=VppTestRunner)