7 from framework import VppTestCase, VppTestRunner
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.l2 import Ether, ARP
11 from scapy.data import IP_PROTOS
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
class MethodHolder(VppTestCase):
    """ SNAT create capture and verify method holder """

    @classmethod
    def setUpClass(cls):
        super(MethodHolder, cls).setUpClass()

    def tearDown(self):
        super(MethodHolder, self).tearDown()

    def create_stream_in(self, in_if, out_if, ttl=64):
        """
        Create packet stream for inside network

        :param in_if: Inside interface
        :param out_if: Outside interface
        :param ttl: TTL of generated packets
        :returns: List with one TCP, one UDP and one ICMP echo request
                  packet (in this order)
        """
        l4_layers = [TCP(sport=self.tcp_port_in),
                     UDP(sport=self.udp_port_in),
                     ICMP(id=self.icmp_id_in, type='echo-request')]
        return [Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
                l4 for l4 in l4_layers]

    def create_stream_out(self, out_if, dst_ip=None, ttl=64):
        """
        Create packet stream for outside network

        :param out_if: Outside interface
        :param dst_ip: Destination IP address (Default use global SNAT address)
        :param ttl: TTL of generated packets
        :returns: List with one TCP, one UDP and one ICMP echo reply
                  packet (in this order)
        """
        if dst_ip is None:
            dst_ip = self.snat_addr
        l4_layers = [TCP(dport=self.tcp_port_out),
                     UDP(dport=self.udp_port_out),
                     ICMP(id=self.icmp_id_out, type='echo-reply')]
        return [Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
                IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
                l4 for l4 in l4_layers]

    def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                           packet_num=3):
        """
        Verify captured packets on outside network

        The translated TCP/UDP source port and ICMP id found in the capture
        are stored in tcp_port_out, udp_port_out and icmp_id_out.

        :param capture: Captured packets
        :param nat_ip: Translated IP address (Default use global SNAT address)
        :param same_port: Source port number is not translated (Default False)
        :param packet_num: Expected number of packets (Default 3)
        """
        if nat_ip is None:
            nat_ip = self.snat_addr
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, nat_ip)
                if packet.haslayer(TCP):
                    outside = packet[TCP].sport
                    inside = self.tcp_port_in
                    out_attr = 'tcp_port_out'
                elif packet.haslayer(UDP):
                    outside = packet[UDP].sport
                    inside = self.udp_port_in
                    out_attr = 'udp_port_out'
                else:
                    outside = packet[ICMP].id
                    inside = self.icmp_id_in
                    out_attr = 'icmp_id_out'
                if same_port:
                    self.assertEqual(outside, inside)
                else:
                    self.assertNotEqual(outside, inside)
                # remember the translated value for the out2in direction
                setattr(self, out_attr, outside)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

    def verify_capture_in(self, capture, in_if, packet_num=3):
        """
        Verify captured packets on inside network

        :param capture: Captured packets
        :param in_if: Inside interface
        :param packet_num: Expected number of packets (Default 3)
        """
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].dst, in_if.remote_ip4)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", packet))
                raise

    def verify_capture_no_translation(self, capture, ingress_if, egress_if):
        """
        Verify captured packet that don't have to be translated

        :param capture: Captured packets
        :param ingress_if: Ingress interface
        :param egress_if: Egress interface
        """
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
                self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].sport, self.udp_port_in)
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", packet))
                raise

    def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                            packet_num=3, icmp_type=11):
        """
        Verify captured packets with ICMP errors on outside network

        :param capture: Captured packets
        :param src_ip: Translated IP address or IP address of VPP
                       (Default use global SNAT address)
        :param packet_num: Expected number of packets (Default 3)
        :param icmp_type: Type of error ICMP packet
                          we are expecting (Default 11)
        """
        if src_ip is None:
            src_ip = self.snat_addr
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, src_ip)
                self.assertTrue(packet.haslayer(ICMP))
                icmp = packet[ICMP]
                self.assertEqual(icmp.type, icmp_type)
                self.assertTrue(icmp.haslayer(IPerror))
                # the embedded packet is the one originally sent from outside
                inner_ip = icmp[IPerror]
                if inner_ip.haslayer(TCPerror):
                    self.assertEqual(inner_ip[TCPerror].dport,
                                     self.tcp_port_out)
                elif inner_ip.haslayer(UDPerror):
                    self.assertEqual(inner_ip[UDPerror].dport,
                                     self.udp_port_out)
                else:
                    self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

    def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
                                           icmp_type=11):
        """
        Verify captured packets with ICMP errors on inside network

        :param capture: Captured packets
        :param in_if: Inside interface
        :param packet_num: Expected number of packets (Default 3)
        :param icmp_type: Type of error ICMP packet
                          we are expecting (Default 11)
        """
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].dst, in_if.remote_ip4)
                self.assertTrue(packet.haslayer(ICMP))
                icmp = packet[ICMP]
                self.assertEqual(icmp.type, icmp_type)
                self.assertTrue(icmp.haslayer(IPerror))
                # the embedded packet is the one originally sent from inside
                inner_ip = icmp[IPerror]
                if inner_ip.haslayer(TCPerror):
                    self.assertEqual(inner_ip[TCPerror].sport,
                                     self.tcp_port_in)
                elif inner_ip.haslayer(UDPerror):
                    self.assertEqual(inner_ip[UDPerror].sport,
                                     self.udp_port_in)
                else:
                    self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", packet))
                raise

    def verify_ipfix_nat44_ses(self, data):
        """
        Verify IPFIX NAT44 session create/delete event

        Expects six records: one create and one delete event for each of
        the TCP, UDP and ICMP sessions. Records are indexed by IPFIX
        information element id.

        :param data: Decoded IPFIX data records
        """
        nat44_ses_create_num = 0
        nat44_ses_delete_num = 0
        self.assertEqual(6, len(data))
        for record in data:
            # natEvent
            nat_event = ord(record[230])
            self.assertIn(nat_event, [4, 5])
            if nat_event == 4:
                nat44_ses_create_num += 1
            else:
                nat44_ses_delete_num += 1
            # sourceIPv4Address
            self.assertEqual(self.pg0.remote_ip4n, record[8])
            # postNATSourceIPv4Address
            self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
                             record[225])
            # ingressVRFID
            self.assertEqual(struct.pack("!I", 0), record[234])
            # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
            protocol = ord(record[4])
            if IP_PROTOS.icmp == protocol:
                port_in, port_out = self.icmp_id_in, self.icmp_id_out
            elif IP_PROTOS.tcp == protocol:
                port_in, port_out = self.tcp_port_in, self.tcp_port_out
            elif IP_PROTOS.udp == protocol:
                port_in, port_out = self.udp_port_in, self.udp_port_out
            else:
                self.fail("Invalid protocol")
            self.assertEqual(struct.pack("!H", port_in), record[7])
            self.assertEqual(struct.pack("!H", port_out), record[227])
        self.assertEqual(3, nat44_ses_create_num)
        self.assertEqual(3, nat44_ses_delete_num)

    def verify_ipfix_addr_exhausted(self, data):
        """
        Verify IPFIX NAT addresses event

        :param data: Decoded IPFIX data records
        """
        self.assertEqual(1, len(data))
        record = data[0]
        # natEvent
        self.assertEqual(ord(record[230]), 3)
        # natPoolID
        self.assertEqual(struct.pack("!I", 0), record[283])
