7 from framework import VppTestCase, VppTestRunner
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.l2 import Ether
10 from scapy.data import IP_PROTOS
12 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
15 class TestSNAT(VppTestCase):
16 """ SNAT Test Cases """
20 super(TestSNAT, cls).setUpClass()
23 cls.tcp_port_in = 6303
24 cls.tcp_port_out = 6303
25 cls.udp_port_in = 6304
26 cls.udp_port_out = 6304
28 cls.icmp_id_out = 6305
29 cls.snat_addr = '10.0.0.3'
31 cls.create_pg_interfaces(range(8))
32 cls.interfaces = list(cls.pg_interfaces[0:4])
34 for i in cls.interfaces:
39 cls.pg0.generate_remote_hosts(2)
40 cls.pg0.configure_ipv4_neighbors()
# pg4..pg6 deliberately reuse the same address space; they are kept apart
# by placing them in separate IPv4 tables (pg4/pg5 -> 10, pg6 -> 20).
cls.overlapping_interfaces = list(cls.pg_interfaces[4:7])

for pg_if, local_ip, remote_ip, table_id in (
        (cls.pg4, "172.16.255.1", "172.16.255.2", 10),
        (cls.pg5, "172.16.255.3", "172.16.255.4", 10),
        (cls.pg6, "172.16.255.1", "172.16.255.2", 20)):
    pg_if._local_ip4 = local_ip
    # Pack the address just assigned to this interface.  The previous code
    # packed `i.local_ip4`, where `i` was the leftover variable of the
    # earlier `for i in cls.interfaces` loop, i.e. a different interface.
    pg_if._local_ip4n = socket.inet_pton(socket.AF_INET, local_ip)
    pg_if._remote_hosts[0]._ip4 = remote_ip
    pg_if.set_table_ip4(table_id)
