7 from framework import VppTestCase, VppTestRunner
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.l2 import Ether
10 from scapy.data import IP_PROTOS
12 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
15 class TestSNAT(VppTestCase):
16 """ SNAT Test Cases """
20 super(TestSNAT, cls).setUpClass()
23 cls.tcp_port_in = 6303
24 cls.tcp_port_out = 6303
25 cls.udp_port_in = 6304
26 cls.udp_port_out = 6304
28 cls.icmp_id_out = 6305
29 cls.snat_addr = '10.0.0.3'
31 cls.create_pg_interfaces(range(8))
32 cls.interfaces = list(cls.pg_interfaces[0:4])
34 for i in cls.interfaces:
39 cls.pg0.generate_remote_hosts(2)
40 cls.pg0.configure_ipv4_neighbors()
42 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
44 cls.pg4._local_ip4 = "172.16.255.1"
45 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
46 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
47 cls.pg4.set_table_ip4(10)
48 cls.pg5._local_ip4 = "172.16.255.3"
49 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
50 cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
51 cls.pg5.set_table_ip4(10)
52 cls.pg6._local_ip4 = "172.16.255.1"
53 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
54 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
55 cls.pg6.set_table_ip4(20)
56 for i in cls.overlapping_interfaces:
64 super(TestSNAT, cls).tearDownClass()
67 def create_stream_in(self, in_if, out_if):
69 Create packet stream for inside network
71 :param in_if: Inside interface
72 :param out_if: Outside interface
76 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
77 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
78 TCP(sport=self.tcp_port_in))
82 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
83 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
84 UDP(sport=self.udp_port_in))
88 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
89 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
90 ICMP(id=self.icmp_id_in, type='echo-request'))
95 def create_stream_out(self, out_if, dst_ip=None):
97 Create packet stream for outside network
99 :param out_if: Outside interface
100 :param dst_ip: Destination IP address (Default use global SNAT address)
103 dst_ip = self.snat_addr
106 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
107 IP(src=out_if.remote_ip4, dst=dst_ip) /
108 TCP(dport=self.tcp_port_out))
112 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
113 IP(src=out_if.remote_ip4, dst=dst_ip) /
114 UDP(dport=self.udp_port_out))
118 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
119 IP(src=out_if.remote_ip4, dst=dst_ip) /
120 ICMP(id=self.icmp_id_out, type='echo-reply'))
125 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
128 Verify captured packets on outside network
130 :param capture: Captured packets
131 :param nat_ip: Translated IP address (Default use global SNAT address)
132 :param same_port: Sorce port number is not translated (Default False)
133 :param packet_num: Expected number of packets (Default 3)
136 nat_ip = self.snat_addr
137 self.assertEqual(packet_num, len(capture))
138 for packet in capture:
140 self.assertEqual(packet[IP].src, nat_ip)
141 if packet.haslayer(TCP):
143 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
146 packet[TCP].sport, self.tcp_port_in)
147 self.tcp_port_out = packet[TCP].sport
148 elif packet.haslayer(UDP):
150 self.assertEqual(packet[UDP].sport, self.udp_port_in)
153 packet[UDP].sport, self.udp_port_in)
154 self.udp_port_out = packet[UDP].sport
157 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
159 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
160 self.icmp_id_out = packet[ICMP].id
162 self.logger.error(ppp("Unexpected or invalid packet "
163 "(outside network):", packet))
166 def verify_capture_in(self, capture, in_if, packet_num=3):
168 Verify captured packets on inside network
170 :param capture: Captured packets
171 :param in_if: Inside interface
172 :param packet_num: Expected number of packets (Default 3)
174 self.assertEqual(packet_num, len(capture))
175 for packet in capture:
177 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
178 if packet.haslayer(TCP):
179 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
180 elif packet.haslayer(UDP):
181 self.assertEqual(packet[UDP].dport, self.udp_port_in)
183 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
185 self.logger.error(ppp("Unexpected or invalid packet "
186 "(inside network):", packet))
189 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
191 Verify captured packet that don't have to be translated
193 :param capture: Captured packets
194 :param ingress_if: Ingress interface
195 :param egress_if: Egress interface
197 for packet in capture:
199 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
200 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
201 if packet.haslayer(TCP):
202 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
203 elif packet.haslayer(UDP):
204 self.assertEqual(packet[UDP].sport, self.udp_port_in)
206 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
208 self.logger.error(ppp("Unexpected or invalid packet "
209 "(inside network):", packet))
212 def verify_ipfix_nat44_ses(self, data):
214 Verify IPFIX NAT44 session create/delete event
216 :param data: Decoded IPFIX data records
218 nat44_ses_create_num = 0
219 nat44_ses_delete_num = 0
220 self.assertEqual(6, len(data))
223 self.assertIn(ord(record[230]), [4, 5])
224 if ord(record[230]) == 4:
225 nat44_ses_create_num += 1
227 nat44_ses_delete_num += 1
229 self.assertEqual(self.pg0.remote_ip4n, record[8])
230 # postNATSourceIPv4Address
231 self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
234 self.assertEqual(struct.pack("!I", 0), record[234])
235 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
236 if IP_PROTOS.icmp == ord(record[4]):
237 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
238 self.assertEqual(struct.pack("!H", self.icmp_id_out),
240 elif IP_PROTOS.tcp == ord(record[4]):
241 self.assertEqual(struct.pack("!H", self.tcp_port_in),
243 self.assertEqual(struct.pack("!H", self.tcp_port_out),
245 elif IP_PROTOS.udp == ord(record[4]):
246 self.assertEqual(struct.pack("!H", self.udp_port_in),
248 self.assertEqual(struct.pack("!H", self.udp_port_out),
251 self.fail("Invalid protocol")
252 self.assertEqual(3, nat44_ses_create_num)
253 self.assertEqual(3, nat44_ses_delete_num)
255 def verify_ipfix_addr_exhausted(self, data):
257 Verify IPFIX NAT addresses event
259 :param data: Decoded IPFIX data records
261 self.assertEqual(1, len(data))
264 self.assertEqual(ord(record[230]), 3)
266 self.assertEqual(struct.pack("!I", 0), record[283])
268 def clear_snat(self):
270 Clear SNAT configuration.
272 interfaces = self.vapi.snat_interface_addr_dump()
273 for intf in interfaces:
274 self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
276 self.vapi.snat_ipfix(enable=0)
278 interfaces = self.vapi.snat_interface_dump()
279 for intf in interfaces:
280 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
284 static_mappings = self.vapi.snat_static_mapping_dump()
285 for sm in static_mappings:
286 self.vapi.snat_add_static_mapping(sm.local_ip_address,
287 sm.external_ip_address,
288 local_port=sm.local_port,
289 external_port=sm.external_port,
290 addr_only=sm.addr_only,
294 adresses = self.vapi.snat_address_dump()
295 for addr in adresses:
296 self.vapi.snat_add_address_range(addr.ip_address,
300 def snat_add_static_mapping(self, local_ip, external_ip, local_port=0,
301 external_port=0, vrf_id=0, is_add=1):
303 Add/delete S-NAT static mapping
305 :param local_ip: Local IP address
306 :param external_ip: External IP address
307 :param local_port: Local port number (Optional)
308 :param external_port: External port number (Optional)
309 :param vrf_id: VRF ID (Default 0)
310 :param is_add: 1 if add, 0 if delete (Default add)
313 if local_port and external_port:
315 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
316 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
317 self.vapi.snat_add_static_mapping(
326 def snat_add_address(self, ip, is_add=1):
328 Add/delete S-NAT address
330 :param ip: IP address
331 :param is_add: 1 if add, 0 if delete (Default add)
333 snat_addr = socket.inet_pton(socket.AF_INET, ip)
334 self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add)
336 def test_dynamic(self):
337 """ SNAT dynamic translation test """
339 self.snat_add_address(self.snat_addr)
340 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
341 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
345 pkts = self.create_stream_in(self.pg0, self.pg1)
346 self.pg0.add_stream(pkts)
347 self.pg_enable_capture(self.pg_interfaces)
349 capture = self.pg1.get_capture(len(pkts))
350 self.verify_capture_out(capture)
353 pkts = self.create_stream_out(self.pg1)
354 self.pg1.add_stream(pkts)
355 self.pg_enable_capture(self.pg_interfaces)
357 capture = self.pg0.get_capture(len(pkts))
358 self.verify_capture_in(capture, self.pg0)
360 def test_static_in(self):
361 """ SNAT 1:1 NAT initialized from inside network """
364 self.tcp_port_out = 6303
365 self.udp_port_out = 6304
366 self.icmp_id_out = 6305
368 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
369 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
370 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
374 pkts = self.create_stream_in(self.pg0, self.pg1)
375 self.pg0.add_stream(pkts)
376 self.pg_enable_capture(self.pg_interfaces)
378 capture = self.pg1.get_capture(len(pkts))
379 self.verify_capture_out(capture, nat_ip, True)
382 pkts = self.create_stream_out(self.pg1, nat_ip)
383 self.pg1.add_stream(pkts)
384 self.pg_enable_capture(self.pg_interfaces)
386 capture = self.pg0.get_capture(len(pkts))
387 self.verify_capture_in(capture, self.pg0)
389 def test_static_out(self):
390 """ SNAT 1:1 NAT initialized from outside network """
393 self.tcp_port_out = 6303
394 self.udp_port_out = 6304
395 self.icmp_id_out = 6305
397 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
398 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
399 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
403 pkts = self.create_stream_out(self.pg1, nat_ip)
404 self.pg1.add_stream(pkts)
405 self.pg_enable_capture(self.pg_interfaces)
407 capture = self.pg0.get_capture(len(pkts))
408 self.verify_capture_in(capture, self.pg0)
411 pkts = self.create_stream_in(self.pg0, self.pg1)
412 self.pg0.add_stream(pkts)
413 self.pg_enable_capture(self.pg_interfaces)
415 capture = self.pg1.get_capture(len(pkts))
416 self.verify_capture_out(capture, nat_ip, True)
418 def test_static_with_port_in(self):
419 """ SNAT 1:1 NAT with port initialized from inside network """
421 self.tcp_port_out = 3606
422 self.udp_port_out = 3607
423 self.icmp_id_out = 3608
425 self.snat_add_address(self.snat_addr)
426 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
427 self.tcp_port_in, self.tcp_port_out)
428 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
429 self.udp_port_in, self.udp_port_out)
430 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
431 self.icmp_id_in, self.icmp_id_out)
432 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
433 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
437 pkts = self.create_stream_in(self.pg0, self.pg1)
438 self.pg0.add_stream(pkts)
439 self.pg_enable_capture(self.pg_interfaces)
441 capture = self.pg1.get_capture(len(pkts))
442 self.verify_capture_out(capture)
445 pkts = self.create_stream_out(self.pg1)
446 self.pg1.add_stream(pkts)
447 self.pg_enable_capture(self.pg_interfaces)
449 capture = self.pg0.get_capture(len(pkts))
450 self.verify_capture_in(capture, self.pg0)
452 def test_static_with_port_out(self):
453 """ SNAT 1:1 NAT with port initialized from outside network """
455 self.tcp_port_out = 30606
456 self.udp_port_out = 30607
457 self.icmp_id_out = 30608
459 self.snat_add_address(self.snat_addr)
460 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
461 self.tcp_port_in, self.tcp_port_out)
462 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
463 self.udp_port_in, self.udp_port_out)
464 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
465 self.icmp_id_in, self.icmp_id_out)
466 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
467 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
471 pkts = self.create_stream_out(self.pg1)
472 self.pg1.add_stream(pkts)
473 self.pg_enable_capture(self.pg_interfaces)
475 capture = self.pg0.get_capture(len(pkts))
476 self.verify_capture_in(capture, self.pg0)
479 pkts = self.create_stream_in(self.pg0, self.pg1)
480 self.pg0.add_stream(pkts)
481 self.pg_enable_capture(self.pg_interfaces)
483 capture = self.pg1.get_capture(len(pkts))
484 self.verify_capture_out(capture)
486 def test_static_vrf_aware(self):
487 """ SNAT 1:1 NAT VRF awareness """
489 nat_ip1 = "10.0.0.30"
490 nat_ip2 = "10.0.0.40"
491 self.tcp_port_out = 6303
492 self.udp_port_out = 6304
493 self.icmp_id_out = 6305
495 self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
497 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
499 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
501 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
502 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
504 # inside interface VRF match SNAT static mapping VRF
505 pkts = self.create_stream_in(self.pg4, self.pg3)
506 self.pg4.add_stream(pkts)
507 self.pg_enable_capture(self.pg_interfaces)
509 capture = self.pg3.get_capture(len(pkts))
510 self.verify_capture_out(capture, nat_ip1, True)
512 # inside interface VRF don't match SNAT static mapping VRF (packets
514 pkts = self.create_stream_in(self.pg0, self.pg3)
515 self.pg0.add_stream(pkts)
516 self.pg_enable_capture(self.pg_interfaces)
518 self.pg3.assert_nothing_captured()
520 def test_multiple_inside_interfaces(self):
522 SNAT multiple inside interfaces with non-overlapping address space
525 self.snat_add_address(self.snat_addr)
526 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
527 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
528 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
531 # between two S-NAT inside interfaces (no translation)
532 pkts = self.create_stream_in(self.pg0, self.pg1)
533 self.pg0.add_stream(pkts)
534 self.pg_enable_capture(self.pg_interfaces)
536 capture = self.pg1.get_capture(len(pkts))
537 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
539 # from S-NAT inside to interface without S-NAT feature (no translation)
540 pkts = self.create_stream_in(self.pg0, self.pg2)
541 self.pg0.add_stream(pkts)
542 self.pg_enable_capture(self.pg_interfaces)
544 capture = self.pg2.get_capture(len(pkts))
545 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
547 # in2out 1st interface
548 pkts = self.create_stream_in(self.pg0, self.pg3)
549 self.pg0.add_stream(pkts)
550 self.pg_enable_capture(self.pg_interfaces)
552 capture = self.pg3.get_capture(len(pkts))
553 self.verify_capture_out(capture)
555 # out2in 1st interface
556 pkts = self.create_stream_out(self.pg3)
557 self.pg3.add_stream(pkts)
558 self.pg_enable_capture(self.pg_interfaces)
560 capture = self.pg0.get_capture(len(pkts))
561 self.verify_capture_in(capture, self.pg0)
563 # in2out 2nd interface
564 pkts = self.create_stream_in(self.pg1, self.pg3)
565 self.pg1.add_stream(pkts)
566 self.pg_enable_capture(self.pg_interfaces)
568 capture = self.pg3.get_capture(len(pkts))
569 self.verify_capture_out(capture)
571 # out2in 2nd interface
572 pkts = self.create_stream_out(self.pg3)
573 self.pg3.add_stream(pkts)
574 self.pg_enable_capture(self.pg_interfaces)
576 capture = self.pg1.get_capture(len(pkts))
577 self.verify_capture_in(capture, self.pg1)
579 def test_inside_overlapping_interfaces(self):
580 """ SNAT multiple inside interfaces with overlapping address space """
582 static_nat_ip = "10.0.0.10"
583 self.snat_add_address(self.snat_addr)
584 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
586 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
587 self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
588 self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
589 self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
592 # between S-NAT inside interfaces with same VRF (no translation)
593 pkts = self.create_stream_in(self.pg4, self.pg5)
594 self.pg4.add_stream(pkts)
595 self.pg_enable_capture(self.pg_interfaces)
597 capture = self.pg5.get_capture(len(pkts))
598 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
600 # between S-NAT inside interfaces with different VRF (hairpinning)
601 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
602 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
603 TCP(sport=1234, dport=5678))
604 self.pg4.add_stream(p)
605 self.pg_enable_capture(self.pg_interfaces)
607 capture = self.pg6.get_capture(1)
612 self.assertEqual(ip.src, self.snat_addr)
613 self.assertEqual(ip.dst, self.pg6.remote_ip4)
614 self.assertNotEqual(tcp.sport, 1234)
615 self.assertEqual(tcp.dport, 5678)
617 self.logger.error(ppp("Unexpected or invalid packet:", p))
620 # in2out 1st interface
621 pkts = self.create_stream_in(self.pg4, self.pg3)
622 self.pg4.add_stream(pkts)
623 self.pg_enable_capture(self.pg_interfaces)
625 capture = self.pg3.get_capture(len(pkts))
626 self.verify_capture_out(capture)
628 # out2in 1st interface
629 pkts = self.create_stream_out(self.pg3)
630 self.pg3.add_stream(pkts)
631 self.pg_enable_capture(self.pg_interfaces)
633 capture = self.pg4.get_capture(len(pkts))
634 self.verify_capture_in(capture, self.pg4)
636 # in2out 2nd interface
637 pkts = self.create_stream_in(self.pg5, self.pg3)
638 self.pg5.add_stream(pkts)
639 self.pg_enable_capture(self.pg_interfaces)
641 capture = self.pg3.get_capture(len(pkts))
642 self.verify_capture_out(capture)
644 # out2in 2nd interface
645 pkts = self.create_stream_out(self.pg3)
646 self.pg3.add_stream(pkts)
647 self.pg_enable_capture(self.pg_interfaces)
649 capture = self.pg5.get_capture(len(pkts))
650 self.verify_capture_in(capture, self.pg5)
652 # in2out 3rd interface
653 pkts = self.create_stream_in(self.pg6, self.pg3)
654 self.pg6.add_stream(pkts)
655 self.pg_enable_capture(self.pg_interfaces)
657 capture = self.pg3.get_capture(len(pkts))
658 self.verify_capture_out(capture, static_nat_ip, True)
660 # out2in 3rd interface
661 pkts = self.create_stream_out(self.pg3, static_nat_ip)
662 self.pg3.add_stream(pkts)
663 self.pg_enable_capture(self.pg_interfaces)
665 capture = self.pg6.get_capture(len(pkts))
666 self.verify_capture_in(capture, self.pg6)
668 def test_hairpinning(self):
669 """ SNAT hairpinning """
671 host = self.pg0.remote_hosts[0]
672 server = self.pg0.remote_hosts[1]
675 server_in_port = 5678
676 server_out_port = 8765
678 self.snat_add_address(self.snat_addr)
679 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
680 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
682 # add static mapping for server
683 self.snat_add_static_mapping(server.ip4, self.snat_addr,
684 server_in_port, server_out_port)
686 # send packet from host to server
687 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
688 IP(src=host.ip4, dst=self.snat_addr) /
689 TCP(sport=host_in_port, dport=server_out_port))
690 self.pg0.add_stream(p)
691 self.pg_enable_capture(self.pg_interfaces)
693 capture = self.pg0.get_capture(1)
698 self.assertEqual(ip.src, self.snat_addr)
699 self.assertEqual(ip.dst, server.ip4)
700 self.assertNotEqual(tcp.sport, host_in_port)
701 self.assertEqual(tcp.dport, server_in_port)
702 host_out_port = tcp.sport
704 self.logger.error(ppp("Unexpected or invalid packet:", p))
707 # send reply from server to host
708 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
709 IP(src=server.ip4, dst=self.snat_addr) /
710 TCP(sport=server_in_port, dport=host_out_port))
711 self.pg0.add_stream(p)
712 self.pg_enable_capture(self.pg_interfaces)
714 capture = self.pg0.get_capture(1)
719 self.assertEqual(ip.src, self.snat_addr)
720 self.assertEqual(ip.dst, host.ip4)
721 self.assertEqual(tcp.sport, server_out_port)
722 self.assertEqual(tcp.dport, host_in_port)
724 self.logger.error(ppp("Unexpected or invalid packet:"), p)
727 def test_max_translations_per_user(self):
728 """ MAX translations per user - recycle the least recently used """
730 self.snat_add_address(self.snat_addr)
731 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
732 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
735 # get maximum number of translations per user
736 snat_config = self.vapi.snat_show_config()
738 # send more than maximum number of translations per user packets
739 pkts_num = snat_config.max_translations_per_user + 5
741 for port in range(0, pkts_num):
742 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
743 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
744 TCP(sport=1025 + port))
746 self.pg0.add_stream(pkts)
747 self.pg_enable_capture(self.pg_interfaces)
750 # verify number of translated packet
751 self.pg1.get_capture(pkts_num)
753 def test_interface_addr(self):
754 """ Acquire SNAT addresses from interface """
755 self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
757 # no address in NAT pool
758 adresses = self.vapi.snat_address_dump()
759 self.assertEqual(0, len(adresses))
761 # configure interface address and check NAT address pool
762 self.pg7.config_ip4()
763 adresses = self.vapi.snat_address_dump()
764 self.assertEqual(1, len(adresses))
766 # remove interface address and check NAT address pool
767 self.pg7.unconfig_ip4()
768 adresses = self.vapi.snat_address_dump()
769 self.assertEqual(0, len(adresses))
771 def test_ipfix_nat44_sess(self):
772 """ S-NAT IPFIX logging NAT44 session created/delted """
773 self.snat_add_address(self.snat_addr)
774 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
775 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
777 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
778 src_address=self.pg3.local_ip4n,
780 template_interval=10)
781 self.vapi.snat_ipfix()
783 pkts = self.create_stream_in(self.pg0, self.pg1)
784 self.pg0.add_stream(pkts)
785 self.pg_enable_capture(self.pg_interfaces)
787 capture = self.pg1.get_capture(len(pkts))
788 self.verify_capture_out(capture)
789 self.snat_add_address(self.snat_addr, is_add=0)
790 self.vapi.cli("ipfix flush") # FIXME this should be an API call
791 capture = self.pg3.get_capture(3)
792 ipfix = IPFIXDecoder()
793 # first load template
795 self.assertTrue(p.haslayer(IPFIX))
796 if p.haslayer(Template):
797 ipfix.add_template(p.getlayer(Template))
798 # verify events in data set
801 data = ipfix.decode_data_set(p.getlayer(Set))
802 self.verify_ipfix_nat44_ses(data)
804 def test_ipfix_addr_exhausted(self):
805 """ S-NAT IPFIX logging NAT addresses exhausted """
806 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
807 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
809 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
810 src_address=self.pg3.local_ip4n,
812 template_interval=10)
813 self.vapi.snat_ipfix()
815 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
816 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
818 self.pg0.add_stream(p)
819 self.pg_enable_capture(self.pg_interfaces)
821 capture = self.pg1.get_capture(0)
822 self.vapi.cli("ipfix flush") # FIXME this should be an API call
823 capture = self.pg3.get_capture(3)
824 ipfix = IPFIXDecoder()
825 # first load template
827 self.assertTrue(p.haslayer(IPFIX))
828 if p.haslayer(Template):
829 ipfix.add_template(p.getlayer(Template))
830 # verify events in data set
833 data = ipfix.decode_data_set(p.getlayer(Set))
834 self.verify_ipfix_addr_exhausted(data)
837 super(TestSNAT, self).tearDown()
838 if not self.vpp_dead:
839 self.logger.info(self.vapi.cli("show snat verbose"))
843 if __name__ == '__main__':
844 unittest.main(testRunner=VppTestRunner)