299 class TestSNAT(MethodHolder):
300 """ SNAT Test Cases """
@classmethod
def setUpClass(cls):
    """
    Prepare ports, addresses and pg interfaces shared by the test cases.

    pg0-pg3 keep their default addressing. pg4-pg6 get overlapping
    addresses: pg4 and pg5 in IPv4 table 10, pg6 in table 20 re-using
    the addresses of pg4.
    """
    super(TestSNAT, cls).setUpClass()

    try:
        cls.tcp_port_in = 6303
        cls.tcp_port_out = 6303
        cls.udp_port_in = 6304
        cls.udp_port_out = 6304
        cls.icmp_id_in = 6305
        cls.icmp_id_out = 6305
        cls.snat_addr = '10.0.0.3'

        cls.create_pg_interfaces(range(8))
        cls.interfaces = list(cls.pg_interfaces[0:4])

        # NOTE(review): the loop bodies below were not visible in the
        # reviewed text; admin_up/config_ip4/resolve_arp is the presumed
        # bring-up sequence - confirm against the original file.
        for i in cls.interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

        cls.pg0.generate_remote_hosts(2)
        cls.pg0.configure_ipv4_neighbors()

        cls.overlapping_interfaces = list(cls.pg_interfaces[4:7])

        # (interface, local address, remote host address, IPv4 table id)
        overlapping_setup = [
            (cls.pg4, "172.16.255.1", "172.16.255.2", 10),
            (cls.pg5, "172.16.255.3", "172.16.255.4", 10),
            (cls.pg6, "172.16.255.1", "172.16.255.2", 20),
        ]
        for intf, local_ip4, remote_ip4, table_id in overlapping_setup:
            intf._local_ip4 = local_ip4
            # Pack the address that was just assigned. The previous code
            # packed `i.local_ip4`, where `i` was the stale loop variable
            # left over from the cls.interfaces loop above.
            intf._local_ip4n = socket.inet_pton(socket.AF_INET, local_ip4)
            intf._remote_hosts[0]._ip4 = remote_ip4
            intf.set_table_ip4(table_id)

        for i in cls.overlapping_interfaces:
            i.config_ip4()
            i.admin_up()
            i.resolve_arp()

        # NOTE(review): presumed - pg7 is only addressed on demand by the
        # interface-address tests; confirm it is brought up here.
        cls.pg7.admin_up()

    except Exception:
        # release whatever the base class already set up, then re-raise
        super(TestSNAT, cls).tearDownClass()
        raise
def clear_snat(self):
    """
    Clear SNAT configuration.

    Removes, in this order: the pg7 IPv4 address (if configured),
    interface-derived pool addresses, IPFIX logging, the SNAT feature on
    every interface, all static mappings and all pool addresses.
    """
    if self.pg7.has_ip4_config:
        self.pg7.unconfig_ip4()

    for addr_if in self.vapi.snat_interface_addr_dump():
        self.vapi.snat_add_interface_addr(addr_if.sw_if_index, is_add=0)

    self.vapi.snat_ipfix(enable=0)

    for snat_if in self.vapi.snat_interface_dump():
        self.vapi.snat_interface_add_del_feature(snat_if.sw_if_index,
                                                 snat_if.is_inside,
                                                 is_add=0)

    for mapping in self.vapi.snat_static_mapping_dump():
        self.vapi.snat_add_static_mapping(
            mapping.local_ip_address,
            mapping.external_ip_address,
            local_port=mapping.local_port,
            external_port=mapping.external_port,
            addr_only=mapping.addr_only,
            vrf_id=mapping.vrf_id,
            protocol=mapping.protocol,
            is_add=0)

    for pool_addr in self.vapi.snat_address_dump():
        # a single address is removed as a one-element range
        self.vapi.snat_add_address_range(pool_addr.ip_address,
                                         pool_addr.ip_address,
                                         is_add=0)