56 for i in cls.overlapping_interfaces:
64 super(TestSNAT, cls).tearDownClass()
67 def create_stream_in(self, in_if, out_if):
69 Create packet stream for inside network
71 :param in_if: Inside interface
72 :param out_if: Outside interface
76 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
77 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
78 TCP(sport=self.tcp_port_in))
82 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
83 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
84 UDP(sport=self.udp_port_in))
88 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
89 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
90 ICMP(id=self.icmp_id_in, type='echo-request'))
95 def create_stream_out(self, out_if, dst_ip=None):
97 Create packet stream for outside network
99 :param out_if: Outside interface
100 :param dst_ip: Destination IP address (Default use global SNAT address)
103 dst_ip = self.snat_addr
106 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
107 IP(src=out_if.remote_ip4, dst=dst_ip) /
108 TCP(dport=self.tcp_port_out))
112 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
113 IP(src=out_if.remote_ip4, dst=dst_ip) /
114 UDP(dport=self.udp_port_out))
118 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
119 IP(src=out_if.remote_ip4, dst=dst_ip) /
120 ICMP(id=self.icmp_id_out, type='echo-reply'))
125 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
128 Verify captured packets on outside network
130 :param capture: Captured packets
131 :param nat_ip: Translated IP address (Default use global SNAT address)
132 :param same_port: Source port number is not translated (Default False)
133 :param packet_num: Expected number of packets (Default 3)
136 nat_ip = self.snat_addr
137 self.assertEqual(packet_num, len(capture))
138 for packet in capture:
140 self.assertEqual(packet[IP].src, nat_ip)
141 if packet.haslayer(TCP):
143 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
146 packet[TCP].sport, self.tcp_port_in)
147 self.tcp_port_out = packet[TCP].sport
148 elif packet.haslayer(UDP):
150 self.assertEqual(packet[UDP].sport, self.udp_port_in)
153 packet[UDP].sport, self.udp_port_in)
154 self.udp_port_out = packet[UDP].sport
157 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
159 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
160 self.icmp_id_out = packet[ICMP].id
162 self.logger.error(ppp("Unexpected or invalid packet "
163 "(outside network):", packet))
166 def verify_capture_in(self, capture, in_if, packet_num=3):
168 Verify captured packets on inside network
170 :param capture: Captured packets
171 :param in_if: Inside interface
172 :param packet_num: Expected number of packets (Default 3)
174 self.assertEqual(packet_num, len(capture))
175 for packet in capture:
177 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
178 if packet.haslayer(TCP):
179 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
180 elif packet.haslayer(UDP):
181 self.assertEqual(packet[UDP].dport, self.udp_port_in)
183 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
185 self.logger.error(ppp("Unexpected or invalid packet "
186 "(inside network):", packet))
189 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
191 Verify captured packets that don't have to be translated
193 :param capture: Captured packets
194 :param ingress_if: Ingress interface
195 :param egress_if: Egress interface
197 for packet in capture:
199 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
200 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
201 if packet.haslayer(TCP):
202 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
203 elif packet.haslayer(UDP):
204 self.assertEqual(packet[UDP].sport, self.udp_port_in)
206 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
208 self.logger.error(ppp("Unexpected or invalid packet "
209 "(inside network):", packet))
212 def verify_ipfix_nat44_ses(self, data):
214 Verify IPFIX NAT44 session create/delete event
216 :param data: Decoded IPFIX data records
218 nat44_ses_create_num = 0
219 nat44_ses_delete_num = 0
220 self.assertEqual(6, len(data))
223 self.assertIn(ord(record[230]), [4, 5])
224 if ord(record[230]) == 4:
225 nat44_ses_create_num += 1
227 nat44_ses_delete_num += 1
229 self.assertEqual(self.pg0.remote_ip4n, record[8])
230 # postNATSourceIPv4Address
231 self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
234 self.assertEqual(struct.pack("!I", 0), record[234])
235 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
236 if IP_PROTOS.icmp == ord(record[4]):
237 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
238 self.assertEqual(struct.pack("!H", self.icmp_id_out),
240 elif IP_PROTOS.tcp == ord(record[4]):
241 self.assertEqual(struct.pack("!H", self.tcp_port_in),
243 self.assertEqual(struct.pack("!H", self.tcp_port_out),
245 elif IP_PROTOS.udp == ord(record[4]):
246 self.assertEqual(struct.pack("!H", self.udp_port_in),
248 self.assertEqual(struct.pack("!H", self.udp_port_out),
251 self.fail("Invalid protocol")
252 self.assertEqual(3, nat44_ses_create_num)
253 self.assertEqual(3, nat44_ses_delete_num)
255 def verify_ipfix_addr_exhausted(self, data):
257 Verify IPFIX NAT addresses event
259 :param data: Decoded IPFIX data records
261 self.assertEqual(1, len(data))
264 self.assertEqual(ord(record[230]), 3)
266 self.assertEqual(struct.pack("!I", 0), record[283])
268 def clear_snat(self):
270 Clear SNAT configuration.
272 if self.pg7.has_ip4_config:
273 self.pg7.unconfig_ip4()
275 interfaces = self.vapi.snat_interface_addr_dump()
276 for intf in interfaces:
277 self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
279 self.vapi.snat_ipfix(enable=0)
281 interfaces = self.vapi.snat_interface_dump()
282 for intf in interfaces:
283 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
287 static_mappings = self.vapi.snat_static_mapping_dump()
288 for sm in static_mappings:
289 self.vapi.snat_add_static_mapping(sm.local_ip_address,
290 sm.external_ip_address,
291 local_port=sm.local_port,
292 external_port=sm.external_port,
293 addr_only=sm.addr_only,
297 adresses = self.vapi.snat_address_dump()
298 for addr in adresses:
299 self.vapi.snat_add_address_range(addr.ip_address,
303 def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
304 local_port=0, external_port=0, vrf_id=0,
305 is_add=1, external_sw_if_index=0xFFFFFFFF):
307 Add/delete S-NAT static mapping
309 :param local_ip: Local IP address
310 :param external_ip: External IP address
311 :param local_port: Local port number (Optional)
312 :param external_port: External port number (Optional)
313 :param vrf_id: VRF ID (Default 0)
314 :param is_add: 1 if add, 0 if delete (Default add)
315 :param external_sw_if_index: External interface instead of IP address
318 if local_port and external_port:
320 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
321 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
322 self.vapi.snat_add_static_mapping(
325 external_sw_if_index,
def snat_add_address(self, ip, is_add=1):
    """
    Add/delete S-NAT address

    :param ip: IP address (IPv4 text form, converted with inet_pton)
    :param is_add: 1 if add, 0 if delete (Default add)
    """
    packed_ip = socket.inet_pton(socket.AF_INET, ip)
    # a single address is submitted as a range whose first and last
    # address are identical
    self.vapi.snat_add_address_range(packed_ip, packed_ip, is_add)
342 def test_dynamic(self):
343 """ SNAT dynamic translation test """
345 self.snat_add_address(self.snat_addr)
346 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
347 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
351 pkts = self.create_stream_in(self.pg0, self.pg1)
352 self.pg0.add_stream(pkts)
353 self.pg_enable_capture(self.pg_interfaces)
355 capture = self.pg1.get_capture(len(pkts))
356 self.verify_capture_out(capture)
359 pkts = self.create_stream_out(self.pg1)
360 self.pg1.add_stream(pkts)
361 self.pg_enable_capture(self.pg_interfaces)
363 capture = self.pg0.get_capture(len(pkts))
364 self.verify_capture_in(capture, self.pg0)
366 def test_static_in(self):
367 """ SNAT 1:1 NAT initialized from inside network """
370 self.tcp_port_out = 6303
371 self.udp_port_out = 6304
372 self.icmp_id_out = 6305
374 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
375 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
376 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
380 pkts = self.create_stream_in(self.pg0, self.pg1)
381 self.pg0.add_stream(pkts)
382 self.pg_enable_capture(self.pg_interfaces)
384 capture = self.pg1.get_capture(len(pkts))
385 self.verify_capture_out(capture, nat_ip, True)
388 pkts = self.create_stream_out(self.pg1, nat_ip)
389 self.pg1.add_stream(pkts)
390 self.pg_enable_capture(self.pg_interfaces)
392 capture = self.pg0.get_capture(len(pkts))
393 self.verify_capture_in(capture, self.pg0)
395 def test_static_out(self):
396 """ SNAT 1:1 NAT initialized from outside network """
399 self.tcp_port_out = 6303
400 self.udp_port_out = 6304
401 self.icmp_id_out = 6305
403 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
404 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
405 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
409 pkts = self.create_stream_out(self.pg1, nat_ip)
410 self.pg1.add_stream(pkts)
411 self.pg_enable_capture(self.pg_interfaces)
413 capture = self.pg0.get_capture(len(pkts))
414 self.verify_capture_in(capture, self.pg0)
417 pkts = self.create_stream_in(self.pg0, self.pg1)
418 self.pg0.add_stream(pkts)
419 self.pg_enable_capture(self.pg_interfaces)
421 capture = self.pg1.get_capture(len(pkts))
422 self.verify_capture_out(capture, nat_ip, True)
424 def test_static_with_port_in(self):
425 """ SNAT 1:1 NAT with port initialized from inside network """
427 self.tcp_port_out = 3606
428 self.udp_port_out = 3607
429 self.icmp_id_out = 3608
431 self.snat_add_address(self.snat_addr)
432 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
433 self.tcp_port_in, self.tcp_port_out)
434 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
435 self.udp_port_in, self.udp_port_out)
436 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
437 self.icmp_id_in, self.icmp_id_out)
438 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
439 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
443 pkts = self.create_stream_in(self.pg0, self.pg1)
444 self.pg0.add_stream(pkts)
445 self.pg_enable_capture(self.pg_interfaces)
447 capture = self.pg1.get_capture(len(pkts))
448 self.verify_capture_out(capture)
451 pkts = self.create_stream_out(self.pg1)
452 self.pg1.add_stream(pkts)
453 self.pg_enable_capture(self.pg_interfaces)
455 capture = self.pg0.get_capture(len(pkts))
456 self.verify_capture_in(capture, self.pg0)
458 def test_static_with_port_out(self):
459 """ SNAT 1:1 NAT with port initialized from outside network """
461 self.tcp_port_out = 30606
462 self.udp_port_out = 30607
463 self.icmp_id_out = 30608
465 self.snat_add_address(self.snat_addr)
466 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
467 self.tcp_port_in, self.tcp_port_out)
468 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
469 self.udp_port_in, self.udp_port_out)
470 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
471 self.icmp_id_in, self.icmp_id_out)
472 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
473 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
477 pkts = self.create_stream_out(self.pg1)
478 self.pg1.add_stream(pkts)
479 self.pg_enable_capture(self.pg_interfaces)
481 capture = self.pg0.get_capture(len(pkts))
482 self.verify_capture_in(capture, self.pg0)
485 pkts = self.create_stream_in(self.pg0, self.pg1)
486 self.pg0.add_stream(pkts)
487 self.pg_enable_capture(self.pg_interfaces)
489 capture = self.pg1.get_capture(len(pkts))
490 self.verify_capture_out(capture)
492 def test_static_vrf_aware(self):
493 """ SNAT 1:1 NAT VRF awareness """
495 nat_ip1 = "10.0.0.30"
496 nat_ip2 = "10.0.0.40"
497 self.tcp_port_out = 6303
498 self.udp_port_out = 6304
499 self.icmp_id_out = 6305
501 self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
503 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
505 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
507 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
508 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
510 # inside interface VRF match SNAT static mapping VRF
511 pkts = self.create_stream_in(self.pg4, self.pg3)
512 self.pg4.add_stream(pkts)
513 self.pg_enable_capture(self.pg_interfaces)
515 capture = self.pg3.get_capture(len(pkts))
516 self.verify_capture_out(capture, nat_ip1, True)
518 # inside interface VRF don't match SNAT static mapping VRF (packets
520 pkts = self.create_stream_in(self.pg0, self.pg3)
521 self.pg0.add_stream(pkts)
522 self.pg_enable_capture(self.pg_interfaces)
524 self.pg3.assert_nothing_captured()
526 def test_multiple_inside_interfaces(self):
528 SNAT multiple inside interfaces with non-overlapping address space
531 self.snat_add_address(self.snat_addr)
532 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
533 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
534 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
537 # between two S-NAT inside interfaces (no translation)
538 pkts = self.create_stream_in(self.pg0, self.pg1)
539 self.pg0.add_stream(pkts)
540 self.pg_enable_capture(self.pg_interfaces)
542 capture = self.pg1.get_capture(len(pkts))
543 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
545 # from S-NAT inside to interface without S-NAT feature (no translation)
546 pkts = self.create_stream_in(self.pg0, self.pg2)
547 self.pg0.add_stream(pkts)
548 self.pg_enable_capture(self.pg_interfaces)
550 capture = self.pg2.get_capture(len(pkts))
551 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
553 # in2out 1st interface
554 pkts = self.create_stream_in(self.pg0, self.pg3)
555 self.pg0.add_stream(pkts)
556 self.pg_enable_capture(self.pg_interfaces)
558 capture = self.pg3.get_capture(len(pkts))
559 self.verify_capture_out(capture)
561 # out2in 1st interface
562 pkts = self.create_stream_out(self.pg3)
563 self.pg3.add_stream(pkts)
564 self.pg_enable_capture(self.pg_interfaces)
566 capture = self.pg0.get_capture(len(pkts))
567 self.verify_capture_in(capture, self.pg0)
569 # in2out 2nd interface
570 pkts = self.create_stream_in(self.pg1, self.pg3)
571 self.pg1.add_stream(pkts)
572 self.pg_enable_capture(self.pg_interfaces)
574 capture = self.pg3.get_capture(len(pkts))
575 self.verify_capture_out(capture)
577 # out2in 2nd interface
578 pkts = self.create_stream_out(self.pg3)
579 self.pg3.add_stream(pkts)
580 self.pg_enable_capture(self.pg_interfaces)
582 capture = self.pg1.get_capture(len(pkts))
583 self.verify_capture_in(capture, self.pg1)
585 def test_inside_overlapping_interfaces(self):
586 """ SNAT multiple inside interfaces with overlapping address space """
588 static_nat_ip = "10.0.0.10"
589 self.snat_add_address(self.snat_addr)
590 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
592 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
593 self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
594 self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
595 self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
598 # between S-NAT inside interfaces with same VRF (no translation)
599 pkts = self.create_stream_in(self.pg4, self.pg5)
600 self.pg4.add_stream(pkts)
601 self.pg_enable_capture(self.pg_interfaces)
603 capture = self.pg5.get_capture(len(pkts))
604 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
606 # between S-NAT inside interfaces with different VRF (hairpinning)
607 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
608 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
609 TCP(sport=1234, dport=5678))
610 self.pg4.add_stream(p)
611 self.pg_enable_capture(self.pg_interfaces)
613 capture = self.pg6.get_capture(1)
618 self.assertEqual(ip.src, self.snat_addr)
619 self.assertEqual(ip.dst, self.pg6.remote_ip4)
620 self.assertNotEqual(tcp.sport, 1234)
621 self.assertEqual(tcp.dport, 5678)
623 self.logger.error(ppp("Unexpected or invalid packet:", p))
626 # in2out 1st interface
627 pkts = self.create_stream_in(self.pg4, self.pg3)
628 self.pg4.add_stream(pkts)
629 self.pg_enable_capture(self.pg_interfaces)
631 capture = self.pg3.get_capture(len(pkts))
632 self.verify_capture_out(capture)
634 # out2in 1st interface
635 pkts = self.create_stream_out(self.pg3)
636 self.pg3.add_stream(pkts)
637 self.pg_enable_capture(self.pg_interfaces)
639 capture = self.pg4.get_capture(len(pkts))
640 self.verify_capture_in(capture, self.pg4)
642 # in2out 2nd interface
643 pkts = self.create_stream_in(self.pg5, self.pg3)
644 self.pg5.add_stream(pkts)
645 self.pg_enable_capture(self.pg_interfaces)
647 capture = self.pg3.get_capture(len(pkts))
648 self.verify_capture_out(capture)
650 # out2in 2nd interface
651 pkts = self.create_stream_out(self.pg3)
652 self.pg3.add_stream(pkts)
653 self.pg_enable_capture(self.pg_interfaces)
655 capture = self.pg5.get_capture(len(pkts))
656 self.verify_capture_in(capture, self.pg5)
658 # in2out 3rd interface
659 pkts = self.create_stream_in(self.pg6, self.pg3)
660 self.pg6.add_stream(pkts)
661 self.pg_enable_capture(self.pg_interfaces)
663 capture = self.pg3.get_capture(len(pkts))
664 self.verify_capture_out(capture, static_nat_ip, True)
666 # out2in 3rd interface
667 pkts = self.create_stream_out(self.pg3, static_nat_ip)
668 self.pg3.add_stream(pkts)
669 self.pg_enable_capture(self.pg_interfaces)
671 capture = self.pg6.get_capture(len(pkts))
672 self.verify_capture_in(capture, self.pg6)
674 def test_hairpinning(self):
675 """ SNAT hairpinning """
677 host = self.pg0.remote_hosts[0]
678 server = self.pg0.remote_hosts[1]
681 server_in_port = 5678
682 server_out_port = 8765
684 self.snat_add_address(self.snat_addr)
685 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
686 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
688 # add static mapping for server
689 self.snat_add_static_mapping(server.ip4, self.snat_addr,
690 server_in_port, server_out_port)
692 # send packet from host to server
693 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
694 IP(src=host.ip4, dst=self.snat_addr) /
695 TCP(sport=host_in_port, dport=server_out_port))
696 self.pg0.add_stream(p)
697 self.pg_enable_capture(self.pg_interfaces)
699 capture = self.pg0.get_capture(1)
704 self.assertEqual(ip.src, self.snat_addr)
705 self.assertEqual(ip.dst, server.ip4)
706 self.assertNotEqual(tcp.sport, host_in_port)
707 self.assertEqual(tcp.dport, server_in_port)
708 host_out_port = tcp.sport
710 self.logger.error(ppp("Unexpected or invalid packet:", p))
713 # send reply from server to host
714 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
715 IP(src=server.ip4, dst=self.snat_addr) /
716 TCP(sport=server_in_port, dport=host_out_port))
717 self.pg0.add_stream(p)
718 self.pg_enable_capture(self.pg_interfaces)
720 capture = self.pg0.get_capture(1)
725 self.assertEqual(ip.src, self.snat_addr)
726 self.assertEqual(ip.dst, host.ip4)
727 self.assertEqual(tcp.sport, server_out_port)
728 self.assertEqual(tcp.dport, host_in_port)
# Hand the packet to ppp() together with the headline, exactly as the other
# handlers in this class do; previously the closing parenthesis was misplaced,
# so ppp() got no packet and `p` became a stray logging argument.
self.logger.error(ppp("Unexpected or invalid packet:", p))
733 def test_max_translations_per_user(self):
734 """ MAX translations per user - recycle the least recently used """
736 self.snat_add_address(self.snat_addr)
737 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
738 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
741 # get maximum number of translations per user
742 snat_config = self.vapi.snat_show_config()
744 # send more than maximum number of translations per user packets
745 pkts_num = snat_config.max_translations_per_user + 5
747 for port in range(0, pkts_num):
748 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
749 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
750 TCP(sport=1025 + port))
752 self.pg0.add_stream(pkts)
753 self.pg_enable_capture(self.pg_interfaces)
756 # verify number of translated packet
757 self.pg1.get_capture(pkts_num)
def test_interface_addr(self):
    """ Acquire SNAT addresses from interface """
    self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)

    # pg7 has no IPv4 address yet, so the dump must be empty
    pool = self.vapi.snat_address_dump()
    self.assertEqual(0, len(pool))

    # once pg7 is given an address, exactly that address must be dumped
    self.pg7.config_ip4()
    pool = self.vapi.snat_address_dump()
    self.assertEqual(1, len(pool))
    first_entry = pool[0]
    self.assertEqual(first_entry.ip_address[0:4], self.pg7.local_ip4n)

    # taking the address away from pg7 must empty the dump again
    self.pg7.unconfig_ip4()
    pool = self.vapi.snat_address_dump()
    self.assertEqual(0, len(pool))
