tests docs: update python3 venv packages
[vpp.git] / test / test_wireguard.py
1 #!/usr/bin/env python3
2 """ Wg tests """
3
4 import datetime
5 import base64
6 import os
7
8 from hashlib import blake2s
9 from scapy.packet import Packet
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP, UDP
13 from scapy.layers.inet6 import IPv6
14 from scapy.layers.vxlan import VXLAN
15 from scapy.contrib.wireguard import (
16     Wireguard,
17     WireguardResponse,
18     WireguardInitiation,
19     WireguardTransport,
20     WireguardCookieReply,
21 )
22 from cryptography.hazmat.primitives.asymmetric.x25519 import (
23     X25519PrivateKey,
24     X25519PublicKey,
25 )
26 from cryptography.hazmat.primitives.serialization import (
27     Encoding,
28     PrivateFormat,
29     PublicFormat,
30     NoEncryption,
31 )
32 from cryptography.hazmat.primitives.hashes import BLAKE2s, Hash
33 from cryptography.hazmat.primitives.hmac import HMAC
34 from cryptography.hazmat.backends import default_backend
35 from noise.connection import NoiseConnection, Keypair
36
37 from Crypto.Cipher import ChaCha20_Poly1305
38 from Crypto.Random import get_random_bytes
39
40 from vpp_ipip_tun_interface import VppIpIpTunInterface
41 from vpp_interface import VppInterface
42 from vpp_pg_interface import is_ipv6_misc
43 from vpp_ip_route import VppIpRoute, VppRoutePath
44 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
45 from vpp_vxlan_tunnel import VppVxlanTunnel
46 from vpp_object import VppObject
47 from vpp_papi import VppEnum
48 from framework import is_distro_ubuntu2204, is_distro_debian11, tag_fixme_vpp_debug
49 from framework import VppTestCase
50 from re import compile
51 import unittest
52
53 """ TestWg is a subclass of  VPPTestCase classes.
54
55 Wg test.
56
57 """
58
59
60 def private_key_bytes(k):
61     return k.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption())
62
63
64 def public_key_bytes(k):
65     return k.public_bytes(Encoding.Raw, PublicFormat.Raw)
66
67
68 def get_field_bytes(pkt, name):
69     fld, val = pkt.getfield_and_val(name)
70     return fld.i2m(pkt, val)
71
72
73 class VppWgInterface(VppInterface):
74     """
75     VPP WireGuard interface
76     """
77
78     def __init__(self, test, src, port):
79         super(VppWgInterface, self).__init__(test)
80
81         self.port = port
82         self.src = src
83         self.private_key = X25519PrivateKey.generate()
84         self.public_key = self.private_key.public_key()
85
86         # cookie related params
87         self.cookie_key = blake2s(b"cookie--" + self.public_key_bytes()).digest()
88
89     def public_key_bytes(self):
90         return public_key_bytes(self.public_key)
91
92     def private_key_bytes(self):
93         return private_key_bytes(self.private_key)
94
95     def add_vpp_config(self):
96         r = self.test.vapi.wireguard_interface_create(
97             interface={
98                 "user_instance": 0xFFFFFFFF,
99                 "port": self.port,
100                 "src_ip": self.src,
101                 "private_key": private_key_bytes(self.private_key),
102                 "generate_key": False,
103             }
104         )
105         self.set_sw_if_index(r.sw_if_index)
106         self.test.registry.register(self, self.test.logger)
107         return self
108
109     def remove_vpp_config(self):
110         self.test.vapi.wireguard_interface_delete(sw_if_index=self._sw_if_index)
111
112     def query_vpp_config(self):
113         ts = self.test.vapi.wireguard_interface_dump(sw_if_index=0xFFFFFFFF)
114         for t in ts:
115             if (
116                 t.interface.sw_if_index == self._sw_if_index
117                 and str(t.interface.src_ip) == self.src
118                 and t.interface.port == self.port
119                 and t.interface.private_key == private_key_bytes(self.private_key)
120             ):
121                 return True
122         return False
123
124     def want_events(self, peer_index=0xFFFFFFFF):
125         self.test.vapi.want_wireguard_peer_events(
126             enable_disable=1,
127             pid=os.getpid(),
128             sw_if_index=self._sw_if_index,
129             peer_index=peer_index,
130         )
131
132     def wait_events(self, expect, peers, timeout=5):
133         for i in range(len(peers)):
134             rv = self.test.vapi.wait_for_event(timeout, "wireguard_peer_event")
135             self.test.assertEqual(rv.peer_index, peers[i])
136             self.test.assertEqual(rv.flags, expect)
137
138     def __str__(self):
139         return self.object_id()
140
141     def object_id(self):
142         return "wireguard-%d" % self._sw_if_index
143
144
145 NOISE_HANDSHAKE_NAME = b"Noise_IKpsk2_25519_ChaChaPoly_BLAKE2s"
146 NOISE_IDENTIFIER_NAME = b"WireGuard v1 zx2c4 Jason@zx2c4.com"
147
148 HANDSHAKE_COUNTING_INTERVAL = 0.5
149 UNDER_LOAD_INTERVAL = 1.0
150 HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD = 40
151 HANDSHAKE_NUM_BEFORE_RATELIMITING = 5
152
153 HANDSHAKE_JITTER = 0.5
154
155
156 class VppWgPeer(VppObject):
157     def __init__(self, test, itf, endpoint, port, allowed_ips, persistent_keepalive=15):
158         self._test = test
159         self.itf = itf
160         self.endpoint = endpoint
161         self.port = port
162         self.allowed_ips = allowed_ips
163         self.persistent_keepalive = persistent_keepalive
164
165         # remote peer's public
166         self.private_key = X25519PrivateKey.generate()
167         self.public_key = self.private_key.public_key()
168
169         # cookie related params
170         self.cookie_key = blake2s(b"cookie--" + self.public_key_bytes()).digest()
171         self.last_sent_cookie = None
172         self.last_mac1 = None
173         self.last_received_cookie = None
174
175         self.noise = NoiseConnection.from_name(NOISE_HANDSHAKE_NAME)
176
177     def change_endpoint(self, endpoint, port):
178         self.endpoint = endpoint
179         self.port = port
180
181     def add_vpp_config(self):
182         rv = self._test.vapi.wireguard_peer_add(
183             peer={
184                 "public_key": self.public_key_bytes(),
185                 "port": self.port,
186                 "endpoint": self.endpoint,
187                 "n_allowed_ips": len(self.allowed_ips),
188                 "allowed_ips": self.allowed_ips,
189                 "sw_if_index": self.itf.sw_if_index,
190                 "persistent_keepalive": self.persistent_keepalive,
191             }
192         )
193         self.index = rv.peer_index
194         self.receiver_index = self.index + 1
195         self._test.registry.register(self, self._test.logger)
196         return self
197
198     def remove_vpp_config(self):
199         self._test.vapi.wireguard_peer_remove(peer_index=self.index)
200
201     def object_id(self):
202         return "wireguard-peer-%s" % self.index
203
204     def public_key_bytes(self):
205         return public_key_bytes(self.public_key)
206
207     def query_vpp_config(self):
208         peers = self._test.vapi.wireguard_peers_dump()
209
210         for p in peers:
211             # "::" endpoint will be returned as "0.0.0.0" in peer's details
212             endpoint = "0.0.0.0" if self.endpoint == "::" else self.endpoint
213             if (
214                 p.peer.public_key == self.public_key_bytes()
215                 and p.peer.port == self.port
216                 and str(p.peer.endpoint) == endpoint
217                 and p.peer.sw_if_index == self.itf.sw_if_index
218                 and len(self.allowed_ips) == p.peer.n_allowed_ips
219             ):
220                 self.allowed_ips.sort()
221                 p.peer.allowed_ips.sort()
222
223                 for a1, a2 in zip(self.allowed_ips, p.peer.allowed_ips):
224                     if str(a1) != str(a2):
225                         return False
226                 return True
227         return False
228
229     def mk_tunnel_header(self, tx_itf, is_ip6=False):
230         if is_ip6 is False:
231             return (
232                 Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac)
233                 / IP(src=self.endpoint, dst=self.itf.src)
234                 / UDP(sport=self.port, dport=self.itf.port)
235             )
236         else:
237             return (
238                 Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac)
239                 / IPv6(src=self.endpoint, dst=self.itf.src)
240                 / UDP(sport=self.port, dport=self.itf.port)
241             )
242
243     def noise_reset(self):
244         self.noise = NoiseConnection.from_name(NOISE_HANDSHAKE_NAME)
245
246     def noise_init(self, public_key=None):
247         self.noise.set_prologue(NOISE_IDENTIFIER_NAME)
248         self.noise.set_psks(psk=bytes(bytearray(32)))
249
250         if not public_key:
251             public_key = self.itf.public_key
252
253         # local/this private
254         self.noise.set_keypair_from_private_bytes(
255             Keypair.STATIC, private_key_bytes(self.private_key)
256         )
257         # remote's public
258         self.noise.set_keypair_from_public_bytes(
259             Keypair.REMOTE_STATIC, public_key_bytes(public_key)
260         )
261
262         self.noise.start_handshake()
263
264     def mk_cookie(self, p, tx_itf, is_resp=False, is_ip6=False):
265         self.verify_header(p, is_ip6)
266
267         wg_pkt = Wireguard(p[Raw])
268
269         if is_resp:
270             self._test.assertEqual(wg_pkt[Wireguard].message_type, 2)
271             self._test.assertEqual(wg_pkt[Wireguard].reserved_zero, 0)
272             self._test.assertEqual(wg_pkt[WireguardResponse].mac2, bytes([0] * 16))
273         else:
274             self._test.assertEqual(wg_pkt[Wireguard].message_type, 1)
275             self._test.assertEqual(wg_pkt[Wireguard].reserved_zero, 0)
276             self._test.assertEqual(wg_pkt[WireguardInitiation].mac2, bytes([0] * 16))
277
278         # collect info from wg packet (initiation or response)
279         src = get_field_bytes(p[IPv6 if is_ip6 else IP], "src")
280         sport = p[UDP].sport.to_bytes(2, byteorder="big")
281         if is_resp:
282             mac1 = wg_pkt[WireguardResponse].mac1
283             sender_index = wg_pkt[WireguardResponse].sender_index
284         else:
285             mac1 = wg_pkt[WireguardInitiation].mac1
286             sender_index = wg_pkt[WireguardInitiation].sender_index
287
288         # make cookie reply
289         cookie_reply = Wireguard() / WireguardCookieReply()
290         cookie_reply[Wireguard].message_type = 3
291         cookie_reply[Wireguard].reserved_zero = 0
292         cookie_reply[WireguardCookieReply].receiver_index = sender_index
293         nonce = get_random_bytes(24)
294         cookie_reply[WireguardCookieReply].nonce = nonce
295
296         # generate cookie data
297         changing_secret = get_random_bytes(32)
298         self.last_sent_cookie = blake2s(
299             src + sport, digest_size=16, key=changing_secret
300         ).digest()
301
302         # encrypt cookie data
303         cipher = ChaCha20_Poly1305.new(key=self.cookie_key, nonce=nonce)
304         cipher.update(mac1)
305         ciphertext, tag = cipher.encrypt_and_digest(self.last_sent_cookie)
306         cookie_reply[WireguardCookieReply].encrypted_cookie = ciphertext + tag
307
308         # prepare cookie reply to be sent
309         cookie_reply = self.mk_tunnel_header(tx_itf, is_ip6) / cookie_reply
310
311         return cookie_reply
312
313     def consume_cookie(self, p, is_ip6=False):
314         self.verify_header(p, is_ip6)
315
316         cookie_reply = Wireguard(p[Raw])
317
318         self._test.assertEqual(cookie_reply[Wireguard].message_type, 3)
319         self._test.assertEqual(cookie_reply[Wireguard].reserved_zero, 0)
320         self._test.assertEqual(
321             cookie_reply[WireguardCookieReply].receiver_index, self.receiver_index
322         )
323
324         # collect info from cookie reply
325         nonce = cookie_reply[WireguardCookieReply].nonce
326         encrypted_cookie = cookie_reply[WireguardCookieReply].encrypted_cookie
327         ciphertext, tag = encrypted_cookie[:16], encrypted_cookie[16:]
328
329         # decrypt cookie data
330         cipher = ChaCha20_Poly1305.new(key=self.itf.cookie_key, nonce=nonce)
331         cipher.update(self.last_mac1)
332         self.last_received_cookie = cipher.decrypt_and_verify(ciphertext, tag)
333
334     def mk_handshake(self, tx_itf, is_ip6=False, public_key=None):
335         self.noise.set_as_initiator()
336         self.noise_init(public_key)
337
338         p = Wireguard() / WireguardInitiation()
339
340         p[Wireguard].message_type = 1
341         p[Wireguard].reserved_zero = 0
342         p[WireguardInitiation].sender_index = self.receiver_index
343
344         # some random data for the message
345         #  lifted from the noise protocol's wireguard example
346         now = datetime.datetime.now()
347         tai = struct.pack(
348             "!qi",
349             4611686018427387914 + int(now.timestamp()),
350             int(now.microsecond * 1e3),
351         )
352         b = self.noise.write_message(payload=tai)
353
354         # load noise into init message
355         p[WireguardInitiation].unencrypted_ephemeral = b[0:32]
356         p[WireguardInitiation].encrypted_static = b[32:80]
357         p[WireguardInitiation].encrypted_timestamp = b[80:108]
358
359         # generate the mac1 hash
360         mac_key = blake2s(b"mac1----" + self.itf.public_key_bytes()).digest()
361         mac1 = blake2s(bytes(p)[0:116], digest_size=16, key=mac_key).digest()
362         p[WireguardInitiation].mac1 = mac1
363         self.last_mac1 = mac1
364
365         # generate the mac2 hash
366         if self.last_received_cookie:
367             mac2 = blake2s(
368                 bytes(p)[0:132], digest_size=16, key=self.last_received_cookie
369             ).digest()
370             p[WireguardInitiation].mac2 = mac2
371             self.last_received_cookie = None
372         else:
373             p[WireguardInitiation].mac2 = bytearray(16)
374
375         p = self.mk_tunnel_header(tx_itf, is_ip6) / p
376
377         return p
378
379     def verify_header(self, p, is_ip6=False):
380         if is_ip6 is False:
381             self._test.assertEqual(p[IP].src, self.itf.src)
382             self._test.assertEqual(p[IP].dst, self.endpoint)
383             self._test.assert_packet_checksums_valid(p)
384         else:
385             self._test.assertEqual(p[IPv6].src, self.itf.src)
386             self._test.assertEqual(p[IPv6].dst, self.endpoint)
387             self._test.assert_packet_checksums_valid(p, False)
388         self._test.assertEqual(p[UDP].sport, self.itf.port)
389         self._test.assertEqual(p[UDP].dport, self.port)
390
391     def consume_init(self, p, tx_itf, is_ip6=False, is_mac2=False):
392         self.noise.set_as_responder()
393         self.noise_init(self.itf.public_key)
394         self.verify_header(p, is_ip6)
395
396         init = Wireguard(p[Raw])
397
398         self._test.assertEqual(init[Wireguard].message_type, 1)
399         self._test.assertEqual(init[Wireguard].reserved_zero, 0)
400
401         self.sender = init[WireguardInitiation].sender_index
402
403         # validate the mac1 hash
404         mac_key = blake2s(b"mac1----" + public_key_bytes(self.public_key)).digest()
405         mac1 = blake2s(bytes(init)[0:-32], digest_size=16, key=mac_key).digest()
406         self._test.assertEqual(init[WireguardInitiation].mac1, mac1)
407
408         # validate the mac2 hash
409         if is_mac2:
410             self._test.assertNotEqual(init[WireguardInitiation].mac2, bytes([0] * 16))
411             self._test.assertNotEqual(self.last_sent_cookie, None)
412             mac2 = blake2s(
413                 bytes(init)[0:-16], digest_size=16, key=self.last_sent_cookie
414             ).digest()
415             self._test.assertEqual(init[WireguardInitiation].mac2, mac2)
416             self.last_sent_cookie = None
417         else:
418             self._test.assertEqual(init[WireguardInitiation].mac2, bytes([0] * 16))
419
420         # this passes only unencrypted_ephemeral, encrypted_static,
421         # encrypted_timestamp fields of the init
422         payload = self.noise.read_message(bytes(init)[8:-32])
423
424         # build the response
425         b = self.noise.write_message()
426         mac_key = blake2s(b"mac1----" + public_key_bytes(self.itf.public_key)).digest()
427         resp = Wireguard(message_type=2, reserved_zero=0) / WireguardResponse(
428             sender_index=self.receiver_index,
429             receiver_index=self.sender,
430             unencrypted_ephemeral=b[0:32],
431             encrypted_nothing=b[32:],
432         )
433         mac1 = blake2s(bytes(resp)[:-32], digest_size=16, key=mac_key).digest()
434         resp[WireguardResponse].mac1 = mac1
435         self.last_mac1 = mac1
436
437         resp = self.mk_tunnel_header(tx_itf, is_ip6) / resp
438         self._test.assertTrue(self.noise.handshake_finished)
439
440         return resp
441
442     def consume_response(self, p, is_ip6=False):
443         self.verify_header(p, is_ip6)
444
445         resp = Wireguard(p[Raw])
446
447         self._test.assertEqual(resp[Wireguard].message_type, 2)
448         self._test.assertEqual(resp[Wireguard].reserved_zero, 0)
449         self._test.assertEqual(
450             resp[WireguardResponse].receiver_index, self.receiver_index
451         )
452
453         self.sender = resp[Wireguard].sender_index
454
455         payload = self.noise.read_message(bytes(resp)[12:60])
456         self._test.assertEqual(payload, b"")
457         self._test.assertTrue(self.noise.handshake_finished)
458
459     def decrypt_transport(self, p, is_ip6=False):
460         self.verify_header(p, is_ip6)
461
462         p = Wireguard(p[Raw])
463         self._test.assertEqual(p[Wireguard].message_type, 4)
464         self._test.assertEqual(p[Wireguard].reserved_zero, 0)
465         self._test.assertEqual(
466             p[WireguardTransport].receiver_index, self.receiver_index
467         )
468
469         d = self.noise.decrypt(p[WireguardTransport].encrypted_encapsulated_packet)
470         return d
471
472     def encrypt_transport(self, p):
473         return self.noise.encrypt(bytes(p))
474
475     def validate_encapped(self, rxs, tx, is_tunnel_ip6=False, is_transport_ip6=False):
476         ret_rxs = []
477         for rx in rxs:
478             rx = self.decrypt_transport(rx, is_tunnel_ip6)
479             if is_transport_ip6 is False:
480                 rx = IP(rx)
481                 # check the original packet is present
482                 self._test.assertEqual(rx[IP].dst, tx[IP].dst)
483                 self._test.assertEqual(rx[IP].ttl, tx[IP].ttl - 1)
484             else:
485                 rx = IPv6(rx)
486                 # check the original packet is present
487                 self._test.assertEqual(rx[IPv6].dst, tx[IPv6].dst)
488                 self._test.assertEqual(rx[IPv6].hlim, tx[IPv6].hlim - 1)
489             ret_rxs.append(rx)
490         return ret_rxs
491
492     def want_events(self):
493         self._test.vapi.want_wireguard_peer_events(
494             enable_disable=1,
495             pid=os.getpid(),
496             peer_index=self.index,
497             sw_if_index=self.itf.sw_if_index,
498         )
499
500     def wait_event(self, expect, timeout=5):
501         rv = self._test.vapi.wait_for_event(timeout, "wireguard_peer_event")
502         self._test.assertEqual(rv.flags, expect)
503         self._test.assertEqual(rv.peer_index, self.index)
504
505
506 def is_handshake_init(p):
507     wg_p = Wireguard(p[Raw])
508
509     return wg_p[Wireguard].message_type == 1
510
511
512 class TestWg(VppTestCase):
513     """Wireguard Test Case"""
514
515     error_str = compile(r"Error")
516
517     wg4_output_node_name = "/err/wg4-output-tun/"
518     wg4_input_node_name = "/err/wg4-input/"
519     wg6_output_node_name = "/err/wg6-output-tun/"
520     wg6_input_node_name = "/err/wg6-input/"
521     kp4_error = wg4_output_node_name + "Keypair error"
522     mac4_error = wg4_input_node_name + "Invalid MAC handshake"
523     peer4_in_err = wg4_input_node_name + "Peer error"
524     peer4_out_err = wg4_output_node_name + "Peer error"
525     kp6_error = wg6_output_node_name + "Keypair error"
526     mac6_error = wg6_input_node_name + "Invalid MAC handshake"
527     peer6_in_err = wg6_input_node_name + "Peer error"
528     peer6_out_err = wg6_output_node_name + "Peer error"
529     cookie_dec4_err = wg4_input_node_name + "Failed during Cookie decryption"
530     cookie_dec6_err = wg6_input_node_name + "Failed during Cookie decryption"
531     ratelimited4_err = wg4_input_node_name + "Handshake ratelimited"
532     ratelimited6_err = wg6_input_node_name + "Handshake ratelimited"
533
534     @classmethod
535     def setUpClass(cls):
536         super(TestWg, cls).setUpClass()
537         if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
538             cls, "vpp"
539         ):
540             return
541         try:
542             cls.create_pg_interfaces(range(3))
543             for i in cls.pg_interfaces:
544                 i.admin_up()
545                 i.config_ip4()
546                 i.config_ip6()
547                 i.resolve_arp()
548                 i.resolve_ndp()
549
550         except Exception:
551             super(TestWg, cls).tearDownClass()
552             raise
553
554     @classmethod
555     def tearDownClass(cls):
556         super(TestWg, cls).tearDownClass()
557
558     def setUp(self):
559         super(VppTestCase, self).setUp()
560         self.base_kp4_err = self.statistics.get_err_counter(self.kp4_error)
561         self.base_mac4_err = self.statistics.get_err_counter(self.mac4_error)
562         self.base_peer4_in_err = self.statistics.get_err_counter(self.peer4_in_err)
563         self.base_peer4_out_err = self.statistics.get_err_counter(self.peer4_out_err)
564         self.base_kp6_err = self.statistics.get_err_counter(self.kp6_error)
565         self.base_mac6_err = self.statistics.get_err_counter(self.mac6_error)
566         self.base_peer6_in_err = self.statistics.get_err_counter(self.peer6_in_err)
567         self.base_peer6_out_err = self.statistics.get_err_counter(self.peer6_out_err)
568         self.base_cookie_dec4_err = self.statistics.get_err_counter(
569             self.cookie_dec4_err
570         )
571         self.base_cookie_dec6_err = self.statistics.get_err_counter(
572             self.cookie_dec6_err
573         )
574         self.base_ratelimited4_err = self.statistics.get_err_counter(
575             self.ratelimited4_err
576         )
577         self.base_ratelimited6_err = self.statistics.get_err_counter(
578             self.ratelimited6_err
579         )
580
581     def send_and_assert_no_replies_ignoring_init(
582         self, intf, pkts, remark="", timeout=None
583     ):
584         self.pg_send(intf, pkts)
585
586         def _filter_out_fn(p):
587             return is_ipv6_misc(p) or is_handshake_init(p)
588
589         try:
590             if not timeout:
591                 timeout = 1
592             for i in self.pg_interfaces:
593                 i.assert_nothing_captured(
594                     timeout=timeout, remark=remark, filter_out_fn=_filter_out_fn
595                 )
596                 timeout = 0.1
597         finally:
598             pass
599
600     def test_wg_interface(self):
601         """Simple interface creation"""
602         port = 12312
603
604         # Create interface
605         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
606
607         self.logger.info(self.vapi.cli("sh int"))
608
609         # delete interface
610         wg0.remove_vpp_config()
611
612     def test_handshake_hash(self):
613         """test hashing an init message"""
614         # a init packet generated by linux given the key below
615         h = (
616             "0100000098b9032b"
617             "55cc4b39e73c3d24"
618             "a2a1ab884b524a81"
619             "1808bb86640fb70d"
620             "e93154fec1879125"
621             "ab012624a27f0b75"
622             "c0a2582f438ddb5f"
623             "8e768af40b4ab444"
624             "02f9ff473e1b797e"
625             "80d39d93c5480c82"
626             "a3d4510f70396976"
627             "586fb67300a5167b"
628             "ae6ca3ff3dfd00eb"
629             "59be198810f5aa03"
630             "6abc243d2155ee4f"
631             "2336483900aef801"
632             "08752cd700000000"
633             "0000000000000000"
634             "00000000"
635         )
636
637         b = bytearray.fromhex(h)
638         tgt = Wireguard(b)
639
640         pubb = base64.b64decode("aRuHFTTxICIQNefp05oKWlJv3zgKxb8+WW7JJMh0jyM=")
641         pub = X25519PublicKey.from_public_bytes(pubb)
642
643         self.assertEqual(pubb, public_key_bytes(pub))
644
645         # strip the macs and build a new packet
646         init = b[0:-32]
647         mac_key = blake2s(b"mac1----" + public_key_bytes(pub)).digest()
648         init += blake2s(init, digest_size=16, key=mac_key).digest()
649         init += b"\x00" * 16
650
651         act = Wireguard(init)
652
653         self.assertEqual(tgt, act)
654
655     def _test_wg_send_cookie_tmpl(self, is_resp, is_ip6):
656         port = 12323
657
658         # create wg interface
659         if is_ip6:
660             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
661             wg0.admin_up()
662             wg0.config_ip6()
663         else:
664             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
665             wg0.admin_up()
666             wg0.config_ip4()
667
668         self.pg_enable_capture(self.pg_interfaces)
669         self.pg_start()
670
671         # create a peer
672         if is_ip6:
673             peer_1 = VppWgPeer(
674                 self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
675             ).add_vpp_config()
676         else:
677             peer_1 = VppWgPeer(
678                 self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
679             ).add_vpp_config()
680         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
681
682         if is_resp:
683             # skip the first automatic handshake
684             self.pg1.get_capture(1, timeout=HANDSHAKE_JITTER)
685
686             # prepare and send a handshake initiation
687             # expect the peer to send a handshake response
688             init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
689             rxs = self.send_and_expect(self.pg1, [init], self.pg1)
690         else:
691             # wait for the peer to send a handshake initiation
692             rxs = self.pg1.get_capture(1, timeout=2)
693
694         # prepare and send a wrong cookie reply
695         # expect no replies and the cookie error incremented
696         cookie = peer_1.mk_cookie(rxs[0], self.pg1, is_resp=is_resp, is_ip6=is_ip6)
697         cookie.nonce = b"1234567890"
698         self.send_and_assert_no_replies(self.pg1, [cookie], timeout=0.1)
699         if is_ip6:
700             self.assertEqual(
701                 self.base_cookie_dec6_err + 1,
702                 self.statistics.get_err_counter(self.cookie_dec6_err),
703             )
704         else:
705             self.assertEqual(
706                 self.base_cookie_dec4_err + 1,
707                 self.statistics.get_err_counter(self.cookie_dec4_err),
708             )
709
710         # prepare and send a correct cookie reply
711         cookie = peer_1.mk_cookie(rxs[0], self.pg1, is_resp=is_resp, is_ip6=is_ip6)
712         self.pg_send(self.pg1, [cookie])
713
714         # wait for the peer to send a handshake initiation with mac2 set
715         rxs = self.pg1.get_capture(1, timeout=6)
716
717         # verify the initiation and its mac2
718         peer_1.consume_init(rxs[0], self.pg1, is_ip6=is_ip6, is_mac2=True)
719
720         # remove configs
721         peer_1.remove_vpp_config()
722         wg0.remove_vpp_config()
723
724     def test_wg_send_cookie_on_init_v4(self):
725         """Send cookie on handshake initiation (v4)"""
726         self._test_wg_send_cookie_tmpl(is_resp=False, is_ip6=False)
727
728     def test_wg_send_cookie_on_init_v6(self):
729         """Send cookie on handshake initiation (v6)"""
730         self._test_wg_send_cookie_tmpl(is_resp=False, is_ip6=True)
731
732     def test_wg_send_cookie_on_resp_v4(self):
733         """Send cookie on handshake response (v4)"""
734         self._test_wg_send_cookie_tmpl(is_resp=True, is_ip6=False)
735
736     def test_wg_send_cookie_on_resp_v6(self):
737         """Send cookie on handshake response (v6)"""
738         self._test_wg_send_cookie_tmpl(is_resp=True, is_ip6=True)
739
740     def _test_wg_receive_cookie_tmpl(self, is_resp, is_ip6):
741         port = 12323
742
743         # create wg interface
744         if is_ip6:
745             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
746             wg0.admin_up()
747             wg0.config_ip6()
748         else:
749             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
750             wg0.admin_up()
751             wg0.config_ip4()
752
753         self.pg_enable_capture(self.pg_interfaces)
754         self.pg_start()
755
756         # create a peer
757         if is_ip6:
758             peer_1 = VppWgPeer(
759                 self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
760             ).add_vpp_config()
761         else:
762             peer_1 = VppWgPeer(
763                 self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
764             ).add_vpp_config()
765         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
766
767         if is_resp:
768             # wait for the peer to send a handshake initiation
769             rxs = self.pg1.get_capture(1, timeout=2)
770             # prepare and send a bunch of handshake responses
771             # expect to switch to under load state
772             resp = peer_1.consume_init(rxs[0], self.pg1, is_ip6=is_ip6)
773             txs = [resp] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
774             rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
775             # reset noise to be able to turn into initiator later
776             peer_1.noise_reset()
777         else:
778             # skip the first automatic handshake
779             self.pg1.get_capture(1, timeout=HANDSHAKE_JITTER)
780
781             # prepare and send a bunch of handshake initiations
782             # expect to switch to under load state
783             init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
784             txs = [init] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
785             rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
786
787         # expect the peer to send a cookie reply
788         peer_1.consume_cookie(rxs[-1], is_ip6=is_ip6)
789
790         # prepare and send a handshake initiation with wrong mac2
791         # expect a cookie reply
792         init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
793         init.mac2 = b"1234567890"
794         rxs = self.send_and_expect(self.pg1, [init], self.pg1)
795         peer_1.consume_cookie(rxs[0], is_ip6=is_ip6)
796
797         # prepare and send a handshake initiation with correct mac2
798         # expect a handshake response
799         init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
800         rxs = self.send_and_expect(self.pg1, [init], self.pg1)
801
802         # verify the response
803         peer_1.consume_response(rxs[0], is_ip6=is_ip6)
804
805         # clear up under load state
806         self.sleep(UNDER_LOAD_INTERVAL)
807
808         # remove configs
809         peer_1.remove_vpp_config()
810         wg0.remove_vpp_config()
811
812     def test_wg_receive_cookie_on_init_v4(self):
813         """Receive cookie on handshake initiation (v4)"""
814         self._test_wg_receive_cookie_tmpl(is_resp=False, is_ip6=False)
815
816     def test_wg_receive_cookie_on_init_v6(self):
817         """Receive cookie on handshake initiation (v6)"""
818         self._test_wg_receive_cookie_tmpl(is_resp=False, is_ip6=True)
819
820     def test_wg_receive_cookie_on_resp_v4(self):
821         """Receive cookie on handshake response (v4)"""
822         self._test_wg_receive_cookie_tmpl(is_resp=True, is_ip6=False)
823
824     def test_wg_receive_cookie_on_resp_v6(self):
825         """Receive cookie on handshake response (v6)"""
826         self._test_wg_receive_cookie_tmpl(is_resp=True, is_ip6=True)
827
828     def test_wg_under_load_interval(self):
829         """Under load interval"""
830         port = 12323
831
832         # create wg interface
833         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
834         wg0.admin_up()
835         wg0.config_ip4()
836
837         self.pg_enable_capture(self.pg_interfaces)
838         self.pg_start()
839
840         # create a peer
841         peer_1 = VppWgPeer(
842             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
843         ).add_vpp_config()
844         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
845
846         # skip the first automatic handshake
847         self.pg1.get_capture(1, timeout=HANDSHAKE_JITTER)
848
849         # prepare and send a bunch of handshake initiations
850         # expect to switch to under load state
851         init = peer_1.mk_handshake(self.pg1)
852         txs = [init] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
853         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
854
855         # expect the peer to send a cookie reply
856         peer_1.consume_cookie(rxs[-1])
857
858         # sleep till the next counting interval
859         # expect under load state is still active
860         self.sleep(HANDSHAKE_COUNTING_INTERVAL)
861
862         # prepare and send a handshake initiation with wrong mac2
863         # expect a cookie reply
864         init = peer_1.mk_handshake(self.pg1)
865         init.mac2 = b"1234567890"
866         rxs = self.send_and_expect(self.pg1, [init], self.pg1)
867         peer_1.consume_cookie(rxs[0])
868
869         # sleep till the end of being under load
870         # expect under load state is over
871         self.sleep(UNDER_LOAD_INTERVAL - HANDSHAKE_COUNTING_INTERVAL)
872
873         # prepare and send a handshake initiation with wrong mac2
874         # expect a handshake response
875         init = peer_1.mk_handshake(self.pg1)
876         init.mac2 = b"1234567890"
877         rxs = self.send_and_expect(self.pg1, [init], self.pg1)
878
879         # verify the response
880         peer_1.consume_response(rxs[0])
881
882         # remove configs
883         peer_1.remove_vpp_config()
884         wg0.remove_vpp_config()
885
886     def _test_wg_handshake_ratelimiting_tmpl(self, is_ip6):
887         port = 12323
888
889         # create wg interface
890         if is_ip6:
891             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
892             wg0.admin_up()
893             wg0.config_ip6()
894         else:
895             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
896             wg0.admin_up()
897             wg0.config_ip4()
898
899         self.pg_enable_capture(self.pg_interfaces)
900         self.pg_start()
901
902         # create a peer
903         if is_ip6:
904             peer_1 = VppWgPeer(
905                 self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
906             ).add_vpp_config()
907         else:
908             peer_1 = VppWgPeer(
909                 self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
910             ).add_vpp_config()
911         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
912
913         # skip the first automatic handshake
914         self.pg1.get_capture(1, timeout=HANDSHAKE_JITTER)
915
916         # prepare and send a bunch of handshake initiations
917         # expect to switch to under load state
918         init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
919         txs = [init] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
920         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
921
922         # expect the peer to send a cookie reply
923         peer_1.consume_cookie(rxs[-1], is_ip6=is_ip6)
924
925         # prepare and send a bunch of handshake initiations with correct mac2
926         # expect a handshake response and then ratelimiting
927         NUM_TO_REJECT = 10
928         init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
929         txs = [init] * (HANDSHAKE_NUM_BEFORE_RATELIMITING + NUM_TO_REJECT)
930         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
931
932         if is_ip6:
933             self.assertEqual(
934                 self.base_ratelimited6_err + NUM_TO_REJECT,
935                 self.statistics.get_err_counter(self.ratelimited6_err),
936             )
937         else:
938             self.assertEqual(
939                 self.base_ratelimited4_err + NUM_TO_REJECT,
940                 self.statistics.get_err_counter(self.ratelimited4_err),
941             )
942
943         # verify the response
944         peer_1.consume_response(rxs[0], is_ip6=is_ip6)
945
946         # clear up under load state
947         self.sleep(UNDER_LOAD_INTERVAL)
948
949         # remove configs
950         peer_1.remove_vpp_config()
951         wg0.remove_vpp_config()
952
953     def test_wg_handshake_ratelimiting_v4(self):
954         """Handshake ratelimiting (v4)"""
955         self._test_wg_handshake_ratelimiting_tmpl(is_ip6=False)
956
957     def test_wg_handshake_ratelimiting_v6(self):
958         """Handshake ratelimiting (v6)"""
959         self._test_wg_handshake_ratelimiting_tmpl(is_ip6=True)
960
961     def test_wg_handshake_ratelimiting_multi_peer(self):
962         """Handshake ratelimiting (multiple peer)"""
963         port = 12323
964
965         # create wg interface
966         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
967         wg0.admin_up()
968         wg0.config_ip4()
969
970         self.pg_enable_capture(self.pg_interfaces)
971         self.pg_start()
972
973         # create two peers
974         NUM_PEERS = 2
975         self.pg1.generate_remote_hosts(NUM_PEERS)
976         self.pg1.configure_ipv4_neighbors()
977
978         peer_1 = VppWgPeer(
979             self, wg0, self.pg1.remote_hosts[0].ip4, port + 1, ["10.11.3.0/24"]
980         ).add_vpp_config()
981         peer_2 = VppWgPeer(
982             self, wg0, self.pg1.remote_hosts[1].ip4, port + 1, ["10.11.4.0/24"]
983         ).add_vpp_config()
984         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 2)
985
986         # skip the first automatic handshake
987         self.pg1.get_capture(NUM_PEERS, timeout=HANDSHAKE_JITTER)
988
989         # (peer_1) prepare and send a bunch of handshake initiations
990         # expect not to switch to under load state
991         init_1 = peer_1.mk_handshake(self.pg1)
992         txs = [init_1] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
993         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
994
995         # (peer_1) expect the peer to send a handshake response
996         peer_1.consume_response(rxs[0])
997         peer_1.noise_reset()
998
999         # (peer_1) send another bunch of handshake initiations
1000         # expect to switch to under load state
1001         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
1002
1003         # (peer_1) expect the peer to send a cookie reply
1004         peer_1.consume_cookie(rxs[-1])
1005
1006         # (peer_2) prepare and send a handshake initiation
1007         # expect a cookie reply
1008         init_2 = peer_2.mk_handshake(self.pg1)
1009         rxs = self.send_and_expect(self.pg1, [init_2], self.pg1)
1010         peer_2.consume_cookie(rxs[0])
1011
1012         # (peer_1) (peer_2) prepare and send a bunch of handshake initiations with correct mac2
1013         # expect a handshake response and then ratelimiting
1014         PEER_1_NUM_TO_REJECT = 2
1015         PEER_2_NUM_TO_REJECT = 5
1016         init_1 = peer_1.mk_handshake(self.pg1)
1017         txs = [init_1] * (HANDSHAKE_NUM_BEFORE_RATELIMITING + PEER_1_NUM_TO_REJECT)
1018         init_2 = peer_2.mk_handshake(self.pg1)
1019         txs += [init_2] * (HANDSHAKE_NUM_BEFORE_RATELIMITING + PEER_2_NUM_TO_REJECT)
1020         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
1021
1022         self.assertTrue(
1023             self.base_ratelimited4_err + PEER_1_NUM_TO_REJECT
1024             < self.statistics.get_err_counter(self.ratelimited4_err)
1025             <= self.base_ratelimited4_err + PEER_1_NUM_TO_REJECT + PEER_2_NUM_TO_REJECT
1026         )
1027
1028         # (peer_1) (peer_2) verify the response
1029         peer_1.consume_response(rxs[0])
1030         peer_2.consume_response(rxs[1])
1031
1032         # clear up under load state
1033         self.sleep(UNDER_LOAD_INTERVAL)
1034
1035         # remove configs
1036         peer_1.remove_vpp_config()
1037         peer_2.remove_vpp_config()
1038         wg0.remove_vpp_config()
1039
1040     def _test_wg_peer_roaming_on_handshake_tmpl(self, is_endpoint_set, is_resp, is_ip6):
1041         port = 12323
1042
1043         # create wg interface
1044         if is_ip6:
1045             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1046             wg0.admin_up()
1047             wg0.config_ip6()
1048         else:
1049             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1050             wg0.admin_up()
1051             wg0.config_ip4()
1052
1053         self.pg_enable_capture(self.pg_interfaces)
1054         self.pg_start()
1055
1056         # create more remote hosts
1057         NUM_REMOTE_HOSTS = 2
1058         self.pg1.generate_remote_hosts(NUM_REMOTE_HOSTS)
1059         if is_ip6:
1060             self.pg1.configure_ipv6_neighbors()
1061         else:
1062             self.pg1.configure_ipv4_neighbors()
1063
1064         # create a peer
1065         if is_ip6:
1066             peer_1 = VppWgPeer(
1067                 test=self,
1068                 itf=wg0,
1069                 endpoint=self.pg1.remote_hosts[0].ip6 if is_endpoint_set else "::",
1070                 port=port + 1 if is_endpoint_set else 0,
1071                 allowed_ips=["1::3:0/112"],
1072             ).add_vpp_config()
1073         else:
1074             peer_1 = VppWgPeer(
1075                 test=self,
1076                 itf=wg0,
1077                 endpoint=self.pg1.remote_hosts[0].ip4 if is_endpoint_set else "0.0.0.0",
1078                 port=port + 1 if is_endpoint_set else 0,
1079                 allowed_ips=["10.11.3.0/24"],
1080             ).add_vpp_config()
1081         self.assertTrue(peer_1.query_vpp_config())
1082
1083         if is_resp:
1084             # wait for the peer to send a handshake initiation
1085             rxs = self.pg1.get_capture(1, timeout=2)
1086             # prepare a handshake response
1087             resp = peer_1.consume_init(rxs[0], self.pg1, is_ip6=is_ip6)
1088             # change endpoint
1089             if is_ip6:
1090                 peer_1.change_endpoint(self.pg1.remote_hosts[1].ip6, port + 100)
1091                 resp[IPv6].src, resp[UDP].sport = peer_1.endpoint, peer_1.port
1092             else:
1093                 peer_1.change_endpoint(self.pg1.remote_hosts[1].ip4, port + 100)
1094                 resp[IP].src, resp[UDP].sport = peer_1.endpoint, peer_1.port
1095             # send the handshake response
1096             # expect a keepalive message sent to the new endpoint
1097             rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
1098             # verify the keepalive message
1099             b = peer_1.decrypt_transport(rxs[0], is_ip6=is_ip6)
1100             self.assertEqual(0, len(b))
1101         else:
1102             # change endpoint
1103             if is_ip6:
1104                 peer_1.change_endpoint(self.pg1.remote_hosts[1].ip6, port + 100)
1105             else:
1106                 peer_1.change_endpoint(self.pg1.remote_hosts[1].ip4, port + 100)
1107             # prepare and send a handshake initiation
1108             # expect a handshake response sent to the new endpoint
1109             init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
1110             rxs = self.send_and_expect(self.pg1, [init], self.pg1)
1111             # verify the response
1112             peer_1.consume_response(rxs[0], is_ip6=is_ip6)
1113         self.assertTrue(peer_1.query_vpp_config())
1114
1115         # remove configs
1116         peer_1.remove_vpp_config()
1117         wg0.remove_vpp_config()
1118
1119     def test_wg_peer_roaming_on_init_v4(self):
1120         """Peer roaming on handshake initiation (v4)"""
1121         self._test_wg_peer_roaming_on_handshake_tmpl(
1122             is_endpoint_set=False, is_resp=False, is_ip6=False
1123         )
1124
1125     def test_wg_peer_roaming_on_init_v6(self):
1126         """Peer roaming on handshake initiation (v6)"""
1127         self._test_wg_peer_roaming_on_handshake_tmpl(
1128             is_endpoint_set=False, is_resp=False, is_ip6=True
1129         )
1130
1131     def test_wg_peer_roaming_on_resp_v4(self):
1132         """Peer roaming on handshake response (v4)"""
1133         self._test_wg_peer_roaming_on_handshake_tmpl(
1134             is_endpoint_set=True, is_resp=True, is_ip6=False
1135         )
1136
1137     def test_wg_peer_roaming_on_resp_v6(self):
1138         """Peer roaming on handshake response (v6)"""
1139         self._test_wg_peer_roaming_on_handshake_tmpl(
1140             is_endpoint_set=True, is_resp=True, is_ip6=True
1141         )
1142
1143     def _test_wg_peer_roaming_on_data_tmpl(self, is_async, is_ip6):
1144         self.vapi.wg_set_async_mode(is_async)
1145         port = 12323
1146
1147         # create wg interface
1148         if is_ip6:
1149             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1150             wg0.admin_up()
1151             wg0.config_ip6()
1152         else:
1153             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1154             wg0.admin_up()
1155             wg0.config_ip4()
1156
1157         self.pg_enable_capture(self.pg_interfaces)
1158         self.pg_start()
1159
1160         # create more remote hosts
1161         NUM_REMOTE_HOSTS = 2
1162         self.pg1.generate_remote_hosts(NUM_REMOTE_HOSTS)
1163         if is_ip6:
1164             self.pg1.configure_ipv6_neighbors()
1165         else:
1166             self.pg1.configure_ipv4_neighbors()
1167
1168         # create a peer
1169         if is_ip6:
1170             peer_1 = VppWgPeer(
1171                 self, wg0, self.pg1.remote_hosts[0].ip6, port + 1, ["1::3:0/112"]
1172             ).add_vpp_config()
1173         else:
1174             peer_1 = VppWgPeer(
1175                 self, wg0, self.pg1.remote_hosts[0].ip4, port + 1, ["10.11.3.0/24"]
1176             ).add_vpp_config()
1177         self.assertTrue(peer_1.query_vpp_config())
1178
1179         # create a route to rewrite traffic into the wg interface
1180         if is_ip6:
1181             r1 = VppIpRoute(
1182                 self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
1183             ).add_vpp_config()
1184         else:
1185             r1 = VppIpRoute(
1186                 self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1187             ).add_vpp_config()
1188
1189         # wait for the peer to send a handshake initiation
1190         rxs = self.pg1.get_capture(1, timeout=2)
1191
1192         # prepare and send a handshake response
1193         # expect a keepalive message
1194         resp = peer_1.consume_init(rxs[0], self.pg1, is_ip6=is_ip6)
1195         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
1196
1197         # verify the keepalive message
1198         b = peer_1.decrypt_transport(rxs[0], is_ip6=is_ip6)
1199         self.assertEqual(0, len(b))
1200
1201         # change endpoint
1202         if is_ip6:
1203             peer_1.change_endpoint(self.pg1.remote_hosts[1].ip6, port + 100)
1204         else:
1205             peer_1.change_endpoint(self.pg1.remote_hosts[1].ip4, port + 100)
1206
1207         # prepare and send a data packet
1208         # expect endpoint change
1209         if is_ip6:
1210             ip_header = IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1211         else:
1212             ip_header = IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1213         data = (
1214             peer_1.mk_tunnel_header(self.pg1, is_ip6=is_ip6)
1215             / Wireguard(message_type=4, reserved_zero=0)
1216             / WireguardTransport(
1217                 receiver_index=peer_1.sender,
1218                 counter=0,
1219                 encrypted_encapsulated_packet=peer_1.encrypt_transport(
1220                     ip_header / UDP(sport=222, dport=223) / Raw()
1221                 ),
1222             )
1223         )
1224         rxs = self.send_and_expect(self.pg1, [data], self.pg0)
1225         if is_ip6:
1226             self.assertEqual(rxs[0][IPv6].dst, self.pg0.remote_ip6)
1227             self.assertEqual(rxs[0][IPv6].hlim, 19)
1228         else:
1229             self.assertEqual(rxs[0][IP].dst, self.pg0.remote_ip4)
1230             self.assertEqual(rxs[0][IP].ttl, 19)
1231         self.assertTrue(peer_1.query_vpp_config())
1232
1233         # prepare and send a packet that will be rewritten into the wg interface
1234         # expect a data packet sent to the new endpoint
1235         if is_ip6:
1236             ip_header = IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1237         else:
1238             ip_header = IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1239         p = (
1240             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1241             / ip_header
1242             / UDP(sport=555, dport=556)
1243             / Raw()
1244         )
1245         rxs = self.send_and_expect(self.pg0, [p], self.pg1)
1246
1247         # verify the data packet
1248         peer_1.validate_encapped(rxs, p, is_tunnel_ip6=is_ip6, is_transport_ip6=is_ip6)
1249
1250         # remove configs
1251         r1.remove_vpp_config()
1252         peer_1.remove_vpp_config()
1253         wg0.remove_vpp_config()
1254
1255     def test_wg_peer_roaming_on_data_v4_sync(self):
1256         """Peer roaming on data packet (v4, sync)"""
1257         self._test_wg_peer_roaming_on_data_tmpl(is_async=False, is_ip6=False)
1258
1259     def test_wg_peer_roaming_on_data_v6_sync(self):
1260         """Peer roaming on data packet (v6, sync)"""
1261         self._test_wg_peer_roaming_on_data_tmpl(is_async=False, is_ip6=True)
1262
1263     def test_wg_peer_roaming_on_data_v4_async(self):
1264         """Peer roaming on data packet (v4, async)"""
1265         self._test_wg_peer_roaming_on_data_tmpl(is_async=True, is_ip6=False)
1266
1267     def test_wg_peer_roaming_on_data_v6_async(self):
1268         """Peer roaming on data packet (v6, async)"""
1269         self._test_wg_peer_roaming_on_data_tmpl(is_async=True, is_ip6=True)
1270
1271     def test_wg_peer_resp(self):
1272         """Send handshake response IPv4 tunnel"""
1273         port = 12323
1274
1275         # Create interfaces
1276         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1277         wg0.admin_up()
1278         wg0.config_ip4()
1279
1280         self.pg_enable_capture(self.pg_interfaces)
1281         self.pg_start()
1282
1283         peer_1 = VppWgPeer(
1284             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
1285         ).add_vpp_config()
1286         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1287
1288         r1 = VppIpRoute(
1289             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1290         ).add_vpp_config()
1291
1292         # wait for the peer to send a handshake
1293         rx = self.pg1.get_capture(1, timeout=2)
1294
1295         # consume the handshake in the noise protocol and
1296         # generate the response
1297         resp = peer_1.consume_init(rx[0], self.pg1)
1298
1299         # send the response, get keepalive
1300         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
1301
1302         for rx in rxs:
1303             b = peer_1.decrypt_transport(rx)
1304             self.assertEqual(0, len(b))
1305
1306         # send a packets that are routed into the tunnel
1307         p = (
1308             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1309             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1310             / UDP(sport=555, dport=556)
1311             / Raw(b"\x00" * 80)
1312         )
1313
1314         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1315
1316         peer_1.validate_encapped(rxs, p)
1317
1318         # send packets into the tunnel, expect to receive them on
1319         # the other side
1320         p = [
1321             (
1322                 peer_1.mk_tunnel_header(self.pg1)
1323                 / Wireguard(message_type=4, reserved_zero=0)
1324                 / WireguardTransport(
1325                     receiver_index=peer_1.sender,
1326                     counter=ii,
1327                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1328                         (
1329                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1330                             / UDP(sport=222, dport=223)
1331                             / Raw()
1332                         )
1333                     ),
1334                 )
1335             )
1336             for ii in range(255)
1337         ]
1338
1339         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1340
1341         for rx in rxs:
1342             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1343             self.assertEqual(rx[IP].ttl, 19)
1344
1345         r1.remove_vpp_config()
1346         peer_1.remove_vpp_config()
1347         wg0.remove_vpp_config()
1348
1349     def test_wg_peer_resp_ipv6(self):
1350         """Send handshake response IPv6 tunnel"""
1351         port = 12323
1352
1353         # Create interfaces
1354         wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1355         wg0.admin_up()
1356         wg0.config_ip4()
1357
1358         self.pg_enable_capture(self.pg_interfaces)
1359         self.pg_start()
1360
1361         peer_1 = VppWgPeer(
1362             self, wg0, self.pg1.remote_ip6, port + 1, ["10.11.3.0/24"]
1363         ).add_vpp_config()
1364         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1365
1366         r1 = VppIpRoute(
1367             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1368         ).add_vpp_config()
1369
1370         # wait for the peer to send a handshake
1371         rx = self.pg1.get_capture(1, timeout=2)
1372
1373         # consume the handshake in the noise protocol and
1374         # generate the response
1375         resp = peer_1.consume_init(rx[0], self.pg1, is_ip6=True)
1376
1377         # send the response, get keepalive
1378         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
1379
1380         for rx in rxs:
1381             b = peer_1.decrypt_transport(rx, True)
1382             self.assertEqual(0, len(b))
1383
1384         # send a packets that are routed into the tunnel
1385         p = (
1386             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1387             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1388             / UDP(sport=555, dport=556)
1389             / Raw(b"\x00" * 80)
1390         )
1391
1392         rxs = self.send_and_expect(self.pg0, p * 2, self.pg1)
1393         peer_1.validate_encapped(rxs, p, True)
1394
1395         # send packets into the tunnel, expect to receive them on
1396         # the other side
1397         p = [
1398             (
1399                 peer_1.mk_tunnel_header(self.pg1, True)
1400                 / Wireguard(message_type=4, reserved_zero=0)
1401                 / WireguardTransport(
1402                     receiver_index=peer_1.sender,
1403                     counter=ii,
1404                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1405                         (
1406                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1407                             / UDP(sport=222, dport=223)
1408                             / Raw()
1409                         )
1410                     ),
1411                 )
1412             )
1413             for ii in range(255)
1414         ]
1415
1416         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1417
1418         for rx in rxs:
1419             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1420             self.assertEqual(rx[IP].ttl, 19)
1421
1422         r1.remove_vpp_config()
1423         peer_1.remove_vpp_config()
1424         wg0.remove_vpp_config()
1425
1426     def test_wg_peer_v4o4(self):
1427         """Test v4o4"""
1428
1429         port = 12333
1430
1431         # Create interfaces
1432         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1433         wg0.admin_up()
1434         wg0.config_ip4()
1435
1436         peer_1 = VppWgPeer(
1437             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
1438         ).add_vpp_config()
1439         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1440
1441         r1 = VppIpRoute(
1442             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1443         ).add_vpp_config()
1444         r2 = VppIpRoute(
1445             self, "20.22.3.0", 24, [VppRoutePath("20.22.3.1", wg0.sw_if_index)]
1446         ).add_vpp_config()
1447
1448         # route a packet into the wg interface
1449         #  use the allowed-ip prefix
1450         #  this is dropped because the peer is not initiated
1451         p = (
1452             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1453             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1454             / UDP(sport=555, dport=556)
1455             / Raw()
1456         )
1457         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1458         self.assertEqual(
1459             self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
1460         )
1461
1462         # route a packet into the wg interface
1463         #  use a not allowed-ip prefix
1464         #  this is dropped because there is no matching peer
1465         p = (
1466             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1467             / IP(src=self.pg0.remote_ip4, dst="20.22.3.2")
1468             / UDP(sport=555, dport=556)
1469             / Raw()
1470         )
1471         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1472         self.assertEqual(
1473             self.base_peer4_out_err + 1,
1474             self.statistics.get_err_counter(self.peer4_out_err),
1475         )
1476
1477         # send a handsake from the peer with an invalid MAC
1478         p = peer_1.mk_handshake(self.pg1)
1479         p[WireguardInitiation].mac1 = b"foobar"
1480         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1481         self.assertEqual(
1482             self.base_mac4_err + 1, self.statistics.get_err_counter(self.mac4_error)
1483         )
1484
1485         # send a handsake from the peer but signed by the wrong key.
1486         p = peer_1.mk_handshake(
1487             self.pg1, False, X25519PrivateKey.generate().public_key()
1488         )
1489         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1490         self.assertEqual(
1491             self.base_peer4_in_err + 1,
1492             self.statistics.get_err_counter(self.peer4_in_err),
1493         )
1494
1495         # send a valid handsake init for which we expect a response
1496         p = peer_1.mk_handshake(self.pg1)
1497
1498         rx = self.send_and_expect(self.pg1, [p], self.pg1)
1499
1500         peer_1.consume_response(rx[0])
1501
1502         # route a packet into the wg interface
1503         #  this is dropped because the peer is still not initiated
1504         p = (
1505             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1506             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1507             / UDP(sport=555, dport=556)
1508             / Raw()
1509         )
1510         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1511         self.assertEqual(
1512             self.base_kp4_err + 2, self.statistics.get_err_counter(self.kp4_error)
1513         )
1514
1515         # send a data packet from the peer through the tunnel
1516         # this completes the handshake
1517         p = (
1518             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1519             / UDP(sport=222, dport=223)
1520             / Raw()
1521         )
1522         d = peer_1.encrypt_transport(p)
1523         p = peer_1.mk_tunnel_header(self.pg1) / (
1524             Wireguard(message_type=4, reserved_zero=0)
1525             / WireguardTransport(
1526                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1527             )
1528         )
1529         rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1530
1531         for rx in rxs:
1532             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1533             self.assertEqual(rx[IP].ttl, 19)
1534
1535         # send a packets that are routed into the tunnel
1536         p = (
1537             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1538             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1539             / UDP(sport=555, dport=556)
1540             / Raw(b"\x00" * 80)
1541         )
1542
1543         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1544
1545         for rx in rxs:
1546             rx = IP(peer_1.decrypt_transport(rx))
1547
1548             # check the original packet is present
1549             self.assertEqual(rx[IP].dst, p[IP].dst)
1550             self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
1551
1552         # send packets into the tunnel, expect to receive them on
1553         # the other side
1554         p = [
1555             (
1556                 peer_1.mk_tunnel_header(self.pg1)
1557                 / Wireguard(message_type=4, reserved_zero=0)
1558                 / WireguardTransport(
1559                     receiver_index=peer_1.sender,
1560                     counter=ii + 1,
1561                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1562                         (
1563                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1564                             / UDP(sport=222, dport=223)
1565                             / Raw()
1566                         )
1567                     ),
1568                 )
1569             )
1570             for ii in range(255)
1571         ]
1572
1573         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1574
1575         for rx in rxs:
1576             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1577             self.assertEqual(rx[IP].ttl, 19)
1578
1579         r1.remove_vpp_config()
1580         r2.remove_vpp_config()
1581         peer_1.remove_vpp_config()
1582         wg0.remove_vpp_config()
1583
1584     def test_wg_peer_v6o6(self):
1585         """Test v6o6"""
1586
1587         port = 12343
1588
1589         # Create interfaces
1590         wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1591         wg0.admin_up()
1592         wg0.config_ip6()
1593
1594         peer_1 = VppWgPeer(
1595             self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
1596         ).add_vpp_config()
1597         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1598
1599         r1 = VppIpRoute(
1600             self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
1601         ).add_vpp_config()
1602         r2 = VppIpRoute(
1603             self, "22::3:0", 112, [VppRoutePath("22::3:1", wg0.sw_if_index)]
1604         ).add_vpp_config()
1605
1606         # route a packet into the wg interface
1607         #  use the allowed-ip prefix
1608         #  this is dropped because the peer is not initiated
1609
1610         p = (
1611             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1612             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1613             / UDP(sport=555, dport=556)
1614             / Raw()
1615         )
1616         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1617
1618         self.assertEqual(
1619             self.base_kp6_err + 1, self.statistics.get_err_counter(self.kp6_error)
1620         )
1621
1622         # route a packet into the wg interface
1623         #  use a not allowed-ip prefix
1624         #  this is dropped because there is no matching peer
1625         p = (
1626             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1627             / IPv6(src=self.pg0.remote_ip6, dst="22::3:2")
1628             / UDP(sport=555, dport=556)
1629             / Raw()
1630         )
1631         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1632         self.assertEqual(
1633             self.base_peer6_out_err + 1,
1634             self.statistics.get_err_counter(self.peer6_out_err),
1635         )
1636
1637         # send a handsake from the peer with an invalid MAC
1638         p = peer_1.mk_handshake(self.pg1, True)
1639         p[WireguardInitiation].mac1 = b"foobar"
1640         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1641
1642         self.assertEqual(
1643             self.base_mac6_err + 1, self.statistics.get_err_counter(self.mac6_error)
1644         )
1645
1646         # send a handsake from the peer but signed by the wrong key.
1647         p = peer_1.mk_handshake(
1648             self.pg1, True, X25519PrivateKey.generate().public_key()
1649         )
1650         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1651         self.assertEqual(
1652             self.base_peer6_in_err + 1,
1653             self.statistics.get_err_counter(self.peer6_in_err),
1654         )
1655
1656         # send a valid handsake init for which we expect a response
1657         p = peer_1.mk_handshake(self.pg1, True)
1658
1659         rx = self.send_and_expect(self.pg1, [p], self.pg1)
1660
1661         peer_1.consume_response(rx[0], True)
1662
1663         # route a packet into the wg interface
1664         #  this is dropped because the peer is still not initiated
1665         p = (
1666             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1667             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1668             / UDP(sport=555, dport=556)
1669             / Raw()
1670         )
1671         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1672         self.assertEqual(
1673             self.base_kp6_err + 2, self.statistics.get_err_counter(self.kp6_error)
1674         )
1675
1676         # send a data packet from the peer through the tunnel
1677         # this completes the handshake
1678         p = (
1679             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1680             / UDP(sport=222, dport=223)
1681             / Raw()
1682         )
1683         d = peer_1.encrypt_transport(p)
1684         p = peer_1.mk_tunnel_header(self.pg1, True) / (
1685             Wireguard(message_type=4, reserved_zero=0)
1686             / WireguardTransport(
1687                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1688             )
1689         )
1690         rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1691
1692         for rx in rxs:
1693             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
1694             self.assertEqual(rx[IPv6].hlim, 19)
1695
1696         # send a packets that are routed into the tunnel
1697         p = (
1698             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1699             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1700             / UDP(sport=555, dport=556)
1701             / Raw(b"\x00" * 80)
1702         )
1703
1704         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1705
1706         for rx in rxs:
1707             rx = IPv6(peer_1.decrypt_transport(rx, True))
1708
1709             # check the original packet is present
1710             self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
1711             self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim - 1)
1712
1713         # send packets into the tunnel, expect to receive them on
1714         # the other side
1715         p = [
1716             (
1717                 peer_1.mk_tunnel_header(self.pg1, True)
1718                 / Wireguard(message_type=4, reserved_zero=0)
1719                 / WireguardTransport(
1720                     receiver_index=peer_1.sender,
1721                     counter=ii + 1,
1722                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1723                         (
1724                             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1725                             / UDP(sport=222, dport=223)
1726                             / Raw()
1727                         )
1728                     ),
1729                 )
1730             )
1731             for ii in range(255)
1732         ]
1733
1734         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1735
1736         for rx in rxs:
1737             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
1738             self.assertEqual(rx[IPv6].hlim, 19)
1739
1740         r1.remove_vpp_config()
1741         r2.remove_vpp_config()
1742         peer_1.remove_vpp_config()
1743         wg0.remove_vpp_config()
1744
1745     def test_wg_peer_v6o4(self):
1746         """Test v6o4"""
1747
1748         port = 12353
1749
1750         # Create interfaces
1751         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1752         wg0.admin_up()
1753         wg0.config_ip6()
1754
1755         peer_1 = VppWgPeer(
1756             self, wg0, self.pg1.remote_ip4, port + 1, ["1::3:0/112"]
1757         ).add_vpp_config()
1758         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1759
1760         r1 = VppIpRoute(
1761             self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
1762         ).add_vpp_config()
1763
1764         # route a packet into the wg interface
1765         #  use the allowed-ip prefix
1766         #  this is dropped because the peer is not initiated
1767         p = (
1768             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1769             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1770             / UDP(sport=555, dport=556)
1771             / Raw()
1772         )
1773         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1774         self.assertEqual(
1775             self.base_kp6_err + 1, self.statistics.get_err_counter(self.kp6_error)
1776         )
1777
1778         # send a handsake from the peer with an invalid MAC
1779         p = peer_1.mk_handshake(self.pg1)
1780         p[WireguardInitiation].mac1 = b"foobar"
1781         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1782
1783         self.assertEqual(
1784             self.base_mac4_err + 1, self.statistics.get_err_counter(self.mac4_error)
1785         )
1786
1787         # send a handsake from the peer but signed by the wrong key.
1788         p = peer_1.mk_handshake(
1789             self.pg1, False, X25519PrivateKey.generate().public_key()
1790         )
1791         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1792         self.assertEqual(
1793             self.base_peer4_in_err + 1,
1794             self.statistics.get_err_counter(self.peer4_in_err),
1795         )
1796
1797         # send a valid handsake init for which we expect a response
1798         p = peer_1.mk_handshake(self.pg1)
1799
1800         rx = self.send_and_expect(self.pg1, [p], self.pg1)
1801
1802         peer_1.consume_response(rx[0])
1803
1804         # route a packet into the wg interface
1805         #  this is dropped because the peer is still not initiated
1806         p = (
1807             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1808             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1809             / UDP(sport=555, dport=556)
1810             / Raw()
1811         )
1812         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1813         self.assertEqual(
1814             self.base_kp6_err + 2, self.statistics.get_err_counter(self.kp6_error)
1815         )
1816
1817         # send a data packet from the peer through the tunnel
1818         # this completes the handshake
1819         p = (
1820             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1821             / UDP(sport=222, dport=223)
1822             / Raw()
1823         )
1824         d = peer_1.encrypt_transport(p)
1825         p = peer_1.mk_tunnel_header(self.pg1) / (
1826             Wireguard(message_type=4, reserved_zero=0)
1827             / WireguardTransport(
1828                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1829             )
1830         )
1831         rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1832
1833         for rx in rxs:
1834             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
1835             self.assertEqual(rx[IPv6].hlim, 19)
1836
1837         # send a packets that are routed into the tunnel
1838         p = (
1839             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1840             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1841             / UDP(sport=555, dport=556)
1842             / Raw(b"\x00" * 80)
1843         )
1844
1845         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1846
1847         for rx in rxs:
1848             rx = IPv6(peer_1.decrypt_transport(rx))
1849
1850             # check the original packet is present
1851             self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
1852             self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim - 1)
1853
1854         # send packets into the tunnel, expect to receive them on
1855         # the other side
1856         p = [
1857             (
1858                 peer_1.mk_tunnel_header(self.pg1)
1859                 / Wireguard(message_type=4, reserved_zero=0)
1860                 / WireguardTransport(
1861                     receiver_index=peer_1.sender,
1862                     counter=ii + 1,
1863                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1864                         (
1865                             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1866                             / UDP(sport=222, dport=223)
1867                             / Raw()
1868                         )
1869                     ),
1870                 )
1871             )
1872             for ii in range(255)
1873         ]
1874
1875         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1876
1877         for rx in rxs:
1878             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
1879             self.assertEqual(rx[IPv6].hlim, 19)
1880
1881         r1.remove_vpp_config()
1882         peer_1.remove_vpp_config()
1883         wg0.remove_vpp_config()
1884
1885     def test_wg_peer_v4o6(self):
1886         """Test v4o6"""
1887
1888         port = 12363
1889
1890         # Create interfaces
1891         wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1892         wg0.admin_up()
1893         wg0.config_ip4()
1894
1895         peer_1 = VppWgPeer(
1896             self, wg0, self.pg1.remote_ip6, port + 1, ["10.11.3.0/24"]
1897         ).add_vpp_config()
1898         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1899
1900         r1 = VppIpRoute(
1901             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1902         ).add_vpp_config()
1903
1904         # route a packet into the wg interface
1905         #  use the allowed-ip prefix
1906         #  this is dropped because the peer is not initiated
1907         p = (
1908             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1909             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1910             / UDP(sport=555, dport=556)
1911             / Raw()
1912         )
1913         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1914         self.assertEqual(
1915             self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
1916         )
1917
1918         # send a handsake from the peer with an invalid MAC
1919         p = peer_1.mk_handshake(self.pg1, True)
1920         p[WireguardInitiation].mac1 = b"foobar"
1921         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1922         self.assertEqual(
1923             self.base_mac6_err + 1, self.statistics.get_err_counter(self.mac6_error)
1924         )
1925
1926         # send a handsake from the peer but signed by the wrong key.
1927         p = peer_1.mk_handshake(
1928             self.pg1, True, X25519PrivateKey.generate().public_key()
1929         )
1930         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1931         self.assertEqual(
1932             self.base_peer6_in_err + 1,
1933             self.statistics.get_err_counter(self.peer6_in_err),
1934         )
1935
1936         # send a valid handsake init for which we expect a response
1937         p = peer_1.mk_handshake(self.pg1, True)
1938
1939         rx = self.send_and_expect(self.pg1, [p], self.pg1)
1940
1941         peer_1.consume_response(rx[0], True)
1942
1943         # route a packet into the wg interface
1944         #  this is dropped because the peer is still not initiated
1945         p = (
1946             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1947             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1948             / UDP(sport=555, dport=556)
1949             / Raw()
1950         )
1951         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1952         self.assertEqual(
1953             self.base_kp4_err + 2, self.statistics.get_err_counter(self.kp4_error)
1954         )
1955
1956         # send a data packet from the peer through the tunnel
1957         # this completes the handshake
1958         p = (
1959             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1960             / UDP(sport=222, dport=223)
1961             / Raw()
1962         )
1963         d = peer_1.encrypt_transport(p)
1964         p = peer_1.mk_tunnel_header(self.pg1, True) / (
1965             Wireguard(message_type=4, reserved_zero=0)
1966             / WireguardTransport(
1967                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1968             )
1969         )
1970         rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1971
1972         for rx in rxs:
1973             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1974             self.assertEqual(rx[IP].ttl, 19)
1975
1976         # send a packets that are routed into the tunnel
1977         p = (
1978             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1979             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1980             / UDP(sport=555, dport=556)
1981             / Raw(b"\x00" * 80)
1982         )
1983
1984         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1985
1986         for rx in rxs:
1987             rx = IP(peer_1.decrypt_transport(rx, True))
1988
1989             # check the original packet is present
1990             self.assertEqual(rx[IP].dst, p[IP].dst)
1991             self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
1992
1993         # send packets into the tunnel, expect to receive them on
1994         # the other side
1995         p = [
1996             (
1997                 peer_1.mk_tunnel_header(self.pg1, True)
1998                 / Wireguard(message_type=4, reserved_zero=0)
1999                 / WireguardTransport(
2000                     receiver_index=peer_1.sender,
2001                     counter=ii + 1,
2002                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
2003                         (
2004                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
2005                             / UDP(sport=222, dport=223)
2006                             / Raw()
2007                         )
2008                     ),
2009                 )
2010             )
2011             for ii in range(255)
2012         ]
2013
2014         rxs = self.send_and_expect(self.pg1, p, self.pg0)
2015
2016         for rx in rxs:
2017             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
2018             self.assertEqual(rx[IP].ttl, 19)
2019
2020         r1.remove_vpp_config()
2021         peer_1.remove_vpp_config()
2022         wg0.remove_vpp_config()
2023
2024     def test_wg_multi_peer(self):
2025         """multiple peer setup"""
2026         port = 12373
2027
2028         # Create interfaces
2029         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2030         wg1 = VppWgInterface(self, self.pg2.local_ip4, port + 1).add_vpp_config()
2031         wg0.admin_up()
2032         wg1.admin_up()
2033
2034         # Check peer counter
2035         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 0)
2036
2037         self.pg_enable_capture(self.pg_interfaces)
2038         self.pg_start()
2039
2040         # Create many peers on sencond interface
2041         NUM_PEERS = 16
2042         self.pg2.generate_remote_hosts(NUM_PEERS)
2043         self.pg2.configure_ipv4_neighbors()
2044         self.pg1.generate_remote_hosts(NUM_PEERS)
2045         self.pg1.configure_ipv4_neighbors()
2046
2047         peers_1 = []
2048         peers_2 = []
2049         routes_1 = []
2050         routes_2 = []
2051         for i in range(NUM_PEERS):
2052             peers_1.append(
2053                 VppWgPeer(
2054                     self,
2055                     wg0,
2056                     self.pg1.remote_hosts[i].ip4,
2057                     port + 1 + i,
2058                     ["10.0.%d.4/32" % i],
2059                 ).add_vpp_config()
2060             )
2061             routes_1.append(
2062                 VppIpRoute(
2063                     self,
2064                     "10.0.%d.4" % i,
2065                     32,
2066                     [VppRoutePath(self.pg1.remote_hosts[i].ip4, wg0.sw_if_index)],
2067                 ).add_vpp_config()
2068             )
2069
2070             peers_2.append(
2071                 VppWgPeer(
2072                     self,
2073                     wg1,
2074                     self.pg2.remote_hosts[i].ip4,
2075                     port + 100 + i,
2076                     ["10.100.%d.4/32" % i],
2077                 ).add_vpp_config()
2078             )
2079             routes_2.append(
2080                 VppIpRoute(
2081                     self,
2082                     "10.100.%d.4" % i,
2083                     32,
2084                     [VppRoutePath(self.pg2.remote_hosts[i].ip4, wg1.sw_if_index)],
2085                 ).add_vpp_config()
2086             )
2087
2088         self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS * 2)
2089
2090         self.logger.info(self.vapi.cli("show wireguard peer"))
2091         self.logger.info(self.vapi.cli("show wireguard interface"))
2092         self.logger.info(self.vapi.cli("show adj 37"))
2093         self.logger.info(self.vapi.cli("sh ip fib 172.16.3.17"))
2094         self.logger.info(self.vapi.cli("sh ip fib 10.11.3.0"))
2095
2096         # remove routes
2097         for r in routes_1:
2098             r.remove_vpp_config()
2099         for r in routes_2:
2100             r.remove_vpp_config()
2101
2102         # remove peers
2103         for p in peers_1:
2104             self.assertTrue(p.query_vpp_config())
2105             p.remove_vpp_config()
2106         for p in peers_2:
2107             self.assertTrue(p.query_vpp_config())
2108             p.remove_vpp_config()
2109
2110         wg0.remove_vpp_config()
2111         wg1.remove_vpp_config()
2112
2113     def test_wg_multi_interface(self):
2114         """Multi-tunnel on the same port"""
2115         port = 12500
2116
2117         # Create many wireguard interfaces
2118         NUM_IFS = 4
2119         self.pg1.generate_remote_hosts(NUM_IFS)
2120         self.pg1.configure_ipv4_neighbors()
2121         self.pg0.generate_remote_hosts(NUM_IFS)
2122         self.pg0.configure_ipv4_neighbors()
2123
2124         self.pg_enable_capture(self.pg_interfaces)
2125         self.pg_start()
2126
2127         # Create interfaces with a peer on each
2128         peers = []
2129         routes = []
2130         wg_ifs = []
2131         for i in range(NUM_IFS):
2132             # Use the same port for each interface
2133             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2134             wg0.admin_up()
2135             wg0.config_ip4()
2136             wg_ifs.append(wg0)
2137             peers.append(
2138                 VppWgPeer(
2139                     self,
2140                     wg0,
2141                     self.pg1.remote_hosts[i].ip4,
2142                     port + 1 + i,
2143                     ["10.0.%d.0/24" % i],
2144                 ).add_vpp_config()
2145             )
2146
2147             routes.append(
2148                 VppIpRoute(
2149                     self,
2150                     "10.0.%d.0" % i,
2151                     24,
2152                     [VppRoutePath("10.0.%d.4" % i, wg0.sw_if_index)],
2153                 ).add_vpp_config()
2154             )
2155
2156         self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_IFS)
2157
2158         # skip the first automatic handshake
2159         self.pg1.get_capture(NUM_IFS, timeout=HANDSHAKE_JITTER)
2160
2161         for i in range(NUM_IFS):
2162             # send a valid handsake init for which we expect a response
2163             p = peers[i].mk_handshake(self.pg1)
2164             rx = self.send_and_expect(self.pg1, [p], self.pg1)
2165             peers[i].consume_response(rx[0])
2166
2167             # send a data packet from the peer through the tunnel
2168             # this completes the handshake
2169             p = (
2170                 IP(src="10.0.%d.4" % i, dst=self.pg0.remote_hosts[i].ip4, ttl=20)
2171                 / UDP(sport=222, dport=223)
2172                 / Raw()
2173             )
2174             d = peers[i].encrypt_transport(p)
2175             p = peers[i].mk_tunnel_header(self.pg1) / (
2176                 Wireguard(message_type=4, reserved_zero=0)
2177                 / WireguardTransport(
2178                     receiver_index=peers[i].sender,
2179                     counter=0,
2180                     encrypted_encapsulated_packet=d,
2181                 )
2182             )
2183             rxs = self.send_and_expect(self.pg1, [p], self.pg0)
2184             for rx in rxs:
2185                 self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[i].ip4)
2186                 self.assertEqual(rx[IP].ttl, 19)
2187
2188         # send a packets that are routed into the tunnel
2189         for i in range(NUM_IFS):
2190             p = (
2191                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2192                 / IP(src=self.pg0.remote_hosts[i].ip4, dst="10.0.%d.4" % i)
2193                 / UDP(sport=555, dport=556)
2194                 / Raw(b"\x00" * 80)
2195             )
2196
2197             rxs = self.send_and_expect(self.pg0, p * 64, self.pg1)
2198
2199             for rx in rxs:
2200                 rx = IP(peers[i].decrypt_transport(rx))
2201
2202                 # check the oringial packet is present
2203                 self.assertEqual(rx[IP].dst, p[IP].dst)
2204                 self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
2205
2206         # send packets into the tunnel
2207         for i in range(NUM_IFS):
2208             p = [
2209                 (
2210                     peers[i].mk_tunnel_header(self.pg1)
2211                     / Wireguard(message_type=4, reserved_zero=0)
2212                     / WireguardTransport(
2213                         receiver_index=peers[i].sender,
2214                         counter=ii + 1,
2215                         encrypted_encapsulated_packet=peers[i].encrypt_transport(
2216                             (
2217                                 IP(
2218                                     src="10.0.%d.4" % i,
2219                                     dst=self.pg0.remote_hosts[i].ip4,
2220                                     ttl=20,
2221                                 )
2222                                 / UDP(sport=222, dport=223)
2223                                 / Raw()
2224                             )
2225                         ),
2226                     )
2227                 )
2228                 for ii in range(64)
2229             ]
2230
2231             rxs = self.send_and_expect(self.pg1, p, self.pg0)
2232
2233             for rx in rxs:
2234                 self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[i].ip4)
2235                 self.assertEqual(rx[IP].ttl, 19)
2236
2237         for r in routes:
2238             r.remove_vpp_config()
2239         for p in peers:
2240             p.remove_vpp_config()
2241         for i in wg_ifs:
2242             i.remove_vpp_config()
2243
2244     def test_wg_event(self):
2245         """Test events"""
2246         port = 12600
2247         ESTABLISHED_FLAG = (
2248             VppEnum.vl_api_wireguard_peer_flags_t.WIREGUARD_PEER_ESTABLISHED
2249         )
2250         DEAD_FLAG = VppEnum.vl_api_wireguard_peer_flags_t.WIREGUARD_PEER_STATUS_DEAD
2251
2252         # Create interfaces
2253         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2254         wg1 = VppWgInterface(self, self.pg2.local_ip4, port + 1).add_vpp_config()
2255         wg0.admin_up()
2256         wg1.admin_up()
2257
2258         # Check peer counter
2259         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 0)
2260
2261         self.pg_enable_capture(self.pg_interfaces)
2262         self.pg_start()
2263
2264         # Create peers
2265         NUM_PEERS = 2
2266         self.pg2.generate_remote_hosts(NUM_PEERS)
2267         self.pg2.configure_ipv4_neighbors()
2268         self.pg1.generate_remote_hosts(NUM_PEERS)
2269         self.pg1.configure_ipv4_neighbors()
2270
2271         peers_0 = []
2272         peers_1 = []
2273         routes_0 = []
2274         routes_1 = []
2275         for i in range(NUM_PEERS):
2276             peers_0.append(
2277                 VppWgPeer(
2278                     self,
2279                     wg0,
2280                     self.pg1.remote_hosts[i].ip4,
2281                     port + 1 + i,
2282                     ["10.0.%d.4/32" % i],
2283                 ).add_vpp_config()
2284             )
2285             routes_0.append(
2286                 VppIpRoute(
2287                     self,
2288                     "10.0.%d.4" % i,
2289                     32,
2290                     [VppRoutePath(self.pg1.remote_hosts[i].ip4, wg0.sw_if_index)],
2291                 ).add_vpp_config()
2292             )
2293
2294             peers_1.append(
2295                 VppWgPeer(
2296                     self,
2297                     wg1,
2298                     self.pg2.remote_hosts[i].ip4,
2299                     port + 100 + i,
2300                     ["10.100.%d.4/32" % i],
2301                 ).add_vpp_config()
2302             )
2303             routes_1.append(
2304                 VppIpRoute(
2305                     self,
2306                     "10.100.%d.4" % i,
2307                     32,
2308                     [VppRoutePath(self.pg2.remote_hosts[i].ip4, wg1.sw_if_index)],
2309                 ).add_vpp_config()
2310             )
2311
2312         self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS * 2)
2313
2314         # skip the first automatic handshake
2315         self.pg1.get_capture(NUM_PEERS, timeout=HANDSHAKE_JITTER)
2316         self.pg2.get_capture(NUM_PEERS, timeout=HANDSHAKE_JITTER)
2317
2318         # Want events from the first perr of wg0
2319         # and from all wg1 peers
2320         peers_0[0].want_events()
2321         wg1.want_events()
2322
2323         for i in range(NUM_PEERS):
2324             # wg0 peers: send a valid handsake init for which we expect a response
2325             p = peers_0[i].mk_handshake(self.pg1)
2326             rx = self.send_and_expect(self.pg1, [p], self.pg1)
2327             peers_0[i].consume_response(rx[0])
2328
2329             # wg0 peers: send empty packet, it means successful connection (WIREGUARD_PEER_ESTABLISHED)
2330             keepalive = peers_0[i].encrypt_transport(0)
2331             p = peers_0[i].mk_tunnel_header(self.pg1) / (
2332                 Wireguard(message_type=4, reserved_zero=0)
2333                 / WireguardTransport(
2334                     receiver_index=peers_0[i].sender,
2335                     counter=0,
2336                     encrypted_encapsulated_packet=keepalive,
2337                 )
2338             )
2339             self.send_and_assert_no_replies(self.pg1, [p])
2340
2341             # wg0 peers: wait for established flag
2342             if i == 0:
2343                 peers_0[0].wait_event(ESTABLISHED_FLAG)
2344
2345             # wg1 peers: send a valid handsake init for which we expect a response
2346             p = peers_1[i].mk_handshake(self.pg2)
2347             rx = self.send_and_expect(self.pg2, [p], self.pg2)
2348             peers_1[i].consume_response(rx[0])
2349
2350             # wg1 peers: send empty packet, it means successful connection (WIREGUARD_PEER_ESTABLISHED)
2351             keepalive = peers_1[i].encrypt_transport(0)
2352             p = peers_1[i].mk_tunnel_header(self.pg2) / (
2353                 Wireguard(message_type=4, reserved_zero=0)
2354                 / WireguardTransport(
2355                     receiver_index=peers_1[i].sender,
2356                     counter=0,
2357                     encrypted_encapsulated_packet=keepalive,
2358                 )
2359             )
2360             self.send_and_assert_no_replies(self.pg2, [p])
2361
2362         # wg1 peers: wait for established flag
2363         wg1.wait_events(ESTABLISHED_FLAG, [peers_1[0].index, peers_1[1].index])
2364
2365         # remove routes
2366         for r in routes_0:
2367             r.remove_vpp_config()
2368         for r in routes_1:
2369             r.remove_vpp_config()
2370
2371         # remove peers
2372         for i in range(NUM_PEERS):
2373             self.assertTrue(peers_0[i].query_vpp_config())
2374             peers_0[i].remove_vpp_config()
2375             if i == 0:
2376                 peers_0[i].wait_event(0)
2377                 peers_0[i].wait_event(DEAD_FLAG)
2378         for p in peers_1:
2379             self.assertTrue(p.query_vpp_config())
2380             p.remove_vpp_config()
2381             p.wait_event(0)
2382             p.wait_event(DEAD_FLAG)
2383
2384         wg0.remove_vpp_config()
2385         wg1.remove_vpp_config()
2386
2387     def test_wg_sending_handshake_when_admin_down(self):
2388         """Sending handshake when admin down"""
2389         port = 12323
2390
2391         # create wg interface
2392         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2393         wg0.config_ip4()
2394
2395         # create a peer
2396         peer_1 = VppWgPeer(
2397             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
2398         ).add_vpp_config()
2399         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
2400
2401         self.pg_enable_capture(self.pg_interfaces)
2402         self.pg_start()
2403
2404         # wait for the peer to send a handshake initiation
2405         # expect no handshakes
2406         for i in range(2):
2407             self.pg1.assert_nothing_captured(remark="handshake packet(s) sent")
2408
2409         self.pg_enable_capture(self.pg_interfaces)
2410         self.pg_start()
2411
2412         # administratively enable the wg interface
2413         # expect the peer to send a handshake initiation
2414         wg0.admin_up()
2415         rxs = self.pg1.get_capture(1, timeout=2)
2416         peer_1.consume_init(rxs[0], self.pg1)
2417
2418         self.pg_enable_capture(self.pg_interfaces)
2419         self.pg_start()
2420
2421         # administratively disable the wg interface
2422         # expect no handshakes
2423         wg0.admin_down()
2424         for i in range(6):
2425             self.pg1.assert_nothing_captured(remark="handshake packet(s) sent")
2426
2427         # remove configs
2428         peer_1.remove_vpp_config()
2429         wg0.remove_vpp_config()
2430
2431     def test_wg_sending_data_when_admin_down(self):
2432         """Sending data when admin down"""
2433         port = 12323
2434
2435         # create wg interface
2436         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2437         wg0.admin_up()
2438         wg0.config_ip4()
2439
2440         self.pg_enable_capture(self.pg_interfaces)
2441         self.pg_start()
2442
2443         # create a peer
2444         peer_1 = VppWgPeer(
2445             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
2446         ).add_vpp_config()
2447         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
2448
2449         # create a route to rewrite traffic into the wg interface
2450         r1 = VppIpRoute(
2451             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
2452         ).add_vpp_config()
2453
2454         # wait for the peer to send a handshake initiation
2455         rxs = self.pg1.get_capture(1, timeout=2)
2456
2457         # prepare and send a handshake response
2458         # expect a keepalive message
2459         resp = peer_1.consume_init(rxs[0], self.pg1)
2460         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
2461
2462         # verify the keepalive message
2463         b = peer_1.decrypt_transport(rxs[0])
2464         self.assertEqual(0, len(b))
2465
2466         # prepare and send a packet that will be rewritten into the wg interface
2467         # expect a data packet sent
2468         p = (
2469             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2470             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
2471             / UDP(sport=555, dport=556)
2472             / Raw()
2473         )
2474         rxs = self.send_and_expect(self.pg0, [p], self.pg1)
2475
2476         # verify the data packet
2477         peer_1.validate_encapped(rxs, p)
2478
2479         # administratively disable the wg interface
2480         wg0.admin_down()
2481
2482         # send a packet that will be rewritten into the wg interface
2483         # expect no data packets sent
2484         self.send_and_assert_no_replies(self.pg0, [p])
2485
2486         # administratively enable the wg interface
2487         # expect the peer to send a handshake initiation
2488         wg0.admin_up()
2489         peer_1.noise_reset()
2490         rxs = self.pg1.get_capture(1, timeout=2)
2491         resp = peer_1.consume_init(rxs[0], self.pg1)
2492
2493         # send a packet that will be rewritten into the wg interface
2494         # expect no data packets sent because the peer is not initiated
2495         self.send_and_assert_no_replies(self.pg0, [p])
2496         self.assertEqual(
2497             self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
2498         )
2499
2500         # send a handshake response and expect a keepalive message
2501         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
2502
2503         # verify the keepalive message
2504         b = peer_1.decrypt_transport(rxs[0])
2505         self.assertEqual(0, len(b))
2506
2507         # send a packet that will be rewritten into the wg interface
2508         # expect a data packet sent
2509         rxs = self.send_and_expect(self.pg0, [p], self.pg1)
2510
2511         # verify the data packet
2512         peer_1.validate_encapped(rxs, p)
2513
2514         # remove configs
2515         r1.remove_vpp_config()
2516         peer_1.remove_vpp_config()
2517         wg0.remove_vpp_config()
2518
2519     def _test_wg_large_packet_tmpl(self, is_async, is_ip6):
2520         self.vapi.wg_set_async_mode(is_async)
2521         port = 12323
2522
2523         # create wg interface
2524         if is_ip6:
2525             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
2526             wg0.admin_up()
2527             wg0.config_ip6()
2528         else:
2529             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2530             wg0.admin_up()
2531             wg0.config_ip4()
2532
2533         self.pg_enable_capture(self.pg_interfaces)
2534         self.pg_start()
2535
2536         # create a peer
2537         if is_ip6:
2538             peer_1 = VppWgPeer(
2539                 self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
2540             ).add_vpp_config()
2541         else:
2542             peer_1 = VppWgPeer(
2543                 self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
2544             ).add_vpp_config()
2545         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
2546
2547         # create a route to rewrite traffic into the wg interface
2548         if is_ip6:
2549             r1 = VppIpRoute(
2550                 self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
2551             ).add_vpp_config()
2552         else:
2553             r1 = VppIpRoute(
2554                 self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
2555             ).add_vpp_config()
2556
2557         # wait for the peer to send a handshake initiation
2558         rxs = self.pg1.get_capture(1, timeout=2)
2559
2560         # prepare and send a handshake response
2561         # expect a keepalive message
2562         resp = peer_1.consume_init(rxs[0], self.pg1, is_ip6=is_ip6)
2563         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
2564
2565         # verify the keepalive message
2566         b = peer_1.decrypt_transport(rxs[0], is_ip6=is_ip6)
2567         self.assertEqual(0, len(b))
2568
2569         # prepare and send data packets
2570         # expect to receive them decrypted
2571         if is_ip6:
2572             ip_header = IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
2573         else:
2574             ip_header = IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
2575         packet_len_opts = (
2576             2500,  # two buffers
2577             1500,  # one buffer
2578             4500,  # three buffers
2579             1910 if is_ip6 else 1950,  # auth tag is not contiguous
2580         )
2581         txs = []
2582         for l in packet_len_opts:
2583             txs.append(
2584                 peer_1.mk_tunnel_header(self.pg1, is_ip6=is_ip6)
2585                 / Wireguard(message_type=4, reserved_zero=0)
2586                 / WireguardTransport(
2587                     receiver_index=peer_1.sender,
2588                     counter=len(txs),
2589                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
2590                         ip_header / UDP(sport=222, dport=223) / Raw(b"\xfe" * l)
2591                     ),
2592                 )
2593             )
2594         rxs = self.send_and_expect(self.pg1, txs, self.pg0)
2595
2596         # verify decrypted packets
2597         for i, l in enumerate(packet_len_opts):
2598             if is_ip6:
2599                 self.assertEqual(rxs[i][IPv6].dst, self.pg0.remote_ip6)
2600                 self.assertEqual(rxs[i][IPv6].hlim, ip_header.hlim - 1)
2601             else:
2602                 self.assertEqual(rxs[i][IP].dst, self.pg0.remote_ip4)
2603                 self.assertEqual(rxs[i][IP].ttl, ip_header.ttl - 1)
2604             self.assertEqual(len(rxs[i][Raw]), l)
2605             self.assertEqual(bytes(rxs[i][Raw]), b"\xfe" * l)
2606
2607         # prepare and send packets that will be rewritten into the wg interface
2608         # expect data packets sent
2609         if is_ip6:
2610             ip_header = IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
2611         else:
2612             ip_header = IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
2613         packet_len_opts = (
2614             2500,  # two buffers
2615             1500,  # one buffer
2616             4500,  # three buffers
2617             1980 if is_ip6 else 2000,  # no free space to write auth tag
2618         )
2619         txs = []
2620         for l in packet_len_opts:
2621             txs.append(
2622                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2623                 / ip_header
2624                 / UDP(sport=555, dport=556)
2625                 / Raw(b"\xfe" * l)
2626             )
2627         rxs = self.send_and_expect(self.pg0, txs, self.pg1)
2628
2629         # verify the data packets
2630         rxs_decrypted = peer_1.validate_encapped(
2631             rxs, ip_header, is_tunnel_ip6=is_ip6, is_transport_ip6=is_ip6
2632         )
2633
2634         for i, l in enumerate(packet_len_opts):
2635             self.assertEqual(len(rxs_decrypted[i][Raw]), l)
2636             self.assertEqual(bytes(rxs_decrypted[i][Raw]), b"\xfe" * l)
2637
2638         # remove configs
2639         r1.remove_vpp_config()
2640         peer_1.remove_vpp_config()
2641         wg0.remove_vpp_config()
2642
2643     def test_wg_large_packet_v4_sync(self):
2644         """Large packet (v4, sync)"""
2645         self._test_wg_large_packet_tmpl(is_async=False, is_ip6=False)
2646
2647     def test_wg_large_packet_v6_sync(self):
2648         """Large packet (v6, sync)"""
2649         self._test_wg_large_packet_tmpl(is_async=False, is_ip6=True)
2650
2651     def test_wg_large_packet_v4_async(self):
2652         """Large packet (v4, async)"""
2653         self._test_wg_large_packet_tmpl(is_async=True, is_ip6=False)
2654
2655     def test_wg_large_packet_v6_async(self):
2656         """Large packet (v6, async)"""
2657         self._test_wg_large_packet_tmpl(is_async=True, is_ip6=True)
2658
2659     def test_wg_lack_of_buf_headroom(self):
2660         """Lack of buffer's headroom (v6 vxlan over v6 wg)"""
2661         port = 12323
2662
2663         # create wg interface
2664         wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
2665         wg0.admin_up()
2666         wg0.config_ip6()
2667
2668         self.pg_enable_capture(self.pg_interfaces)
2669         self.pg_start()
2670
2671         # create a peer
2672         peer_1 = VppWgPeer(
2673             self, wg0, self.pg1.remote_ip6, port + 1, ["::/0"]
2674         ).add_vpp_config()
2675         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
2676
2677         # create a route to enable communication between wg interface addresses
2678         r1 = VppIpRoute(
2679             self, wg0.remote_ip6, 128, [VppRoutePath("0.0.0.0", wg0.sw_if_index)]
2680         ).add_vpp_config()
2681
2682         # wait for the peer to send a handshake initiation
2683         rxs = self.pg1.get_capture(1, timeout=2)
2684
2685         # prepare and send a handshake response
2686         # expect a keepalive message
2687         resp = peer_1.consume_init(rxs[0], self.pg1, is_ip6=True)
2688         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
2689
2690         # verify the keepalive message
2691         b = peer_1.decrypt_transport(rxs[0], is_ip6=True)
2692         self.assertEqual(0, len(b))
2693
2694         # create vxlan interface over the wg interface
2695         vxlan0 = VppVxlanTunnel(self, src=wg0.local_ip6, dst=wg0.remote_ip6, vni=1111)
2696         vxlan0.add_vpp_config()
2697
2698         # create bridge domain
2699         bd1 = VppBridgeDomain(self, bd_id=1)
2700         bd1.add_vpp_config()
2701
2702         # add the vxlan interface and pg0 to the bridge domain
2703         bd1_ports = (
2704             VppBridgeDomainPort(self, bd1, vxlan0).add_vpp_config(),
2705             VppBridgeDomainPort(self, bd1, self.pg0).add_vpp_config(),
2706         )
2707
2708         # prepare and send packets that will be rewritten into the vxlan interface
2709         # expect they to be rewritten into the wg interface then and data packets sent
2710         tx = (
2711             Ether(dst="00:00:00:00:00:01", src="00:00:00:00:00:02")
2712             / IPv6(src="::1", dst="::2", hlim=20)
2713             / UDP(sport=1111, dport=1112)
2714             / Raw(b"\xfe" * 1900)
2715         )
2716         rxs = self.send_and_expect(self.pg0, [tx] * 5, self.pg1)
2717
2718         # verify the data packet
2719         for rx in rxs:
2720             rx_decrypted = IPv6(peer_1.decrypt_transport(rx, is_ip6=True))
2721
2722             self.assertEqual(rx_decrypted[VXLAN].vni, vxlan0.vni)
2723             inner = rx_decrypted[VXLAN].payload
2724
2725             # check the original packet is present
2726             self.assertEqual(inner[IPv6].dst, tx[IPv6].dst)
2727             self.assertEqual(inner[IPv6].hlim, tx[IPv6].hlim)
2728             self.assertEqual(len(inner[Raw]), len(tx[Raw]))
2729             self.assertEqual(bytes(inner[Raw]), bytes(tx[Raw]))
2730
2731         # remove configs
2732         for bdp in bd1_ports:
2733             bdp.remove_vpp_config()
2734         bd1.remove_vpp_config()
2735         vxlan0.remove_vpp_config()
2736         r1.remove_vpp_config()
2737         peer_1.remove_vpp_config()
2738         wg0.remove_vpp_config()
2739
2740
2741 @tag_fixme_vpp_debug
2742 class WireguardHandoffTests(TestWg):
2743     """Wireguard Tests in multi worker setup"""
2744
2745     vpp_worker_count = 2
2746
2747     def test_wg_peer_init(self):
2748         """Handoff"""
2749
2750         port = 12383
2751
2752         # Create interfaces
2753         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2754         wg0.admin_up()
2755         wg0.config_ip4()
2756
2757         self.pg_enable_capture(self.pg_interfaces)
2758         self.pg_start()
2759
2760         peer_1 = VppWgPeer(
2761             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.2.0/24", "10.11.3.0/24"]
2762         ).add_vpp_config()
2763         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
2764
2765         r1 = VppIpRoute(
2766             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
2767         ).add_vpp_config()
2768
2769         # skip the first automatic handshake
2770         self.pg1.get_capture(1, timeout=HANDSHAKE_JITTER)
2771
2772         # send a valid handsake init for which we expect a response
2773         p = peer_1.mk_handshake(self.pg1)
2774
2775         rx = self.send_and_expect(self.pg1, [p], self.pg1)
2776
2777         peer_1.consume_response(rx[0])
2778
2779         # send a data packet from the peer through the tunnel
2780         # this completes the handshake and pins the peer to worker 0
2781         p = (
2782             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
2783             / UDP(sport=222, dport=223)
2784             / Raw()
2785         )
2786         d = peer_1.encrypt_transport(p)
2787         p = peer_1.mk_tunnel_header(self.pg1) / (
2788             Wireguard(message_type=4, reserved_zero=0)
2789             / WireguardTransport(
2790                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
2791             )
2792         )
2793         rxs = self.send_and_expect(self.pg1, [p], self.pg0, worker=0)
2794
2795         for rx in rxs:
2796             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
2797             self.assertEqual(rx[IP].ttl, 19)
2798
2799         # send a packets that are routed into the tunnel
2800         # and pins the peer tp worker 1
2801         pe = (
2802             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2803             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
2804             / UDP(sport=555, dport=556)
2805             / Raw(b"\x00" * 80)
2806         )
2807         rxs = self.send_and_expect(self.pg0, pe * 255, self.pg1, worker=1)
2808         peer_1.validate_encapped(rxs, pe)
2809
2810         # send packets into the tunnel, from the other worker
2811         p = [
2812             (
2813                 peer_1.mk_tunnel_header(self.pg1)
2814                 / Wireguard(message_type=4, reserved_zero=0)
2815                 / WireguardTransport(
2816                     receiver_index=peer_1.sender,
2817                     counter=ii + 1,
2818                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
2819                         (
2820                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
2821                             / UDP(sport=222, dport=223)
2822                             / Raw()
2823                         )
2824                     ),
2825                 )
2826             )
2827             for ii in range(255)
2828         ]
2829
2830         rxs = self.send_and_expect(self.pg1, p, self.pg0, worker=1)
2831
2832         for rx in rxs:
2833             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
2834             self.assertEqual(rx[IP].ttl, 19)
2835
2836         # send a packets that are routed into the tunnel
2837         # from worker 0
2838         rxs = self.send_and_expect(self.pg0, pe * 255, self.pg1, worker=0)
2839
2840         peer_1.validate_encapped(rxs, pe)
2841
2842         r1.remove_vpp_config()
2843         peer_1.remove_vpp_config()
2844         wg0.remove_vpp_config()
2845
2846     @unittest.skip("test disabled")
2847     def test_wg_multi_interface(self):
2848         """Multi-tunnel on the same port"""
2849
2850
2851 class TestWgFIB(VppTestCase):
2852     """Wireguard FIB Test Case"""
2853
2854     @classmethod
2855     def setUpClass(cls):
2856         super(TestWgFIB, cls).setUpClass()
2857
2858     @classmethod
2859     def tearDownClass(cls):
2860         super(TestWgFIB, cls).tearDownClass()
2861
2862     def setUp(self):
2863         super(TestWgFIB, self).setUp()
2864
2865         self.create_pg_interfaces(range(2))
2866
2867         for i in self.pg_interfaces:
2868             i.admin_up()
2869             i.config_ip4()
2870
2871     def tearDown(self):
2872         for i in self.pg_interfaces:
2873             i.unconfig_ip4()
2874             i.admin_down()
2875         super(TestWgFIB, self).tearDown()
2876
2877     def test_wg_fib_tracking(self):
2878         """FIB tracking"""
2879         port = 12323
2880
2881         # create wg interface
2882         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2883         wg0.admin_up()
2884         wg0.config_ip4()
2885
2886         self.pg_enable_capture(self.pg_interfaces)
2887         self.pg_start()
2888
2889         # create a peer
2890         peer_1 = VppWgPeer(
2891             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
2892         ).add_vpp_config()
2893         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
2894
2895         # create a route to rewrite traffic into the wg interface
2896         r1 = VppIpRoute(
2897             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
2898         ).add_vpp_config()
2899
2900         # resolve ARP and expect the adjacency to update
2901         self.pg1.resolve_arp()
2902
2903         # wait for the peer to send a handshake initiation
2904         rxs = self.pg1.get_capture(2, timeout=6)
2905
2906         # prepare and send a handshake response
2907         # expect a keepalive message
2908         resp = peer_1.consume_init(rxs[1], self.pg1)
2909         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
2910
2911         # verify the keepalive message
2912         b = peer_1.decrypt_transport(rxs[0])
2913         self.assertEqual(0, len(b))
2914
2915         # prepare and send a packet that will be rewritten into the wg interface
2916         # expect a data packet sent
2917         p = (
2918             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2919             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
2920             / UDP(sport=555, dport=556)
2921             / Raw()
2922         )
2923         rxs = self.send_and_expect(self.pg0, [p], self.pg1)
2924
2925         # verify the data packet
2926         peer_1.validate_encapped(rxs, p)
2927
2928         # remove configs
2929         r1.remove_vpp_config()
2930         peer_1.remove_vpp_config()
2931         wg0.remove_vpp_config()