def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
                            local_port=0, external_port=0, vrf_id=0,
                            is_add=1, external_sw_if_index=0xFFFFFFFF,
                            proto=0):
    """
    Add/delete S-NAT static mapping

    The mapping is address-only unless both port numbers are non-zero.

    :param local_ip: Local IP address
    :param external_ip: External IP address
    :param local_port: Local port number (Optional)
    :param external_port: External port number (Optional)
    :param vrf_id: VRF ID (Default 0)
    :param is_add: 1 if add, 0 if delete (Default add)
    :param external_sw_if_index: External interface instead of IP address
    :param proto: IP protocol (Mandatory if port specified)
    """
    addr_only = 0 if (local_port and external_port) else 1
    # the API takes addresses in packed (network byte order) form
    self.vapi.snat_add_static_mapping(
        socket.inet_pton(socket.AF_INET, local_ip),
        socket.inet_pton(socket.AF_INET, external_ip),
        external_sw_if_index,
        local_port=local_port,
        external_port=external_port,
        addr_only=addr_only,
        vrf_id=vrf_id,
        protocol=proto,
        is_add=is_add)
def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
    """
    Add/delete S-NAT address

    :param ip: IP address
    :param is_add: 1 if add, 0 if delete (Default add)
    :param vrf_id: VRF ID passed through to the address range API
                   (Default 0xFFFFFFFF)
    """
    packed_ip = socket.inet_pton(socket.AF_INET, ip)
    # a single address is configured as a one-element range
    self.vapi.snat_add_address_range(packed_ip, packed_ip, is_add,
                                     vrf_id=vrf_id)
def test_dynamic(self):
    """ SNAT dynamic translation test """
    inside, outside = self.pg0, self.pg1

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(inside.sw_if_index)
    self.vapi.snat_interface_add_del_feature(outside.sw_if_index,
                                             is_inside=0)

    # in2out: sessions are created, translated ports are recorded
    pkts = self.create_stream_in(inside, outside)
    inside.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.verify_capture_out(outside.get_capture(len(pkts)))

    # out2in: replies to the translated ports reach the inside host
    pkts = self.create_stream_out(outside)
    outside.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.verify_capture_in(inside.get_capture(len(pkts)), inside)
def test_dynamic_icmp_errors_in2out_ttl_1(self):
    """ SNAT handling of client packets with TTL=1 """
    client_if, server_if = self.pg0, self.pg1

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(client_if.sw_if_index)
    self.vapi.snat_interface_add_del_feature(server_if.sw_if_index,
                                             is_inside=0)

    # Client side - generate traffic
    pkts = self.create_stream_in(client_if, server_if, ttl=1)
    client_if.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - verify ICMP type 11 packets
    errors = client_if.get_capture(len(pkts))
    self.verify_capture_in_with_icmp_errors(errors, client_if)
def test_dynamic_icmp_errors_out2in_ttl_1(self):
    """ SNAT handling of server packets with TTL=1 """
    client_if, server_if = self.pg0, self.pg1

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(client_if.sw_if_index)
    self.vapi.snat_interface_add_del_feature(server_if.sw_if_index,
                                             is_inside=0)

    # Client side - create sessions
    pkts = self.create_stream_in(client_if, server_if)
    client_if.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - generate traffic
    translated = server_if.get_capture(len(pkts))
    self.verify_capture_out(translated)
    pkts = self.create_stream_out(server_if, ttl=1)
    server_if.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - verify ICMP type 11 packets
    errors = server_if.get_capture(len(pkts))
    self.verify_capture_out_with_icmp_errors(
        errors, src_ip=server_if.local_ip4)
def test_dynamic_icmp_errors_in2out_ttl_2(self):
    """ SNAT handling of error responses to client packets with TTL=2 """
    client_if, server_if = self.pg0, self.pg1

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(client_if.sw_if_index)
    self.vapi.snat_interface_add_del_feature(server_if.sw_if_index,
                                             is_inside=0)

    # Client side - generate traffic
    pkts = self.create_stream_in(client_if, server_if, ttl=2)
    client_if.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - simulate ICMP type 11 response
    translated = server_if.get_capture(len(pkts))
    pkts = []
    for packet in translated:
        # each error embeds the translated packet it responds to
        pkts.append(
            Ether(dst=server_if.local_mac, src=server_if.remote_mac) /
            IP(src=server_if.remote_ip4, dst=self.snat_addr) /
            ICMP(type=11) / packet[IP])
    server_if.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - verify ICMP type 11 packets
    errors = client_if.get_capture(len(pkts))
    self.verify_capture_in_with_icmp_errors(errors, client_if)