778 def test_interface_addr_static_mapping(self):
779 """ Static mapping with addresses from interface """
780 self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
781 self.snat_add_static_mapping('1.2.3.4',
782 external_sw_if_index=self.pg7.sw_if_index)
785 static_mappings = self.vapi.snat_static_mapping_dump()
786 self.assertEqual(0, len(static_mappings))
788 # configure interface address and check static mappings
789 self.pg7.config_ip4()
790 static_mappings = self.vapi.snat_static_mapping_dump()
791 self.assertEqual(1, len(static_mappings))
792 self.assertEqual(static_mappings[0].external_ip_address[0:4],
795 # remove interface address and check static mappings
796 self.pg7.unconfig_ip4()
797 static_mappings = self.vapi.snat_static_mapping_dump()
798 self.assertEqual(0, len(static_mappings))
800 def test_ipfix_nat44_sess(self):
801 """ S-NAT IPFIX logging NAT44 session created/deleted """
802 self.snat_add_address(self.snat_addr)
803 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
804 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
806 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
807 src_address=self.pg3.local_ip4n,
809 template_interval=10)
810 self.vapi.snat_ipfix()
812 pkts = self.create_stream_in(self.pg0, self.pg1)
813 self.pg0.add_stream(pkts)
814 self.pg_enable_capture(self.pg_interfaces)
816 capture = self.pg1.get_capture(len(pkts))
817 self.verify_capture_out(capture)
818 self.snat_add_address(self.snat_addr, is_add=0)
819 self.vapi.cli("ipfix flush") # FIXME this should be an API call
820 capture = self.pg3.get_capture(3)
821 ipfix = IPFIXDecoder()
822 # first load template
824 self.assertTrue(p.haslayer(IPFIX))
825 if p.haslayer(Template):
826 ipfix.add_template(p.getlayer(Template))
827 # verify events in data set
830 data = ipfix.decode_data_set(p.getlayer(Set))
831 self.verify_ipfix_nat44_ses(data)
833 def test_ipfix_addr_exhausted(self):
834 """ S-NAT IPFIX logging NAT addresses exhausted """
835 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
836 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
838 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
839 src_address=self.pg3.local_ip4n,
841 template_interval=10)
842 self.vapi.snat_ipfix()
844 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
845 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
847 self.pg0.add_stream(p)
848 self.pg_enable_capture(self.pg_interfaces)
850 capture = self.pg1.get_capture(0)
851 self.vapi.cli("ipfix flush") # FIXME this should be an API call
852 capture = self.pg3.get_capture(3)
853 ipfix = IPFIXDecoder()
854 # first load template
856 self.assertTrue(p.haslayer(IPFIX))
857 if p.haslayer(Template):
858 ipfix.add_template(p.getlayer(Template))
859 # verify events in data set
862 data = ipfix.decode_data_set(p.getlayer(Set))
863 self.verify_ipfix_addr_exhausted(data)
866 super(TestSNAT, self).tearDown()
867 if not self.vpp_dead:
868 self.logger.info(self.vapi.cli("show snat verbose"))
# When the module is executed as a script, run its tests through unittest
# using VppTestRunner as the test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)