7 from framework import VppTestCase, VppTestRunner
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.l2 import Ether, ARP
11 from scapy.data import IP_PROTOS
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
16 class TestSNAT(VppTestCase):
17 """ SNAT Test Cases """
21 super(TestSNAT, cls).setUpClass()
24 cls.tcp_port_in = 6303
25 cls.tcp_port_out = 6303
26 cls.udp_port_in = 6304
27 cls.udp_port_out = 6304
29 cls.icmp_id_out = 6305
30 cls.snat_addr = '10.0.0.3'
32 cls.create_pg_interfaces(range(8))
33 cls.interfaces = list(cls.pg_interfaces[0:4])
35 for i in cls.interfaces:
40 cls.pg0.generate_remote_hosts(2)
41 cls.pg0.configure_ipv4_neighbors()
# Interfaces pg4..pg6 form the "overlapping address space" group.
cls.overlapping_interfaces = list(cls.pg_interfaces[4:7])

# Hand-assign addresses to the overlapping interfaces.  pg4 and pg6 get the
# same local/remote addresses but different table ids (10 vs 20, set via
# set_table_ip4); pg5 gets distinct addresses and table id 10.
#
# The packed form (_local_ip4n) is derived from the textual address assigned
# on the line directly above it.  The previous code packed ``i.local_ip4``,
# where ``i`` is the loop variable of the earlier loop over cls.interfaces,
# so the packed address did not correspond to the interface being set up.
cls.pg4._local_ip4 = "172.16.255.1"
cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, cls.pg4._local_ip4)
cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
cls.pg4.set_table_ip4(10)

cls.pg5._local_ip4 = "172.16.255.3"
cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, cls.pg5._local_ip4)
cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
cls.pg5.set_table_ip4(10)