def test_dynamic_icmp_errors_out2in_ttl_2(self):
    """ SNAT handling of error responses to server packets with TTL=2 """
    client_if, server_if = self.pg0, self.pg1

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(client_if.sw_if_index)
    self.vapi.snat_interface_add_del_feature(server_if.sw_if_index,
                                             is_inside=0)

    # Client side - create sessions
    pkts = self.create_stream_in(client_if, server_if)
    client_if.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - generate traffic
    translated = server_if.get_capture(len(pkts))
    self.verify_capture_out(translated)
    pkts = self.create_stream_out(server_if, ttl=2)
    server_if.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - simulate ICMP type 11 response
    delivered = client_if.get_capture(len(pkts))
    pkts = []
    for packet in delivered:
        # each error embeds the packet the client received
        pkts.append(
            Ether(dst=client_if.local_mac, src=client_if.remote_mac) /
            IP(src=client_if.remote_ip4, dst=server_if.remote_ip4) /
            ICMP(type=11) / packet[IP])
    client_if.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - verify ICMP type 11 packets
    errors = server_if.get_capture(len(pkts))
    self.verify_capture_out_with_icmp_errors(errors)
def test_ping_out_interface_from_outside(self):
    """ Ping SNAT out interface from outside """

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # echo request addressed to the outside interface itself
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
         ICMP(id=self.icmp_id_out, type='echo-request'))
    pkts = [p]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.assertEqual(1, len(capture))
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, self.pg1.local_ip4)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        # An echo reply carries the identifier of the request, which was
        # sent with icmp_id_out (previously compared with icmp_id_in,
        # which only passed because both default to the same value).
        self.assertEqual(packet[ICMP].id, self.icmp_id_out)
        self.assertEqual(packet[ICMP].type, 0)  # echo reply
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet "
                              "(outside network):", packet))
        raise
def test_static_in(self):
    """ SNAT 1:1 NAT initialized from inside network """
    inside, outside = self.pg0, self.pg1

    # NOTE(review): the literal was not visible in the reviewed text;
    # any address outside the dynamic pool serves - confirm the value.
    nat_ip = "10.0.0.10"
    # address-only mapping: ports and ICMP id are expected unchanged
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.snat_add_static_mapping(inside.remote_ip4, nat_ip)
    self.vapi.snat_interface_add_del_feature(inside.sw_if_index)
    self.vapi.snat_interface_add_del_feature(outside.sw_if_index,
                                             is_inside=0)

    # in2out
    pkts = self.create_stream_in(inside, outside)
    inside.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.verify_capture_out(outside.get_capture(len(pkts)), nat_ip, True)

    # out2in
    pkts = self.create_stream_out(outside, nat_ip)
    outside.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.verify_capture_in(inside.get_capture(len(pkts)), inside)
def test_static_out(self):
    """ SNAT 1:1 NAT initialized from outside network """
    inside, outside = self.pg0, self.pg1

    # NOTE(review): the literal was not visible in the reviewed text;
    # any address outside the dynamic pool serves - confirm the value.
    nat_ip = "10.0.0.20"
    # address-only mapping: ports and ICMP id are expected unchanged
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.snat_add_static_mapping(inside.remote_ip4, nat_ip)
    self.vapi.snat_interface_add_del_feature(inside.sw_if_index)
    self.vapi.snat_interface_add_del_feature(outside.sw_if_index,
                                             is_inside=0)

    # out2in
    pkts = self.create_stream_out(outside, nat_ip)
    outside.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.verify_capture_in(inside.get_capture(len(pkts)), inside)

    # in2out
    pkts = self.create_stream_in(inside, outside)
    inside.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.verify_capture_out(outside.get_capture(len(pkts)), nat_ip, True)
def test_static_with_port_in(self):
    """ SNAT 1:1 NAT with port initialized from inside network """
    inside, outside = self.pg0, self.pg1

    self.tcp_port_out = 3606
    self.udp_port_out = 3607
    self.icmp_id_out = 3608

    self.snat_add_address(self.snat_addr)
    # one port mapping per protocol: (local port, external port, protocol)
    port_mappings = (
        (self.tcp_port_in, self.tcp_port_out, IP_PROTOS.tcp),
        (self.udp_port_in, self.udp_port_out, IP_PROTOS.udp),
        (self.icmp_id_in, self.icmp_id_out, IP_PROTOS.icmp),
    )
    for port_in, port_out, protocol in port_mappings:
        self.snat_add_static_mapping(inside.remote_ip4, self.snat_addr,
                                     port_in, port_out,
                                     proto=protocol)
    self.vapi.snat_interface_add_del_feature(inside.sw_if_index)
    self.vapi.snat_interface_add_del_feature(outside.sw_if_index,
                                             is_inside=0)

    # in2out
    pkts = self.create_stream_in(inside, outside)
    inside.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.verify_capture_out(outside.get_capture(len(pkts)))

    # out2in
    pkts = self.create_stream_out(outside)
    outside.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.verify_capture_in(inside.get_capture(len(pkts)), inside)
