VPP-1299: Reset uid/gid on shared memory segment if client starts first.
[vpp.git] / test / test_ipsec_ah.py
1 import socket
2 import unittest
3
4 from scapy.layers.inet import IP, ICMP
5 from scapy.layers.l2 import Ether, Raw
6 from scapy.layers.ipsec import SecurityAssociation, AH
7
8 from framework import VppTestCase, VppTestRunner
9
10
class TestIpsecAh(VppTestCase):
    """
    Basic test for IPSEC using AH transport and Tunnel mode

    Below 4 cases are covered as part of this test
    1) ipsec ah v4 transport basic test  - IPv4 Transport mode
     scenario using HMAC-SHA1-96 integrity algo
    2) ipsec ah v4 transport burst test
     Above test for 257 pkts
    3) ipsec ah 4o4 tunnel basic test    - IPv4 Tunnel mode
     scenario using HMAC-SHA1-96 integrity algo
    4) ipsec ah 4o4 tunnel burst test
     Above test for 257 pkts

    TRANSPORT MODE:

     ---   encrypt   ---
    |pg2| <-------> |VPP|
     ---   decrypt   ---

    TUNNEL MODE:

     ---   encrypt   ---   plain   ---
    |pg0| <-------  |VPP| <------ |pg1|
     ---             ---           ---

     ---   decrypt   ---   plain   ---
    |pg0| ------->  |VPP| ------> |pg1|
     ---             ---           ---

    Note : IPv6 is not covered
    """

    # Addresses of the hosts "behind" pg0 and pg1 in the tunnel scenario.
    remote_pg0_lb_addr = '1.1.1.1'
    remote_pg1_lb_addr = '2.2.2.2'
    payload = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"

    # Authentication parameters of the scapy-side SAs.
    # NOTE(review): the VPP SAs are created with only a key *length*, so
    # they presumably rely on the vapi wrapper's default integrity key and
    # algorithm matching these values - confirm against the vapi provider.
    auth_algo = 'HMAC-SHA1-96'
    auth_key = 'C91KUR9GYMm5GfkEvNjX'
    # HMAC-SHA1-96 carries a 96 bit (12 byte) integrity check value.
    icv_len = 12

    # SPIs, named by direction as seen from VPP: "in" is used for packets
    # the test sends to VPP, "out" for packets VPP sends back.  Keeping them
    # in one place guarantees that the VPP SAs and the scapy SAs agree.
    tun_in_spi = 1000
    tun_out_spi = 1001
    tra_in_spi = 2000
    tra_out_spi = 2001

    @classmethod
    def setUpClass(cls):
        """Create pg0-pg2, bring them up and install both AH configurations.

        On any failure the parent's tearDownClass() is run before the
        exception is re-raised.
        """
        super(TestIpsecAh, cls).setUpClass()
        try:
            cls.create_pg_interfaces(range(3))
            cls.interfaces = list(cls.pg_interfaces)
            for intf in cls.interfaces:
                intf.admin_up()
                intf.config_ip4()
                intf.resolve_arp()
            cls.logger.info(cls.vapi.ppcli("show int addr"))
            cls.config_ah_tra()
            cls.logger.info(cls.vapi.ppcli("show ipsec"))
            cls.config_ah_tun()
            cls.logger.info(cls.vapi.ppcli("show ipsec"))
        except Exception:
            super(TestIpsecAh, cls).tearDownClass()
            raise

    @classmethod
    def _add_ah_wildcard_entries(cls, spd_id):
        """Add the two any-address, protocol-AH entries to SPD *spd_id*.

        One entry is added with the wrapper's default direction and one
        with is_outbound=0.  No policy is passed, so the vapi wrapper's
        default applies (presumably "bypass", letting the AH packets
        themselves through - confirm against the vapi provider).
        """
        any_start = socket.inet_pton(socket.AF_INET, "0.0.0.0")
        any_stop = socket.inet_pton(socket.AF_INET, "255.255.255.255")
        cls.vapi.ipsec_spd_add_del_entry(spd_id, any_start, any_stop,
                                         any_start, any_stop,
                                         protocol=socket.IPPROTO_AH)
        cls.vapi.ipsec_spd_add_del_entry(spd_id, any_start, any_stop,
                                         any_start, any_stop,
                                         protocol=socket.IPPROTO_AH,
                                         is_outbound=0)

    @classmethod
    def config_ah_tun(cls):
        """Configure the AH tunnel scenario on pg0 (SPD 1, SAs 10 and 20).

        Adds /32 routes for the two loopback-style addresses, one SA per
        direction between pg0's local and remote addresses, binds SPD 1 to
        pg0 and installs the policy entries that tie traffic between the
        two addresses to those SAs.
        """
        spd_id = 1
        out_sa_id = 10
        in_sa_id = 20
        key_len = len(cls.auth_key)

        pg0_host = socket.inet_pton(socket.AF_INET, cls.remote_pg0_lb_addr)
        pg1_host = socket.inet_pton(socket.AF_INET, cls.remote_pg1_lb_addr)
        cls.vapi.ip_add_del_route(pg0_host, 32, cls.pg0.remote_ip4n)
        cls.vapi.ip_add_del_route(pg1_host, 32, cls.pg1.remote_ip4n)

        # The two SAs use mirrored pg0 addresses (VPP -> peer and back).
        cls.vapi.ipsec_sad_add_del_entry(out_sa_id, cls.tun_out_spi,
                                         cls.pg0.local_ip4n,
                                         cls.pg0.remote_ip4n,
                                         integrity_key_length=key_len)
        cls.vapi.ipsec_sad_add_del_entry(in_sa_id, cls.tun_in_spi,
                                         cls.pg0.remote_ip4n,
                                         cls.pg0.local_ip4n,
                                         integrity_key_length=key_len)

        cls.vapi.ipsec_spd_add_del(spd_id)
        cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg0.sw_if_index)
        cls._add_ah_wildcard_entries(spd_id)

        # policy=3 combined with an sa_id - presumably the "protect"
        # action; confirm against the VPP API definition.
        cls.vapi.ipsec_spd_add_del_entry(spd_id, pg0_host, pg0_host,
                                         pg1_host, pg1_host,
                                         priority=10, policy=3,
                                         is_outbound=0, sa_id=in_sa_id)
        # Note the swapped address pairs compared to the inbound entry.
        cls.vapi.ipsec_spd_add_del_entry(spd_id, pg1_host, pg1_host,
                                         pg0_host, pg0_host, priority=10,
                                         policy=3, sa_id=out_sa_id)

    @classmethod
    def config_ah_tra(cls):
        """Configure the AH transport scenario on pg2 (SPD 2, SAs 30/40).

        Adds one non-tunnel SA per direction, binds SPD 2 to pg2 and
        installs the policy entries covering traffic between pg2's local
        and remote addresses.
        """
        spd_id = 2
        out_sa_id = 30
        in_sa_id = 40
        key_len = len(cls.auth_key)

        cls.vapi.ipsec_sad_add_del_entry(out_sa_id, cls.tra_out_spi,
                                         integrity_key_length=key_len,
                                         is_tunnel=0)
        cls.vapi.ipsec_sad_add_del_entry(in_sa_id, cls.tra_in_spi,
                                         integrity_key_length=key_len,
                                         is_tunnel=0)

        cls.vapi.ipsec_spd_add_del(spd_id)
        cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg2.sw_if_index)
        cls._add_ah_wildcard_entries(spd_id)

        local_addr = cls.pg2.local_ip4n
        remote_addr = cls.pg2.remote_ip4n
        # policy=3 combined with an sa_id - presumably the "protect"
        # action; confirm against the VPP API definition.
        cls.vapi.ipsec_spd_add_del_entry(spd_id, local_addr, local_addr,
                                         remote_addr, remote_addr,
                                         priority=10, policy=3,
                                         is_outbound=0, sa_id=in_sa_id)
        cls.vapi.ipsec_spd_add_del_entry(spd_id, local_addr, local_addr,
                                         remote_addr, remote_addr,
                                         priority=10, policy=3,
                                         sa_id=out_sa_id)

    def configure_scapy_sa_tun(self):
        """Build the scapy SAs mirroring the VPP tunnel configuration.

        :returns: tuple (local_tun_sa, remote_tun_sa); remote_tun_sa is
            used to build packets sent towards VPP, local_tun_sa to check
            packets received from VPP.
        """
        remote_tun_sa = SecurityAssociation(
            AH, spi=self.tun_in_spi,
            auth_algo=self.auth_algo,
            auth_key=self.auth_key,
            tunnel_header=IP(src=self.pg0.remote_ip4,
                             dst=self.pg0.local_ip4))
        local_tun_sa = SecurityAssociation(
            AH, spi=self.tun_out_spi,
            auth_algo=self.auth_algo,
            auth_key=self.auth_key,
            tunnel_header=IP(dst=self.pg0.remote_ip4,
                             src=self.pg0.local_ip4))
        return local_tun_sa, remote_tun_sa

    def configure_scapy_sa_tra(self):
        """Build the scapy SAs mirroring the VPP transport configuration.

        :returns: tuple (local_tra_sa, remote_tra_sa); remote_tra_sa is
            used to build packets sent towards VPP, local_tra_sa to check
            packets received from VPP.
        """
        remote_tra_sa = SecurityAssociation(AH, spi=self.tra_in_spi,
                                            auth_algo=self.auth_algo,
                                            auth_key=self.auth_key)
        local_tra_sa = SecurityAssociation(AH, spi=self.tra_out_spi,
                                           auth_algo=self.auth_algo,
                                           auth_key=self.auth_key)
        return local_tra_sa, remote_tra_sa

    def tearDown(self):
        """Run the parent tearDown, then query VPP if it is still alive."""
        super(TestIpsecAh, self).tearDown()
        if not self.vpp_dead:
            self.vapi.cli("show hardware")

    def _log_ipsec_state(self):
        """Log the output of "show error" and "show ipsec"."""
        self.logger.info(self.vapi.ppcli("show error"))
        self.logger.info(self.vapi.ppcli("show ipsec"))

    def send_and_expect(self, input, pkts, output, count=1):
        """Send *pkts* on *input* and return *count* packets from *output*.

        :param input: pg interface the stream is added to
        :param pkts: packets to send
        :param output: pg interface the capture is read from
        :param count: number of packets requested from the capture
        :returns: the capture obtained from *output*
        """
        input.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return output.get_capture(count)

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
        """Return *count* references to one AH-protected ICMP packet.

        The packet is built (and passed through sa.encrypt) only once; the
        returned list repeats that same object *count* times.
        """
        pkt = (Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
               sa.encrypt(IP(src=src, dst=dst) / ICMP() / self.payload))
        return [pkt] * count

    def gen_pkts(self, sw_intf, src, dst, count=1):
        """Return *count* references to one plain ICMP packet."""
        pkt = (Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
               IP(src=src, dst=dst) / ICMP() / self.payload)
        return [pkt] * count

    def test_ipsec_ah_tra_basic(self, count=1):
        """ ipsec ah v4 transport basic test """
        try:
            local_tra_sa, remote_tra_sa = self.configure_scapy_sa_tra()
            send_pkts = self.gen_encrypt_pkts(remote_tra_sa, self.pg2,
                                              src=self.pg2.remote_ip4,
                                              dst=self.pg2.local_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.pg2, send_pkts, self.pg2,
                                             count=count)
            # AH transport: verify the AH header VPP put on each packet
            # it sent back on pg2.
            for pkt in recv_pkts:
                ah = pkt[AH]
                # Keep only the first icv_len bytes as ICV and move the
                # remainder of the parsed field into the padding.
                ah.padding = ah.icv[self.icv_len:]
                ah.icv = ah.icv[:self.icv_len]
                # NOTE(review): relies on scapy's decrypt() raising when
                # the integrity check fails - confirm in the scapy docs.
                local_tra_sa.decrypt(pkt[IP])
        finally:
            self._log_ipsec_state()

    def test_ipsec_ah_tra_burst(self):
        """ ipsec ah v4 transport burst test """
        # The basic test already logs the VPP state in its own finally
        # clause, so no additional logging is needed here.
        self.test_ipsec_ah_tra_basic(count=257)

    def test_ipsec_ah_tun_basic(self, count=1):
        """ ipsec ah 4o4 tunnel basic test """
        try:
            local_tun_sa, remote_tun_sa = self.configure_scapy_sa_tun()

            # AH tunnel, pg0 -> pg1: send AH-protected packets into pg0
            # and check the inner addresses of what comes out of pg1.
            send_pkts = self.gen_encrypt_pkts(remote_tun_sa, self.pg0,
                                              src=self.remote_pg0_lb_addr,
                                              dst=self.remote_pg1_lb_addr,
                                              count=count)
            recv_pkts = self.send_and_expect(self.pg0, send_pkts, self.pg1,
                                             count=count)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, self.remote_pg0_lb_addr)
                self.assert_equal(recv_pkt[IP].dst, self.remote_pg1_lb_addr)

            # AH tunnel, pg1 -> pg0: send plain packets into pg1, unwrap
            # what comes out of pg0 and check the inner addresses.
            send_pkts = self.gen_pkts(self.pg1, src=self.remote_pg1_lb_addr,
                                      dst=self.remote_pg0_lb_addr,
                                      count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.pg0,
                                             count=count)
            for recv_pkt in recv_pkts:
                unwrapped = local_tun_sa.decrypt(recv_pkt[IP])
                # Re-parse the raw bytes left after unwrapping as the
                # inner IP packet.
                inner = IP(unwrapped[Raw].load)
                self.assert_equal(inner.src, self.remote_pg1_lb_addr)
                self.assert_equal(inner.dst, self.remote_pg0_lb_addr)
        finally:
            self._log_ipsec_state()

    def test_ipsec_ah_tun_burst(self):
        """ ipsec ah 4o4 tunnel burst test """
        # The basic test already logs the VPP state in its own finally
        # clause, so no additional logging is needed here.
        self.test_ipsec_ah_tun_basic(count=257)
253
if __name__ == '__main__':
    # Executed as a script: hand the module's tests to the VPP runner.
    unittest.main(
        testRunner=VppTestRunner
    )