cls.pg6._local_ip4 = "172.16.255.1"
cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, cls.pg6._local_ip4)
cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
cls.pg6.set_table_ip4(20)
57 for i in cls.overlapping_interfaces:
65 super(TestSNAT, cls).tearDownClass()
68 def create_stream_in(self, in_if, out_if, ttl=64):
70 Create packet stream for inside network
72 :param in_if: Inside interface
73 :param out_if: Outside interface
74 :param ttl: TTL of generated packets
78 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
79 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
80 TCP(sport=self.tcp_port_in))
84 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
85 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
86 UDP(sport=self.udp_port_in))
90 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
91 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
92 ICMP(id=self.icmp_id_in, type='echo-request'))
97 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
99 Create packet stream for outside network
101 :param out_if: Outside interface
102 :param dst_ip: Destination IP address (Default use global SNAT address)
103 :param ttl: TTL of generated packets
106 dst_ip = self.snat_addr
109 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
110 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
111 TCP(dport=self.tcp_port_out))
115 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
116 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
117 UDP(dport=self.udp_port_out))
121 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
122 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
123 ICMP(id=self.icmp_id_out, type='echo-reply'))
128 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
131 Verify captured packets on outside network
133 :param capture: Captured packets
134 :param nat_ip: Translated IP address (Default use global SNAT address)
135 :param same_port: Source port number is not translated (Default False)
136 :param packet_num: Expected number of packets (Default 3)
139 nat_ip = self.snat_addr
140 self.assertEqual(packet_num, len(capture))
141 for packet in capture:
143 self.assertEqual(packet[IP].src, nat_ip)
144 if packet.haslayer(TCP):
146 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
149 packet[TCP].sport, self.tcp_port_in)
150 self.tcp_port_out = packet[TCP].sport
151 elif packet.haslayer(UDP):
153 self.assertEqual(packet[UDP].sport, self.udp_port_in)
156 packet[UDP].sport, self.udp_port_in)
157 self.udp_port_out = packet[UDP].sport
160 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
162 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
163 self.icmp_id_out = packet[ICMP].id
165 self.logger.error(ppp("Unexpected or invalid packet "
166 "(outside network):", packet))
169 def verify_capture_in(self, capture, in_if, packet_num=3):
171 Verify captured packets on inside network
173 :param capture: Captured packets
174 :param in_if: Inside interface
175 :param packet_num: Expected number of packets (Default 3)
177 self.assertEqual(packet_num, len(capture))
178 for packet in capture:
180 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
181 if packet.haslayer(TCP):
182 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
183 elif packet.haslayer(UDP):
184 self.assertEqual(packet[UDP].dport, self.udp_port_in)
186 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
188 self.logger.error(ppp("Unexpected or invalid packet "
189 "(inside network):", packet))
192 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
194 Verify captured packets that don't have to be translated
196 :param capture: Captured packets
197 :param ingress_if: Ingress interface
198 :param egress_if: Egress interface
200 for packet in capture:
202 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
203 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
204 if packet.haslayer(TCP):
205 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
206 elif packet.haslayer(UDP):
207 self.assertEqual(packet[UDP].sport, self.udp_port_in)
209 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
211 self.logger.error(ppp("Unexpected or invalid packet "
212 "(inside network):", packet))
215 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
216 packet_num=3, icmp_type=11):
218 Verify captured packets with ICMP errors on outside network
220 :param capture: Captured packets
221 :param src_ip: Translated IP address or IP address of VPP
222 (Default use global SNAT address)
223 :param packet_num: Expected number of packets (Default 3)
224 :param icmp_type: Type of error ICMP packet
225 we are expecting (Default 11)
228 src_ip = self.snat_addr
229 self.assertEqual(packet_num, len(capture))
230 for packet in capture:
232 self.assertEqual(packet[IP].src, src_ip)
233 self.assertTrue(packet.haslayer(ICMP))
235 self.assertEqual(icmp.type, icmp_type)
236 self.assertTrue(icmp.haslayer(IPerror))
237 inner_ip = icmp[IPerror]
238 if inner_ip.haslayer(TCPerror):
239 self.assertEqual(inner_ip[TCPerror].dport,
241 elif inner_ip.haslayer(UDPerror):
242 self.assertEqual(inner_ip[UDPerror].dport,
245 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
247 self.logger.error(ppp("Unexpected or invalid packet "
248 "(outside network):", packet))
251 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
254 Verify captured packets with ICMP errors on inside network
256 :param capture: Captured packets
257 :param in_if: Inside interface
258 :param packet_num: Expected number of packets (Default 3)
259 :param icmp_type: Type of error ICMP packet
260 we are expecting (Default 11)
262 self.assertEqual(packet_num, len(capture))
263 for packet in capture:
265 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
266 self.assertTrue(packet.haslayer(ICMP))
268 self.assertEqual(icmp.type, icmp_type)
269 self.assertTrue(icmp.haslayer(IPerror))
270 inner_ip = icmp[IPerror]
271 if inner_ip.haslayer(TCPerror):
272 self.assertEqual(inner_ip[TCPerror].sport,
274 elif inner_ip.haslayer(UDPerror):
275 self.assertEqual(inner_ip[UDPerror].sport,
278 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
280 self.logger.error(ppp("Unexpected or invalid packet "
281 "(inside network):", packet))
284 def verify_ipfix_nat44_ses(self, data):
286 Verify IPFIX NAT44 session create/delete event
288 :param data: Decoded IPFIX data records
290 nat44_ses_create_num = 0
291 nat44_ses_delete_num = 0
292 self.assertEqual(6, len(data))
295 self.assertIn(ord(record[230]), [4, 5])
296 if ord(record[230]) == 4:
297 nat44_ses_create_num += 1
299 nat44_ses_delete_num += 1
301 self.assertEqual(self.pg0.remote_ip4n, record[8])
302 # postNATSourceIPv4Address
303 self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
306 self.assertEqual(struct.pack("!I", 0), record[234])
307 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
308 if IP_PROTOS.icmp == ord(record[4]):
309 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
310 self.assertEqual(struct.pack("!H", self.icmp_id_out),
312 elif IP_PROTOS.tcp == ord(record[4]):
313 self.assertEqual(struct.pack("!H", self.tcp_port_in),
315 self.assertEqual(struct.pack("!H", self.tcp_port_out),
317 elif IP_PROTOS.udp == ord(record[4]):
318 self.assertEqual(struct.pack("!H", self.udp_port_in),
320 self.assertEqual(struct.pack("!H", self.udp_port_out),
323 self.fail("Invalid protocol")
324 self.assertEqual(3, nat44_ses_create_num)
325 self.assertEqual(3, nat44_ses_delete_num)
327 def verify_ipfix_addr_exhausted(self, data):
329 Verify IPFIX NAT addresses event
331 :param data: Decoded IPFIX data records
333 self.assertEqual(1, len(data))
336 self.assertEqual(ord(record[230]), 3)
338 self.assertEqual(struct.pack("!I", 0), record[283])
340 def clear_snat(self):
342 Clear SNAT configuration.
344 if self.pg7.has_ip4_config:
345 self.pg7.unconfig_ip4()
347 interfaces = self.vapi.snat_interface_addr_dump()
348 for intf in interfaces:
349 self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
351 self.vapi.snat_ipfix(enable=0)
353 interfaces = self.vapi.snat_interface_dump()
354 for intf in interfaces:
355 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
359 static_mappings = self.vapi.snat_static_mapping_dump()
360 for sm in static_mappings:
361 self.vapi.snat_add_static_mapping(sm.local_ip_address,
362 sm.external_ip_address,
363 local_port=sm.local_port,
364 external_port=sm.external_port,
365 addr_only=sm.addr_only,
367 protocol=sm.protocol,
370 adresses = self.vapi.snat_address_dump()
371 for addr in adresses:
372 self.vapi.snat_add_address_range(addr.ip_address,
376 def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
377 local_port=0, external_port=0, vrf_id=0,
378 is_add=1, external_sw_if_index=0xFFFFFFFF,
381 Add/delete S-NAT static mapping
383 :param local_ip: Local IP address
384 :param external_ip: External IP address
385 :param local_port: Local port number (Optional)
386 :param external_port: External port number (Optional)
387 :param vrf_id: VRF ID (Default 0)
388 :param is_add: 1 if add, 0 if delete (Default add)
389 :param external_sw_if_index: External interface instead of IP address
390 :param proto: IP protocol (Mandatory if port specified)
393 if local_port and external_port:
395 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
396 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
397 self.vapi.snat_add_static_mapping(
400 external_sw_if_index,
408 def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
410 Add/delete S-NAT address
412 :param ip: IP address
413 :param is_add: 1 if add, 0 if delete (Default add)
415 snat_addr = socket.inet_pton(socket.AF_INET, ip)
416 self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
419 def test_dynamic(self):
420 """ SNAT dynamic translation test """
422 self.snat_add_address(self.snat_addr)
423 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
424 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
428 pkts = self.create_stream_in(self.pg0, self.pg1)
429 self.pg0.add_stream(pkts)
430 self.pg_enable_capture(self.pg_interfaces)
432 capture = self.pg1.get_capture(len(pkts))
433 self.verify_capture_out(capture)
436 pkts = self.create_stream_out(self.pg1)
437 self.pg1.add_stream(pkts)
438 self.pg_enable_capture(self.pg_interfaces)
440 capture = self.pg0.get_capture(len(pkts))
441 self.verify_capture_in(capture, self.pg0)
443 def test_dynamic_icmp_errors_in2out_ttl_1(self):
444 """ SNAT handling of client packets with TTL=1 """
446 self.snat_add_address(self.snat_addr)
447 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
448 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
451 # Client side - generate traffic
452 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
453 self.pg0.add_stream(pkts)
454 self.pg_enable_capture(self.pg_interfaces)
457 # Client side - verify ICMP type 11 packets
458 capture = self.pg0.get_capture(len(pkts))
459 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
461 def test_dynamic_icmp_errors_out2in_ttl_1(self):
462 """ SNAT handling of server packets with TTL=1 """
464 self.snat_add_address(self.snat_addr)
465 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
466 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
469 # Client side - create sessions
470 pkts = self.create_stream_in(self.pg0, self.pg1)
471 self.pg0.add_stream(pkts)
472 self.pg_enable_capture(self.pg_interfaces)
475 # Server side - generate traffic
476 capture = self.pg1.get_capture(len(pkts))
477 self.verify_capture_out(capture)
478 pkts = self.create_stream_out(self.pg1, ttl=1)
479 self.pg1.add_stream(pkts)
480 self.pg_enable_capture(self.pg_interfaces)
483 # Server side - verify ICMP type 11 packets
484 capture = self.pg1.get_capture(len(pkts))
485 self.verify_capture_out_with_icmp_errors(capture,
486 src_ip=self.pg1.local_ip4)
488 def test_dynamic_icmp_errors_in2out_ttl_2(self):
489 """ SNAT handling of error responses to client packets with TTL=2 """
491 self.snat_add_address(self.snat_addr)
492 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
493 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
496 # Client side - generate traffic
497 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
498 self.pg0.add_stream(pkts)
499 self.pg_enable_capture(self.pg_interfaces)
502 # Server side - simulate ICMP type 11 response
503 capture = self.pg1.get_capture(len(pkts))
504 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
505 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
506 ICMP(type=11) / packet[IP] for packet in capture]
507 self.pg1.add_stream(pkts)
508 self.pg_enable_capture(self.pg_interfaces)
511 # Client side - verify ICMP type 11 packets
512 capture = self.pg0.get_capture(len(pkts))
513 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
515 def test_dynamic_icmp_errors_out2in_ttl_2(self):
516 """ SNAT handling of error responses to server packets with TTL=2 """
518 self.snat_add_address(self.snat_addr)
519 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
520 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
523 # Client side - create sessions
524 pkts = self.create_stream_in(self.pg0, self.pg1)
525 self.pg0.add_stream(pkts)
526 self.pg_enable_capture(self.pg_interfaces)
529 # Server side - generate traffic
530 capture = self.pg1.get_capture(len(pkts))
531 self.verify_capture_out(capture)
532 pkts = self.create_stream_out(self.pg1, ttl=2)
533 self.pg1.add_stream(pkts)
534 self.pg_enable_capture(self.pg_interfaces)
537 # Client side - simulate ICMP type 11 response
538 capture = self.pg0.get_capture(len(pkts))
539 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
540 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
541 ICMP(type=11) / packet[IP] for packet in capture]
542 self.pg0.add_stream(pkts)
543 self.pg_enable_capture(self.pg_interfaces)
546 # Server side - verify ICMP type 11 packets
547 capture = self.pg1.get_capture(len(pkts))
548 self.verify_capture_out_with_icmp_errors(capture)
550 def test_ping_out_interface_from_outside(self):
551 """ Ping SNAT out interface from outside """
553 self.snat_add_address(self.snat_addr)
554 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
555 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
558 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
559 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
560 ICMP(id=self.icmp_id_out, type='echo-request'))
562 self.pg1.add_stream(pkts)
563 self.pg_enable_capture(self.pg_interfaces)
565 capture = self.pg1.get_capture(len(pkts))
566 self.assertEqual(1, len(capture))
569 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
570 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
571 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
572 self.assertEqual(packet[ICMP].type, 0) # echo reply
574 self.logger.error(ppp("Unexpected or invalid packet "
575 "(outside network):", packet))
578 def test_static_in(self):
579 """ SNAT 1:1 NAT initialized from inside network """
582 self.tcp_port_out = 6303
583 self.udp_port_out = 6304
584 self.icmp_id_out = 6305
586 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
587 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
588 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
592 pkts = self.create_stream_in(self.pg0, self.pg1)
593 self.pg0.add_stream(pkts)
594 self.pg_enable_capture(self.pg_interfaces)
596 capture = self.pg1.get_capture(len(pkts))
597 self.verify_capture_out(capture, nat_ip, True)
600 pkts = self.create_stream_out(self.pg1, nat_ip)
601 self.pg1.add_stream(pkts)
602 self.pg_enable_capture(self.pg_interfaces)
604 capture = self.pg0.get_capture(len(pkts))
605 self.verify_capture_in(capture, self.pg0)
607 def test_static_out(self):
608 """ SNAT 1:1 NAT initialized from outside network """
611 self.tcp_port_out = 6303
612 self.udp_port_out = 6304
613 self.icmp_id_out = 6305
615 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
616 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
617 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
621 pkts = self.create_stream_out(self.pg1, nat_ip)
622 self.pg1.add_stream(pkts)
623 self.pg_enable_capture(self.pg_interfaces)
625 capture = self.pg0.get_capture(len(pkts))
626 self.verify_capture_in(capture, self.pg0)
629 pkts = self.create_stream_in(self.pg0, self.pg1)
630 self.pg0.add_stream(pkts)
631 self.pg_enable_capture(self.pg_interfaces)
633 capture = self.pg1.get_capture(len(pkts))
634 self.verify_capture_out(capture, nat_ip, True)
636 def test_static_with_port_in(self):
637 """ SNAT 1:1 NAT with port initialized from inside network """
639 self.tcp_port_out = 3606
640 self.udp_port_out = 3607
641 self.icmp_id_out = 3608
643 self.snat_add_address(self.snat_addr)
644 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
645 self.tcp_port_in, self.tcp_port_out,
647 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
648 self.udp_port_in, self.udp_port_out,
650 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
651 self.icmp_id_in, self.icmp_id_out,
652 proto=IP_PROTOS.icmp)
653 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
654 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
658 pkts = self.create_stream_in(self.pg0, self.pg1)
659 self.pg0.add_stream(pkts)
660 self.pg_enable_capture(self.pg_interfaces)
662 capture = self.pg1.get_capture(len(pkts))
663 self.verify_capture_out(capture)
666 pkts = self.create_stream_out(self.pg1)
667 self.pg1.add_stream(pkts)
668 self.pg_enable_capture(self.pg_interfaces)
670 capture = self.pg0.get_capture(len(pkts))
671 self.verify_capture_in(capture, self.pg0)
673 def test_static_with_port_out(self):
674 """ SNAT 1:1 NAT with port initialized from outside network """
676 self.tcp_port_out = 30606
677 self.udp_port_out = 30607
678 self.icmp_id_out = 30608
680 self.snat_add_address(self.snat_addr)
681 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
682 self.tcp_port_in, self.tcp_port_out,
684 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
685 self.udp_port_in, self.udp_port_out,
687 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
688 self.icmp_id_in, self.icmp_id_out,
689 proto=IP_PROTOS.icmp)
690 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
691 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
695 pkts = self.create_stream_out(self.pg1)
696 self.pg1.add_stream(pkts)
697 self.pg_enable_capture(self.pg_interfaces)
699 capture = self.pg0.get_capture(len(pkts))
700 self.verify_capture_in(capture, self.pg0)
703 pkts = self.create_stream_in(self.pg0, self.pg1)
704 self.pg0.add_stream(pkts)
705 self.pg_enable_capture(self.pg_interfaces)
707 capture = self.pg1.get_capture(len(pkts))
708 self.verify_capture_out(capture)
710 def test_static_vrf_aware(self):
711 """ SNAT 1:1 NAT VRF awareness """
713 nat_ip1 = "10.0.0.30"
714 nat_ip2 = "10.0.0.40"
715 self.tcp_port_out = 6303
716 self.udp_port_out = 6304
717 self.icmp_id_out = 6305
719 self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
721 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
723 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
725 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
726 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
728 # inside interface VRF match SNAT static mapping VRF
729 pkts = self.create_stream_in(self.pg4, self.pg3)
730 self.pg4.add_stream(pkts)
731 self.pg_enable_capture(self.pg_interfaces)
733 capture = self.pg3.get_capture(len(pkts))
734 self.verify_capture_out(capture, nat_ip1, True)
736 # inside interface VRF don't match SNAT static mapping VRF (packets
738 pkts = self.create_stream_in(self.pg0, self.pg3)
739 self.pg0.add_stream(pkts)
740 self.pg_enable_capture(self.pg_interfaces)
742 self.pg3.assert_nothing_captured()
744 def test_multiple_inside_interfaces(self):
745 """ SNAT multiple inside interfaces (non-overlapping address space) """
747 self.snat_add_address(self.snat_addr)
748 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
749 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
750 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
753 # between two S-NAT inside interfaces (no translation)
754 pkts = self.create_stream_in(self.pg0, self.pg1)
755 self.pg0.add_stream(pkts)
756 self.pg_enable_capture(self.pg_interfaces)
758 capture = self.pg1.get_capture(len(pkts))
759 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
761 # from S-NAT inside to interface without S-NAT feature (no translation)
762 pkts = self.create_stream_in(self.pg0, self.pg2)
763 self.pg0.add_stream(pkts)
764 self.pg_enable_capture(self.pg_interfaces)
766 capture = self.pg2.get_capture(len(pkts))
767 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
769 # in2out 1st interface
770 pkts = self.create_stream_in(self.pg0, self.pg3)
771 self.pg0.add_stream(pkts)
772 self.pg_enable_capture(self.pg_interfaces)
774 capture = self.pg3.get_capture(len(pkts))
775 self.verify_capture_out(capture)
777 # out2in 1st interface
778 pkts = self.create_stream_out(self.pg3)
779 self.pg3.add_stream(pkts)
780 self.pg_enable_capture(self.pg_interfaces)
782 capture = self.pg0.get_capture(len(pkts))
783 self.verify_capture_in(capture, self.pg0)
785 # in2out 2nd interface
786 pkts = self.create_stream_in(self.pg1, self.pg3)
787 self.pg1.add_stream(pkts)
788 self.pg_enable_capture(self.pg_interfaces)
790 capture = self.pg3.get_capture(len(pkts))
791 self.verify_capture_out(capture)
793 # out2in 2nd interface
794 pkts = self.create_stream_out(self.pg3)
795 self.pg3.add_stream(pkts)
796 self.pg_enable_capture(self.pg_interfaces)
798 capture = self.pg1.get_capture(len(pkts))
799 self.verify_capture_in(capture, self.pg1)
801 def test_inside_overlapping_interfaces(self):
802 """ SNAT multiple inside interfaces with overlapping address space """
804 static_nat_ip = "10.0.0.10"
805 self.snat_add_address(self.snat_addr)
806 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
808 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
809 self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
810 self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
811 self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
814 # between S-NAT inside interfaces with same VRF (no translation)
815 pkts = self.create_stream_in(self.pg4, self.pg5)
816 self.pg4.add_stream(pkts)
817 self.pg_enable_capture(self.pg_interfaces)
819 capture = self.pg5.get_capture(len(pkts))
820 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
822 # between S-NAT inside interfaces with different VRF (hairpinning)
823 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
824 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
825 TCP(sport=1234, dport=5678))
826 self.pg4.add_stream(p)
827 self.pg_enable_capture(self.pg_interfaces)
829 capture = self.pg6.get_capture(1)
834 self.assertEqual(ip.src, self.snat_addr)
835 self.assertEqual(ip.dst, self.pg6.remote_ip4)
836 self.assertNotEqual(tcp.sport, 1234)
837 self.assertEqual(tcp.dport, 5678)
839 self.logger.error(ppp("Unexpected or invalid packet:", p))
842 # in2out 1st interface
843 pkts = self.create_stream_in(self.pg4, self.pg3)
844 self.pg4.add_stream(pkts)
845 self.pg_enable_capture(self.pg_interfaces)
847 capture = self.pg3.get_capture(len(pkts))
848 self.verify_capture_out(capture)
850 # out2in 1st interface
851 pkts = self.create_stream_out(self.pg3)
852 self.pg3.add_stream(pkts)
853 self.pg_enable_capture(self.pg_interfaces)
855 capture = self.pg4.get_capture(len(pkts))
856 self.verify_capture_in(capture, self.pg4)
858 # in2out 2nd interface
859 pkts = self.create_stream_in(self.pg5, self.pg3)
860 self.pg5.add_stream(pkts)
861 self.pg_enable_capture(self.pg_interfaces)
863 capture = self.pg3.get_capture(len(pkts))
864 self.verify_capture_out(capture)
866 # out2in 2nd interface
867 pkts = self.create_stream_out(self.pg3)
868 self.pg3.add_stream(pkts)
869 self.pg_enable_capture(self.pg_interfaces)
871 capture = self.pg5.get_capture(len(pkts))
872 self.verify_capture_in(capture, self.pg5)
875 addresses = self.vapi.snat_address_dump()
876 self.assertEqual(len(addresses), 1)
877 sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
878 self.assertEqual(len(sessions), 3)
879 for session in sessions:
880 self.assertFalse(session.is_static)
881 self.assertEqual(session.inside_ip_address[0:4],
882 self.pg5.remote_ip4n)
883 self.assertEqual(session.outside_ip_address,
884 addresses[0].ip_address)
885 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
886 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
887 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
888 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
889 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
890 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
891 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
892 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
893 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
895 # in2out 3rd interface
896 pkts = self.create_stream_in(self.pg6, self.pg3)
897 self.pg6.add_stream(pkts)
898 self.pg_enable_capture(self.pg_interfaces)
900 capture = self.pg3.get_capture(len(pkts))
901 self.verify_capture_out(capture, static_nat_ip, True)
903 # out2in 3rd interface
904 pkts = self.create_stream_out(self.pg3, static_nat_ip)
905 self.pg3.add_stream(pkts)
906 self.pg_enable_capture(self.pg_interfaces)
908 capture = self.pg6.get_capture(len(pkts))
909 self.verify_capture_in(capture, self.pg6)
911 # general user and session dump verifications
912 users = self.vapi.snat_user_dump()
913 self.assertTrue(len(users) >= 3)
914 addresses = self.vapi.snat_address_dump()
915 self.assertEqual(len(addresses), 1)
917 sessions = self.vapi.snat_user_session_dump(user.ip_address,
919 for session in sessions:
920 self.assertEqual(user.ip_address, session.inside_ip_address)
921 self.assertTrue(session.total_bytes > session.total_pkts > 0)
922 self.assertTrue(session.protocol in
923 [IP_PROTOS.tcp, IP_PROTOS.udp,
927 sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
928 self.assertTrue(len(sessions) >= 4)
929 for session in sessions:
930 self.assertFalse(session.is_static)
931 self.assertEqual(session.inside_ip_address[0:4],
932 self.pg4.remote_ip4n)
933 self.assertEqual(session.outside_ip_address,
934 addresses[0].ip_address)
937 sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
938 self.assertTrue(len(sessions) >= 3)
939 for session in sessions:
940 self.assertTrue(session.is_static)
941 self.assertEqual(session.inside_ip_address[0:4],
942 self.pg6.remote_ip4n)
943 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
944 map(int, static_nat_ip.split('.')))
945 self.assertTrue(session.inside_port in
946 [self.tcp_port_in, self.udp_port_in,
949 def test_hairpinning(self):
950 """ SNAT hairpinning """
952 host = self.pg0.remote_hosts[0]
953 server = self.pg0.remote_hosts[1]
956 server_in_port = 5678
957 server_out_port = 8765
959 self.snat_add_address(self.snat_addr)
960 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
961 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
963 # add static mapping for server
964 self.snat_add_static_mapping(server.ip4, self.snat_addr,
965 server_in_port, server_out_port,
968 # send packet from host to server
969 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
970 IP(src=host.ip4, dst=self.snat_addr) /
971 TCP(sport=host_in_port, dport=server_out_port))
972 self.pg0.add_stream(p)
973 self.pg_enable_capture(self.pg_interfaces)
975 capture = self.pg0.get_capture(1)
980 self.assertEqual(ip.src, self.snat_addr)
981 self.assertEqual(ip.dst, server.ip4)
982 self.assertNotEqual(tcp.sport, host_in_port)
983 self.assertEqual(tcp.dport, server_in_port)
984 host_out_port = tcp.sport
986 self.logger.error(ppp("Unexpected or invalid packet:", p))
989 # send reply from server to host
990 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
991 IP(src=server.ip4, dst=self.snat_addr) /
992 TCP(sport=server_in_port, dport=host_out_port))
993 self.pg0.add_stream(p)
994 self.pg_enable_capture(self.pg_interfaces)
996 capture = self.pg0.get_capture(1)
1001 self.assertEqual(ip.src, self.snat_addr)
1002 self.assertEqual(ip.dst, host.ip4)
1003 self.assertEqual(tcp.sport, server_out_port)
1004 self.assertEqual(tcp.dport, host_in_port)
# Hand the packet to ppp() together with the header text, as every other
# error path in this file does.  Previously ppp() was called with the header
# only and the packet was passed to logger.error() as an extra positional
# argument.
self.logger.error(ppp("Unexpected or invalid packet:", p))
1009 def test_max_translations_per_user(self):
1010 """ MAX translations per user - recycle the least recently used """
1012 self.snat_add_address(self.snat_addr)
1013 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1014 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1017 # get maximum number of translations per user
1018 snat_config = self.vapi.snat_show_config()
1020 # send more than maximum number of translations per user packets
1021 pkts_num = snat_config.max_translations_per_user + 5
1023 for port in range(0, pkts_num):
1024 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1025 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1026 TCP(sport=1025 + port))
1028 self.pg0.add_stream(pkts)
1029 self.pg_enable_capture(self.pg_interfaces)
1032 # verify number of translated packet
1033 self.pg1.get_capture(pkts_num)
def test_interface_addr(self):
    """ Acquire SNAT addresses from interface """
    pg7 = self.pg7
    self.vapi.snat_add_interface_addr(pg7.sw_if_index)

    # Before the interface is configured the NAT address dump must be empty.
    pool_before = self.vapi.snat_address_dump()
    self.assertEqual(0, len(pool_before))

    # After configuring the interface, the dump must hold exactly one entry
    # whose first four address bytes equal the interface's packed address.
    pg7.config_ip4()
    pool_configured = self.vapi.snat_address_dump()
    self.assertEqual(1, len(pool_configured))
    first_entry = pool_configured[0]
    self.assertEqual(first_entry.ip_address[:4], pg7.local_ip4n)

    # Unconfiguring the interface must empty the dump again.
    pg7.unconfig_ip4()
    pool_after = self.vapi.snat_address_dump()
    self.assertEqual(0, len(pool_after))
def test_interface_addr_static_mapping(self):
    """ Static mapping with addresses from interface """
    pg7 = self.pg7
    self.vapi.snat_add_interface_addr(pg7.sw_if_index)
    self.snat_add_static_mapping('1.2.3.4',
                                 external_sw_if_index=pg7.sw_if_index)

    # While the interface is unconfigured, the single mapping in the dump
    # must still reference the external interface index.
    mappings = self.vapi.snat_static_mapping_dump()
    self.assertEqual(1, len(mappings))
    self.assertEqual(pg7.sw_if_index, mappings[0].external_sw_if_index)

    # After configuring the interface, the mapping must carry the
    # interface's packed address and an interface index of 0xFFFFFFFF.
    pg7.config_ip4()
    mappings = self.vapi.snat_static_mapping_dump()
    self.assertEqual(1, len(mappings))
    resolved = mappings[0]
    self.assertEqual(resolved.external_ip_address[:4], pg7.local_ip4n)
    self.assertEqual(0xFFFFFFFF, resolved.external_sw_if_index)

    # After unconfiguring the interface, the dump must be empty.
    pg7.unconfig_ip4()
    mappings = self.vapi.snat_static_mapping_dump()
    self.assertEqual(0, len(mappings))
# Check IPFIX logging of NAT44 session events: translate a stream from pg0 to
# pg1, remove the pool address, flush the exporter and decode what arrives at
# the collector behind pg3.
# NOTE(review): this listing omits parts of the method -- the calls on 1083
# and 1085-1088 are missing arguments, and `p` is used from 1103 onwards with
# no visible binding (the statements between 1101/1103 and 1106/1108 are
# absent; presumably loops over `capture` -- confirm against the full file).
# NOTE(review): "delted" in the docstring below is a typo for "deleted".
1079 def test_ipfix_nat44_sess(self):
1080 """ S-NAT IPFIX logging NAT44 session created/delted """
1081 self.snat_add_address(self.snat_addr)
1082 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1083 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
# export IPFIX records from pg3's local address to pg3's remote host
1085 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1086 src_address=self.pg3.local_ip4n,
1088 template_interval=10)
1089 self.vapi.snat_ipfix()
# inside stream pg0 -> pg1; the translated packets are verified on pg1
1091 pkts = self.create_stream_in(self.pg0, self.pg1)
1092 self.pg0.add_stream(pkts)
1093 self.pg_enable_capture(self.pg_interfaces)
1095 capture = self.pg1.get_capture(len(pkts))
1096 self.verify_capture_out(capture)
# remove the pool address before flushing -- presumably this is what produces
# the session-delete events; confirm
1097 self.snat_add_address(self.snat_addr, is_add=0)
1098 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# exactly three IPFIX packets are expected at the collector interface
1099 capture = self.pg3.get_capture(3)
1100 ipfix = IPFIXDecoder()
1101 # first load template
# templates are registered with the decoder before any data set is decoded
1103 self.assertTrue(p.haslayer(IPFIX))
1104 if p.haslayer(Template):
1105 ipfix.add_template(p.getlayer(Template))
1106 # verify events in data set
1108 if p.haslayer(Data):
1109 data = ipfix.decode_data_set(p.getlayer(Set))
1110 self.verify_ipfix_nat44_ses(data)
# Check IPFIX logging when no NAT address is available: no snat_add_address
# call is visible in this method, one packet is sent from pg0 towards pg1,
# nothing is expected on pg1, and the exporter output on pg3 is decoded.
# NOTE(review): this listing omits parts of the method -- the calls on 1115
# and 1117-1120 are missing arguments, the packet expression opened on 1123
# is not closed (its last layer is absent), and the statements between
# 1133/1135 and 1138/1140 are absent (presumably loops over `capture` that
# rebind `p` -- confirm against the full file).
1112 def test_ipfix_addr_exhausted(self):
1113 """ S-NAT IPFIX logging NAT addresses exhausted """
1114 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1115 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
# export IPFIX records from pg3's local address to pg3's remote host
1117 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1118 src_address=self.pg3.local_ip4n,
1120 template_interval=10)
1121 self.vapi.snat_ipfix()
1123 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1124 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1126 self.pg0.add_stream(p)
1127 self.pg_enable_capture(self.pg_interfaces)
# the packet must not be forwarded: zero packets expected on pg1
1129 capture = self.pg1.get_capture(0)
1130 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# exactly three IPFIX packets are expected at the collector interface
1131 capture = self.pg3.get_capture(3)
1132 ipfix = IPFIXDecoder()
1133 # first load template
1135 self.assertTrue(p.haslayer(IPFIX))
1136 if p.haslayer(Template):
1137 ipfix.add_template(p.getlayer(Template))
1138 # verify events in data set
1140 if p.haslayer(Data):
1141 data = ipfix.decode_data_set(p.getlayer(Set))
1142 self.verify_ipfix_addr_exhausted(data)
def test_pool_addr_fib(self):
    """ S-NAT add pool addresses to FIB

    Verifies that ARP requests arriving on pg1 for the SNAT pool address
    and for the external address of a 1:1 static mapping are answered,
    that a request sent on pg2 produces nothing on pg1, and that no
    answer is sent once both addresses have been removed.
    """
    static_addr = '10.0.0.10'
    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    # NOTE(review): the continuation of this call is missing from the
    # reviewed listing; is_inside=0 (pg1 as the outside interface) is
    # restored here -- confirm against the full file.
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)

    # both the pool address and the 1:1 NAT address must be answered
    for target in (self.snat_addr, static_addr):
        capture = self._arp_request_capture_pg1(self.pg1, target, 1)
        self.assertTrue(capture[0].haslayer(ARP))
        # assertEqual, not assertTrue: assertTrue(op, ARP.is_at) would take
        # ARP.is_at as the failure message and never compare the opcode
        self.assertEqual(capture[0][ARP].op, ARP.is_at)

    # send ARP to non-SNAT interface
    self._arp_request_capture_pg1(self.pg2, self.snat_addr, 0)

    # remove addresses and verify
    self.snat_add_address(self.snat_addr, is_add=0)
    # NOTE(review): the continuation of this call is missing from the
    # reviewed listing; is_add=0 is restored to match the removal of the
    # pool address on the previous statement -- confirm.
    self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
                                 is_add=0)

    for target in (self.snat_addr, static_addr):
        self._arp_request_capture_pg1(self.pg1, target, 0)

def _arp_request_capture_pg1(self, src_if, target_ip, expected):
    """
    Broadcast an ARP who-has from src_if's remote host and capture on pg1.

    :param src_if: Interface the request is injected on
    :param target_ip: IPv4 address being asked for
    :param expected: Number of packets expected on pg1
    :returns: Packets captured on pg1
    """
    request = (Ether(src=src_if.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
               ARP(op=ARP.who_has, pdst=target_ip,
                   psrc=src_if.remote_ip4, hwsrc=src_if.remote_mac))
    src_if.add_stream(request)
    self.pg_enable_capture(self.pg_interfaces)
    # NOTE(review): the statement between enabling capture and reading it
    # is missing from the reviewed listing; starting the packet generator
    # is restored here -- confirm against the full file.
    self.pg_start()
    return self.pg1.get_capture(expected)
# VRF-aware pools: pg0 and pg1 are moved into separate IPv4 tables, each table
# gets its own NAT address, and traffic from each interface towards pg2 must
# be translated to the address bound to that interface's table.
# NOTE(review): this listing omits parts of the method -- vrf_id1 and vrf_id2
# are used with no visible definition, and the call on 1224 is left open.
1205 def test_vrf_mode(self):
1206 """ S-NAT tenant VRF aware address pool mode """
1210 nat_ip1 = "10.0.0.10"
1211 nat_ip2 = "10.0.0.11"
# addresses are removed before the table change and configured again after it
1213 self.pg0.unconfig_ip4()
1214 self.pg1.unconfig_ip4()
1215 self.pg0.set_table_ip4(vrf_id1)
1216 self.pg1.set_table_ip4(vrf_id2)
1217 self.pg0.config_ip4()
1218 self.pg1.config_ip4()
# one pool address per VRF
1220 self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1221 self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1222 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1223 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1224 self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
# stream from pg0 must leave pg2 translated to nat_ip1
1228 pkts = self.create_stream_in(self.pg0, self.pg2)
1229 self.pg0.add_stream(pkts)
1230 self.pg_enable_capture(self.pg_interfaces)
1232 capture = self.pg2.get_capture(len(pkts))
1233 self.verify_capture_out(capture, nat_ip1)
# stream from pg1 must leave pg2 translated to nat_ip2
1236 pkts = self.create_stream_in(self.pg1, self.pg2)
1237 self.pg1.add_stream(pkts)
1238 self.pg_enable_capture(self.pg_interfaces)
1240 capture = self.pg2.get_capture(len(pkts))
1241 self.verify_capture_out(capture, nat_ip2)
# VRF-independent pools: two NAT addresses are added without a vrf_id, and
# traffic from both pg0 and pg1 towards pg2 is expected to be translated to
# the same address, nat_ip1.
# NOTE(review): this listing omits parts of the method -- the call on 1253 is
# left open.
1243 def test_vrf_feature_independent(self):
1244 """ S-NAT tenant VRF independent address pool mode """
1246 nat_ip1 = "10.0.0.10"
1247 nat_ip2 = "10.0.0.11"
1249 self.snat_add_address(nat_ip1)
1250 self.snat_add_address(nat_ip2)
1251 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1252 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1253 self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
# stream from pg0 must leave pg2 translated to nat_ip1
1257 pkts = self.create_stream_in(self.pg0, self.pg2)
1258 self.pg0.add_stream(pkts)
1259 self.pg_enable_capture(self.pg_interfaces)
1261 capture = self.pg2.get_capture(len(pkts))
1262 self.verify_capture_out(capture, nat_ip1)
# stream from pg1 is checked against nat_ip1 as well; nat_ip2 is never
# asserted on in this test
1265 pkts = self.create_stream_in(self.pg1, self.pg2)
1266 self.pg1.add_stream(pkts)
1267 self.pg_enable_capture(self.pg_interfaces)
1269 capture = self.pg2.get_capture(len(pkts))
1270 self.verify_capture_out(capture, nat_ip1)
# Per-test teardown body of TestSNAT: run the base-class tearDown, then log
# the verbose SNAT state as long as VPP is still alive.
# NOTE(review): the enclosing `def` line and any statement after 1275 are
# not present in this listing.
1273 super(TestSNAT, self).tearDown()
# skip the CLI query when VPP has died
1274 if not self.vpp_dead:
1275 self.logger.info(self.vapi.cli("show snat verbose"))
# Test case for SNAT running in deterministic mode: VPP is started with
# "snat { deterministic }" on its command line and two packet-generator
# interfaces are created.
# NOTE(review): this listing omits many lines of the class -- decorators, the
# body of the loop on 1295, the `def` lines preceding 1301 and 1350, the
# definitions of in_plen/out_plen, the statements between 1331 and 1334, the
# remaining arguments of the call on 1343, and the docstring delimiters around
# 1339. Do not treat the visible lines as the complete class.
1279 class TestDeterministicNAT(VppTestCase):
1280 """ Deterministic NAT Test Cases """
# extend the VPP command line so the SNAT plugin starts in deterministic mode
1283 def setUpConstants(cls):
1284 super(TestDeterministicNAT, cls).setUpConstants()
1285 cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
# class fixture: create two pg interfaces and iterate over them
1288 def setUpClass(cls):
1289 super(TestDeterministicNAT, cls).setUpClass()
1292 cls.create_pg_interfaces(range(2))
1293 cls.interfaces = list(cls.pg_interfaces)
1295 for i in cls.interfaces:
1301 super(TestDeterministicNAT, cls).tearDownClass()
# add one deterministic mapping, check the forward and reverse lookups and the
# mapping dump, then expect the dump to be empty again
1304 def test_deterministic_mode(self):
1305 """ S-NAT run deterministic mode """
1306 in_addr = '172.16.255.0'
1307 out_addr = '172.17.255.50'
1308 in_addr_t = '172.16.255.20'
# packed 4-byte forms of the addresses, as passed to the API calls below
1309 in_addr_n = socket.inet_aton(in_addr)
1310 out_addr_n = socket.inet_aton(out_addr)
1311 in_addr_t_n = socket.inet_aton(in_addr_t)
# the running VPP must report deterministic mode as enabled
1315 snat_config = self.vapi.snat_show_config()
1316 self.assertEqual(1, snat_config.deterministic)
1318 self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
# forward lookup of in_addr_t must give out_addr; the reverse lookup of
# out_addr with the reported high port must give in_addr_t back
1320 rep1 = self.vapi.snat_det_forward(in_addr_t_n)
1321 self.assertEqual(rep1.out_addr[:4], out_addr_n)
1322 rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
1323 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
# the dump must contain exactly the mapping that was added
1325 deterministic_mappings = self.vapi.snat_det_map_dump()
1326 self.assertEqual(len(deterministic_mappings), 1)
1327 dsm = deterministic_mappings[0]
1328 self.assertEqual(in_addr_n, dsm.in_addr[:4])
1329 self.assertEqual(in_plen, dsm.in_plen)
1330 self.assertEqual(out_addr_n, dsm.out_addr[:4])
1331 self.assertEqual(out_plen, dsm.out_plen)
# presumably the mapping is removed in the lines missing here -- confirm
1334 deterministic_mappings = self.vapi.snat_det_map_dump()
1335 self.assertEqual(len(deterministic_mappings), 0)
# helper that walks the deterministic mapping dump and issues snat_add_det_map
# for each entry (remaining call arguments are not visible in this listing)
1337 def clear_snat(self):
1339 Clear SNAT configuration.
1341 deterministic_mappings = self.vapi.snat_det_map_dump()
1342 for dsm in deterministic_mappings:
1343 self.vapi.snat_add_det_map(dsm.in_addr,
# per-test teardown body: base-class tearDown, then log the SNAT detail output
# unless VPP has died
1350 super(TestDeterministicNAT, self).tearDown()
1351 if not self.vpp_dead:
1352 self.logger.info(self.vapi.cli("show snat detail"))
# Script entry point: when the file is executed directly, run its test cases
# through unittest using the VPP-specific test runner.
1355 if __name__ == '__main__':
1356 unittest.main(testRunner=VppTestRunner)