def test_static_with_port_out(self):
    """ SNAT 1:1 NAT with port initialized from outside network """
    inside, outside = self.pg0, self.pg1

    self.tcp_port_out = 30606
    self.udp_port_out = 30607
    self.icmp_id_out = 30608

    self.snat_add_address(self.snat_addr)
    # one port mapping per protocol: (local port, external port, protocol)
    port_mappings = (
        (self.tcp_port_in, self.tcp_port_out, IP_PROTOS.tcp),
        (self.udp_port_in, self.udp_port_out, IP_PROTOS.udp),
        (self.icmp_id_in, self.icmp_id_out, IP_PROTOS.icmp),
    )
    for port_in, port_out, protocol in port_mappings:
        self.snat_add_static_mapping(inside.remote_ip4, self.snat_addr,
                                     port_in, port_out,
                                     proto=protocol)
    self.vapi.snat_interface_add_del_feature(inside.sw_if_index)
    self.vapi.snat_interface_add_del_feature(outside.sw_if_index,
                                             is_inside=0)

    # out2in
    pkts = self.create_stream_out(outside)
    outside.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.verify_capture_in(inside.get_capture(len(pkts)), inside)

    # in2out
    pkts = self.create_stream_in(inside, outside)
    inside.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.verify_capture_out(outside.get_capture(len(pkts)))
def test_static_vrf_aware(self):
    """ SNAT 1:1 NAT VRF awareness """
    outside = self.pg3

    nat_ip1 = "10.0.0.30"
    nat_ip2 = "10.0.0.40"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    # both mappings live in VRF 10; only pg4 is attached to that table
    self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
                                 vrf_id=10)
    self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
                                 vrf_id=10)
    self.vapi.snat_interface_add_del_feature(outside.sw_if_index,
                                             is_inside=0)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)

    # inside interface VRF match SNAT static mapping VRF
    pkts = self.create_stream_in(self.pg4, outside)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.verify_capture_out(outside.get_capture(len(pkts)), nat_ip1, True)

    # inside interface VRF don't match SNAT static mapping VRF (packets
    # are dropped)
    pkts = self.create_stream_in(self.pg0, outside)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    outside.assert_nothing_captured()
def test_multiple_inside_interfaces(self):
    """ SNAT multiple inside interfaces (non-overlapping address space) """

    def send_and_capture(tx_if, rx_if, pkts):
        # inject pkts on tx_if and return what arrives on rx_if
        tx_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(pkts))

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
                                             is_inside=0)

    # between two S-NAT inside interfaces (no translation)
    capture = send_and_capture(
        self.pg0, self.pg1, self.create_stream_in(self.pg0, self.pg1))
    self.verify_capture_no_translation(capture, self.pg0, self.pg1)

    # from S-NAT inside to interface without S-NAT feature (no translation)
    capture = send_and_capture(
        self.pg0, self.pg2, self.create_stream_in(self.pg0, self.pg2))
    self.verify_capture_no_translation(capture, self.pg0, self.pg2)

    # in2out 1st interface
    capture = send_and_capture(
        self.pg0, self.pg3, self.create_stream_in(self.pg0, self.pg3))
    self.verify_capture_out(capture)

    # out2in 1st interface
    capture = send_and_capture(
        self.pg3, self.pg0, self.create_stream_out(self.pg3))
    self.verify_capture_in(capture, self.pg0)

    # in2out 2nd interface
    capture = send_and_capture(
        self.pg1, self.pg3, self.create_stream_in(self.pg1, self.pg3))
    self.verify_capture_out(capture)

    # out2in 2nd interface
    capture = send_and_capture(
        self.pg3, self.pg1, self.create_stream_out(self.pg3))
    self.verify_capture_in(capture, self.pg1)
def test_inside_overlapping_interfaces(self):
    """ SNAT multiple inside interfaces with overlapping address space """

    def send_and_capture(tx_if, rx_if, pkts):
        # inject pkts on tx_if and return what arrives on rx_if
        tx_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(pkts))

    static_nat_ip = "10.0.0.10"
    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
                                             is_inside=0)
    self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
    # pg6 is attached to IPv4 table 20
    self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
                                 vrf_id=20)

    # between S-NAT inside interfaces with same VRF (no translation)
    capture = send_and_capture(
        self.pg4, self.pg5, self.create_stream_in(self.pg4, self.pg5))
    self.verify_capture_no_translation(capture, self.pg4, self.pg5)

    # between S-NAT inside interfaces with different VRF (hairpinning)
    p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
         IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
         TCP(sport=1234, dport=5678))
    self.pg4.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, self.pg6.remote_ip4)
        self.assertNotEqual(tcp.sport, 1234)
        self.assertEqual(tcp.dport, 5678)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # in2out 1st interface
    capture = send_and_capture(
        self.pg4, self.pg3, self.create_stream_in(self.pg4, self.pg3))
    self.verify_capture_out(capture)

    # out2in 1st interface
    capture = send_and_capture(
        self.pg3, self.pg4, self.create_stream_out(self.pg3))
    self.verify_capture_in(capture, self.pg4)

    # in2out 2nd interface
    capture = send_and_capture(
        self.pg5, self.pg3, self.create_stream_in(self.pg5, self.pg3))
    self.verify_capture_out(capture)

    # out2in 2nd interface
    capture = send_and_capture(
        self.pg3, self.pg5, self.create_stream_out(self.pg3))
    self.verify_capture_in(capture, self.pg5)

    # pg5 session dump
    addresses = self.vapi.snat_address_dump()
    self.assertEqual(len(addresses), 1)
    sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
    self.assertEqual(len(sessions), 3)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg5.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)
    # sessions are expected in the order the stream was sent
    expected = [
        (IP_PROTOS.tcp, self.tcp_port_in, self.tcp_port_out),
        (IP_PROTOS.udp, self.udp_port_in, self.udp_port_out),
        (IP_PROTOS.icmp, self.icmp_id_in, self.icmp_id_out),
    ]
    for session, (protocol, _, _) in zip(sessions, expected):
        self.assertEqual(session.protocol, protocol)
    for session, (_, inside_port, _) in zip(sessions, expected):
        self.assertEqual(session.inside_port, inside_port)
    for session, (_, _, outside_port) in zip(sessions, expected):
        self.assertEqual(session.outside_port, outside_port)

    # in2out 3rd interface
    capture = send_and_capture(
        self.pg6, self.pg3, self.create_stream_in(self.pg6, self.pg3))
    self.verify_capture_out(capture, static_nat_ip, True)

    # out2in 3rd interface
    capture = send_and_capture(
        self.pg3, self.pg6,
        self.create_stream_out(self.pg3, static_nat_ip))
    self.verify_capture_in(capture, self.pg6)

    # general user and session dump verifications
    users = self.vapi.snat_user_dump()
    self.assertGreaterEqual(len(users), 3)
    addresses = self.vapi.snat_address_dump()
    self.assertEqual(len(addresses), 1)
    for user in users:
        sessions = self.vapi.snat_user_session_dump(user.ip_address,
                                                    user.vrf_id)
        for session in sessions:
            self.assertEqual(user.ip_address, session.inside_ip_address)
            self.assertTrue(session.total_bytes > session.total_pkts > 0)
            self.assertIn(session.protocol,
                          [IP_PROTOS.tcp, IP_PROTOS.udp,
                           IP_PROTOS.icmp])

    # pg4 session dump
    sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
    self.assertGreaterEqual(len(sessions), 4)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg4.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)

    # pg6 session dump
    sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
    self.assertGreaterEqual(len(sessions), 3)
    # Compare packed addresses, as the inside address checks do. The
    # previous map(ord, ...) == map(int, ...) form only compares
    # contents on Python 2, where map() returns lists.
    static_nat_ip_n = socket.inet_pton(socket.AF_INET, static_nat_ip)
    for session in sessions:
        self.assertTrue(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg6.remote_ip4n)
        self.assertEqual(session.outside_ip_address[0:4],
                         static_nat_ip_n)
        self.assertIn(session.inside_port,
                      [self.tcp_port_in, self.udp_port_in,
                       self.icmp_id_in])
