tests: replace pycodestyle with black
[vpp.git] / test / test_wireguard.py
1 #!/usr/bin/env python3
2 """ Wg tests """
3
4 import datetime
5 import base64
6 import os
7
8 from hashlib import blake2s
9 from scapy.packet import Packet
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP, UDP
13 from scapy.layers.inet6 import IPv6
14 from scapy.contrib.wireguard import (
15     Wireguard,
16     WireguardResponse,
17     WireguardInitiation,
18     WireguardTransport,
19 )
20 from cryptography.hazmat.primitives.asymmetric.x25519 import (
21     X25519PrivateKey,
22     X25519PublicKey,
23 )
24 from cryptography.hazmat.primitives.serialization import (
25     Encoding,
26     PrivateFormat,
27     PublicFormat,
28     NoEncryption,
29 )
30 from cryptography.hazmat.primitives.hashes import BLAKE2s, Hash
31 from cryptography.hazmat.primitives.hmac import HMAC
32 from cryptography.hazmat.backends import default_backend
33 from noise.connection import NoiseConnection, Keypair
34
35 from vpp_ipip_tun_interface import VppIpIpTunInterface
36 from vpp_interface import VppInterface
37 from vpp_ip_route import VppIpRoute, VppRoutePath
38 from vpp_object import VppObject
39 from vpp_papi import VppEnum
40 from framework import VppTestCase
41 from re import compile
42 import unittest
43
44 """ TestWg is a subclass of  VPPTestCase classes.
45
46 Wg test.
47
48 """
49
50
51 def private_key_bytes(k):
52     return k.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption())
53
54
55 def public_key_bytes(k):
56     return k.public_bytes(Encoding.Raw, PublicFormat.Raw)
57
58
59 class VppWgInterface(VppInterface):
60     """
61     VPP WireGuard interface
62     """
63
64     def __init__(self, test, src, port):
65         super(VppWgInterface, self).__init__(test)
66
67         self.port = port
68         self.src = src
69         self.private_key = X25519PrivateKey.generate()
70         self.public_key = self.private_key.public_key()
71
72     def public_key_bytes(self):
73         return public_key_bytes(self.public_key)
74
75     def private_key_bytes(self):
76         return private_key_bytes(self.private_key)
77
78     def add_vpp_config(self):
79         r = self.test.vapi.wireguard_interface_create(
80             interface={
81                 "user_instance": 0xFFFFFFFF,
82                 "port": self.port,
83                 "src_ip": self.src,
84                 "private_key": private_key_bytes(self.private_key),
85                 "generate_key": False,
86             }
87         )
88         self.set_sw_if_index(r.sw_if_index)
89         self.test.registry.register(self, self.test.logger)
90         return self
91
92     def remove_vpp_config(self):
93         self.test.vapi.wireguard_interface_delete(sw_if_index=self._sw_if_index)
94
95     def query_vpp_config(self):
96         ts = self.test.vapi.wireguard_interface_dump(sw_if_index=0xFFFFFFFF)
97         for t in ts:
98             if (
99                 t.interface.sw_if_index == self._sw_if_index
100                 and str(t.interface.src_ip) == self.src
101                 and t.interface.port == self.port
102                 and t.interface.private_key == private_key_bytes(self.private_key)
103             ):
104                 return True
105         return False
106
107     def want_events(self, peer_index=0xFFFFFFFF):
108         self.test.vapi.want_wireguard_peer_events(
109             enable_disable=1,
110             pid=os.getpid(),
111             sw_if_index=self._sw_if_index,
112             peer_index=peer_index,
113         )
114
115     def wait_events(self, expect, peers, timeout=5):
116         for i in range(len(peers)):
117             rv = self.test.vapi.wait_for_event(timeout, "wireguard_peer_event")
118             self.test.assertEqual(rv.peer_index, peers[i])
119             self.test.assertEqual(rv.flags, expect)
120
121     def __str__(self):
122         return self.object_id()
123
124     def object_id(self):
125         return "wireguard-%d" % self._sw_if_index
126
127
128 def find_route(test, prefix, is_ip6, table_id=0):
129     routes = test.vapi.ip_route_dump(table_id, is_ip6)
130
131     for e in routes:
132         if table_id == e.route.table_id and str(e.route.prefix) == str(prefix):
133             return True
134     return False
135
136
137 NOISE_HANDSHAKE_NAME = b"Noise_IKpsk2_25519_ChaChaPoly_BLAKE2s"
138 NOISE_IDENTIFIER_NAME = b"WireGuard v1 zx2c4 Jason@zx2c4.com"
139
140
141 class VppWgPeer(VppObject):
142     def __init__(self, test, itf, endpoint, port, allowed_ips, persistent_keepalive=15):
143         self._test = test
144         self.itf = itf
145         self.endpoint = endpoint
146         self.port = port
147         self.allowed_ips = allowed_ips
148         self.persistent_keepalive = persistent_keepalive
149
150         # remote peer's public
151         self.private_key = X25519PrivateKey.generate()
152         self.public_key = self.private_key.public_key()
153
154         self.noise = NoiseConnection.from_name(NOISE_HANDSHAKE_NAME)
155
156     def add_vpp_config(self, is_ip6=False):
157         rv = self._test.vapi.wireguard_peer_add(
158             peer={
159                 "public_key": self.public_key_bytes(),
160                 "port": self.port,
161                 "endpoint": self.endpoint,
162                 "n_allowed_ips": len(self.allowed_ips),
163                 "allowed_ips": self.allowed_ips,
164                 "sw_if_index": self.itf.sw_if_index,
165                 "persistent_keepalive": self.persistent_keepalive,
166             }
167         )
168         self.index = rv.peer_index
169         self.receiver_index = self.index + 1
170         self._test.registry.register(self, self._test.logger)
171         return self
172
173     def remove_vpp_config(self):
174         self._test.vapi.wireguard_peer_remove(peer_index=self.index)
175
176     def object_id(self):
177         return "wireguard-peer-%s" % self.index
178
179     def public_key_bytes(self):
180         return public_key_bytes(self.public_key)
181
182     def query_vpp_config(self):
183         peers = self._test.vapi.wireguard_peers_dump()
184
185         for p in peers:
186             if (
187                 p.peer.public_key == self.public_key_bytes()
188                 and p.peer.port == self.port
189                 and str(p.peer.endpoint) == self.endpoint
190                 and p.peer.sw_if_index == self.itf.sw_if_index
191                 and len(self.allowed_ips) == p.peer.n_allowed_ips
192             ):
193                 self.allowed_ips.sort()
194                 p.peer.allowed_ips.sort()
195
196                 for (a1, a2) in zip(self.allowed_ips, p.peer.allowed_ips):
197                     if str(a1) != str(a2):
198                         return False
199                 return True
200         return False
201
202     def set_responder(self):
203         self.noise.set_as_responder()
204
205     def mk_tunnel_header(self, tx_itf, is_ip6=False):
206         if is_ip6 is False:
207             return (
208                 Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac)
209                 / IP(src=self.endpoint, dst=self.itf.src)
210                 / UDP(sport=self.port, dport=self.itf.port)
211             )
212         else:
213             return (
214                 Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac)
215                 / IPv6(src=self.endpoint, dst=self.itf.src)
216                 / UDP(sport=self.port, dport=self.itf.port)
217             )
218
219     def noise_init(self, public_key=None):
220         self.noise.set_prologue(NOISE_IDENTIFIER_NAME)
221         self.noise.set_psks(psk=bytes(bytearray(32)))
222
223         if not public_key:
224             public_key = self.itf.public_key
225
226         # local/this private
227         self.noise.set_keypair_from_private_bytes(
228             Keypair.STATIC, private_key_bytes(self.private_key)
229         )
230         # remote's public
231         self.noise.set_keypair_from_public_bytes(
232             Keypair.REMOTE_STATIC, public_key_bytes(public_key)
233         )
234
235         self.noise.start_handshake()
236
237     def mk_handshake(self, tx_itf, is_ip6=False, public_key=None):
238         self.noise.set_as_initiator()
239         self.noise_init(public_key)
240
241         p = Wireguard() / WireguardInitiation()
242
243         p[Wireguard].message_type = 1
244         p[Wireguard].reserved_zero = 0
245         p[WireguardInitiation].sender_index = self.receiver_index
246
247         # some random data for the message
248         #  lifted from the noise protocol's wireguard example
249         now = datetime.datetime.now()
250         tai = struct.pack(
251             "!qi",
252             4611686018427387914 + int(now.timestamp()),
253             int(now.microsecond * 1e3),
254         )
255         b = self.noise.write_message(payload=tai)
256
257         # load noise into init message
258         p[WireguardInitiation].unencrypted_ephemeral = b[0:32]
259         p[WireguardInitiation].encrypted_static = b[32:80]
260         p[WireguardInitiation].encrypted_timestamp = b[80:108]
261
262         # generate the mac1 hash
263         mac_key = blake2s(b"mac1----" + self.itf.public_key_bytes()).digest()
264         p[WireguardInitiation].mac1 = blake2s(
265             bytes(p)[0:116], digest_size=16, key=mac_key
266         ).digest()
267         p[WireguardInitiation].mac2 = bytearray(16)
268
269         p = self.mk_tunnel_header(tx_itf, is_ip6) / p
270
271         return p
272
273     def verify_header(self, p, is_ip6=False):
274         if is_ip6 is False:
275             self._test.assertEqual(p[IP].src, self.itf.src)
276             self._test.assertEqual(p[IP].dst, self.endpoint)
277         else:
278             self._test.assertEqual(p[IPv6].src, self.itf.src)
279             self._test.assertEqual(p[IPv6].dst, self.endpoint)
280         self._test.assertEqual(p[UDP].sport, self.itf.port)
281         self._test.assertEqual(p[UDP].dport, self.port)
282         self._test.assert_packet_checksums_valid(p)
283
284     def consume_init(self, p, tx_itf, is_ip6=False):
285         self.noise.set_as_responder()
286         self.noise_init(self.itf.public_key)
287         self.verify_header(p, is_ip6)
288
289         init = Wireguard(p[Raw])
290
291         self._test.assertEqual(init[Wireguard].message_type, 1)
292         self._test.assertEqual(init[Wireguard].reserved_zero, 0)
293
294         self.sender = init[WireguardInitiation].sender_index
295
296         # validate the hash
297         mac_key = blake2s(b"mac1----" + public_key_bytes(self.public_key)).digest()
298         mac1 = blake2s(bytes(init)[0:-32], digest_size=16, key=mac_key).digest()
299         self._test.assertEqual(init[WireguardInitiation].mac1, mac1)
300
301         # this passes only unencrypted_ephemeral, encrypted_static,
302         # encrypted_timestamp fields of the init
303         payload = self.noise.read_message(bytes(init)[8:-32])
304
305         # build the response
306         b = self.noise.write_message()
307         mac_key = blake2s(b"mac1----" + public_key_bytes(self.itf.public_key)).digest()
308         resp = Wireguard(message_type=2, reserved_zero=0) / WireguardResponse(
309             sender_index=self.receiver_index,
310             receiver_index=self.sender,
311             unencrypted_ephemeral=b[0:32],
312             encrypted_nothing=b[32:],
313         )
314         mac1 = blake2s(bytes(resp)[:-32], digest_size=16, key=mac_key).digest()
315         resp[WireguardResponse].mac1 = mac1
316
317         resp = self.mk_tunnel_header(tx_itf, is_ip6) / resp
318         self._test.assertTrue(self.noise.handshake_finished)
319
320         return resp
321
322     def consume_response(self, p, is_ip6=False):
323         self.verify_header(p, is_ip6)
324
325         resp = Wireguard(p[Raw])
326
327         self._test.assertEqual(resp[Wireguard].message_type, 2)
328         self._test.assertEqual(resp[Wireguard].reserved_zero, 0)
329         self._test.assertEqual(
330             resp[WireguardResponse].receiver_index, self.receiver_index
331         )
332
333         self.sender = resp[Wireguard].sender_index
334
335         payload = self.noise.read_message(bytes(resp)[12:60])
336         self._test.assertEqual(payload, b"")
337         self._test.assertTrue(self.noise.handshake_finished)
338
339     def decrypt_transport(self, p, is_ip6=False):
340         self.verify_header(p, is_ip6)
341
342         p = Wireguard(p[Raw])
343         self._test.assertEqual(p[Wireguard].message_type, 4)
344         self._test.assertEqual(p[Wireguard].reserved_zero, 0)
345         self._test.assertEqual(
346             p[WireguardTransport].receiver_index, self.receiver_index
347         )
348
349         d = self.noise.decrypt(p[WireguardTransport].encrypted_encapsulated_packet)
350         return d
351
352     def encrypt_transport(self, p):
353         return self.noise.encrypt(bytes(p))
354
355     def validate_encapped(self, rxs, tx, is_ip6=False):
356         for rx in rxs:
357             if is_ip6 is False:
358                 rx = IP(self.decrypt_transport(rx))
359
360                 # chech the oringial packet is present
361                 self._test.assertEqual(rx[IP].dst, tx[IP].dst)
362                 self._test.assertEqual(rx[IP].ttl, tx[IP].ttl - 1)
363             else:
364                 rx = IPv6(self.decrypt_transport(rx))
365
366                 # chech the oringial packet is present
367                 self._test.assertEqual(rx[IPv6].dst, tx[IPv6].dst)
368                 self._test.assertEqual(rx[IPv6].ttl, tx[IPv6].ttl - 1)
369
370     def want_events(self):
371         self._test.vapi.want_wireguard_peer_events(
372             enable_disable=1,
373             pid=os.getpid(),
374             peer_index=self.index,
375             sw_if_index=self.itf.sw_if_index,
376         )
377
378     def wait_event(self, expect, timeout=5):
379         rv = self._test.vapi.wait_for_event(timeout, "wireguard_peer_event")
380         self._test.assertEqual(rv.flags, expect)
381         self._test.assertEqual(rv.peer_index, self.index)
382
383
384 class TestWg(VppTestCase):
385     """Wireguard Test Case"""
386
387     error_str = compile(r"Error")
388
389     wg4_output_node_name = "/err/wg4-output-tun/"
390     wg4_input_node_name = "/err/wg4-input/"
391     wg6_output_node_name = "/err/wg6-output-tun/"
392     wg6_input_node_name = "/err/wg6-input/"
393     kp4_error = wg4_output_node_name + "Keypair error"
394     mac4_error = wg4_input_node_name + "Invalid MAC handshake"
395     peer4_error = wg4_input_node_name + "Peer error"
396     kp6_error = wg6_output_node_name + "Keypair error"
397     mac6_error = wg6_input_node_name + "Invalid MAC handshake"
398     peer6_error = wg6_input_node_name + "Peer error"
399
400     @classmethod
401     def setUpClass(cls):
402         super(TestWg, cls).setUpClass()
403         try:
404             cls.create_pg_interfaces(range(3))
405             for i in cls.pg_interfaces:
406                 i.admin_up()
407                 i.config_ip4()
408                 i.config_ip6()
409                 i.resolve_arp()
410                 i.resolve_ndp()
411
412         except Exception:
413             super(TestWg, cls).tearDownClass()
414             raise
415
416     @classmethod
417     def tearDownClass(cls):
418         super(TestWg, cls).tearDownClass()
419
420     def setUp(self):
421         super(VppTestCase, self).setUp()
422         self.base_kp4_err = self.statistics.get_err_counter(self.kp4_error)
423         self.base_mac4_err = self.statistics.get_err_counter(self.mac4_error)
424         self.base_peer4_err = self.statistics.get_err_counter(self.peer4_error)
425         self.base_kp6_err = self.statistics.get_err_counter(self.kp6_error)
426         self.base_mac6_err = self.statistics.get_err_counter(self.mac6_error)
427         self.base_peer6_err = self.statistics.get_err_counter(self.peer6_error)
428
429     def test_wg_interface(self):
430         """Simple interface creation"""
431         port = 12312
432
433         # Create interface
434         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
435
436         self.logger.info(self.vapi.cli("sh int"))
437
438         # delete interface
439         wg0.remove_vpp_config()
440
441     def test_handshake_hash(self):
442         """test hashing an init message"""
443         # a init packet generated by linux given the key below
444         h = (
445             "0100000098b9032b"
446             "55cc4b39e73c3d24"
447             "a2a1ab884b524a81"
448             "1808bb86640fb70d"
449             "e93154fec1879125"
450             "ab012624a27f0b75"
451             "c0a2582f438ddb5f"
452             "8e768af40b4ab444"
453             "02f9ff473e1b797e"
454             "80d39d93c5480c82"
455             "a3d4510f70396976"
456             "586fb67300a5167b"
457             "ae6ca3ff3dfd00eb"
458             "59be198810f5aa03"
459             "6abc243d2155ee4f"
460             "2336483900aef801"
461             "08752cd700000000"
462             "0000000000000000"
463             "00000000"
464         )
465
466         b = bytearray.fromhex(h)
467         tgt = Wireguard(b)
468
469         pubb = base64.b64decode("aRuHFTTxICIQNefp05oKWlJv3zgKxb8+WW7JJMh0jyM=")
470         pub = X25519PublicKey.from_public_bytes(pubb)
471
472         self.assertEqual(pubb, public_key_bytes(pub))
473
474         # strip the macs and build a new packet
475         init = b[0:-32]
476         mac_key = blake2s(b"mac1----" + public_key_bytes(pub)).digest()
477         init += blake2s(init, digest_size=16, key=mac_key).digest()
478         init += b"\x00" * 16
479
480         act = Wireguard(init)
481
482         self.assertEqual(tgt, act)
483
484     def test_wg_peer_resp(self):
485         """Send handshake response"""
486         port = 12323
487
488         # Create interfaces
489         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
490         wg0.admin_up()
491         wg0.config_ip4()
492
493         self.pg_enable_capture(self.pg_interfaces)
494         self.pg_start()
495
496         peer_1 = VppWgPeer(
497             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
498         ).add_vpp_config()
499         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
500
501         r1 = VppIpRoute(
502             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
503         ).add_vpp_config()
504
505         # wait for the peer to send a handshake
506         rx = self.pg1.get_capture(1, timeout=2)
507
508         # consume the handshake in the noise protocol and
509         # generate the response
510         resp = peer_1.consume_init(rx[0], self.pg1)
511
512         # send the response, get keepalive
513         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
514
515         for rx in rxs:
516             b = peer_1.decrypt_transport(rx)
517             self.assertEqual(0, len(b))
518
519         # send a packets that are routed into the tunnel
520         p = (
521             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
522             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
523             / UDP(sport=555, dport=556)
524             / Raw(b"\x00" * 80)
525         )
526
527         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
528
529         peer_1.validate_encapped(rxs, p)
530
531         # send packets into the tunnel, expect to receive them on
532         # the other side
533         p = [
534             (
535                 peer_1.mk_tunnel_header(self.pg1)
536                 / Wireguard(message_type=4, reserved_zero=0)
537                 / WireguardTransport(
538                     receiver_index=peer_1.sender,
539                     counter=ii,
540                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
541                         (
542                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
543                             / UDP(sport=222, dport=223)
544                             / Raw()
545                         )
546                     ),
547                 )
548             )
549             for ii in range(255)
550         ]
551
552         rxs = self.send_and_expect(self.pg1, p, self.pg0)
553
554         for rx in rxs:
555             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
556             self.assertEqual(rx[IP].ttl, 19)
557
558         r1.remove_vpp_config()
559         peer_1.remove_vpp_config()
560         wg0.remove_vpp_config()
561
562     def test_wg_peer_v4o4(self):
563         """Test v4o4"""
564
565         port = 12333
566
567         # Create interfaces
568         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
569         wg0.admin_up()
570         wg0.config_ip4()
571
572         peer_1 = VppWgPeer(
573             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
574         ).add_vpp_config()
575         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
576
577         r1 = VppIpRoute(
578             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
579         ).add_vpp_config()
580
581         # route a packet into the wg interface
582         #  use the allowed-ip prefix
583         #  this is dropped because the peer is not initiated
584         p = (
585             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
586             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
587             / UDP(sport=555, dport=556)
588             / Raw()
589         )
590         self.send_and_assert_no_replies(self.pg0, [p])
591         self.assertEqual(
592             self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
593         )
594
595         # send a handsake from the peer with an invalid MAC
596         p = peer_1.mk_handshake(self.pg1)
597         p[WireguardInitiation].mac1 = b"foobar"
598         self.send_and_assert_no_replies(self.pg1, [p])
599         self.assertEqual(
600             self.base_mac4_err + 1, self.statistics.get_err_counter(self.mac4_error)
601         )
602
603         # send a handsake from the peer but signed by the wrong key.
604         p = peer_1.mk_handshake(
605             self.pg1, False, X25519PrivateKey.generate().public_key()
606         )
607         self.send_and_assert_no_replies(self.pg1, [p])
608         self.assertEqual(
609             self.base_peer4_err + 1, self.statistics.get_err_counter(self.peer4_error)
610         )
611
612         # send a valid handsake init for which we expect a response
613         p = peer_1.mk_handshake(self.pg1)
614
615         rx = self.send_and_expect(self.pg1, [p], self.pg1)
616
617         peer_1.consume_response(rx[0])
618
619         # route a packet into the wg interface
620         #  this is dropped because the peer is still not initiated
621         p = (
622             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
623             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
624             / UDP(sport=555, dport=556)
625             / Raw()
626         )
627         self.send_and_assert_no_replies(self.pg0, [p])
628         self.assertEqual(
629             self.base_kp4_err + 2, self.statistics.get_err_counter(self.kp4_error)
630         )
631
632         # send a data packet from the peer through the tunnel
633         # this completes the handshake
634         p = (
635             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
636             / UDP(sport=222, dport=223)
637             / Raw()
638         )
639         d = peer_1.encrypt_transport(p)
640         p = peer_1.mk_tunnel_header(self.pg1) / (
641             Wireguard(message_type=4, reserved_zero=0)
642             / WireguardTransport(
643                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
644             )
645         )
646         rxs = self.send_and_expect(self.pg1, [p], self.pg0)
647
648         for rx in rxs:
649             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
650             self.assertEqual(rx[IP].ttl, 19)
651
652         # send a packets that are routed into the tunnel
653         p = (
654             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
655             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
656             / UDP(sport=555, dport=556)
657             / Raw(b"\x00" * 80)
658         )
659
660         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
661
662         for rx in rxs:
663             rx = IP(peer_1.decrypt_transport(rx))
664
665             # chech the oringial packet is present
666             self.assertEqual(rx[IP].dst, p[IP].dst)
667             self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
668
669         # send packets into the tunnel, expect to receive them on
670         # the other side
671         p = [
672             (
673                 peer_1.mk_tunnel_header(self.pg1)
674                 / Wireguard(message_type=4, reserved_zero=0)
675                 / WireguardTransport(
676                     receiver_index=peer_1.sender,
677                     counter=ii + 1,
678                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
679                         (
680                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
681                             / UDP(sport=222, dport=223)
682                             / Raw()
683                         )
684                     ),
685                 )
686             )
687             for ii in range(255)
688         ]
689
690         rxs = self.send_and_expect(self.pg1, p, self.pg0)
691
692         for rx in rxs:
693             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
694             self.assertEqual(rx[IP].ttl, 19)
695
696         r1.remove_vpp_config()
697         peer_1.remove_vpp_config()
698         wg0.remove_vpp_config()
699
700     def test_wg_peer_v6o6(self):
701         """Test v6o6"""
702
703         port = 12343
704
705         # Create interfaces
706         wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
707         wg0.admin_up()
708         wg0.config_ip6()
709
710         peer_1 = VppWgPeer(
711             self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
712         ).add_vpp_config(True)
713         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
714
715         r1 = VppIpRoute(
716             self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
717         ).add_vpp_config()
718
719         # route a packet into the wg interface
720         #  use the allowed-ip prefix
721         #  this is dropped because the peer is not initiated
722
723         p = (
724             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
725             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
726             / UDP(sport=555, dport=556)
727             / Raw()
728         )
729         self.send_and_assert_no_replies(self.pg0, [p])
730
731         self.assertEqual(
732             self.base_kp6_err + 1, self.statistics.get_err_counter(self.kp6_error)
733         )
734
735         # send a handsake from the peer with an invalid MAC
736         p = peer_1.mk_handshake(self.pg1, True)
737         p[WireguardInitiation].mac1 = b"foobar"
738         self.send_and_assert_no_replies(self.pg1, [p])
739
740         self.assertEqual(
741             self.base_mac6_err + 1, self.statistics.get_err_counter(self.mac6_error)
742         )
743
744         # send a handsake from the peer but signed by the wrong key.
745         p = peer_1.mk_handshake(
746             self.pg1, True, X25519PrivateKey.generate().public_key()
747         )
748         self.send_and_assert_no_replies(self.pg1, [p])
749         self.assertEqual(
750             self.base_peer6_err + 1, self.statistics.get_err_counter(self.peer6_error)
751         )
752
753         # send a valid handsake init for which we expect a response
754         p = peer_1.mk_handshake(self.pg1, True)
755
756         rx = self.send_and_expect(self.pg1, [p], self.pg1)
757
758         peer_1.consume_response(rx[0], True)
759
760         # route a packet into the wg interface
761         #  this is dropped because the peer is still not initiated
762         p = (
763             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
764             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
765             / UDP(sport=555, dport=556)
766             / Raw()
767         )
768         self.send_and_assert_no_replies(self.pg0, [p])
769         self.assertEqual(
770             self.base_kp6_err + 2, self.statistics.get_err_counter(self.kp6_error)
771         )
772
773         # send a data packet from the peer through the tunnel
774         # this completes the handshake
775         p = (
776             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
777             / UDP(sport=222, dport=223)
778             / Raw()
779         )
780         d = peer_1.encrypt_transport(p)
781         p = peer_1.mk_tunnel_header(self.pg1, True) / (
782             Wireguard(message_type=4, reserved_zero=0)
783             / WireguardTransport(
784                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
785             )
786         )
787         rxs = self.send_and_expect(self.pg1, [p], self.pg0)
788
789         for rx in rxs:
790             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
791             self.assertEqual(rx[IPv6].hlim, 19)
792
793         # send a packets that are routed into the tunnel
794         p = (
795             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
796             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
797             / UDP(sport=555, dport=556)
798             / Raw(b"\x00" * 80)
799         )
800
801         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
802
803         for rx in rxs:
804             rx = IPv6(peer_1.decrypt_transport(rx, True))
805
806             # chech the oringial packet is present
807             self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
808             self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim - 1)
809
810         # send packets into the tunnel, expect to receive them on
811         # the other side
812         p = [
813             (
814                 peer_1.mk_tunnel_header(self.pg1, True)
815                 / Wireguard(message_type=4, reserved_zero=0)
816                 / WireguardTransport(
817                     receiver_index=peer_1.sender,
818                     counter=ii + 1,
819                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
820                         (
821                             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
822                             / UDP(sport=222, dport=223)
823                             / Raw()
824                         )
825                     ),
826                 )
827             )
828             for ii in range(255)
829         ]
830
831         rxs = self.send_and_expect(self.pg1, p, self.pg0)
832
833         for rx in rxs:
834             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
835             self.assertEqual(rx[IPv6].hlim, 19)
836
837         r1.remove_vpp_config()
838         peer_1.remove_vpp_config()
839         wg0.remove_vpp_config()
840
841     def test_wg_peer_v6o4(self):
842         """Test v6o4"""
843
844         port = 12353
845
846         # Create interfaces
847         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
848         wg0.admin_up()
849         wg0.config_ip6()
850
851         peer_1 = VppWgPeer(
852             self, wg0, self.pg1.remote_ip4, port + 1, ["1::3:0/112"]
853         ).add_vpp_config(True)
854         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
855
856         r1 = VppIpRoute(
857             self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
858         ).add_vpp_config()
859
860         # route a packet into the wg interface
861         #  use the allowed-ip prefix
862         #  this is dropped because the peer is not initiated
863         p = (
864             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
865             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
866             / UDP(sport=555, dport=556)
867             / Raw()
868         )
869         self.send_and_assert_no_replies(self.pg0, [p])
870         self.assertEqual(
871             self.base_kp6_err + 1, self.statistics.get_err_counter(self.kp6_error)
872         )
873
874         # send a handsake from the peer with an invalid MAC
875         p = peer_1.mk_handshake(self.pg1)
876         p[WireguardInitiation].mac1 = b"foobar"
877         self.send_and_assert_no_replies(self.pg1, [p])
878
879         self.assertEqual(
880             self.base_mac4_err + 1, self.statistics.get_err_counter(self.mac4_error)
881         )
882
883         # send a handsake from the peer but signed by the wrong key.
884         p = peer_1.mk_handshake(
885             self.pg1, False, X25519PrivateKey.generate().public_key()
886         )
887         self.send_and_assert_no_replies(self.pg1, [p])
888         self.assertEqual(
889             self.base_peer4_err + 1, self.statistics.get_err_counter(self.peer4_error)
890         )
891
892         # send a valid handsake init for which we expect a response
893         p = peer_1.mk_handshake(self.pg1)
894
895         rx = self.send_and_expect(self.pg1, [p], self.pg1)
896
897         peer_1.consume_response(rx[0])
898
899         # route a packet into the wg interface
900         #  this is dropped because the peer is still not initiated
901         p = (
902             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
903             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
904             / UDP(sport=555, dport=556)
905             / Raw()
906         )
907         self.send_and_assert_no_replies(self.pg0, [p])
908         self.assertEqual(
909             self.base_kp6_err + 2, self.statistics.get_err_counter(self.kp6_error)
910         )
911
912         # send a data packet from the peer through the tunnel
913         # this completes the handshake
914         p = (
915             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
916             / UDP(sport=222, dport=223)
917             / Raw()
918         )
919         d = peer_1.encrypt_transport(p)
920         p = peer_1.mk_tunnel_header(self.pg1) / (
921             Wireguard(message_type=4, reserved_zero=0)
922             / WireguardTransport(
923                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
924             )
925         )
926         rxs = self.send_and_expect(self.pg1, [p], self.pg0)
927
928         for rx in rxs:
929             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
930             self.assertEqual(rx[IPv6].hlim, 19)
931
932         # send a packets that are routed into the tunnel
933         p = (
934             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
935             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
936             / UDP(sport=555, dport=556)
937             / Raw(b"\x00" * 80)
938         )
939
940         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
941
942         for rx in rxs:
943             rx = IPv6(peer_1.decrypt_transport(rx))
944
945             # chech the oringial packet is present
946             self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
947             self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim - 1)
948
949         # send packets into the tunnel, expect to receive them on
950         # the other side
951         p = [
952             (
953                 peer_1.mk_tunnel_header(self.pg1)
954                 / Wireguard(message_type=4, reserved_zero=0)
955                 / WireguardTransport(
956                     receiver_index=peer_1.sender,
957                     counter=ii + 1,
958                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
959                         (
960                             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
961                             / UDP(sport=222, dport=223)
962                             / Raw()
963                         )
964                     ),
965                 )
966             )
967             for ii in range(255)
968         ]
969
970         rxs = self.send_and_expect(self.pg1, p, self.pg0)
971
972         for rx in rxs:
973             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
974             self.assertEqual(rx[IPv6].hlim, 19)
975
976         r1.remove_vpp_config()
977         peer_1.remove_vpp_config()
978         wg0.remove_vpp_config()
979
980     def test_wg_peer_v4o6(self):
981         """Test v4o6"""
982
983         port = 12363
984
985         # Create interfaces
986         wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
987         wg0.admin_up()
988         wg0.config_ip4()
989
990         peer_1 = VppWgPeer(
991             self, wg0, self.pg1.remote_ip6, port + 1, ["10.11.3.0/24"]
992         ).add_vpp_config()
993         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
994
995         r1 = VppIpRoute(
996             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
997         ).add_vpp_config()
998
999         # route a packet into the wg interface
1000         #  use the allowed-ip prefix
1001         #  this is dropped because the peer is not initiated
1002         p = (
1003             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1004             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1005             / UDP(sport=555, dport=556)
1006             / Raw()
1007         )
1008         self.send_and_assert_no_replies(self.pg0, [p])
1009         self.assertEqual(
1010             self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
1011         )
1012
1013         # send a handsake from the peer with an invalid MAC
1014         p = peer_1.mk_handshake(self.pg1, True)
1015         p[WireguardInitiation].mac1 = b"foobar"
1016         self.send_and_assert_no_replies(self.pg1, [p])
1017         self.assertEqual(
1018             self.base_mac6_err + 1, self.statistics.get_err_counter(self.mac6_error)
1019         )
1020
1021         # send a handsake from the peer but signed by the wrong key.
1022         p = peer_1.mk_handshake(
1023             self.pg1, True, X25519PrivateKey.generate().public_key()
1024         )
1025         self.send_and_assert_no_replies(self.pg1, [p])
1026         self.assertEqual(
1027             self.base_peer6_err + 1, self.statistics.get_err_counter(self.peer6_error)
1028         )
1029
1030         # send a valid handsake init for which we expect a response
1031         p = peer_1.mk_handshake(self.pg1, True)
1032
1033         rx = self.send_and_expect(self.pg1, [p], self.pg1)
1034
1035         peer_1.consume_response(rx[0], True)
1036
1037         # route a packet into the wg interface
1038         #  this is dropped because the peer is still not initiated
1039         p = (
1040             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1041             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1042             / UDP(sport=555, dport=556)
1043             / Raw()
1044         )
1045         self.send_and_assert_no_replies(self.pg0, [p])
1046         self.assertEqual(
1047             self.base_kp4_err + 2, self.statistics.get_err_counter(self.kp4_error)
1048         )
1049
1050         # send a data packet from the peer through the tunnel
1051         # this completes the handshake
1052         p = (
1053             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1054             / UDP(sport=222, dport=223)
1055             / Raw()
1056         )
1057         d = peer_1.encrypt_transport(p)
1058         p = peer_1.mk_tunnel_header(self.pg1, True) / (
1059             Wireguard(message_type=4, reserved_zero=0)
1060             / WireguardTransport(
1061                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1062             )
1063         )
1064         rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1065
1066         for rx in rxs:
1067             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1068             self.assertEqual(rx[IP].ttl, 19)
1069
1070         # send a packets that are routed into the tunnel
1071         p = (
1072             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1073             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1074             / UDP(sport=555, dport=556)
1075             / Raw(b"\x00" * 80)
1076         )
1077
1078         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1079
1080         for rx in rxs:
1081             rx = IP(peer_1.decrypt_transport(rx, True))
1082
1083             # chech the oringial packet is present
1084             self.assertEqual(rx[IP].dst, p[IP].dst)
1085             self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
1086
1087         # send packets into the tunnel, expect to receive them on
1088         # the other side
1089         p = [
1090             (
1091                 peer_1.mk_tunnel_header(self.pg1, True)
1092                 / Wireguard(message_type=4, reserved_zero=0)
1093                 / WireguardTransport(
1094                     receiver_index=peer_1.sender,
1095                     counter=ii + 1,
1096                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1097                         (
1098                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1099                             / UDP(sport=222, dport=223)
1100                             / Raw()
1101                         )
1102                     ),
1103                 )
1104             )
1105             for ii in range(255)
1106         ]
1107
1108         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1109
1110         for rx in rxs:
1111             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1112             self.assertEqual(rx[IP].ttl, 19)
1113
1114         r1.remove_vpp_config()
1115         peer_1.remove_vpp_config()
1116         wg0.remove_vpp_config()
1117
1118     def test_wg_multi_peer(self):
1119         """multiple peer setup"""
1120         port = 12373
1121
1122         # Create interfaces
1123         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1124         wg1 = VppWgInterface(self, self.pg2.local_ip4, port + 1).add_vpp_config()
1125         wg0.admin_up()
1126         wg1.admin_up()
1127
1128         # Check peer counter
1129         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 0)
1130
1131         self.pg_enable_capture(self.pg_interfaces)
1132         self.pg_start()
1133
1134         # Create many peers on sencond interface
1135         NUM_PEERS = 16
1136         self.pg2.generate_remote_hosts(NUM_PEERS)
1137         self.pg2.configure_ipv4_neighbors()
1138         self.pg1.generate_remote_hosts(NUM_PEERS)
1139         self.pg1.configure_ipv4_neighbors()
1140
1141         peers_1 = []
1142         peers_2 = []
1143         routes_1 = []
1144         routes_2 = []
1145         for i in range(NUM_PEERS):
1146             peers_1.append(
1147                 VppWgPeer(
1148                     self,
1149                     wg0,
1150                     self.pg1.remote_hosts[i].ip4,
1151                     port + 1 + i,
1152                     ["10.0.%d.4/32" % i],
1153                 ).add_vpp_config()
1154             )
1155             routes_1.append(
1156                 VppIpRoute(
1157                     self,
1158                     "10.0.%d.4" % i,
1159                     32,
1160                     [VppRoutePath(self.pg1.remote_hosts[i].ip4, wg0.sw_if_index)],
1161                 ).add_vpp_config()
1162             )
1163
1164             peers_2.append(
1165                 VppWgPeer(
1166                     self,
1167                     wg1,
1168                     self.pg2.remote_hosts[i].ip4,
1169                     port + 100 + i,
1170                     ["10.100.%d.4/32" % i],
1171                 ).add_vpp_config()
1172             )
1173             routes_2.append(
1174                 VppIpRoute(
1175                     self,
1176                     "10.100.%d.4" % i,
1177                     32,
1178                     [VppRoutePath(self.pg2.remote_hosts[i].ip4, wg1.sw_if_index)],
1179                 ).add_vpp_config()
1180             )
1181
1182         self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS * 2)
1183
1184         self.logger.info(self.vapi.cli("show wireguard peer"))
1185         self.logger.info(self.vapi.cli("show wireguard interface"))
1186         self.logger.info(self.vapi.cli("show adj 37"))
1187         self.logger.info(self.vapi.cli("sh ip fib 172.16.3.17"))
1188         self.logger.info(self.vapi.cli("sh ip fib 10.11.3.0"))
1189
1190         # remove routes
1191         for r in routes_1:
1192             r.remove_vpp_config()
1193         for r in routes_2:
1194             r.remove_vpp_config()
1195
1196         # remove peers
1197         for p in peers_1:
1198             self.assertTrue(p.query_vpp_config())
1199             p.remove_vpp_config()
1200         for p in peers_2:
1201             self.assertTrue(p.query_vpp_config())
1202             p.remove_vpp_config()
1203
1204         wg0.remove_vpp_config()
1205         wg1.remove_vpp_config()
1206
1207     def test_wg_multi_interface(self):
1208         """Multi-tunnel on the same port"""
1209         port = 12500
1210
1211         # Create many wireguard interfaces
1212         NUM_IFS = 4
1213         self.pg1.generate_remote_hosts(NUM_IFS)
1214         self.pg1.configure_ipv4_neighbors()
1215         self.pg0.generate_remote_hosts(NUM_IFS)
1216         self.pg0.configure_ipv4_neighbors()
1217
1218         # Create interfaces with a peer on each
1219         peers = []
1220         routes = []
1221         wg_ifs = []
1222         for i in range(NUM_IFS):
1223             # Use the same port for each interface
1224             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1225             wg0.admin_up()
1226             wg0.config_ip4()
1227             wg_ifs.append(wg0)
1228             peers.append(
1229                 VppWgPeer(
1230                     self,
1231                     wg0,
1232                     self.pg1.remote_hosts[i].ip4,
1233                     port + 1 + i,
1234                     ["10.0.%d.0/24" % i],
1235                 ).add_vpp_config()
1236             )
1237
1238             routes.append(
1239                 VppIpRoute(
1240                     self,
1241                     "10.0.%d.0" % i,
1242                     24,
1243                     [VppRoutePath("10.0.%d.4" % i, wg0.sw_if_index)],
1244                 ).add_vpp_config()
1245             )
1246
1247         self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_IFS)
1248
1249         for i in range(NUM_IFS):
1250             # send a valid handsake init for which we expect a response
1251             p = peers[i].mk_handshake(self.pg1)
1252             rx = self.send_and_expect(self.pg1, [p], self.pg1)
1253             peers[i].consume_response(rx[0])
1254
1255             # send a data packet from the peer through the tunnel
1256             # this completes the handshake
1257             p = (
1258                 IP(src="10.0.%d.4" % i, dst=self.pg0.remote_hosts[i].ip4, ttl=20)
1259                 / UDP(sport=222, dport=223)
1260                 / Raw()
1261             )
1262             d = peers[i].encrypt_transport(p)
1263             p = peers[i].mk_tunnel_header(self.pg1) / (
1264                 Wireguard(message_type=4, reserved_zero=0)
1265                 / WireguardTransport(
1266                     receiver_index=peers[i].sender,
1267                     counter=0,
1268                     encrypted_encapsulated_packet=d,
1269                 )
1270             )
1271             rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1272             for rx in rxs:
1273                 self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[i].ip4)
1274                 self.assertEqual(rx[IP].ttl, 19)
1275
1276         # send a packets that are routed into the tunnel
1277         for i in range(NUM_IFS):
1278             p = (
1279                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1280                 / IP(src=self.pg0.remote_hosts[i].ip4, dst="10.0.%d.4" % i)
1281                 / UDP(sport=555, dport=556)
1282                 / Raw(b"\x00" * 80)
1283             )
1284
1285             rxs = self.send_and_expect(self.pg0, p * 64, self.pg1)
1286
1287             for rx in rxs:
1288                 rx = IP(peers[i].decrypt_transport(rx))
1289
1290                 # check the oringial packet is present
1291                 self.assertEqual(rx[IP].dst, p[IP].dst)
1292                 self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
1293
1294         # send packets into the tunnel
1295         for i in range(NUM_IFS):
1296             p = [
1297                 (
1298                     peers[i].mk_tunnel_header(self.pg1)
1299                     / Wireguard(message_type=4, reserved_zero=0)
1300                     / WireguardTransport(
1301                         receiver_index=peers[i].sender,
1302                         counter=ii + 1,
1303                         encrypted_encapsulated_packet=peers[i].encrypt_transport(
1304                             (
1305                                 IP(
1306                                     src="10.0.%d.4" % i,
1307                                     dst=self.pg0.remote_hosts[i].ip4,
1308                                     ttl=20,
1309                                 )
1310                                 / UDP(sport=222, dport=223)
1311                                 / Raw()
1312                             )
1313                         ),
1314                     )
1315                 )
1316                 for ii in range(64)
1317             ]
1318
1319             rxs = self.send_and_expect(self.pg1, p, self.pg0)
1320
1321             for rx in rxs:
1322                 self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[i].ip4)
1323                 self.assertEqual(rx[IP].ttl, 19)
1324
1325         for r in routes:
1326             r.remove_vpp_config()
1327         for p in peers:
1328             p.remove_vpp_config()
1329         for i in wg_ifs:
1330             i.remove_vpp_config()
1331
1332     def test_wg_event(self):
1333         """Test events"""
1334         port = 12600
1335         ESTABLISHED_FLAG = (
1336             VppEnum.vl_api_wireguard_peer_flags_t.WIREGUARD_PEER_ESTABLISHED
1337         )
1338         DEAD_FLAG = VppEnum.vl_api_wireguard_peer_flags_t.WIREGUARD_PEER_STATUS_DEAD
1339
1340         # Create interfaces
1341         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1342         wg1 = VppWgInterface(self, self.pg2.local_ip4, port + 1).add_vpp_config()
1343         wg0.admin_up()
1344         wg1.admin_up()
1345
1346         # Check peer counter
1347         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 0)
1348
1349         self.pg_enable_capture(self.pg_interfaces)
1350         self.pg_start()
1351
1352         # Create peers
1353         NUM_PEERS = 2
1354         self.pg2.generate_remote_hosts(NUM_PEERS)
1355         self.pg2.configure_ipv4_neighbors()
1356         self.pg1.generate_remote_hosts(NUM_PEERS)
1357         self.pg1.configure_ipv4_neighbors()
1358
1359         peers_0 = []
1360         peers_1 = []
1361         routes_0 = []
1362         routes_1 = []
1363         for i in range(NUM_PEERS):
1364             peers_0.append(
1365                 VppWgPeer(
1366                     self,
1367                     wg0,
1368                     self.pg1.remote_hosts[i].ip4,
1369                     port + 1 + i,
1370                     ["10.0.%d.4/32" % i],
1371                 ).add_vpp_config()
1372             )
1373             routes_0.append(
1374                 VppIpRoute(
1375                     self,
1376                     "10.0.%d.4" % i,
1377                     32,
1378                     [VppRoutePath(self.pg1.remote_hosts[i].ip4, wg0.sw_if_index)],
1379                 ).add_vpp_config()
1380             )
1381
1382             peers_1.append(
1383                 VppWgPeer(
1384                     self,
1385                     wg1,
1386                     self.pg2.remote_hosts[i].ip4,
1387                     port + 100 + i,
1388                     ["10.100.%d.4/32" % i],
1389                 ).add_vpp_config()
1390             )
1391             routes_1.append(
1392                 VppIpRoute(
1393                     self,
1394                     "10.100.%d.4" % i,
1395                     32,
1396                     [VppRoutePath(self.pg2.remote_hosts[i].ip4, wg1.sw_if_index)],
1397                 ).add_vpp_config()
1398             )
1399
1400         self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS * 2)
1401
1402         # Want events from the first perr of wg0
1403         # and from all wg1 peers
1404         peers_0[0].want_events()
1405         wg1.want_events()
1406
1407         for i in range(NUM_PEERS):
1408             # send a valid handsake init for which we expect a response
1409             p = peers_0[i].mk_handshake(self.pg1)
1410             rx = self.send_and_expect(self.pg1, [p], self.pg1)
1411             peers_0[i].consume_response(rx[0])
1412             if i == 0:
1413                 peers_0[0].wait_event(ESTABLISHED_FLAG)
1414
1415             p = peers_1[i].mk_handshake(self.pg2)
1416             rx = self.send_and_expect(self.pg2, [p], self.pg2)
1417             peers_1[i].consume_response(rx[0])
1418
1419         wg1.wait_events(ESTABLISHED_FLAG, [peers_1[0].index, peers_1[1].index])
1420
1421         # remove routes
1422         for r in routes_0:
1423             r.remove_vpp_config()
1424         for r in routes_1:
1425             r.remove_vpp_config()
1426
1427         # remove peers
1428         for i in range(NUM_PEERS):
1429             self.assertTrue(peers_0[i].query_vpp_config())
1430             peers_0[i].remove_vpp_config()
1431             if i == 0:
1432                 peers_0[i].wait_event(0)
1433                 peers_0[i].wait_event(DEAD_FLAG)
1434         for p in peers_1:
1435             self.assertTrue(p.query_vpp_config())
1436             p.remove_vpp_config()
1437             p.wait_event(0)
1438             p.wait_event(DEAD_FLAG)
1439
1440         wg0.remove_vpp_config()
1441         wg1.remove_vpp_config()
1442
1443
1444 class WireguardHandoffTests(TestWg):
1445     """Wireguard Tests in multi worker setup"""
1446
1447     vpp_worker_count = 2
1448
1449     def test_wg_peer_init(self):
1450         """Handoff"""
1451
1452         port = 12383
1453
1454         # Create interfaces
1455         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1456         wg0.admin_up()
1457         wg0.config_ip4()
1458
1459         peer_1 = VppWgPeer(
1460             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.2.0/24", "10.11.3.0/24"]
1461         ).add_vpp_config()
1462         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1463
1464         r1 = VppIpRoute(
1465             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1466         ).add_vpp_config()
1467
1468         # send a valid handsake init for which we expect a response
1469         p = peer_1.mk_handshake(self.pg1)
1470
1471         rx = self.send_and_expect(self.pg1, [p], self.pg1)
1472
1473         peer_1.consume_response(rx[0])
1474
1475         # send a data packet from the peer through the tunnel
1476         # this completes the handshake and pins the peer to worker 0
1477         p = (
1478             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1479             / UDP(sport=222, dport=223)
1480             / Raw()
1481         )
1482         d = peer_1.encrypt_transport(p)
1483         p = peer_1.mk_tunnel_header(self.pg1) / (
1484             Wireguard(message_type=4, reserved_zero=0)
1485             / WireguardTransport(
1486                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1487             )
1488         )
1489         rxs = self.send_and_expect(self.pg1, [p], self.pg0, worker=0)
1490
1491         for rx in rxs:
1492             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1493             self.assertEqual(rx[IP].ttl, 19)
1494
1495         # send a packets that are routed into the tunnel
1496         # and pins the peer tp worker 1
1497         pe = (
1498             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1499             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1500             / UDP(sport=555, dport=556)
1501             / Raw(b"\x00" * 80)
1502         )
1503         rxs = self.send_and_expect(self.pg0, pe * 255, self.pg1, worker=1)
1504         peer_1.validate_encapped(rxs, pe)
1505
1506         # send packets into the tunnel, from the other worker
1507         p = [
1508             (
1509                 peer_1.mk_tunnel_header(self.pg1)
1510                 / Wireguard(message_type=4, reserved_zero=0)
1511                 / WireguardTransport(
1512                     receiver_index=peer_1.sender,
1513                     counter=ii + 1,
1514                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1515                         (
1516                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1517                             / UDP(sport=222, dport=223)
1518                             / Raw()
1519                         )
1520                     ),
1521                 )
1522             )
1523             for ii in range(255)
1524         ]
1525
1526         rxs = self.send_and_expect(self.pg1, p, self.pg0, worker=1)
1527
1528         for rx in rxs:
1529             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1530             self.assertEqual(rx[IP].ttl, 19)
1531
1532         # send a packets that are routed into the tunnel
1533         # from owrker 0
1534         rxs = self.send_and_expect(self.pg0, pe * 255, self.pg1, worker=0)
1535
1536         peer_1.validate_encapped(rxs, pe)
1537
1538         r1.remove_vpp_config()
1539         peer_1.remove_vpp_config()
1540         wg0.remove_vpp_config()
1541
1542     @unittest.skip("test disabled")
1543     def test_wg_multi_interface(self):
1544         """Multi-tunnel on the same port"""