e1dd576e991a4e986d86f05154e77adda3fe8036
[vpp.git] / test / test_snat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.l2 import Ether, ARP
11 from scapy.data import IP_PROTOS
12 from util import ppp
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14
15
16 class TestSNAT(VppTestCase):
17     """ SNAT Test Cases """
18
19     @classmethod
20     def setUpClass(cls):
21         super(TestSNAT, cls).setUpClass()
22
23         try:
24             cls.tcp_port_in = 6303
25             cls.tcp_port_out = 6303
26             cls.udp_port_in = 6304
27             cls.udp_port_out = 6304
28             cls.icmp_id_in = 6305
29             cls.icmp_id_out = 6305
30             cls.snat_addr = '10.0.0.3'
31
32             cls.create_pg_interfaces(range(8))
33             cls.interfaces = list(cls.pg_interfaces[0:4])
34
35             for i in cls.interfaces:
36                 i.admin_up()
37                 i.config_ip4()
38                 i.resolve_arp()
39
40             cls.pg0.generate_remote_hosts(2)
41             cls.pg0.configure_ipv4_neighbors()
42
43             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
44
45             cls.pg4._local_ip4 = "172.16.255.1"
46             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
47             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
48             cls.pg4.set_table_ip4(10)
49             cls.pg5._local_ip4 = "172.16.255.3"
50             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
51             cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
52             cls.pg5.set_table_ip4(10)
53             cls.pg6._local_ip4 = "172.16.255.1"
54             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
55             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
56             cls.pg6.set_table_ip4(20)
57             for i in cls.overlapping_interfaces:
58                 i.config_ip4()
59                 i.admin_up()
60                 i.resolve_arp()
61
62             cls.pg7.admin_up()
63
64         except Exception:
65             super(TestSNAT, cls).tearDownClass()
66             raise
67
68     def create_stream_in(self, in_if, out_if, ttl=64):
69         """
70         Create packet stream for inside network
71
72         :param in_if: Inside interface
73         :param out_if: Outside interface
74         :param ttl: TTL of generated packets
75         """
76         pkts = []
77         # TCP
78         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
79              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
80              TCP(sport=self.tcp_port_in))
81         pkts.append(p)
82
83         # UDP
84         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
85              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
86              UDP(sport=self.udp_port_in))
87         pkts.append(p)
88
89         # ICMP
90         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
91              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
92              ICMP(id=self.icmp_id_in, type='echo-request'))
93         pkts.append(p)
94
95         return pkts
96
97     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
98         """
99         Create packet stream for outside network
100
101         :param out_if: Outside interface
102         :param dst_ip: Destination IP address (Default use global SNAT address)
103         :param ttl: TTL of generated packets
104         """
105         if dst_ip is None:
106             dst_ip = self.snat_addr
107         pkts = []
108         # TCP
109         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
110              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
111              TCP(dport=self.tcp_port_out))
112         pkts.append(p)
113
114         # UDP
115         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
116              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
117              UDP(dport=self.udp_port_out))
118         pkts.append(p)
119
120         # ICMP
121         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
122              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
123              ICMP(id=self.icmp_id_out, type='echo-reply'))
124         pkts.append(p)
125
126         return pkts
127
128     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
129                            packet_num=3):
130         """
131         Verify captured packets on outside network
132
133         :param capture: Captured packets
134         :param nat_ip: Translated IP address (Default use global SNAT address)
135         :param same_port: Sorce port number is not translated (Default False)
136         :param packet_num: Expected number of packets (Default 3)
137         """
138         if nat_ip is None:
139             nat_ip = self.snat_addr
140         self.assertEqual(packet_num, len(capture))
141         for packet in capture:
142             try:
143                 self.assertEqual(packet[IP].src, nat_ip)
144                 if packet.haslayer(TCP):
145                     if same_port:
146                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
147                     else:
148                         self.assertNotEqual(
149                             packet[TCP].sport, self.tcp_port_in)
150                     self.tcp_port_out = packet[TCP].sport
151                 elif packet.haslayer(UDP):
152                     if same_port:
153                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
154                     else:
155                         self.assertNotEqual(
156                             packet[UDP].sport, self.udp_port_in)
157                     self.udp_port_out = packet[UDP].sport
158                 else:
159                     if same_port:
160                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
161                     else:
162                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
163                     self.icmp_id_out = packet[ICMP].id
164             except:
165                 self.logger.error(ppp("Unexpected or invalid packet "
166                                       "(outside network):", packet))
167                 raise
168
169     def verify_capture_in(self, capture, in_if, packet_num=3):
170         """
171         Verify captured packets on inside network
172
173         :param capture: Captured packets
174         :param in_if: Inside interface
175         :param packet_num: Expected number of packets (Default 3)
176         """
177         self.assertEqual(packet_num, len(capture))
178         for packet in capture:
179             try:
180                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
181                 if packet.haslayer(TCP):
182                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
183                 elif packet.haslayer(UDP):
184                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
185                 else:
186                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
187             except:
188                 self.logger.error(ppp("Unexpected or invalid packet "
189                                       "(inside network):", packet))
190                 raise
191
192     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
193         """
194         Verify captured packet that don't have to be translated
195
196         :param capture: Captured packets
197         :param ingress_if: Ingress interface
198         :param egress_if: Egress interface
199         """
200         for packet in capture:
201             try:
202                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
203                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
204                 if packet.haslayer(TCP):
205                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
206                 elif packet.haslayer(UDP):
207                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
208                 else:
209                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
210             except:
211                 self.logger.error(ppp("Unexpected or invalid packet "
212                                       "(inside network):", packet))
213                 raise
214
215     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
216                                             packet_num=3, icmp_type=11):
217         """
218         Verify captured packets with ICMP errors on outside network
219
220         :param capture: Captured packets
221         :param src_ip: Translated IP address or IP address of VPP
222                        (Default use global SNAT address)
223         :param packet_num: Expected number of packets (Default 3)
224         :param icmp_type: Type of error ICMP packet
225                           we are expecting (Default 11)
226         """
227         if src_ip is None:
228             src_ip = self.snat_addr
229         self.assertEqual(packet_num, len(capture))
230         for packet in capture:
231             try:
232                 self.assertEqual(packet[IP].src, src_ip)
233                 self.assertTrue(packet.haslayer(ICMP))
234                 icmp = packet[ICMP]
235                 self.assertEqual(icmp.type, icmp_type)
236                 self.assertTrue(icmp.haslayer(IPerror))
237                 inner_ip = icmp[IPerror]
238                 if inner_ip.haslayer(TCPerror):
239                     self.assertEqual(inner_ip[TCPerror].dport,
240                                      self.tcp_port_out)
241                 elif inner_ip.haslayer(UDPerror):
242                     self.assertEqual(inner_ip[UDPerror].dport,
243                                      self.udp_port_out)
244                 else:
245                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
246             except:
247                 self.logger.error(ppp("Unexpected or invalid packet "
248                                       "(outside network):", packet))
249                 raise
250
251     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
252                                            icmp_type=11):
253         """
254         Verify captured packets with ICMP errors on inside network
255
256         :param capture: Captured packets
257         :param in_if: Inside interface
258         :param packet_num: Expected number of packets (Default 3)
259         :param icmp_type: Type of error ICMP packet
260                           we are expecting (Default 11)
261         """
262         self.assertEqual(packet_num, len(capture))
263         for packet in capture:
264             try:
265                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
266                 self.assertTrue(packet.haslayer(ICMP))
267                 icmp = packet[ICMP]
268                 self.assertEqual(icmp.type, icmp_type)
269                 self.assertTrue(icmp.haslayer(IPerror))
270                 inner_ip = icmp[IPerror]
271                 if inner_ip.haslayer(TCPerror):
272                     self.assertEqual(inner_ip[TCPerror].sport,
273                                      self.tcp_port_in)
274                 elif inner_ip.haslayer(UDPerror):
275                     self.assertEqual(inner_ip[UDPerror].sport,
276                                      self.udp_port_in)
277                 else:
278                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
279             except:
280                 self.logger.error(ppp("Unexpected or invalid packet "
281                                       "(inside network):", packet))
282                 raise
283
284     def verify_ipfix_nat44_ses(self, data):
285         """
286         Verify IPFIX NAT44 session create/delete event
287
288         :param data: Decoded IPFIX data records
289         """
290         nat44_ses_create_num = 0
291         nat44_ses_delete_num = 0
292         self.assertEqual(6, len(data))
293         for record in data:
294             # natEvent
295             self.assertIn(ord(record[230]), [4, 5])
296             if ord(record[230]) == 4:
297                 nat44_ses_create_num += 1
298             else:
299                 nat44_ses_delete_num += 1
300             # sourceIPv4Address
301             self.assertEqual(self.pg0.remote_ip4n, record[8])
302             # postNATSourceIPv4Address
303             self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
304                              record[225])
305             # ingressVRFID
306             self.assertEqual(struct.pack("!I", 0), record[234])
307             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
308             if IP_PROTOS.icmp == ord(record[4]):
309                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
310                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
311                                  record[227])
312             elif IP_PROTOS.tcp == ord(record[4]):
313                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
314                                  record[7])
315                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
316                                  record[227])
317             elif IP_PROTOS.udp == ord(record[4]):
318                 self.assertEqual(struct.pack("!H", self.udp_port_in),
319                                  record[7])
320                 self.assertEqual(struct.pack("!H", self.udp_port_out),
321                                  record[227])
322             else:
323                 self.fail("Invalid protocol")
324         self.assertEqual(3, nat44_ses_create_num)
325         self.assertEqual(3, nat44_ses_delete_num)
326
327     def verify_ipfix_addr_exhausted(self, data):
328         """
329         Verify IPFIX NAT addresses event
330
331         :param data: Decoded IPFIX data records
332         """
333         self.assertEqual(1, len(data))
334         record = data[0]
335         # natEvent
336         self.assertEqual(ord(record[230]), 3)
337         # natPoolID
338         self.assertEqual(struct.pack("!I", 0), record[283])
339
340     def clear_snat(self):
341         """
342         Clear SNAT configuration.
343         """
344         if self.pg7.has_ip4_config:
345             self.pg7.unconfig_ip4()
346
347         interfaces = self.vapi.snat_interface_addr_dump()
348         for intf in interfaces:
349             self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
350
351         self.vapi.snat_ipfix(enable=0)
352
353         interfaces = self.vapi.snat_interface_dump()
354         for intf in interfaces:
355             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
356                                                      intf.is_inside,
357                                                      is_add=0)
358
359         static_mappings = self.vapi.snat_static_mapping_dump()
360         for sm in static_mappings:
361             self.vapi.snat_add_static_mapping(sm.local_ip_address,
362                                               sm.external_ip_address,
363                                               local_port=sm.local_port,
364                                               external_port=sm.external_port,
365                                               addr_only=sm.addr_only,
366                                               vrf_id=sm.vrf_id,
367                                               protocol=sm.protocol,
368                                               is_add=0)
369
370         adresses = self.vapi.snat_address_dump()
371         for addr in adresses:
372             self.vapi.snat_add_address_range(addr.ip_address,
373                                              addr.ip_address,
374                                              is_add=0)
375
376     def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
377                                 local_port=0, external_port=0, vrf_id=0,
378                                 is_add=1, external_sw_if_index=0xFFFFFFFF,
379                                 proto=0):
380         """
381         Add/delete S-NAT static mapping
382
383         :param local_ip: Local IP address
384         :param external_ip: External IP address
385         :param local_port: Local port number (Optional)
386         :param external_port: External port number (Optional)
387         :param vrf_id: VRF ID (Default 0)
388         :param is_add: 1 if add, 0 if delete (Default add)
389         :param external_sw_if_index: External interface instead of IP address
390         :param proto: IP protocol (Mandatory if port specified)
391         """
392         addr_only = 1
393         if local_port and external_port:
394             addr_only = 0
395         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
396         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
397         self.vapi.snat_add_static_mapping(
398             l_ip,
399             e_ip,
400             external_sw_if_index,
401             local_port,
402             external_port,
403             addr_only,
404             vrf_id,
405             proto,
406             is_add)
407
408     def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
409         """
410         Add/delete S-NAT address
411
412         :param ip: IP address
413         :param is_add: 1 if add, 0 if delete (Default add)
414         """
415         snat_addr = socket.inet_pton(socket.AF_INET, ip)
416         self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
417                                          vrf_id=vrf_id)
418
419     def test_dynamic(self):
420         """ SNAT dynamic translation test """
421
422         self.snat_add_address(self.snat_addr)
423         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
424         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
425                                                  is_inside=0)
426
427         # in2out
428         pkts = self.create_stream_in(self.pg0, self.pg1)
429         self.pg0.add_stream(pkts)
430         self.pg_enable_capture(self.pg_interfaces)
431         self.pg_start()
432         capture = self.pg1.get_capture(len(pkts))
433         self.verify_capture_out(capture)
434
435         # out2in
436         pkts = self.create_stream_out(self.pg1)
437         self.pg1.add_stream(pkts)
438         self.pg_enable_capture(self.pg_interfaces)
439         self.pg_start()
440         capture = self.pg0.get_capture(len(pkts))
441         self.verify_capture_in(capture, self.pg0)
442
443     def test_dynamic_icmp_errors_in2out_ttl_1(self):
444         """ SNAT handling of client packets with TTL=1 """
445
446         self.snat_add_address(self.snat_addr)
447         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
448         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
449                                                  is_inside=0)
450
451         # Client side - generate traffic
452         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
453         self.pg0.add_stream(pkts)
454         self.pg_enable_capture(self.pg_interfaces)
455         self.pg_start()
456
457         # Client side - verify ICMP type 11 packets
458         capture = self.pg0.get_capture(len(pkts))
459         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
460
461     def test_dynamic_icmp_errors_out2in_ttl_1(self):
462         """ SNAT handling of server packets with TTL=1 """
463
464         self.snat_add_address(self.snat_addr)
465         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
466         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
467                                                  is_inside=0)
468
469         # Client side - create sessions
470         pkts = self.create_stream_in(self.pg0, self.pg1)
471         self.pg0.add_stream(pkts)
472         self.pg_enable_capture(self.pg_interfaces)
473         self.pg_start()
474
475         # Server side - generate traffic
476         capture = self.pg1.get_capture(len(pkts))
477         self.verify_capture_out(capture)
478         pkts = self.create_stream_out(self.pg1, ttl=1)
479         self.pg1.add_stream(pkts)
480         self.pg_enable_capture(self.pg_interfaces)
481         self.pg_start()
482
483         # Server side - verify ICMP type 11 packets
484         capture = self.pg1.get_capture(len(pkts))
485         self.verify_capture_out_with_icmp_errors(capture,
486                                                  src_ip=self.pg1.local_ip4)
487
488     def test_dynamic_icmp_errors_in2out_ttl_2(self):
489         """ SNAT handling of error responses to client packets with TTL=2 """
490
491         self.snat_add_address(self.snat_addr)
492         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
493         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
494                                                  is_inside=0)
495
496         # Client side - generate traffic
497         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
498         self.pg0.add_stream(pkts)
499         self.pg_enable_capture(self.pg_interfaces)
500         self.pg_start()
501
502         # Server side - simulate ICMP type 11 response
503         capture = self.pg1.get_capture(len(pkts))
504         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
505                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
506                 ICMP(type=11) / packet[IP] for packet in capture]
507         self.pg1.add_stream(pkts)
508         self.pg_enable_capture(self.pg_interfaces)
509         self.pg_start()
510
511         # Client side - verify ICMP type 11 packets
512         capture = self.pg0.get_capture(len(pkts))
513         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
514
515     def test_dynamic_icmp_errors_out2in_ttl_2(self):
516         """ SNAT handling of error responses to server packets with TTL=2 """
517
518         self.snat_add_address(self.snat_addr)
519         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
520         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
521                                                  is_inside=0)
522
523         # Client side - create sessions
524         pkts = self.create_stream_in(self.pg0, self.pg1)
525         self.pg0.add_stream(pkts)
526         self.pg_enable_capture(self.pg_interfaces)
527         self.pg_start()
528
529         # Server side - generate traffic
530         capture = self.pg1.get_capture(len(pkts))
531         self.verify_capture_out(capture)
532         pkts = self.create_stream_out(self.pg1, ttl=2)
533         self.pg1.add_stream(pkts)
534         self.pg_enable_capture(self.pg_interfaces)
535         self.pg_start()
536
537         # Client side - simulate ICMP type 11 response
538         capture = self.pg0.get_capture(len(pkts))
539         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
540                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
541                 ICMP(type=11) / packet[IP] for packet in capture]
542         self.pg0.add_stream(pkts)
543         self.pg_enable_capture(self.pg_interfaces)
544         self.pg_start()
545
546         # Server side - verify ICMP type 11 packets
547         capture = self.pg1.get_capture(len(pkts))
548         self.verify_capture_out_with_icmp_errors(capture)
549
550     def test_ping_out_interface_from_outside(self):
551         """ Ping SNAT out interface from outside """
552
553         self.snat_add_address(self.snat_addr)
554         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
555         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
556                                                  is_inside=0)
557
558         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
559              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
560              ICMP(id=self.icmp_id_out, type='echo-request'))
561         pkts = [p]
562         self.pg1.add_stream(pkts)
563         self.pg_enable_capture(self.pg_interfaces)
564         self.pg_start()
565         capture = self.pg1.get_capture(len(pkts))
566         self.assertEqual(1, len(capture))
567         packet = capture[0]
568         try:
569             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
570             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
571             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
572             self.assertEqual(packet[ICMP].type, 0)  # echo reply
573         except:
574             self.logger.error(ppp("Unexpected or invalid packet "
575                                   "(outside network):", packet))
576             raise
577
578     def test_static_in(self):
579         """ SNAT 1:1 NAT initialized from inside network """
580
581         nat_ip = "10.0.0.10"
582         self.tcp_port_out = 6303
583         self.udp_port_out = 6304
584         self.icmp_id_out = 6305
585
586         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
587         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
588         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
589                                                  is_inside=0)
590
591         # in2out
592         pkts = self.create_stream_in(self.pg0, self.pg1)
593         self.pg0.add_stream(pkts)
594         self.pg_enable_capture(self.pg_interfaces)
595         self.pg_start()
596         capture = self.pg1.get_capture(len(pkts))
597         self.verify_capture_out(capture, nat_ip, True)
598
599         # out2in
600         pkts = self.create_stream_out(self.pg1, nat_ip)
601         self.pg1.add_stream(pkts)
602         self.pg_enable_capture(self.pg_interfaces)
603         self.pg_start()
604         capture = self.pg0.get_capture(len(pkts))
605         self.verify_capture_in(capture, self.pg0)
606
607     def test_static_out(self):
608         """ SNAT 1:1 NAT initialized from outside network """
609
610         nat_ip = "10.0.0.20"
611         self.tcp_port_out = 6303
612         self.udp_port_out = 6304
613         self.icmp_id_out = 6305
614
615         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
616         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
617         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
618                                                  is_inside=0)
619
620         # out2in
621         pkts = self.create_stream_out(self.pg1, nat_ip)
622         self.pg1.add_stream(pkts)
623         self.pg_enable_capture(self.pg_interfaces)
624         self.pg_start()
625         capture = self.pg0.get_capture(len(pkts))
626         self.verify_capture_in(capture, self.pg0)
627
628         # in2out
629         pkts = self.create_stream_in(self.pg0, self.pg1)
630         self.pg0.add_stream(pkts)
631         self.pg_enable_capture(self.pg_interfaces)
632         self.pg_start()
633         capture = self.pg1.get_capture(len(pkts))
634         self.verify_capture_out(capture, nat_ip, True)
635
636     def test_static_with_port_in(self):
637         """ SNAT 1:1 NAT with port initialized from inside network """
638
639         self.tcp_port_out = 3606
640         self.udp_port_out = 3607
641         self.icmp_id_out = 3608
642
643         self.snat_add_address(self.snat_addr)
644         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
645                                      self.tcp_port_in, self.tcp_port_out,
646                                      proto=IP_PROTOS.tcp)
647         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
648                                      self.udp_port_in, self.udp_port_out,
649                                      proto=IP_PROTOS.udp)
650         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
651                                      self.icmp_id_in, self.icmp_id_out,
652                                      proto=IP_PROTOS.icmp)
653         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
654         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
655                                                  is_inside=0)
656
657         # in2out
658         pkts = self.create_stream_in(self.pg0, self.pg1)
659         self.pg0.add_stream(pkts)
660         self.pg_enable_capture(self.pg_interfaces)
661         self.pg_start()
662         capture = self.pg1.get_capture(len(pkts))
663         self.verify_capture_out(capture)
664
665         # out2in
666         pkts = self.create_stream_out(self.pg1)
667         self.pg1.add_stream(pkts)
668         self.pg_enable_capture(self.pg_interfaces)
669         self.pg_start()
670         capture = self.pg0.get_capture(len(pkts))
671         self.verify_capture_in(capture, self.pg0)
672
673     def test_static_with_port_out(self):
674         """ SNAT 1:1 NAT with port initialized from outside network """
675
676         self.tcp_port_out = 30606
677         self.udp_port_out = 30607
678         self.icmp_id_out = 30608
679
680         self.snat_add_address(self.snat_addr)
681         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
682                                      self.tcp_port_in, self.tcp_port_out,
683                                      proto=IP_PROTOS.tcp)
684         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
685                                      self.udp_port_in, self.udp_port_out,
686                                      proto=IP_PROTOS.udp)
687         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
688                                      self.icmp_id_in, self.icmp_id_out,
689                                      proto=IP_PROTOS.icmp)
690         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
691         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
692                                                  is_inside=0)
693
694         # out2in
695         pkts = self.create_stream_out(self.pg1)
696         self.pg1.add_stream(pkts)
697         self.pg_enable_capture(self.pg_interfaces)
698         self.pg_start()
699         capture = self.pg0.get_capture(len(pkts))
700         self.verify_capture_in(capture, self.pg0)
701
702         # in2out
703         pkts = self.create_stream_in(self.pg0, self.pg1)
704         self.pg0.add_stream(pkts)
705         self.pg_enable_capture(self.pg_interfaces)
706         self.pg_start()
707         capture = self.pg1.get_capture(len(pkts))
708         self.verify_capture_out(capture)
709
710     def test_static_vrf_aware(self):
711         """ SNAT 1:1 NAT VRF awareness """
712
713         nat_ip1 = "10.0.0.30"
714         nat_ip2 = "10.0.0.40"
715         self.tcp_port_out = 6303
716         self.udp_port_out = 6304
717         self.icmp_id_out = 6305
718
719         self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
720                                      vrf_id=10)
721         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
722                                      vrf_id=10)
723         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
724                                                  is_inside=0)
725         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
726         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
727
728         # inside interface VRF match SNAT static mapping VRF
729         pkts = self.create_stream_in(self.pg4, self.pg3)
730         self.pg4.add_stream(pkts)
731         self.pg_enable_capture(self.pg_interfaces)
732         self.pg_start()
733         capture = self.pg3.get_capture(len(pkts))
734         self.verify_capture_out(capture, nat_ip1, True)
735
736         # inside interface VRF don't match SNAT static mapping VRF (packets
737         # are dropped)
738         pkts = self.create_stream_in(self.pg0, self.pg3)
739         self.pg0.add_stream(pkts)
740         self.pg_enable_capture(self.pg_interfaces)
741         self.pg_start()
742         self.pg3.assert_nothing_captured()
743
744     def test_multiple_inside_interfaces(self):
745         """ SNAT multiple inside interfaces (non-overlapping address space) """
746
747         self.snat_add_address(self.snat_addr)
748         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
749         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
750         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
751                                                  is_inside=0)
752
753         # between two S-NAT inside interfaces (no translation)
754         pkts = self.create_stream_in(self.pg0, self.pg1)
755         self.pg0.add_stream(pkts)
756         self.pg_enable_capture(self.pg_interfaces)
757         self.pg_start()
758         capture = self.pg1.get_capture(len(pkts))
759         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
760
761         # from S-NAT inside to interface without S-NAT feature (no translation)
762         pkts = self.create_stream_in(self.pg0, self.pg2)
763         self.pg0.add_stream(pkts)
764         self.pg_enable_capture(self.pg_interfaces)
765         self.pg_start()
766         capture = self.pg2.get_capture(len(pkts))
767         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
768
769         # in2out 1st interface
770         pkts = self.create_stream_in(self.pg0, self.pg3)
771         self.pg0.add_stream(pkts)
772         self.pg_enable_capture(self.pg_interfaces)
773         self.pg_start()
774         capture = self.pg3.get_capture(len(pkts))
775         self.verify_capture_out(capture)
776
777         # out2in 1st interface
778         pkts = self.create_stream_out(self.pg3)
779         self.pg3.add_stream(pkts)
780         self.pg_enable_capture(self.pg_interfaces)
781         self.pg_start()
782         capture = self.pg0.get_capture(len(pkts))
783         self.verify_capture_in(capture, self.pg0)
784
785         # in2out 2nd interface
786         pkts = self.create_stream_in(self.pg1, self.pg3)
787         self.pg1.add_stream(pkts)
788         self.pg_enable_capture(self.pg_interfaces)
789         self.pg_start()
790         capture = self.pg3.get_capture(len(pkts))
791         self.verify_capture_out(capture)
792
793         # out2in 2nd interface
794         pkts = self.create_stream_out(self.pg3)
795         self.pg3.add_stream(pkts)
796         self.pg_enable_capture(self.pg_interfaces)
797         self.pg_start()
798         capture = self.pg1.get_capture(len(pkts))
799         self.verify_capture_in(capture, self.pg1)
800
801     def test_inside_overlapping_interfaces(self):
802         """ SNAT multiple inside interfaces with overlapping address space """
803
804         static_nat_ip = "10.0.0.10"
805         self.snat_add_address(self.snat_addr)
806         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
807                                                  is_inside=0)
808         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
809         self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
810         self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
811         self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
812                                      vrf_id=20)
813
814         # between S-NAT inside interfaces with same VRF (no translation)
815         pkts = self.create_stream_in(self.pg4, self.pg5)
816         self.pg4.add_stream(pkts)
817         self.pg_enable_capture(self.pg_interfaces)
818         self.pg_start()
819         capture = self.pg5.get_capture(len(pkts))
820         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
821
822         # between S-NAT inside interfaces with different VRF (hairpinning)
823         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
824              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
825              TCP(sport=1234, dport=5678))
826         self.pg4.add_stream(p)
827         self.pg_enable_capture(self.pg_interfaces)
828         self.pg_start()
829         capture = self.pg6.get_capture(1)
830         p = capture[0]
831         try:
832             ip = p[IP]
833             tcp = p[TCP]
834             self.assertEqual(ip.src, self.snat_addr)
835             self.assertEqual(ip.dst, self.pg6.remote_ip4)
836             self.assertNotEqual(tcp.sport, 1234)
837             self.assertEqual(tcp.dport, 5678)
838         except:
839             self.logger.error(ppp("Unexpected or invalid packet:", p))
840             raise
841
842         # in2out 1st interface
843         pkts = self.create_stream_in(self.pg4, self.pg3)
844         self.pg4.add_stream(pkts)
845         self.pg_enable_capture(self.pg_interfaces)
846         self.pg_start()
847         capture = self.pg3.get_capture(len(pkts))
848         self.verify_capture_out(capture)
849
850         # out2in 1st interface
851         pkts = self.create_stream_out(self.pg3)
852         self.pg3.add_stream(pkts)
853         self.pg_enable_capture(self.pg_interfaces)
854         self.pg_start()
855         capture = self.pg4.get_capture(len(pkts))
856         self.verify_capture_in(capture, self.pg4)
857
858         # in2out 2nd interface
859         pkts = self.create_stream_in(self.pg5, self.pg3)
860         self.pg5.add_stream(pkts)
861         self.pg_enable_capture(self.pg_interfaces)
862         self.pg_start()
863         capture = self.pg3.get_capture(len(pkts))
864         self.verify_capture_out(capture)
865
866         # out2in 2nd interface
867         pkts = self.create_stream_out(self.pg3)
868         self.pg3.add_stream(pkts)
869         self.pg_enable_capture(self.pg_interfaces)
870         self.pg_start()
871         capture = self.pg5.get_capture(len(pkts))
872         self.verify_capture_in(capture, self.pg5)
873
874         # pg5 session dump
875         addresses = self.vapi.snat_address_dump()
876         self.assertEqual(len(addresses), 1)
877         sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
878         self.assertEqual(len(sessions), 3)
879         for session in sessions:
880             self.assertFalse(session.is_static)
881             self.assertEqual(session.inside_ip_address[0:4],
882                              self.pg5.remote_ip4n)
883             self.assertEqual(session.outside_ip_address,
884                              addresses[0].ip_address)
885         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
886         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
887         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
888         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
889         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
890         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
891         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
892         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
893         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
894
895         # in2out 3rd interface
896         pkts = self.create_stream_in(self.pg6, self.pg3)
897         self.pg6.add_stream(pkts)
898         self.pg_enable_capture(self.pg_interfaces)
899         self.pg_start()
900         capture = self.pg3.get_capture(len(pkts))
901         self.verify_capture_out(capture, static_nat_ip, True)
902
903         # out2in 3rd interface
904         pkts = self.create_stream_out(self.pg3, static_nat_ip)
905         self.pg3.add_stream(pkts)
906         self.pg_enable_capture(self.pg_interfaces)
907         self.pg_start()
908         capture = self.pg6.get_capture(len(pkts))
909         self.verify_capture_in(capture, self.pg6)
910
911         # general user and session dump verifications
912         users = self.vapi.snat_user_dump()
913         self.assertTrue(len(users) >= 3)
914         addresses = self.vapi.snat_address_dump()
915         self.assertEqual(len(addresses), 1)
916         for user in users:
917             sessions = self.vapi.snat_user_session_dump(user.ip_address,
918                                                         user.vrf_id)
919             for session in sessions:
920                 self.assertEqual(user.ip_address, session.inside_ip_address)
921                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
922                 self.assertTrue(session.protocol in
923                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
924                                  IP_PROTOS.icmp])
925
926         # pg4 session dump
927         sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
928         self.assertTrue(len(sessions) >= 4)
929         for session in sessions:
930             self.assertFalse(session.is_static)
931             self.assertEqual(session.inside_ip_address[0:4],
932                              self.pg4.remote_ip4n)
933             self.assertEqual(session.outside_ip_address,
934                              addresses[0].ip_address)
935
936         # pg6 session dump
937         sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
938         self.assertTrue(len(sessions) >= 3)
939         for session in sessions:
940             self.assertTrue(session.is_static)
941             self.assertEqual(session.inside_ip_address[0:4],
942                              self.pg6.remote_ip4n)
943             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
944                              map(int, static_nat_ip.split('.')))
945             self.assertTrue(session.inside_port in
946                             [self.tcp_port_in, self.udp_port_in,
947                              self.icmp_id_in])
948
949     def test_hairpinning(self):
950         """ SNAT hairpinning """
951
952         host = self.pg0.remote_hosts[0]
953         server = self.pg0.remote_hosts[1]
954         host_in_port = 1234
955         host_out_port = 0
956         server_in_port = 5678
957         server_out_port = 8765
958
959         self.snat_add_address(self.snat_addr)
960         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
961         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
962                                                  is_inside=0)
963         # add static mapping for server
964         self.snat_add_static_mapping(server.ip4, self.snat_addr,
965                                      server_in_port, server_out_port,
966                                      proto=IP_PROTOS.tcp)
967
968         # send packet from host to server
969         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
970              IP(src=host.ip4, dst=self.snat_addr) /
971              TCP(sport=host_in_port, dport=server_out_port))
972         self.pg0.add_stream(p)
973         self.pg_enable_capture(self.pg_interfaces)
974         self.pg_start()
975         capture = self.pg0.get_capture(1)
976         p = capture[0]
977         try:
978             ip = p[IP]
979             tcp = p[TCP]
980             self.assertEqual(ip.src, self.snat_addr)
981             self.assertEqual(ip.dst, server.ip4)
982             self.assertNotEqual(tcp.sport, host_in_port)
983             self.assertEqual(tcp.dport, server_in_port)
984             host_out_port = tcp.sport
985         except:
986             self.logger.error(ppp("Unexpected or invalid packet:", p))
987             raise
988
989         # send reply from server to host
990         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
991              IP(src=server.ip4, dst=self.snat_addr) /
992              TCP(sport=server_in_port, dport=host_out_port))
993         self.pg0.add_stream(p)
994         self.pg_enable_capture(self.pg_interfaces)
995         self.pg_start()
996         capture = self.pg0.get_capture(1)
997         p = capture[0]
998         try:
999             ip = p[IP]
1000             tcp = p[TCP]
1001             self.assertEqual(ip.src, self.snat_addr)
1002             self.assertEqual(ip.dst, host.ip4)
1003             self.assertEqual(tcp.sport, server_out_port)
1004             self.assertEqual(tcp.dport, host_in_port)
1005         except:
1006             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1007             raise
1008
1009     def test_max_translations_per_user(self):
1010         """ MAX translations per user - recycle the least recently used """
1011
1012         self.snat_add_address(self.snat_addr)
1013         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1014         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1015                                                  is_inside=0)
1016
1017         # get maximum number of translations per user
1018         snat_config = self.vapi.snat_show_config()
1019
1020         # send more than maximum number of translations per user packets
1021         pkts_num = snat_config.max_translations_per_user + 5
1022         pkts = []
1023         for port in range(0, pkts_num):
1024             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1025                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1026                  TCP(sport=1025 + port))
1027             pkts.append(p)
1028         self.pg0.add_stream(pkts)
1029         self.pg_enable_capture(self.pg_interfaces)
1030         self.pg_start()
1031
1032         # verify number of translated packet
1033         self.pg1.get_capture(pkts_num)
1034
1035     def test_interface_addr(self):
1036         """ Acquire SNAT addresses from interface """
1037         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1038
1039         # no address in NAT pool
1040         adresses = self.vapi.snat_address_dump()
1041         self.assertEqual(0, len(adresses))
1042
1043         # configure interface address and check NAT address pool
1044         self.pg7.config_ip4()
1045         adresses = self.vapi.snat_address_dump()
1046         self.assertEqual(1, len(adresses))
1047         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1048
1049         # remove interface address and check NAT address pool
1050         self.pg7.unconfig_ip4()
1051         adresses = self.vapi.snat_address_dump()
1052         self.assertEqual(0, len(adresses))
1053
1054     def test_interface_addr_static_mapping(self):
1055         """ Static mapping with addresses from interface """
1056         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1057         self.snat_add_static_mapping('1.2.3.4',
1058                                      external_sw_if_index=self.pg7.sw_if_index)
1059
1060         # static mappings with external interface
1061         static_mappings = self.vapi.snat_static_mapping_dump()
1062         self.assertEqual(1, len(static_mappings))
1063         self.assertEqual(self.pg7.sw_if_index,
1064                          static_mappings[0].external_sw_if_index)
1065
1066         # configure interface address and check static mappings
1067         self.pg7.config_ip4()
1068         static_mappings = self.vapi.snat_static_mapping_dump()
1069         self.assertEqual(1, len(static_mappings))
1070         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1071                          self.pg7.local_ip4n)
1072         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1073
1074         # remove interface address and check static mappings
1075         self.pg7.unconfig_ip4()
1076         static_mappings = self.vapi.snat_static_mapping_dump()
1077         self.assertEqual(0, len(static_mappings))
1078
1079     def test_ipfix_nat44_sess(self):
1080         """ S-NAT IPFIX logging NAT44 session created/delted """
1081         self.snat_add_address(self.snat_addr)
1082         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1083         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1084                                                  is_inside=0)
1085         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1086                                      src_address=self.pg3.local_ip4n,
1087                                      path_mtu=512,
1088                                      template_interval=10)
1089         self.vapi.snat_ipfix()
1090
1091         pkts = self.create_stream_in(self.pg0, self.pg1)
1092         self.pg0.add_stream(pkts)
1093         self.pg_enable_capture(self.pg_interfaces)
1094         self.pg_start()
1095         capture = self.pg1.get_capture(len(pkts))
1096         self.verify_capture_out(capture)
1097         self.snat_add_address(self.snat_addr, is_add=0)
1098         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1099         capture = self.pg3.get_capture(3)
1100         ipfix = IPFIXDecoder()
1101         # first load template
1102         for p in capture:
1103             self.assertTrue(p.haslayer(IPFIX))
1104             if p.haslayer(Template):
1105                 ipfix.add_template(p.getlayer(Template))
1106         # verify events in data set
1107         for p in capture:
1108             if p.haslayer(Data):
1109                 data = ipfix.decode_data_set(p.getlayer(Set))
1110                 self.verify_ipfix_nat44_ses(data)
1111
1112     def test_ipfix_addr_exhausted(self):
1113         """ S-NAT IPFIX logging NAT addresses exhausted """
1114         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1115         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1116                                                  is_inside=0)
1117         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1118                                      src_address=self.pg3.local_ip4n,
1119                                      path_mtu=512,
1120                                      template_interval=10)
1121         self.vapi.snat_ipfix()
1122
1123         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1124              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1125              TCP(sport=3025))
1126         self.pg0.add_stream(p)
1127         self.pg_enable_capture(self.pg_interfaces)
1128         self.pg_start()
1129         capture = self.pg1.get_capture(0)
1130         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1131         capture = self.pg3.get_capture(3)
1132         ipfix = IPFIXDecoder()
1133         # first load template
1134         for p in capture:
1135             self.assertTrue(p.haslayer(IPFIX))
1136             if p.haslayer(Template):
1137                 ipfix.add_template(p.getlayer(Template))
1138         # verify events in data set
1139         for p in capture:
1140             if p.haslayer(Data):
1141                 data = ipfix.decode_data_set(p.getlayer(Set))
1142                 self.verify_ipfix_addr_exhausted(data)
1143
1144     def test_pool_addr_fib(self):
1145         """ S-NAT add pool addresses to FIB """
1146         static_addr = '10.0.0.10'
1147         self.snat_add_address(self.snat_addr)
1148         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1149         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1150                                                  is_inside=0)
1151         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)
1152
1153         # SNAT address
1154         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1155              ARP(op=ARP.who_has, pdst=self.snat_addr,
1156                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1157         self.pg1.add_stream(p)
1158         self.pg_enable_capture(self.pg_interfaces)
1159         self.pg_start()
1160         capture = self.pg1.get_capture(1)
1161         self.assertTrue(capture[0].haslayer(ARP))
1162         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1163
1164         # 1:1 NAT address
1165         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1166              ARP(op=ARP.who_has, pdst=static_addr,
1167                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1168         self.pg1.add_stream(p)
1169         self.pg_enable_capture(self.pg_interfaces)
1170         self.pg_start()
1171         capture = self.pg1.get_capture(1)
1172         self.assertTrue(capture[0].haslayer(ARP))
1173         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1174
1175         # send ARP to non-SNAT interface
1176         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1177              ARP(op=ARP.who_has, pdst=self.snat_addr,
1178                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1179         self.pg2.add_stream(p)
1180         self.pg_enable_capture(self.pg_interfaces)
1181         self.pg_start()
1182         capture = self.pg1.get_capture(0)
1183
1184         # remove addresses and verify
1185         self.snat_add_address(self.snat_addr, is_add=0)
1186         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
1187                                      is_add=0)
1188
1189         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1190              ARP(op=ARP.who_has, pdst=self.snat_addr,
1191                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1192         self.pg1.add_stream(p)
1193         self.pg_enable_capture(self.pg_interfaces)
1194         self.pg_start()
1195         capture = self.pg1.get_capture(0)
1196
1197         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1198              ARP(op=ARP.who_has, pdst=static_addr,
1199                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1200         self.pg1.add_stream(p)
1201         self.pg_enable_capture(self.pg_interfaces)
1202         self.pg_start()
1203         capture = self.pg1.get_capture(0)
1204
1205     def test_vrf_mode(self):
1206         """ S-NAT tenant VRF aware address pool mode """
1207
1208         vrf_id1 = 1
1209         vrf_id2 = 2
1210         nat_ip1 = "10.0.0.10"
1211         nat_ip2 = "10.0.0.11"
1212
1213         self.pg0.unconfig_ip4()
1214         self.pg1.unconfig_ip4()
1215         self.pg0.set_table_ip4(vrf_id1)
1216         self.pg1.set_table_ip4(vrf_id2)
1217         self.pg0.config_ip4()
1218         self.pg1.config_ip4()
1219
1220         self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1221         self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1222         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1223         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1224         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1225                                                  is_inside=0)
1226
1227         # first VRF
1228         pkts = self.create_stream_in(self.pg0, self.pg2)
1229         self.pg0.add_stream(pkts)
1230         self.pg_enable_capture(self.pg_interfaces)
1231         self.pg_start()
1232         capture = self.pg2.get_capture(len(pkts))
1233         self.verify_capture_out(capture, nat_ip1)
1234
1235         # second VRF
1236         pkts = self.create_stream_in(self.pg1, self.pg2)
1237         self.pg1.add_stream(pkts)
1238         self.pg_enable_capture(self.pg_interfaces)
1239         self.pg_start()
1240         capture = self.pg2.get_capture(len(pkts))
1241         self.verify_capture_out(capture, nat_ip2)
1242
1243     def test_vrf_feature_independent(self):
1244         """ S-NAT tenant VRF independent address pool mode """
1245
1246         nat_ip1 = "10.0.0.10"
1247         nat_ip2 = "10.0.0.11"
1248
1249         self.snat_add_address(nat_ip1)
1250         self.snat_add_address(nat_ip2)
1251         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1252         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1253         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1254                                                  is_inside=0)
1255
1256         # first VRF
1257         pkts = self.create_stream_in(self.pg0, self.pg2)
1258         self.pg0.add_stream(pkts)
1259         self.pg_enable_capture(self.pg_interfaces)
1260         self.pg_start()
1261         capture = self.pg2.get_capture(len(pkts))
1262         self.verify_capture_out(capture, nat_ip1)
1263
1264         # second VRF
1265         pkts = self.create_stream_in(self.pg1, self.pg2)
1266         self.pg1.add_stream(pkts)
1267         self.pg_enable_capture(self.pg_interfaces)
1268         self.pg_start()
1269         capture = self.pg2.get_capture(len(pkts))
1270         self.verify_capture_out(capture, nat_ip1)
1271
1272     def tearDown(self):
1273         super(TestSNAT, self).tearDown()
1274         if not self.vpp_dead:
1275             self.logger.info(self.vapi.cli("show snat verbose"))
1276             self.clear_snat()
1277
1278
1279 class TestDeterministicNAT(VppTestCase):
1280     """ Deterministic NAT Test Cases """
1281
1282     @classmethod
1283     def setUpConstants(cls):
1284         super(TestDeterministicNAT, cls).setUpConstants()
1285         cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
1286
1287     @classmethod
1288     def setUpClass(cls):
1289         super(TestDeterministicNAT, cls).setUpClass()
1290
1291         try:
1292             cls.create_pg_interfaces(range(2))
1293             cls.interfaces = list(cls.pg_interfaces)
1294
1295             for i in cls.interfaces:
1296                 i.admin_up()
1297                 i.config_ip4()
1298                 i.resolve_arp()
1299
1300         except Exception:
1301             super(TestDeterministicNAT, cls).tearDownClass()
1302             raise
1303
1304     def test_deterministic_mode(self):
1305         """ S-NAT run deterministic mode """
1306         in_addr = '172.16.255.0'
1307         out_addr = '172.17.255.50'
1308         in_addr_t = '172.16.255.20'
1309         in_addr_n = socket.inet_aton(in_addr)
1310         out_addr_n = socket.inet_aton(out_addr)
1311         in_addr_t_n = socket.inet_aton(in_addr_t)
1312         in_plen = 24
1313         out_plen = 32
1314
1315         snat_config = self.vapi.snat_show_config()
1316         self.assertEqual(1, snat_config.deterministic)
1317
1318         self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
1319
1320         rep1 = self.vapi.snat_det_forward(in_addr_t_n)
1321         self.assertEqual(rep1.out_addr[:4], out_addr_n)
1322         rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
1323         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
1324
1325         deterministic_mappings = self.vapi.snat_det_map_dump()
1326         self.assertEqual(len(deterministic_mappings), 1)
1327         dsm = deterministic_mappings[0]
1328         self.assertEqual(in_addr_n, dsm.in_addr[:4])
1329         self.assertEqual(in_plen, dsm.in_plen)
1330         self.assertEqual(out_addr_n, dsm.out_addr[:4])
1331         self.assertEqual(out_plen, dsm.out_plen)
1332
1333         self.clear_snat()
1334         deterministic_mappings = self.vapi.snat_det_map_dump()
1335         self.assertEqual(len(deterministic_mappings), 0)
1336
1337     def clear_snat(self):
1338         """
1339         Clear SNAT configuration.
1340         """
1341         deterministic_mappings = self.vapi.snat_det_map_dump()
1342         for dsm in deterministic_mappings:
1343             self.vapi.snat_add_det_map(dsm.in_addr,
1344                                        dsm.in_plen,
1345                                        dsm.out_addr,
1346                                        dsm.out_plen,
1347                                        is_add=0)
1348
1349     def tearDown(self):
1350         super(TestDeterministicNAT, self).tearDown()
1351         if not self.vpp_dead:
1352             self.logger.info(self.vapi.cli("show snat detail"))
1353             self.clear_snat()
1354
1355 if __name__ == '__main__':
1356     unittest.main(testRunner=VppTestRunner)