def test_hairpinning(self):
    """ SNAT hairpinning """

    # host and server both sit behind the same inside interface
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    # NOTE(review): the host port literal was not visible in the reviewed
    # text; any port different from the server ports serves - confirm.
    host_in_port = 1234
    host_out_port = 0
    server_in_port = 5678
    server_out_port = 8765

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    # add static mapping for server
    self.snat_add_static_mapping(server.ip4, self.snat_addr,
                                 server_in_port, server_out_port,
                                 proto=IP_PROTOS.tcp)

    # send packet from host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.snat_addr) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        # the server replies to the translated source port
        host_out_port = tcp.sport
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # send reply from server to host
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.snat_addr) /
         TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
    except Exception:
        # The packet goes to ppp(), not to logger.error(): the misplaced
        # parenthesis made this path raise TypeError and hide the real
        # assertion failure.
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_max_translations_per_user(self):
    """ MAX translations per user - recycle the least recently used """
    inside, outside = self.pg0, self.pg1

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(inside.sw_if_index)
    self.vapi.snat_interface_add_del_feature(outside.sw_if_index,
                                             is_inside=0)

    # get maximum number of translations per user
    snat_config = self.vapi.snat_show_config()

    # send more than maximum number of translations per user packets
    pkts_num = snat_config.max_translations_per_user + 5
    # one TCP packet per distinct source port, i.e. one session each
    pkts = [Ether(dst=inside.local_mac, src=inside.remote_mac) /
            IP(src=inside.remote_ip4, dst=outside.remote_ip4) /
            TCP(sport=1025 + port)
            for port in range(0, pkts_num)]
    inside.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # verify number of translated packet
    outside.get_capture(pkts_num)
def test_interface_addr(self):
    """ Acquire SNAT addresses from interface """
    addr_if = self.pg7
    self.vapi.snat_add_interface_addr(addr_if.sw_if_index)

    # no address in NAT pool
    pool = self.vapi.snat_address_dump()
    self.assertEqual(0, len(pool))

    # configure interface address and check NAT address pool
    addr_if.config_ip4()
    pool = self.vapi.snat_address_dump()
    self.assertEqual(1, len(pool))
    self.assertEqual(pool[0].ip_address[0:4], addr_if.local_ip4n)

    # remove interface address and check NAT address pool
    addr_if.unconfig_ip4()
    pool = self.vapi.snat_address_dump()
    self.assertEqual(0, len(pool))
def test_interface_addr_static_mapping(self):
    """ Static mapping with addresses from interface """
    addr_if = self.pg7
    self.vapi.snat_add_interface_addr(addr_if.sw_if_index)
    self.snat_add_static_mapping('1.2.3.4',
                                 external_sw_if_index=addr_if.sw_if_index)

    # static mappings with external interface
    mappings = self.vapi.snat_static_mapping_dump()
    self.assertEqual(1, len(mappings))
    self.assertEqual(addr_if.sw_if_index,
                     mappings[0].external_sw_if_index)

    # configure interface address and check static mappings
    addr_if.config_ip4()
    mappings = self.vapi.snat_static_mapping_dump()
    self.assertEqual(1, len(mappings))
    resolved = mappings[0]
    # the dump now reports the interface address, not the interface index
    self.assertEqual(resolved.external_ip_address[0:4],
                     addr_if.local_ip4n)
    self.assertEqual(0xFFFFFFFF, resolved.external_sw_if_index)

    # remove interface address and check static mappings
    addr_if.unconfig_ip4()
    mappings = self.vapi.snat_static_mapping_dump()
    self.assertEqual(0, len(mappings))
def test_ipfix_nat44_sess(self):
    """ S-NAT IPFIX logging NAT44 session created/deleted

    Translates a stream from pg0 to pg1, removes the SNAT address,
    flushes IPFIX and verifies the three packets captured on pg3 (the
    configured collector) with ``verify_ipfix_nat44_ses``.
    """
    # NOTE(review): the reviewed text had lost several short lines; the
    # ``is_inside=0`` and ``path_mtu=512`` arguments, the ``pg_start()``
    # call and both ``for p in capture`` loop headers were reconstructed
    # from context - confirm against upstream.
    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.snat_ipfix()

    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # removing the address precedes the flush so that the export can be
    # checked by verify_ipfix_nat44_ses below
    self.snat_add_address(self.snat_addr, is_add=0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(3)
    ipfix = IPFIXDecoder()

    # first load template: data sets can only be decoded afterwards,
    # hence two separate passes over the capture
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))

    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_nat44_ses(data)
def test_ipfix_addr_exhausted(self):
    """ S-NAT IPFIX logging NAT addresses exhausted

    Enables SNAT on pg0/pg1 without adding any pool address, sends one
    packet from pg0, expects nothing on pg1, then flushes IPFIX and
    verifies the three packets captured on pg3 with
    ``verify_ipfix_addr_exhausted``.
    """
    # NOTE(review): the reviewed text had lost several short lines; the
    # ``is_inside=0`` and ``path_mtu=512`` arguments, the TCP layer of
    # the test packet (source port 3025 is an assumption), the
    # ``pg_start()`` call and both ``for p in capture`` loop headers were
    # reconstructed from context - confirm against upstream.
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.snat_ipfix()

    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=3025))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # no packet is expected on the outside interface
    self.pg1.get_capture(0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(3)
    ipfix = IPFIXDecoder()

    # first load template: data sets can only be decoded afterwards,
    # hence two separate passes over the capture
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))

    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_addr_exhausted(data)
def test_pool_addr_fib(self):
    """ S-NAT add pool addresses to FIB

    Sends ARP who-has requests for the SNAT pool address and for a 1:1
    static mapping address and checks that pg1 captures an ARP is-at
    reply for each. Then checks that nothing is captured on pg1 when the
    request is sent on pg2, nor after both addresses are removed.
    """
    # NOTE(review): the reviewed text had lost several short lines; the
    # ``is_inside=0`` / ``is_add=0`` arguments and the ``pg_start()``
    # calls were reconstructed from context - confirm against upstream.
    static_addr = '10.0.0.10'

    def send_arp_request(pg_if, target_ip):
        # Broadcast an ARP who-has for target_ip, sourced from pg_if's
        # remote peer, and start the packet generator.
        request = (Ether(src=pg_if.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
                   ARP(op=ARP.who_has, pdst=target_ip,
                       psrc=pg_if.remote_ip4, hwsrc=pg_if.remote_mac))
        pg_if.add_stream(request)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def expect_arp_reply():
        # Exactly one packet on pg1, and it must be an ARP is-at.
        capture = self.pg1.get_capture(1)
        self.assertTrue(capture[0].haslayer(ARP))
        # assertEqual, not assertTrue: assertTrue would take ARP.is_at
        # as the failure message and never compare the opcode.
        self.assertEqual(capture[0][ARP].op, ARP.is_at)

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)

    # SNAT address
    send_arp_request(self.pg1, self.snat_addr)
    expect_arp_reply()

    # 1:1 NAT address
    send_arp_request(self.pg1, static_addr)
    expect_arp_reply()

    # send ARP to non-SNAT interface
    send_arp_request(self.pg2, self.snat_addr)
    self.pg1.get_capture(0)

    # remove addresses and verify
    self.snat_add_address(self.snat_addr, is_add=0)
    self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
                                 is_add=0)

    send_arp_request(self.pg1, self.snat_addr)
    self.pg1.get_capture(0)

    send_arp_request(self.pg1, static_addr)
    self.pg1.get_capture(0)
def test_vrf_mode(self):
    """ S-NAT tenant VRF aware address pool mode

    Places pg0 and pg1 in two different IPv4 tables, adds one SNAT
    address per VRF and checks that traffic from pg0 towards pg2 is
    translated to ``nat_ip1`` while traffic from pg1 is translated to
    ``nat_ip2``.
    """
    # NOTE(review): the reviewed text had lost several short lines; the
    # VRF id values (1 and 2 are an assumption), the ``is_inside=0``
    # argument and the ``pg_start()`` calls were reconstructed from
    # context - confirm against upstream.
    vrf_id1 = 1
    vrf_id2 = 2
    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"

    # addresses are removed before the table change and re-applied after
    self.pg0.unconfig_ip4()
    self.pg1.unconfig_ip4()
    self.pg0.set_table_ip4(vrf_id1)
    self.pg1.set_table_ip4(vrf_id2)
    self.pg0.config_ip4()
    self.pg1.config_ip4()

    self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
    self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
                                             is_inside=0)

    # first VRF
    pkts = self.create_stream_in(self.pg0, self.pg2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1)

    # second VRF
    pkts = self.create_stream_in(self.pg1, self.pg2)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip2)
def test_vrf_feature_independent(self):
    """ S-NAT tenant VRF independent address pool mode

    Adds two SNAT addresses without a VRF binding and checks that
    traffic from both pg0 and pg1 towards pg2 is translated to the same
    address, ``nat_ip1``.
    """
    # NOTE(review): the reviewed text had lost several short lines; the
    # ``is_inside=0`` argument and the ``pg_start()`` calls were
    # reconstructed from context - confirm against upstream.
    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"

    self.snat_add_address(nat_ip1)
    self.snat_add_address(nat_ip2)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
                                             is_inside=0)

    # first VRF
    pkts = self.create_stream_in(self.pg0, self.pg2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1)

    # second VRF: nat_ip1 is expected here as well
    pkts = self.create_stream_in(self.pg1, self.pg2)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1)
def tearDown(self):
    """ Run the parent teardown and, if VPP is alive, log the SNAT state. """
    # NOTE(review): the ``def`` header was missing from the reviewed text
    # and was reconstructed from the ``super(TestSNAT, self).tearDown()``
    # call - confirm against upstream.
    super(TestSNAT, self).tearDown()
    if not self.vpp_dead:
        self.logger.info(self.vapi.cli("show snat verbose"))
        # NOTE(review): one short statement was lost here; presumably the
        # per-test cleanup call, as in TestDeterministicNAT.clear_snat -
        # confirm that TestSNAT defines clear_snat().
        self.clear_snat()
class TestDeterministicNAT(MethodHolder):
    """ Deterministic NAT Test Cases """

    # NOTE(review): the reviewed text of this class had lost every short
    # line (decorators, try/except skeleton, per-interface setup, prefix
    # lengths, call continuations, the tearDown header). Those parts were
    # reconstructed from context - confirm against upstream.

    @classmethod
    def setUpConstants(cls):
        """ Append the ``snat { deterministic }`` section to the VPP
        command line. """
        super(TestDeterministicNAT, cls).setUpConstants()
        cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])

    @classmethod
    def setUpClass(cls):
        """ Create two packet-generator interfaces for the test class.

        If setup fails, the parent ``tearDownClass`` is invoked before
        the exception is re-raised.
        """
        super(TestDeterministicNAT, cls).setUpClass()

        try:
            cls.create_pg_interfaces(range(2))
            cls.interfaces = list(cls.pg_interfaces)

            for i in cls.interfaces:
                # NOTE(review): loop body reconstructed; presumably the
                # usual bring-up sequence - confirm against upstream.
                i.admin_up()
                i.config_ip4()
                i.resolve_arp()

        except Exception:
            super(TestDeterministicNAT, cls).tearDownClass()
            raise

    def test_deterministic_mode(self):
        """ S-NAT run deterministic mode

        Adds one deterministic mapping, checks the forward and reverse
        lookups and the mapping dump, then clears the configuration and
        checks that the dump is empty.
        """
        in_addr = '172.16.255.0'
        out_addr = '172.17.255.50'
        in_addr_t = '172.16.255.20'
        in_addr_n = socket.inet_aton(in_addr)
        out_addr_n = socket.inet_aton(out_addr)
        in_addr_t_n = socket.inet_aton(in_addr_t)
        # NOTE(review): prefix lengths reconstructed (24 / 32 is an
        # assumption) - confirm against upstream.
        in_plen = 24
        out_plen = 32

        snat_config = self.vapi.snat_show_config()
        self.assertEqual(1, snat_config.deterministic)

        self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)

        # forward lookup of an inside host, then reverse lookup using the
        # upper bound of the port range returned for it
        rep1 = self.vapi.snat_det_forward(in_addr_t_n)
        self.assertEqual(rep1.out_addr[:4], out_addr_n)
        rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
        self.assertEqual(rep2.in_addr[:4], in_addr_t_n)

        deterministic_mappings = self.vapi.snat_det_map_dump()
        self.assertEqual(len(deterministic_mappings), 1)
        dsm = deterministic_mappings[0]
        self.assertEqual(in_addr_n, dsm.in_addr[:4])
        self.assertEqual(in_plen, dsm.in_plen)
        self.assertEqual(out_addr_n, dsm.out_addr[:4])
        self.assertEqual(out_plen, dsm.out_plen)

        self.clear_snat()
        deterministic_mappings = self.vapi.snat_det_map_dump()
        self.assertEqual(len(deterministic_mappings), 0)

    def clear_snat(self):
        """
        Clear SNAT configuration.

        Deletes every deterministic mapping returned by the dump.
        """
        deterministic_mappings = self.vapi.snat_det_map_dump()
        for dsm in deterministic_mappings:
            self.vapi.snat_add_det_map(dsm.in_addr,
                                       dsm.in_plen,
                                       dsm.out_addr,
                                       dsm.out_plen,
                                       is_add=0)

    def tearDown(self):
        """ Run the parent teardown and, if VPP is alive, log and clear
        the SNAT state. """
        super(TestDeterministicNAT, self).tearDown()
        if not self.vpp_dead:
            self.logger.info(self.vapi.cli("show snat detail"))
            self.clear_snat()
# Script entry point: run this module's tests with the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)